Pipes are a sequence of one or more SQL queries (Nodes) that are executed in order, and they result in either a Published or Materialized pipe.

This page dives deeper into the structure and usage of individual pipes. For an introduction to how pipes are composed into pipelines, see pipelines.

UI

Creating Draft pipe

Navigate to “Pipes” in the left menu and click “+”. Enter a name for this pipe, then press “Create”.

Here, you have a node where you can enter a single SQL SELECT statement to query your tables directly.

These SQL statements can interact with your tables in the same way as standard SQL tables, with additional AI columns you can leverage for deeper insights.

For example, to get the top referrers as in our quickstart example, we can run the following query.

SELECT
  referrer,
  COUNT() AS num_referrers
FROM web_events
GROUP BY referrer
ORDER BY num_referrers DESC
LIMIT 3

We can run this by clicking on “Run”:

This is your draft pipe. Use it to run SQL queries against your database and test them before publishing.

Creating Published pipe

Click on “Save” to save this node, and “Publish” to publish it as an endpoint:

That’s it: your draft pipe just became a published pipe. Go to the Endpoints view and use the JavaScript, Python, or curl snippet to call the endpoint.

Creating Incremental Materialized pipe

Let’s say we want the signups, purchases, and page_views metrics, along with total_events, from the web events data. We could run the query below to compute these metrics per day.

SELECT
    toDate(timestamp) AS date,
    countState(event_id) AS total_events,
    countIfState(event_type = 'signup') AS signups,
    countIfState(event_type = 'purchase') AS purchases,
    countIfState(event_type = 'page_view') AS page_views
FROM web_events
GROUP BY date

This runs fine for smaller datasets, but as the number of rows grows, the query takes longer and longer because it scans every row to compute the metrics. We can use an incremental materialized pipe here: store the aggregated data in a new source as data is inserted, and have the API query that source instead. This significantly decreases the number of rows scanned, and hence the time the query takes to run.

Note the materialized pipe example uses countIfState() instead of countIf().

Airfold uses ClickHouse under the hood which requires appending State to aggregate functions when used in a materialized pipe.

Fortunately, Airfold does it automatically when materializing a draft pipe using af materialize <pipe_name>.

Learn more about materialization in ClickHouse and about the *State combinator.
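Once the aggregates are materialized, the endpoint can read them back by merging the stored states with the matching *Merge functions. A minimal sketch, assuming the results were materialized to a source named daily_metrics (the actual source name is whatever you chose when materializing):

```sql
SELECT
    date,
    countMerge(total_events) AS total_events,
    countIfMerge(signups) AS signups,
    countIfMerge(purchases) AS purchases,
    countIfMerge(page_views) AS page_views
FROM daily_metrics
GROUP BY date
ORDER BY date
```

Each *State column stores a partial aggregation state rather than a final number; the corresponding *Merge function combines those states into final values at read time.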

Creating Refreshable Materialized pipe

As an example, suppose we want the top pages by unique visitors per page_url. We can get them with the following query.

SELECT
    page_url,
    count(event_id) AS total_visits,
    uniqExact(user_id) AS unique_visitors
FROM web_events
WHERE event_type = 'page_view'
GROUP BY page_url

This will eventually take a long time as the data grows. Because this computation needs to be performed over the whole dataset, we use a refreshable materialized pipe, which recomputes the result on a schedule.
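Because a refreshable materialization recomputes and replaces the whole result on each run, the target source holds final values and can be queried directly, without *Merge functions. A sketch, assuming the result was materialized to a source named top_pages (a hypothetical name for illustration):

```sql
SELECT page_url, total_visits, unique_visitors
FROM top_pages
ORDER BY unique_visitors DESC
LIMIT 10
```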

CLI

Creating Draft pipe

These are the starting point for all pipes, similar to database views in functionality.

They cannot be referenced by other pipes using the FROM clause.

Draft pipes are temporary and primarily used for development. Once finalized, a draft pipe can be transitioned into either a published or a materialized pipe, but not both.

draft_pipe.yaml
description: 'Load logs and get errors'
nodes:
    - load:
        description: Loading logs
        sql: select timestamp, level, file, message from logs
    - error:
        description: Getting errors
        sql: select message from load where level = 'error'
description
string

A brief description of the pipe’s purpose or functionality. Optional.

nodes
Node[]
required

The sequence of nodes within the pipe. Each node represents a stage in data processing.

to
string

Specifies the target source for appending the incremental results. When set, it converts the pipe into a materialized pipe. This option cannot be used with publish and requires the final node’s schema to match the provided source schema. Optional.

publish
string

The endpoint name where the results are published, turning the pipe into a published pipe. This option cannot be used with to. Optional.

params
Param[]

A list of parameters for parameterizing node SQL queries through Jinja2 templating e.g. {{ param }}. These parameters are passed via the API and can be used to dynamically alter the pipe’s behavior. Optional.

refresh
Refresh[]

A refresh schedule definition. Used in refreshable materialization where the pipe is executed on schedule. Optional.

Creating Published pipes

This creates an API endpoint that serves the result of your pipe; the result is accessible in JSON, NDJSON, CSV, and Parquet formats.

To publish a draft pipe, assign an endpoint name to the publish field.

Published pipes can parameterize their SQL using Jinja2 via the params field.

Unlike draft pipes, published pipes can be accessed (via FROM) by other pipes.

published_pipe.yaml
description: 'Get errors based on level'
nodes:
    - load:
        description: Loading logs
        sql: select timestamp, level, file, message from logs
    - error:
        description: Getting errors
        sql: select message from load where level = {{ level }}
publish: level
params:
    - name: level
      type: string
      default: error

Creating Incremental Materialized pipe

Differing from the batch nature of draft and published pipes, materialized pipes stand out for their ability to incrementally transform data as it is ingested and append their results to a designated source.

To materialize a draft pipe, set the to field to the name of the target source.

Although materialized pipes themselves cannot be accessed (via FROM) by other pipes, the source they write to is accessible like any other source.

incremental_pipe.yaml
description: 'Aggregate logs by level'
nodes:
    - load:
        description: Loading logs
        sql: select timestamp, level, file, message from logs
    - error:
        description: Aggregating logs by level
        sql: |
            select
                level,
                countState() as count
            from load
            group by level
to: logs_by_level

Note the materialized pipe example uses countState() instead of count().

Airfold uses ClickHouse under the hood which requires appending State to aggregate functions when used in a materialized pipe.

Fortunately, Airfold does it automatically when materializing a draft pipe using af materialize <pipe_name>.

Learn more about materialization in ClickHouse and about the *State combinator.
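A pipe reading from the logs_by_level source would merge the stored states back into final counts with the matching *Merge function. A minimal sketch:

```sql
SELECT
    level,
    countMerge(count) AS count
FROM logs_by_level
GROUP BY level
```

Here countMerge finalizes the partial states that countState wrote during ingestion.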

Best Practices

Published pipes operate in batch mode and compute their results on read; therefore, they are not suited for intensive data transformations.

Instead, a common pattern is to place a published pipe in front of a materialized pipe. The materialized pipe incrementally transforms data as it is ingested and writes it to the source, enabling the published pipe to instantly access the pre-transformed data during reads.

Creating Refreshable Materialized pipe

Some sources are batch in nature, like a prices or products table, and may be refreshed by a connector or another external process.

To work with these sources efficiently and create transformations and queries over them, use a "refreshable" materialization.

refreshable_pipe.yaml
description: 'Aggregate logs by level'
nodes:
    - load:
        description: Loading logs
        sql: select timestamp, level, file, message from logs
    - error:
        description: Aggregating logs by level
        sql: |
            select
                level,
                countState() as count
            from load
            group by level
to: logs_by_level
refresh:
    strategy: replace
    interval: EVERY 1 MINUTE

Push

Push pipes to your workspace using the CLI command af push.

For example, to push draft_pipe.yaml:

af push draft_pipe.yaml