Under the Hood of an Event-Driven “Workflow As Code” Engine
In my previous article, I described how code can be used to semantically describe workflows in a distributed environment. But I did not explain how it can actually pilot those workflows. How can a class, apparently written to run on a single thread, orchestrate long-running workflows in a distributed environment where services fail from time to time?
As there is no magic in computer science, a “workflow as code” engine (such as Infinitic) needs to be able to store the state of a workflow in persistent storage and restart a workflow from where it stopped or failed.
Although an actual implementation is quite complex, the theory behind it is actually simple. Let’s go through it with the simple example of a workflow (sketched in the code below) containing 3 consecutive tasks:
- download an image from a URL
- resize this image
- upload the resized image to a server
Sequential Workflow Example
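Since the original snippet is not reproduced here, below is a minimal Kotlin sketch of what such a workflow class could look like. The `CropImage`, `ImageUtil`, `download`, `resize`, and `upload` names come from the walkthrough below; everything else, the `taskProxy()` helper in particular, is a hypothetical placeholder, not Infinitic’s actual API.

```kotlin
// The task interface: each method call is intercepted by a proxy
// and dispatched as a task instead of being executed locally.
interface ImageUtil {
    fun download(imageUrl: String): ByteArray
    fun resize(image: ByteArray, size: Int): ByteArray
    fun upload(image: ByteArray): String
}

// Hypothetical placeholder for the engine-provided proxy.
fun <T> taskProxy(): T = TODO("provided by the workflow engine")

class CropImage {
    private val imageUtil: ImageUtil = taskProxy()

    fun handle(imageUrl: String, size: Int): String {
        val image = imageUtil.download(imageUrl)     // task 1
        val resized = imageUtil.resize(image, size)  // task 2
        return imageUtil.upload(resized)             // task 3
    }
}
```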
This workflow can be processed using an event-driven architecture as follows:
- A client dispatches a `RunWorkflow` event (with parameters describing which workflow to run and with which input).
- A WorkflowEngine service catches this event and runs the workflow code `CropImage.handle(imageUrl, size)`. During this execution, the `ImageUtil` proxy stumbles upon a call to its method `download(imageUrl)`. As this task is not known yet, the processing of the workflow code stops here, and a `RunTask` event (describing this task) is dispatched by the workflow engine.
- An ImageUtil service catches this event, runs the task code `ImageUtil.download(imageUrl)`, and returns the serialized result within a `TaskCompleted` event.
- A WorkflowEngine service catches this event and runs the workflow code `CropImage.handle(imageUrl, size)` again. During this execution, the `ImageUtil` proxy stumbles upon the call to its method `download(imageUrl)`. This time, the task is found in the workflow history, so the proxy can return its output (after deserialization) and the processing of the workflow code continues. Then the `ImageUtil` proxy stumbles upon a call to its method `resize(image, size)`. As this task is not known yet, the processing of the workflow code stops here, and a `RunTask` event (describing this new task) is dispatched by the workflow engine.
- And so on, up to the workflow completion.
This process is illustrated below:
For Infinitic, when used with Pulsar, the workflow engine and the services are consumers of Pulsar topics.
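To make the replay mechanism concrete, here is a minimal Kotlin sketch of the decision the proxy could make at each task call. The type names (`WorkflowHistory`, `NewTaskDispatched`) and the use of an exception to interrupt the execution are assumptions for illustration, not Infinitic’s actual implementation.

```kotlin
// Assumed types, for illustration only.
class WorkflowHistory(private val completedTasks: Map<String, ByteArray>) {
    // Returns the recorded (serialized) output of a task, if it already completed.
    fun outputOf(taskKey: String): ByteArray? = completedTasks[taskKey]
}

// Thrown to interrupt the workflow code at the first unknown task.
class NewTaskDispatched(val taskKey: String, val input: List<Any?>) : RuntimeException()

class TaskProxyLogic(private val history: WorkflowHistory) {
    // Called by the proxy each time the workflow code invokes a task method.
    fun onTaskCall(taskKey: String, input: List<Any?>): ByteArray {
        // If the task completed during a previous replay, return its recorded
        // output so the workflow code can continue past this call.
        history.outputOf(taskKey)?.let { return it }
        // Otherwise, stop the execution right here; the engine will dispatch
        // a RunTask event and replay the workflow when TaskCompleted arrives.
        throw NewTaskDispatched(taskKey, input)
    }
}
```

Throwing an exception is one simple way to stop the workflow code in the middle of a method; deserialization of the stored output is omitted for brevity.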
Let’s derive a few conclusions from this architecture:
- The workflow process is not owned by a long-running thread somewhere; it consists only of the history of exchanged messages;
- To be consistent, a workflow implementation needs to produce the same result when executed multiple times; this excludes the use of any non-deterministic function (such as `random()` or `now()`) and any multithreading that could change the order of commands (see the sketch after this list);
- The workers processing tasks are stateless; only the engine needs to store data related to the workflow’s history;
- Exchanged data are serialized/deserialized (using Avro in Infinitic’s case);
- Tasks can run in a different programming language than the one used for workflows.
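To illustrate the determinism constraint, the usual way out is to move any non-deterministic call into a task, so that its result is recorded in the history on the first execution and replayed identically afterwards. A Kotlin sketch, where the `TimeUtil` interface and the `taskProxy()` helper are hypothetical:

```kotlin
import java.time.Instant

// Hypothetical task interface and proxy placeholder, for illustration only.
interface TimeUtil {
    fun now(): Instant
}

fun <T> taskProxy(): T = TODO("provided by the workflow engine")

class ReminderWorkflow {
    private val time: TimeUtil = taskProxy()

    fun handle() {
        // WRONG inside workflow code: every replay would observe a new time,
        // so the workflow would not produce the same result when re-executed.
        // val startedAt = Instant.now()

        // RIGHT: the time is obtained through a task; the first execution
        // records it in the history, and every replay deserializes the same
        // recorded value.
        val startedAt: Instant = time.now()
        println("workflow started at $startedAt")
    }
}
```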
WorkflowTasks
The engine above has 2 different roles:
1. Receiving completion events from task processors, updating the workflow’s history accordingly, and triggering tasks
2. Running the workflow code to decide what to do next, based on the current workflow history
If we want to be able to write workflows in another programming language, only (2) has to be rewritten, so it seems a good idea to clearly separate these roles. This can be done by delegating (2) to special tasks named “WorkflowTasks”:
Now the roles are clearer: on one side, the workflow engine, which maintains the workflow history and triggers tasks and workflowTasks accordingly; on the other side, the stateless workers, which run tasks and workflowTasks and send back their output.
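A rough Kotlin sketch of this split could look like the following; all type names are assumptions made for illustration. The engine only maintains state and routes events, while the decision logic runs in a workflowTask on a stateless worker.

```kotlin
// Assumed event and infrastructure types, for illustration only.
data class RunTask(val taskKey: String, val input: ByteArray)

sealed interface EngineMessage
data class TaskCompleted(val workflowId: String, val taskKey: String, val output: ByteArray) : EngineMessage
data class WorkflowTaskCompleted(val workflowId: String, val tasksToDispatch: List<RunTask>) : EngineMessage

class History {
    private val outputs = mutableMapOf<String, ByteArray>()
    fun record(taskKey: String, output: ByteArray) { outputs[taskKey] = output }
}

class WorkflowState(val id: String, val history: History = History())

interface StateStore {
    fun load(workflowId: String): WorkflowState
    fun save(state: WorkflowState)
}

interface EventBus {
    fun dispatchTask(workflowId: String, task: RunTask)
    fun dispatchWorkflowTask(workflowId: String, history: History)
}

class WorkflowEngine(private val store: StateStore, private val bus: EventBus) {
    fun handle(msg: EngineMessage) = when (msg) {
        is TaskCompleted -> {
            // Role 1: update the workflow history...
            val state = store.load(msg.workflowId)
            state.history.record(msg.taskKey, msg.output)
            store.save(state)
            // ...and delegate role 2 (deciding what to do next) to a
            // workflowTask, executed by a stateless worker.
            bus.dispatchWorkflowTask(msg.workflowId, state.history)
        }
        is WorkflowTaskCompleted -> {
            // The workflowTask's output tells the engine which tasks to trigger.
            msg.tasksToDispatch.forEach { bus.dispatchTask(msg.workflowId, it) }
        }
    }
}
```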
To better grasp the temporal aspect of the workflow processing, we can use the representation below:
Parallel Processing
The sequential example is actually a bit too easy. Things become significantly more complex when a workflow contains parallel processing, which occurs, for example, with asynchronous tasks or asynchronous branches. In such a situation, a `TaskCompleted` event could trigger a workflowTask while another one is already running. This case is illustrated below:
This is problematic, as it leaves the state of the workflow ill-defined: the first workflowTask would decide to trigger TaskC, while the second one would decide to trigger TaskD.
To be correct, all workflowTasks must be processed sequentially, and our workflow engine must buffer any event received while a workflowTask is ongoing. Once the workflowTask is completed, a new one is triggered, taking into account the previous ones and the buffered events.
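A minimal Kotlin sketch of this buffering logic, assuming a per-instance state with an “ongoing workflowTask” flag and an event queue (both of which are assumptions, for illustration):

```kotlin
// Assumed event type and per-instance state, for illustration only.
data class TaskCompleted(val taskKey: String, val output: ByteArray)

class WorkflowInstanceState {
    var workflowTaskOngoing = false
    val bufferedEvents = ArrayDeque<TaskCompleted>()
}

class SequentialDecider(private val state: WorkflowInstanceState) {
    fun onTaskCompleted(event: TaskCompleted) {
        if (state.workflowTaskOngoing) {
            // A decision is already in flight: buffer the event rather than
            // starting a second, conflicting workflowTask.
            state.bufferedEvents.add(event)
        } else {
            startWorkflowTask(listOf(event))
        }
    }

    fun onWorkflowTaskCompleted() {
        state.workflowTaskOngoing = false
        // Fold all buffered events into a single new workflowTask, so that
        // decisions stay strictly sequential per workflow instance.
        if (state.bufferedEvents.isNotEmpty()) {
            val events = state.bufferedEvents.toList()
            state.bufferedEvents.clear()
            startWorkflowTask(events)
        }
    }

    private fun startWorkflowTask(events: List<TaskCompleted>) {
        state.workflowTaskOngoing = true
        // ... record the events in the history and dispatch the workflowTask ...
    }
}
```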
As the illustration shows, processings 2 and 3 of the workflow engine must be aware of each other to handle the situation correctly. This can be difficult in a distributed environment if you run multiple instances of the workflow engine to avoid a single point of failure. There are typically 2 ways to respect this constraint:
- By making sure that your storage avoids the read-modify-write anti-pattern (see for example https://www.2ndquadrant.com/en/blog/postgresql-anti-patterns-read-modify-write-cycles/)
- By making sure that all events for a workflow instance always reach the same workflow engine instance (for example with a key-shared subscription — https://pulsar.apache.org/docs/en/concepts-messaging/#key_shared)
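With Pulsar, the second option is straightforward: producers key each message by workflow id, and engine instances consume through a Key_Shared subscription, so all events of a given workflow instance reach the same consumer. A Kotlin sketch using the standard Pulsar Java client (the topic and subscription names are made up):

```kotlin
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.SubscriptionType

fun main() {
    val client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build()

    // Producer side: key every event by workflow id, so Pulsar routes all
    // events of one workflow instance to the same Key_Shared consumer.
    val producer = client.newProducer()
        .topic("workflow-engine-events") // made-up topic name
        .create()
    producer.newMessage()
        .key("workflow-instance-42")     // the workflow id as message key
        .value("TaskCompleted...".toByteArray())
        .send()

    // Consumer side: several engine instances can share this subscription;
    // Key_Shared guarantees per-key ordering to a single consumer at a time.
    val consumer = client.newConsumer()
        .topic("workflow-engine-events")
        .subscriptionName("workflow-engine") // made-up subscription name
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscribe()

    val msg = consumer.receive()
    // ... update the workflow state, then acknowledge ...
    consumer.acknowledge(msg)
    client.close()
}
```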
Error Management
What happens when a task (or a workflowTask) fails? To make sure that each task is actually processed and managed properly, Infinitic adds a layer in charge of task management: instead of sending a task directly to the workers, the Workflow Engine sends it to a Task Engine, which guarantees that each task is managed up to its completion or its cancellation.
The Task Engine:
- retries failed tasks based on a pre-defined retry strategy (see the sketch after this list)
- handles manual retry requests
- manages timeouts
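As an illustration of the first point, a pre-defined retry strategy can be as simple as an exponential backoff with a retry cap. This sketch is an assumption made for illustration, not Infinitic’s actual configuration:

```kotlin
import kotlin.math.pow

// Hypothetical retry policy, for illustration only.
data class RetryPolicy(
    val maxRetries: Int = 5,
    val initialDelaySeconds: Double = 1.0,
    val backoffFactor: Double = 2.0,
) {
    // Returns the delay (in seconds) before attempt `retryCount` (1-based),
    // or null when the task should be marked as permanently failed.
    fun delayBeforeRetry(retryCount: Int): Double? =
        if (retryCount > maxRetries) null
        else initialDelaySeconds * backoffFactor.pow(retryCount - 1)
}

fun main() {
    val policy = RetryPolicy()
    // Prints 1.0s, 2.0s, 4.0s, 8.0s, 16.0s, then null (give up).
    (1..6).forEach { println("retry $it -> ${policy.delayBeforeRetry(it)}") }
}
```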
Conclusions
This article describes the general concepts and constraints behind a “workflow as code” event-driven workflow engine such as Infinitic, which is a new pattern to orchestrate distributed tasks at scale. A lot of details remain to be described, like how to manage workflow properties. If you are interested in following this development, please subscribe here, and do not hesitate to comment below :)
PS: I'm not a Kotlin expert, but it seems to me that there are a lot of similarities between the algorithm presented in this article and the low-level implementation of coroutines. Maybe someday the teams at JetBrains will look at this.