Properties

Below is an example of a pipe.yaml file that defines a pipeline to load logs, filter error messages, and group them by level for aggregation:

description: 'Aggregate error logs by level'
nodes:
    - load:
        description: Load raw logs
        sql: SELECT timestamp, level, file, message FROM logs
    - filter:
        description: Filter error messages
        sql: SELECT * FROM load WHERE level = 'error'
    - aggregate:
        description: Count errors by level
        sql: |
            SELECT
                level,
                count(*) AS error_count
            FROM filter
            GROUP BY level
publish: error_aggregation
params:
    - name: log_level
      type: string
      default: error
description
string

Provides a brief summary of the pipeline’s purpose or functionality. Optional.

nodes
Node[]
required

The sequence of nodes within the pipe. Each node represents a stage in the data processing workflow.

publish
string

Specifies that this pipe is a published pipe, creating an API endpoint for accessing its results. The value defines the endpoint name. Optional.

params
Param[]

A list of parameters for dynamic SQL queries, allowing customization of pipeline behavior through Jinja2 templates.