Data Processing Pipelines

Real-world patterns for building data transformation pipelines with Orlando.

ETL: Extract, Transform, Load

Normalizing User Data

import init, { Pipeline } from 'orlando-transducers';
await init();

// Normalize raw user records from any source into one canonical shape.
// Skips null/undefined records and records without a usable email address;
// tolerates a missing `name` instead of throwing on `.trim()`.
const normalizeUsers = new Pipeline()
  .filter(u => u != null)
  .filter(u => u.email != null && u.email.includes('@'))
  .map(u => ({
    id: u.id,
    name: (u.name ?? '').trim(),  // name may be absent — don't throw
    email: u.email.toLowerCase().trim(),
    role: u.role || 'user',
    // NOTE(review): assumes created_at parses to a valid Date; an
    // unparseable value makes toISOString() throw — confirm upstream format.
    createdAt: new Date(u.created_at).toISOString(),
  }))
  .unique();  // deduplicate consecutive entries

// Reuse on multiple data sources
const fromCsv = normalizeUsers.toArray(csvRecords);
const fromApi = normalizeUsers.toArray(apiResponse.users);

Log Processing

// Parse and filter error logs
const errorPipeline = new Pipeline()
  .map(line => {
    const [timestamp, level, ...messageParts] = line.split(' ');
    return { timestamp, level, message: messageParts.join(' ') };
  })
  .filter(entry => entry.level === 'ERROR' || entry.level === 'FATAL')
  .map(entry => ({
    ...entry,
    timestamp: new Date(entry.timestamp),
  }));

const errors = errorPipeline.toArray(logLines);

Analytics Aggregation

Revenue Calculation

// Keep only completed purchases and project each down to its amount.
const revenuePipeline = new Pipeline()
  .filter(event => event.type === 'purchase' && event.status === 'completed')
  .map(event => event.amount);

// Fold the surviving amounts into a single revenue total.
const totalRevenue = revenuePipeline.reduce(events, (sum, amount) => sum + amount, 0);

Top Products by Category

import { Pipeline, topK } from 'orlando-transducers';

// Score in-stock, decently-rated products: rating weighted by the log of
// sales volume so raw sales count doesn't dominate the ranking.
const scoredProducts = new Pipeline()
  .filter(p => p.inStock && p.rating >= 3.0)
  .map(p => ({
    ...p,
    score: p.rating * Math.log(p.salesCount + 1),
  }))
  .toArray(products);

// Get top 10 by computed score.
// NOTE(review): assumes topK ranks the scored objects by `score` (or that a
// key can be supplied) — confirm against the orlando-transducers topK API.
const top10 = topK(scoredProducts, 10);

Funnel Analysis

// Count users at each stage of a conversion funnel
// Count distinct users reaching each stage of a conversion funnel.
const stages = ['visit', 'signup', 'activate', 'purchase'];

const funnelCounts = stages.map(stage => {
  const count = new Pipeline()
    .filter(event => event.stage === stage)
    // Compare by user id: unique() on the raw event objects would never
    // collapse two distinct objects for the same user.
    .map(event => event.userId)
    // NOTE: unique() dedupes *consecutive* values only — sort or group the
    // events by user first if an exact distinct-user count is required.
    .unique()
    .toArray(events)
    .length;

  return { stage, count };
});

Pagination

/**
 * Return one page of `data` using 1-indexed page numbers.
 * @param {Array} data - source items
 * @param {number} page - 1-based page number; values < 1 are clamped to page 1
 * @param {number} pageSize - items per page
 * @returns {Array} the requested slice
 */
function paginate(data, page, pageSize) {
  const skip = Math.max(0, (page - 1) * pageSize);  // guard against page <= 0
  return new Pipeline()
    .drop(skip)
    .take(pageSize)
    .toArray(data);
}

const page2 = paginate(users, 2, 20);  // items 21-40

Filtered Pagination

/**
 * Case-insensitive name search over active items, returned one page
 * at a time (1-indexed pages).
 * @param {Array} data
 * @param {string} query - matched against item.name, case-insensitively
 * @param {number} page - 1-based page number
 * @param {number} pageSize - items per page
 * @returns {Array} matching items for the requested page
 */
function searchAndPaginate(data, query, page, pageSize) {
  const q = query.toLowerCase();  // hoisted: lowercase the query once, not per item
  const pipeline = new Pipeline()
    .filter(item => item.name.toLowerCase().includes(q))
    .filter(item => item.active)
    .drop((page - 1) * pageSize)
    .take(pageSize);

  return pipeline.toArray(data);
}

Search with Multiple Filters

import { Pipeline } from 'orlando-transducers';

/**
 * Build a product search from an object of optional filters.
 * Only supplied filters add a pipeline stage, so the pipeline stays minimal.
 * @param {Array} catalog
 * @param {{category?: string, minPrice?: number, maxPrice?: number,
 *          minRating?: number, inStockOnly?: boolean, limit?: number}} filters
 * @returns {Array} up to `filters.limit` (default 20) matching products
 */
function searchProducts(catalog, filters) {
  let pipeline = new Pipeline();

  if (filters.category) {
    pipeline = pipeline.filter(p => p.category === filters.category);
  }

  if (filters.minPrice != null) {
    pipeline = pipeline.filter(p => p.price >= filters.minPrice);
  }

  if (filters.maxPrice != null) {
    pipeline = pipeline.filter(p => p.price <= filters.maxPrice);
  }

  // != null (not truthiness) so an explicit minRating of 0 still applies,
  // consistent with the price checks above.
  if (filters.minRating != null) {
    pipeline = pipeline.filter(p => p.rating >= filters.minRating);
  }

  if (filters.inStockOnly) {
    pipeline = pipeline.filter(p => p.inStock);
  }

  // ?? instead of ||: a deliberate limit of 0 should return no rows, not 20.
  return pipeline.take(filters.limit ?? 20).toArray(catalog);
}

const results = searchProducts(catalog, {
  category: 'electronics',
  minPrice: 50,
  maxPrice: 500,
  minRating: 4.0,
  inStockOnly: true,
  limit: 20,
});

Combining Multiple Data Sources

Using Multi-Input Operations

import { Pipeline, intersection, difference, union, merge } from 'orlando-transducers';

// Collect user ids per platform with one shared helper, then compare the sets.
const userIdsFor = platform =>
  new Pipeline()
    .filter(e => e.platform === platform)
    .map(e => e.userId)
    .toArray(events);

const mobileUsers = userIdsFor('mobile');
const webUsers = userIdsFor('web');

const crossPlatform = intersection(mobileUsers, webUsers);  // active on both
const mobileOnly = difference(mobileUsers, webUsers);       // mobile but not web
const allUsers = union(mobileUsers, webUsers);              // active on either

Interleaving Data Streams

import { merge, Pipeline } from 'orlando-transducers';

// Reduce each server's raw log stream to structured error records.
const processLogs = new Pipeline()
  .filter(log => log.level === 'error')
  .map(log => {
    const { source, message, timestamp } = log;
    return { server: source, message, time: new Date(timestamp) };
  });

const server1Errors = processLogs.toArray(server1Logs);
const server2Errors = processLogs.toArray(server2Logs);

// Interleave both result lists so errors can be reviewed chronologically
const allErrors = merge([server1Errors, server2Errors]);

Debugging Pipelines

Use .tap() to inspect values flowing through the pipeline without modifying them:

// Log each value at a named stage without modifying the stream.
const trace = (label) => (x) => console.log(label, x);

const pipeline = new Pipeline()
  .tap(trace('[input]'))
  .filter(x => x.active)
  .tap(trace('[after filter]'))
  .map(x => x.email.toLowerCase())
  .tap(trace('[after map]'))
  .take(5);

const result = pipeline.toArray(users);

Conditional Debugging

// True only in development; production taps become no-ops.
const DEBUG = process.env.NODE_ENV === 'development';

/**
 * Build a tap callback that logs under `[label]` in development
 * and does nothing at all otherwise.
 * @param {string} label - stage name shown in the log prefix
 * @returns {Function} logging callback, or a no-op outside development
 */
function debug(label) {
  if (!DEBUG) {
    return () => {};
  }
  return (value) => console.log(`[${label}]`, value);
}

// Taps cost nothing in production: debug() returns a no-op unless
// NODE_ENV is 'development', so the chain can ship with taps left in.
const pipeline = new Pipeline()
  .tap(debug('raw'))
  .filter(isValid)
  .tap(debug('valid'))
  .map(transform)
  .tap(debug('transformed'));

Rust: PipelineBuilder for ETL

#![allow(unused)]
fn main() {
use orlando_transducers::iter_ext::PipelineBuilder;

// Extract numeric values, filter outliers, take top results
let cleaned: Vec<f64> = PipelineBuilder::new()
    .map(|record: Record| record.value)          // pull out the numeric field
    .filter(|v: &f64| *v > 0.0 && *v < 1000.0)   // keep values in (0, 1000)
    .take(100)                                   // cap the result at 100 items
    .run(raw_records.into_iter());               // drive the source iterator through the stages
}

Rust: Hybrid Composition

#![allow(unused)]
fn main() {
use orlando_transducers::{Map, Filter, Take, to_vec, intersection};

// Process each dataset independently.
// The composed transducer (Map -> Filter: extract user_id, keep positive ids)
// is built once and reused by reference for both runs.
let pipeline = Map::new(|r: Record| r.user_id)
    .compose(Filter::new(|id: &u64| *id > 0));

let dataset_a_ids = to_vec(&pipeline, dataset_a);
let dataset_b_ids = to_vec(&pipeline, dataset_b);

// Find common users
let common_users = intersection(dataset_a_ids, dataset_b_ids);
}