In this section, we’ll go over pipes and nodes, the core components of your data pipelines. Pipes define the flow of data transformation, while nodes represent individual steps in the process.

To learn how to create a query, see Pipes below.

Pipes

Pipes represent a sequence of connected nodes, where each node performs a specific SQL operation. A pipe defines the flow of data transformations.

What Is a Pipe?

A pipe can be thought of as a data processing workflow. It takes your input data, processes it through one or more nodes, and outputs the results. Pipes are flexible and can be used to:

  • Load and transform raw data
  • Aggregate and summarize data
  • Build advanced analytics pipelines using AI columns

A pipe consists of:

  • Nodes: Individual SQL statements that represent discrete steps in data processing
  • Flow: The order in which the nodes are executed, where each node can reference the output of the previous one

Example

Here’s a simple draft pipe example for loading logs and extracting error messages:

description: 'Load logs and get errors'
nodes:
    - load:
        description: Loading logs
        sql: SELECT timestamp, level, file, message FROM logs
    - error:
        description: Getting errors
        sql: SELECT message FROM load WHERE level = 'error'

Pipe Description:

  • description: Provides context about the pipe’s purpose (e.g., “Load logs and get errors”)

Nodes in Sequence:

  • load Node: Loads raw data from the logs table, selecting timestamp, level, file, and message
  • error Node: Filters the output from the load node to extract only error messages

Flow:

  • The error node references the results of the load node (FROM load), creating a sequential transformation
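
Because a downstream node refers to an upstream node by name (here, FROM load), you can think of a pipe as a chain of common table expressions. As a rough mental model only (the engine may execute the nodes differently), the two nodes above behave like this single query:

    WITH load AS (
        SELECT timestamp, level, file, message FROM logs
    )
    SELECT message
    FROM load
    WHERE level = 'error'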

Types of Pipes

Draft Pipes

Draft pipes are temporary pipelines used for development and testing:

  • Similar to database views; they do not persist results
  • Cannot be referenced by other pipes using FROM
  • Serve as the foundation for published or materialized pipes

Use Case: Iterative development and testing of SQL queries

Published Pipes

Published pipes create API endpoints that expose query results in multiple formats (JSON, CSV):

  • Accessible via endpoints and usable in other pipes
  • Support parameterized SQL using Jinja2 templates
  • Results are computed on read (batch processing)

Example:

description: 'Get errors based on level'
nodes:
    - load:
        description: Loading logs
        sql: SELECT timestamp, level, file, message FROM logs
    - error:
        description: Filter errors by level
        sql: SELECT message FROM load WHERE level = {{ level }}
publish: level
params:
    - name: level
      type: string
      default: error
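
When the endpoint is called, the Jinja2 template in the error node is rendered before the query runs. Assuming the engine quotes string parameters according to their declared type (a detail that depends on the templating setup), a call with no arguments falls back to the default and renders roughly as:

    SELECT message FROM load WHERE level = 'error'

Passing level=warning instead substitutes 'warning' into the same query.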

Use Case: Building reusable pipelines that other queries or systems can consume

Materialized Pipes

Materialized pipes incrementally transform and store results in a target source:

  • Results are appended to a designated source for faster access
  • Designed for incremental updates
  • Results persist, unlike published pipes, whose results are recomputed on each read

Example:

description: 'Aggregate logs by level'
nodes:
    - load:
        description: Load logs
        sql: SELECT timestamp, level, message FROM logs
    - aggregate:
        description: Count logs by level
        sql: |
            SELECT
                level,
                countState() AS count
            FROM load
            GROUP BY level
to: logs_by_level

Use Case: Real-time pipelines that write pre-aggregated results to a source
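
countState() stores a partial aggregate state rather than a finished number, which is what allows new batches of logs to be merged into logs_by_level incrementally. Assuming a ClickHouse-style engine (suggested by the -State combinator syntax), a query that reads the target source would finalize the counts with the matching -Merge combinator, roughly:

    SELECT
        level,
        countMerge(count) AS total
    FROM logs_by_level
    GROUP BY level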


Nodes

Nodes are the individual building blocks of a pipe; each node represents a single SQL operation. A node operates on a dataset, transforming it or deriving new insights, and passes its results to the next node in the pipe.

How Nodes Work

1. Input: A node takes data as input from a source, a preceding node, or a published pipe

2. SQL Query: Each node contains a SQL SELECT statement that defines its specific operation

3. Output: The result of the SQL query becomes the input for the next node in the pipe

Example

load Node:

- load:
    description: Loading logs
    sql: SELECT timestamp, level, file, message FROM logs

  • description: Provides a summary of the node’s function
  • sql: Fetches raw data from the logs table

Full Pipeline

Here’s a complete data pipeline that:

  • Loads log data
  • Filters for error messages
  • Aggregates error counts by level

description: 'Analyze logs'
nodes:
    - load:
        description: Load raw logs
        sql: SELECT timestamp, level, message FROM logs
    - filter:
        description: Filter error messages
        sql: SELECT * FROM load WHERE level = 'error'
    - aggregate:
        description: Count errors by level
        sql: |
            SELECT
                level,
                COUNT(*) AS error_count
            FROM filter
            GROUP BY level

  • load: Loads raw data from the logs table
  • filter: Filters the loaded data to include only error messages
  • aggregate: Aggregates the filtered data, counting the number of errors for each level

Output: A table with level and error_count columns (a single row here, since the filter node keeps only error-level rows)

Next Steps

Publish your query as an endpoint.