Pipeline Controller API
Complete API reference for the Pipeline Controller object passed to action handlers for advanced pipeline control.
Overview
The Pipeline Controller provides advanced control flow capabilities within action handlers. It allows handlers to modify payloads, share results, control execution flow, and coordinate with other handlers in the pipeline.
Core Control Methods
controller.abort(reason, error?)
Aborts the entire action pipeline execution.
Parameters:
reason
: Human-readable reason for abortingerror?
: Optional error object
Behavior: Stops all remaining handlers and throws PipelineAbortError
useActionHandler('validateInput', (payload, controller) => {
if (!payload.email || !payload.email.includes('@')) {
controller.abort('Valid email is required');
return;
}
if (payload.age < 18) {
controller.abort('User must be 18 or older', new ValidationError('Age requirement'));
return;
}
return { valid: true };
});
controller.skip(reason?)
Skips the current handler without affecting other handlers.
Parameters:
reason?
: Optional reason for skipping
Returns: Special skip result
useActionHandler('premiumFeature', (payload, controller) => {
if (!payload.user.isPremium) {
return controller.skip('User is not premium');
}
// Premium feature logic
return { featureExecuted: true };
});
Payload Management
controller.getPayload()
Gets the current (possibly modified) payload.
Returns: Current payload object
useActionHandler('logAction', (_, controller) => {
const currentPayload = controller.getPayload();
console.log('Final payload used:', currentPayload);
auditLogger.log('action_payload', currentPayload);
return { logged: true };
}, { priority: 1 }); // Low priority to run last
controller.modifyPayload(modifier)
Modifies the payload for subsequent handlers.
Parameters:
modifier
: Function that receives current payload and returns modified payload
useActionHandler('enrichPayload', (payload, controller) => {
controller.modifyPayload(current => ({
...current,
timestamp: Date.now(),
sessionId: getSessionId(),
userId: getCurrentUserId(),
userAgent: navigator.userAgent,
source: 'web-app'
}));
return { enriched: true };
}, { priority: 95 }); // High priority to run early
controller.setPayloadProperty(key, value)
Sets a specific property on the payload.
Parameters:
key
: Property key to setvalue
: Value to assign
useActionHandler('addMetadata', (payload, controller) => {
controller.setPayloadProperty('requestId', generateRequestId());
controller.setPayloadProperty('timestamp', Date.now());
return { metadataAdded: true };
});
Result Management
controller.setResult(result)
Sets a result that can be accessed by later handlers.
Parameters:
result
: Result object to store
useActionHandler('processPayment', async (payload, controller) => {
try {
const transaction = await paymentService.charge({
amount: payload.amount,
source: payload.paymentMethod
});
controller.setResult({
step: 'payment',
transactionId: transaction.id,
amount: transaction.amount,
currency: transaction.currency,
success: true,
provider: 'stripe'
});
return { success: true, transactionId: transaction.id };
} catch (error) {
controller.setResult({
step: 'payment',
success: false,
error: error.message,
provider: 'stripe'
});
controller.abort(`Payment failed: ${error.message}`);
}
}, { priority: 90, id: 'payment-processor' });
controller.getResults()
Gets all results set by previous handlers.
Returns: Array of result objects
useActionHandler('sendConfirmation', async (payload, controller) => {
const results = controller.getResults();
const paymentResult = results.find(r => r.step === 'payment');
const userResult = results.find(r => r.step === 'user-update');
if (paymentResult?.success && userResult?.success) {
await emailService.sendConfirmation({
email: payload.email,
transactionId: paymentResult.transactionId,
userId: userResult.userId
});
return { confirmationSent: true };
}
return { confirmationSent: false, reason: 'Prerequisites not met' };
}, { priority: 70, id: 'confirmation-sender' });
controller.getResult(predicate)
Gets a specific result using a predicate function.
Parameters:
predicate
: Function to find the desired result
Returns: Matching result or undefined
useActionHandler('processRefund', async (payload, controller) => {
const paymentResult = controller.getResult(r =>
r.step === 'payment' && r.provider === 'stripe'
);
if (paymentResult?.transactionId) {
const refund = await stripeService.refund(paymentResult.transactionId);
return { refunded: true, refundId: refund.id };
}
controller.abort('No valid payment found for refund');
}, { priority: 80 });
Pipeline State Management
controller.setPipelineState(key, value)
Sets pipeline-level state that persists across handlers.
Parameters:
key
: State keyvalue
: State value
useActionHandler('initializeSession', (payload, controller) => {
const sessionId = generateSessionId();
const userId = payload.userId;
controller.setPipelineState('sessionId', sessionId);
controller.setPipelineState('userId', userId);
controller.setPipelineState('startTime', Date.now());
return { sessionInitialized: true, sessionId };
}, { priority: 100 });
controller.getPipelineState(key)
Gets pipeline-level state.
Parameters:
key
: State key to retrieve
Returns: State value or undefined
useActionHandler('trackDuration', (payload, controller) => {
const startTime = controller.getPipelineState('startTime');
const duration = Date.now() - (startTime || Date.now());
controller.setResult({
step: 'duration-tracking',
duration,
startTime
});
return { durationTracked: true, duration };
}, { priority: 10 }); // Low priority to run last
controller.getAllPipelineState()
Gets all pipeline state.
Returns: Object with all pipeline state
useActionHandler('pipelineSummary', (payload, controller) => {
const allState = controller.getAllPipelineState();
const allResults = controller.getResults();
const summary = {
pipelineState: allState,
handlerResults: allResults,
finalPayload: controller.getPayload(),
executionSummary: {
totalHandlers: allResults.length,
successfulHandlers: allResults.filter(r => !r.error).length,
duration: Date.now() - (allState.startTime || Date.now())
}
};
console.log('Pipeline execution summary:', summary);
return summary;
}, { priority: 5 }); // Very low priority to run at the end
Advanced Pipeline Patterns
Multi-Stage Processing
function MultiStageProcessor() {
// Stage 1: Input Processing
useActionHandler('processOrder', (payload, controller) => {
const processedPayload = {
...payload,
orderId: generateOrderId(),
processedAt: Date.now()
};
controller.modifyPayload(() => processedPayload);
controller.setPipelineState('stage', 'input-processed');
return { stage: 'input', success: true };
}, { priority: 100, id: 'input-processor' });
// Stage 2: Validation
useActionHandler('processOrder', (payload, controller) => {
const validationResult = validateOrder(payload);
if (!validationResult.valid) {
controller.abort(`Validation failed: ${validationResult.errors.join(', ')}`);
}
controller.setPipelineState('stage', 'validated');
controller.setResult({
step: 'validation',
valid: true,
checks: validationResult.checks
});
return { stage: 'validation', success: true };
}, { priority: 90, id: 'validator' });
// Stage 3: Business Logic
useActionHandler('processOrder', async (payload, controller) => {
const stage = controller.getPipelineState('stage');
if (stage !== 'validated') {
controller.abort('Order not properly validated');
}
const orderResult = await orderService.createOrder(payload);
controller.setResult({
step: 'order-creation',
orderId: orderResult.id,
amount: orderResult.amount,
success: true
});
controller.setPipelineState('stage', 'completed');
return orderResult;
}, { priority: 80, id: 'order-processor' });
return null;
}
Conditional Pipeline Branching
function ConditionalBranching() {
// Route to different handlers based on payload
useActionHandler('processPayment', (payload, controller) => {
controller.setPipelineState('paymentMethod', payload.method);
if (payload.method === 'credit_card') {
controller.setPipelineState('processor', 'stripe');
} else if (payload.method === 'paypal') {
controller.setPipelineState('processor', 'paypal');
} else {
controller.abort(`Unsupported payment method: ${payload.method}`);
}
return { routed: true, processor: controller.getPipelineState('processor') };
}, { priority: 100, id: 'payment-router' });
// Stripe handler (conditional)
useActionHandler('processPayment', async (payload, controller) => {
const processor = controller.getPipelineState('processor');
if (processor !== 'stripe') {
return controller.skip('Not a Stripe payment');
}
const result = await stripeService.charge(payload);
controller.setResult({ processor: 'stripe', ...result });
return result;
}, { priority: 80, id: 'stripe-processor' });
// PayPal handler (conditional)
useActionHandler('processPayment', async (payload, controller) => {
const processor = controller.getPipelineState('processor');
if (processor !== 'paypal') {
return controller.skip('Not a PayPal payment');
}
const result = await paypalService.charge(payload);
controller.setResult({ processor: 'paypal', ...result });
return result;
}, { priority: 80, id: 'paypal-processor' });
return null;
}
Controller State Inspection
controller.getExecutionContext()
Gets information about the current execution context.
Returns: Execution context object
useActionHandler('debugHandler', (payload, controller) => {
const context = controller.getExecutionContext();
console.log('Execution context:', {
actionName: context.actionName,
handlerId: context.currentHandlerId,
executionId: context.executionId,
startTime: context.startTime,
remainingHandlers: context.remainingHandlers
});
return { debugInfo: context };
});
controller.getHandlerInfo()
Gets information about the current handler.
Returns: Handler information object
useActionHandler('selfAwareHandler', (payload, controller) => {
const handlerInfo = controller.getHandlerInfo();
console.log('Current handler:', {
id: handlerInfo.id,
priority: handlerInfo.priority,
registeredAt: handlerInfo.registeredAt,
executionCount: handlerInfo.executionCount
});
return handlerInfo;
}, { id: 'self-aware', priority: 50 });
Error Handling
Pipeline Error Recovery
function ErrorRecoveryHandler() {
useActionHandler('riskyOperation', async (payload, controller) => {
try {
const result = await riskyService.performOperation(payload);
controller.setResult({
step: 'risky-operation',
success: true,
data: result
});
return result;
} catch (error) {
// Log error but don't abort - let recovery handler try
controller.setResult({
step: 'risky-operation',
success: false,
error: error.message,
needsRecovery: true
});
return { error: error.message, recovered: false };
}
}, { priority: 90, id: 'risky-handler' });
// Recovery handler
useActionHandler('riskyOperation', async (payload, controller) => {
const results = controller.getResults();
const needsRecovery = results.some(r => r.needsRecovery);
if (needsRecovery) {
try {
const result = await fallbackService.performOperation(payload);
controller.setResult({
step: 'recovery',
success: true,
data: result,
recoveredFrom: 'risky-operation'
});
return { recovered: true, data: result };
} catch (error) {
controller.abort(`Recovery failed: ${error.message}`);
}
}
return controller.skip('No recovery needed');
}, { priority: 80, id: 'recovery-handler' });
return null;
}
Advanced Pipeline Control
Dynamic Handler Execution
function DynamicHandler() {
useActionHandler('dynamicAction', async (payload, controller) => {
// Modify execution based on results
const results = controller.getResults();
const hasValidation = results.some(r => r.step === 'validation');
if (!hasValidation) {
// Trigger validation dynamically
controller.modifyPayload(current => ({
...current,
needsValidation: true
}));
// Set state to trigger validation in next handler
controller.setPipelineState('requireValidation', true);
}
return { dynamicExecutionApplied: true };
}, { priority: 85 });
// Conditional validation handler
useActionHandler('dynamicAction', (payload, controller) => {
const needsValidation = controller.getPipelineState('requireValidation');
if (needsValidation) {
const isValid = performValidation(payload);
if (!isValid) {
controller.abort('Dynamic validation failed');
}
controller.setResult({ step: 'validation', success: true });
return { validated: true };
}
return controller.skip('Validation not required');
}, { priority: 80, id: 'dynamic-validator' });
return null;
}
Pipeline Coordination
function PipelineCoordinator() {
// Coordinator handler
useActionHandler('complexWorkflow', async (payload, controller) => {
// Check if all prerequisites are met
const results = controller.getResults();
const hasAuth = results.some(r => r.step === 'authentication' && r.success);
const hasValidation = results.some(r => r.step === 'validation' && r.success);
const hasPermission = results.some(r => r.step === 'permission' && r.success);
if (!hasAuth || !hasValidation || !hasPermission) {
const missing = [];
if (!hasAuth) missing.push('authentication');
if (!hasValidation) missing.push('validation');
if (!hasPermission) missing.push('permission');
controller.abort(`Missing prerequisites: ${missing.join(', ')}`);
return;
}
// All prerequisites met, proceed with main operation
const mainResult = await performMainOperation(payload);
controller.setResult({
step: 'main-operation',
success: true,
data: mainResult,
prerequisites: { hasAuth, hasValidation, hasPermission }
});
return mainResult;
}, { priority: 50, id: 'workflow-coordinator' });
return null;
}
Controller Configuration
controller.setTimeout(timeout)
Sets a timeout for the current handler execution.
Parameters:
timeout
: Timeout in milliseconds
useActionHandler('longRunningTask', async (payload, controller) => {
controller.setTimeout(30000); // 30 second timeout
try {
const result = await longRunningService.process(payload);
return { success: true, data: result };
} catch (error) {
if (error.name === 'TimeoutError') {
controller.abort('Operation timed out after 30 seconds');
} else {
controller.abort(`Operation failed: ${error.message}`);
}
}
});
controller.setMetadata(metadata)
Sets metadata for the current execution.
Parameters:
metadata
: Metadata object
useActionHandler('trackedAction', (payload, controller) => {
controller.setMetadata({
handlerVersion: '1.2.0',
environment: process.env.NODE_ENV,
component: 'UserManager',
feature: 'profile-update'
});
// Business logic
return { success: true };
}, { id: 'tracked-handler' });
Pipeline Debugging
controller.enableDebug()
Enables debug logging for the current pipeline.
useActionHandler('debuggableAction', (payload, controller) => {
if (process.env.NODE_ENV === 'development') {
controller.enableDebug();
}
controller.setResult({ step: 'debug-setup', enabled: true });
return { debugEnabled: true };
}, { priority: 100 });
controller.log(message, data?)
Logs debug information during pipeline execution.
Parameters:
message
: Log messagedata?
: Optional data to log
useActionHandler('verboseHandler', async (payload, controller) => {
controller.log('Starting user validation');
const user = await userService.getUser(payload.userId);
controller.log('User retrieved', { userId: user.id, name: user.name });
const isValid = validateUser(user);
controller.log('Validation completed', { isValid });
if (!isValid) {
controller.log('Validation failed - aborting');
controller.abort('User validation failed');
}
return { user, validated: true };
});
Pipeline Metrics
controller.recordMetric(name, value)
Records a custom metric for the current execution.
Parameters:
name
: Metric namevalue
: Metric value
useActionHandler('performanceTrackedAction', async (payload, controller) => {
const startTime = performance.now();
try {
const result = await expensiveOperation(payload);
const duration = performance.now() - startTime;
controller.recordMetric('execution_time', duration);
controller.recordMetric('payload_size', JSON.stringify(payload).length);
controller.recordMetric('result_size', JSON.stringify(result).length);
return result;
} catch (error) {
const duration = performance.now() - startTime;
controller.recordMetric('error_time', duration);
controller.recordMetric('error_type', error.constructor.name);
controller.abort(`Operation failed after ${duration}ms: ${error.message}`);
}
});
Best Practices
1. Error Handling
- Use
abort()
for critical failures that should stop execution - Use result sharing for non-critical errors
- Implement proper error recovery strategies
2. Payload Management
- Modify payloads early in the pipeline (high priority)
- Validate payload modifications for type safety
- Document payload structure changes
3. Result Coordination
- Use meaningful result structures for inter-handler communication
- Include step identifiers and success flags
- Share results that other handlers might need
4. Performance
- Set appropriate timeouts for long-running handlers
- Record metrics for performance monitoring
- Use pipeline state for expensive computations
5. Debugging
- Enable debug mode in development
- Use descriptive log messages
- Include relevant context in logs
Related
- Action Only Methods - Action dispatching and handler registration
- Action Registry API - Registry management methods
- Action Only Example - Complete usage examples