2 Tasks
2.1 Introduction
A task is the basic operational unit of Sykdomspulsen Core. It is based on plnr.
In short, you can think of a Sykdomspulsen Core task as multiple plnr plans plus Sykdomspulsen Core db schemas.
2.2 Definitions
Object | Description |
---|---|
argset | A named list containing arguments. |
plnr analysis | The fundamental unit that is scheduled in plnr: one argset plus one action_fn. |
data_selector_fn | A function that takes two arguments (argset and schema) and returns a named list, which is passed as the data argument to action_fn. |
action_fn | A function that takes three arguments (data, argset, and schema). |
sc analysis | Essentially a plnr analysis with database schemas: data, argset, schemas, and action_fn. |
plan | One data-pull (via data_selector_fn) plus a list of analyses that share it. |
task | A list of plans plus db schemas. This is the unit that Airflow schedules. |
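To make these definitions concrete, the nesting can be sketched as a plain R list. This is a conceptual illustration only, not real sc or plnr code; all names and values are hypothetical:

```r
# Conceptual sketch only -- not real sc/plnr objects.
# task = list of plans + db schemas
# plan = one data-pull + list of analyses
# analysis = argset + action_fn (+ schemas)
task <- list(
  schemas = list(output = "a db schema would go here"),
  plans = list(
    list(
      # one data-pull, shared by every analysis in this plan
      data_selector_fn = function(argset, schema) {
        list(data = data.frame(x = 1:3))
      },
      analyses = list(
        list(
          argset = list(location_code = "county03"),
          action_fn = function(data, argset, schema) {
            # acts on the shared data, its own argset, and the schemas
            nrow(data$data)
          }
        )
      )
    )
  )
)
```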
2.3 General tasks

Figure 2.1: A general task showing the many options of a task.
Figure 2.1 shows us the full potential of a task.
Data can be read from any source. Within a plan, the data is extracted once by data_selector_fn (i.e. "one data-pull"). The data is then provided to each analysis, which runs action_fn on:
- The provided data
- The provided argset
- The provided schemas
The action_fn can then:
- Write data/results to db schemas
- Send emails
- Export graphs, excel files, reports, or other physical files
Typically only a subset of this would be done in a single task.
2.3.1 Plan-heavy or analysis-heavy tasks?
A plan-heavy task is one that has many plans and a few analyses per plan.
An analysis-heavy task is one that has few plans and many analyses per plan.
In general, a data-pull is slow and wastes time. It is therefore preferable to reduce the number of data-pulls by having each data-pull extract a larger quantity of data. Each analysis can then subset the data as required (identified via its argset). That is, when possible, an analysis-heavy task is preferable because it will be faster (at the cost of needing more RAM).
Obviously, the larger a plan's data-pull, the more RAM it will use. If you need to conserve RAM, use a plan-heavy approach instead.
Figure 2.1 shows only two location-based analyses, but in reality there were 356 municipalities in Norway in 2021. If figure 2.1 had 2 plans (one for 2021 data, one for 2020 data) and 356 analyses per plan (one for each location_code), then we would be taking an analysis-heavy approach.
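The two layouts can be sketched with the for_each_plan/for_each_analysis arguments that appear later in this chapter. This is an illustrative sketch only; the location codes are placeholders, not the real 356 municipality codes:

```r
# Plan-heavy: one plan (and therefore one data-pull) per year/location pair.
plan_heavy <- list(
  for_each_plan = plnr::expand_list(
    year = c(2020, 2021),
    location_code = c("municip_a", "municip_b") # placeholders; 356 in reality
  ),
  for_each_analysis = NULL
)

# Analysis-heavy: one plan per year (one big data-pull),
# then one analysis per location inside each plan.
analysis_heavy <- list(
  for_each_plan = plnr::expand_list(year = c(2020, 2021)),
  for_each_analysis = plnr::expand_list(
    location_code = c("municip_a", "municip_b") # placeholders; 356 in reality
  )
)
```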
2.4 Putting it together

Figure 2.2: A typical file setup for an implementation of Sykdomspulsen Core. plan_argset_fn is rarely used, and is therefore shown blacked out in most of the tasks.
Figure 2.2 shows a typical implementation of Sykdomspulsen Core.
config_db.r contains all of the Sykdomspulsen Core db schema definitions, i.e. a long list of sc::add_schema_v8 calls.
config_tasks.r contains all of the task definitions, i.e. a long list of sc::add_task_from_config_v8 calls.
Then we have one file for each task, containing the action_fn, data_selector_fn, and other functions relevant to the task at hand.
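Assuming a standard R package layout, the files from figure 2.2 might be organised like this (the task file name is illustrative):

```
R/
├── config_db.r        # all sc::add_schema_v8() calls
├── config_tasks.r     # all sc::add_task_from_config_v8() calls
└── example_weather.r  # action_fn, data_selector_fn and helpers for one task
```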
2.5 Weather example
We will now go through an example of how a person would design and implement tasks relating to weather.
2.5.1 db schema
As documented in more detail here, we create a db schema that fits our needs (recording weather data).

sc::add_schema_v8(
  name_access = c("anon"),
  name_grouping = "example_weather",
  name_variant = NULL,
  db_configs = sc::config$db_configs,
  field_types = c(
    "granularity_time" = "TEXT",
    "granularity_geo" = "TEXT",
    "country_iso3" = "TEXT",
    "location_code" = "TEXT",
    "border" = "INTEGER",
    "age" = "TEXT",
    "sex" = "TEXT",
    "date" = "DATE",
    "isoyear" = "INTEGER",
    "isoweek" = "INTEGER",
    "isoyearweek" = "TEXT",
    "season" = "TEXT",
    "seasonweek" = "DOUBLE",
    "calyear" = "INTEGER",
    "calmonth" = "INTEGER",
    "calyearmonth" = "TEXT",
    "tg" = "DOUBLE",
    "tx" = "DOUBLE",
    "tn" = "DOUBLE",
    "rr" = "DOUBLE"
  ),
  keys = c(
    "granularity_time",
    "location_code",
    "date",
    "age",
    "sex"
  ),
  censors = list(
    anon = list(
    )
  ),
  validator_field_types = sc::validator_field_types_sykdomspulsen,
  validator_field_contents = sc::validator_field_contents_sykdomspulsen,
  info = "This db table is used for..."
)
2.5.2 task_from_config_v8
To “register” our task, we use the RStudio addin task_from_config.

# tm_run_task("example_weather_import_data_from_api")
# tm_run_task("example_weather_import_data_from_api")
sc::add_task_from_config_v8(
  name_grouping = "example_weather",
  name_action = "import_data_from_api",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL, # "PACKAGE::TASK_NAME_plan_analysis"
  for_each_plan = plnr::expand_list(
    location_code = "county03" # fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = NULL,
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_weather_import_data_from_api_action",
  data_selector_fn_name = "example_weather_import_data_from_api_data_selector",
  schema = list(
    # input
    # output
    "anon_example_weather" = sc::config$schemas$anon_example_weather
  ),
  info = "This task does..."
)
There are a number of important things in this code that need highlighting.
2.5.2.1 for_each_plan
for_each_plan expects a list. Each component of the list corresponds to a plan, with the values added to the argset of all the analyses inside that plan.
For example, the following code gives 4 plans, with 1 analysis per plan, with each analysis containing argset$var_1 and argset$var_2 as appropriate.
for_each_plan <- list()
for_each_plan[[1]] <- list(
  var_1 = 1,
  var_2 = "a"
)
for_each_plan[[2]] <- list(
  var_1 = 2,
  var_2 = "a"
)
for_each_plan[[3]] <- list(
  var_1 = 1,
  var_2 = "b"
)
for_each_plan[[4]] <- list(
  var_1 = 2,
  var_2 = "b"
)
You always need at least 1 plan. The simplest possible plan is:
plnr::expand_list(
  x = 1
)
## [[1]]
## [[1]]$x
## [1] 1
2.5.2.2 plnr::expand_list
plnr::expand_list is essentially the same as expand.grid, except that it returns a list of lists instead of a data.frame.
The code above can be simplified as follows.
for_each_plan <- plnr::expand_list(
  var_1 = c(1, 2),
  var_2 = c("a", "b")
)
for_each_plan
## [[1]]
## [[1]]$var_1
## [1] 1
##
## [[1]]$var_2
## [1] "a"
##
##
## [[2]]
## [[2]]$var_1
## [1] 2
##
## [[2]]$var_2
## [1] "a"
##
##
## [[3]]
## [[3]]$var_1
## [1] 1
##
## [[3]]$var_2
## [1] "b"
##
##
## [[4]]
## [[4]]$var_1
## [1] 2
##
## [[4]]$var_2
## [1] "b"
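If it helps to see the semantics spelled out, a rough base-R equivalent can be built on expand.grid. This is a sketch under the assumption that plnr::expand_list behaves like expand.grid but returns a list of named lists; it is not the actual plnr implementation:

```r
# Approximation of plnr::expand_list using base R (not the real implementation).
expand_list_sketch <- function(...) {
  df <- expand.grid(..., stringsAsFactors = FALSE)
  lapply(seq_len(nrow(df)), function(i) as.list(df[i, , drop = FALSE]))
}

expand_list_sketch(var_1 = c(1, 2), var_2 = c("a", "b"))
# 4 named lists, with var_1 varying fastest, as in the output above
```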
2.5.2.3 for_each_analysis
for_each_plan expects a list, which will generate length(for_each_plan) plans.
for_each_analysis works the same way, except that it generates analyses within each of the plans.
2.5.2.5 upsert_at_end_of_each_plan
If TRUE and schema contains a schema called output, then the values returned by action_fn will be stored and upserted to schema$output at the end of each plan.
If you choose to upsert/insert manually from within action_fn, you can only do so at the end of each analysis.
2.5.2.6 insert_at_end_of_each_plan
If TRUE and schema contains a schema called output, then the values returned by action_fn will be stored and inserted into schema$output at the end of each plan.
If you choose to upsert/insert manually from within action_fn, you can only do so at the end of each analysis.
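To illustrate the difference, here is a hedged sketch of an action_fn written for upsert_at_end_of_each_plan = TRUE; the function name and columns are hypothetical:

```r
# Hypothetical sketch: assumes the task config sets
# upsert_at_end_of_each_plan = TRUE and includes a schema named "output".
example_analysis_action <- function(data, argset, schema) {
  res <- data.frame(
    location_code = argset$location_code,
    value = mean(data$data$x)
  )
  # no schema$output$upsert_data(res) call here: the returned value is
  # collected and upserted to schema$output at the end of each plan
  res
}
```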
2.5.3 data_selector_fn
Use the addins dropdown to easily add boilerplate code.

The data_selector_fn is used to extract the data for each plan.
The lines inside if(plnr::is_run_directly()){ are there to help developers. You can run the code manually/interactively to “load” the values of argset and schema.
index_plan <- 1
argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
schema <- sc::tm_get_schema("example_weather_import_data_from_api")
print(argset)
## $`**universal**`
## [1] "*"
##
## $`**plan**`
## [1] "*"
##
## $location_code
## [1] "county03"
##
## $`**analysis**`
## [1] "*"
##
## $`**automatic**`
## [1] "*"
##
## $index
## [1] 1
##
## $today
## [1] "2022-03-01"
##
## $yesterday
## [1] "2022-02-28"
##
## $first_analysis
## [1] TRUE
##
## $first_argset
## [1] TRUE
##
## $last_analysis
## [1] TRUE
##
## $last_argset
## [1] TRUE
print(names(schema))
## [1] "anon_example_weather"
# **** data_selector **** ----

#' example_weather_import_data_from_api (data selector)
#' @param argset Argset
#' @param schema DB Schema
#' @export
example_weather_import_data_from_api_data_selector <- function(argset, schema) {
  if (plnr::is_run_directly()) {
    # sc::tm_get_plans_argsets_as_dt("example_weather_import_data_from_api")
    index_plan <- 1
    argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan)
    schema <- sc::tm_get_schema("example_weather_import_data_from_api")
  }

  # find the mid lat/long for the specified location_code
  gps <- fhimaps::norway_nuts3_map_b2020_default_dt[location_code == argset$location_code, .(
    lat = mean(lat),
    long = mean(long)
  )]

  # download the forecast for the specified location_code
  d <- httr::GET(glue::glue("https://api.met.no/weatherapi/locationforecast/2.0/classic?lat={gps$lat}&lon={gps$long}"), httr::content_type_xml())
  d <- xml2::read_xml(d$content)

  # the returned variable must be a named list
  retval <- list(
    "data" = d
  )
  retval
}
2.6 action_fn
The lines inside if(plnr::is_run_directly()){ are there to help developers. You can run the code manually/interactively to “load” the values of data, argset, and schema.
index_plan <- 1
index_analysis <- 1
data <- sc::tm_get_data("example_weather_import_data_from_api", index_plan = index_plan)
argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
schema <- sc::tm_get_schema("example_weather_import_data_from_api")
print(data)
## $data
## {xml_document}
## <weatherdata noNamespaceSchemaLocation="https://schema.api.met.no/schemas/weatherapi-0.4.xsd" created="2022-03-01T10:49:16Z" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
## [1] <meta>\n <model name="met_public_forecast" termin="2022-03-01T10:00:00Z" runended="2022-03-01T10:10:35Z" nextrun="2022-03-01T11:10:35Z" from=" ...
## [2] <product class="pointData">\n <time datatype="forecast" from="2022-03-01T10:00:00Z" to="2022-03-01T10:00:00Z">\n <location altitude="252" l ...
print(argset)
## $`**universal**`
## [1] "*"
##
## $`**plan**`
## [1] "*"
##
## $location_code
## [1] "county03"
##
## $`**analysis**`
## [1] "*"
##
## $`**automatic**`
## [1] "*"
##
## $index
## [1] 1
##
## $today
## [1] "2022-03-01"
##
## $yesterday
## [1] "2022-02-28"
##
## $first_analysis
## [1] TRUE
##
## $first_argset
## [1] TRUE
##
## $last_analysis
## [1] TRUE
##
## $last_argset
## [1] TRUE
print(names(schema))
## [1] "anon_example_weather"
# **** action **** ----

#' example_weather_import_data_from_api (action)
#' @param data Data
#' @param argset Argset
#' @param schema DB Schema
#' @export
example_weather_import_data_from_api_action <- function(data, argset, schema) {
  # tm_run_task("example_weather_import_data_from_api")

  if (plnr::is_run_directly()) {
    # sc::tm_get_plans_argsets_as_dt("example_weather_import_data_from_api")
    index_plan <- 1
    index_analysis <- 1
    data <- sc::tm_get_data("example_weather_import_data_from_api", index_plan = index_plan)
    argset <- sc::tm_get_argset("example_weather_import_data_from_api", index_plan = index_plan, index_analysis = index_analysis)
    schema <- sc::tm_get_schema("example_weather_import_data_from_api")
  }

  # code goes here

  # special case that runs before everything
  if (argset$first_analysis == TRUE) {
  }

  a <- data$data
  baz <- xml2::xml_find_all(a, ".//maxTemperature")
  res <- vector("list", length = length(baz))
  for (i in seq_along(baz)) {
    parent <- xml2::xml_parent(baz[[i]])
    grandparent <- xml2::xml_parent(parent)
    time_from <- xml2::xml_attr(grandparent, "from")
    time_to <- xml2::xml_attr(grandparent, "to")

    x <- xml2::xml_find_all(parent, ".//minTemperature")
    temp_min <- xml2::xml_attr(x, "value")

    x <- xml2::xml_find_all(parent, ".//maxTemperature")
    temp_max <- xml2::xml_attr(x, "value")

    x <- xml2::xml_find_all(parent, ".//precipitation")
    precip <- xml2::xml_attr(x, "value")

    res[[i]] <- data.frame(
      time_from = as.character(time_from),
      time_to = as.character(time_to),
      tx = as.numeric(temp_max),
      tn = as.numeric(temp_min),
      rr = as.numeric(precip)
    )
  }
  res <- data.table::rbindlist(res)
  res <- res[stringr::str_sub(time_from, 12, 13) %in% c("00", "06", "12", "18")]
  res[, date := as.Date(stringr::str_sub(time_from, 1, 10))]
  res[, N := .N, by = date]
  res <- res[N == 4]
  res <- res[
    ,
    .(
      tg = NA,
      tx = max(tx),
      tn = min(tn),
      rr = sum(rr)
    ),
    keyby = .(date)
  ]

  # we look at the downloaded data
  print("Data after downloading")
  print(res)

  # we now need to format it
  res[, granularity_time := "day"]
  res[, sex := "total"]
  res[, age := "total"]
  res[, location_code := argset$location_code]

  # fill in missing structural variables
  sc::fill_in_missing_v8(res, border = 2020)

  # we look at the formatted data
  print("Data after missing structural variables filled in")
  print(res)

  # put data in db table
  # schema$SCHEMA_NAME$insert_data(d)
  schema$anon_example_weather$upsert_data(res)
  # schema$SCHEMA_NAME$drop_all_rows_and_then_upsert_data(d)

  # special case that runs after everything
  # copy to anon_web?
  if (argset$last_analysis == TRUE) {
    # sc::copy_into_new_table_where(
    #   table_from = "anon_X",
    #   table_to = "anon_webkht"
    # )
  }
}
2.7 Run the task
tm_run_task("example_weather_import_data_from_api")
## task: example_weather_import_data_from_api
## Running task=example_weather_import_data_from_api with plans=1 and analyses=1
## plans=sequential, argset=sequential with cores=1
##
## [----------] 0/1 (  0%) in 00:00:00, eta: ?s
## [##########] 1/1 (100%) in 00:00:00, eta: 0s
## [1] "Data after downloading"
## date tg tx tn rr
## 1: 2022-03-02 NA 6.2 -3.0 0
## 2: 2022-03-03 NA 5.6 -2.3 0
## 3: 2022-03-04 NA 4.7 -3.3 0
## 4: 2022-03-05 NA 4.1 -1.9 0
## 5: 2022-03-06 NA 5.8 -2.9 0
## 6: 2022-03-07 NA 5.0 -2.6 0
## 7: 2022-03-08 NA 4.4 -1.1 0
## 8: 2022-03-09 NA 3.2 -1.4 0
## [1] "Data after missing structural variables filled in"
## date tg tx tn rr granularity_time sex age location_code granularity_geo border isoyearweek season isoyear isoweek seasonweek
## 1: 2022-03-02 NA 6.2 -3.0 0 day total total county03 county 2020 2022-09 2021/2022 2022 9 32
## 2: 2022-03-03 NA 5.6 -2.3 0 day total total county03 county 2020 2022-09 2021/2022 2022 9 32
## 3: 2022-03-04 NA 4.7 -3.3 0 day total total county03 county 2020 2022-09 2021/2022 2022 9 32
## 4: 2022-03-05 NA 4.1 -1.9 0 day total total county03 county 2020 2022-09 2021/2022 2022 9 32
## 5: 2022-03-06 NA 5.8 -2.9 0 day total total county03 county 2020 2022-09 2021/2022 2022 9 32
## 6: 2022-03-07 NA 5.0 -2.6 0 day total total county03 county 2020 2022-10 2021/2022 2022 10 33
## 7: 2022-03-08 NA 4.4 -1.1 0 day total total county03 county 2020 2022-10 2021/2022 2022 10 33
## 8: 2022-03-09 NA 3.2 -1.4 0 day total total county03 county 2020 2022-10 2021/2022 2022 10 33
## calyear calmonth calyearmonth country_iso3
## 1: 2022 3 2022-M03 nor
## 2: 2022 3 2022-M03 nor
## 3: 2022 3 2022-M03 nor
## 4: 2022 3 2022-M03 nor
## 5: 2022 3 2022-M03 nor
## 6: 2022 3 2022-M03 nor
## 7: 2022 3 2022-M03 nor
## 8: 2022 3 2022-M03 nor
## Task ran in 0 mins
2.8 Examples of different types of tasks
2.8.1 Importing data

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "import_data",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL,
  for_each_plan = plnr::expand_list(
    x = 1
  ),
  for_each_analysis = NULL,
  universal_argset = list(
    folder = sc::path("input", "example")
  ),
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_import_data_action",
  data_selector_fn_name = "example_import_data_data_selector",
  schema = list(
    # input
    # output
    "output" = sc::config$schemas$output
  ),
  info = "This task does..."
)
2.8.2 Analysis

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "analysis",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL,
  for_each_plan = plnr::expand_list(
    location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = NULL,
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_analysis_action",
  data_selector_fn_name = "example_analysis_data_selector",
  schema = list(
    # input
    "input" = sc::config$schemas$input,
    # output
    "output" = sc::config$schemas$output
  ),
  info = "This task does..."
)
2.8.3 Exporting multiple sets of results

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "export_results",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL,
  for_each_plan = plnr::expand_list(
    location_code = fhidata::norway_locations_names()[granularity_geo %in% c("county")]$location_code
  ),
  for_each_analysis = NULL,
  universal_argset = list(
    folder = sc::path("output", "example")
  ),
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_export_results_action",
  data_selector_fn_name = "example_export_results_data_selector",
  schema = list(
    # input
    "input" = sc::config$schemas$input
    # output
  ),
  info = "This task does..."
)
2.8.4 Exporting combined results

sc::add_task_from_config_v8(
  name_grouping = "example",
  name_action = "export_results",
  name_variant = NULL,
  cores = 1,
  plan_analysis_fn_name = NULL,
  for_each_plan = plnr::expand_list(
    x = 1
  ),
  for_each_analysis = NULL,
  universal_argset = list(
    folder = sc::path("output", "example"),
    granularity_geos = c("nation", "county")
  ),
  upsert_at_end_of_each_plan = FALSE,
  insert_at_end_of_each_plan = FALSE,
  action_fn_name = "example_export_results_action",
  data_selector_fn_name = "example_export_results_data_selector",
  schema = list(
    # input
    "input" = sc::config$schemas$input
    # output
  ),
  info = "This task does..."
)