Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate with pins for writing agents, informants #487

kmasiello opened this issue Aug 1, 2023 · 1 comment

Integrate with pins for writing agents, informants #487

kmasiello opened this issue Aug 1, 2023 · 1 comment


Copy link

Object ops functions for agents and informants revolve around writing and reading from disk.

A logical workflow would include writing agents and informants to a pin with a board back end on Connect, S3, or other.

This would enable reading agents or informants into downstream reports or apps. Notably, creation of multiagents would be significantly improved if the multiagent could be created from a versioned pin.

One current challenge with trying to do this manually is the size of the agent object when written. The input tbl is redundantly saved within each validation. This makes the object very large difficult to pin. Some optimization in the agent object would be required. Then this would be a super slick workflow.

Copy link

Hey all, so I am talking at posit::conf(2023) about this topic (sort of). We redesigned how we do data validation using this package. We utilize Posit Connect for deploying apps, docs, and pins and so the natural next thing to do was to also pin pointblank objects. Now, for our specific use case, we needed something like a "multiagent" but that worked nicely with Connect, so we created a "test plan" object, which is essentially a list of pointblank objects. Also, we didn't want to save the test plan objects themselves on Connect for extensibility and storage reasons mentioned, but instead the "instructions" aka results of as_agent_yaml_list. When we save these test plans to Connect, we write them as JSON - we liked the idea of being able to preview the test plan as plain text on Connect instead of a .rds file only readable in R. When reading the test plan in, we also capture the pin name and version on Connect as attributes of the list (part of the "test plan" class). We also had to create a JSON deserializer for the test plan, similar to how the yaml ops do this. We specify the data needed to execute the test plan instructions when reading in from Connect, thus creating the test plan object. We did this and more and integrated into an internal package for our team. Below is a function we created in our package that deploys a list of pointblank object "code" to hopefully illustrate what I'm talking about, even though it is tailored to our needs. I would love to extend these operations to the multiagent class and do some pull requests, just need to find the time. I hope this made sense and the below could help in anyway!

#' Upload a test plan (list of {pointblank} agent objects) to Posit Connect
#' @param ... pointblank agent objects to create "test plan". Use `.test_list` to include custom names
#' @param test_name name to assign the test plan (pin) in RStudio Connect
#' @param test_type is this test plan assessing data "integrity" or output "validation"?
#' @param overwrite if the test plan exists, overwrite with new version?
#' @param commit_message message to provide when saving a test plan. Useful for overwriting tests
#' @param server full url to the Posit Connect server. If NULL, function will look for environment variable CONNECT_SERVER
#' @param key the Posit Connect API key. If NULL, function will look for environment variable CONNECT_API_KEY
#' @param .test_list named list of pointblank agent objects. Supersedes `...` if not NULL
#' @import pins
#' @import pointblank
#' @importFrom rlang names2 warn abort dots_list
#' @importFrom purrr map walk2 set_names modify_at map_depth
#' @importFrom dplyr select filter pull tibble
#' @importFrom httr GET POST add_headers content status_code http_error
#' @importFrom glue glue double_quote
#' @importFrom jsonlite read_json
#' @importFrom yaml write_yaml
#' @importFrom stringr str_extract str_trim str_remove_all str_split
#' @description Registering a test plan is built on the {pins} package. Each plan that is registered is stored as
#' a pin on our Posit Connect server. This function sets ownership permissions to all those in the Posit Connect group "data_team".
#' Therefore, pins can be created and updated by anyone in the group. Also, the pins are versioned and the name of the user
#' who uploaded a specific version of a pin can be found by looking at the variable "user" under the "metadata" variable. The easiest way to
#' activate versions or delete versions would be to log in to Posit Connect and navigate to the pin of interest. Furthermore, each template (pin)
#' saved from this function is automatically tagged to allow for easy navigation in Posit Connect.
#' @examples
#' \dontrun{
#' # Create agents
#' myagent = create_agent(
#'   tbl = ~ small_table,
#'   tbl_name = "small_table",
#'   label = "First agent testing constraints on 'small_table'",
#'   actions = action_levels(
#'     warn_at = 0.10,
#'     stop_at = 0.25,
#'     notify_at = 0.35
#'   )
#' ) %>%
#'   col_exists(columns = vars(date, date_time)) %>%
#'   col_vals_regex(
#'     columns = vars(b),
#'     regex = "[0-9]-[a-z]{3}-[0-9]{3}"
#'   ) %>%
#'   rows_distinct(columns = everything()) %>%
#'   col_vals_gt(columns = vars(d), value = 100) %>%
#'   col_vals_lte(columns = vars(c), value = 5) %>%
#'   col_vals_between(
#'     columns = vars(c),
#'     left = vars(a), right = vars(d),
#'     na_pass = TRUE
#'   )
#' smaller_tbl = dplyr::tibble(a = 1:5,b = letters[1:5])
#' youragent =
#'   create_agent(
#'     tbl = ~ smaller_tbl,
#'     label = "Next agent looking at 'smaller_table'",
#'     actions = action_levels(
#'       warn_at = 0.10,
#'       stop_at = 0.25,
#'       notify_at = 0.35
#'   )
#'     ) %>%
#'   col_schema_match(
#'     schema = col_schema(
#'       a = "integer",
#'       b = "character"
#'     )
#'   )
#' smallest_tbl = dplyr::tibble(a = c(1:10, NA))
#' ouragent =
#'   create_agent(
#'     tbl = ~ smallest_tbl,
#'     label = "Final agent testing at 'smallest_table'",
#'     actions = action_levels(
#'       warn_at = 0.10,
#'       stop_at = 0.25,
#'       notify_at = 0.35
#'   )
#'     ) %>%
#'   col_vals_gte(
#'     columns = vars(a),
#'     value = 6,
#'     na_pass = FALSE
#'   )
#' register_test_plan(
#'   myagent,
#'   youragent,
#'   ouragent,
#'   test_name = "dataintegrity-packagedemo",
#'   test_type = "integrity",
#'   overwrite = TRUE,
#'   commit_message = glue::glue("Ran this from the example doc on {Sys.time()}")
#' )
#' }
#' pin_browse(board_connect(), name = 'int-dataintegrity-packagedemo')
#' @return no return; run for its side-effect
#' @export
register_test_plan = function(..., test_name,
                              test_type = c("integrity", "validation"),
                              overwrite = FALSE, commit_message = NULL,
                              server = NULL, key = NULL,
                              .test_list = NULL) {
  if(!is.null(.test_list)) {
    agent_list = .test_list
    agent_nms = rlang::names2(agent_list)
    if(length(agent_nms) != length(agent_list))
      rlang::warn("Names not detected for every test in `.test_list`")
  } else {
    agent_list = rlang::dots_list(..., .named = TRUE)

  if(!length(agent_list)) rlang::abort("Provide agents as separate objects or in a list with `.test_list`")

  if(is.null(server)) server = Sys.getenv("CONNECT_SERVER")
  if(is.null(key)) key = Sys.getenv("CONNECT_API_KEY")

  agent_list_c = agent_list %>%
    purrr::map(.f = convert_agent_list)

  rsc_board = pins::board_connect(name = "test_plans",
                                    server = server, key = key,
                                    versioned = TRUE)

  # Check if plan exists
  test_name = generate_test_name(test_name, test_type)
  plan_exists = suppressMessages(
    as.logical(nrow(pins::pin_search(board = pins::board_connect(), search = test_name)))
  #plan_exists = any(fs::path_file(pins::pin_list(board = pins::board_connect())) %in% c(test_name))

  if(plan_exists) {
    if(overwrite) {
      message(glue::glue("Updating {test_name} test plan"))
    } else {
      rlang::abort("Test plan already exists. Change name of test plan or set \"overwrite\" = TRUE")

    board = rsc_board,
    x = agent_list_c,
    name = test_name,
    title = glue::glue("{test_name} data integrity plan"),
    description = "This is a data integrity test plan created by {datatransfer} package.",
    type = "json",
    metadata = list(user = Sys.getenv("USER"),
                    commit_message = commit_message,
                    test_type = test_type)

  content_guid = get_connect_content_guid(test_name, server, key)
  group_guid = get_connect_group_guid("data_team", server, key)

  # For new pins only
  if(!plan_exists) {
    # Set permissions
    set_pin_permissions(content_guid, group_guid, server, key)
    # Set tag
    tag_id = get_test_tag_id(test_type, "plan", server, key)
    set_testplan_tag(content_guid, tag_id, server, key)

convert_agent_list = function(agent) {
  agent_ls = agent %>%
    as_agent_yaml_list(expanded = TRUE)
  class(agent_ls$actions) = "list"
  agent_steps = agent_ls$steps
  agent_steps_c = agent_steps %>%
    purrr::map_depth(.depth = 2,
              .f = function(x) {
                purrr::modify_at(.x = x,
                          .at = c("columns","left","right"),
                          .f = function(x) {
                            cols_to_change = x %>%
                              stringr::str_extract("\\(([^()]+)\\)") %>%
                              stringr::str_remove_all("\\(|\\)") %>%
                              stringr::str_split("\\,") %>%
                              unlist %>%

                            paste0("vars(", paste0(glue::double_quote(cols_to_change), collapse = ", "), ")")
  agent_ls$steps = agent_steps_c

generate_test_name = function(test_name, test_type) {
  test_type_pre = switch(test_type,
                         integrity = "int",
                         validation = "val")

set_testplan_tag = function(content_guid, tag_id, server, key) {
  url = server
  path = glue::glue("/__api__/v1/content/{content_guid}/tags")

  body = list(tag_id = tag_id) %>%
    jsonlite::toJSON(auto_unbox = TRUE)

  result = POST(url = url,
                path = path,
                add_headers(Authorization = paste("Key", key)),
                body = body,
                encode = "raw")

  if (http_error(result)) {
        "RSConnect request \"set_testplan_tag\" failed [%s]",
      call. = FALSE

get_test_tag_id = function(type, tag_name, server, key) {
  url = server
  path = glue::glue("/__api__/v1/tags")
  result = GET(url = url,
               path = path,
               add_headers(Authorization = paste("Key", key)))

  parsed = suppressMessages(jsonlite::fromJSON(httr::content(result, "text")))

  if (http_error(result)) {
        "RSConnect request \"get_program_tag_id\" failed [%s]\n%s",
      call. = FALSE

  tag_tbl = httr::content(result) %>% purrr::map_dfr(.f = purrr::pluck)
  tag_pc_tbl = tag_tbl %>%
    dplyr::inner_join(tag_tbl %>% dplyr::select(id, parent_name = name), by = c("parent_id" = "id"))
  tag_template_tbl = tag_pc_tbl %>%
    filter(parent_id %in%
             (tag_pc_tbl %>% filter(parent_name == "data-quality") %>% pull(id))
           ) %>%
    filter(parent_name == type, name == tag_name)
  #tag_template_tbl = tag_pc_tbl %>% dplyr::filter(name == "test plans", parent_name == glue::glue("data-{type}"))
  tag_template_id = tag_template_tbl$id[[1]]

@rich-iannone rich-iannone modified the milestones: v0.12.0, v0.13.0 Feb 20, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
None yet

No branches or pull requests

3 participants