@effectstream/batcher-sdk
Package:
@effectstream/batcher-sdk· Source
A blockchain batching system for aggregating and submitting user inputs to multiple chains.
Features
- Crash-safe batching - Storage is the single source of truth; no in-memory pools that can be lost on restart
- Pluggable storage - Use files, PostgreSQL, Redis, or any backend by
implementing
BatcherStorage - Multi-chain support - Abstract away blockchain differences through custom adapters
- Flexible batching strategies - Configure when to submit batches (time, size, value, or custom logic)
- Structured concurrency - Uses Effection for automatic resource cleanup and graceful cancellation
- Event-driven observability - Hook into batch lifecycle events for logging, monitoring, and custom behavior
Quick Start
Here's a minimal example based on the E2E test setup:
1. Configure the Batcher
import {
FileStorage,
BatcherConfig,
EffectstreamL2DefaultAdapter,
} from "@effectstream/batcher";
// Create an adapter for your blockchain
const effectstreamL2 = new EffectstreamL2DefaultAdapter(
"0x...", // Contract address
"0x...", // Private key
0n, // Fee
"parallelEvmRPC_fast", // Sync protocol name
);
// Configure the batcher
const config: BatcherConfig = {
pollingIntervalMs: 1000,
enableHttpServer: true,
confirmationLevel: "wait-effectstream-processed",
enableEventSystem: true,
port: 3334,
};
const storage = new FileStorage("./batcher-data");
2. Run with Effection
import { main, suspend } from "effection";
import { createNewBatcher } from "@effectstream/batcher";
const batcher = createNewBatcher(config, storage);
batcher
.addBlockchainAdapter("effectstream-l2", effectstreamL2Adapter, { criteriaType: "time", timeWindowMs: 1000 })
// Add custom startup logging
batcher.addStateTransition("startup", ({ publicConfig }) => {
console.log(
`🚀 Batcher started - polling every ${publicConfig.pollingIntervalMs}ms`,
);
console.log(`📍 Default Target: ${publicConfig.defaultTarget}`);
});
// Add HTTP server logging
batcher.addStateTransition("http:start", ({ port }) => {
console.log(`🌐 HTTP Server: http://localhost:${port}`);
});
main(function* () {
// Run the batcher (handles HTTP server and polling automatically)
yield* batcher.runBatcher();
yield* suspend();
});
The batcher is now running and will:
- Accept inputs via HTTP API on port 3334
- Process batches every 1000ms
- Submit to the blockchain via the adapter
- Emit state transition events for observability
Core Concepts
Storage as Single Source of Truth
All inputs are immediately persisted to storage. There's no in-memory pool, eliminating consistency issues on crashes or restarts.
Storage is fully pluggable - implement the BatcherStorage interface to use
any backend (PostgreSQL, Redis, S3, etc.). The default FileStorage uses JSONL
files for simplicity.
Adapters
Adapters abstract blockchain-specific logic. Each adapter implements
BlockchainAdapter:
interface BlockchainAdapter {
submitBatch(data: string, fee: string | bigint): Promise<BlockchainHash>;
waitForTransactionReceipt(
hash: BlockchainHash,
): Promise<BlockchainTransactionReceipt>;
getAccountAddress(): string;
getChainName(): string;
estimateBatchFee(data: string): Promise<string | bigint> | string | bigint;
isReady(): boolean;
getBlockNumber(): Promise<bigint>;
}
Out of the box we ship adapters for the most common targets:
EffectstreamL2DefaultAdapter– Effectstream's default L2 contract entrypoint.EvmContractAdapter– generic viem-powered adapter that loads any Hardhat artifact and validates{ method, args, value }inputs.MidnightAdapter– circuit-based submission flow for the Midnight chain.BitcoinAdapter– UTXO batching + signature validation for regtest/local setups.
Batching Criteria
Configure when batches are submitted per adapter:
time- Submit every N millisecondssize- Submit when N inputs accumulatevalue- Submit when accumulated value reaches thresholdhybrid- Submit when time OR size criteria metcustom- Provide your ownisBatchReadyFn
State Transitions
Hook into batch lifecycle for logging, metrics, or custom behavior:
batcher.addStateTransition("startup", (payload) => {
// Batcher initialized
});
batcher.addStateTransition("batch:process:start", ({ target, inputCount }) => {
console.log(`Processing ${inputCount} inputs for ${target}`);
});
batcher.addStateTransition("error", ({ phase, error }) => {
// Handle errors
});
Configuration
Key configuration options:
type BatcherConfig = {
// Polling interval for checking batch criteria
pollingIntervalMs: number;
// Blockchain adapters keyed by name
adapters: Record<string, BlockchainAdapter>;
// Default adapter when input.target not specified
defaultTarget: string;
// Per-adapter batching rules
batchingCriteria?: {
[adapterName: string]: {
criteriaType: "time" | "size" | "value" | "hybrid" | "custom";
timeWindowMs?: number;
maxBatchSize?: number;
targetValue?: number;
valueAccumulatorFn?: (input: T) => number;
isBatchReadyFn?: (
inputs: T[],
lastProcessTime: number,
) => boolean | Promise<boolean>;
};
};
// Wait behavior: "no-wait" | "wait-receipt" | "wait-effectstream-processed"
confirmationLevel?: string | Record<string, string>;
// Enable HTTP REST API
enableHttpServer?: boolean;
port?: number;
// Enable state transition events
enableEventSystem?: boolean;
// Signature verification namespace
namespace?: string;
// Batch building configuration
batchBuilding?: {
maxSize?: number;
defaultBuilder?: BatchDataBuilder<T>;
targetBuilders?: Record<string, BatchDataBuilder<T>>;
};
};
Customization
Custom Batching Criteria
Define your own logic for when to submit batches:
batchingCriteria: {
myAdapter: {
criteriaType: "custom",
isBatchReadyFn: async (inputs, lastProcessTime) => {
// Custom logic: batch when we have priority inputs
const hasPriority = inputs.some(input => input.priority === "high");
const timePassed = Date.now() - lastProcessTime > 5000;
return hasPriority || timePassed;
}
}
}
Custom Adapters
Implement BlockchainAdapter for new chains:
class MyChainAdapter implements BlockchainAdapter {
async submitBatch(
data: string,
fee: string | bigint,
): Promise<BlockchainHash> {
// Submit to your blockchain
return txHash;
}
// Implement other required methods...
}
Custom Batch Builders
Control how inputs are serialized into batch data:
class MyBatchBuilder implements BatchDataBuilder<MyInput> {
buildBatchData(
inputs: MyInput[],
options: any,
): BatchBuildingResult<MyInput> {
// Custom serialization logic
const data = myCustomEncoding(inputs);
return { data, includedInputs: inputs, excludedInputs: [] };
}
}
const config = {
// ...
batchBuilding: {
defaultBuilder: new MyBatchBuilder(),
},
};
State Transition Listeners
Add custom behavior at any point in the batch lifecycle:
// Log all batch submissions
batcher.addStateTransition("batch:submit", ({ target, hash, inputCount }) => {
console.log(`Submitted batch ${hash} with ${inputCount} inputs to ${target}`);
});
// Track confirmation times
batcher.addStateTransition(
"batch:confirmed",
({ target, hash, blockNumber }) => {
metrics.recordConfirmation(target, Date.now() - startTime);
},
);
// Handle errors
batcher.addStateTransition("error", ({ phase, error }) => {
errorReporter.report(phase, error);
});
API Overview
Main Operations
runBatcher(): Operation<void>- Effection operation that runs HTTP server and polling loopbatchInput(input: T, confirmationLevel?, timeout?): Promise<Receipt | null>- Submit an input for batchingaddStateTransition(prefix, listener): void- Register event listener for batch lifecyclegracefulShutdownOp(): Operation<void>- Gracefully shutdown with cleanupgetPublicConfig()- Get public configuration informationgetBatchingStatus()- Get current queue statistics
Confirmation Levels
When calling batchInput(), choose how long to wait:
no-wait- Return immediately after queuingwait-receipt- Wait for blockchain transaction receiptwait-effectstream-processed- Wait until EffectStream has processed the batch
// Don't wait
await batcher.batchInput(input, "no-wait");
// Wait for receipt (default)
const receipt = await batcher.batchInput(input, "wait-receipt");
// Wait for full processing
const result = await batcher.batchInput(input, "wait-effectstream-processed");
console.log(`Processed in rollup block ${result.rollup}`);
Folder Structure
packages/batcher/
├── core/ # Core batcher functionality
│ ├── batcher.ts # Main Batcher class
│ ├── types.ts # Core types and interfaces
│ ├── config.ts # Configuration types
│ ├── storage.ts # Storage interface + FileStorage (✨ pluggable)
│ ├── batch-processor.ts # Batch processing logic
│ ├── shutdown-manager.ts # Graceful shutdown handling
│ └── batcher-events.ts # Event system types
│
├── batch-data-builder/ # Batch serialization (✨ pluggable)
│ ├── batch-data-builder.ts # Interface and types
│ └── default-batch-builder.ts # Default implementation
│
├── adapters/ # Blockchain adapters (✨ pluggable)
│ ├── adapter.ts # Base adapter interface
│ ├── effectstream-l2-adapter.ts # EffectstreamL2 EVM implementation
│ ├── evm-contract-adapter.ts # Generic ABI-driven EVM adapter
│ ├── midnight-adapter.ts # Midnight circuit adapter
│ └── bitcoin-adapter.ts # Bitcoin regtest adapter
│
└── server/ # HTTP API
└── batcher-server.ts # Fastify server implementation
HTTP API
When enableHttpServer: true, the batcher exposes a REST API:
POST /batch-input- Submit an input for batchingGET /status- Get batching status and statisticsGET /health- Health check endpointPOST /force-process- Manually trigger batch processing
Example:
curl -X POST http://localhost:3334/batch-input \
-H "Content-Type: application/json" \
-d '{
"address": "0x...",
"input": "myGameInput",
"signature": "0x...",
"timestamp": 1234567890
}'
signature is optional for chains/adapters that override verifySignature (for example Midnight). When omitted, the adapter must implement its own verification semantics.