## ----include = FALSE----------------------------------------------------------
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
library(DBmaps)
library(data.table)

## ----load_data, message=FALSE-------------------------------------------------
# Parse the `time` column of both event tables with one shared format string,
# updating each table by reference and pinning the time zone to UTC.
iso_utc_format <- "%Y-%m-%dT%H:%M:%OSZ"
transactions[, time := as.POSIXct(time, format = iso_utc_format, tz = "UTC")]
views[, time := as.POSIXct(time, format = iso_utc_format, tz = "UTC")]

# Show the first five rows of every raw table.
cat("--- All 4 Raw Data Tables Loaded Successfully ---\n")
cat("---Customers Data---\n")
print(head(customers, n = 5))
cat("---products Data---\n")
print(head(products, n = 5))
cat("---Transactions Data---\n")
print(head(transactions, n = 5))
cat("---Views Data---\n")
print(head(views, n = 5))

## ----define_metadata----------------------------------------------------------
# customers: a constant-1 outcome summed within each region.
customers_info <- table_info(
  table_name = "customers",
  source_identifier = "customers.csv",
  identifier_columns = "customer_id",
  key_outcome_specs = list(
    list(
      OutcomeName = "CustomerCount",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "CountByRegion",
          AggregationFunction = "sum",
          GroupingVariables = "region"
        )
      )
    )
  )
)

# products: a constant-1 outcome summed within each category.
products_info <- table_info(
  table_name = "products",
  source_identifier = "products.csv",
  identifier_columns = "product_id",
  key_outcome_specs = list(
    list(
      OutcomeName = "ProductCount",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "ProductsPerCategory",
          AggregationFunction = "sum",
          GroupingVariables = "category"
        )
      )
    )
  )
)

# transactions: two outcomes. Revenue (price * quantity) is summed at three
# grouping levels; UnitsSold (quantity) is summed per customer.
transactions_info <- table_info(
  table_name = "transactions",
  source_identifier = "transactions.csv",
  identifier_columns = c("customer_id", "product_id", "time"),
  key_outcome_specs = list(
    list(
      OutcomeName = "Revenue",
      ValueExpression = quote(price * quantity),
      AggregationMethods = list(
        list(
          AggregatedName = "RevenueByCustomer",
          AggregationFunction = "sum",
          GroupingVariables = "customer_id"
        ),
        list(
          AggregatedName = "RevenueByProduct",
          AggregationFunction = "sum",
          GroupingVariables = "product_id"
        ),
        list(
          AggregatedName = "DailyRevenueByCustomerProduct",
          AggregationFunction = "sum",
          GroupingVariables = c("customer_id", "product_id", "time")
        )
      )
    ),
    list(
      OutcomeName = "UnitsSold",
      ValueExpression = quote(quantity),
      AggregationMethods = list(
        list(
          AggregatedName = "TotalUnitsByCustomer",
          AggregationFunction = "sum",
          GroupingVariables = "customer_id"
        )
      )
    )
  )
)

# views: a constant-1 outcome aggregated with "count", once per product and
# once per customer.
views_info <- table_info(
  table_name = "views",
  source_identifier = "views.csv",
  identifier_columns = c("customer_id", "product_id", "time"),
  key_outcome_specs = list(
    list(
      OutcomeName = "ViewCount",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "ViewsByProduct",
          AggregationFunction = "count",
          GroupingVariables = "product_id"
        ),
        list(
          AggregatedName = "ViewsByCustomer",
          AggregationFunction = "count",
          GroupingVariables = "customer_id"
        )
      )
    )
  )
)

cat("---Metadata for transactions---\n")
print(transactions_info)

## ----creating-registry, echo = TRUE-------------------------------------------
# Start from an empty registry and register the four table descriptions.
meta <- create_metadata_registry()
meta <- add_table(meta, customers_info)
meta <- add_table(meta, products_info)
meta <- add_table(meta, views_info)
meta <- add_table(meta, transactions_info)
print(meta)

## ----map_paths, message=TRUE--------------------------------------------------
# Named list holding the actual data tables, keyed by table name
all_tables <- list(
  customers = customers,
  products = products,
  transactions = transactions,
  views = views
)

# Build the join map from the registry and the data
paths <- map_join_paths(meta, all_tables)
print(paths)

## ----create_plan--------------------------------------------------------------
# What the final table should contain, per source table
selections <- list(
  products = c("product_id", "category"), # base columns from products
  transactions = "RevenueByProduct",      # aggregated revenue by product
  views = "ViewsByProduct"                # aggregated view count by product
)

# Turn the request into a join plan anchored on the products table
plan <- create_join_plan(
  base_table = "products",
  selections = selections,
  metadata_dt = meta,
  join_map = paths
)
print(plan)

## ----setup_invalid------------------------------------------------------------
# Product metadata used only by this example
products_meta <- table_info(
  "products",
  "p.csv",
  "product_id",
  list(
    list(
      OutcomeName = "x",
      ValueExpression = 1,
      AggregationMethods = list(
        list(
          AggregatedName = "y",
          AggregationFunction = "z",
          GroupingVariables = "category"
        )
      )
    )
  )
)

transactions_meta_v2 <- table_info(
  "transactions",
  "t.csv",
  "trans_id",
  list(
    list(
      OutcomeName = "Revenue",
      ValueExpression = quote(price * qty),
      AggregationMethods = list(
        # Grouped by product_id rather than customer_id
        list(
          AggregatedName = "RevenueByProduct",
          AggregationFunction = "sum",
          GroupingVariables = "product_id"
        )
      )
    )
  )
)

# Registry for this example: only the two tables defined above are added
invalid_metadata <- create_metadata_registry()
invalid_metadata <- add_table(invalid_metadata, products_meta)
invalid_metadata <- add_table(invalid_metadata, transactions_meta_v2)

# The invalid request: customers as the base, revenue aggregated by product
invalid_selections <- list(
  customers = "customer_id",
  transactions = "RevenueByProduct"
)

## ----run_invalid, error=TRUE--------------------------------------------------
# try() lets the script continue past the error this request is meant to raise
try({
  create_join_plan(
    base_table = "customers",
    selections = invalid_selections,
    metadata_dt = invalid_metadata
  )
})

## ----plot_plan, fig.align='center', results='asis', fig.width=10--------------
# Plotting is optional: it is only attempted when DiagrammeR is installed
if (requireNamespace("DiagrammeR", quietly = TRUE)) {
  # Build the plot object, then auto-print it
  visualize <- plot_join_plan(plan)
  visualize
} else {
  cat("Install the 'DiagrammeR' package to visualize the join plan.")
}

## ----execute_plan, message=TRUE-----------------------------------------------
# The executor runs the plan in a clean environment
final_dt <- execute_join_plan(plan, all_tables)

# Preview the first rows of the merged data.table
print(head(final_dt))