📦 This is a library package - add it to your Elixir application via Hex.pm as `{:singularity_workflow, "~> 0.1"}`.
Production-ready Elixir library for workflow orchestration with database-driven DAG execution.
Singularity.Workflow is a library you add to your Elixir applications to provide reliable, scalable workflow execution. It persists and coordinates work through PostgreSQL with the pgmq extension, and delivers real-time messages via PostgreSQL NOTIFY (a NATS replacement).
This is a library, not a standalone application. You integrate it into your existing Elixir/Phoenix applications to add workflow orchestration capabilities. Think of it like Ecto or Oban - a dependency you add to your mix.exs to gain powerful workflow features.
- ✅ Database-Driven DAGs - Workflows stored and executed via PostgreSQL
- ✅ Real-time Messaging - PostgreSQL NOTIFY for instant message delivery (NATS replacement)
- ✅ Parallel Execution - Independent branches run concurrently
- ✅ Multi-Instance Scaling - Horizontal scaling via pgmq + PostgreSQL
- ✅ Workflow Lifecycle Management - Cancel, pause, resume, retry workflows
- ✅ Phoenix Integration - Direct LiveView/Channels integration (no Phoenix.PubSub needed)
- ✅ Comprehensive Logging - Structured logging for all workflow events
- ✅ Static & Dynamic Workflows - Code-based and runtime-generated workflows
- ✅ Map Steps - Variable task counts for bulk processing
- ✅ Dependency Merging - Steps receive outputs from all dependencies
- ✅ HTDAG Orchestration - Goal-driven workflow decomposition (hierarchical task DAGs)
- ✅ Workflow Optimization - Learn from execution patterns to optimize future workflows
- ✅ 100% Feature Parity - Matches workflow orchestration architecture patterns
- Quick Start
- Architecture
- Real-time Messaging
- Workflow Types
- HTDAG Orchestration
- API Reference
- Examples
- Testing
- Deployment
- Contributing
## Quick Start

Add `singularity_workflow` to your application's dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:singularity_workflow, "~> 0.1.5"}
  ]
end
```

Run:

```bash
mix deps.get
```

Install PostgreSQL with the pgmq extension:

```bash
# Install pgmq extension in YOUR database
psql -d your_app_database -c "CREATE EXTENSION IF NOT EXISTS pgmq;"
```

Configure your application's repo:
```elixir
# config/config.exs
config :my_app, MyApp.Repo,
  database: "my_app_dev",
  username: "postgres",
  password: "postgres",
  hostname: "localhost"

config :my_app,
  ecto_repos: [MyApp.Repo]
```

Start your application:
```elixir
# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.Repo,  # Your repo - Singularity.Workflow uses it
    # ... other children
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
```

Define and execute a simple workflow:

```elixir
# Define a simple workflow
defmodule MyWorkflow do
  def __workflow_steps__ do
    [
      {:step1, &__MODULE__.step1/1, depends_on: []},
      {:step2, &__MODULE__.step2/1, depends_on: [:step1]},
      {:step3, &__MODULE__.step3/1, depends_on: [:step1, :step2]}
    ]
  end

  def step1(input), do: {:ok, %{processed: input.data * 2}}
  def step2(input), do: {:ok, %{validated: input.processed > 0}}
  def step3(input), do: {:ok, %{result: input.validated && input.processed}}
end

# Execute workflow
{:ok, result} = Singularity.Workflow.Executor.execute(MyWorkflow, %{data: 5}, MyApp.Repo)
```

## Architecture

```mermaid
graph TB
A[Workflow Definition] --> B[Singularity.Workflow.Executor]
B --> C[PostgreSQL + pgmq]
C --> D[Task Execution]
D --> E[PostgreSQL NOTIFY]
E --> F[Real-time Messaging]
subgraph "Database Layer"
C
G[workflows table]
H[tasks table]
I[pgmq queues]
end
subgraph "Execution Layer"
B
J[Task Scheduler]
K[Dependency Resolver]
L[Parallel Executor]
end
subgraph "Messaging Layer"
E
M[Singularity.Workflow.Notifications]
N[Event Listeners]
end
```
| Component | Purpose | Key Features |
|---|---|---|
| Singularity.Workflow.Executor | Workflow execution engine | Static/dynamic workflows, parallel execution |
| Singularity.Workflow.FlowBuilder | Dynamic workflow creation | Runtime workflow generation, AI/LLM integration |
| Singularity.Workflow.Notifications | Real-time messaging | PostgreSQL NOTIFY messaging, structured logging |
| PostgreSQL + pgmq | Data persistence & coordination | ACID transactions, message queuing |
| Task Scheduler | Dependency resolution | DAG traversal, parallel execution |
## Real-time Messaging

Singularity.Workflow provides a complete messaging infrastructure via PostgreSQL NOTIFY (NATS replacement):
```elixir
# Send workflow event with NOTIFY
{:ok, message_id} = Singularity.Workflow.Notifications.send_with_notify(
  "workflow_events",
  %{
    type: "task_completed",
    task_id: "task_123",
    workflow_id: "workflow_456",
    status: "success",
    duration_ms: 1500
  },
  MyApp.Repo
)
```

```elixir
# Start listening for workflow messages
{:ok, listener_pid} = Singularity.Workflow.Notifications.listen("workflow_events", MyApp.Repo)

# Handle messages
receive do
  {:notification, ^listener_pid, channel, message_id} ->
    Logger.info("Workflow message received: #{channel} -> #{message_id}")
    # Process the message...
after
  5000 -> :timeout
end

# Stop listening
:ok = Singularity.Workflow.Notifications.unlisten(listener_pid, MyApp.Repo)
```

| Event Type | Description | Payload |
|---|---|---|
| `workflow_started` | Workflow execution begins | `{workflow_id, input}` |
| `task_started` | Individual task starts | `{task_id, workflow_id, step_name}` |
| `task_completed` | Task finishes successfully | `{task_id, result, duration_ms}` |
| `task_failed` | Task fails with error | `{task_id, error, retry_count}` |
| `workflow_completed` | Entire workflow finishes | `{workflow_id, final_result}` |
| `workflow_failed` | Workflow fails | `{workflow_id, error, failed_task}` |
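Since a NOTIFY carries only the message id, a consumer typically fetches the persisted payload from pgmq and branches on the event type above. A minimal sketch, assuming pgmq's standard `pgmq.read(queue, visibility_timeout, qty)` SQL function and JSONB payloads shaped like the table (the dispatcher module itself is hypothetical):

```elixir
defmodule MyApp.WorkflowEventDispatcher do
  require Logger

  # Pull the next persisted message off the queue and branch on its type.
  # pgmq.read/3 is the standard pgmq SQL API; the message column is JSONB
  # and decodes to a map with string keys.
  def dispatch_next(repo) do
    case repo.query!("SELECT message FROM pgmq.read('workflow_events', 30, 1)").rows do
      [[%{"type" => "task_completed"} = event]] ->
        Logger.info("Task #{event["task_id"]} completed in #{event["duration_ms"]}ms")

      [[%{"type" => "workflow_failed"} = event]] ->
        Logger.error("Workflow #{event["workflow_id"]} failed: #{inspect(event["error"])}")

      [[event]] ->
        Logger.debug("Unhandled workflow event: #{inspect(event)}")

      [] ->
        :no_message
    end
  end
end
```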
All notifications include comprehensive logging:
```elixir
# Success logging
Logger.info("PGMQ + NOTIFY sent successfully",
  queue: "workflow_events",
  message_id: "msg_123",
  duration_ms: 45,
  message_type: "task_completed"
)

# Error logging
Logger.error("PGMQ + NOTIFY send failed",
  queue: "workflow_events",
  error: "Connection timeout",
  message_type: "workflow_started"
)
```

## Workflow Types

Define workflows as Elixir modules:
```elixir
defmodule DataProcessingWorkflow do
  def __workflow_steps__ do
    [
      {:fetch_data, &__MODULE__.fetch_data/1, depends_on: []},
      {:validate_data, &__MODULE__.validate_data/1, depends_on: [:fetch_data]},
      {:transform_data, &__MODULE__.transform_data/1, depends_on: [:validate_data]},
      {:save_data, &__MODULE__.save_data/1, depends_on: [:transform_data]}
    ]
  end

  def fetch_data(_input) do
    # Fetch data from API
    {:ok, %{data: [1, 2, 3, 4, 5]}}
  end

  def validate_data(input) do
    # Validate data
    valid = Enum.all?(input.data, &is_number/1)
    {:ok, %{valid: valid, data: input.data}}
  end

  def transform_data(input) do
    # Transform data
    transformed = Enum.map(input.data, &(&1 * 2))
    {:ok, %{transformed: transformed}}
  end

  def save_data(input) do
    # Save to database
    {:ok, %{saved: length(input.transformed)}}
  end
end

# Execute
{:ok, result} = Singularity.Workflow.Executor.execute(DataProcessingWorkflow, %{}, MyApp.Repo)
```
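The pipeline above is a straight chain, but two of the library's headline features, parallel execution and dependency merging, show up as soon as the graph branches: steps whose dependencies are satisfied run concurrently, and a step that names several dependencies receives their merged outputs in its single input map (just as `step3` in the quick start reads both `processed` and `validated`). A minimal diamond-shaped sketch under those assumptions:

```elixir
defmodule DiamondWorkflow do
  def __workflow_steps__ do
    [
      {:fetch, &__MODULE__.fetch/1, depends_on: []},
      # :stats and :labels both depend only on :fetch, so they run in parallel
      {:stats, &__MODULE__.stats/1, depends_on: [:fetch]},
      {:labels, &__MODULE__.labels/1, depends_on: [:fetch]},
      # :report depends on both branches and receives their merged outputs
      {:report, &__MODULE__.report/1, depends_on: [:stats, :labels]}
    ]
  end

  def fetch(_input), do: {:ok, %{rows: [1, 2, 3]}}
  def stats(input), do: {:ok, %{sum: Enum.sum(input.rows)}}
  def labels(input), do: {:ok, %{count: length(input.rows)}}
  def report(input), do: {:ok, %{summary: "#{input.count} rows, sum #{input.sum}"}}
end

{:ok, result} = Singularity.Workflow.Executor.execute(DiamondWorkflow, %{}, MyApp.Repo)
```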
Create workflows at runtime:

```elixir
# Create workflow
{:ok, _} = Singularity.Workflow.FlowBuilder.create_flow("ai_generated_workflow", MyApp.Repo)

# Add steps
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("ai_generated_workflow", "analyze", [], MyApp.Repo)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("ai_generated_workflow", "generate", ["analyze"], MyApp.Repo)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("ai_generated_workflow", "validate", ["generate"], MyApp.Repo)

# Define step functions
step_functions = %{
  "analyze" => fn _input -> {:ok, %{analysis: "complex"}} end,
  "generate" => fn _input -> {:ok, %{content: "generated content"}} end,
  "validate" => fn _input -> {:ok, %{valid: true}} end
}

# Execute
{:ok, result} = Singularity.Workflow.Executor.execute_dynamic(
  "ai_generated_workflow",
  %{prompt: "Generate a report"},
  step_functions,
  MyApp.Repo
)
```

Process multiple items in parallel:
```elixir
defmodule BulkProcessingWorkflow do
  def __workflow_steps__ do
    [
      {:process_items, &__MODULE__.process_items/1,
       depends_on: [],
       initial_tasks: 5}  # Process 5 items in parallel
    ]
  end

  def process_items(input) do
    # This function will be called 5 times in parallel
    item_id = input.item_id

    # Process individual item
    {:ok, %{processed_item: item_id, result: "success"}}
  end
end

# Execute with multiple items
items = Enum.map(1..5, &%{item_id: &1})
{:ok, results} = Singularity.Workflow.Executor.execute(BulkProcessingWorkflow, %{items: items}, MyApp.Repo)
```

## HTDAG Orchestration

Singularity.Workflow includes Hierarchical Task DAG (HTDAG) support for goal-driven workflow decomposition. Convert high-level goals into executable workflows automatically.
```elixir
# Define a goal decomposer
defmodule MyApp.GoalDecomposer do
  def decompose("Build authentication system") do
    {:ok, [
      %{id: "design", description: "Design auth flow", depends_on: []},
      %{id: "implement", description: "Implement", depends_on: ["design"]},
      %{id: "test", description: "Test", depends_on: ["implement"]}
    ]}
  end
end

# Define step functions
step_functions = %{
  "design" => &MyApp.Tasks.design_auth/1,
  "implement" => &MyApp.Tasks.implement_auth/1,
  "test" => &MyApp.Tasks.test_auth/1
}

# Execute goal-driven workflow
{:ok, result} = Singularity.Workflow.WorkflowComposer.compose_from_goal(
  "Build authentication system",
  &MyApp.GoalDecomposer.decompose/1,
  step_functions,
  MyApp.Repo,
  optimize: true,
  monitor: true
)
```

- Automatic Decomposition: Convert goals to task graphs
- Optimization: Learn from execution patterns to optimize future workflows (toggle with `optimize: true`)
- Real-time Monitoring: Event-driven notifications during execution (toggle with `monitor: true`)
- Multi-Workflow Composition: Execute multiple goals in parallel (see the sketch below)
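The last item has no dedicated API in this README; one way to fan several goals out in parallel is plain `Task.async_stream` over `compose_from_goal` from the example above. This is a sketch, not a built-in composition API; it assumes the decomposer has a clause for each goal and that `step_functions` covers the steps every goal decomposes into:

```elixir
goals = ["Build authentication system", "Build billing system"]

results =
  goals
  |> Task.async_stream(
    fn goal ->
      # Each goal becomes its own composed workflow run.
      Singularity.Workflow.WorkflowComposer.compose_from_goal(
        goal,
        &MyApp.GoalDecomposer.decompose/1,
        step_functions,
        MyApp.Repo
      )
    end,
    timeout: :infinity
  )
  |> Enum.map(fn {:ok, result} -> result end)
```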
| Module | Purpose |
|---|---|
| `Singularity.Workflow.Orchestrator` | Goal decomposition engine |
| `Singularity.Workflow.WorkflowComposer` | High-level composition API |
| `Singularity.Workflow.OrchestratorOptimizer` | Optimization engine with learning |
| `Singularity.Workflow.OrchestratorNotifications` | Real-time event broadcasting |
For a detailed guide, see HTDAG_ORCHESTRATOR_GUIDE.md.
## Phoenix Integration

Singularity.Workflow integrates directly with Phoenix LiveView and Channels - no Phoenix.PubSub needed.
```elixir
defmodule MyAppWeb.WorkflowLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    # Listen to workflow events
    {:ok, listener_pid} = Singularity.Workflow.listen("workflow_events", MyApp.Repo)
    {:ok, assign(socket, :listener_pid, listener_pid)}
  end

  def handle_info({:notification, _pid, "workflow_events", message_id}, socket) do
    # Update UI in real-time
    {:noreply, update_workflow_list(socket, message_id)}
  end

  def terminate(_reason, socket) do
    Singularity.Workflow.unlisten(socket.assigns.listener_pid, MyApp.Repo)
    :ok
  end
end
```

| Feature | Singularity.Workflow | Phoenix.PubSub |
|---|---|---|
| Persistence | PostgreSQL (survives restarts) | Memory only (ephemeral) |
| Multi-node | PostgreSQL coordination | Requires node clustering |
| Message History | Queryable via pgmq (see the query below) | Not available |
| Reliability | ACID guarantees | Best-effort delivery |
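The Message History row deserves a concrete illustration: because every event is persisted through pgmq before the NOTIFY fires, history is a plain SQL query away. A sketch assuming pgmq's standard table layout (live messages in `pgmq.q_<queue>`, archived ones in `pgmq.a_<queue>`):

```elixir
# Ten most recent events still sitting in the workflow_events queue.
%{rows: recent} =
  MyApp.Repo.query!("""
  SELECT msg_id, enqueued_at, message
  FROM pgmq.q_workflow_events
  ORDER BY enqueued_at DESC
  LIMIT 10
  """)
```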
For a comprehensive Phoenix integration guide, see API_REFERENCE.md.
## API Reference

```elixir
# Execute static workflow
Singularity.Workflow.Executor.execute(workflow_module, input, repo, opts \\ [])

# Execute dynamic workflow
Singularity.Workflow.Executor.execute_dynamic(workflow_slug, input, step_functions, repo, opts \\ [])

# Options
opts = [
  timeout: 600_000,       # Execution timeout in milliseconds (default: 300_000, i.e. 5 minutes)
  poll_interval: 100,     # Task polling interval in milliseconds (default: 100ms)
  worker_id: "worker_1"   # Optional worker identifier for distributed execution
]

# Note: Parallel execution is automatic based on workflow dependencies.
# Independent branches run concurrently without additional configuration.
```
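Putting the options to work with the quick-start workflow (values are illustrative only):

```elixir
{:ok, result} =
  Singularity.Workflow.Executor.execute(
    MyWorkflow,
    %{data: 5},
    MyApp.Repo,
    timeout: 600_000,      # give slow steps ten minutes
    poll_interval: 250,    # poll for ready tasks every 250ms
    worker_id: "worker_1"  # tag tasks claimed by this node
  )
```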
```elixir
# Get workflow status
{:ok, status, metadata} = Singularity.Workflow.get_run_status(run_id, repo)
# Returns: {:ok, :in_progress, %{total_steps: 5, completed_steps: 2}}

# List all workflows
{:ok, runs} = Singularity.Workflow.list_workflow_runs(repo, status: "started")

# Pause running workflow
:ok = Singularity.Workflow.pause_workflow_run(run_id, repo)

# Resume paused workflow
:ok = Singularity.Workflow.resume_workflow_run(run_id, repo)

# Cancel workflow
:ok = Singularity.Workflow.cancel_workflow_run(run_id, repo, reason: "User cancelled")

# Retry failed workflow
{:ok, new_run_id} = Singularity.Workflow.retry_failed_workflow(failed_run_id, repo)
```
```elixir
# Create workflow
Singularity.Workflow.FlowBuilder.create_flow(workflow_slug, repo, opts)

# Add single step
Singularity.Workflow.FlowBuilder.add_step(workflow_slug, step_slug, depends_on, repo)

# Add map step (parallel processing)
Singularity.Workflow.FlowBuilder.add_step(
  workflow_slug,
  step_slug,
  depends_on,
  repo,
  step_type: "map",
  initial_tasks: 50
)

# Get workflow
Singularity.Workflow.FlowBuilder.get_flow(workflow_slug, repo)

# List workflows
Singularity.Workflow.FlowBuilder.list_flows(repo)

# Delete workflow
Singularity.Workflow.FlowBuilder.delete_flow(workflow_slug, repo)
```
```elixir
# Send with NOTIFY
Singularity.Workflow.Notifications.send_with_notify(queue_name, message, repo)

# Listen for events
Singularity.Workflow.Notifications.listen(queue_name, repo)

# Stop listening
Singularity.Workflow.Notifications.unlisten(pid, repo)

# Send NOTIFY only (no persistence)
Singularity.Workflow.Notifications.notify_only(channel, payload, repo)
```
## Testing

```bash
# Run all tests
mix test

# Run with coverage
mix test --cover

# Run specific test file
mix test test/singularity_workflow/executor_test.exs
```

Test suite structure:

```
test/
├── singularity_workflow/
│   ├── executor_test.exs       # Workflow execution tests
│   ├── flow_builder_test.exs   # Dynamic workflow tests
│   ├── notifications_test.exs  # NOTIFY functionality tests
│   └── integration_test.exs    # End-to-end tests
├── support/
│   ├── test_repo.ex            # Test database setup
│   └── workflow_helpers.ex     # Test utilities
└── test_helper.exs             # Test configuration
```
```elixir
defmodule Singularity.Workflow.ExecutorTest do
  use ExUnit.Case, async: true

  alias Singularity.Workflow.Executor

  test "executes simple workflow" do
    defmodule TestWorkflow do
      def __workflow_steps__ do
        [{:step1, &__MODULE__.step1/1, depends_on: []}]
      end

      def step1(input), do: {:ok, %{result: input.value * 2}}
    end

    {:ok, result} = Executor.execute(TestWorkflow, %{value: 5}, TestRepo)
    assert result.step1.result == 10
  end
end
```

## Examples

See the test suite (`test/singularity_workflow/`) for comprehensive examples:
- Static Workflows - Define workflows as Elixir modules
- Dynamic Workflows - Generate workflows at runtime
- Parallel Processing - Map steps for bulk operations
- Error Handling - Retry logic and failure recovery (see the sketch after this list)
- Real-time Notifications - PostgreSQL NOTIFY integration
- Phoenix Integration - LiveView and Channel integration
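As a taste of the error-handling pattern: a step signals failure by returning an error tuple, and a failed run can be retried through the lifecycle API shown earlier. A hedged sketch - the `{:error, reason}` step convention and the error return from `execute/3` are assumed to mirror the `{:ok, ...}` conventions used throughout this README:

```elixir
defmodule FlakyWorkflow do
  def __workflow_steps__ do
    [{:call_api, &__MODULE__.call_api/1, depends_on: []}]
  end

  # Assumed convention: an error tuple marks the task (and the run) as failed.
  def call_api(_input), do: {:error, :upstream_timeout}
end

case Singularity.Workflow.Executor.execute(FlakyWorkflow, %{}, MyApp.Repo) do
  {:ok, result} ->
    result

  {:error, _reason} ->
    # Placeholder id: in practice, find the failed run via
    # Singularity.Workflow.list_workflow_runs/2 from the API reference.
    failed_run_id = "run_123"
    {:ok, _new_run_id} = Singularity.Workflow.retry_failed_workflow(failed_run_id, MyApp.Repo)
end
```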
## Deployment

Note: These examples show how to deploy your application that uses the Singularity.Workflow library. This library itself doesn't need deployment - you add it as a dependency.
```elixir
# config/prod.exs in YOUR application
config :my_app, MyApp.Repo,
  url: System.get_env("DATABASE_URL"),
  pool_size: String.to_integer(System.get_env("POOL_SIZE") || "10")

# Your application uses Singularity.Workflow as a library.
# No special configuration needed - just use your repo.
```
```dockerfile
# Dockerfile for YOUR application
FROM elixir:1.19-alpine

WORKDIR /app

COPY mix.exs mix.lock ./
COPY config config
COPY lib lib
COPY priv priv

# Singularity.Workflow will be fetched as a dependency
RUN mix deps.get && mix compile

CMD ["mix", "phx.server"]
```
```yaml
# k8s/deployment.yaml for YOUR application
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
        - name: my-app
          image: my-app:latest  # Your app, which depends on singularity_workflow
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: my-app-secrets
                  key: database-url
```

## Contributing

Want to contribute to the Singularity.Workflow library? Here's how to set up the development environment:
```bash
# Clone the library repository
git clone https://github.com/Singularity-ng/singularity-workflows.git
cd singularity-workflows

# Install dependencies
mix deps.get

# Setup test database (for library development/testing)
mix ecto.setup

# Run library tests
mix test
```

Code quality checks:

```bash
# Format code
mix format

# Lint code
mix credo

# Type check
mix dialyzer

# Security scan
mix sobelow
```

- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE.md file for details.
- PostgreSQL - Robust database foundation
- pgmq - Message queuing extension
- Elixir Community - Amazing ecosystem
- Singularity-ng - Workflow orchestration initiative
- Documentation: https://hexdocs.pm/singularity_workflow
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: support@singularity.dev
Made with ❤️ by the Singularity-ng Team