## ----include=FALSE------------------------------------------------------------
knitr::opts_chunk$set(
collapse = TRUE,
comment = "#>"
)
## ----setup, message=FALSE-----------------------------------------------------
library(pairwiseLLM)
library(dplyr)
library(tidyr)
library(purrr)
library(readr)
library(stringr)
## -----------------------------------------------------------------------------
check_llm_api_keys()
## -----------------------------------------------------------------------------
data("example_writing_samples", package = "pairwiseLLM")
td <- trait_description("overall_quality")
td
## -----------------------------------------------------------------------------
tmpl <- set_prompt_template()
cat(substr(tmpl, 1, 400), "...\n")
## -----------------------------------------------------------------------------
set.seed(123)
pairs_all <- example_writing_samples |>
make_pairs()
n_pairs <- min(40L, nrow(pairs_all))
pairs_forward <- pairs_all |>
sample_pairs(n_pairs = n_pairs, seed = 123) |>
randomize_pair_order(seed = 456)
pairs_reverse <- sample_reverse_pairs(
pairs_forward,
reverse_pct = 1.0,
seed = 789
)
get_pairs_for_direction <- function(direction = c("forward", "reverse")) {
direction <- match.arg(direction)
if (identical(direction, "forward")) {
pairs_forward
} else {
pairs_reverse
}
}
## -----------------------------------------------------------------------------
anthropic_models <- c(
"claude-sonnet-4-5",
"claude-haiku-4-5",
"claude-opus-4-5"
)
gemini_models <- c(
"gemini-3-pro-preview"
)
openai_models <- c(
"gpt-4.1",
"gpt-4o",
"gpt-5.1"
)
thinking_levels <- c("no_thinking", "with_thinking")
directions <- c("forward", "reverse")
anthropic_grid <- tidyr::expand_grid(
provider = "anthropic",
model = anthropic_models,
thinking = thinking_levels,
direction = directions
)
gemini_grid <- tidyr::expand_grid(
provider = "gemini",
model = gemini_models,
thinking = "with_thinking",
direction = directions
)
openai_grid <- tidyr::expand_grid(
provider = "openai",
model = openai_models,
thinking = thinking_levels,
direction = directions
) |>
# For example, only allow "with_thinking" for gpt-5.1
dplyr::filter(model == "gpt-5.1" | thinking == "no_thinking")
batch_grid <- dplyr::bind_rows(
anthropic_grid,
gemini_grid,
openai_grid
)
batch_grid
## -----------------------------------------------------------------------------
templates_tbl <- tibble::tibble(
template_id = c("test1", "test2", "test3", "test4", "test5"),
prompt_template = list(tmpl, tmpl, tmpl, tmpl, tmpl)
)
templates_tbl
## ----eval=FALSE---------------------------------------------------------------
# out_dir <- "dev-output/advanced-multi-batch"
# dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)
## ----eval=FALSE---------------------------------------------------------------
# jobs <- list()
#
# for (t_row in seq_len(nrow(templates_tbl))) {
# template_id <- templates_tbl$template_id[t_row]
# tmpl_string <- templates_tbl$prompt_template[[t_row]]
#
# for (i in seq_len(nrow(batch_grid))) {
# row <- batch_grid[i, ]
#
# provider <- row$provider
# model <- row$model
# thinking <- row$thinking
# direction <- row$direction
#
# message(
# "Submitting batch: template=", template_id,
# " | ", provider, " / ", model,
# " / ", thinking, " / ", direction
# )
#
# pairs_use <- get_pairs_for_direction(direction)
# is_thinking <- identical(thinking, "with_thinking")
#
# prefix <- paste(provider, template_id, model, thinking, direction,
# sep = "_"
# )
# prefix <- gsub("[^A-Za-z0-9_.-]", "-", prefix)
#
# batch_input_path <- file.path(out_dir, paste0(prefix, "_input.jsonl"))
# batch_output_path <- file.path(out_dir, paste0(prefix, "_output.jsonl"))
# csv_path <- file.path(out_dir, paste0(prefix, ".csv"))
#
# if (identical(provider, "openai")) {
# # OpenAI: use the helpers from the dev scripts
# include_thoughts <- is_thinking && grepl("^gpt-5\\.1", model)
#
# pipeline <- run_openai_batch_pipeline(
# pairs = pairs_use,
# model = model,
# trait_name = td$name,
# trait_description = td$description,
# prompt_template = tmpl_string,
# include_thoughts = include_thoughts,
# include_raw = TRUE,
# batch_input_path = batch_input_path,
# batch_output_path = batch_output_path,
# poll = FALSE
# )
#
# jobs[[length(jobs) + 1L]] <- list(
# template_id = template_id,
# provider = provider,
# model = model,
# thinking = thinking,
# direction = direction,
# prefix = prefix,
# batch_type = "openai",
# batch_id = pipeline$batch$id,
# batch_input_path = pipeline$batch_input_path,
# batch_output_path = batch_output_path,
# csv_path = csv_path,
# done = FALSE,
# results = NULL
# )
# } else if (identical(provider, "anthropic")) {
# # Anthropic: use run_anthropic_batch_pipeline()
# reasoning <- if (is_thinking) "enabled" else "none"
# temperature_arg <- if (!is_thinking) 0 else NULL
#
# pipeline <- run_anthropic_batch_pipeline(
# pairs = pairs_use,
# model = model,
# trait_name = td$name,
# trait_description = td$description,
# prompt_template = tmpl_string,
# reasoning = reasoning,
# include_thoughts = is_thinking,
# batch_input_path = batch_input_path,
# batch_output_path = batch_output_path,
# poll = FALSE,
# temperature = temperature_arg,
# include_raw = TRUE
# )
#
# jobs[[length(jobs) + 1L]] <- list(
# template_id = template_id,
# provider = provider,
# model = model,
# thinking = thinking,
# direction = direction,
# prefix = prefix,
# batch_type = "anthropic",
# batch_id = pipeline$batch$id,
# batch_input_path = pipeline$batch_input_path,
# batch_output_path = batch_output_path,
# csv_path = csv_path,
# done = FALSE,
# results = NULL
# )
# } else if (identical(provider, "gemini")) {
# # Gemini: typically use low-level helpers, as in the dev scripts
# req_tbl <- build_gemini_batch_requests(
# pairs = pairs_use,
# model = model,
# trait_name = td$name,
# trait_description = td$description,
# prompt_template = tmpl_string,
# thinking_level = "low", # example
# include_thoughts = TRUE
# )
#
# batch <- gemini_create_batch(
# requests = req_tbl$request,
# model = model,
# api_key = Sys.getenv("GEMINI_API_KEY"),
# api_version = "v1beta"
# )
#
# batch_name <- batch$name %||% stop(
# "Gemini batch did not return a `name` field.",
# call. = FALSE
# )
#
# jobs[[length(jobs) + 1L]] <- list(
# template_id = template_id,
# provider = provider,
# model = model,
# thinking = thinking,
# direction = direction,
# prefix = prefix,
# batch_type = "gemini",
# batch_id = batch_name,
# batch_input_path = batch_input_path,
# batch_output_path = batch_output_path,
# csv_path = csv_path,
# done = FALSE,
# results = NULL
# )
# }
# }
# }
## ----eval=FALSE---------------------------------------------------------------
# jobs_tbl <- tibble::tibble(
# idx = seq_along(jobs),
# template_id = vapply(jobs, `[[`, character(1), "template_id"),
# provider = vapply(jobs, `[[`, character(1), "provider"),
# model = vapply(jobs, `[[`, character(1), "model"),
# thinking = vapply(jobs, `[[`, character(1), "thinking"),
# direction = vapply(jobs, `[[`, character(1), "direction"),
# prefix = vapply(jobs, `[[`, character(1), "prefix"),
# batch_type = vapply(jobs, `[[`, character(1), "batch_type"),
# batch_id = vapply(jobs, `[[`, character(1), "batch_id"),
# batch_input_path = vapply(jobs, `[[`, character(1), "batch_input_path"),
# batch_output_path = vapply(jobs, `[[`, character(1), "batch_output_path"),
# csv_path = vapply(jobs, `[[`, character(1), "csv_path")
# )
#
# jobs_index_path <- file.path(out_dir, "batch_jobs_index.csv")
# readr::write_csv(jobs_tbl, jobs_index_path)
#
# jobs_index_path
## ----eval=FALSE---------------------------------------------------------------
# is_terminal_openai <- function(status) {
# status %in% c("completed", "failed", "cancelled", "expired")
# }
#
# is_terminal_anthropic <- function(status) {
# status %in% c("ended", "errored", "canceled", "expired")
# }
#
# is_terminal_gemini <- function(state) {
# state %in% c("SUCCEEDED", "FAILED", "CANCELLED", "EXPIRED")
# }
## ----eval=FALSE---------------------------------------------------------------
# interval_seconds <- 60
# per_job_delay <- 2 # seconds between polling calls
#
# # Reload batch index
# jobs_index_path <- file.path(out_dir, "batch_jobs_index.csv")
# jobs_tbl <- readr::read_csv(jobs_index_path, show_col_types = FALSE)
#
# # Rebuild jobs list skeleton
# jobs <- purrr::pmap(
# jobs_tbl,
# function(idx, template_id, provider, model, thinking, direction,
# prefix, batch_type, batch_id,
# batch_input_path, batch_output_path, csv_path, ...) {
# list(
# template_id = template_id,
# provider = provider,
# model = model,
# thinking = thinking,
# direction = direction,
# prefix = prefix,
# batch_type = batch_type,
# batch_id = batch_id,
# batch_input_path = batch_input_path,
# batch_output_path = batch_output_path,
# csv_path = csv_path,
# done = FALSE,
# results = NULL
# )
# }
# )
#
# unfinished <- which(!vapply(jobs, `[[`, logical(1), "done"))
#
# while (length(unfinished) > 0L) {
# message("Polling ", length(unfinished), " unfinished batch(es)...")
#
# for (j in unfinished) {
# job <- jobs[[j]]
# if (job$done) next
#
# batch_type <- job$batch_type
#
# if (identical(batch_type, "openai")) {
# batch <- openai_get_batch(job$batch_id)
# status <- batch$status %||% "unknown"
# message(" [OpenAI] ", job$prefix, " status: ", status)
#
# if (is_terminal_openai(status)) {
# if (identical(status, "completed")) {
# openai_download_batch_output(
# batch_id = job$batch_id,
# path = job$batch_output_path
# )
#
# res <- parse_openai_batch_output(job$batch_output_path)
# jobs[[j]]$results <- res
# readr::write_csv(res, job$csv_path)
# message(" -> Results written to: ", job$csv_path)
# }
# jobs[[j]]$done <- TRUE
# }
# } else if (identical(batch_type, "anthropic")) {
# batch <- anthropic_get_batch(job$batch_id)
# status <- batch$processing_status %||% "unknown"
# message(" [Anthropic] ", job$prefix, " status: ", status)
#
# if (is_terminal_anthropic(status)) {
# if (identical(status, "ended")) {
# output_path <- anthropic_download_batch_results(
# batch_id = job$batch_id,
# output_path = job$batch_output_path
# )
#
# res <- parse_anthropic_batch_output(
# jsonl_path = output_path,
# tag_prefix = "",
# tag_suffix = ""
# )
#
# jobs[[j]]$results <- res
# readr::write_csv(res, job$csv_path)
# message(" -> Results written to: ", job$csv_path)
# }
# jobs[[j]]$done <- TRUE
# }
# } else if (identical(batch_type, "gemini")) {
# batch <- gemini_get_batch(job$batch_id)
# state <- batch$state %||% "STATE_UNSPECIFIED"
# message(" [Gemini] ", job$prefix, " state: ", state)
#
# if (is_terminal_gemini(state)) {
# if (identical(state, "SUCCEEDED")) {
# raw_res <- gemini_download_batch_results(job$batch_id)
#
# res <- parse_gemini_batch_output(
# raw_results = raw_res,
# tag_prefix = "",
# tag_suffix = ""
# )
#
# jobs[[j]]$results <- res
# readr::write_csv(res, job$csv_path)
# message(" -> Results written to: ", job$csv_path)
# }
# jobs[[j]]$done <- TRUE
# }
# }
#
# Sys.sleep(per_job_delay)
# }
#
# unfinished <- which(!vapply(jobs, `[[`, logical(1), "done"))
#
# if (length(unfinished) > 0L) {
# message("Sleeping ", interval_seconds, " seconds before next poll...")
# Sys.sleep(interval_seconds)
# }
# }
#
# message("All batches have reached a terminal state.")