Building Pipelines

LogPulse ETL Pipelines let you build visual data processing workflows using a drag-and-drop editor. Connect 18 node types across 6 categories to extract data from external sources, transform and enrich it, control execution flow, and load results into LogPulse — all without writing code (though full JavaScript support is available when you need it).

Overview

The pipeline editor provides a full-featured visual canvas built on ReactFlow. Drag nodes from the palette, connect them with edges, and configure each node's behavior through intuitive forms. Pipelines can be triggered on a schedule, via HTTP ingestion, or manually.

Visual Drag & Drop

Build pipelines visually by dragging nodes from the palette and connecting them with edges.

18 Node Types

Extract, transform, route, and load data with purpose-built nodes across 6 categories.

Real-Time Execution

Watch pipelines execute in real-time with per-node status, timing, and payload inspection.

Version History

Every save creates a version snapshot. Review history, compare changes, and roll back anytime.

Getting Started

Follow these steps to create your first pipeline:

1. Create a Pipeline
   Navigate to ETL Pipelines in the sidebar and click "New Pipeline". Give it a name and an optional description.

2. Add a Start Node
   A start node is added automatically. This is the entry point of your pipeline.

3. Build Your Flow
   Drag nodes from the palette on the left and connect them by drawing edges between handles.

4. Configure Nodes
   Click any node to open its configuration panel. Set URLs, mappings, conditions, and more.

5. Test & Activate
   Create a test event, run a test execution, inspect the results, then activate the pipeline.

Tip: Use the snap-to-grid feature (enabled by default) to keep your pipeline visually organized. The grid snaps at 20px intervals.

Node Categories

Nodes are organized into 6 color-coded categories in the palette. Each category serves a specific purpose in the data processing flow.

| Category | Color | Node Types | Description |
|---|---|---|---|
| Trigger | Emerald | start | Entry point for pipeline execution |
| Extract | Blue | httpRequest, splunkSearch | Pull data from external sources |
| Transform | Violet | transform, fieldOperations, json, csv, redactMask, mapCommonSchema, lookup | Modify, parse, and enrich data |
| Flow Control | Amber | condition, loop, loopStart, loopEnd, loopBreak | Control execution flow with branching and iteration |
| Load | Cyan | logpulseIngest | Send data to LogPulse or external destinations |
| Utility | Slate | log, error | Debug logging and error handling |

Drag any node from the palette onto the canvas to add it. Connect nodes by clicking an output handle (right side) and dragging to an input handle (left side).

Trigger Nodes

Every pipeline starts with a trigger node that defines how and when the pipeline executes.

Start Node

start

The start node is the entry point. It defines the trigger mode and holds test event references for debugging. Every pipeline requires exactly one start node.

Pipelines support three trigger modes:

| Mode | Description | Configuration |
|---|---|---|
| Scheduled | Runs on a cron schedule | Cron expression + timezone (e.g., */5 * * * *, UTC) |
| HTTP Ingestion | Triggered by incoming API key events | Link pipeline to an API key |
| Manual | Run on demand via UI or API | No configuration needed |
Note: When a pipeline is linked to an API key for HTTP ingestion, the scheduled trigger mode is automatically disabled. A pipeline can only use one trigger mode at a time.

Extract Nodes

Extract nodes pull data from external sources into your pipeline. They support dynamic configuration, authentication, and retry logic.

HTTP Request

httpRequest

Make HTTP requests to any external API. Supports all standard methods, custom headers, query parameters, request bodies, and advanced retry/redirect handling. All string fields accept Dynamic Values for runtime interpolation.

| Setting | Type | Description |
|---|---|---|
| URL | DynamicValue | Target URL for the request |
| Method | DynamicValue | GET, POST, PUT, PATCH, or DELETE |
| Headers | Key-value pairs | Custom request headers (DynamicValue per header) |
| Query Params | Key-value pairs | URL query parameters (DynamicValue per param) |
| Body | DynamicValue | Request body (for POST/PUT/PATCH) |
| Timeout | Number | Request timeout in ms (default: 30000) |
| Retries | Number | Number of retry attempts on failure |
| Retry Delay | Number | Delay between retries in ms |
| Follow Redirects | Boolean | Follow HTTP redirects (default: true) |
| Max Redirects | Number | Maximum redirects to follow (default: 5) |
| Validate SSL | Boolean | Validate SSL certificates (default: true) |
| Output Path | String | Where to store the response in the payload |
Warning: HTTP requests are protected against SSRF attacks. Requests to private IP ranges, localhost, and cloud metadata endpoints are automatically blocked.
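As a sketch, a configured httpRequest node might be represented like this. The exact persisted shape and key names are assumptions; the fields mirror the settings table above.

```javascript
// Hypothetical httpRequest node configuration. The object shape is
// illustrative only; field names follow the settings table above.
const httpRequestNode = {
  type: 'httpRequest',
  config: {
    url: 'https://api.example.com/v1/incidents', // DynamicValue
    method: 'GET',
    headers: { Accept: 'application/json' },
    queryParams: { since: '{{envVar:LAST_RUN}}' }, // interpolated at runtime
    timeout: 30000,         // ms (default)
    retries: 2,
    retryDelay: 1000,       // ms between attempts
    followRedirects: true,  // default
    maxRedirects: 5,        // default
    validateSsl: true,      // default
    outputPath: 'payload.apiResponse', // where the response lands
  },
};
```

The response body would then be available to downstream nodes at the configured output path.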

Authentication

Both HTTP Request and Splunk Search nodes support multiple authentication methods. Credentials can use Dynamic Values to reference environment variables securely.

| Auth Type | Fields | Description |
|---|---|---|
| None | | No authentication |
| Basic | Username, Password | HTTP Basic authentication (Base64 encoded) |
| Bearer | Token | Bearer token in the Authorization header |
| API Key | Header Name, Value | Custom header with API key (default: X-API-Key) |
| Multi-Step | Steps[], Token Rules | Sequential auth flow for OAuth/complex scenarios |

Multi-step authentication enables complex OAuth and token-based flows. Define sequential steps that extract values from responses and use them in subsequent requests:

Multi-Step Auth Example
Step 1: POST /auth/login
  Body: { "username": "{{envVar:API_USER}}", "password": "{{envVar:API_PASS}}" }
  Extract: body → $.access_token → accessToken

Step 2: GET /api/data
  Header: Authorization = Bearer {{accessToken}}
  Extract: body → $.results → results

Each auth step can extract values from three sources:

| Source | Description | Example |
|---|---|---|
| body | Extract from the JSON response body | $.access_token |
| header | Extract from a response header | X-Session-Id |
| cookie | Extract from a response cookie | session_cookie |
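To make the body extraction concrete, here is a minimal sketch of how a "$.access_token"-style path could be resolved against a JSON response. The engine's actual JSONPath support may be richer; this handles simple dot paths only.

```javascript
// Resolve a simple "$.a.b" JSONPath-style expression against a parsed
// JSON response body. Returns undefined if any segment is missing.
function extractFromBody(body, path) {
  const segments = path.replace(/^\$\.?/, '').split('.').filter(Boolean);
  return segments.reduce(
    (acc, key) => (acc == null ? undefined : acc[key]),
    body
  );
}

// Step 1's login response yields the token for step 2's header.
const loginResponse = { access_token: 'abc123', token_type: 'bearer' };
const accessToken = extractFromBody(loginResponse, '$.access_token');
// accessToken can then be interpolated as `Bearer ${accessToken}`
```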

Transform Nodes

Transform nodes modify, parse, enrich, and reshape data as it flows through the pipeline. They range from simple field operations to advanced JavaScript transformations and schema mapping.

Transform

transform

The general-purpose transform node supports both a simple mapping mode and an advanced JavaScript mode for complex transformations.

Simple Mode

In simple mode, define field mappings with a target key and value. Each mapping supports four value types:

| Value Type | Description | Example |
|---|---|---|
| string | Static string value | "production" |
| number | Numeric value | 42 |
| boolean | True or false | true |
| expression | Reference payload data | payload.user.name |
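Conceptually, simple-mode mappings resolve like the sketch below. The resolution logic is an assumption based on the value types table; the node's real implementation is not shown in this documentation.

```javascript
// Resolve a "payload.user.name" expression by walking the payload.
function resolveExpression(payload, expr) {
  return expr.split('.').slice(1) // drop the leading "payload" segment
    .reduce((acc, k) => (acc == null ? undefined : acc[k]), payload);
}

// Apply each mapping: static types are used as-is, expressions are
// resolved against the incoming payload.
function applySimpleMappings(payload, mappings) {
  const out = { ...payload };
  for (const m of mappings) {
    out[m.target] =
      m.type === 'expression' ? resolveExpression(payload, m.value) : m.value;
  }
  return out;
}

const result = applySimpleMappings(
  { user: { name: 'ada' } },
  [
    { target: 'env', type: 'string', value: 'production' },
    { target: 'userName', type: 'expression', value: 'payload.user.name' },
  ]
);
// result.env → 'production', result.userName → 'ada'
```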

Advanced Mode (JavaScript)

In advanced mode, write JavaScript code that receives the input payload and returns the transformed result. Three variables are available that all reference the input payload: data, payload, and $ (shorthand).

JavaScript Transform Example
// Variables: data, payload, $ (all reference input payload)
const enriched = {
  ...data,
  processed: true,
  timestamp: new Date().toISOString(),
  severity: data.level === 'error' ? 'high' : 'normal',
  tags: [...(data.tags || []), 'processed']
};
return enriched;

Field Operations

fieldOperations

Apply structured field-level operations without writing code. Supports five operation types that can be chained in sequence.

| Operation | Fields | Description |
|---|---|---|
| add | field, value, valueType | Add a new field with a static or dynamic value |
| remove | field | Remove a field from the payload |
| rename | field, newName | Rename a field |
| keep | fields[] | Keep only the specified fields; remove all others |
| coerce | field, targetType | Convert a field to string, number, boolean, or array |
Field Operations Example
Operation 1: add    → field: "env", value: "production", type: string
Operation 2: rename → field: "msg", newName: "message"
Operation 3: remove → field: "debug_info"
Operation 4: coerce → field: "status_code", targetType: number
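The example above behaves roughly like this sketch, which applies the five operation types in sequence over top-level fields. It is illustrative only and mirrors the operations table, not the node's actual implementation.

```javascript
// Apply a chain of field operations to a flat payload, in order.
function applyFieldOperations(payload, ops) {
  let out = { ...payload };
  for (const op of ops) {
    switch (op.op) {
      case 'add':
        out[op.field] = op.value;
        break;
      case 'remove':
        delete out[op.field];
        break;
      case 'rename':
        out[op.newName] = out[op.field];
        delete out[op.field];
        break;
      case 'keep': // keep only the listed fields
        out = Object.fromEntries(op.fields.map(f => [f, out[f]]));
        break;
      case 'coerce':
        if (op.targetType === 'number') out[op.field] = Number(out[op.field]);
        if (op.targetType === 'string') out[op.field] = String(out[op.field]);
        if (op.targetType === 'boolean') out[op.field] = Boolean(out[op.field]);
        if (op.targetType === 'array' && !Array.isArray(out[op.field]))
          out[op.field] = [out[op.field]];
        break;
    }
  }
  return out;
}

// The four operations from the example above:
const out = applyFieldOperations(
  { msg: 'boot ok', debug_info: { x: 1 }, status_code: '200' },
  [
    { op: 'add', field: 'env', value: 'production' },
    { op: 'rename', field: 'msg', newName: 'message' },
    { op: 'remove', field: 'debug_info' },
    { op: 'coerce', field: 'status_code', targetType: 'number' },
  ]
);
// out.message → 'boot ok', out.env → 'production', out.status_code → 200
```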

JSON & CSV Parsing

json, csv

Dedicated nodes for parsing and formatting structured data formats. Use these when you need to convert between strings and structured objects.

JSON Node

| Mode | Description |
|---|---|
| parse | Convert a JSON string to an object. Specify the input path (source string) and output path. |
| stringify | Convert an object to a JSON string. Specify the input path (source object) and output path. |

CSV Node

| Setting | Description |
|---|---|
| Mode | parse (CSV → array of objects) or format (objects → CSV string) |
| Delimiter | Field separator character (default: comma) |
| Has Headers | First row contains field names (default: true) |
| Input Path | Path to the CSV string or array in the payload |
| Output Path | Where to store the parsed/formatted result |
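For intuition, parse mode with the defaults (comma delimiter, first row as headers) behaves like this minimal sketch. Real CSV handling also deals with quoting and escapes; this is illustrative only.

```javascript
// Parse a headered CSV string into an array of objects, one per row.
function parseCsv(text, delimiter = ',') {
  const [headerLine, ...rows] = text.trim().split('\n');
  const headers = headerLine.split(delimiter);
  return rows.map(row => {
    const values = row.split(delimiter);
    return Object.fromEntries(headers.map((h, i) => [h, values[i]]));
  });
}

const records = parseCsv('host,level\nweb-1,error\nweb-2,info');
// → [{ host: 'web-1', level: 'error' }, { host: 'web-2', level: 'info' }]
```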

Redact / Mask

redactMask

Automatically detect and mask personally identifiable information (PII) and sensitive data before it reaches storage. Supports 8 built-in detection patterns and custom regex rules.

Built-in Patterns

| Pattern | Default Action | Example |
|---|---|---|
| Email | Replace → [EMAIL] | user@example.com → [EMAIL] |
| Phone | Replace → [PHONE] | +1-555-0123 → [PHONE] |
| Credit Card | Mask (keep last 4) | 4111-1111-1111-1234 → ****-****-****-1234 |
| SSN | Replace → ***-**-**** | 123-45-6789 → ***-**-**** |
| IP Address | Replace → [IP] | 192.168.1.1 → [IP] |
| JWT Token | Replace → [JWT] | eyJhbGciOi... → [JWT] |
| API Key | Replace → [API_KEY] | sk-abc123... → [API_KEY] |
| IBAN | Mask (keep first 4) | NL91ABNA0417164300 → NL91************** |

Redaction Actions

| Action | Description | Options |
|---|---|---|
| mask | Replace characters with a mask character | maskChar (default: *), style: fixed / keepLastN / keepFirstN, keepN |
| hash | SHA-256 hash of the matched value | Irreversible, consistent hash |
| remove | Delete the field entirely | |
| replace | Replace with custom text | Custom replacement string |
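The mask action's keepLastN style (as used by the credit card preset) works roughly like this sketch. The decision to skip non-alphanumeric characters so separators stay readable is an assumption drawn from the example output in the patterns table.

```javascript
// Mask every alphanumeric character except the last keepN, leaving
// separators (dashes, spaces) intact so the value's shape is preserved.
function maskKeepLastN(value, keepN = 4, maskChar = '*') {
  const chars = value.split('');
  let toKeep = keepN;
  for (let i = chars.length - 1; i >= 0; i--) {
    if (!/[0-9a-zA-Z]/.test(chars[i])) continue; // keep separators
    if (toKeep > 0) { toKeep--; continue; }      // keep the last N
    chars[i] = maskChar;                         // mask the rest
  }
  return chars.join('');
}

maskKeepLastN('4111-1111-1111-1234');
// → '****-****-****-1234'
```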

Custom Rules

Define your own detection rules using regex patterns or field path matching for data types not covered by the built-in presets.

| Match Type | Description | Example |
|---|---|---|
| regex | Regular expression pattern matching | /\b[A-Z]{2}\d{6}\b/ (passport numbers) |
| fieldPath | Match specific field paths | user.email, request.headers.authorization |
Tip: Enable the audit option to include redaction statistics in the output. This logs which fields were redacted and how many matches were found, useful for compliance reporting.

Schema Mapping

mapCommonSchema

Map incoming data to standard or custom schemas. This ensures consistent field naming across different data sources and enables compatibility with downstream tools.

Schema Templates

| Template | Description |
|---|---|
| ECS | Elastic Common Schema: standard field naming for Elasticsearch/OpenSearch |
| CIM | Common Information Model: Splunk-compatible schema |
| OTEL | OpenTelemetry: vendor-neutral observability standard |
| LogPulse | Native LogPulse data models: custom models defined in your organization |
| Custom | User-defined mappings without a base template |

The mapping mode determines how missing fields are handled:

| Mode | Description |
|---|---|
| Permissive | Missing source fields are skipped. Partial output is produced without errors. |
| Strict | All required fields must be present. The pipeline errors if required fields are missing. |

Fields can be coerced to specific types during mapping:

| Type | Description |
|---|---|
| auto | Automatic type detection based on the value |
| string | Convert to string |
| int | Convert to integer |
| float | Convert to floating-point number |
| boolean | Convert to boolean |
Note: Enable "Keep Original" to merge mapped fields with the original payload instead of replacing it. This is useful when you want to add standard field names alongside existing ones.
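Permissive mapping with "Keep Original" behaves roughly like the sketch below. The concrete field map here is made up for illustration and is not LogPulse's built-in ECS mapping.

```javascript
// Map source fields to target (e.g., ECS-style) names. In permissive
// mode, missing source fields are simply skipped rather than erroring.
function mapToSchema(payload, fieldMap, { keepOriginal = false } = {}) {
  const mapped = {};
  for (const [source, target] of Object.entries(fieldMap)) {
    if (payload[source] !== undefined) mapped[target] = payload[source];
  }
  // keepOriginal merges mapped fields alongside the original payload.
  return keepOriginal ? { ...payload, ...mapped } : mapped;
}

const event = { msg: 'disk full', lvl: 'error' };
const ecs = mapToSchema(event, {
  msg: 'message',      // hypothetical mapping entries
  lvl: 'log.level',
  host: 'host.name',   // missing in the event: skipped (permissive)
});
// ecs → { message: 'disk full', 'log.level': 'error' }
```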

Lookup

lookup

Enrich events by matching a field value against a lookup dataset. This is useful for adding context like hostname-to-team mappings, IP geolocation data, or CMDB asset information.

| Setting | Description |
|---|---|
| Dataset | Select a lookup dataset from your organization |
| Input Key Path | Dot-notation path to the lookup key in your payload |
| Lookup Key Field | Field name in the dataset to match against |
| Output Mode | How to output matched data (see below) |
| Field Mappings | Optional: map specific lookup fields to event fields |

Output Modes

| Mode | Description |
|---|---|
| mergeIntoEvent | Merge all lookup fields into the event payload |
| writeToTargetPath | Write lookup data to a specific path in the payload |
| embedLookup | Embed the full lookup record as a nested object |
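The three output modes can be pictured with a tiny in-memory dataset. The dataset contents, field names, and the no-match pass-through behavior below are illustrative assumptions.

```javascript
// Hypothetical hostname-to-team lookup dataset.
const dataset = [
  { hostname: 'web-1', team: 'platform', region: 'eu-west' },
  { hostname: 'db-1', team: 'data', region: 'us-east' },
];

function lookupEnrich(event, { inputKeyPath, lookupKeyField, outputMode, targetPath }) {
  // Resolve the lookup key from the event payload.
  const key = inputKeyPath.split('.').reduce((a, k) => a && a[k], event);
  const match = dataset.find(row => row[lookupKeyField] === key);
  if (!match) return event; // no match: pass the event through unchanged
  switch (outputMode) {
    case 'mergeIntoEvent':    return { ...event, ...match };
    case 'writeToTargetPath': return { ...event, [targetPath]: match };
    case 'embedLookup':       return { ...event, lookup: match };
  }
}

const enriched = lookupEnrich(
  { host: 'web-1', level: 'warn' },
  { inputKeyPath: 'host', lookupKeyField: 'hostname', outputMode: 'mergeIntoEvent' }
);
// enriched.team → 'platform', enriched.region → 'eu-west'
```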

Flow Control

Flow control nodes let you add conditional branching and iteration to your pipelines, enabling complex routing logic and array processing.

Condition

condition

Route data to different branches based on field values. Each condition creates a separate output handle on the node. Events are routed to the first matching condition, or to the 'else' branch if none match.

Operators

| Operator | Symbol | Description | Value Required |
|---|---|---|---|
| eq | == | Equals | Yes |
| neq | != | Not equals | Yes |
| gt | > | Greater than | Yes |
| lt | < | Less than | Yes |
| gte | >= | Greater than or equal | Yes |
| lte | <= | Less than or equal | Yes |
| contains | contains | Contains substring | Yes |
| not_contains | not contains | Does not contain | Yes |
| regex | regex | Matches regular expression | Yes |
| exists | exists | Field exists in payload | No |
| not_exists | not exists | Field does not exist | No |

Multiple conditions can be defined, each with its own output handle and label. The 'else' branch catches all events that don't match any condition.
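First-match routing with an 'else' fallback can be sketched as follows. This models the semantics described above for top-level fields; it is not the node's actual evaluation code.

```javascript
// Evaluate one simple-mode operator against a field value.
function evaluate(op, fieldValue, expected) {
  switch (op) {
    case 'eq':  return fieldValue === expected;
    case 'neq': return fieldValue !== expected;
    case 'gt':  return fieldValue > expected;
    case 'lt':  return fieldValue < expected;
    case 'gte': return fieldValue >= expected;
    case 'lte': return fieldValue <= expected;
    case 'contains':     return String(fieldValue).includes(expected);
    case 'not_contains': return !String(fieldValue).includes(expected);
    case 'regex':      return new RegExp(expected).test(String(fieldValue));
    case 'exists':     return fieldValue !== undefined;
    case 'not_exists': return fieldValue === undefined;
  }
}

// Route to the first matching condition's branch, else to 'else'.
function route(payload, conditions) {
  for (const c of conditions) {
    if (evaluate(c.op, payload[c.field], c.value)) return c.branch;
  }
  return 'else';
}

route({ level: 'error', status_code: 503 }, [
  { field: 'status_code', op: 'gte', value: 500, branch: 'critical' },
  { field: 'level', op: 'eq', value: 'error', branch: 'errors' },
]);
// → 'critical' (the first matching condition wins)
```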

Advanced Mode (JavaScript)

In advanced mode, write JavaScript that returns an array of branch IDs (strings) or a single string. The event will be routed to the specified branch(es).

JavaScript Condition Example
// Must return an array of branch IDs or a single string
if (data.level === 'error' && data.status_code >= 500) {
  return ['critical-errors'];
} else if (data.level === 'error') {
  return ['errors'];
}
return ['default'];

Loops

loop, loopStart, loopEnd, loopBreak

Iterate over arrays in the payload. The loop node processes each array element through the nodes between loopStart and loopEnd, making the current item available as a variable.

| Setting | Description |
|---|---|
| Array Path | Dot-notation path to the array to iterate (e.g., payload.items) |
| Item Variable | Variable name for each iteration item (default: item) |
| Continue on Error | Skip failed items instead of halting the loop (default: false) |

Loop-related nodes work together:

| Node | Purpose |
|---|---|
| loop | Defines the loop: configure the array path and item variable |
| loopStart | Start of each iteration body (auto-created inside the loop) |
| loopEnd | End of each iteration body (auto-created inside the loop) |
| loopBreak | Exit the loop early based on a condition |
Tip: Use 'Continue on Error' for resilient pipelines that should process remaining items even if one fails. The loopBreak node is useful for search-and-find patterns where you want to stop after finding a match.
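Loop semantics can be summarized in a small sketch: each element flows through the iteration body, 'Continue on Error' skips failures, and a break predicate models the loopBreak node. The function and option names here are illustrative, not the engine's API.

```javascript
// Iterate an array in the payload through a body function, honoring
// continue-on-error and early-break semantics.
function runLoop(payload, { arrayPath, body, continueOnError = false, breakIf = null }) {
  // "payload.items" → walk items on the payload object
  const items = arrayPath.split('.').slice(1).reduce((a, k) => a && a[k], payload);
  const results = [];
  for (const item of items) {
    if (breakIf && breakIf(item)) break; // loopBreak: exit early
    try {
      results.push(body(item));          // nodes between loopStart/loopEnd
    } catch (err) {
      if (!continueOnError) throw err;   // default: halt the loop
      // continueOnError: skip the failed item and keep going
    }
  }
  return results;
}

runLoop(
  { items: [1, 2, 3, 4] },
  { arrayPath: 'payload.items', body: n => n * 10, breakIf: n => n > 3 }
);
// → [10, 20, 30] (the break fires before item 4 is processed)
```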

Load Nodes

Load nodes are the final destination of your pipeline — they send processed data to LogPulse for indexing and search.

LogPulse Ingest

logpulseIngest

Send processed events to LogPulse for indexing. Configure the source type, target index, and field mappings to control how events appear in search results.

| Setting | Default | Description |
|---|---|---|
| Source Type | etl_pipeline | Value for the sourcetype field in LogPulse |
| Index | main | Target index for ingested events |
| Host | | Host name to tag events with |
| Source | Auto-generated | Source identifier (defaults to the pipeline name) |
| Message Path | payload | Path to the event message/body in the payload |
| Timestamp Path | | Path to the timestamp field (auto-detected if empty) |
| Level Path | | Path to the log level field (auto-detected if empty) |
| Field Mapping | | Optional custom field name mappings |

Utility Nodes

Utility nodes help with debugging and error handling during pipeline development and execution.

log

Output a debug message to the pipeline run log. The message can reference payload fields. Useful for inspecting intermediate data during development.

error

Stop pipeline execution immediately and record an error. Use this in condition branches to halt processing when invalid data is detected.

Dynamic Values

Most string fields in node configuration support Dynamic Values — a system that lets you reference runtime data instead of hardcoding values. This makes pipelines reusable and configurable.

| Type | Syntax | Description | Example |
|---|---|---|---|
| static | Plain text | Literal string or number | "https://api.example.com" |
| flowVariable | Select variable | Reference a pipeline variable | vars.apiUrl |
| envVariable | {{envVar:NAME}} | Reference an environment variable | {{envVar:API_KEY}} |
| expression | Dot-notation | Reference payload data dynamically | payload.user.id |
Note: Use environment variables for secrets (API keys, passwords) so they never appear in pipeline configuration. Environment variables are stored encrypted and injected at runtime.
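The {{envVar:NAME}} form is interpolated at runtime, conceptually like this sketch. The resolver name and the environment source are assumptions; in LogPulse the values come from encrypted environment variable storage.

```javascript
// Replace every {{envVar:NAME}} placeholder in a template string with
// the corresponding value from an environment map.
function interpolate(template, env) {
  return template.replace(
    /\{\{envVar:([A-Z0-9_]+)\}\}/g,
    (_, name) => env[name] ?? '' // missing variables resolve to ''
  );
}

interpolate('Bearer {{envVar:API_KEY}}', { API_KEY: 'sk-test-123' });
// → 'Bearer sk-test-123'
```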

Pipeline Settings

Pipeline-level settings control how and when your pipeline runs, as well as debugging and testing features.

Trigger Modes

Configure how your pipeline is triggered. Each pipeline supports one active trigger mode.

Scheduled Execution

Run the pipeline automatically on a cron schedule. The pipeline worker checks schedules approximately every minute. The pipeline must be active and scheduled runs must be enabled.

| Setting | Description |
|---|---|
| Enabled | Toggle scheduled runs on/off |
| Cron Expression | Standard 5-field cron (e.g., */5 * * * * for every 5 minutes) |
| Timezone | Timezone for the cron schedule (default: UTC) |
Cron Examples
*/5 * * * *      Every 5 minutes
0 * * * *        Every hour
0 9 * * 1-5      Weekdays at 9:00 AM
0 0 * * *        Daily at midnight
0 */6 * * *      Every 6 hours

HTTP Ingestion

Link the pipeline to an API key. Events sent via the API key are automatically routed through the pipeline before being ingested. This is ideal for processing incoming log data in real-time.

Note: When an API key is linked, scheduled execution is automatically disabled. Only one trigger mode can be active at a time.

Active / Inactive

The Active toggle is a master switch. When inactive, all triggers (scheduled, HTTP, webhook) are disabled. You can still edit the pipeline and run manual test executions.

Debug Mode

Debug mode captures real incoming events and stores them as test events. This is useful for creating representative test data from actual production traffic. Disable when done to avoid storing unnecessary data.

Test Events

Test events let you run your pipeline with sample data without affecting production. Create test events manually or capture them from live traffic using debug mode.

| Feature | Description |
|---|---|
| Create Test Event | Define a name and JSON payload to use for testing |
| Auto-Capture | Enable debug mode to capture real incoming events as test events |
| Select & Run | Choose a test event from the start node and execute the pipeline |
| View Results | Inspect input/output payloads at each node after a test run |
Tip: Enable debug mode temporarily to capture a few real events, then disable it and use those captured events for testing. This gives you realistic test data without any manual effort.

Execution & Tracking

Every pipeline run is tracked with detailed metadata, node-level execution logs, and real-time status updates via WebSocket.

Run Status

| Status | Description |
|---|---|
| pending | Run is queued and waiting to start |
| running | Pipeline is actively executing |
| success | Pipeline completed without errors |
| failed | Pipeline encountered an error and stopped |

Node Execution Details

Each node in a run is tracked individually, allowing you to pinpoint exactly where failures occurred and inspect the data at each step.

| Tracked Data | Description |
|---|---|
| Status | pending, running, success, failed, or skipped |
| Duration | Execution time in milliseconds |
| Input Payload | Data received by the node |
| Output Payload | Data produced by the node |
| Error Details | Error message and stack trace (if failed) |

Real-Time Updates

The pipeline editor connects via WebSocket to receive live updates during execution. Nodes light up as they execute, showing status, duration, and errors in real-time.

| Event | Description |
|---|---|
| etl:run:started | Pipeline execution has begun |
| etl:run:finished | Pipeline completed successfully |
| etl:run:failed | Pipeline failed with an error |
| etl:node:started | A node has started executing |
| etl:node:finished | A node has completed |
| etl:node:failed | A node has failed |
Note: WebSocket events are scoped to your pipeline. You only receive updates for pipelines you are viewing. The connection is automatically established when you open the pipeline editor.
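A client consuming these events might keep a per-node status map for the canvas, as in this sketch. The event names come from the table above, but the message payload shape (a nodeId field) and the handler wiring are assumptions.

```javascript
// Track overall run state and per-node status from the ETL events.
const nodeStatus = new Map();
let runState = 'idle';

function handleEtlEvent(event, data) {
  switch (event) {
    case 'etl:run:started':  runState = 'running'; nodeStatus.clear(); break;
    case 'etl:run:finished': runState = 'success'; break;
    case 'etl:run:failed':   runState = 'failed'; break;
    case 'etl:node:started':  nodeStatus.set(data.nodeId, 'running'); break;
    case 'etl:node:finished': nodeStatus.set(data.nodeId, 'success'); break;
    case 'etl:node:failed':   nodeStatus.set(data.nodeId, 'failed'); break;
  }
}

// Simulated event sequence for a single node:
handleEtlEvent('etl:run:started', {});
handleEtlEvent('etl:node:started', { nodeId: 'start-1' });
handleEtlEvent('etl:node:finished', { nodeId: 'start-1' });
// nodeStatus.get('start-1') → 'success'
```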

Versioning

Pipeline configurations are automatically versioned. Every save creates a new version with a complete snapshot of nodes, edges, and variables. This provides a full audit trail and the ability to roll back to any previous state.

FeatureDescription
Auto-saveA new version is created each time you save the pipeline
Version HistoryView all previous versions with timestamps
CommentsAdd optional comments when saving a version
Full SnapshotEach version stores the complete flow (nodes, edges, variables)
RollbackRestore a previous version to undo changes
Tip: Add descriptive comments when saving versions to make it easier to identify changes later. This is especially useful when collaborating with team members.

API Reference

All pipeline operations are available via the REST API. All endpoints require authentication and are scoped to the current organization.

Pipeline Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/etl-pipelines | List all pipelines for the organization |
| POST | /api/v1/etl-pipelines | Create a new pipeline |
| GET | /api/v1/etl-pipelines/:id | Get pipeline details and flow configuration |
| PUT | /api/v1/etl-pipelines/:id | Update pipeline configuration |
| DELETE | /api/v1/etl-pipelines/:id | Delete a pipeline |
| PATCH | /api/v1/etl-pipelines/:id/activate | Activate or deactivate a pipeline |
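A small client-side helper can build the documented paths; the helper itself, its names, and any base URL or auth handling are your own to define.

```javascript
// Build the documented pipeline endpoint paths from a pipeline id.
const BASE = '/api/v1/etl-pipelines';

const etlRoutes = {
  list:     () => BASE,                   // GET
  create:   () => BASE,                   // POST
  get:      id => `${BASE}/${id}`,        // GET
  update:   id => `${BASE}/${id}`,        // PUT
  remove:   id => `${BASE}/${id}`,        // DELETE
  activate: id => `${BASE}/${id}/activate`, // PATCH
};

etlRoutes.activate('pl_42');
// → '/api/v1/etl-pipelines/pl_42/activate'
```

For example, `fetch(etlRoutes.activate(id), { method: 'PATCH', headers: authHeaders })` would toggle a pipeline, where authHeaders carries your deployment's credentials.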

Execution Endpoints

| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/etl-pipelines/:id/test | Run a test execution with a test event |
| GET | /api/v1/etl-pipelines/:id/runs | List pipeline execution history |
| GET | /api/v1/etl-pipelines/:id/runs/:runId | Get details of a specific run |
| GET | /api/v1/etl-pipelines/:id/runs/:runId/nodes | Get node-level execution details |

Version Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/etl-pipelines/:id/versions | List all versions of a pipeline |
| POST | /api/v1/etl-pipelines/:id/versions | Save a new version with an optional comment |

Test Event Endpoints

| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/etl-pipelines/:id/test-events | List test events for a pipeline |
| POST | /api/v1/etl-pipelines/:id/test-events | Create a new test event |
| DELETE | /api/v1/etl-pipelines/:id/test-events/:eventId | Delete a test event |