Pipeline System
Advanced action pipeline features for sophisticated business logic orchestration.
Overview
The Action Pipeline System provides advanced control mechanisms for managing complex business logic flows through priority-based handler execution, blocking operations, abort mechanisms, and comprehensive result collection.
Core Features
🏆 Priority-Based Execution
Handlers execute in priority order (highest first) ensuring critical operations run before optional ones.
actionRegister.register('authenticate', validateInput, { priority: 100 }); // 1st
actionRegister.register('authenticate', checkSecurity, { priority: 90 }); // 2nd
actionRegister.register('authenticate', performAuth, { priority: 80 }); // 3rd
actionRegister.register('authenticate', logAudit, { priority: 30 }); // 4th🚧 Blocking Operations
Control execution flow with blocking and non-blocking handlers.
// Blocking: Wait for completion
actionRegister.register('process', criticalHandler, { blocking: true });
// Non-blocking: Continue immediately
actionRegister.register('process', analyticsHandler, { blocking: false });🛑 Abort Mechanisms
Stop pipeline execution when conditions aren't met.
actionRegister.register('validate', (payload, controller) => {
if (!isValid(payload)) {
controller.abort('Validation failed');
return;
}
// Continue pipeline
});📊 Result Collection
Collect and coordinate results across handlers.
// Set intermediate results
controller.setResult({ step: 'validation', valid: true });
// Access previous results
const results = controller.getResults();
const validationResult = results.find(r => r.step === 'validation');Pipeline Features
| Feature | Purpose | Documentation |
|---|---|---|
| Priority System | Control execution order | Priority-based handler execution |
| Blocking Operations | Control execution flow | Blocking vs non-blocking handlers |
| Concurrency Control | Thread safety & queuing | Pipeline concurrency and race condition prevention |
| Dispatch Methods | Trigger pipelines | Basic dispatch vs result collection |
| Abort Mechanisms | Stop execution | Graceful pipeline termination |
| Result Handling | Collect results | Inter-handler communication |
Quick Start
1. Basic Pipeline Setup
import { ActionRegister, type ActionPayloadMap } from '@context-action/core';
interface AppActions extends ActionPayloadMap {
processUser: { userId: string; action: string };
}
const appRegister = new ActionRegister<AppActions>();2. Register Handlers with Features
// High priority validation with blocking
appRegister.register('processUser', (payload, controller) => {
if (!payload.userId) {
controller.abort('User ID required');
return;
}
controller.setResult({ step: 'validation', valid: true });
return { step: 'user-validation', success: true };
}, {
priority: 100, // High priority
blocking: true, // Wait for completion
id: 'validator' // Unique identifier
});
// Medium priority processing with blocking
appRegister.register('processUser', async (payload, controller) => {
const results = controller.getResults();
const isValid = results.find(r => r.step === 'validation')?.valid;
if (!isValid) {
controller.abort('Cannot process invalid user');
return;
}
const processed = await processUserData(payload.userId, payload.action);
controller.setResult({
step: 'processing',
userId: payload.userId,
processedAt: Date.now()
});
return { step: 'user-processing', result: processed };
}, {
priority: 80,
blocking: true,
id: 'processor'
});
// Low priority analytics without blocking
appRegister.register('processUser', (payload, controller) => {
const results = controller.getResults();
// Track analytics in background
analytics.track('user_processed', {
userId: payload.userId,
action: payload.action,
steps: results.map(r => r.step)
});
return { step: 'analytics', tracked: true };
}, {
priority: 30,
blocking: false, // Don't wait for analytics
id: 'analytics'
});3. Execute Pipeline
// Basic execution
await appRegister.dispatch('processUser', {
userId: '123',
action: 'profile-update'
});
// Execution with result collection
const result = await appRegister.dispatchWithResult('processUser',
{ userId: '123', action: 'profile-update' },
{ result: { collect: true } }
);
if (result.success) {
console.log('Pipeline completed:', result.results);
} else if (result.aborted) {
console.log('Pipeline aborted:', result.abortReason);
}Real-World Pipeline Example
E-commerce Order Processing
interface OrderActions extends ActionPayloadMap {
placeOrder: {
items: OrderItem[];
userId: string;
paymentMethod: string;
};
}
const orderRegister = new ActionRegister<OrderActions>();
// Step 1: Input validation (Priority 100, Blocking)
orderRegister.register('placeOrder', (payload, controller) => {
if (!payload.items?.length) {
controller.abort('Order must contain items');
return;
}
if (!payload.userId) {
controller.abort('User ID required');
return;
}
controller.setResult({ step: 'validation', itemCount: payload.items.length });
return { step: 'input-validation', success: true };
}, { priority: 100, blocking: true, id: 'input-validator' });
// Step 2: Inventory check (Priority 90, Blocking)
orderRegister.register('placeOrder', async (payload, controller) => {
const inventoryChecks = await Promise.all(
payload.items.map(item => checkInventory(item.productId, item.quantity))
);
const insufficientItems = inventoryChecks
.filter(check => !check.available)
.map(check => check.productId);
if (insufficientItems.length > 0) {
controller.abort('Insufficient inventory', { insufficientItems });
return;
}
controller.setResult({
step: 'inventory',
allAvailable: true,
checkedItems: inventoryChecks.length
});
return { step: 'inventory-check', success: true };
}, { priority: 90, blocking: true, id: 'inventory-checker' });
// Step 3: Payment processing (Priority 80, Blocking)
orderRegister.register('placeOrder', async (payload, controller) => {
const total = calculateOrderTotal(payload.items);
const payment = await processPayment({
userId: payload.userId,
amount: total,
method: payload.paymentMethod
});
if (!payment.success) {
controller.abort('Payment failed', { reason: payment.error });
return;
}
controller.setResult({
step: 'payment',
transactionId: payment.transactionId,
amount: total
});
return {
step: 'payment-processing',
success: true,
transactionId: payment.transactionId
};
}, { priority: 80, blocking: true, id: 'payment-processor' });
// Step 4: Order creation (Priority 70, Blocking)
orderRegister.register('placeOrder', async (payload, controller) => {
const results = controller.getResults();
const paymentResult = results.find(r => r.step === 'payment');
const order = await createOrder({
userId: payload.userId,
items: payload.items,
transactionId: paymentResult.transactionId,
total: paymentResult.amount
});
controller.setResult({
step: 'order-creation',
orderId: order.id,
createdAt: order.createdAt
});
return {
step: 'order-created',
success: true,
orderId: order.id
};
}, { priority: 70, blocking: true, id: 'order-creator' });
// Step 5: Inventory update (Priority 60, Blocking)
orderRegister.register('placeOrder', async (payload, controller) => {
await Promise.all(
payload.items.map(item =>
updateInventory(item.productId, -item.quantity)
)
);
return { step: 'inventory-update', success: true };
}, { priority: 60, blocking: true, id: 'inventory-updater' });
// Step 6: Email notification (Priority 40, Non-blocking)
orderRegister.register('placeOrder', async (payload, controller) => {
const results = controller.getResults();
const orderResult = results.find(r => r.step === 'order-creation');
// Send confirmation email in background
await sendOrderConfirmation({
userId: payload.userId,
orderId: orderResult.orderId,
items: payload.items
});
return { step: 'email-notification', sent: true };
}, { priority: 40, blocking: false, id: 'email-notifier' });
// Step 7: Analytics tracking (Priority 30, Non-blocking)
orderRegister.register('placeOrder', (payload, controller) => {
const results = controller.getResults();
const orderResult = results.find(r => r.step === 'order-creation');
const paymentResult = results.find(r => r.step === 'payment');
// Track order analytics
analytics.track('order_placed', {
orderId: orderResult.orderId,
userId: payload.userId,
itemCount: payload.items.length,
orderValue: paymentResult.amount,
paymentMethod: payload.paymentMethod
});
return { step: 'analytics', tracked: true };
}, { priority: 30, blocking: false, id: 'analytics-tracker' });
// Execute complete order pipeline
const result = await orderRegister.dispatchWithResult('placeOrder', {
items: [{ productId: 'prod-1', quantity: 2 }],
userId: 'user-123',
paymentMethod: 'credit-card'
}, { result: { collect: true } });
// Pipeline executes: validation → inventory → payment → order → inventory-update
// Email and analytics run in backgroundMigration from Basic Actions
Before: Simple Action Handlers
// Basic action handling
const handleUserUpdate = async (userData: UserData) => {
await validateUser(userData);
await updateDatabase(userData);
await sendNotification(userData.id);
await trackAnalytics('user_updated', userData);
};After: Pipeline-Based Actions
// Pipeline-based action handling
interface UserActions extends ActionPayloadMap {
updateUser: UserData;
}
const userRegister = new ActionRegister<UserActions>();
// Validation (Priority 100, Blocking)
userRegister.register('updateUser', async (payload, controller) => {
const validation = await validateUser(payload);
if (!validation.valid) {
controller.abort('User validation failed', validation.errors);
return;
}
return { step: 'validation', success: true };
}, { priority: 100, blocking: true });
// Database update (Priority 80, Blocking)
userRegister.register('updateUser', async (payload) => {
const result = await updateDatabase(payload);
return { step: 'database', success: true, userId: result.id };
}, { priority: 80, blocking: true });
// Notification (Priority 40, Non-blocking)
userRegister.register('updateUser', async (payload) => {
await sendNotification(payload.id);
return { step: 'notification', sent: true };
}, { priority: 40, blocking: false });
// Analytics (Priority 30, Non-blocking)
userRegister.register('updateUser', (payload) => {
trackAnalytics('user_updated', payload);
return { step: 'analytics', tracked: true };
}, { priority: 30, blocking: false });
// Execute pipeline
await userRegister.dispatch('updateUser', userData);Architecture Integration
MVVM Layer Integration
graph TD
A[View Layer] --> B[ViewModel Layer]
B --> C[Model Layer]
B1[Priority System] --> B
B2[Blocking Control] --> B
B3[Abort Mechanisms] --> B
B4[Result Collection] --> B
B5[Dispatch Methods] --> B
B --> C1[Store Updates]
B --> C2[External APIs]
B --> C3[Side Effects]Integration with Store Patterns
Pipeline features work seamlessly with store integration:
// Pipeline updates stores based on results
actionRegister.register('updateUserData', async (payload, controller) => {
// Get current store state
const currentUser = userStore.getValue();
// Validate update
const validation = validateUserUpdate(currentUser, payload.updates);
if (!validation.valid) {
controller.abort('Update validation failed');
return;
}
// Apply update
const updatedUser = { ...currentUser, ...payload.updates };
userStore.setValue(updatedUser);
controller.setResult({
step: 'store-update',
previousValue: currentUser,
newValue: updatedUser
});
return { step: 'user-update', success: true };
}, { priority: 80, blocking: true });Best Practices Summary
1. Priority Guidelines
- 90-100: Critical validation, security, input checking
- 70-89: Business logic, data processing, core operations
- 50-69: State updates, external API calls
- 30-49: Notifications, secondary operations
- 10-29: Analytics, logging, cleanup
2. Blocking Guidelines
- Use blocking for operations that affect subsequent handlers
- Use non-blocking for analytics, logging, optional enhancements
3. Abort Guidelines
- Use abort for business rule violations, validation failures
- Use throw for unexpected system errors
4. Result Guidelines
- Use consistent result structures
- Include meaningful step names and timing
- Leverage results for handler coordination
5. Concurrency Guidelines
- Keep default queue system for thread safety
- Consider disabling only for pure analytics or read-only operations
- Use debounce/throttle for high-frequency user interactions
Advanced Patterns
Explore specific pipeline patterns:
- Priority System - Priority-based execution order and best practices
- Blocking Operations - Controlling execution flow and performance
- Concurrency Control - Thread safety, queuing, and race condition prevention
- Dispatch Methods - Different ways to trigger pipelines
- Abort Mechanisms - Graceful pipeline termination
- Result Handling - Comprehensive result collection and usage
Live Examples from Example App
The Context-Action Example App provides real-world implementations of all pipeline features:
Featured Examples
- Priority Performance Demo - Priority-based execution with performance tracking
- API Blocking Demo - Rate limiting with blocking/non-blocking patterns
- Enhanced Abortable Search - Search cancellation and automatic cleanup
- UseActionWithResult Demo - Sequential workflows with comprehensive result handling
These examples demonstrate production-ready patterns for common pipeline scenarios.
Related
- Action Patterns - Action Only pattern implementation
- Store Integration - Combining pipelines with state management
- MVVM Architecture - Pipeline role in MVVM pattern
- Hook Lifecycle - How hooks connect to the pipeline system