Pipeline¶
Robust orchestration engine for building, executing, and managing complex data processing workflows.
🎯 Overview¶
The Pipeline Module provides a robust orchestration engine for building, executing, and managing complex data processing workflows. It enables you to create reusable, scalable pipelines with error handling, parallel execution, and resource management.
What is Pipeline Orchestration?¶
Pipeline orchestration is the process of coordinating multiple processing steps into a workflow. The Pipeline module enables:
- DAG Construction: Build directed acyclic graphs (DAGs) of processing steps
- Parallel Execution: Run independent steps simultaneously
- Error Handling: Retry, fallback, and recovery strategies
- Resource Management: CPU and memory allocation
- Progress Tracking: Monitor pipeline execution
Why Use the Pipeline Module?¶
- Complex Workflows: Coordinate multi-step data processing
- Reusability: Create reusable pipeline templates
- Reliability: Built-in error handling and retry logic
- Performance: Parallel execution for faster processing
- Monitoring: Track progress and performance
- Scalability: Handle large-scale data processing
How It Works¶
- Pipeline Definition: Define steps and their dependencies
- Validation: Validate pipeline structure (no cycles, valid dependencies)
- Execution: Execute steps in dependency order
- Parallelization: Run independent steps in parallel
- Error Handling: Retry failed steps, apply fallbacks
- Monitoring: Track progress and resource usage; the sketch below condenses these phases into code
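The lifecycle above, condensed into a short sketch using the classes documented later on this page (step handlers and real data are omitted; "document.pdf" is a placeholder path):
from semantica.pipeline import PipelineBuilder, PipelineValidator, ExecutionEngine
# Define steps and dependencies, then validate the structure before running
builder = (
    PipelineBuilder()
    .add_step("ingest", "ingest")
    .add_step("parse", "parse", dependencies=["ingest"])
)
validation = PipelineValidator().validate_pipeline(builder)
# Execute in dependency order and monitor progress
pipeline = builder.build(name="LifecycleDemo")
engine = ExecutionEngine(max_workers=2)
result = engine.execute_pipeline(pipeline, data={"path": "document.pdf"})
progress = engine.get_progress(pipeline.name)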
Key capabilities:
- Pipeline Builder: Fluent API for constructing complex DAG workflows
- Execution Engine: Robust execution with status tracking and progress monitoring
- Error Handling: Configurable retry policies, fallbacks, and error recovery
- Parallel Execution: Execute independent steps in parallel for maximum performance
- Resource Scheduling: Manage CPU/Memory allocation for resource-intensive tasks
- Templates: Pre-built templates for common workflows (ETL, GraphRAG)
When to Use¶
- ETL Workflows: Ingest -> Parse -> Split -> Embed -> Store
- Graph Construction: Extract Entities -> Extract Relations -> Build Graph (sketched below)
- Batch Processing: Processing large volumes of documents reliably
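For example, the graph-construction workflow maps directly onto the builder; the step type names here are illustrative and handlers are omitted:
from semantica.pipeline import PipelineBuilder
graph_builder = (
    PipelineBuilder()
    .add_step("extract_entities", "extract_entities")
    .add_step("extract_relations", "extract_relations", dependencies=["extract_entities"])
    .add_step("build_graph", "build_graph", dependencies=["extract_relations"])
)
graph_pipeline = graph_builder.build(name="GraphConstruction")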
⚙️ Algorithms Used¶
Execution Management¶
Purpose: Manage pipeline execution order and state tracking.
How it works:
- DAG Topological Sort: Determines execution order of steps based on dependencies (see the sketch after this list)
- State Management: Tracks `PENDING`, `RUNNING`, `COMPLETED`, `FAILED` states
- Checkpointing: Saves intermediate results to allow resuming failed pipelines
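A minimal sketch of the ordering idea in plain Python (Kahn's algorithm; illustrative only, not the module's internal implementation):
from collections import deque
def topological_order(dependencies):
    # dependencies maps each step name to the list of steps it depends on
    indegree = {step: len(deps) for step, deps in dependencies.items()}
    dependents = {step: [] for step in dependencies}
    for step, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(step)
    ready = deque(step for step, count in indegree.items() if count == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in dependents[step]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(dependencies):
        raise ValueError("Cycle detected: pipeline is not a DAG")
    return order
print(topological_order({"ingest": [], "parse": ["ingest"], "embed": ["parse"]}))
# ['ingest', 'parse', 'embed']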
Parallelism¶
Purpose: Execute independent steps concurrently for maximum performance.
How it works:
- ThreadPoolExecutor: For I/O-bound tasks (network requests, DB writes); illustrated below
- ProcessPoolExecutor: For CPU-bound tasks (parsing, embedding generation)
- Dependency Resolution: Identifies steps that can run concurrently
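As a rough illustration of the executor choice (standard library only, not the module's internals):
import math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_task(n):
    # stand-in for an I/O-bound call (network request, DB write)
    return f"fetched-{n}"
def cpu_task(n):
    # stand-in for a CPU-bound computation (parsing, embedding generation)
    return sum(math.sqrt(i) for i in range(n))
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=8) as pool:
        print(list(pool.map(io_task, range(4))))
    with ProcessPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(cpu_task, [100_000] * 4)))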
Error Handling¶
Purpose: Robust error recovery with configurable retry policies.
How it works:
- Exponential Backoff: `wait = base * (factor ^ attempt)` (sketched after this list)
- Jitter: Randomization to prevent the thundering herd problem
- Circuit Breaker: Stops execution after threshold failures to prevent cascading issues
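The backoff formula with jitter can be sketched as follows (illustrative only; in practice RetryPolicy carries these parameters, and the max_delay cap here is an assumption):
import random
def backoff_delays(base=1.0, factor=2.0, max_retries=5, max_delay=30.0):
    for attempt in range(max_retries):
        wait = min(base * (factor ** attempt), max_delay)
        yield wait + random.uniform(0, wait * 0.1)  # jitter avoids the thundering herd
for attempt, delay in enumerate(backoff_delays(), start=1):
    print(f"retry {attempt}: sleep {delay:.2f}s")  # sleep(delay) before retrying in real code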
Resource Scheduling¶
Purpose: Manage CPU/Memory allocation for resource-intensive tasks.
How it works:
- Token Bucket: Rate limiting for API calls
- Semaphore: Concurrency limiting for resource constraints (sketched below)
- Priority Queue: Scheduling critical tasks first
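A semaphore-based concurrency cap, for instance, can be sketched with the standard library (the ResourceScheduler provides the higher-level equivalent):
import threading
from concurrent.futures import ThreadPoolExecutor
gpu_slots = threading.BoundedSemaphore(2)  # allow at most 2 concurrent GPU-heavy steps
def gpu_heavy_step(batch_id):
    with gpu_slots:  # blocks until a slot is free
        return f"embedded batch {batch_id}"
with ThreadPoolExecutor(max_workers=8) as pool:
    print(list(pool.map(gpu_heavy_step, range(6))))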
API Reference¶
Types¶
- `Pipeline` — Pipeline definition dataclass
- `PipelineStep` — Pipeline step definition dataclass
- `StepStatus` — Enum: `pending`, `running`, `completed`, `failed`, `skipped`
- `ExecutionResult` — Execution result dataclass
- `PipelineStatus` — Enum: `pending`, `running`, `paused`, `completed`, `failed`, `stopped`
- `ValidationResult` — Validation result dataclass
- `RetryPolicy` — Retry policy dataclass
- `RetryStrategy` — Enum: `linear`, `exponential`, `fixed`
- `ErrorSeverity` — Enum: `low`, `medium`, `high`, `critical`
- `FailureRecovery` — Failure recovery dataclass
- `Task` — Parallel task dataclass
- `ParallelExecutionResult` — Parallel execution result dataclass
- `ResourceType` — Enum: `cpu`, `gpu`, `memory`, `disk`, `network`
- `Resource` — Resource definition dataclass
- `ResourceAllocation` — Resource allocation record dataclass
PipelineBuilder¶
Fluent interface for constructing pipelines.
Methods:
- `add_step(step_name, step_type, **config)` — Add a step
- `connect_steps(from_step, to_step, **options)` — Add dependency
- `set_parallelism(level)` — Configure parallelism
- `build(name="default_pipeline")` — Build pipeline
- `build_pipeline(pipeline_config, **options)` — Build from dict
- `register_step_handler(step_type, handler)` — Register handler
- `get_step(step_name)` — Get step by name
- `serialize(format="json")` — Serialize builder state
- `validate_pipeline()` — Validate pipeline
Example:
from semantica.pipeline import PipelineBuilder
# Illustrative step handlers; substitute real processing callables
def ingest_handler(data):
    return data
def parse_handler(data):
    return data
builder = (
    PipelineBuilder()
    .add_step("ingest", "ingest", handler=ingest_handler)
    .add_step("parse", "parse", dependencies=["ingest"], handler=parse_handler)
    .add_step("embed", "embed", dependencies=["parse"], model="text-embedding-3-large")
    .set_parallelism(2)
)
pipeline = builder.build(name="MyPipeline")
step = builder.get_step("parse")
serialized = builder.serialize(format="json")
validation = builder.validate_pipeline()
PipelineSerializer¶
Serialization utilities for pipelines.
Methods:
- `serialize_pipeline(pipeline, format="json")`
- `deserialize_pipeline(serialized_pipeline, **options)`
- `version_pipeline(pipeline, version_info)`
Example:
from semantica.pipeline import PipelineBuilder, PipelineSerializer
builder = PipelineBuilder()
pipeline = builder.add_step("step1", "type1").build()
serializer = PipelineSerializer()
serialized = serializer.serialize_pipeline(pipeline, format="json")
restored = serializer.deserialize_pipeline(serialized)
versioned = serializer.version_pipeline(restored, {"version": "1.1"})
ExecutionEngine¶
Executes pipelines and manages lifecycle.
Methods:
- `execute_pipeline(pipeline, data=None, **options)` — Run pipeline
- `pause_pipeline(pipeline_id)` — Pause execution
- `resume_pipeline(pipeline_id)` — Resume execution
- `stop_pipeline(pipeline_id)` — Stop execution
- `get_pipeline_status(pipeline_id)` — Get status
- `get_progress(pipeline_id)` — Get progress
Example:
from semantica.pipeline import ExecutionEngine
engine = ExecutionEngine(max_workers=4)
# `pipeline` is assumed to have been built with PipelineBuilder (see above)
result = engine.execute_pipeline(pipeline, data={"path": "document.pdf"})
status = engine.get_pipeline_status(pipeline.name)
progress = engine.get_progress(pipeline.name)
engine.pause_pipeline(pipeline.name)
engine.resume_pipeline(pipeline.name)
engine.stop_pipeline(pipeline.name)
Failure Handling¶
Classes: FailureHandler, RetryHandler, FallbackHandler, ErrorRecovery
FailureHandler Methods:
- `handle_step_failure(step, error, **options)`
- `classify_error(error)`
- `set_retry_policy(step_type, policy)`
- `get_retry_policy(step_type)`
- `retry_failed_step(step, error, **options)`
- `get_error_history(step_name=None)`
- `clear_error_history()`
Example:
from semantica.pipeline import FailureHandler, RetryPolicy, RetryStrategy
handler = FailureHandler(default_max_retries=3, default_backoff_factor=2.0)
policy = RetryPolicy(max_retries=5, strategy=RetryStrategy.EXPONENTIAL, initial_delay=1.0)
handler.set_retry_policy("network", policy)
classification = handler.classify_error(RuntimeError("timeout"))
history_before = handler.get_error_history()
handler.clear_error_history()
Parallelism¶
Classes: ParallelismManager, ParallelExecutor
ParallelismManager Methods:
- `execute_parallel(tasks, **options)`
- `execute_pipeline_steps_parallel(steps, data, **options)`
- `identify_parallelizable_steps(pipeline)`
- `optimize_parallel_execution(pipeline, available_workers)`
Example:
from semantica.pipeline import ParallelismManager, Task, ParallelExecutor
def work(x):
return x * 2
manager = ParallelismManager(max_workers=4)
tasks = [Task(task_id=f"t{i}", handler=work, args=(i,)) for i in range(4)]
results = manager.execute_parallel(tasks)
executor = ParallelExecutor(max_workers=2)
exec_results = executor.execute_parallel(tasks)
Resources¶
Class: ResourceScheduler
Methods:
- `allocate_resources(pipeline, **options)`
- `allocate_cpu(cores, pipeline_id, step_name=None)`
- `allocate_memory(memory_gb, pipeline_id, step_name=None)`
- `allocate_gpu(device_id, pipeline_id, step_name=None)`
- `release_resources(allocations)`
- `get_resource_usage()`
- `optimize_resource_allocation(pipeline, **options)`
Example:
from semantica.pipeline import ResourceScheduler, ResourceType
scheduler = ResourceScheduler()
cpu = scheduler.allocate_cpu(cores=2, pipeline_id="p1")
mem = scheduler.allocate_memory(memory_gb=1.0, pipeline_id="p1")
usage = scheduler.get_resource_usage()
scheduler.release_resources({cpu.allocation_id: cpu, mem.allocation_id: mem})
Validation¶
Class: PipelineValidator
Methods:
- `validate_pipeline(pipeline_or_builder, **options)`
- `validate_step(step, **constraints)`
- `check_dependencies(pipeline_or_builder)`
- `validate_performance(pipeline, **options)`
Example:
from semantica.pipeline import PipelineValidator, PipelineBuilder
builder = PipelineBuilder()
builder.add_step("a", "type")
builder.add_step("b", "type", dependencies=["a"])
validator = PipelineValidator()
result = validator.validate_pipeline(builder)
deps = validator.check_dependencies(builder)
perf = validator.validate_performance(builder.build())
Templates¶
Classes: PipelineTemplateManager, PipelineTemplate
PipelineTemplateManager Methods:
- `get_template(template_name)`
- `create_pipeline_from_template(template_name, **overrides)`
- `register_template(template)`
- `list_templates(category=None)`
- `get_template_info(template_name)`
Example:
from semantica.pipeline import PipelineTemplateManager
tm = PipelineTemplateManager()
names = tm.list_templates()
info = tm.get_template_info(names[0])
builder = tm.create_pipeline_from_template(names[0])
pipeline = builder.build()
Configuration¶
Environment Variables¶
export PIPELINE_MAX_WORKERS=4
export PIPELINE_DEFAULT_TIMEOUT=300
export PIPELINE_CHECKPOINT_DIR=./checkpoints
YAML Configuration¶
This module does not include built-in YAML loaders. Use your own configuration system to populate arguments for PipelineBuilder, ExecutionEngine, and related classes.
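One possible approach, assuming PyYAML and a hand-rolled schema (neither is part of this module), where add_step accepts the dependencies keyword shown in the PipelineBuilder example above:
# pipeline.yaml (assumed layout):
#   name: my_pipeline
#   steps:
#     - name: ingest
#       type: ingest
#     - name: parse
#       type: parse
#       dependencies: [ingest]
import yaml
from semantica.pipeline import PipelineBuilder
with open("pipeline.yaml") as f:
    config = yaml.safe_load(f)
builder = PipelineBuilder()
for step in config["steps"]:
    builder.add_step(step["name"], step["type"], dependencies=step.get("dependencies", []))
pipeline = builder.build(name=config.get("name", "yaml_pipeline"))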
Integration Examples¶
RAG-Style Pipeline¶
from semantica.pipeline import PipelineBuilder, ExecutionEngine
builder = (
PipelineBuilder()
.add_step("ingest", "ingest")
.add_step("chunk", "chunk", dependencies=["ingest"])
.add_step("embed", "embed", dependencies=["chunk"])
.add_step("store_vectors", "store_vectors", dependencies=["embed"])
)
pipeline = builder.build(name="RAGPipeline")
engine = ExecutionEngine(max_workers=4)
result = engine.execute_pipeline(pipeline, data={"path": "document.pdf"})
Best Practices¶
- Idempotency: Ensure steps are idempotent (can be run multiple times without side effects) to support retries; see the sketch after this list.
- Granularity: Keep steps focused on a single task. Smaller steps are easier to debug and retry.
- Context Passing: Use the execution context to pass metadata between steps, not just return values.
- Error Handling: Always define specific exceptions for retries; don't retry on `ValueError` or `TypeError`.
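A minimal sketch of an idempotent step handler (the output path and payload fields are hypothetical):
from pathlib import Path
def embed_step(data, output_dir="./artifacts"):
    out = Path(output_dir) / f"{data['doc_id']}.embeddings.json"
    if out.exists():  # work already done: a retry can skip it safely
        return {"path": str(out), "skipped": True}
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text("[]")  # placeholder for real embedding output
    return {"path": str(out), "skipped": False}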
Troubleshooting¶
Issue: Pipeline stuck in `RUNNING` state. Solution: Check for deadlocks in the dependency graph or infinite loops in steps. Use `timeout_seconds`.
Issue: `PickleError` during parallel execution. Solution: Ensure all data passed between steps is serializable. Avoid passing open file handles or database connections.
Cookbook¶
Interactive tutorials to learn pipeline orchestration:
- Pipeline Orchestration: Build robust, automated data processing pipelines
- Topics: Workflows, automation, error handling, pipeline orchestration, DAG construction
- Difficulty: Advanced
- Use Cases: Complex multi-step workflows, production pipelines, ETL processes
See Also¶
- Ingest Module - Common first step
- Split Module - Common processing step
- Vector Store Module - Common sink step