--- title: "Batch migration and incremental backups" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Batch migration and incremental backups} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) ``` ```{r library} library(SCDB) ``` ```{r setup, echo = FALSE, results = "hide", eval = rlang::is_installed("duckdb") && R.version$major >= "4"} # Setup conn to be used for examples conn_primary <- get_connection(duckdb::duckdb()) conn_secondary <- conn_primary # Use a wrapper for update_snapshot which uses LoggerNull to suppress all logging if (!"update_snapshot" %in% ls(envir = globalenv())) { update_snapshot <- function(...) { return(SCDB::update_snapshot(logger = SCDB::LoggerNull$new(), ...)) } } # Setup example_data table in conn example_data <- dplyr::copy_to( conn_primary, dplyr::transmute(datasets::mtcars, car = rownames(mtcars), hp), name = "example_data", overwrite = TRUE ) ``` SCDB provides a simple implementation of the concept known as "delta-loading" or "incremental-loading" in the form of the pair of functions `delta_export()` and `delta_load()`. As the names suggest, these functions are used in tandem to export a delta from a table and to load this delta onto (another) table. A "delta" is a table containing the changes in a given time period which can be used for use-cases such as migration of data and incremental backups. # Intermittent data migration Consider the following use case: You have a database running which daily collects and stores snapshots of a data source using `update_snapshot()` and you want to mirror this time-versioned data in another database which cannot easily be updated (e.g. is behind a firewall). When you need to use this second database you want to bring it to the same state as your primary database. At this stage, your options are to either naively pull all snapshots from the primary database and store them in the secondary database using `update_snapshot()`, or alternatively, you can pull all changes since your last update in the form of a "delta" and load all changes onto the secondary database in a single operation. To see such a delta load in action, lets mimic this exact scenario: Our example data is `datasets::mtcars` reduced to only two columns: row names converted to a column `car`, and `hp` ```{r example_data, eval = FALSE} conn_primary <- get_connection(...) conn_secondary <- get_connection(...) ``` ```{r example_data_hidden, eval = rlang::is_installed("duckdb") && R.version$major >= "4"} example_data <- dplyr::tbl(conn_primary, DBI::Id(table = "example_data")) example_data ``` Imagine on Day 1, in this case January 1st, 2020, our currently available data is the first three records of the `example_data`. We then store this data in a table `mtcars` in our primary database. ```{r example_1, eval = rlang::is_installed("duckdb") && R.version$major >= "4"} data <- head(example_data, 3) update_snapshot( .data = data, conn = conn_primary, db_table = "mtcars", # the name of the DB table to store the data in timestamp = "2020-01-01 11:00:00" ) ``` The following day, the current data is now the first five rows of our example data. 
To see such a delta load in action, let's mimic this exact scenario. We first open connections to the primary and secondary databases:

```{r example_data, eval = FALSE}
conn_primary <- get_connection(...)
conn_secondary <- get_connection(...)
```

Our example data is `datasets::mtcars` reduced to only two columns: the row names converted to a column `car`, and `hp`:

```{r example_data_hidden, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
example_data <- dplyr::tbl(conn_primary, DBI::Id(table = "example_data"))
example_data
```

Imagine that on day 1, in this case January 1st, 2020, our currently available data is the first three records of `example_data`. We store this data in a table `mtcars` in our primary database.

```{r example_1, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
data <- head(example_data, 3)

update_snapshot(
  .data = data,
  conn = conn_primary,
  db_table = "mtcars", # the name of the DB table to store the data in
  timestamp = "2020-01-01 11:00:00"
)
```

The following day, the current data is the first five rows of our example data. We again store this data in the database using `update_snapshot()`:

```{r example_2, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
# Let's say that the next day, our data set is now the first 5 rows of our example data
data <- head(example_data, 5)

update_snapshot(
  .data = data,
  conn = conn_primary,
  db_table = "mtcars", # the name of the DB table to store the data in
  timestamp = "2020-01-02 12:00:00"
)
```

The current state of the data is then as follows (see `vignette("SCDB")` for details on `get_table()`):

```{r example_2_results_a, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
get_table(conn_primary, "mtcars", slice_ts = NULL)
```

We now want to recreate this state in our secondary database, so we export the changes in the form of a single "delta":

```{r delta_1_export, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
delta_1 <- delta_export(
  conn = conn_primary,
  db_table = "mtcars",
  timestamp_from = "2020-01-01 11:00:00"
)

delta_1
```

We then use `delta_load()` to apply these changes in the secondary database. Notice that we do not need to create the table beforehand, as `delta_load()` will create it as needed.

```{r delta_1_load, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
delta_load(
  conn = conn_secondary,
  db_table = "mtcars_firewalled",
  delta = delta_1
)

get_table(conn_secondary, "mtcars_firewalled", slice_ts = NULL)
```

On day 3, we imagine that we have the same five records, but one of them has been altered:

```{r example_3, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
data <- head(example_data, 5) %>%
  dplyr::mutate(hp = ifelse(car == "Mazda RX4", hp / 2, hp))

update_snapshot(
  .data = data,
  conn = conn_primary,
  db_table = "mtcars", # the name of the DB table to store the data in
  timestamp = "2020-01-03 13:00:00"
)
```

On day 4, we imagine that we have the first seven records along with the above alteration:

```{r example_4, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
data <- head(example_data, 7) %>%
  dplyr::mutate(hp = ifelse(car == "Mazda RX4", hp / 2, hp))

update_snapshot(
  .data = data,
  conn = conn_primary,
  db_table = "mtcars", # the name of the DB table to store the data in
  timestamp = "2020-01-04 14:00:00"
)
```

This brings us to the following state:

```{r example_4_results_a, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
get_table(conn_primary, "mtcars", slice_ts = NULL)
```

Again, we want to replay both of these changes in the secondary database:

```{r delta_2_export, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
delta_2 <- delta_export(
  conn = conn_primary,
  db_table = "mtcars",
  timestamp_from = "2020-01-03 13:00:00"
)

delta_2
```

We then use `delta_load()` to apply these changes in the secondary database.

```{r delta_2_load, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
delta_load(
  conn = conn_secondary,
  db_table = "mtcars_firewalled",
  delta = delta_2
)

get_table(conn_secondary, "mtcars_firewalled", slice_ts = NULL)
```
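In the firewalled scenario, the primary and secondary databases can often not be reached from the same R session, so a delta may have to travel as a file. A minimal, unevaluated sketch of this; it assumes the exported delta can be collected into a local data frame with `dplyr::collect()` and that `delta_load()` accepts such a data frame, and the file name is purely illustrative.

```{r delta_transfer, eval = FALSE}
# On the machine with access to the primary database:
# collect the delta locally (assumption) and write it to a file
delta_local <- dplyr::collect(delta_1)
saveRDS(delta_local, "delta_2020-01-02.rds")

# On the machine with access to the secondary database
# (after transferring the file), load the delta from the file
delta_load(
  conn = conn_secondary,
  db_table = "mtcars_firewalled",
  delta = readRDS("delta_2020-01-02.rds")
)
```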
# Batch data migration

Consider the following use case: You have a database which daily collects and stores snapshots of a data source using `update_snapshot()`, and you want to mirror this time-versioned data in another database, but the full set of changes is too large to transfer in a single operation. In this case, we can export a number of smaller delta "batches" by utilising the `timestamp_until` argument.

Starting from the state above on the primary database, we split the transfer into two batches.

```{r delta_batch_export, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
delta_batch_1 <- delta_export(
  conn = conn_primary,
  db_table = "mtcars",
  timestamp_from = "2020-01-01 11:00:00",
  timestamp_until = "2020-01-03 13:00:00"
)

delta_batch_2 <- delta_export(
  conn = conn_primary,
  db_table = "mtcars",
  timestamp_from = "2020-01-03 13:00:00"
)
```

We then use `delta_load()` to apply these changes in the secondary database, passing both batches as a list:

```{r delta_batch_load, eval = rlang::is_installed("duckdb") && R.version$major >= "4"}
delta_load(
  conn = conn_secondary,
  db_table = "mtcars_batch",
  delta = list(delta_batch_1, delta_batch_2)
)

get_table(conn_secondary, "mtcars_batch", slice_ts = NULL)
```
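The batched transfer should leave the secondary database in the same state as the incremental transfer in the previous section. A minimal, unevaluated sketch of how this could be checked, assuming both histories fit in memory; we sort by all columns since row order is not guaranteed.

```{r compare_migrations, eval = FALSE}
# Collect the full history of both mirrored tables and sort by all columns
single_delta_history <- get_table(conn_secondary, "mtcars_firewalled", slice_ts = NULL) %>%
  dplyr::collect() %>%
  dplyr::arrange(dplyr::across(dplyr::everything()))

batched_history <- get_table(conn_secondary, "mtcars_batch", slice_ts = NULL) %>%
  dplyr::collect() %>%
  dplyr::arrange(dplyr::across(dplyr::everything()))

# The two histories should be identical
all.equal(single_delta_history, batched_history)
```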