As your data grows, analytical queries can become increasingly slow. This is where materialized views help optimize performance. They allow you to define transformations on your data that are applied either at insertion time (incremental) or periodically (refreshable). API endpoints can then query these materialized pipes directly, resulting in significantly faster execution.

Incremental Materialization

Airfold’s materialized pipes support incremental data transformation by automatically processing new data as it arrives. This model ensures your pipeline only processes fresh data, making it ideal for time-series logs, user activity streams, and other continuously growing datasets, and enabling near-real-time aggregations and rollups.

Note: Incremental Materialized Pipes in Airfold are designed to process only newly arriving data. If historical data already exists in your target tables, you must perform a backfill to ensure completeness.

Backfilling refers to the process of reprocessing or re-ingesting historical data so that it is captured by the incremental materialization logic. Without this step, pre-existing data will be excluded from downstream views and analytics. (Reach out to your solutions team to learn more.)

UI

Let’s say we want to compute the signups, purchases, and page_views metrics, along with total_events, from the web events data. We could run the query below to get these metrics per day.

SELECT
    toDate(timestamp) AS date,
    count(event_id) AS total_events,
    countIf(event_type = 'signup') AS signups,
    countIf(event_type = 'purchase') AS purchases,
    countIf(event_type = 'page_view') AS page_views
FROM web_events
GROUP BY date

This runs fine for smaller datasets, but as the number of rows grows, the query takes longer and longer because it must scan every row to calculate the metrics. Instead, we can use an incremental materialized pipe: it aggregates the data into a new source as rows are inserted, and the API queries that source. This significantly decreases the number of rows scanned and hence the time the query takes to run.
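In pipe form, the materialized transformation for this example might look like the sketch below. It follows the same YAML structure as the CLI example later on this page; the pipe name (web_events_daily_mv) and target source (events_daily) are hypothetical placeholders.

web_events_daily_mv.yaml
name: web_events_daily_mv
description: 'Daily rollup of web events'
nodes:
    - daily:
        description: Aggregating web events per day
        sql: |
            select
                toDate(timestamp) as date,
                countState(event_id) as total_events,
                countIfState(event_type = 'signup') as signups,
                countIfState(event_type = 'purchase') as purchases,
                countIfState(event_type = 'page_view') as page_views
            from web_events
            group by date
to: events_daily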

Note that the materialized pipe example uses countIfState() instead of countIf(), and countState() instead of count().

Airfold uses ClickHouse under the hood, which requires appending the -State combinator to aggregate functions when they are used in a materialized pipe.

Fortunately, Airfold does this automatically when you materialize a draft pipe with af materialize <pipe_name>.

Learn more about materialization in ClickHouse and about the -State combinator.
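The values stored by a -State aggregate are intermediate aggregation states rather than final numbers, so reading them back requires the matching -Merge combinator. A minimal sketch of such a read query, assuming the daily rollup above is written to a source named events_daily:

SELECT
    date,
    countMerge(total_events) AS total_events,
    countIfMerge(signups) AS signups,
    countIfMerge(purchases) AS purchases,
    countIfMerge(page_views) AS page_views
FROM events_daily
GROUP BY date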

CLI

To materialize a draft pipe, set the to field to the name of the target source.

Although materialized pipes themselves cannot be accessed (via FROM) by other pipes, the source they write to is accessible like any other source.

incremental_pipe.yaml
name: agg_logs_mv
description: 'Aggregate logs by level'
nodes:
    - load:
        description: Loading logs
        sql: select timestamp, level, file, message from logs
    - error:
        description: Aggregating logs by level
        sql: |
            select
                level,
                countState() as count
            from load
            group by level
to: logs_by_level
name (string, optional)
The name of the pipe.

description (string, optional)
A brief description of the pipe’s purpose or functionality.

nodes (Node[], required)
The sequence of nodes within the pipe. Each node represents a stage in data processing.

to (string, optional)
Specifies the target source for appending the incremental results. When set, it converts the pipe into a materialized pipe. This option cannot be used with publish and requires the final node’s schema to match the provided source schema.

params (Param[], optional)
A list of parameters for parameterizing node SQL queries through Jinja2 templating, e.g. {{ param }}. These parameters are passed via the API and can be used to dynamically alter the pipe’s behavior.

Best Practices

Published pipes operate in batch mode and compute their results on read; therefore, they are not suited for intensive data transformations.

Instead, a common pattern is to place a published pipe in front of a materialized pipe. The materialized pipe incrementally transforms data as it is ingested and writes it to the source, enabling the published pipe to instantly access the pre-transformed data during reads.
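As an illustration of that pattern, a published pipe sitting in front of the agg_logs_mv example above might look like the sketch below. It assumes that publish takes the name of the node to expose, and it uses the -Merge combinator to turn the stored aggregation states back into final counts; the pipe name is hypothetical.

logs_by_level_api.yaml
name: logs_by_level_api
description: 'Expose pre-aggregated log counts'
nodes:
    - by_level:
        description: Merging aggregate states into final counts
        sql: |
            select
                level,
                countMerge(count) as count
            from logs_by_level
            group by level
# assumption: publish names the node whose results are exposed via the API
publish: by_level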

Refreshable Materialization

UI

Let’s consider another example: suppose we want the top pages by unique visitors and total visits per page_url. We can get them with the following query.

SELECT
    page_url,
    count(event_id) AS total_visits,
    uniqExact(user_id) AS unique_visitors
FROM web_events
WHERE event_type = 'page_view'
GROUP BY page_url

As the data grows, this query will take longer and longer to run. Since the computation must be performed over the whole dataset rather than only on newly inserted rows, we will use a refreshable materialized pipe, which re-runs the query on a schedule and writes the result to a target source.
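A refreshable materialized pipe for this example might look like the sketch below. The pipe name (top_pages_mv), target source (top_pages), and one-hour interval are placeholders, and the target source is assumed to store the final (non-State) values, since its contents are replaced on every refresh.

top_pages_mv.yaml
name: top_pages_mv
description: 'Unique visitors and total visits per page'
nodes:
    - pages:
        description: Aggregating page views per page_url
        sql: |
            select
                page_url,
                count(event_id) as total_visits,
                uniqExact(user_id) as unique_visitors
            from web_events
            where event_type = 'page_view'
            group by page_url
to: top_pages
refresh:
    strategy: replace
    interval: EVERY 1 HOUR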

CLI

To create a refreshable materialized pipe using the CLI in Airfold, add a refresh block to the materialized pipe’s YAML definition and specify the refresh strategy and interval.

refreshable_pipe.yaml
description: 'Aggregate logs by level'
nodes:
    - load:
        description: Loading logs
        sql: select timestamp, level, file, message from logs
    - error:
        description: Aggregating logs by level
        sql: |
            select
                level,
                countState() as count
            from load
            group by level
to: logs_by_level
refresh:
    strategy: replace
    interval: EVERY 1 MINUTE
name (string, optional)
The name of the pipe.

description (string, optional)
A brief description of the pipe’s purpose or functionality.

nodes (Node[], required)
The sequence of nodes within the pipe. Each node represents a stage in data processing.

to (string, optional)
Specifies the target source for appending the incremental results. When set, it converts the pipe into a materialized pipe. This option cannot be used with publish and requires the final node’s schema to match the provided source schema.

refresh (Refresh[], optional)
A refresh schedule definition. Used in refreshable materialization, where the pipe is executed on a schedule.
