Building Pipelines
LogPulse ETL Pipelines let you build visual data processing workflows using a drag-and-drop editor. Connect 18 node types across 6 categories to extract data from external sources, transform and enrich it, control execution flow, and load results into LogPulse — all without writing code (though full JavaScript support is available when you need it).
Overview
The pipeline editor provides a full-featured visual canvas built on ReactFlow. Drag nodes from the palette, connect them with edges, and configure each node's behavior through intuitive forms. Pipelines can be triggered on a schedule, via HTTP ingestion, or manually.
Visual Drag & Drop
Build pipelines visually by dragging nodes from the palette and connecting them with edges.
18 Node Types
Extract, transform, route, and load data with purpose-built nodes across 6 categories.
Real-Time Execution
Watch pipelines execute in real time with per-node status, timing, and payload inspection.
Version History
Every save creates a version snapshot. Review history, compare changes, and roll back anytime.
Getting Started
Follow these steps to create your first pipeline:
Create a Pipeline
Navigate to ETL Pipelines in the sidebar and click "New Pipeline". Give it a name and optional description.
Add a Start Node
A start node is automatically added. This is the entry point of your pipeline.
Build Your Flow
Drag nodes from the palette on the left and connect them by drawing edges between handles.
Configure Nodes
Click any node to open its configuration panel. Set URLs, mappings, conditions, and more.
Test & Activate
Create a test event, run a test execution, inspect results, then activate the pipeline.
Node Categories
Nodes are organized into 6 color-coded categories in the palette. Each category serves a specific purpose in the data processing flow.
| Category | Color | Node Types | Description |
|---|---|---|---|
| Trigger | Emerald | start | Entry point for pipeline execution |
| Extract | Blue | httpRequest, splunkSearch | Pull data from external sources |
| Transform | Violet | transform, fieldOperations, json, csv, redactMask, mapCommonSchema, lookup | Modify, parse, and enrich data |
| Flow Control | Amber | condition, loop, loopStart, loopEnd, loopBreak | Control execution flow with branching and iteration |
| Load | Cyan | logpulseIngest | Send data to LogPulse or external destinations |
| Utility | Slate | log, error | Debug logging and error handling |
Drag any node from the palette onto the canvas to add it. Connect nodes by clicking an output handle (right side) and dragging to an input handle (left side).
Trigger Nodes
Every pipeline starts with a trigger node that defines how and when the pipeline executes.
The start node is the entry point. It defines the trigger mode and holds test event references for debugging. Every pipeline requires exactly one start node.
Pipelines support three trigger modes:
| Mode | Description | Configuration |
|---|---|---|
| Scheduled | Runs on a cron schedule | Cron expression + timezone (e.g., */5 * * * * UTC) |
| HTTP Ingestion | Triggered by incoming API key events | Link pipeline to an API key |
| Manual | Run on demand via UI or API | No configuration needed |
Extract Nodes
Extract nodes pull data from external sources into your pipeline. They support dynamic configuration, authentication, and retry logic.
HTTP Request
Make HTTP requests to any external API. Supports all standard methods, custom headers, query parameters, request bodies, and advanced retry/redirect handling. All string fields accept Dynamic Values for runtime interpolation.
| Setting | Type | Description |
|---|---|---|
| URL | DynamicValue | Target URL for the request |
| Method | DynamicValue | GET, POST, PUT, PATCH, or DELETE |
| Headers | Key-Value pairs | Custom request headers (DynamicValue per header) |
| Query Params | Key-Value pairs | URL query parameters (DynamicValue per param) |
| Body | DynamicValue | Request body (for POST/PUT/PATCH) |
| Timeout | Number | Request timeout in ms (default: 30000) |
| Retries | Number | Number of retry attempts on failure |
| Retry Delay | Number | Delay between retries in ms |
| Follow Redirects | Boolean | Follow HTTP redirects (default: true) |
| Max Redirects | Number | Maximum redirects to follow (default: 5) |
| Validate SSL | Boolean | Validate SSL certificates (default: true) |
| Output Path | String | Where to store the response in the payload |
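To make the interplay of the URL and Query Params settings concrete, here is a minimal sketch of how a final request URL could be assembled. The `buildUrl` helper and its behavior are illustrative assumptions, not LogPulse source:

```javascript
// Hypothetical helper: append configured query params to the target URL,
// mirroring the URL and Query Params settings from the table above.
function buildUrl(url, queryParams = {}) {
  const u = new URL(url);
  for (const [key, value] of Object.entries(queryParams)) {
    u.searchParams.set(key, String(value));
  }
  return u.toString();
}

const requestUrl = buildUrl("https://api.example.com/logs", {
  limit: 50,
  level: "error",
});
// requestUrl → "https://api.example.com/logs?limit=50&level=error"
```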
Splunk Search
Execute SPL (Search Processing Language) queries against a Splunk instance and pull results into your pipeline. Useful for migrating data from Splunk or building cross-platform workflows.
| Setting | Type | Description |
|---|---|---|
| Host | DynamicValue | Splunk server hostname |
| Port | DynamicValue | Splunk management port (default: 8089) |
| Protocol | https / http | Connection protocol (default: https) |
| Auth Type | basic / token | Authentication method |
| Query | DynamicValue | SPL search query string |
| Earliest Time | String | Search time range start (e.g., -24h) |
| Latest Time | String | Search time range end (e.g., now) |
| Max Results | Number | Maximum result count (default: 1000) |
| Timeout | Number | Search timeout in ms |
| Output Path | String | Where to store search results |
Authentication
Both HTTP Request and Splunk Search nodes support multiple authentication methods. Credentials can use Dynamic Values to reference environment variables securely.
| Auth Type | Fields | Description |
|---|---|---|
| None | — | No authentication |
| Basic | Username, Password | HTTP Basic authentication (Base64 encoded) |
| Bearer | Token | Bearer token in Authorization header |
| API Key | Header Name, Value | Custom header with API key (default: X-API-Key) |
| Multi-Step | Steps[], Token Rules | Sequential auth flow for OAuth/complex scenarios |
Multi-step authentication enables complex OAuth and token-based flows. Define sequential steps that extract values from responses and use them in subsequent requests:
```text
Step 1: POST /auth/login
        Body: { "username": "{{envVar:API_USER}}", "password": "{{envVar:API_PASS}}" }
        Extract: body → $.access_token → accessToken
Step 2: GET /api/data
        Header: Authorization = Bearer {{accessToken}}
        Extract: body → $.results → results
```

Each auth step can extract values from three sources:
| Source | Description | Example |
|---|---|---|
| body | Extract from JSON response body | $.access_token |
| header | Extract from response header | X-Session-Id |
| cookie | Extract from response cookie | session_cookie |
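A rough sketch of per-step extraction from these three sources might look like the following. The `extractAuthValue` helper and the response shape are assumptions for illustration; only the source names and path examples come from the table above:

```javascript
// Hypothetical helper: pull a value out of a step's response, supporting
// the three sources from the table (body, header, cookie).
function extractAuthValue(response, source, path) {
  if (source === "body") {
    // Resolve a simple "$.a.b" path against the parsed JSON body.
    const keys = path.replace(/^\$\.?/, "").split(".").filter(Boolean);
    return keys.reduce((acc, k) => (acc == null ? undefined : acc[k]), response.body);
  }
  if (source === "header") return response.headers[path.toLowerCase()];
  if (source === "cookie") return response.cookies[path];
  throw new Error(`unknown source: ${source}`);
}

// Example response (header names stored lowercase, as Node does).
const response = {
  body: { access_token: "abc123", user: { id: 7 } },
  headers: { "x-session-id": "sess-42" },
  cookies: { session_cookie: "c0ffee" },
};
```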
Transform Nodes
Transform nodes modify, parse, enrich, and reshape data as it flows through the pipeline. They range from simple field operations to advanced JavaScript transformations and schema mapping.
Transform
The general-purpose transform node supports both a simple mapping mode and an advanced JavaScript mode for complex transformations.
Simple Mode
In simple mode, define field mappings with a target key and value. Each mapping supports four value types:
| Value Type | Description | Example |
|---|---|---|
| string | Static string value | "production" |
| number | Numeric value | 42 |
| boolean | True or false | true |
| expression | Reference payload data | payload.user.name |
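The four value types can be sketched with a small resolver. The `applyMappings` helper is an illustrative assumption (not LogPulse source); only the value-type names and the `payload.user.name` expression style come from the table:

```javascript
// Hypothetical helper: apply simple-mode mappings to an input payload.
function applyMappings(payload, mappings) {
  const out = {};
  for (const { key, valueType, value } of mappings) {
    if (valueType === "expression") {
      // "payload.user.name" → walk the payload by dot-notation.
      const keys = value.replace(/^payload\.?/, "").split(".").filter(Boolean);
      out[key] = keys.reduce((acc, k) => (acc == null ? undefined : acc[k]), payload);
    } else {
      out[key] = value; // string, number, boolean: used as-is
    }
  }
  return out;
}

const mapped = applyMappings(
  { user: { name: "ada" }, level: "info" },
  [
    { key: "env", valueType: "string", value: "production" },
    { key: "retries", valueType: "number", value: 42 },
    { key: "active", valueType: "boolean", value: true },
    { key: "userName", valueType: "expression", value: "payload.user.name" },
  ]
);
```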
Advanced Mode (JavaScript)
In advanced mode, write JavaScript code that receives the input payload and returns the transformed result. Three variables are available that all reference the input payload: data, payload, and $ (shorthand).
```javascript
// Variables: data, payload, $ (all reference input payload)
const enriched = {
  ...data,
  processed: true,
  timestamp: new Date().toISOString(),
  severity: data.level === 'error' ? 'high' : 'normal',
  tags: [...(data.tags || []), 'processed']
};
return enriched;
```

Field Operations
Apply structured field-level operations without writing code. Supports five operation types that can be chained in sequence.
| Operation | Fields | Description |
|---|---|---|
| add | field, value, valueType | Add a new field with a static or dynamic value |
| remove | field | Remove a field from the payload |
| rename | field, newName | Rename a field |
| keep | fields[] | Keep only the specified fields, remove all others |
| coerce | field, targetType | Convert field to string, number, boolean, or array |
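To show how these five operations compose, here is a minimal sketch of applying them in sequence. The `applyFieldOperations` helper is an illustrative assumption, not LogPulse source; the operation and field names mirror the table above:

```javascript
// Hypothetical helper: apply a chain of field operations to a payload.
function applyFieldOperations(payload, operations) {
  let out = { ...payload };
  for (const op of operations) {
    switch (op.op) {
      case "add":    out[op.field] = op.value; break;
      case "remove": delete out[op.field]; break;
      case "rename": out[op.newName] = out[op.field]; delete out[op.field]; break;
      case "keep":   // keep only listed fields, drop everything else
        out = Object.fromEntries(op.fields.filter((f) => f in out).map((f) => [f, out[f]]));
        break;
      case "coerce":
        if (op.targetType === "number") out[op.field] = Number(out[op.field]);
        if (op.targetType === "string") out[op.field] = String(out[op.field]);
        if (op.targetType === "boolean") out[op.field] = Boolean(out[op.field]);
        if (op.targetType === "array") out[op.field] = [].concat(out[op.field]);
        break;
    }
  }
  return out;
}

const shaped = applyFieldOperations(
  { msg: "boot ok", debug_info: "...", status_code: "200" },
  [
    { op: "add", field: "env", value: "production" },
    { op: "rename", field: "msg", newName: "message" },
    { op: "remove", field: "debug_info" },
    { op: "coerce", field: "status_code", targetType: "number" },
  ]
);
```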
```text
Operation 1: add → field: "env", value: "production", type: string
Operation 2: rename → field: "msg", newName: "message"
Operation 3: remove → field: "debug_info"
Operation 4: coerce → field: "status_code", targetType: number
```

JSON & CSV Parsing
Dedicated nodes for parsing and formatting structured data formats. Use these when you need to convert between strings and structured objects.
JSON Node
| Mode | Description |
|---|---|
| parse | Convert a JSON string to an object. Specify input path (source string) and output path. |
| stringify | Convert an object to a JSON string. Specify input path (source object) and output path. |
CSV Node
| Setting | Description |
|---|---|
| Mode | parse (CSV → array of objects) or format (objects → CSV string) |
| Delimiter | Field separator character (default: comma) |
| Has Headers | First row contains field names (default: true) |
| Input Path | Path to the CSV string or array in payload |
| Output Path | Where to store the parsed/formatted result |
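A naive sketch of the CSV node's parse mode, using the Delimiter and Has Headers settings from the table. This helper is illustrative only (a real parser must also handle quoted fields, which this does not):

```javascript
// Hypothetical helper: parse CSV text into an array of objects.
function parseCsv(text, { delimiter = ",", hasHeaders = true } = {}) {
  const rows = text.trim().split("\n").map((line) => line.split(delimiter));
  if (!hasHeaders) return rows; // no header row: return raw arrays
  const [headers, ...data] = rows;
  return data.map((row) => Object.fromEntries(headers.map((h, i) => [h, row[i]])));
}

const records = parseCsv("host,level\nweb-1,error\nweb-2,info");
// records → [{ host: "web-1", level: "error" }, { host: "web-2", level: "info" }]
```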
Redact / Mask
Automatically detect and mask personally identifiable information (PII) and sensitive data before it reaches storage. Supports 8 built-in detection patterns and custom regex rules.
Built-in Patterns
| Pattern | Default Action | Example |
|---|---|---|
| Email | Replace → [EMAIL] | user@example.com → [EMAIL] |
| Phone | Replace → [PHONE] | +1-555-0123 → [PHONE] |
| Credit Card | Mask (keep last 4) | 4111-1111-1111-1234 → ****-****-****-1234 |
| SSN | Replace → ***-**-**** | 123-45-6789 → ***-**-**** |
| IP Address | Replace → [IP] | 192.168.1.1 → [IP] |
| JWT Token | Replace → [JWT] | eyJhbGciOi... → [JWT] |
| API Key | Replace → [API_KEY] | sk-abc123... → [API_KEY] |
| IBAN | Mask (keep first 4) | NL91ABNA0417164300 → NL91************** |
Redaction Actions
| Action | Description | Options |
|---|---|---|
| mask | Replace characters with mask character | maskChar (default: *), style: fixed / keepLastN / keepFirstN, keepN |
| hash | SHA-256 hash of matched value | Irreversible, consistent hash |
| remove | Delete the field entirely | — |
| replace | Replace with custom text | Custom replacement string |
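The mask action's styles can be sketched as follows. The `maskValue` helper is an illustrative assumption; the option names (`maskChar`, `style`, `keepN`) mirror the table above:

```javascript
// Hypothetical helper: mask a matched value per the options in the table.
function maskValue(value, { maskChar = "*", style = "fixed", keepN = 4 } = {}) {
  if (style === "keepLastN") {
    // e.g. credit cards: mask everything but the last N characters
    return maskChar.repeat(Math.max(value.length - keepN, 0)) + value.slice(-keepN);
  }
  if (style === "keepFirstN") {
    // e.g. IBANs: keep the first N characters, mask the rest
    return value.slice(0, keepN) + maskChar.repeat(Math.max(value.length - keepN, 0));
  }
  return maskChar.repeat(value.length); // fixed: mask every character
}

const card = maskValue("4111111111111234", { style: "keepLastN", keepN: 4 });
const iban = maskValue("NL91ABNA0417164300", { style: "keepFirstN", keepN: 4 });
```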
Custom Rules
Define your own detection rules using regex patterns or field path matching for data types not covered by the built-in presets.
| Match Type | Description | Example |
|---|---|---|
| regex | Regular expression pattern matching | /\b[A-Z]{2}\d{6}\b/ (passport numbers) |
| fieldPath | Match specific field paths | user.email, request.headers.authorization |
Schema Mapping
Map incoming data to standard or custom schemas. This ensures consistent field naming across different data sources and enables compatibility with downstream tools.
Schema Templates
| Template | Description |
|---|---|
| ECS | Elastic Common Schema — standard field naming for Elasticsearch/OpenSearch |
| CIM | Common Information Model — Splunk-compatible schema |
| OTEL | OpenTelemetry — vendor-neutral observability standard |
| LogPulse | Native LogPulse data models — custom models defined in your organization |
| Custom | User-defined mappings without a base template |
The mapping mode determines how missing fields are handled:
| Mode | Description |
|---|---|
| Permissive | Missing source fields are skipped. Partial output is produced without errors. |
| Strict | All required fields must be present. Pipeline errors if required fields are missing. |
Fields can be coerced to specific types during mapping:
| Type | Description |
|---|---|
| auto | Automatic type detection based on value |
| string | Convert to string |
| int | Convert to integer |
| float | Convert to floating-point number |
| boolean | Convert to boolean |
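A small sketch of what this coercion might look like in practice. The `coerce` helper is an illustrative assumption; only the type names come from the table:

```javascript
// Hypothetical helper: coerce a field value to a target type during mapping.
function coerce(value, type) {
  switch (type) {
    case "string":  return String(value);
    case "int":     return Math.trunc(Number(value));
    case "float":   return Number(value);
    case "boolean": return value === "false" ? false : Boolean(value);
    case "auto": {
      // Detect booleans and numbers; fall back to the raw value.
      if (value === "true" || value === "false") return value === "true";
      const n = Number(value);
      return Number.isNaN(n) ? value : n;
    }
    default: return value;
  }
}
```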
Lookup
Enrich events by matching a field value against a lookup dataset. This is useful for adding context like hostname-to-team mappings, IP geolocation data, or CMDB asset information.
| Setting | Description |
|---|---|
| Dataset | Select a lookup dataset from your organization |
| Input Key Path | Dot-notation path to the lookup key in your payload |
| Lookup Key Field | Field name in the dataset to match against |
| Output Mode | How to output matched data (see below) |
| Field Mappings | Optional: map specific lookup fields to event fields |
Output Modes
| Mode | Description |
|---|---|
| mergeIntoEvent | Merge all lookup fields into the event payload |
| writeToTargetPath | Write lookup data to a specific path in the payload |
| embedLookup | Embed the full lookup record as a nested object |
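As an illustration of the mergeIntoEvent mode, the lookup flow can be sketched like this. The `lookupEnrich` helper, dataset, and field names are assumptions for illustration; the Input Key Path and Lookup Key Field settings come from the table above:

```javascript
// Hypothetical helper: enrich an event from a lookup dataset (mergeIntoEvent).
function lookupEnrich(event, dataset, { inputKeyPath, lookupKeyField }) {
  // Resolve the lookup key from the event via dot-notation.
  const key = inputKeyPath.split(".").reduce(
    (acc, k) => (acc == null ? undefined : acc[k]), event);
  const match = dataset.find((row) => row[lookupKeyField] === key);
  // mergeIntoEvent: merge all lookup fields into the event payload.
  return match ? { ...event, ...match } : event;
}

const dataset = [
  { hostname: "web-1", team: "frontend", region: "eu-west" },
  { hostname: "db-1", team: "platform", region: "us-east" },
];
const enriched = lookupEnrich(
  { host: { name: "web-1" }, level: "error" },
  dataset,
  { inputKeyPath: "host.name", lookupKeyField: "hostname" }
);
```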
Flow Control
Flow control nodes let you add conditional branching and iteration to your pipelines, enabling complex routing logic and array processing.
Condition
Route data to different branches based on field values. Each condition creates a separate output handle on the node. Events are routed to the first matching condition, or to the 'else' branch if none match.
Operators
| Operator | Symbol | Description | Value Required |
|---|---|---|---|
| eq | == | Equals | Yes |
| neq | != | Not equals | Yes |
| gt | > | Greater than | Yes |
| lt | < | Less than | Yes |
| gte | >= | Greater than or equal | Yes |
| lte | <= | Less than or equal | Yes |
| contains | contains | Contains substring | Yes |
| not_contains | not contains | Does not contain | Yes |
| regex | regex | Matches regular expression | Yes |
| exists | exists | Field exists in payload | No |
| not_exists | not exists | Field does not exist | No |
Multiple conditions can be defined, each with its own output handle and label. The 'else' branch catches all events that don't match any condition.
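The operator table can be summed up in a small evaluator sketch. The `evaluateCondition` helper is an illustrative assumption, not LogPulse source; the operator IDs are those from the table:

```javascript
// Hypothetical helper: evaluate one condition against a resolved field value.
function evaluateCondition(fieldValue, operator, value) {
  switch (operator) {
    case "eq":           return fieldValue === value;
    case "neq":          return fieldValue !== value;
    case "gt":           return fieldValue > value;
    case "lt":           return fieldValue < value;
    case "gte":          return fieldValue >= value;
    case "lte":          return fieldValue <= value;
    case "contains":     return String(fieldValue).includes(value);
    case "not_contains": return !String(fieldValue).includes(value);
    case "regex":        return new RegExp(value).test(String(fieldValue));
    case "exists":       return fieldValue !== undefined; // no value required
    case "not_exists":   return fieldValue === undefined; // no value required
    default: throw new Error(`unknown operator: ${operator}`);
  }
}
```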
Advanced Mode (JavaScript)
In advanced mode, write JavaScript that returns an array of branch IDs (strings) or a single string. The event will be routed to the specified branch(es).
```javascript
// Must return an array of branch IDs or a single string
if (data.level === 'error' && data.status_code >= 500) {
  return ['critical-errors'];
} else if (data.level === 'error') {
  return ['errors'];
}
return ['default'];
```

Loops
Iterate over arrays in the payload. The loop node processes each array element through the nodes between loopStart and loopEnd, making the current item available as a variable.
| Setting | Description |
|---|---|
| Array Path | Dot-notation path to the array to iterate (e.g., payload.items) |
| Item Variable | Variable name for each iteration item (default: item) |
| Continue on Error | Skip failed items instead of halting the loop (default: false) |
Loop-related nodes work together:
| Node | Purpose |
|---|---|
| loop | Defines the loop — configure array path and item variable |
| loopStart | Start of each iteration body (auto-created inside loop) |
| loopEnd | End of each iteration body (auto-created inside loop) |
| loopBreak | Exit the loop early based on a condition |
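The loop semantics, including Continue on Error, can be sketched as follows. The `runLoop` helper is an illustrative assumption; only the continue-on-error behavior comes from the settings table:

```javascript
// Hypothetical helper: run a loop body over each array item, optionally
// skipping failed items instead of halting (Continue on Error).
function runLoop(items, body, { continueOnError = false } = {}) {
  const results = [];
  for (const item of items) {
    try {
      results.push(body(item));
    } catch (err) {
      if (!continueOnError) throw err; // default: halt the loop on failure
      // continueOnError: skip the failed item and keep iterating
    }
  }
  return results;
}

// Example: "x" fails, but the loop continues and processes 1 and 3.
const doubled = runLoop([1, "x", 3], (item) => {
  if (typeof item !== "number") throw new Error("not a number");
  return item * 2;
}, { continueOnError: true });
// doubled → [2, 6]
```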
Load Nodes
Load nodes are the final destination of your pipeline — they send processed data to LogPulse for indexing and search.
LogPulse Ingest
Send processed events to LogPulse for indexing. Configure the source type, target index, and field mappings to control how events appear in search results.
| Setting | Default | Description |
|---|---|---|
| Source Type | etl_pipeline | Value for the sourcetype field in LogPulse |
| Index | main | Target index for ingested events |
| Host | — | Host name to tag events with |
| Source | Auto-generated | Source identifier (defaults to pipeline name) |
| Message Path | payload | Path to the event message/body in payload |
| Timestamp Path | — | Path to timestamp field (auto-detected if empty) |
| Level Path | — | Path to log level field (auto-detected if empty) |
| Field Mapping | — | Optional custom field name mappings |
Utility Nodes
Utility nodes help with debugging and error handling during pipeline development and execution.
Log
Output a debug message to the pipeline run log. The message can reference payload fields. Useful for inspecting intermediate data during development.
Error
Stop pipeline execution immediately and record an error. Use this in condition branches to halt processing when invalid data is detected.
Dynamic Values
Most string fields in node configuration support Dynamic Values — a system that lets you reference runtime data instead of hardcoding values. This makes pipelines reusable and configurable.
| Type | Syntax | Description | Example |
|---|---|---|---|
| static | Plain text | Literal string or number | "https://api.example.com" |
| flowVariable | Select variable | Reference a pipeline variable | vars.apiUrl |
| envVariable | {{envVar:NAME}} | Reference an environment variable | {{envVar:API_KEY}} |
| expression | Dot-notation | Reference payload data dynamically | payload.user.id |
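As a rough illustration of environment-variable interpolation, here is a minimal resolver sketch. The `{{envVar:NAME}}` syntax is from the table; the `resolveDynamicValue` helper itself is an assumption, not LogPulse source:

```javascript
// Hypothetical helper: substitute {{envVar:NAME}} references in a string
// with values from an environment-variable map.
function resolveDynamicValue(template, env = {}) {
  return template.replace(/\{\{envVar:([A-Za-z0-9_]+)\}\}/g, (_, name) => {
    if (!(name in env)) throw new Error(`missing environment variable: ${name}`);
    return env[name];
  });
}

const url = resolveDynamicValue(
  "https://api.example.com/logs?key={{envVar:API_KEY}}",
  { API_KEY: "secret123" }
);
// url → "https://api.example.com/logs?key=secret123"
```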
Pipeline Settings
Pipeline-level settings control how and when your pipeline runs, as well as debugging and testing features.
Trigger Modes
Configure how your pipeline is triggered. Each pipeline supports one active trigger mode.
Scheduled Execution
Run the pipeline automatically on a cron schedule. The pipeline worker checks schedules approximately every minute. The pipeline must be active and scheduled runs must be enabled.
| Setting | Description |
|---|---|
| Enabled | Toggle scheduled runs on/off |
| Cron Expression | Standard 5-field cron (e.g., */5 * * * * for every 5 minutes) |
| Timezone | Timezone for the cron schedule (default: UTC) |
```text
*/5 * * * *    Every 5 minutes
0 * * * *      Every hour
0 9 * * 1-5    Weekdays at 9:00 AM
0 0 * * *      Daily at midnight
0 */6 * * *    Every 6 hours
```

HTTP Ingestion
Link the pipeline to an API key. Events sent via the API key are automatically routed through the pipeline before being ingested. This is ideal for processing incoming log data in real time.
Active / Inactive
The Active toggle is a master switch. When inactive, all triggers (scheduled, HTTP, webhook) are disabled. You can still edit the pipeline and run manual test executions.
Debug Mode
Debug mode captures real incoming events and stores them as test events. This is useful for creating representative test data from actual production traffic. Disable when done to avoid storing unnecessary data.
Test Events
Test events let you run your pipeline with sample data without affecting production. Create test events manually or capture them from live traffic using debug mode.
| Feature | Description |
|---|---|
| Create Test Event | Define a name and JSON payload to use for testing |
| Auto-Capture | Enable debug mode to capture real incoming events as test events |
| Select & Run | Choose a test event from the start node and execute the pipeline |
| View Results | Inspect input/output payloads at each node after a test run |
Execution & Tracking
Every pipeline run is tracked with detailed metadata, node-level execution logs, and real-time status updates via WebSocket.
Run Status
| Status | Description |
|---|---|
| pending | Run is queued and waiting to start |
| running | Pipeline is actively executing |
| success | Pipeline completed without errors |
| failed | Pipeline encountered an error and stopped |
Node Execution Details
Each node in a run is tracked individually, allowing you to pinpoint exactly where failures occurred and inspect the data at each step.
| Tracked Data | Description |
|---|---|
| Status | pending, running, success, failed, or skipped |
| Duration | Execution time in milliseconds |
| Input Payload | Data received by the node |
| Output Payload | Data produced by the node |
| Error Details | Error message and stack trace (if failed) |
Real-Time Updates
The pipeline editor connects via WebSocket to receive live updates during execution. Nodes light up as they execute, showing status, duration, and errors in real time.
| Event | Description |
|---|---|
| etl:run:started | Pipeline execution has begun |
| etl:run:finished | Pipeline completed successfully |
| etl:run:failed | Pipeline failed with an error |
| etl:node:started | A node has started executing |
| etl:node:finished | A node has completed |
| etl:node:failed | A node has failed |
Versioning
Pipeline configurations are automatically versioned. Every save creates a new version with a complete snapshot of nodes, edges, and variables. This provides a full audit trail and the ability to roll back to any previous state.
| Feature | Description |
|---|---|
| Auto-save | A new version is created each time you save the pipeline |
| Version History | View all previous versions with timestamps |
| Comments | Add optional comments when saving a version |
| Full Snapshot | Each version stores the complete flow (nodes, edges, variables) |
| Rollback | Restore a previous version to undo changes |
API Reference
All pipeline operations are available via the REST API. All endpoints require authentication and are scoped to the current organization.
Pipeline Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/etl-pipelines | List all pipelines for the organization |
| POST | /api/v1/etl-pipelines | Create a new pipeline |
| GET | /api/v1/etl-pipelines/:id | Get pipeline details and flow configuration |
| PUT | /api/v1/etl-pipelines/:id | Update pipeline configuration |
| DELETE | /api/v1/etl-pipelines/:id | Delete a pipeline |
| PATCH | /api/v1/etl-pipelines/:id/activate | Activate or deactivate a pipeline |
Execution Endpoints
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/etl-pipelines/:id/test | Run a test execution with a test event |
| GET | /api/v1/etl-pipelines/:id/runs | List pipeline execution history |
| GET | /api/v1/etl-pipelines/:id/runs/:runId | Get details of a specific run |
| GET | /api/v1/etl-pipelines/:id/runs/:runId/nodes | Get node-level execution details |
Version Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/etl-pipelines/:id/versions | List all versions of a pipeline |
| POST | /api/v1/etl-pipelines/:id/versions | Save a new version with optional comment |
Test Event Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/etl-pipelines/:id/test-events | List test events for a pipeline |
| POST | /api/v1/etl-pipelines/:id/test-events | Create a new test event |
| DELETE | /api/v1/etl-pipelines/:id/test-events/:eventId | Delete a test event |