Skip to main content

Data Flow Management

Comprehensive guide to managing data flow in Axon OS workflows, including data types, transformations, routing, and optimization.

Overview

Data flow management is at the heart of Axon OS workflows, enabling seamless data transformation and routing between nodes. The system provides robust data handling capabilities with type safety, automatic conversions, and optimization features.

Key Features

  • Type-Safe Data Flow: Automatic type checking and validation
  • Data Transformations: Built-in and custom transformation functions
  • Smart Routing: Conditional and parallel data routing
  • Stream Processing: Handle large datasets efficiently
  • Data Validation: Schema-based validation and error handling
  • Caching & Optimization: Performance optimization for data operations

Data Types and Schema

Supported Data Types

/**
 * Every kind of value a port or schema can declare.
 * Each member's runtime value is the lowercase string shown on its right.
 */
enum DataType {
  /* Primitive types */
  STRING = 'string',
  NUMBER = 'number',
  BOOLEAN = 'boolean',
  NULL = 'null',
  UNDEFINED = 'undefined',

  /* Complex types */
  OBJECT = 'object',
  ARRAY = 'array',

  /* Specialized types */
  DATE = 'date',
  BUFFER = 'buffer',
  STREAM = 'stream',
  FILE = 'file',

  /* Custom types */
  JSON = 'json',
  XML = 'xml',
  CSV = 'csv',
  BINARY = 'binary',

  /* Generic type: accepted as a target for any source type */
  ANY = 'any',
}

/**
 * Recursive description of the value a port carries.
 * Object schemas describe their members through `properties`; array schemas
 * describe their elements through `items`.
 */
interface DataSchema {
  /** Kind of value being described. */
  type: DataType;
  /** Whether a value must be supplied. */
  required: boolean;
  /** Optional format hint (the examples in this guide use 'email' and 'url'). */
  format?: string;
  /** Extra rules applied on top of the type check. */
  validation?: Array<ValidationRule>;
  /** Per-property schemas, for object values. */
  properties?: Record<string, DataSchema>;
  /** Element schema, for array values. */
  items?: DataSchema;
  /** Optional list of values associated with the schema. */
  enum?: Array<any>;
  /** Optional default value. */
  default?: any;
}

Schema Definition Examples

// Simple string schema.
// The rule follows the ValidationRule shape: the kind comes from the
// ValidationType enum and rule-specific settings live under `parameters`,
// which is where DataValidator looks for the pattern.
const emailSchema: DataSchema = {
  type: DataType.STRING,
  required: true,
  format: 'email',
  validation: [
    {
      type: ValidationType.PATTERN,
      parameters: { pattern: /^[^\s@]+@[^\s@]+\.[^\s@]+$/ },
      message: 'Invalid email format'
    }
  ]
};

// Complex object schema.
// Every rule uses a ValidationType member and keeps its bound under
// `parameters.value`, the field DataValidator reads when checking the rule.
const userSchema: DataSchema = {
  type: DataType.OBJECT,
  required: true,
  properties: {
    id: {
      type: DataType.NUMBER,
      required: true,
      validation: [{ type: ValidationType.MIN, parameters: { value: 1 } }]
    },
    name: {
      type: DataType.STRING,
      required: true,
      validation: [
        { type: ValidationType.MIN_LENGTH, parameters: { value: 2 } },
        { type: ValidationType.MAX_LENGTH, parameters: { value: 100 } }
      ]
    },
    // Reuses the standalone email schema defined earlier in this guide.
    email: emailSchema,
    age: {
      type: DataType.NUMBER,
      required: false,
      validation: [
        { type: ValidationType.MIN, parameters: { value: 0 } },
        { type: ValidationType.MAX, parameters: { value: 150 } }
      ]
    },
    tags: {
      type: DataType.ARRAY,
      required: false,
      items: {
        type: DataType.STRING,
        required: true
      }
    }
  }
};

// Array schema.
// ValidationType has no item-count members, so the 1..1000 bound is expressed
// with CUSTOM rules; DataValidator calls `parameters.validator(value)` and
// reports `message` when it returns a falsy result.
const usersArraySchema: DataSchema = {
  type: DataType.ARRAY,
  required: true,
  items: userSchema,
  validation: [
    {
      type: ValidationType.CUSTOM,
      parameters: {
        validator: (users: unknown) => Array.isArray(users) && users.length >= 1
      },
      message: 'Array must contain at least 1 item'
    },
    {
      type: ValidationType.CUSTOM,
      parameters: {
        validator: (users: unknown) => Array.isArray(users) && users.length <= 1000
      },
      message: 'Array must contain at most 1000 items'
    }
  ]
};

Data Ports and Connections

Port Definition

/**
 * A single input or output slot on a node.
 */
interface DataPort {
  id: string;
  name: string;
  /** Identifier of the node that owns this port. */
  nodeId: string;
  /** Whether data enters ('input') or leaves ('output') the node here. */
  direction: 'input' | 'output';
  dataType: DataType;
  schema: DataSchema;
  required: boolean;
  /** Can accept multiple connections. */
  multiple: boolean;
  description?: string;
  metadata: PortMetadata;
}

/**
 * Presentation details for a port.
 */
interface PortMetadata {
  displayName: string;
  /** Group related ports. */
  group?: string;
  /** Display order. */
  order: number;
  /** Hide from UI. */
  hidden: boolean;
  /** Show in advanced mode only. */
  advanced: boolean;
}

// Example: HTTP Request node ports.
// Two inputs (url, headers) and one output (response), all owned by the node
// 'http-request-1'.
const httpRequestPorts: DataPort[] = [
  // Input ports
  {
    id: 'url',
    name: 'url',
    nodeId: 'http-request-1',
    direction: 'input',
    dataType: DataType.STRING,
    schema: {
      type: DataType.STRING,
      required: true,
      format: 'url',
      // Rule kinds are ValidationType members, not bare strings.
      validation: [{ type: ValidationType.URL }]
    },
    required: true,
    multiple: false,
    description: 'The URL to send the request to',
    metadata: {
      displayName: 'URL',
      order: 1,
      hidden: false,
      advanced: false
    }
  },
  {
    id: 'headers',
    name: 'headers',
    nodeId: 'http-request-1',
    direction: 'input',
    dataType: DataType.OBJECT,
    schema: {
      type: DataType.OBJECT,
      required: false,
      properties: {}
    },
    required: false,
    multiple: false,
    description: 'HTTP headers to include',
    metadata: {
      displayName: 'Headers',
      order: 3,
      hidden: false,
      // Only shown when the UI is in advanced mode.
      advanced: true
    }
  },

  // Output ports
  {
    id: 'response',
    name: 'response',
    nodeId: 'http-request-1',
    direction: 'output',
    dataType: DataType.OBJECT,
    schema: {
      type: DataType.OBJECT,
      required: true,
      properties: {
        status: { type: DataType.NUMBER, required: true },
        data: { type: DataType.ANY, required: true },
        headers: { type: DataType.OBJECT, required: true }
      }
    },
    required: true,
    multiple: false,
    description: 'HTTP response object',
    metadata: {
      displayName: 'Response',
      order: 1,
      hidden: false,
      advanced: false
    }
  }
];

Connection Validation

/**
 * A link from one node's port to another node's port.
 */
interface DataConnection {
  id: string;

  /** Node and port the data comes from. */
  sourceNodeId: string;
  sourcePortId: string;

  /** Node and port the data goes to. */
  targetNodeId: string;
  targetPortId: string;

  /** Optional transformation attached to the connection. */
  transform?: DataTransformation;
  /** Optional condition attached to the connection. */
  condition?: ConnectionCondition;

  metadata: ConnectionMetadata;
}

/**
 * Bookkeeping information stored with a connection.
 */
interface ConnectionMetadata {
  /** Creation timestamp. */
  created: Date;
  /** Identifier of the creator. */
  createdBy: string;
  description?: string;
  tags: Array<string>;
}

/**
 * Checks whether an output port may be wired to an input port.
 *
 * `validateSchemaCompatibility` and `getExistingConnections` are called below
 * but are not part of this excerpt; they are expected to be provided by the
 * full implementation.
 */
class ConnectionValidator {
  /**
   * Runs four checks (direction, data type, schema, connection count) and
   * collects every problem instead of stopping at the first one.
   *
   * @param sourcePort Port the data would leave from; must be an output.
   * @param targetPort Port the data would arrive at; must be an input.
   * @returns `valid` is true when no errors were recorded. Warnings (such as
   *          an automatic type conversion) do not affect `valid`.
   */
  validateConnection(
    sourcePort: DataPort,
    targetPort: DataPort
  ): ValidationResult {
    const errors: ValidationError[] = [];
    const warnings: ValidationWarning[] = [];

    // 1. Check direction compatibility
    if (sourcePort.direction !== 'output' || targetPort.direction !== 'input') {
      errors.push({
        type: 'invalid_direction',
        message: 'Can only connect output ports to input ports'
      });
    }

    // 2. Check type compatibility.
    // Auto-convertible pairs are reported by checkTypeCompatibility as
    // compatible, so the conversion warning must be keyed on
    // `autoConvertible` rather than nested under "not compatible".
    const typeCompatibility = this.checkTypeCompatibility(
      sourcePort.dataType,
      targetPort.dataType
    );

    if (typeCompatibility.autoConvertible) {
      warnings.push({
        type: 'type_conversion',
        message: `Automatic conversion from ${sourcePort.dataType} to ${targetPort.dataType}`,
        suggestion: typeCompatibility.suggestion
      });
    } else if (!typeCompatibility.compatible) {
      errors.push({
        type: 'type_mismatch',
        message: `Cannot connect ${sourcePort.dataType} to ${targetPort.dataType}`
      });
    }

    // 3. Check schema compatibility.
    // Warnings are surfaced even when the schemas are compatible.
    const schemaResult = this.validateSchemaCompatibility(
      sourcePort.schema,
      targetPort.schema
    );

    if (!schemaResult.compatible) {
      errors.push(...schemaResult.errors);
    }
    warnings.push(...(schemaResult.warnings ?? []));

    // 4. Check multiple connection constraints
    if (!targetPort.multiple) {
      const existingConnections = this.getExistingConnections(targetPort);
      if (existingConnections.length > 0) {
        errors.push({
          type: 'multiple_connections',
          message: 'Target port does not accept multiple connections'
        });
      }
    }

    return {
      valid: errors.length === 0,
      errors,
      warnings
    };
  }

  /**
   * Classifies a source/target type pair.
   *
   * @returns `compatible: true, autoConvertible: false` for an exact match or
   *          an ANY target; `compatible: true, autoConvertible: true` (with a
   *          suggestion) when the pair is in the conversion table; otherwise
   *          both flags are false.
   */
  private checkTypeCompatibility(
    sourceType: DataType,
    targetType: DataType
  ): TypeCompatibility {
    // Direct match
    if (sourceType === targetType || targetType === DataType.ANY) {
      return { compatible: true, autoConvertible: false };
    }

    // Auto-convertible types: source type -> target types it can become
    const conversions: Record<string, DataType[]> = {
      [DataType.STRING]: [DataType.NUMBER, DataType.BOOLEAN, DataType.DATE],
      [DataType.NUMBER]: [DataType.STRING, DataType.BOOLEAN],
      [DataType.BOOLEAN]: [DataType.STRING, DataType.NUMBER],
      [DataType.OBJECT]: [DataType.JSON, DataType.STRING],
      [DataType.ARRAY]: [DataType.JSON, DataType.STRING],
      [DataType.DATE]: [DataType.STRING, DataType.NUMBER]
    };

    const convertibleTypes = conversions[sourceType] || [];
    if (convertibleTypes.includes(targetType)) {
      return {
        compatible: true,
        autoConvertible: true,
        suggestion: `Add automatic conversion from ${sourceType} to ${targetType}`
      };
    }

    return { compatible: false, autoConvertible: false };
  }
}

Data Transformations

Built-in Transformations

/**
 * A named, typed conversion step that can be applied to data.
 */
interface DataTransformation {
  id: string;
  /** Which kind of transformation this is. */
  type: TransformationType;
  name: string;
  description: string;

  /** Schema of the value the transformation accepts. */
  inputSchema: DataSchema;
  /** Schema of the value the transformation produces. */
  outputSchema: DataSchema;

  /** Declared parameters for the transformation. */
  parameters: Array<TransformationParameter>;
  /** The function that performs the transformation. */
  implementation: TransformationFunction;
}

/**
 * Identifiers for the transformation kinds, grouped by the kind of value
 * they operate on. Each member's runtime value is a snake_case string.
 */
enum TransformationType {
  /* Type conversions */
  STRING_TO_NUMBER = 'string_to_number',
  NUMBER_TO_STRING = 'number_to_string',
  OBJECT_TO_JSON = 'object_to_json',
  JSON_TO_OBJECT = 'json_to_object',

  /* String transformations */
  UPPERCASE = 'uppercase',
  LOWERCASE = 'lowercase',
  TRIM = 'trim',
  REPLACE = 'replace',
  SPLIT = 'split',
  JOIN = 'join',

  /* Number transformations */
  ROUND = 'round',
  CEIL = 'ceil',
  FLOOR = 'floor',
  ABS = 'abs',

  /* Array transformations */
  MAP = 'map',
  FILTER = 'filter',
  REDUCE = 'reduce',
  SORT = 'sort',
  UNIQUE = 'unique',
  FLATTEN = 'flatten',

  /* Object transformations */
  PICK = 'pick',
  OMIT = 'omit',
  RENAME = 'rename',
  MERGE = 'merge',

  /* Date transformations */
  FORMAT_DATE = 'format_date',
  PARSE_DATE = 'parse_date',
  ADD_DAYS = 'add_days',

  /* Custom transformation */
  CUSTOM = 'custom',
}

// Example transformations
const transformations: DataTransformation[] = [
  {
    id: 'string_to_number',
    type: TransformationType.STRING_TO_NUMBER,
    name: 'String to Number',
    description: 'Convert string value to number',
    inputSchema: { type: DataType.STRING, required: true },
    outputSchema: { type: DataType.NUMBER, required: true },
    parameters: [
      {
        name: 'radix',
        type: DataType.NUMBER,
        required: false,
        default: 10,
        description: 'Number base for parsing'
      }
    ],
    // Parses an integer in the requested base and throws when nothing could
    // be parsed. A missing or zero radix falls back to base 10.
    implementation: (input: string, params: any) => {
      const radix = params.radix || 10;
      const result = parseInt(input, radix);
      if (Number.isNaN(result)) {
        throw new Error(`Cannot convert "${input}" to number`);
      }
      return result;
    }
  },

  {
    id: 'array_map',
    type: TransformationType.MAP,
    name: 'Array Map',
    description: 'Transform each element in array',
    inputSchema: { type: DataType.ARRAY, required: true },
    outputSchema: { type: DataType.ARRAY, required: true },
    parameters: [
      {
        name: 'expression',
        type: DataType.STRING,
        required: true,
        description: 'JavaScript expression to apply to each element (use "item" variable)'
      }
    ],
    implementation: (input: any[], params: any) => {
      // An arrow function inside a top-level array literal has no object
      // bound to `this`, so the expression is compiled here instead of being
      // delegated to a method. Compiling once also keeps the work out of the
      // per-element loop.
      //
      // SECURITY: `new Function` runs the expression with the full privileges
      // of the host process; it is not a sandbox. Only accept expressions
      // from trusted workflow authors.
      const evaluate = new Function(
        'item',
        `return (${params.expression});`
      ) as (item: any) => any;

      return input.map(item => evaluate(item));
    }
  },

  {
    id: 'object_pick',
    type: TransformationType.PICK,
    name: 'Pick Properties',
    description: 'Select specific properties from object',
    inputSchema: { type: DataType.OBJECT, required: true },
    outputSchema: { type: DataType.OBJECT, required: true },
    parameters: [
      {
        name: 'properties',
        type: DataType.ARRAY,
        required: true,
        description: 'Array of property names to pick',
        items: { type: DataType.STRING, required: true }
      }
    ],
    // Copies only the listed own properties; names the input does not have
    // are skipped rather than copied as undefined.
    implementation: (input: any, params: any) => {
      const properties = params.properties;
      const result: any = {};

      for (const prop of properties) {
        // Called through Object.prototype so inputs without a prototype, or
        // with their own `hasOwnProperty` key, are handled correctly.
        if (Object.prototype.hasOwnProperty.call(input, prop)) {
          result[prop] = input[prop];
        }
      }

      return result;
    }
  }
];

Custom Transformations

/**
 * User-supplied transformation, described by its source code plus the
 * schemas, dependencies and test cases that accompany it.
 */
interface CustomTransformation {
  id: string;
  name: string;
  description: string;

  /** JavaScript/TypeScript code. */
  code: string;
  /** Language the code is written in. */
  language: 'javascript' | 'typescript';

  inputSchema: DataSchema;
  outputSchema: DataSchema;

  /** Required npm packages. */
  dependencies: Array<string>;
  /** Cases that are run against the code. */
  testCases: Array<TransformationTestCase>;
}

/**
 * One input/expected-output pair used to check a custom transformation.
 */
interface TransformationTestCase {
  /** Label used in failure messages. */
  name: string;
  /** Value handed to the transformation. */
  input: any;
  /** Value the transformation must produce. */
  expectedOutput: any;
  /** Optional parameters passed alongside the input. */
  parameters?: any;
}

/**
 * Turns a user-supplied CustomTransformation into a runnable
 * DataTransformation.
 *
 * `installDependencies`, `compileTransformation` and `deepEqual` are called
 * below but are not part of this excerpt; they are expected to be provided by
 * the full implementation.
 */
class CustomTransformationManager {
  /**
   * Validates, tests and compiles a custom transformation.
   *
   * Steps run in order: syntax validation, dependency installation (only
   * when dependencies are listed), test cases, final compilation.
   *
   * @param transformation The user-supplied definition.
   * @returns A DataTransformation of type CUSTOM with an empty parameter list.
   * @throws Error when validation fails or any test case fails.
   */
  async createTransformation(
    transformation: CustomTransformation
  ): Promise<DataTransformation> {
    // Validate code syntax
    await this.validateCode(transformation.code, transformation.language);

    // Install dependencies if needed
    if (transformation.dependencies.length > 0) {
      await this.installDependencies(transformation.dependencies);
    }

    // Run test cases
    await this.runTestCases(transformation);

    // Compile and create transformation
    const compiledFunction = await this.compileTransformation(transformation);

    return {
      id: transformation.id,
      type: TransformationType.CUSTOM,
      name: transformation.name,
      description: transformation.description,
      inputSchema: transformation.inputSchema,
      outputSchema: transformation.outputSchema,
      parameters: [],
      implementation: compiledFunction
    };
  }

  /**
   * Checks that the code parses.
   *
   * @param code Source text to check.
   * @param language 'typescript' uses the TypeScript compiler; anything else
   *                 is treated as JavaScript.
   * @throws Error describing the first problems found.
   */
  private async validateCode(code: string, language: string): Promise<void> {
    if (language === 'typescript') {
      // Use TypeScript compiler to validate.
      // `ts.transpile` returns only the emitted string, so diagnostics have
      // to be requested through `transpileModule` with `reportDiagnostics`.
      const ts = require('typescript');
      const result = ts.transpileModule(code, {
        compilerOptions: {
          target: ts.ScriptTarget.ES2020,
          module: ts.ModuleKind.CommonJS
        },
        reportDiagnostics: true
      });

      // Check for compilation errors
      if (result.diagnostics && result.diagnostics.length > 0) {
        const details = result.diagnostics
          .map((diagnostic: any) =>
            ts.flattenDiagnosticMessageText(diagnostic.messageText, '\n')
          )
          .join('; ');
        throw new Error(`TypeScript compilation failed: ${details}`);
      }
    } else {
      // Use JavaScript parser to validate syntax. `new Function` parses the
      // code as a function body without running it.
      try {
        new Function(code);
      } catch (error) {
        throw new Error(
          `JavaScript syntax error: ${CustomTransformationManager.describeError(error)}`
        );
      }
    }
  }

  /**
   * Compiles the transformation and runs each of its test cases in order,
   * stopping at the first failure.
   *
   * @throws Error naming the failing test case, either because the
   *         transformation threw or because its output did not match.
   */
  private async runTestCases(transformation: CustomTransformation): Promise<void> {
    const compiledFunction = await this.compileTransformation(transformation);

    for (const testCase of transformation.testCases) {
      let result: unknown;

      // Only the call itself is guarded, so a mismatch reported below is not
      // caught and wrapped a second time with the same prefix.
      try {
        result = await compiledFunction(testCase.input, testCase.parameters || {});
      } catch (error) {
        throw new Error(
          `Test case "${testCase.name}" failed: ${CustomTransformationManager.describeError(error)}`
        );
      }

      // Deep equality check
      if (!this.deepEqual(result, testCase.expectedOutput)) {
        throw new Error(`Test case "${testCase.name}" failed: expected ${JSON.stringify(testCase.expectedOutput)}, got ${JSON.stringify(result)}`);
      }
    }
  }

  /** Extracts a readable message from a caught value of unknown type. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}

// Example custom transformation.
// The code string is a function body: it defines `transform` and returns it.
// Backslashes are doubled because the regex lives inside a template string.
const customEmailExtractor: CustomTransformation = {
  id: 'extract_emails',
  name: 'Extract Email Addresses',
  description: 'Extract all email addresses from text',
  code: `
function transform(input, params) {
  // The top-level domain class is [A-Za-z]; a "|" inside a character class
  // would be matched literally rather than acting as alternation.
  const emailRegex = /\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b/g;
  const emails = input.match(emailRegex) || [];

  if (params.unique) {
    return [...new Set(emails)];
  }

  return emails;
}

return transform;
`,
  language: 'javascript',
  inputSchema: { type: DataType.STRING, required: true },
  outputSchema: {
    type: DataType.ARRAY,
    required: true,
    items: { type: DataType.STRING, required: true }
  },
  dependencies: [],
  testCases: [
    {
      name: 'Basic email extraction',
      input: 'Contact us at info@example.com or support@test.org',
      expectedOutput: ['info@example.com', 'support@test.org'],
      parameters: { unique: false }
    },
    {
      name: 'Unique email extraction',
      input: 'Send to admin@site.com, admin@site.com, user@site.com',
      expectedOutput: ['admin@site.com', 'user@site.com'],
      parameters: { unique: true }
    }
  ]
};

Conditional Data Routing

Condition-Based Routing

/**
 * One candidate destination for routed data.
 */
interface ConditionalRoute {
  id: string;
  /** Test the data must pass for this route to be taken. */
  condition: RouteCondition;
  /** Port that receives the data when the condition holds. */
  targetPortId: string;
  /** Ordering weight; the router evaluates higher values first. */
  priority: number;
  description?: string;
}

/**
 * A test applied to routed data. `type` selects which of the optional
 * fields the router reads.
 */
interface RouteCondition {
  type: 'expression' | 'schema' | 'value' | 'custom';

  /** Read when `type` is 'expression'. */
  expression?: string;

  /** Read when `type` is 'schema'. */
  schema?: DataSchema;

  /** Read, together with `operator`, when `type` is 'value'. */
  value?: any;
  operator?: ComparisonOperator;

  /** Read when `type` is 'custom'. */
  customFunction?: string;
}

/**
 * Operators available to 'value' route conditions.
 * Each member's runtime value is a snake_case string.
 */
enum ComparisonOperator {
  /* Equality */
  EQUALS = 'equals',
  NOT_EQUALS = 'not_equals',

  /* Ordering */
  GREATER_THAN = 'greater_than',
  LESS_THAN = 'less_than',
  GREATER_THAN_OR_EQUAL = 'greater_than_or_equal',
  LESS_THAN_OR_EQUAL = 'less_than_or_equal',

  /* Text */
  CONTAINS = 'contains',
  STARTS_WITH = 'starts_with',
  ENDS_WITH = 'ends_with',
  MATCHES_REGEX = 'matches_regex',

  /* Membership and shape */
  IN_ARRAY = 'in_array',
  HAS_PROPERTY = 'has_property',
  IS_TYPE = 'is_type',
}

/**
 * Picks destination ports for a piece of data from a list of conditional
 * routes.
 *
 * `evaluateExpression`, `validateAgainstSchema` and `evaluateCustomFunction`
 * are called below but are not part of this excerpt; they are expected to be
 * provided by the full implementation.
 */
class ConditionalRouter {
  /**
   * Evaluates routes from highest to lowest priority and returns the matches.
   *
   * Routing stops at the first match, except when the matching route is an
   * 'expression' condition whose text contains '||'; in that case later
   * routes are evaluated as well.
   *
   * @param data Value being routed; passed through unchanged in each result.
   * @param routes Candidate routes. The array is not modified.
   * @returns One result per matched route, in evaluation order.
   */
  route(data: any, routes: ConditionalRoute[]): RouteResult[] {
    const results: RouteResult[] = [];

    // Sort a copy by priority so the caller's array keeps its order.
    const sortedRoutes = [...routes].sort((a, b) => b.priority - a.priority);

    for (const route of sortedRoutes) {
      if (this.evaluateCondition(data, route.condition)) {
        results.push({
          routeId: route.id,
          targetPortId: route.targetPortId,
          data: data,
          matched: true
        });

        // Stop at first match for exclusive routing
        if (route.condition.type !== 'expression' || !route.condition.expression?.includes('||')) {
          break;
        }
      }
    }

    return results;
  }

  /**
   * Dispatches on the condition type. Unknown types never match.
   */
  private evaluateCondition(data: any, condition: RouteCondition): boolean {
    switch (condition.type) {
      case 'expression':
        return this.evaluateExpression(condition.expression!, { data });

      case 'value':
        return this.compareValues(data, condition.value!, condition.operator!);

      case 'schema':
        return this.validateAgainstSchema(data, condition.schema!);

      case 'custom':
        return this.evaluateCustomFunction(condition.customFunction!, data);

      default:
        return false;
    }
  }

  /**
   * Applies one comparison operator.
   *
   * @param actual Value taken from the routed data.
   * @param expected Value configured on the condition.
   * @param operator Which comparison to perform.
   * @returns The comparison result; false for an unrecognised operator.
   */
  private compareValues(actual: any, expected: any, operator: ComparisonOperator): boolean {
    switch (operator) {
      case ComparisonOperator.EQUALS:
        return actual === expected;

      case ComparisonOperator.NOT_EQUALS:
        return actual !== expected;

      case ComparisonOperator.GREATER_THAN:
        return actual > expected;

      case ComparisonOperator.LESS_THAN:
        return actual < expected;

      case ComparisonOperator.GREATER_THAN_OR_EQUAL:
        return actual >= expected;

      case ComparisonOperator.LESS_THAN_OR_EQUAL:
        return actual <= expected;

      case ComparisonOperator.CONTAINS:
        return String(actual).includes(String(expected));

      case ComparisonOperator.STARTS_WITH:
        return String(actual).startsWith(String(expected));

      case ComparisonOperator.ENDS_WITH:
        return String(actual).endsWith(String(expected));

      case ComparisonOperator.MATCHES_REGEX:
        return new RegExp(expected).test(String(actual));

      case ComparisonOperator.IN_ARRAY:
        return Array.isArray(expected) && expected.includes(actual);

      case ComparisonOperator.HAS_PROPERTY:
        // Called through Object.prototype so data objects without a
        // prototype, or with their own `hasOwnProperty` key, still work.
        return (
          typeof actual === 'object' &&
          actual !== null &&
          Object.prototype.hasOwnProperty.call(actual, expected)
        );

      case ComparisonOperator.IS_TYPE:
        return typeof actual === expected;

      default:
        return false;
    }
  }
}

// Example: Route HTTP responses based on status code.
// Routes are listed from highest to lowest priority; the last one is a
// catch-all for anything the status-specific routes did not claim.
const httpResponseRoutes: ConditionalRoute[] = [
  {
    id: 'success_route',
    condition: {
      type: 'expression',
      expression: 'data.status >= 200 && data.status < 300'
    },
    targetPortId: 'success_output',
    priority: 100,
    description: 'Route successful responses'
  },
  {
    id: 'client_error_route',
    condition: {
      type: 'expression',
      expression: 'data.status >= 400 && data.status < 500'
    },
    targetPortId: 'client_error_output',
    priority: 90,
    description: 'Route client error responses'
  },
  {
    id: 'server_error_route',
    condition: {
      type: 'expression',
      expression: 'data.status >= 500'
    },
    targetPortId: 'server_error_output',
    priority: 80,
    description: 'Route server error responses'
  },
  {
    id: 'default_route',
    // A 'value' condition compares the routed data itself to `value`, so
    // "equals true" would never match a response object. An expression that
    // is always true makes this a real fallback.
    condition: {
      type: 'expression',
      expression: 'true'
    },
    targetPortId: 'default_output',
    priority: 1,
    description: 'Default route for unmatched responses'
  }
];

Stream Processing

Stream Data Handling

/**
 * Descriptor for a stream used in a workflow.
 */
interface DataStream {
  id: string;
  /** Which stream role this is. */
  type: 'readable' | 'writable' | 'transform' | 'duplex';
  /** Optional text encoding. */
  encoding?: BufferEncoding;
  /** Object-mode flag for the stream. */
  objectMode: boolean;
  /** High-water mark for the stream. */
  highWaterMark: number;
  metadata: StreamMetadata;
}

/**
 * Descriptive information about a stream's content.
 */
interface StreamMetadata {
  /** Content type of the stream. */
  contentType: string;
  /** Size, when known. */
  size?: number;
  /** Checksum, when known. */
  checksum?: string;
  /** Compression applied to the content, if any. */
  compression?: 'gzip' | 'deflate' | 'brotli';
}

/**
 * Applies DataTransformations to streamed data, either chunk by chunk or in
 * fixed-size batches. Uses `Transform` from Node's `stream` module.
 */
class StreamProcessor {
  /**
   * Chains one transform stream per transformation onto the input.
   *
   * @param inputStream Source of chunks.
   * @param transformations Applied in array order.
   * @param options Supplies `objectMode` and `batchSize` for each stage.
   * @returns The last stage of the chain, or the input itself when no
   *          transformations are given.
   */
  async processStream(
    inputStream: NodeJS.ReadableStream,
    transformations: DataTransformation[],
    options: StreamProcessingOptions
  ): Promise<NodeJS.ReadableStream> {
    let currentStream: NodeJS.ReadableStream = inputStream;

    // Apply transformations in sequence
    for (const transformation of transformations) {
      const stage = this.createTransformStream(transformation, options);

      // `pipe` does not forward errors, so pass an upstream failure on to
      // the next stage; it then reaches whoever consumes the returned stream.
      currentStream.on('error', (error: Error) => stage.destroy(error));

      currentStream = currentStream.pipe(stage);
    }

    return currentStream;
  }

  /**
   * Builds a transform stream that runs the transformation on every chunk.
   * An exception from the transformation is reported as a stream error.
   */
  private createTransformStream(
    transformation: DataTransformation,
    options: StreamProcessingOptions
  ): Transform {
    return new Transform({
      objectMode: options.objectMode,
      highWaterMark: options.batchSize,

      // Method syntax on purpose: `this` is the Transform stream here.
      transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
        try {
          // Apply transformation to chunk
          const result = transformation.implementation(chunk, {});

          // Push result to output stream
          this.push(result);
          callback();
        } catch (error) {
          callback(error instanceof Error ? error : new Error(String(error)));
        }
      },

      flush(callback: TransformCallback) {
        // Finalize stream processing
        callback();
      }
    });
  }

  /**
   * Builds an object-mode transform stream that collects chunks and runs the
   * transformation once per batch (batch processing for better performance).
   *
   * @param batchSize Number of chunks collected before a batch is processed.
   * @param transformation Receives the whole batch as an array. An array
   *        result is emitted element by element; any other result is emitted
   *        as a single chunk.
   * @returns The batching stream. A final partial batch is processed when
   *          the stream ends.
   */
  createBatchProcessor(
    batchSize: number,
    transformation: DataTransformation
  ): Transform {
    let batch: any[] = [];

    // Arrow function: `this` stays the StreamProcessor, while the stream is
    // passed in explicitly. Inside the Transform callbacks below `this` is
    // the stream, so `this.processBatch` cannot be called from there.
    const drain = (stream: Transform, callback: TransformCallback): void => {
      const pending = batch;
      batch = [];
      this.processBatch(stream, pending, transformation, callback);
    };

    return new Transform({
      objectMode: true,

      transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
        batch.push(chunk);

        if (batch.length >= batchSize) {
          drain(this, callback);
        } else {
          callback();
        }
      },

      flush(callback: TransformCallback) {
        if (batch.length > 0) {
          drain(this, callback);
        } else {
          callback();
        }
      }
    });
  }

  /**
   * Runs the transformation on one batch and emits the results on `stream`.
   * An exception from the transformation is reported through `callback`.
   */
  private processBatch(
    stream: Transform,
    batch: any[],
    transformation: DataTransformation,
    callback: TransformCallback
  ): void {
    try {
      // Process entire batch at once
      const results = transformation.implementation(batch, {});

      // Push each result
      if (Array.isArray(results)) {
        results.forEach(result => stream.push(result));
      } else {
        stream.push(results);
      }

      callback();
    } catch (error) {
      callback(error instanceof Error ? error : new Error(String(error)));
    }
  }
}

// Example: CSV processing stream
/**
 * Adds a line-oriented CSV parser on top of StreamProcessor.
 */
class CSVProcessor extends StreamProcessor {
  /**
   * Builds an object-mode transform stream that turns CSV text into records.
   *
   * Blank lines are skipped. With `options.hasHeader`, the first non-blank
   * line supplies the field names and each later line is emitted as an
   * object; otherwise each line is emitted as an array of strings.
   *
   * NOTE(review): each chunk is decoded on its own with `toString()`, so a
   * multi-byte character split across two chunks would be decoded
   * incorrectly. Confirm upstream chunking, or decode with a StringDecoder.
   *
   * @param options `hasHeader` is the only option read here.
   */
  createCSVParser(options: CSVParseOptions): Transform {
    let header: string[] | null = null;
    let buffer = '';

    // Arrow function: `this` stays the CSVProcessor, while the stream is
    // passed in explicitly. Inside the Transform callbacks below `this` is
    // the stream, which has no parseCSVLine or createRecord.
    const handleLine = (stream: Transform, line: string): void => {
      if (!line.trim()) return;

      const values = this.parseCSVLine(line);

      if (!header && options.hasHeader) {
        header = values;
        return;
      }

      stream.push(header ? this.createRecord(header, values) : values);
    };

    return new Transform({
      objectMode: true,

      transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback) {
        buffer += chunk.toString();
        const lines = buffer.split('\n');

        // Keep last incomplete line in buffer
        buffer = lines.pop() || '';

        for (const line of lines) {
          handleLine(this, line);
        }

        callback();
      },

      flush(callback: TransformCallback) {
        // Process remaining buffer through the same path as complete lines,
        // so a header-only final line is not emitted as a data record.
        handleLine(this, buffer);
        buffer = '';
        callback();
      }
    });
  }

  /**
   * Splits one line on commas, trims each value and strips one leading and
   * one trailing double quote. Quoted commas are not supported.
   */
  private parseCSVLine(line: string): string[] {
    // Simple CSV parsing (for production, use a proper CSV library)
    return line.split(',').map(value => value.trim().replace(/^"|"$/g, ''));
  }

  /**
   * Pairs header names with values by position. Missing or empty values
   * become null.
   */
  private createRecord(header: string[], values: string[]): any {
    const record: any = {};
    header.forEach((field, index) => {
      record[field] = values[index] || null;
    });
    return record;
  }
}

Data Validation

Schema Validation

/**
 * One extra check attached to a schema.
 */
interface ValidationRule {
  /** Which check to run. */
  type: ValidationType;
  /** Optional custom failure message. */
  message?: string;
  /** Optional rule-specific settings. */
  parameters?: any;
}

/**
 * Kinds of validation rule. Each member's runtime value is the string shown
 * on its right.
 */
enum ValidationType {
  /* Presence and type */
  REQUIRED = 'required',
  TYPE = 'type',

  /* Numeric bounds */
  MIN = 'min',
  MAX = 'max',

  /* Length bounds */
  MIN_LENGTH = 'minLength',
  MAX_LENGTH = 'maxLength',

  /* Formats */
  PATTERN = 'pattern',
  EMAIL = 'email',
  URL = 'url',
  DATE = 'date',

  /* Value sets */
  ENUM = 'enum',
  UNIQUE = 'unique',

  /* User-defined */
  CUSTOM = 'custom',
}

/**
 * Validates values against a DataSchema.
 *
 * `isValidType`, `validateObject` and `validateArray` are called below but
 * are not part of this excerpt; they are expected to be provided by the full
 * implementation.
 */
class DataValidator {
  /**
   * Validates `data` against `schema`.
   *
   * @returns `valid` is true when no errors were recorded. An exception
   *          raised while validating is reported as a single
   *          'validation_error' entry at the root path rather than thrown.
   */
  validate(data: any, schema: DataSchema): ValidationResult {
    const errors: ValidationError[] = [];
    const warnings: ValidationWarning[] = [];

    try {
      this.validateValue(data, schema, '', errors, warnings);
    } catch (error) {
      errors.push({
        type: 'validation_error',
        path: '',
        message: DataValidator.describeError(error),
        value: data
      });
    }

    return {
      valid: errors.length === 0,
      errors,
      warnings
    };
  }

  /**
   * Validates one value, appending findings to `errors` and `warnings`.
   *
   * Order: required check, type check, custom rules, then recursion into
   * object properties or array items. A failed required or type check stops
   * further validation of that value.
   */
  private validateValue(
    value: any,
    schema: DataSchema,
    path: string,
    errors: ValidationError[],
    warnings: ValidationWarning[]
  ): void {
    const missing = value === null || value === undefined;

    // Check required
    if (schema.required && missing) {
      errors.push({
        type: 'required',
        path,
        message: 'Value is required',
        value
      });
      return;
    }

    // Skip validation if value is null/undefined and not required
    if (missing) {
      return;
    }

    // Type validation
    if (!this.isValidType(value, schema.type)) {
      errors.push({
        type: 'type',
        path,
        message: `Expected ${schema.type}, got ${typeof value}`,
        value
      });
      return;
    }

    // Custom validation rules
    if (schema.validation) {
      for (const rule of schema.validation) {
        const result = this.validateRule(value, rule, path);
        errors.push(...result.errors);
        warnings.push(...result.warnings);
      }
    }

    // Recursive validation for objects and arrays
    if (schema.type === DataType.OBJECT && schema.properties) {
      this.validateObject(value, schema, path, errors, warnings);
    } else if (schema.type === DataType.ARRAY && schema.items) {
      this.validateArray(value, schema, path, errors, warnings);
    }
  }

  /**
   * Applies a single rule to a value.
   *
   * Numeric rules only inspect numbers and text rules only inspect strings;
   * a value of another type passes the rule. Rule kinds without a case here
   * pass silently. Rules other than EMAIL read their settings from
   * `rule.parameters`.
   *
   * @param path Location of the value, copied into any error produced.
   */
  private validateRule(
    value: any,
    rule: ValidationRule,
    path: string
  ): ValidationResult {
    const errors: ValidationError[] = [];
    const warnings: ValidationWarning[] = [];

    switch (rule.type) {
      case ValidationType.MIN: {
        if (typeof value === 'number' && value < rule.parameters.value) {
          errors.push({
            type: 'min',
            path,
            message: rule.message || `Value must be at least ${rule.parameters.value}`,
            value
          });
        }
        break;
      }

      case ValidationType.MAX: {
        if (typeof value === 'number' && value > rule.parameters.value) {
          errors.push({
            type: 'max',
            path,
            message: rule.message || `Value must be at most ${rule.parameters.value}`,
            value
          });
        }
        break;
      }

      case ValidationType.MIN_LENGTH: {
        if (typeof value === 'string' && value.length < rule.parameters.value) {
          errors.push({
            type: 'minLength',
            path,
            message: rule.message || `Length must be at least ${rule.parameters.value}`,
            value
          });
        }
        break;
      }

      // Counterpart of MIN_LENGTH.
      case ValidationType.MAX_LENGTH: {
        if (typeof value === 'string' && value.length > rule.parameters.value) {
          errors.push({
            type: 'maxLength',
            path,
            message: rule.message || `Length must be at most ${rule.parameters.value}`,
            value
          });
        }
        break;
      }

      case ValidationType.PATTERN: {
        if (typeof value === 'string') {
          const configured = rule.parameters.pattern;
          const pattern: RegExp =
            configured instanceof RegExp ? configured : new RegExp(configured);

          // A global or sticky regex remembers where its last match ended;
          // reset it so the same rule gives the same answer on every call.
          pattern.lastIndex = 0;

          if (!pattern.test(value)) {
            errors.push({
              type: 'pattern',
              path,
              message: rule.message || 'Value does not match required pattern',
              value
            });
          }
        }
        break;
      }

      case ValidationType.EMAIL: {
        const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
        if (typeof value === 'string' && !emailRegex.test(value)) {
          errors.push({
            type: 'email',
            path,
            message: rule.message || 'Invalid email format',
            value
          });
        }
        break;
      }

      case ValidationType.CUSTOM: {
        // A falsy return fails the rule; an exception from the validator is
        // reported as a failure with the exception's message.
        try {
          const customResult = rule.parameters.validator(value);
          if (!customResult) {
            errors.push({
              type: 'custom',
              path,
              message: rule.message || 'Custom validation failed',
              value
            });
          }
        } catch (error) {
          errors.push({
            type: 'custom',
            path,
            message: `Custom validation error: ${DataValidator.describeError(error)}`,
            value
          });
        }
        break;
      }
    }

    return {
      valid: errors.length === 0,
      errors,
      warnings
    };
  }

  /** Extracts a readable message from a caught value of unknown type. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}

Performance Optimization

Data Flow Optimization

/**
 * A suggested optimization for one port of one node.
 */
interface OptimizationHint {
  /** Kind of optimization being suggested. */
  type: 'cache' | 'batch' | 'stream' | 'parallel' | 'compress';
  /** Node and port the suggestion applies to. */
  nodeId: string;
  portId: string;
  /** Settings specific to the optimization kind. */
  parameters: any;
  /** Score used to rank hints; higher values are listed first. */
  estimatedImprovement: number;
}

/**
 * Suggests and applies performance optimizations for a workflow.
 *
 * The predicates and helpers used below (`isCacheable`, `isBatchable`,
 * `isStreamable`, `findFanOutPatterns`, `findCompressionCandidates`,
 * `applyOptimization`, `calculateSpeedup`) are not part of this excerpt.
 */
class DataFlowOptimizer {
  /**
   * Collects per-node hints followed by workflow-wide pattern hints.
   *
   * @returns All hints, ordered from highest to lowest estimated improvement.
   */
  analyzeWorkflow(workflow: WorkflowDefinition): OptimizationHint[] {
    // Node-level opportunities first, in node order.
    const perNode = Array.from(workflow.nodes, node => this.analyzeNode(node, workflow));

    // Then patterns that span several nodes.
    const ranked: OptimizationHint[] = ([] as OptimizationHint[]).concat(
      ...perNode,
      this.analyzeDataFlowPatterns(workflow)
    );

    ranked.sort((left, right) => right.estimatedImprovement - left.estimatedImprovement);
    return ranked;
  }

  /**
   * Produces up to three hints for a node: cache, batch, stream, in that
   * order, each emitted only when its predicate accepts the node.
   */
  private analyzeNode(node: NodeDefinition, workflow: WorkflowDefinition): OptimizationHint[] {
    const found: OptimizationHint[] = [];
    const suggest = (
      type: OptimizationHint['type'],
      portId: string,
      parameters: any,
      estimatedImprovement: number
    ): void => {
      found.push({ type, nodeId: node.id, portId, parameters, estimatedImprovement });
    };

    // Cacheable operations
    if (this.isCacheable(node)) {
      suggest('cache', 'output', { ttl: 3600, keyStrategy: 'input_hash' }, 0.8);
    }

    // Batchable operations
    if (this.isBatchable(node)) {
      suggest('batch', 'input', { batchSize: 100, timeout: 1000 }, 0.6);
    }

    // Streamable operations
    if (this.isStreamable(node)) {
      suggest('stream', 'input', { chunkSize: 1024, highWaterMark: 16 }, 0.7);
    }

    return found;
  }

  /**
   * Produces 'parallel' hints for fan-out patterns (one source, multiple
   * targets) followed by 'compress' hints for compression candidates.
   */
  private analyzeDataFlowPatterns(workflow: WorkflowDefinition): OptimizationHint[] {
    // Improvement grows with the number of targets and is capped at 0.9.
    const parallelHints = Array.from(
      this.findFanOutPatterns(workflow),
      (pattern): OptimizationHint => ({
        type: 'parallel',
        nodeId: pattern.sourceNodeId,
        portId: pattern.sourcePortId,
        parameters: { parallelism: pattern.targetCount },
        estimatedImprovement: Math.min(pattern.targetCount * 0.3, 0.9)
      })
    );

    const compressionHints = Array.from(
      this.findCompressionCandidates(workflow),
      (candidate): OptimizationHint => ({
        type: 'compress',
        nodeId: candidate.nodeId,
        portId: candidate.portId,
        parameters: { algorithm: 'gzip', threshold: 1024 },
        estimatedImprovement: 0.4
      })
    );

    return [...parallelHints, ...compressionHints];
  }

  /**
   * Applies hints one at a time to a JSON round-trip copy of the workflow.
   * A hint that fails to apply is logged with `console.warn` and skipped.
   *
   * @returns The untouched original, the modified copy, the optimizations
   *          that were applied, and the speedup computed from them.
   */
  applyOptimizations(
    workflow: WorkflowDefinition,
    hints: OptimizationHint[]
  ): OptimizedWorkflow {
    // Work on a copy so the caller's workflow object is left as it was.
    const workingCopy = JSON.parse(JSON.stringify(workflow));
    const applied: AppliedOptimization[] = [];

    for (const hint of hints) {
      try {
        applied.push(this.applyOptimization(workingCopy, hint));
      } catch (error) {
        console.warn(`Failed to apply optimization ${hint.type} for node ${hint.nodeId}:`, error);
      }
    }

    const estimatedSpeedup = this.calculateSpeedup(applied);
    return {
      original: workflow,
      optimized: workingCopy,
      optimizations: applied,
      estimatedSpeedup
    };
  }
}

Caching Strategies

/**
 * Settings that control how a node's results are cached.
 */
interface CacheStrategy {
  /** Backing store for the cache. */
  type: 'memory' | 'redis' | 'disk' | 'hybrid';
  /** Time-to-live handed to the cache when an entry is stored. */
  ttl: number;
  /** Size limit for the cache. */
  maxSize: number;
  /** How cache keys are derived. */
  keyStrategy: 'input_hash' | 'custom' | 'time_window';
  /** Policy used to evict entries. */
  evictionPolicy: 'lru' | 'lfu' | 'ttl';
  /** Whether compression is enabled. */
  compression: boolean;
}

/**
 * Per-node result cache keyed by node, port and input.
 *
 * `getOrCreateCache` and `generateCustomKey` are called below but are not
 * part of this excerpt; they are expected to be provided by the full
 * implementation.
 */
class DataFlowCache {
  private caches: Map<string, Cache> = new Map();

  /**
   * Looks up a cached output.
   *
   * @returns Whatever the underlying cache returns for the derived key.
   */
  async get(
    nodeId: string,
    portId: string,
    input: any,
    strategy: CacheStrategy
  ): Promise<any | null> {
    const cacheKey = this.generateCacheKey(nodeId, portId, input, strategy);
    const cache = this.getOrCreateCache(nodeId, strategy);

    return await cache.get(cacheKey);
  }

  /**
   * Stores an output under the key derived from the input, passing
   * `strategy.ttl` to the underlying cache.
   */
  async set(
    nodeId: string,
    portId: string,
    input: any,
    output: any,
    strategy: CacheStrategy
  ): Promise<void> {
    const cacheKey = this.generateCacheKey(nodeId, portId, input, strategy);
    const cache = this.getOrCreateCache(nodeId, strategy);

    await cache.set(cacheKey, output, strategy.ttl);
  }

  /**
   * Builds the cache key according to `strategy.keyStrategy`:
   * - 'input_hash': node, port and a hash of the input;
   * - 'time_window': node, port and the index of the current window, where
   *   a window is `strategy.ttl * 1000` milliseconds long (the input is not
   *   part of the key);
   * - 'custom': delegated to `generateCustomKey`;
   * - anything else: node, port and the JSON text of the input.
   */
  private generateCacheKey(
    nodeId: string,
    portId: string,
    input: any,
    strategy: CacheStrategy
  ): string {
    switch (strategy.keyStrategy) {
      case 'input_hash': {
        const inputHash = this.hashObject(input);
        return `${nodeId}:${portId}:${inputHash}`;
      }

      case 'time_window': {
        const timeWindow = Math.floor(Date.now() / (strategy.ttl * 1000));
        return `${nodeId}:${portId}:${timeWindow}`;
      }

      case 'custom':
        // Custom key generation logic
        return this.generateCustomKey(nodeId, portId, input);

      default:
        return `${nodeId}:${portId}:${JSON.stringify(input)}`;
    }
  }

  /**
   * Hashes a value so that objects with the same content get the same hash
   * regardless of property order.
   *
   * An array passed as the second argument of `JSON.stringify` is a property
   * allow-list applied at every depth, so using the top-level keys there
   * would drop nested properties and make different inputs collide. The
   * value is therefore rebuilt with sorted keys at every level first.
   *
   * @returns The first 16 hex characters of the SHA-256 digest.
   */
  private hashObject(obj: any): string {
    const crypto = require('crypto');
    // JSON.stringify yields undefined for undefined input; String() keeps
    // the hash input a string in that case.
    const str = String(JSON.stringify(DataFlowCache.canonicalize(obj)));
    return crypto.createHash('sha256').update(str).digest('hex').substring(0, 16);
  }

  /**
   * Returns a copy of `value` whose object keys are in sorted order at every
   * depth. Arrays keep their element order. Values with a `toJSON` method
   * (such as Date) are replaced by that method's result.
   */
  private static canonicalize(value: unknown): unknown {
    if (value === null || typeof value !== 'object') {
      return value;
    }

    const serializable = value as { toJSON?: () => unknown };
    if (typeof serializable.toJSON === 'function') {
      return DataFlowCache.canonicalize(serializable.toJSON());
    }

    if (Array.isArray(value)) {
      return value.map(element => DataFlowCache.canonicalize(element));
    }

    // Prototype-free target so a key named "__proto__" is stored as data.
    const ordered: Record<string, unknown> = Object.create(null);
    const source = value as Record<string, unknown>;
    for (const key of Object.keys(source).sort()) {
      ordered[key] = DataFlowCache.canonicalize(source[key]);
    }
    return ordered;
  }
}

Need Help?