Orchestrating Workflows

Workflow Orchestration

Moose workflows enable developers to automate sequences of tasks in a maintainable, reliable way. A workflow is a series of tasks that execute in order. Each workflow follows these conventions:

  • Workflows live in folders inside the /app/scripts subdirectory, with the folder name being the workflow name.
  • Tasks are scripts with numerical prefixes (e.g., 1.firstTask.ts).
  • Configuration is managed through config.toml files.

Powered by Temporal

This workflow abstraction is powered by Temporal under the hood. You can use the Temporal GUI to monitor your workflow runs as they execute, providing extra debugging capabilities.

Quickstart

To create a new workflow, run the following command:

Terminal
npx moose-cli workflow init ExampleWorkflow --tasks firstTask,secondTask

Workflow Structure

A typical workflow looks like this:

  • app/scripts/ExampleWorkflow/
        • 1.firstTask.ts
        • 2.secondTask.ts
        • config.toml

    Writing Workflow Tasks

    Tasks are defined as asynchronous functions (of type TaskFunction) that perform some operations and return an object with two key properties: task and data.

    app/scripts/ExampleWorkflow/1.firstTask.ts
    import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
     
    interface FirstTaskInput {
        name: string;
    }
     
    const firstTask: TaskFunction = async (input: FirstTaskInput) => {
        const name = input.name ?? "world";
        const greeting = `hello, ${name}!`;
        
        return {
            task: "firstTask",
            data: {
                name: name,
                greeting: greeting,
                counter: 1
            }
        };
    };
     
    export default function createTask() {
        return {
            task: firstTask,
        } as TaskDefinition;
    };

    The file must export a function that returns a TaskDefinition object. This object is used to register the task with the workflow. Inside this TaskDefinition object, you must specify the task, which is the function you defined above.

    Data Flow Between Tasks

    Tasks communicate through their return values. Each task returns an object containing a data key. The contents of data are automatically passed as the input to the next task in the workflow.

    • Only values inside the data object are passed to the next task.
    • Supported types inside data include primitive values, arrays and objects, and JSON-serializable custom classes.
    app/scripts/ExampleWorkflow/1.firstTask.ts
    import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
     
    interface FirstTaskInput {
        name: string;
    }
     
    const firstTask: TaskFunction = async (input: FirstTaskInput) => {
        const name = input.name ?? "world";
        const greeting = `hello, ${name}!`;
        
        return {
            task: "firstTask",
            data: {
                name: name,
                greeting: greeting,
                counter: 1
            }
        };
    };
     
    export default function createTask() {
        return {
            task: firstTask,
        } as TaskDefinition;
    };

    app/scripts/ExampleWorkflow/2.secondTask.ts
    import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
     
    interface SecondTaskInput {
        name: string;
        greeting: string;
        counter: number;
    }
     
    const secondTask: TaskFunction = async (input: SecondTaskInput) => {
        const nameLength = input.name.length;
        const expandedGreeting = `${input.greeting} Your name is ${nameLength} characters long`;
     
        // Example operation: increment the counter value
        const counter = input.counter + 1;
        
        return {
            task: "secondTask",
            data: {
                name: input.name,
                greeting: expandedGreeting,
                counter: counter,
                nameLength: nameLength
            }
        };
    };
     
    export default function createTask() {
        return {
            task: secondTask,
        } as TaskDefinition;
    };
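The hand-off between the two tasks above can be sketched with a small local harness. This is a minimal illustration, not how Moose or Temporal schedules tasks: `runChain`, `LocalTask`, and the simplified `first` and `second` tasks are hypothetical names introduced here so the example runs without @514labs/moose-lib.

```typescript
// Hypothetical local harness (not part of @514labs/moose-lib).
type TaskData = Record<string, unknown>;
type TaskResult = { task: string; data: TaskData };
type LocalTask = (input: any) => Promise<TaskResult>;

// Run tasks in order, feeding each task's `data` to the next task as its input.
async function runChain(tasks: LocalTask[], initialInput: TaskData): Promise<TaskResult[]> {
    const results: TaskResult[] = [];
    let input: TaskData = initialInput;
    for (const task of tasks) {
        const result = await task(input);
        results.push(result);
        input = result.data; // only `data` is forwarded; the `task` label is not
    }
    return results;
}

// Simplified stand-ins for 1.firstTask.ts and 2.secondTask.ts.
const first: LocalTask = async (input: { name?: string }) => {
    const name = input.name ?? "world";
    return { task: "firstTask", data: { name, greeting: `hello, ${name}!`, counter: 1 } };
};

const second: LocalTask = async (input: { name: string; greeting: string; counter: number }) => {
    return {
        task: "secondTask",
        data: { ...input, counter: input.counter + 1, nameLength: input.name.length },
    };
};
```

Calling `runChain([first, second], { name: "John" })` resolves to one result per task, which makes it easy to check in a unit test that the second task receives exactly the fields the first task placed in data.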

    Running Workflows

    As you develop workflows, you can run them directly using the Moose CLI:

    Terminal
    npx moose-cli workflow run ExampleWorkflow

    The terminal will print the following:

    Terminal
          Workflow 'ExampleWorkflow' started successfully.
    View it in the Temporal dashboard: http://localhost:8080/namespaces/default/workflows/ExampleWorkflow/3a1cc066-33bf-49ce-8671-63ecdcb72f2a/history

    Notice that you are given a URL to view the workflow in the Temporal dashboard. This is a helpful way to monitor the workflow execution in real time and debug any issues through a GUI.

    Passing Input to Workflows

    To pass input to a workflow when you run it, use the --input flag:

    Terminal
    npx moose-cli workflow run ExampleWorkflow --input '{"name": "John"}'

    The input is passed to the workflow as a JSON string.
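Because the input arrives as JSON, it can be worth checking its shape before relying on it. The sketch below shows one way to do that for the `{"name": "John"}` example. `parseWorkflowInput` is a hypothetical helper, not a Moose API, and how Moose itself deserializes the string is not covered here.

```typescript
interface FirstTaskInput {
    name: string;
}

// Hypothetical helper: parse a JSON string like the one given to --input
// and verify that it has the shape the first task expects.
function parseWorkflowInput(raw: string): FirstTaskInput {
    const parsed: unknown = JSON.parse(raw); // throws SyntaxError on malformed JSON
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
        throw new Error("workflow input must be a JSON object");
    }
    const name = (parsed as Record<string, unknown>).name;
    if (typeof name !== "string") {
        throw new Error('workflow input must contain a string "name"');
    }
    return { name };
}
```

Failing early with a descriptive message makes a bad --input value easier to spot than a type error raised deep inside a task.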

    Debugging Workflows

    While the Temporal dashboard is a helpful tool for debugging, you can also leverage the Moose CLI to monitor and debug workflows. This is useful if you want to monitor a workflow without having to leave your terminal.

    Use the moose-cli workflow status command to monitor a workflow:

    Terminal
    npx moose-cli workflow status ExampleWorkflow

    This prints high-level information about the workflow run:

    Terminal
          Workflow Workflow Status: ExampleWorkflow
    Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
    Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
    Execution Time: 66s

    For more detailed information about the workflow's status, including task-level logs and inputs/outputs, use the --verbose flag:

    Terminal
    npx moose-cli workflow status ExampleWorkflow --verbose

    Terminal
          Workflow Workflow Status: ExampleWorkflow
    Run ID: 446eab6e-663d-4913-93fe-f79d6109391f
    Status: WORKFLOW_EXECUTION_STATUS_COMPLETED ✅
    Execution Time: 66s
    Request: GetWorkflowExecutionHistoryRequest { namespace: "default", execution: Some(WorkflowExecution { workflow_id: "ExampleWorkflow", run_id: "446eab6e-663d-4913-93fe-f79d6109391f" }), maximum_page_size: 0, next_page_token: [], wait_new_event: false, history_event_filter_type: Unspecified, skip_archival: false }
    
    Found 17 events
    Event History:
      • [2025-02-21T14:16:56.234808764+00:00] EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
      • [2025-02-21T14:16:56.235132389+00:00] EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
      • [2025-02-21T14:16:56.259341847+00:00] EVENT_TYPE_WORKFLOW_TASK_STARTED
      • [2025-02-21T14:16:56.329856180+00:00] EVENT_TYPE_WORKFLOW_TASK_COMPLETED
      • [2025-02-21T14:16:56.329951889+00:00] EVENT_TYPE_ACTIVITY_TASK_SCHEDULED
        Activity: ExampleWorkflow/1.firstTask
      • [2025-02-21T14:16:56.333761680+00:00] EVENT_TYPE_ACTIVITY_TASK_STARTED
      • [2025-02-21T14:16:56.497156055+00:00] EVENT_TYPE_ACTIVITY_TASK_COMPLETED
        Result: 
          {
            "data": {
              "counter": 1,
              "greeting": "hello, no name!",
              "name": "no name"
            },
            "task": "firstTask"
          }

    With this more detailed output, you can see the exact sequence of events and the inputs and outputs of each task. The result of each task is included, so you can inspect the data that was passed between tasks when debugging.

    If your workflow fails due to some runtime error, you can use the event history timeline to identify the task that failed.
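As a rough illustration of reading the timeline, the sketch below scans saved --verbose output for a failed activity. It rests on two assumptions: that a failure appears as Temporal's EVENT_TYPE_ACTIVITY_TASK_FAILED or EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT event type, and that the CLI prints it in the same layout as the successful run shown above. `findFailedActivity` is a hypothetical helper, not part of moose-cli.

```typescript
// Hypothetical helper: return the name of the first activity that failed,
// or null if no failure event is present in the text.
function findFailedActivity(verboseOutput: string): string | null {
    const failureEvents = new Set([
        "EVENT_TYPE_ACTIVITY_TASK_FAILED",
        "EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT",
    ]);
    let currentActivity: string | null = null;

    for (const line of verboseOutput.split("\n")) {
        // "Activity: ExampleWorkflow/1.firstTask" follows the SCHEDULED event.
        const activity = line.match(/Activity:\s*(\S+)/);
        if (activity) {
            currentActivity = activity[1] ?? null;
            continue;
        }
        const event = line.match(/\b(EVENT_TYPE_[A-Z_]+)\b/);
        if (event && failureEvents.has(event[1] ?? "")) {
            return currentActivity; // the most recently scheduled activity
        }
    }
    return null;
}
```

The helper relies on the activity name being printed right after its scheduling event, so the most recently seen name is the one a later failure event belongs to.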

    Scheduling Workflows

    Workflows can be configured to run on a schedule using cron expressions. The schedule field in config.toml is used to specify the schedule. This field is optional, and blank by default:

    app/scripts/ExampleWorkflow/config.toml
    name = "ExampleWorkflow"
    timeout = "1h"
    retries = 3
    schedule = "" # Default: blank
    tasks = ["firstTask", "secondTask"]

    By setting a cron expression in this field, the workflow will be scheduled to run at the specified interval. The schedule field uses standard cron expression syntax:

    app/scripts/ExampleWorkflow/config.toml
    schedule = "0 12 * * *" # Runs at 12:00 PM every day

    # |------------------------------- Minute (0-59)
    # |     |------------------------- Hour (0-23)
    # |     |     |------------------- Day of the month (1-31)
    # |     |     |     |------------- Month (1-12; or JAN to DEC)
    # |     |     |     |     |------- Day of the week (0-6; or SUN to SAT; or 7 for Sunday)
    # |     |     |     |     |
    # |     |     |     |     |
    # *     *     *     *     *

    Below are some example cron expressions along with their scheduling details:

    Cron Expression    Description
    0 12 * * *         Runs at 12:00 PM every day
    0 0 * * 0          Runs at 12:00 AM every Sunday
    0 8 * * 1-5        Runs at 8:00 AM on weekdays (Monday to Friday)
    * * * * *          Runs every minute

    Cron Expression Visualizer

    Use an online cron expression visualizer such as crontab.guru to check how a cron expression will schedule your workflow.

    If your dev server is running, logs appear in the terminal each time the scheduled workflow executes, so you can confirm that the schedule works as expected.
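To make the five-field syntax concrete, here is a deliberately simplified matcher that checks whether a given time satisfies a cron expression. It is a sketch for building intuition, not the scheduler Moose or Temporal uses: it supports only `*`, single numbers, ranges such as `1-5`, and comma lists, it requires every field to match, and it evaluates the date in UTC (an assumption made here for determinism). `fieldMatches` and `matchesCron` are hypothetical names.

```typescript
// Check one cron field ("*", "5", "1-5", "0,30") against a numeric value.
function fieldMatches(field: string, value: number): boolean {
    if (field === "*") return true;
    return field.split(",").some((part) => {
        const [lo, hi = lo] = part.split("-").map(Number);
        return value >= lo && value <= hi;
    });
}

// Return true if `date` (read in UTC) satisfies the five-field expression.
function matchesCron(expr: string, date: Date): boolean {
    const fields = expr.trim().split(/\s+/);
    if (fields.length !== 5) {
        throw new Error(`expected 5 cron fields, got ${fields.length}`);
    }
    const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
    const dow = date.getUTCDay(); // 0 = Sunday

    return (
        fieldMatches(minute, date.getUTCMinutes()) &&
        fieldMatches(hour, date.getUTCHours()) &&
        fieldMatches(dayOfMonth, date.getUTCDate()) &&
        fieldMatches(month, date.getUTCMonth() + 1) &&
        // Sunday may be written as 0 or 7.
        (fieldMatches(dayOfWeek, dow) || (dow === 0 && fieldMatches(dayOfWeek, 7)))
    );
}
```

For example, `matchesCron("0 8 * * 1-5", new Date(Date.UTC(2025, 1, 21, 8, 0)))` checks the weekday expression from the table against 08:00 UTC on Friday, 21 February 2025.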

    Advanced: Triggering Workflows Programmatically

    Python Only

    This feature is currently available for Python only. TypeScript support is coming soon.

    Error Detection and Handling

    Moose provides multiple layers of error protection, both at the workflow and task level:

    Workflow-Level Retries and Timeouts

    Moose automatically catches any runtime errors during workflow execution. Errors are logged for debugging, and the orchestrator will retry failed tasks according to the retry count in config.toml.

    In your workflow's config.toml file, you can configure the following options to control workflow behavior, including timeouts and retries:

    app/scripts/ExampleWorkflow/config.toml
    name = "ExampleWorkflow" # Required. The name of the workflow. Matches the workflow folder name.
    timeout = "1h" # Required. Default: 1 hour
    retries = 3 # Required. Default: 3 retries
    schedule = "0 12 * * *" # Optional. Default: blank
    tasks = ["firstTask", "secondTask"] # Required. The list of tasks to execute. Matches the task file names.

    Config.toml

    When you initialize a new workflow, the config.toml file is automatically created with the default values.

    • workflow name: Matches the workflow name you specified when you initialized the workflow.
    • timeout: 1 hour
    • retries: 3
    • schedule: Blank
    • tasks: Matches the task names you specified in the --tasks flag when you initialized the workflow.
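The defaults listed above can be summarized in code. The sketch below models the config as a TypeScript object and renders it as TOML text. It is only an illustration of the documented defaults: `WorkflowConfig`, `defaultConfig`, and `renderConfigToml` are hypothetical names, and the exact file the CLI writes may differ.

```typescript
// Hypothetical model of the fields found in a workflow's config.toml.
interface WorkflowConfig {
    name: string;
    timeout: string;
    retries: number;
    schedule: string;
    tasks: string[];
}

// Apply the documented defaults: 1 hour timeout, 3 retries, blank schedule.
function defaultConfig(name: string, tasks: string[]): WorkflowConfig {
    return { name, timeout: "1h", retries: 3, schedule: "", tasks };
}

// Render the config as TOML. JSON.stringify produces double-quoted strings,
// which are also valid TOML basic strings for simple names like these.
function renderConfigToml(config: WorkflowConfig): string {
    return [
        `name = ${JSON.stringify(config.name)}`,
        `timeout = ${JSON.stringify(config.timeout)}`,
        `retries = ${config.retries}`,
        `schedule = ${JSON.stringify(config.schedule)}`,
        `tasks = [${config.tasks.map((t) => JSON.stringify(t)).join(", ")}]`,
    ].join("\n");
}
```

Calling `renderConfigToml(defaultConfig("ExampleWorkflow", ["firstTask", "secondTask"]))` yields text with the same five keys as the config.toml shown earlier.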

    Task-Level Errors and Retries

    For more granular control, individual tasks can define their own retry behavior. Add a config object to the task definition and specify the number of retries within it:

    app/scripts/ExampleWorkflow/1.firstTask.ts
    import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
     
    const firstTask: TaskFunction = async () => {
        // Always throws to simulate a failing task; the return below is never reached.
        throw new Error("This is a test error");
        return {
            task: "firstTask",
            data: {}
        };
    };
     
    export default function createTask() {
        return {
            task: firstTask,
            config: {
                retries: 3 // This is optional. If you don't explicitly set retries, it will default to 3. 
            }
        } as TaskDefinition;
    }

    Example: Workflow and Task Retry Interplay

    When configuring retries, it's important to understand how workflow-level and task-level retries interact. Consider the following scenario:

    • Workflow Retry Policy: 2 attempts

    app/scripts/ExampleWorkflow/config.toml
    retries = 2

    • Task Retry Policy: 3 attempts

    app/scripts/ExampleWorkflow/1.firstTask.ts
    import { TaskFunction, TaskDefinition } from "@514labs/moose-lib";
     
    const firstTask: TaskFunction = async () => {
        // Always throws to simulate a failing task; the return below is never reached.
        throw new Error("This is a test error");
        return {
            task: "firstTask",
            data: {}
        };
    };
     
    export default function createTask() {
        return {
            task: firstTask,
            config: {
                retries: 3 // This is optional. If you don't explicitly set retries, it will default to 3. 
            }
        } as TaskDefinition;
    }

    If the execution of the workflow encounters an error, the retry sequence would proceed as follows:

    1. Workflow Attempt 1

      • Task Attempt 1: Task fails
      • Task Attempt 2: Task fails
      • Task Attempt 3: Task fails
      • Workflow attempt fails after exhausting task retries

    2. Workflow Attempt 2

      • Task Attempt 1: Task fails
      • Task Attempt 2: Task fails
      • Task Attempt 3: Task fails
      • Workflow attempt fails after exhausting task retries

    In this example, the workflow makes up to 2 attempts, and within each workflow attempt the task is tried up to 3 times before the workflow itself retries, for a maximum of 2 × 3 = 6 task executions.
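The nested sequence can be reproduced with a small simulation. This is a sketch of the counting only, with no backoff or timeouts, and it is not how Temporal implements retry policies. `runWithRetries` and `alwaysFails` are hypothetical names.

```typescript
type Attempt = { workflowAttempt: number; taskAttempt: number };

// Simulate nested retries: each workflow attempt allows `taskAttempts` tries
// of the task before the workflow attempt itself is counted as failed.
async function runWithRetries(
    task: () => Promise<void>,
    workflowAttempts: number,
    taskAttempts: number,
): Promise<{ succeeded: boolean; log: Attempt[] }> {
    const log: Attempt[] = [];
    for (let w = 1; w <= workflowAttempts; w++) {
        for (let t = 1; t <= taskAttempts; t++) {
            log.push({ workflowAttempt: w, taskAttempt: t });
            try {
                await task();
                return { succeeded: true, log };
            } catch {
                // Task failed; move on to the next task attempt, if any.
            }
        }
        // Task attempts exhausted: this workflow attempt fails.
    }
    return { succeeded: false, log };
}

// Mirrors the failing task in the example above.
const alwaysFails = async (): Promise<void> => {
    throw new Error("This is a test error");
};
```

Running `runWithRetries(alwaysFails, 2, 3)` records one log entry per task execution, so the length of the log shows how quickly a failing task multiplies work when both retry levels are set.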

    Terminating Workflows

    To terminate a workflow before it has finished running, use the workflow terminate command:

    Terminal
    npx moose-cli workflow terminate <workflow_name>

    You cannot run the same workflow concurrently. Use the terminate command to stop a running workflow before triggering it again.