Under the Hood of an Event-Driven “Workflow As Code” Engine

Gilles Barbier
Nov 16, 2020

In my previous article, I described how code can be used to semantically describe workflows in a distributed environment. But I did not explain how it can be used to actually pilot those workflows. How can a class, apparently written to run on a single thread, orchestrate long-running workflows in a distributed environment where services fail from time to time?

Example of a simple workflow that describes and orchestrates 3 tasks sequentially, each of which can be processed on distributed servers

As there is no magic in computer science, a “workflow as code” engine (such as Infinitic) needs to store the state of a workflow in persistent storage and to restart a workflow from where it failed or stopped.

Even if an actual implementation is quite complex, the theory behind it is simple. Let’s go through it with the simple example of a workflow, described in the code above, containing 3 consecutive tasks:

  • download an image from a URL
  • resize this image
  • upload the resized image to a server

Sequential Workflow Example

This workflow can be processed using an event-driven architecture as follows:

  • A client dispatches a RunWorkflow event (with parameters describing which workflow to run and with which input parameters).
  • A WorkflowEngine service catches this event and runs the workflow code CropImage.handle(imageUrl, size). During this execution, the ImageUtil proxy stumbles upon a call of its method download(imageUrl). As this task is not known yet, the processing of the workflow code is stopped here and a RunTask event (describing this task) is dispatched by the workflow engine.
  • An ImageUtil service catches this event, runs the task code ImageUtil.download(imageUrl), and returns the serialized result within a TaskCompleted event.
  • A WorkflowEngine service catches this event and runs the workflow code CropImage.handle(imageUrl, size) again. During this execution, the ImageUtil proxy stumbles upon a call of its method download(imageUrl). This time, the task is known from the workflow history, so the proxy can return its output (after deserialization) and the processing of the workflow code can continue. Then the ImageUtil proxy stumbles upon a call of its method resize(image, size). As this task is not known yet, the processing of the workflow code is stopped here and a RunTask event (describing this new task) is dispatched by the workflow engine.
  • And so on… up to the workflow completion.
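
The replay loop described in the steps above can be sketched in a few lines of Python. This is only a toy model under my own assumptions, not Infinitic's implementation: the names TaskProxy, TaskNotYetCompleted, run_workflow and simulate are hypothetical, the history is a plain in-memory list, and serialization and messaging are left out.

```python
class TaskNotYetCompleted(Exception):
    """Raised by the proxy to stop the workflow code at an unknown task."""

    def __init__(self, name, args):
        super().__init__(name)
        self.name, self.args = name, args


class TaskProxy:
    """Stands in for the ImageUtil service inside the workflow code."""

    def __init__(self, history):
        self._history = history  # outputs of completed tasks, in call order
        self._position = 0       # index of the next task call in this run

    def __getattr__(self, name):
        def call(*args):
            index = self._position
            self._position += 1
            if index < len(self._history):
                return self._history[index]        # known task: replay its output
            raise TaskNotYetCompleted(name, args)  # unknown task: stop here
        return call


def crop_image(image_util, image_url, size):
    """The workflow code: reads like ordinary single-threaded code."""
    image = image_util.download(image_url)
    resized = image_util.resize(image, size)
    return image_util.upload(resized)


def run_workflow(history, image_url, size):
    """One engine step: run the workflow code from the start against the history."""
    try:
        result = crop_image(TaskProxy(history), image_url, size)
    except TaskNotYetCompleted as stop:
        return ("RunTask", stop.name, stop.args)
    return ("WorkflowCompleted", result)


def simulate(image_url, size):
    """Plays the engine and the workers in turn, in a single process."""
    workers = {  # fake task implementations, for illustration only
        "download": lambda url: f"image({url})",
        "resize": lambda image, size: f"{image}@{size}",
        "upload": lambda image: f"uploaded:{image}",
    }
    history, dispatched = [], []
    while True:
        event = run_workflow(history, image_url, size)
        if event[0] == "WorkflowCompleted":
            return dispatched, event[1]
        _, name, args = event
        dispatched.append(name)
        history.append(workers[name](*args))  # the TaskCompleted event
```

Note that crop_image is executed from its first line on every step: four times in total for this three-task workflow. Only the history grows between two runs.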

This process is illustrated below:

For Infinitic, especially when used with Pulsar, the workflow engine and services will be consumers of Pulsar topics.

Let’s derive a few conclusions from this architecture:

  • The workflow process is not owned by a long-running thread running somewhere. It is just composed of the history of exchanged messages;
  • To be consistent, a workflow implementation needs to produce the same result when executed multiple times, so it excludes the use of any non-deterministic functions (such as random() or now()) or any multithreading that could modify the order of commands;
  • The workers processing tasks are stateless: only the engine needs to store data related to the workflow’s history;
  • Exchanged data are serialized/deserialized (using Avro in Infinitic’s case);
  • Tasks can run in a different programming language than the one used for workflows.
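
To see why determinism matters, here is a small Python sketch of a replay that goes wrong. It rests on assumptions of mine: the history records the name of each task along with its output, and the proxy compares that name with what the code asks for. CheckedProxy, Suspend, NonDeterministicWorkflow and the coin_flip parameter are hypothetical names, and I am not claiming Infinitic performs this exact check.

```python
class Suspend(Exception):
    """The workflow reached a task whose output is not in the history yet."""


class NonDeterministicWorkflow(Exception):
    """The workflow asked for a different task than the one recorded."""


class CheckedProxy:
    def __init__(self, history):
        self._history = history  # list of (task_name, output), in call order
        self._position = 0

    def __getattr__(self, name):
        def call(*args):
            index = self._position
            self._position += 1
            if index >= len(self._history):
                raise Suspend(name)
            recorded_name, output = self._history[index]
            if recorded_name != name:
                raise NonDeterministicWorkflow(
                    f"history has {recorded_name!r}, code called {name!r}"
                )
            return output
        return call


def branching_workflow(tasks, coin_flip):
    # coin_flip stands in for random(): its value may differ between replays
    if coin_flip():
        return tasks.task_a()
    return tasks.task_b()


def replay(history, coin_flip):
    try:
        return ("completed", branching_workflow(CheckedProxy(history), coin_flip))
    except Suspend as stop:
        return ("run_task", str(stop))
    except NonDeterministicWorkflow:
        return ("corrupted", None)
```

If the first run takes the task_a branch and the replay takes the task_b branch, the history no longer matches the code, and the engine cannot tell which tasks have really been done.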

WorkflowTasks

The engine above has 2 different roles:

  1. Receiving completion events from task processors, updating the workflow’s history accordingly and triggering tasks
  2. Running the workflow code to decide what to do next, based on current workflow history

If we want to be able to write workflows in another programming language, only (2) has to be rewritten, so it seems a good idea to clearly separate those roles. This can be done by delegating (2) to some special tasks named “WorkflowTasks”:

Now roles are clearer: we have on one side the workflow engine (maintaining the workflow history and triggering tasks / workflowTasks accordingly) — and on the other side, the stateless workers (running tasks / workflowTasks and sending back the output).
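
This split can be sketched as follows in Python. It is a toy model with assumed names and message shapes (WorkflowEngine, worker, workflow_task and the dict fields are all hypothetical). To keep it short, the workflow code is replaced by a fixed list of steps, and an in-memory queue plays the role of the message broker.

```python
from collections import deque

STEPS = ["download", "resize", "upload"]  # stand-in for the workflow code


def workflow_task(history):
    """Stateless decision: given the history, what should happen next?"""
    if len(history) < len(STEPS):
        return {"type": "RunTask", "task": STEPS[len(history)]}
    return {"type": "WorkflowCompleted", "output": history[-1]}


class WorkflowEngine:
    """Owns the history; never runs workflow code itself."""

    def __init__(self, dispatch):
        self.histories = {}
        self.dispatch = dispatch

    def handle(self, msg):
        wid = msg["workflow_id"]
        if msg["type"] == "RunWorkflow":
            self.histories[wid] = []
        elif msg["type"] == "TaskCompleted":
            self.histories[wid].append(msg["output"])
        elif msg["type"] == "WorkflowTaskCompleted":
            decision = msg["decision"]
            if decision["type"] == "RunTask":
                self.dispatch({"type": "RunTask", "workflow_id": wid,
                               "task": decision["task"]})
            else:
                del self.histories[wid]  # workflow is done
            return
        # the history changed: ask a worker what to do next
        self.dispatch({"type": "RunWorkflowTask", "workflow_id": wid,
                       "history": list(self.histories[wid])})


def worker(msg, dispatch):
    """Stateless worker: runs tasks and workflowTasks, sends back the output."""
    wid = msg["workflow_id"]
    if msg["type"] == "RunWorkflowTask":
        dispatch({"type": "WorkflowTaskCompleted", "workflow_id": wid,
                  "decision": workflow_task(msg["history"])})
    else:  # RunTask
        dispatch({"type": "TaskCompleted", "workflow_id": wid,
                  "output": msg["task"] + ":done"})


def simulate():
    queue, log = deque(), []
    engine = WorkflowEngine(queue.append)
    queue.append({"type": "RunWorkflow", "workflow_id": "w1"})
    while queue:
        msg = queue.popleft()
        log.append(msg["type"])
        if msg["type"] in ("RunWorkflowTask", "RunTask"):
            worker(msg, queue.append)
        else:
            engine.handle(msg)
    return log
```

In this sketch the engine only knows about message types and histories, so the function workflow_task could be replaced by a worker written in any language without touching the engine.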

To better grasp the temporal aspect of the workflow processing, we can use the representation below:

Parallel Processing

The sequential example is actually a bit too easy. Things become significantly more complex when a workflow contains parallel processing, which occurs, for example, with asynchronous tasks or asynchronous branches. In such a situation, a TaskCompleted event could trigger a workflowTask while another one is running. This case is illustrated below:

Incorrect implementation of a workflow engine triggering parallel workflowTasks

This is problematic as it makes the state of the workflow ill-defined: the first WorkflowTask would decide to trigger TaskC, while the second one would decide to trigger TaskD.

To be correct, all workflowTasks must be processed sequentially, and our workflow engine must buffer any event received while a workflowTask is ongoing. Once the workflowTask is completed, a new one is triggered, taking into account the previous ones and the buffered events.

Correct behavior of a workflow engine making sure that workflowTasks are processed sequentially
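
The buffering rule can be sketched for a single workflow instance as below. This is a minimal Python model under my own assumptions: BufferingEngine and its method names are hypothetical, events are plain strings, and the list triggered simply records the history snapshot that each workflowTask would receive.

```python
class BufferingEngine:
    """Engine state for one workflow: at most one workflowTask at a time."""

    def __init__(self):
        self.history = []
        self.buffer = []                    # events received while busy
        self.workflow_task_running = False
        self.triggered = []                 # history snapshot per workflowTask

    def on_task_completed(self, event):
        if self.workflow_task_running:
            self.buffer.append(event)       # do not start a second workflowTask
        else:
            self.history.append(event)
            self._trigger_workflow_task()

    def on_workflow_task_completed(self):
        self.workflow_task_running = False
        if self.buffer:
            # fold the buffered events into the history, then decide again
            self.history.extend(self.buffer)
            self.buffer.clear()
            self._trigger_workflow_task()

    def _trigger_workflow_task(self):
        self.workflow_task_running = True
        self.triggered.append(list(self.history))
```

For example, if TaskB completes while the workflowTask triggered by TaskA is still running, the second workflowTask starts only once the first one is done, and it sees both completions.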

As you can see in this example, processing steps 2 and 3 of the workflow engine must be aware of each other to handle the situation correctly. This can be difficult in a distributed environment if you run multiple instances of the workflow engine to avoid a single point of failure. There are typically 2 ways to respect this constraint:

Error Management

What happens when a task (or a workflowTask) fails? To make sure that each task is actually processed and managed properly, Infinitic adds a layer in charge of task management: instead of sending a task directly to the workers, the Workflow Engine sends it to a Task Engine in charge of guaranteeing that each task is managed up to its completion or its cancellation.

The Task Engine:

  • retries failed tasks based on a predefined retry strategy
  • handles manual retry requests
  • manages timeouts

Each Task / WorkflowTask is handled by a TaskEngine up to its completion or its cancellation
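
As a rough illustration of the first two responsibilities, here is a Python sketch of a task engine driven by a retry strategy. Everything in it is an assumption of mine rather than Infinitic's API: the TaskEngine class shape, the event handler names, the exponential_backoff strategy and the schedule callback are hypothetical, and timeouts are left out for brevity.

```python
def exponential_backoff(attempt, base=1.0, max_attempts=4):
    """Delay in seconds before retrying after failed attempt number `attempt`
    (1-based), or None when no automatic retry is left."""
    if attempt >= max_attempts:
        return None
    return base * 2 ** (attempt - 1)


class TaskEngine:
    def __init__(self, retry_strategy, schedule):
        self.retry_strategy = retry_strategy
        self.schedule = schedule  # callable(task_id, delay_in_seconds)
        self.attempts = {}        # task_id -> number of the current attempt
        self.status = {}          # task_id -> "running" | "failed" | "completed"

    def on_dispatch(self, task_id):
        self.attempts[task_id] = 1
        self.status[task_id] = "running"
        self.schedule(task_id, 0.0)

    def on_attempt_failed(self, task_id):
        delay = self.retry_strategy(self.attempts[task_id])
        if delay is None:
            self.status[task_id] = "failed"  # now waits for a manual retry
        else:
            self.attempts[task_id] += 1
            self.schedule(task_id, delay)

    def on_attempt_completed(self, task_id):
        self.status[task_id] = "completed"

    def on_manual_retry(self, task_id):
        if self.status[task_id] == "failed":
            self.attempts[task_id] = 1       # start a fresh series of attempts
            self.status[task_id] = "running"
            self.schedule(task_id, 0.0)
```

With the default strategy, a task that keeps failing is attempted 4 times (immediately, then after 1, 2 and 4 seconds) before it is marked as failed and left for a manual retry.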

Conclusions

This article describes the general concepts and constraints behind a “workflow as code” event-driven workflow engine such as Infinitic, which is a new pattern to orchestrate distributed tasks at scale. A lot of details are still to be described, like how to manage workflow properties. If you are interested in following this development, please subscribe here, and do not hesitate to comment below :)

PS: I'm not a Kotlin expert, but it seems to me there are a lot of similarities between the algorithm presented in this article and the low-level implementation of coroutines. Maybe some day, the teams at JetBrains will look at this.
