SQL aggregation functions can be computationally expensive when applied to large data sets. As data sets grow, repeatedly recalculating metrics over the entire data set becomes inefficient. To address this challenge, incremental aggregation is often used: a method that maintains a previous state and updates it with new incoming data. While this approach is simple for aggregations like COUNT or SUM, the question arises: how can it be applied to more complex metrics like standard deviation?
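To make the idea concrete, here is a minimal sketch of an incremental update for COUNT and SUM; the tables PREVIOUS_STATE and NEW_ROWS and their columns are hypothetical names used only for illustration:
-- PREVIOUS_STATE(USER_ID, TX_COUNT, TX_SUM): aggregates computed on earlier runs
-- NEW_ROWS(USER_ID, TX_VALUE): only the rows that arrived since the last run
SELECT
    COALESCE(P.USER_ID, N.USER_ID) AS USER_ID,
    COALESCE(P.TX_COUNT, 0) + COALESCE(N.NEW_COUNT, 0) AS TX_COUNT, -- new count = old count + count of new rows
    COALESCE(P.TX_SUM, 0) + COALESCE(N.NEW_SUM, 0) AS TX_SUM -- new sum = old sum + sum of new rows
FROM PREVIOUS_STATE P
FULL OUTER JOIN (
    SELECT USER_ID, COUNT(*) AS NEW_COUNT, SUM(TX_VALUE) AS NEW_SUM
    FROM NEW_ROWS
    GROUP BY USER_ID
) N ON P.USER_ID = N.USER_ID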
Standard deviation is a statistical metric that measures the degree of variation or dispersion of the values of a variable in relation to its mean.
It is obtained by taking the square root of the variance.
The formula to calculate the variance of a sample is the following:
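In standard notation, where $\bar{x}$ is the sample mean and $n$ is the number of observations, the (Bessel-corrected) sample variance is

$$s^2 = \frac{1}{n-1}\sum_{i=1}^{n}\left(x_i - \bar{x}\right)^2$$

and the standard deviation is its square root, $s = \sqrt{s^2}$.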
Calculating the standard deviation can be complex, as it involves updating both the mean and the sum of the squared differences across all data points. However, with algebraic manipulation, we can derive a formula for incremental calculation, allowing updates using an existing data set and incorporating new data seamlessly. This approach avoids recalculating from scratch every time new data is added, making the process much more efficient (a detailed derivation is available on my GitHub).
The formula can be divided into three parts:
1. The weighted variance of the existing set.
2. The weighted variance of the new set.
3. The variance of the difference in means, which represents the variance between the two groups.
This method allows the variance to be calculated incrementally by retaining the COUNT (n), AVG (μn) and VAR (Sn) of the existing set, and combining them with the COUNT (k), AVG (μk) and VAR (Sk) of the new set. As a result, the updated standard deviation can be calculated efficiently without rescanning the entire data set.
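Written out, with n, μn and Sn² denoting the count, mean, and variance of the existing set and k, μk and Sk² those of the new set (the same roles these quantities play in the code below), the combined sample variance is

$$S_{n+k}^2 = \frac{(n-1)\,S_n^2 + (k-1)\,S_k^2}{n+k-1} + \frac{n\,k\,(\mu_n - \mu_k)^2}{(n+k)(n+k-1)}$$

The first two terms are the weighted variances of the existing and new sets, and the last term is the between-group variance. As a quick sanity check: for an existing set {1, 2, 3} (n = 3, μn = 2, Sn² = 1) and a new set {4, 5} (k = 2, μk = 4.5, Sk² = 0.5), the formula gives (2·1 + 1·0.5)/4 + (3·2·(2 − 4.5)²)/(5·4) = 0.625 + 1.875 = 2.5, exactly the sample variance of {1, 2, 3, 4, 5} computed directly.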
Now that we have understood the mathematics behind incremental standard deviation (or at least grasped its essence), let's dive into the dbt SQL implementation. In the following example, we'll see how to set up an incremental model to calculate and update these statistics for users' transaction data.
Consider a transaction table called stg__transactions, which tracks user transactions (events). Our goal is to create a state table, int__user_tx_state, which stores the aggregated “state” of each user's transactions. The column details of both tables are shown in the following image.
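For reference, these are the columns the code below works with:
stg__transactions: USER_ID, TX_ID, TX_TIMESTAMP, TX_VALUE
int__user_tx_state: USER_ID, UPDATED_AT, COUNT_TX, SUM_TX, AVG_TX, STDDEV_TX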
To make the process efficient, our goal is to update the state table incrementally by combining data from new incoming transactions with existing aggregate data (i.e., the current state of the user). This approach allows us to calculate the user's updated state without scanning all the historical data.
The following code assumes familiarity with some dbt concepts. If you don't have it, you may still be able to follow the code, but I highly recommend checking out the dbt incremental guide or reading this amazing post first.
We will build the complete dbt SQL model step by step, aiming to compute incremental aggregations efficiently without repeatedly scanning the entire table. The process begins by defining the model as incremental in dbt and using unique_key to update existing rows instead of inserting new ones.
-- depends_on: {{ ref('stg__transactions') }}
{{ config(materialized='incremental', unique_key=('USER_ID'), incremental_strategy='merge') }}
Next, we retrieve records from the stg__transactions table. The is_incremental block filters for transactions with timestamps later than the user's most recent update, effectively including only new transactions.
WITH NEW_USER_TX_DATA AS (
SELECT
USER_ID,
TX_ID,
TX_TIMESTAMP,
TX_VALUE
FROM {{ ref('stg__transactions') }}
{% if is_incremental() %}
WHERE TX_TIMESTAMP > COALESCE((select max(UPDATED_AT) from {{ this }}), 0::TIMESTAMP_NTZ)
{% endif %}
)
After retrieving the new transaction logs, we aggregate them per user, allowing us to incrementally update the state of each user in subsequent CTEs.
INCREMENTAL_USER_TX_DATA AS (
SELECT
USER_ID,
MAX(TX_TIMESTAMP) AS UPDATED_AT,
COUNT(TX_VALUE) AS INCREMENTAL_COUNT,
AVG(TX_VALUE) AS INCREMENTAL_AVG,
SUM(TX_VALUE) AS INCREMENTAL_SUM,
COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV
FROM
NEW_USER_TX_DATA
GROUP BY
USER_ID
)
Now we come to the heavy part, where we calculate the combined aggregations. When we are not in incremental mode (i.e., we don't have “state” rows yet), we simply select the new aggregations.
NEW_USER_CUMULATIVE_DATA AS (
SELECT
NEW_DATA.USER_ID,
{% if not is_incremental() %}
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX,
NEW_DATA.INCREMENTAL_AVG AS AVG_TX,
NEW_DATA.INCREMENTAL_SUM AS SUM_TX,
NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX
{% else %}
...
But when we are in incremental mode, we need to join the previous data and combine it with the new data created in the INCREMENTAL_USER_TX_DATA CTE, following the formula described above.
We start by calculating the new SUM, COUNT and AVG:
...
{% else %}
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) AS _n, -- this is n
NEW_DATA.INCREMENTAL_COUNT AS _k, -- this is k
COALESCE(EXISTING_USER_DATA.SUM_TX, 0) + NEW_DATA.INCREMENTAL_SUM AS NEW_SUM_TX, -- new sum
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) + NEW_DATA.INCREMENTAL_COUNT AS NEW_COUNT_TX, -- new count
NEW_SUM_TX / NEW_COUNT_TX AS AVG_TX, -- new avg
...
We then calculate the three parts of the variance formula.
1. The existing weighted variance, which is set to 0 if the existing set contains one or fewer elements:
...
CASE
WHEN _n > 1 THEN (((_n - 1) / (NEW_COUNT_TX - 1)) * POWER(COALESCE(EXISTING_USER_DATA.STDDEV_TX, 0), 2))
ELSE 0
END AS EXISTING_WEIGHTED_VARIANCE, -- existing weighted variance
...
2. The incremental weighted variance, computed in the same way:
...
CASE
WHEN _k > 1 THEN (((_k - 1) / (NEW_COUNT_TX - 1)) * POWER(NEW_DATA.INCREMENTAL_STDDEV, 2))
ELSE 0
END AS INCREMENTAL_WEIGHTED_VARIANCE, -- incremental weighted variance
...
3. The mean difference variance, as described above, together with the final standard deviation and the SQL join that brings in the existing user data:
...
POWER((COALESCE(EXISTING_USER_DATA.AVG_TX, 0) - NEW_DATA.INCREMENTAL_AVG), 2) AS MEAN_DIFF_SQUARED,
CASE
WHEN NEW_COUNT_TX = 1 THEN 0
ELSE (_n * _k) / (NEW_COUNT_TX * (NEW_COUNT_TX - 1))
END AS BETWEEN_GROUP_WEIGHT, -- between group weight
BETWEEN_GROUP_WEIGHT * MEAN_DIFF_SQUARED AS MEAN_DIFF_VARIANCE, -- mean diff variance
EXISTING_WEIGHTED_VARIANCE + INCREMENTAL_WEIGHTED_VARIANCE + MEAN_DIFF_VARIANCE AS VARIANCE_TX,
CASE
WHEN _n = 0 THEN NEW_DATA.INCREMENTAL_STDDEV -- no "past" data
WHEN _k = 0 THEN EXISTING_USER_DATA.STDDEV_TX -- no "new" data
ELSE SQRT(VARIANCE_TX) -- stddev (which is the root of variance)
END AS STDDEV_TX,
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_SUM_TX AS SUM_TX,
NEW_COUNT_TX AS COUNT_TX
{% endif %}
FROM
INCREMENTAL_USER_TX_DATA new_data
{% if is_incremental() %}
LEFT JOIN
{{ this }} EXISTING_USER_DATA
ON
NEW_DATA.USER_ID = EXISTING_USER_DATA.USER_ID
{% endif %}
)
Finally, we select the columns of the table, taking into account both the incremental and non-incremental cases:
SELECT
USER_ID,
UPDATED_AT,
COUNT_TX,
SUM_TX,
AVG_TX,
STDDEV_TX
FROM NEW_USER_CUMULATIVE_DATA
Combining all these steps, we arrive at the final SQL model:
-- depends_on: {{ ref('stg__transactions') }}
{{ config(materialized='incremental', unique_key=('USER_ID'), incremental_strategy='merge') }}
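-- On incremental runs, the 'merge' strategy updates existing target rows that match on USER_ID
-- and inserts rows for users appearing for the first time.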
WITH NEW_USER_TX_DATA AS (
SELECT
USER_ID,
TX_ID,
TX_TIMESTAMP,
TX_VALUE
FROM {{ ref('stg__transactions') }}
{% if is_incremental() %}
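-- Keep only transactions newer than the latest recorded state; the COALESCE handles the case
-- where the state table exists but is still empty, falling back to a default zero timestamp.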
WHERE TX_TIMESTAMP > COALESCE((select max(UPDATED_AT) from {{ this }}), 0::TIMESTAMP_NTZ)
{% endif %}
),
INCREMENTAL_USER_TX_DATA AS (
SELECT
USER_ID,
MAX(TX_TIMESTAMP) AS UPDATED_AT,
COUNT(TX_VALUE) AS INCREMENTAL_COUNT,
AVG(TX_VALUE) AS INCREMENTAL_AVG,
SUM(TX_VALUE) AS INCREMENTAL_SUM,
COALESCE(STDDEV(TX_VALUE), 0) AS INCREMENTAL_STDDEV
FROM
NEW_USER_TX_DATA
GROUP BY
USER_ID
),
NEW_USER_CUMULATIVE_DATA AS (
SELECT
NEW_DATA.USER_ID,
{% if not is_incremental() %}
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_DATA.INCREMENTAL_COUNT AS COUNT_TX,
NEW_DATA.INCREMENTAL_AVG AS AVG_TX,
NEW_DATA.INCREMENTAL_SUM AS SUM_TX,
NEW_DATA.INCREMENTAL_STDDEV AS STDDEV_TX
{% else %}
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) AS _n, -- this is n
NEW_DATA.INCREMENTAL_COUNT AS _k, -- this is k
COALESCE(EXISTING_USER_DATA.SUM_TX, 0) + NEW_DATA.INCREMENTAL_SUM AS NEW_SUM_TX, -- new sum
COALESCE(EXISTING_USER_DATA.COUNT_TX, 0) + NEW_DATA.INCREMENTAL_COUNT AS NEW_COUNT_TX, -- new count
NEW_SUM_TX / NEW_COUNT_TX AS AVG_TX, -- new avg
CASE
WHEN _n > 1 THEN (((_n - 1) / (NEW_COUNT_TX - 1)) * POWER(COALESCE(EXISTING_USER_DATA.STDDEV_TX, 0), 2))
ELSE 0
END AS EXISTING_WEIGHTED_VARIANCE, -- existing weighted variance
CASE
WHEN _k > 1 THEN (((_k - 1) / (NEW_COUNT_TX - 1)) * POWER(NEW_DATA.INCREMENTAL_STDDEV, 2))
ELSE 0
END AS INCREMENTAL_WEIGHTED_VARIANCE, -- incremental weighted variance
POWER((COALESCE(EXISTING_USER_DATA.AVG_TX, 0) - NEW_DATA.INCREMENTAL_AVG), 2) AS MEAN_DIFF_SQUARED,
CASE
WHEN NEW_COUNT_TX = 1 THEN 0
ELSE (_n * _k) / (NEW_COUNT_TX * (NEW_COUNT_TX - 1))
END AS BETWEEN_GROUP_WEIGHT, -- between group weight
BETWEEN_GROUP_WEIGHT * MEAN_DIFF_SQUARED AS MEAN_DIFF_VARIANCE,
EXISTING_WEIGHTED_VARIANCE + INCREMENTAL_WEIGHTED_VARIANCE + MEAN_DIFF_VARIANCE AS VARIANCE_TX,
CASE
WHEN _n = 0 THEN NEW_DATA.INCREMENTAL_STDDEV -- no "past" data
WHEN _k = 0 THEN EXISTING_USER_DATA.STDDEV_TX -- no "new" data
ELSE SQRT(VARIANCE_TX) -- stddev (which is the root of variance)
END AS STDDEV_TX,
NEW_DATA.UPDATED_AT AS UPDATED_AT,
NEW_SUM_TX AS SUM_TX,
NEW_COUNT_TX AS COUNT_TX
{% endif %}
FROM
INCREMENTAL_USER_TX_DATA new_data
{% if is_incremental() %}
LEFT JOIN
{{ this }} EXISTING_USER_DATA
ON
NEW_DATA.USER_ID = EXISTING_USER_DATA.USER_ID
{% endif %}
)
SELECT
USER_ID,
UPDATED_AT,
COUNT_TX,
SUM_TX,
AVG_TX,
STDDEV_TX
FROM NEW_USER_CUMULATIVE_DATA
Throughout this process, we demonstrate how to effectively handle incremental and non-incremental modes, leveraging mathematical techniques to update metrics such as variance and standard deviation efficiently. By seamlessly combining historical and new data, we achieve a streamlined and scalable approach to real-time data aggregation.
In this article, we explored the mathematical technique for incrementally calculating standard deviation and how to implement it using dbt's incremental models. This approach is highly efficient and allows large data sets to be processed without rescanning the entire data set on every run. In practice, it leads to faster, more scalable systems that can handle real-time updates. If you would like to talk more about this or share your thoughts, feel free to reach out. I'd love to hear your opinions!