Defining a workflow
defineWorkflow takes places, transitions, initial marking, context, and terminal places. It validates all place references, compiles guard expressions, and extracts executor functions into separate maps.
```ts
import { defineWorkflow, createExecutor, Scheduler } from "@petriflow/engine";

const definition = defineWorkflow({
  name: "deploy-pipeline",
  places: ["idle", "tested", "staged", "deployed"],
  transitions: [
    {
      name: "test",
      type: "automatic",
      inputs: ["idle"],
      outputs: ["tested"],
      guard: null,
      execute: async (ctx) => ({ log: [...ctx.log, "tested"] }),
    },
    {
      name: "stage",
      type: "automatic",
      inputs: ["tested"],
      outputs: ["staged"],
      guard: null,
      execute: async (ctx) => ({ log: [...ctx.log, "staged"] }),
    },
    {
      name: "deploy",
      type: "automatic",
      inputs: ["staged"],
      outputs: ["deployed"],
      guard: "approved == true",
      execute: async (ctx) => ({ log: [...ctx.log, "deployed"] }),
    },
  ],
  initialMarking: { idle: 1, tested: 0, staged: 0, deployed: 0 },
  initialContext: { log: [], approved: false },
  terminalPlaces: ["deployed"],
});
```

Transitions are `WorkflowTransition` objects: an intersection with the petri-ts `Transition` type, so they pass directly to all petri-ts analysis functions (`reachableStates`, `isDeadlockFree`, `toDot`, etc.) via `toNet()`.
Guards
Guards are expression strings evaluated against the workflow context. A transition is only enabled if it's structurally enabled (input places have tokens) and its guard passes. Guards are compiled at definition time via filtrex.
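The two-part enablement rule can be sketched as plain TypeScript. This is illustrative only, not the library's internals; the compiled filtrex guard is represented here as an ordinary function, and the type names are made up for the sketch:

```typescript
type Marking = Record<string, number>;
type Guard<Ctx> = (ctx: Ctx) => boolean;

// Hypothetical shape for the sketch; not the library's actual type.
interface SketchTransition<Ctx> {
  name: string;
  inputs: string[];
  guard: Guard<Ctx> | null; // compiled form of the guard string, or null
}

function structurallyEnabled<Ctx>(marking: Marking, t: SketchTransition<Ctx>): boolean {
  // Every input place must hold at least one token.
  return t.inputs.every((place) => (marking[place] ?? 0) >= 1);
}

function isEnabled<Ctx>(marking: Marking, t: SketchTransition<Ctx>, ctx: Ctx): boolean {
  // Structural enablement AND guard pass (a null guard always passes).
  return structurallyEnabled(marking, t) && (t.guard === null || t.guard(ctx));
}

// A "deploy"-style transition gated on ctx.approved:
const deploy: SketchTransition<{ approved: boolean }> = {
  name: "deploy",
  inputs: ["staged"],
  guard: (ctx) => ctx.approved, // stands in for "approved == true"
};
```

With a token in `staged` but `approved: false`, the transition is structurally enabled yet still blocked by its guard.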
```ts
// Guards are expressions evaluated against the context
{
  name: "approve",
  type: "automatic",
  inputs: ["review"],
  outputs: ["approved"],
  guard: "amount < 10000", // simple comparison
}

{
  name: "escalate",
  type: "automatic",
  inputs: ["review"],
  outputs: ["escalated"],
  guard: "amount >= 10000 and not vipCustomer", // boolean logic
}

{
  name: "timeout-check",
  type: "automatic",
  inputs: ["waiting"],
  outputs: ["expired"],
  guard: "marking.retries == 0", // access marking via marking.place
}
```

Set `guard: null` for transitions with no guard condition. The marking is accessible inside guards via `marking.placeName`.
Executor
createExecutor wraps a WorkflowDefinition into a WorkflowExecutor with a step() method. Each step finds enabled transitions, picks one, fires it (consuming input tokens, producing output tokens), and runs the executor function.
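The token-moving part of firing can be sketched in a few lines. This shows only the marking update (consume one token per input place, produce one per output place); the executor additionally runs the transition's `execute` function and applies its context patch, which is omitted here:

```typescript
type Marking = Record<string, number>;

// Sketch of the token game inside a step: consume from inputs, produce to outputs.
// Pure function; returns a new marking and leaves the original untouched.
function fireSketch(marking: Marking, inputs: string[], outputs: string[]): Marking {
  const next = { ...marking };
  for (const p of inputs) next[p] = (next[p] ?? 0) - 1;
  for (const p of outputs) next[p] = (next[p] ?? 0) + 1;
  return next;
}
```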
```ts
import { createExecutor } from "@petriflow/engine";

const executor = createExecutor(definition);

// Fire one transition at a time
const result = await executor.step("instance-1", marking, context);

if (result.kind === "fired") {
  console.log(result.transition); // "test"
  console.log(result.marking); // { idle: 0, tested: 1, staged: 0, deployed: 0 }
  console.log(result.context); // { log: ["tested"], approved: false }
  console.log(result.terminal); // false — more transitions available
} else {
  // result.kind === "idle" — no enabled transitions
}
```

Stepping this definition repeatedly fires `test` (idle → tested, context updated), then `stage` (tested → staged), and then goes idle: `deploy`'s guard fails while `approved == false`.
When multiple transitions are enabled and no decisionProvider is configured, the first enabled transition fires. A decision provider can choose between candidates at runtime — useful for LLM-driven workflows.
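The selection behavior can be illustrated with a small sketch. The actual `decisionProvider` option may have a different signature; this only shows the idea of falling back to first-enabled when no provider is configured (the `pickTransition` helper and `Decide` type are hypothetical names for the sketch):

```typescript
// A decision function chooses among the names of currently enabled transitions.
type Decide = (candidates: string[]) => string;

function pickTransition(enabled: string[], decide?: Decide): string | null {
  if (enabled.length === 0) return null; // nothing to fire → the step is "idle"
  if (!decide) return enabled[0];        // default: first enabled transition wins
  const choice = decide(enabled);
  // Guard against a provider returning something that isn't actually enabled.
  return enabled.includes(choice) ? choice : enabled[0];
}
```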
Batch execution
stepBatch() fires multiple independent transitions concurrently. It finds all enabled transitions, selects up to maxConcurrent non-conflicting ones, runs their executors in parallel via Promise.all, and returns the combined result.
```ts
// Fire multiple independent transitions concurrently
const result = await executor.stepBatch("instance-1", marking, context, 4);

if (result.kind === "fired") {
  console.log(result.transitions); // ["do_a", "do_b"] — all that fired
  console.log(result.marking); // tokens consumed/produced for all
  console.log(result.context); // patches merged in name order
  console.log(result.terminal); // true if no transitions remain
}
```

Conflict resolution
Transitions that compete for the same input tokens cannot both fire. stepBatch selects greedily: it walks enabled transitions in order, consuming tokens from a working marking. If a transition's inputs are no longer available (already consumed by an earlier selection), it's skipped.
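The greedy pass described above can be sketched as follows. This is a standalone illustration of the selection logic, not the library's code; types and names are invented for the sketch:

```typescript
type Marking = Record<string, number>;

// Just enough of a transition for selection purposes.
interface Arc {
  name: string;
  inputs: string[];
}

function selectBatch(enabled: Arc[], marking: Marking, maxConcurrent: number): string[] {
  const working = { ...marking }; // tokens are consumed tentatively from this copy
  const selected: string[] = [];
  for (const t of enabled) {
    if (selected.length >= maxConcurrent) break;
    // Skip if an earlier selection already consumed this transition's inputs.
    if (!t.inputs.every((p) => (working[p] ?? 0) >= 1)) continue;
    for (const p of t.inputs) working[p] = (working[p] ?? 0) - 1;
    selected.push(t.name);
  }
  return selected;
}
```

With two transitions competing for a single `shared` token, only the first is selected; give the place two tokens and both make it into the batch.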
```ts
// Two transitions compete for the same input token:
//   take_a: [shared] → [out_a]
//   take_b: [shared] → [out_b]
//
// With shared:1, only take_a fires (first-enabled wins).
// take_b is skipped — its input token was already consumed.
const result = await executor.stepBatch("inst", { shared: 1, out_a: 0, out_b: 0 }, ctx, 10);
// result.transitions === ["take_a"]
```

Context merge
Each executor receives the original (pre-fire) context snapshot — executors cannot see each other's patches. Patches are merged in transition-name order (alphabetical) using shallow spread. Later patches overwrite earlier ones on shared keys. Executors in a batch should write to disjoint context keys. Overlapping compound values (arrays, objects) are replaced, not deep-merged.
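The merge rule can be sketched as a pure function: patches keyed by transition name, applied by shallow spread in alphabetical order over the pre-fire snapshot. Illustrative only; the function name is invented for the sketch:

```typescript
function mergePatches<Ctx extends object>(
  base: Ctx,                              // the pre-fire context snapshot
  patches: Record<string, Partial<Ctx>>,  // one patch per fired transition, keyed by name
): Ctx {
  let merged = { ...base };
  // Deterministic order: alphabetical by transition name; later names win.
  for (const name of Object.keys(patches).sort()) {
    merged = { ...merged, ...patches[name] };
  }
  return merged;
}
```

On a shared key, the alphabetically later transition's value survives, which is why batched executors should write to disjoint keys.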
```ts
// Context patches merge in transition-name order (deterministic).
// If do_alpha and do_zeta both fire:
//   1. do_alpha's patch applied first (alphabetically first)
//   2. do_zeta's patch applied second (overwrites shared keys)
//
// stepBatch(id, m, ctx, 1) behaves identically to step(id, m, ctx)
```

Error handling
stepBatch uses Promise.all — if any executor throws, the entire batch fails. The scheduler marks the instance as failed with the pre-fire marking (no tokens are lost). However, executors that completed before the failure already performed their side effects, which won't be recorded. Executors that perform side effects should be idempotent, since a retry after partial failure may re-run executors that already succeeded.
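One common way to get that idempotence is to record completion in the context and no-op on a retry. A sketch of the pattern under assumed names (`notifyOnce`, the `notified` flag, and `sendEmail` are all invented for the example, not part of the library):

```typescript
type Ctx = { notified: boolean; log: string[] };

// Executor-style function: safe to re-run after a partial batch failure,
// because the side effect is skipped once the context records completion.
async function notifyOnce(ctx: Ctx, sendEmail: () => Promise<void>): Promise<Partial<Ctx>> {
  if (ctx.notified) return {}; // already done on a previous attempt → no-op
  await sendEmail();           // side effect runs at most once per instance
  return { notified: true, log: [...ctx.log, "notified"] };
}
```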
Scheduler
The Scheduler drives workflow instances to completion. It manages persistence (SQLite), timeouts, and event callbacks. Call tick() manually or use start() for automatic polling.
```ts
import { Scheduler, sqliteAdapter } from "@petriflow/engine";
import { Database } from "bun:sqlite";

const db = new Database("workflow.db");

const scheduler = new Scheduler(
  createExecutor(definition),
  { adapter: sqliteAdapter(db, definition.name) },
  {
    onFire: (id, transition, result) => {
      console.log(`[${id}] fired ${transition}`);
    },
    onComplete: (id) => console.log(`[${id}] completed`),
    onError: (id, err) => console.error(`[${id}] error:`, err),
  },
);

await scheduler.createInstance("deploy-001");

// Sequential: fires one transition per tick
await scheduler.tick();

// Concurrent: fires up to 4 independent transitions per tick
await scheduler.tick({ maxConcurrent: 4 });

// Poll automatically
scheduler.start(); // ticks every 1000ms by default
scheduler.stop();
```

Pass `{ maxConcurrent }` to `tick()` to use batch execution. When set, the scheduler fires up to that many independent transitions per instance per tick. Without the option, behavior is sequential: one transition per tick, same as before.
Dynamic expansion
expandWorkflow adds new transitions and places to an existing WorkflowDefinition. It returns a new definition — the original is not mutated. The expanded definition goes through the same validation as defineWorkflow: place references are checked, guards are compiled, and executors are extracted.
```ts
import { expandWorkflow, injectTokens, createExecutor } from "@petriflow/engine";

// Original definition has: idle → tested → staged → deployed
// Add a rollback path from staged back to tested
const expanded = expandWorkflow(
  definition,
  [
    {
      name: "rollback",
      type: "automatic",
      inputs: ["staged"],
      outputs: ["tested"],
      guard: "rollbackRequested == true",
      execute: async (ctx) => ({
        log: [...ctx.log, "rolled back"],
        rollbackRequested: false,
      }),
    },
  ],
  // No new places needed — rollback uses existing places
);

// expanded is a new WorkflowDefinition. Original is unchanged.
// Guards are compiled, executors extracted — same as defineWorkflow().
```

New places and terminal states
Pass new places as the third argument and new terminal places in the options. Existing transitions and executors are preserved exactly.
```ts
// Add entirely new places and terminal states
const withAudit = expandWorkflow(
  definition,
  [
    {
      name: "audit",
      type: "automatic",
      inputs: ["deployed"],
      outputs: ["audited"],
      guard: null,
      execute: async (ctx) => ({ log: [...ctx.log, "audited"] }),
    },
  ],
  ["audited"], // new places
  { terminalPlaces: ["audited"] }, // new terminal places
);
```

Updating a running scheduler
Use updateExecutor() to swap the scheduler's executor at runtime. The next tick() picks up the expanded definition. Instance state (marking, context) in the database is untouched.
```ts
// Expand a running workflow without restarting
const expanded = expandWorkflow(definition, newTransitions, newPlaces);
scheduler.updateExecutor(createExecutor(expanded));

// Next tick() uses the expanded definition.
// Existing instance state (marking, context) is preserved in the database.

// Inject tokens if the new transitions need already-satisfied prerequisites:
await scheduler.injectToken("instance-1", "newPlace", 1);
```

Injecting tokens
injectTokens returns a new marking with additional tokens in a given place. This is essential when expanding a net where some prerequisites are already satisfied — new transitions may need tokens that represent completed work.
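A pure helper with this behavior can be sketched in one line; the library's real implementation may differ, and the `Sketch` suffix is only there to avoid claiming this is the actual function:

```typescript
type Marking = Record<string, number>;

// Copy the marking and bump the count for one place; the input is untouched.
function injectTokensSketch(marking: Marking, place: string, count = 1): Marking {
  return { ...marking, [place]: (marking[place] ?? 0) + count };
}
```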
```ts
import { injectTokens } from "@petriflow/engine";

// A transition needs tokens in two places: [waiting, approval] → [done]
// "waiting" has a token but "approval" doesn't — transition is blocked.
const marking = { waiting: 1, approval: 0, done: 0 };

// Inject a token to represent an external approval
const updated = injectTokens(marking, "approval");
// { waiting: 1, approval: 1, done: 0 }
// Now the transition is enabled.

// injectTokens is pure — original marking is unchanged.
// Works with the Scheduler too:
await scheduler.injectToken("instance-1", "approval");
```

Both the standalone `injectTokens()` function and the scheduler's `injectToken()` method are available. The standalone function is pure (returns a new marking). The scheduler method persists the change and reactivates the instance.
API reference
defineWorkflow(def, options?)
Creates a WorkflowDefinition. Validates place references, compiles guards, extracts executors. Throws on invalid references.
expandWorkflow(definition, newTransitions, newPlaces?, options?)
Returns a new WorkflowDefinition with additional transitions and places. Pure function — does not mutate the original. Same validation as defineWorkflow.
- `definition`: the existing `WorkflowDefinition` to expand
- `newTransitions`: transitions to add (same format as `defineWorkflow`)
- `newPlaces`: optional array of new place names
- `options.terminalPlaces`: optional new terminal places to add
- `options.nodes`: optional custom node executor registry
injectTokens(marking, place, count?)
Returns a new marking with count (default 1) tokens added to place. Pure function.
createExecutor(definition, options?)
Creates a WorkflowExecutor from a definition. Optionally accepts a decisionProvider for choosing between multiple enabled transitions.
WorkflowExecutor
- `step(instanceId, marking, ctx)`: fire one enabled transition. Returns `StepResult`.
- `stepBatch(instanceId, marking, ctx, maxConcurrent)`: fire up to `maxConcurrent` non-conflicting transitions in parallel. Returns `BatchStepResult`. Executors receive the original context snapshot and should write to disjoint keys (shallow merge, last-writer-wins on conflicts). All-or-nothing on failure; executors should be idempotent.
- `getTimeoutCandidates(marking)`: returns transitions with timeouts that are currently enabled.
Scheduler
- `createInstance(id)`: create a new workflow instance with the initial marking and context.
- `tick(options?)`: process all active instances. Pass `{ maxConcurrent }` for batch execution. Returns total transitions fired.
- `injectToken(id, place, count?)`: add tokens to a running instance and reactivate it.
- `updateExecutor(executor)`: swap the executor for an expanded definition. Takes effect on next tick.
- `start()` / `stop()`: automatic polling at `pollIntervalMs` (default 1000ms).
- `inspect(id)`: read current instance state (marking, context, status).
- `getHistory(id)`: full transition history with before/after markings.
WorkflowDefinition<Place, Ctx>
```ts
type WorkflowDefinition<Place, Ctx> = {
  name: string;
  net: WorkflowNet<Place, Ctx>;
  guards: Map<string, GuardFn<Place, Ctx>>;
  executors: Map<string, ExecuteFn<Place, Ctx>>;
  initialContext: Ctx;
  terminalPlaces: Place[];
  invariants?: { weights: Partial<Record<Place, number>> }[];
};
```

WorkflowTransition<Place, Ctx>
```ts
type WorkflowTransition<Place, Ctx> = Transition<Place> & {
  type: string;
  guard: string | null;
  timeout?: { place: Place; ms: number };
  config?: Record<string, unknown>;
};
```

- `type`: free-form string (e.g. `"automatic"`, `"manual"`, `"http"`, `"timer"`). Used by the node registry to resolve executors from config.
- `guard`: expression string or `null`. Compiled to a function at definition time.
- `timeout`: optional. When the transition is enabled, a timeout token is placed after `ms` milliseconds.
- `config`: optional. Passed to the node executor for type-based execution (e.g. HTTP config).
StepResult<Place, Ctx>
```ts
type StepResult<Place, Ctx> =
  | {
      kind: "fired";
      marking: Marking<Place>;
      context: Ctx;
      transition: string;
      terminal: boolean;
      decision?: { reasoning: string; candidates: string[] };
    }
  | { kind: "idle" };
```

BatchStepResult<Place, Ctx>
```ts
type BatchStepResult<Place, Ctx> =
  | {
      kind: "fired";
      marking: Marking<Place>;
      context: Ctx;
      transitions: string[]; // all transitions that fired
      terminal: boolean;
    }
  | { kind: "idle" };
```

ExecutionResult<Place, Ctx>
```ts
type ExecutionResult<Place, Ctx> = {
  marking: Marking<Place>;
  context: Ctx;
  firedTransition: string;
};
```

Utility functions
- `canFireWorkflow(marking, transition, ctx, guards?)`: structural + guard check
- `enabledWorkflowTransitions(net, marking, ctx, guards?)`: all fireable transitions
- `fireWorkflow(marking, transition, ctx, guards?, executors?)`: fire and execute
- `toNet(workflowNet)`: strip extensions for petri-ts analysis
- `analyse(definition, options?)`: reachable states, deadlock detection, invariant checking
- `compileGuard(expr)`: compile a guard expression string to a function
- `sqliteAdapter(db, workflowName)`: SQLite persistence adapter