Workflow Orchestration
Viewing typescript
switch to python
Moose workflows enable developers to automate sequences of tasks in a maintainable, reliable way. A workflow is a series of tasks that execute in order. Each workflow follows these conventions:
- Workflows live in folders inside the
/app/scripts
subdirectory, with the folder name being the workflow name. - Tasks are scripts with numerical prefixes (e.g.,
1.first_task.py
1.firstTask.ts
). - Configuration is managed through
config.toml
files.
This workflow abstraction is powered by Temporal under the hood. You can use the Temporal GUI to monitor your workflow runs as they execute, providing extra debugging capabilities.
Quickstart
To create a new workflow, run the following command:
moose-cli workflow init example_workflow --tasks first_task,second_task
npx moose-cli workflow init ExampleWorkflow --tasks firstTask,secondTask
Workflow Structure
A typical workflow looks like this:
- 1.first_task.py
- 2.second_task.py
- config.toml
- 1.firstTask.ts
- 2.secondTask.ts
- config.toml
Writing Workflow Tasks
Tasks are Python functions decorated with @task
. Each task can receive inputs and return outputs:
from moose_lib import task, Logger
@task
def first_task(input: dict):
logger = Logger('first_task')
logger.info(f'Received data: {input}')
# Example operation: Increment a counter
result = input.get("counter", 0) + 1
return {
"task": "first_task",
"data": {
"counter": result,
"timestamp": datetime.now().isoformat()
}
}
Tasks are defined as asynchronous functions (of type TaskFunction
) that perform some operations and return an object with two key properties: task
and data
.
import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
interface FirstTaskInput {
name: string;
}
const firstTask: TaskFunction = async (input: FirstTaskInput) => {
const name = input.name ?? "world";
const greeting = `hello, ${name}!`;
return {
task: "firstTask",
data: {
name: name,
greeting: greeting,
counter: 1
}
};
};
export default function createTask() {
return {
task: firstTask,
} as TaskDefinition;
};
The file must export a function that returns a TaskDefinition
object. This object is used to register the task with the workflow. Inside this TaskDefinition
object, you must specify the task
, which is the function you defined above.
Data Flow Between Tasks
Tasks communicate through their return values. Each task can return a dictionary containing a data
key. The contents of this data
key are automatically passed as input parameters to the next task in the workflow.
- Only values inside the
data
object are passed to the next task. - Supported data types inside
data
include basic types, containers, and JSON-serializable custom classes.
from moose_lib import task, Logger
@task
def first_task(input: dict):
logger = Logger('first_task')
logger.info(f'Received input: {input}')
name = input.get("name", "no name")
greeting = f"hello, {name}!"
counter = 1
return {
"task": "first_task",
"data": {
"name": name,
"greeting": greeting,
"counter": counter
}
}
from moose_lib import task, Logger
@task
def second_task(input: dict):
logger = Logger('second_task')
logger.info(f'Received input: {input}')
name = input.get("name", "no name")
greeting = input.get("greeting", "no greeting")
counter = input.get("counter", 0)
name_length = len(name)
expanded_greeting = f"{greeting} Your name is {name_length} characters long"
return {
"task": "second_task",
"data": {
"name": name,
"greeting": expanded_greeting,
"counter": counter,
"name_length": name_length
}
}
import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
interface FirstTaskInput {
name: string;
}
const firstTask: TaskFunction = async (input: FirstTaskInput) => {
const name = input.name ?? "world";
const greeting = `hello, ${name}!`;
return {
task: "firstTask",
data: {
name: name,
greeting: greeting,
counter: 1
}
};
};
export default function createTask() {
return {
task: firstTask,
} as TaskDefinition;
};
import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
interface SecondTaskInput {
name: string;
greeting: string;
counter: number;
}
const secondTask: TaskFunction = async (input: SecondTaskInput) => {
const nameLength = input.name.length;
const expandedGreeting = `${input.greeting} Your name is ${nameLength} characters long`;
// Example operation: Double the counter value
const counter = input.counter + 1;
return {
task: "secondTask",
data: {
name: input.name,
greeting: expandedGreeting,
counter: counter,
nameLength: nameLength
}
};
};
export default function createTask() {
return {
task: secondTask,
} as TaskDefinition;
};
Running Workflows
As you develop workflows, you can run them directly using the Moose CLI:
moose-cli workflow run example_workflow
The terminal will print the following:
Workflow 'example_workflow' started successfully.
View it in the Temporal dashboard: http://localhost:8080/namespaces/default/workflows/example_workflow/3a1cc066-33bf-49ce-8671-63ecdcb72f2a/history
npx moose-cli workflow run ExampleWorkflow
The terminal will print the following:
Workflow 'ExampleWorkflow' started successfully.
View it in the Temporal dashboard: http://localhost:8080/namespaces/default/workflows/ExampleWorkflow/3a1cc066-33bf-49ce-8671-63ecdcb72f2a/history
Notice that you are given a URL to view the workflow in the Temporal dashboard. This is a helpful way to monitor the workflow execution in real time and debug any issues through a GUI.
Passing Input to Workflows
When you run a workflow, you can pass input to the workflow by using the --input
flag.
moose-cli workflow run example_workflow --input '{"name": "John"}'
npx moose-cli workflow run ExampleWorkflow --input '{"name": "John"}'
The input is passed to the workflow as a JSON string.
Debugging Workflows
While the Temporal dashboard is a helpful tool for debugging, you can also leverage the Moose CLI to monitor and debug workflows. This is useful if you want to monitor a workflow without having to leave your terminal.
Use the moose-cli workflow status
command to monitor a workflow:
moose-cli workflow status example_workflow
This will print high level information about the workflow run:
Workflow Workflow Status: example_workflow
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
npx moose-cli workflow status ExampleWorkflow
This will print high level information about the workflow run:
Workflow Workflow Status: ExampleWorkflow
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
If you want more detailed information about the workflow's status, including task level logs and inputs/outputs, you can use the --verbose
flag:
moose-cli workflow status example_workflow --verbose
Workflow Workflow Status: example_workflow
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
Request: GetWorkflowExecutionHistoryRequest { namespace: "default", execution: Some(WorkflowExecution { workflow_id: "example_workflow", run_id: "446eab6e-663d-4913-93fe-f79d6109391f" }), maximum_page_size: 0, next_page_token: [], wait_new_event: false, history_event_filter_type: Unspecified, skip_archival: false }
Found 17 events
Event History:
• [2025-02-21T14:16:56.234808764+00:00] EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
• [2025-02-21T14:16:56.235132389+00:00] EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
• [2025-02-21T14:16:56.259341847+00:00] EVENT_TYPE_WORKFLOW_TASK_STARTED
• [2025-02-21T14:16:56.329856180+00:00] EVENT_TYPE_WORKFLOW_TASK_COMPLETED
• [2025-02-21T14:16:56.329951889+00:00] EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
Activity: example_workflow/1.first_task
• [2025-02-21T14:16:56.333761680+00:00] EVENT_TYPE_ACTIVITY_TASK_STARTED
• [2025-02-21T14:16:56.497156055+00:00] EVENT_TYPE_ACTIVITY_TASK_COMPLETED
Result:
{
"data": {
"counter": 1,
"greeting": "hello, no name!",
"name": "no name"
},
"task": "first_task"
}
npx moose-cli workflow status ExampleWorkflow --verbose
Workflow Workflow Status: ExampleWorkflow
Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
Execution Time: 66s
Request: GetWorkflowExecutionHistoryRequest { namespace: "default", execution: Some(WorkflowExecution { workflow_id: "ExampleWorkflow", run_id: "446eab6e-663d-4913-93fe-f79d6109391f" }), maximum_page_size: 0, next_page_token: [], wait_new_event: false, history_event_filter_type: Unspecified, skip_archival: false }
Found 17 events
Event History:
• [2025-02-21T14:16:56.234808764+00:00] EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
• [2025-02-21T14:16:56.235132389+00:00] EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
• [2025-02-21T14:16:56.259341847+00:00] EVENT_TYPE_WORKFLOW_TASK_STARTED
• [2025-02-21T14:16:56.329856180+00:00] EVENT_TYPE_WORKFLOW_TASK_COMPLETED
• [2025-02-21T14:16:56.329951889+00:00] EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
Activity: ExampleWorkflow/1.firstTask
• [2025-02-21T14:16:56.333761680+00:00] EVENT_TYPE_ACTIVITY_TASK_STARTED
• [2025-02-21T14:16:56.497156055+00:00] EVENT_TYPE_ACTIVITY_TASK_COMPLETED
Result:
{
"data": {
"counter": 1,
"greeting": "hello, no name!",
"name": "no name"
},
"task": "firstTask"
}
With this more detailed output, you can see the exact sequence of events and the inputs and outputs of each task. This is useful for debugging and understanding the workflow's behavior. The result of each task is included in the output, allowing you to inspect the data that was passed between task for debugging purposes.
If your workflow fails due to some runtime error, you can use the event history timeline to identify the task that failed.
Scheduling Workflows
Workflows can be configured to run on a schedule using cron expressions. The schedule
field in config.toml
is used to specify the schedule. This field is optional, and blank by default:
name = "example_workflow"
timeout = "1h"
retries = 3
schedule = "" ## DEFAULT: Blank
tasks = [first_task, second_task]
name = "example_workflow"
timeout = "1h"
retries = 3
schedule = "" ## DEFAULT: Blank
tasks = [firstTask, secondTask]
By setting a cron expression in this field, the workflow will be scheduled to run at the specified interval. The schedule field uses standard cron expression syntax:
schedule = "0 12 * * *" # Runs at 12:00 PM every day
schedule = "0 12 * * *" # Runs at 12:00 PM every day
|------------------------------- Minute (0-59)
| |------------------------- Hour (0-23)
| | |------------------- Day of the month (1-31)
| | | |------------- Month (1-12; or JAN to DEC)
| | | | |------- Day of the week (0-6; or SUN to SAT; or 7 for Sunday)
| | | | |
| | | | |
* * * * *
Below are some example cron expressions along with their scheduling details:
Cron Expression | Description |
---|---|
0 12 * * * | Runs at 12:00 PM every day |
0 0 * * 0 | Runs at 12:00 AM every Sunday |
0 8 * * 1-5 | Runs at 8:00 AM on weekdays (Monday to Friday) |
* * * * * | Runs every minute |
Use an online cron expression visualizer like crontab.guru (opens in a new tab) to help you understand how the cron expression will schedule your workflow.
If your dev server is running, you should see logs in the terminal when your scheduled workflow is executed to make sure your schedule is working as expected.
Advanced: Triggering Workflows Programmatically
This feature is not yet available for TypeScript.
You can create an API to trigger workflows by using the MooseClient
that is automatically instantiated and passed to your API route handler:
from moose_lib import MooseClient
from pydantic import BaseModel, Field
class WorkflowParams(BaseModel):
input_value: str = Field(default="default")
def run(client: MooseClient, params: WorkflowParams) -> dict:
return client.workflows.execute(
workflow="example_workflow",
params={input_value: params.input_value}
)
For more information on how to set up a Moose API, see the Consumption API documentation.
This feature is coming soon.
Error Detection and Handling
Moose provides multiple layers of error protection, both at the workflow and task level:
Workflow-Level Retries and Timeouts
Moose automatically catches any runtime errors during workflow execution. Errors are logged for debugging, and the orchestrator will retry failed tasks according to the retry count in config.toml
.
In your workflow's config.toml
file, you can configure the following options to control workflow behavior, including timeouts and retries:
name = "example_workflow" # Required. The name of the workflow. Matches the workflow folder name.
timeout = "1h" # Required. Default: 1 hour
retries = 3 # Required. Default: 3 retries
schedule = "0 12 * * *" # Required. Default: Blank
tasks = [first_task, second_task] # Required. The list of tasks to execute. Matches the task file names.
name = "example_workflow" # Required. The name of the workflow. Matches the workflow folder name.
timeout = "1h" # Required. Default: 1 hour
retries = 3 # Required. Default: 3 retries
schedule = "0 12 * * *" # Required. Default: Blank
tasks = [firstTask, secondTask] # Required. The list of tasks to execute. Matches the task file names.
When you initialize a new workflow, the config.toml
file is automatically created with the default values.
- workflow name: Matches the workflow name you specified when you initialized the workflow.
- timeout: 1 hour
- retries: 3
- schedule: Blank
- tasks: Matches the task names you specified in the
--tasks
flag when you initialized the workflow.
Task-Level Errors and Retries
For more granular control over task-level errors and retries, you can configure your individual tasks to have their own retry behavior:
To configure a task to have its own retry behavior, you can use the retries
parameter in the @task
decorator:
from moose_lib import task
@task(retries=3)
def first_task(input: dict):
raise Exception("This is a test error")
return {
"task": "first_task",
"data": {}
}
To configure a task to have its own retry behavior, you can add a config
object to the task definition, and specify a number of retries
within that object:
import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
const firstTask: TaskFunction = async () => {
throw new Error("This is a test error");
return {
task: "firstTask",
data: {}
};
};
export default function createTask() {
return {
task: firstTask,
config: {
retries: 3 // This is optional. If you don't explicitly set retries, it will default to 3.
}
} as TaskDefinition;
}
Example: Workflow and Task Retry Interplay
When configuring retries, it's important to understand how workflow-level and task-level retries interact. Consider the following scenario:
- Workflow Retry Policy: 2 attempts
retries = 2
- Task Retry Policy: 3 attempts
@task(retries=3)
def first_task(input: dict):
raise Exception("This is a test error")
return {
"task": "first_task",
"data": {}
}
- Workflow Retry Policy: 2 attempts
retries = 2
- Task Retry Policy: 3 attempts
import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
const firstTask: TaskFunction = async () => {
throw new Error("This is a test error");
return {
task: "firstTask",
data: {}
};
};
export default function createTask() {
return {
task: firstTask,
config: {
retries: 3 // This is optional. If you don't explicitly set retries, it will default to 3.
}
} as TaskDefinition;
}
If the execution of the workflow encounters an error, the retry sequence would proceed as follows:
-
Workflow Attempt 1
- Task Attempt 1: Task fails
- Task Attempt 2: Task fails
- Task Attempt 3: Task fails
- Workflow attempt fails after exhausting task retries
-
Workflow Attempt 2
- Task Attempt 1: Task fails
- Task Attempt 2: Task fails
- Task Attempt 3: Task fails
- Workflow attempt fails after exhausting task retries
In this example, the workflow will make a total of 2 attempts, and each task within those attempts will retry up to 3 times before the workflow itself retries.
Terminating Workflows
To terminate a workflow before it has finished running, use the workflow terminate
command.
moose-cli workflow terminate <workflow_name>
npx moose-cli workflow terminate <workflow_name>
You cannot run the same workflow concurrently. Use the terminate
command to stop the workflow before triggering it again.