Using Apache Pulsar With Kotlin

Gilles Barbier
Dec 16, 2020

Often described as a next-generation Kafka, Apache Pulsar is a rising star in the toolset of developers. Pulsar is a multi-tenant, high-performance solution for server-to-server messaging and is often used as the backbone for scalable applications.

Pulsar can be used with Kotlin since it is written in Java. However, its API does not consider the great features that Kotlin brings, such as data classes, coroutines, or reflectionless serialization.

In this article, I’ll discuss how to use Pulsar with idiomatic Kotlin. This article is based on the work done while building Infinitic, the easiest way for technical teams to orchestrate tasks running on distributed servers.

Using Native Serialization For Messages

A natural way to define messages in Kotlin is to use data classes, which are classes whose primary purpose is to hold data. For such classes, Kotlin automatically provides methods such as equals(), toString(), and copy(), which shortens the code and reduces the risk of bugs.
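As a small illustration of those generated methods, here is a minimal sketch. The `TaskCompleted` class and the `retried` helper are hypothetical names used only for this example; they do not come from Infinitic.

```kotlin
// Hypothetical message type, used only for this illustration
data class TaskCompleted(val taskId: String, val attempts: Int)

// copy() builds a modified instance and leaves the original untouched
fun retried(task: TaskCompleted): TaskCompleted =
    task.copy(attempts = task.attempts + 1)
```

Two instances with the same field values compare equal with `==`, and `toString()` lists every field, which is handy when logging messages.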

When creating a Pulsar producer in Java:

Producer<MyAvro> avroProducer = client
    .newProducer(Schema.AVRO(MyAvro.class))
    .topic("some-avro-topic")
    .create();

the Schema.AVRO(MyAvro.class) instruction will introspect the MyAvro Java class and deduce a schema from it. This is needed to check that the new producer will produce messages that are actually compatible with existing consumers. Unfortunately, the Java implementations of Kotlin data classes do not work well with the default serializer used by Pulsar. Fortunately, since version 2.7.0, Pulsar lets you use a custom serializer with producers and consumers.

First, you need the official Kotlin serialization plugin installed. With it, a typical message class is described like this:

@Serializable
data class RunTask(
    val taskName: TaskName,
    val taskId: TaskId,
    val taskInput: TaskInput,
    val taskOptions: TaskOptions,
    val taskMeta: TaskMeta
)

Note the @Serializable annotation. With it, you can use RunTask.serializer() to get a serializer that works without introspection, which is very performant!
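To make the generated serializer concrete, here is a minimal sketch of a JSON round trip. It assumes the Kotlin serialization plugin is applied and the kotlinx-serialization-json artifact is on the classpath. `SimpleRunTask`, `toJson`, and `fromJson` are hypothetical names: the class is a flattened stand-in for RunTask that uses plain strings instead of the TaskName and TaskId types.

```kotlin
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json

// Hypothetical, flattened stand-in for RunTask (plain strings instead of TaskName, TaskId...)
@Serializable
data class SimpleRunTask(val taskName: String, val taskId: String)

// The serializer is generated at compile time by the plugin and passed explicitly,
// so no reflection is involved at runtime
fun toJson(task: SimpleRunTask): String =
    Json.encodeToString(SimpleRunTask.serializer(), task)

fun fromJson(json: String): SimpleRunTask =
    Json.decodeFromString(SimpleRunTask.serializer(), json)
```

Decoding the output of `toJson` with `fromJson` gives back an instance equal to the original.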

Currently, the serialization plugin supports only JSON (and some other formats, such as protobuf, in beta). So we also need the great avro4k library to extend it and support the Avro format.

Using these tools, we can create a working Producer<RunTask> like so:
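A minimal sketch of such a producer follows. It relies on Pulsar's SchemaDefinition builder with a custom SchemaReader and SchemaWriter. The `AvroCodec` interface and the names `CodecSchemaReader`, `CodecSchemaWriter`, `schemaDefinitionFor`, and `newAvroProducer` are assumptions made for this example: the actual encoding calls are left behind the interface because avro4k's API differs between versions.

```kotlin
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.schema.SchemaDefinition
import org.apache.pulsar.client.api.schema.SchemaReader
import org.apache.pulsar.client.api.schema.SchemaWriter
import java.io.InputStream

// Hypothetical abstraction: a real implementation would delegate to avro4k,
// using the serializer generated for the message class (e.g. RunTask.serializer())
interface AvroCodec<T> {
    val schemaJson: String              // Avro schema of T, as a JSON string
    fun encode(value: T): ByteArray     // T -> Avro binary
    fun decode(bytes: ByteArray): T     // Avro binary -> T
}

// Custom Pulsar reader: turns raw bytes back into T without reflection
class CodecSchemaReader<T>(private val codec: AvroCodec<T>) : SchemaReader<T> {
    override fun read(bytes: ByteArray, offset: Int, length: Int): T =
        codec.decode(bytes.copyOfRange(offset, offset + length))

    override fun read(inputStream: InputStream): T =
        codec.decode(inputStream.readBytes())
}

// Custom Pulsar writer: turns T into raw bytes
class CodecSchemaWriter<T>(private val codec: AvroCodec<T>) : SchemaWriter<T> {
    override fun write(message: T): ByteArray = codec.encode(message)
}

// Schema definition built from an explicit JSON schema plus the custom reader and writer
fun <T> schemaDefinitionFor(codec: AvroCodec<T>): SchemaDefinition<T> =
    SchemaDefinition.builder<T>()
        .withJsonDef(codec.schemaJson)
        .withSchemaReader(CodecSchemaReader(codec))
        .withSchemaWriter(CodecSchemaWriter(codec))
        .build()

// Producer using the custom schema instead of Pulsar's default introspection
fun <T> PulsarClient.newAvroProducer(topic: String, codec: AvroCodec<T>): Producer<T> =
    newProducer(Schema.AVRO(schemaDefinitionFor(codec)))
        .topic(topic)
        .create()
```

With a codec for RunTask, `client.newAvroProducer("some-avro-topic", runTaskCodec)` would give a Producer<RunTask>. The RunTaskSchemaReader mentioned in this article plays the role that `CodecSchemaReader` plays in this sketch.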

However, there is still a bug in version 2.7.0 when consuming messages. This should work, but does not:

val message = runTaskConsumer.receive()
val runTask = message.value

Instead, we need to do:

val message = runTaskConsumer.receive()
val runTask = RunTaskSchemaReader().read(message.data.inputStream())

The fix should be deployed in the next 2.7.1 version.

Sealed Class For Messages and One Envelope Per Topic

Pulsar allows only one type of message per topic. For some situations, it’s not enough. A workaround to this limitation is to use an envelope design pattern.

First, use a sealed class to define all the messages of a topic:

Then create an envelope for them:
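A minimal sketch of both steps, the sealed class and its envelope, could look like the following. The message types `DispatchTask` and `CancelTask`, the `TaskEngineMessageType` enum, and the use of plain strings for ids are assumptions made for this example. In a real project each class would also carry the @Serializable annotation, omitted here to keep the sketch dependency-free.

```kotlin
// Every message of the topic extends the same sealed class
sealed class TaskEngineMessage {
    abstract val taskId: String
}

// Hypothetical message types, for illustration only
data class DispatchTask(override val taskId: String, val taskName: String) : TaskEngineMessage()
data class CancelTask(override val taskId: String, val reason: String) : TaskEngineMessage()

enum class TaskEngineMessageType { DISPATCH_TASK, CANCEL_TASK }

// The envelope is the single type sent on the topic: one nullable field per message type
data class TaskEngineEnvelope(
    val taskId: String,
    val type: TaskEngineMessageType,
    val dispatchTask: DispatchTask? = null,
    val cancelTask: CancelTask? = null
) {
    init {
        // The field matching `type` must be set...
        val payload = when (type) {
            TaskEngineMessageType.DISPATCH_TASK -> dispatchTask
            TaskEngineMessageType.CANCEL_TASK -> cancelTask
        }
        require(payload != null) { "no message set for type $type" }
        // ...it must be the only one set...
        require(listOfNotNull(dispatchTask, cancelTask).size == 1) { "exactly one message expected" }
        // ...and the envelope taskId must match the one of the message
        require(payload.taskId == taskId) { "taskId mismatch" }
    }

    companion object {
        // Wraps any message of the topic into an envelope
        fun from(msg: TaskEngineMessage): TaskEngineEnvelope = when (msg) {
            is DispatchTask -> TaskEngineEnvelope(msg.taskId, TaskEngineMessageType.DISPATCH_TASK, dispatchTask = msg)
            is CancelTask -> TaskEngineEnvelope(msg.taskId, TaskEngineMessageType.CANCEL_TASK, cancelTask = msg)
        }
    }

    // Returns the original message (non-null thanks to the checks in init)
    fun message(): TaskEngineMessage = when (type) {
        TaskEngineMessageType.DISPATCH_TASK -> dispatchTask!!
        TaskEngineMessageType.CANCEL_TASK -> cancelTask!!
    }
}
```

Because the `when` expressions are exhaustive over the sealed class and the enum, adding a new message type without updating the envelope results in a compilation error rather than a runtime surprise.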

Note how elegantly Kotlin checks data consistency within init! An envelope can easily be created by TaskEngineEnvelope.from(msg). And envelope.message() returns the original message.

You may wonder why I’ve added an explicit taskId value, or why I do not use only one field message: TaskEngineMessage instead of a field for each message type. It’s because this way, I am able to query the topic using Pulsar SQL by taskId and/or by type.

Building Workers With Coroutines

Using threads in plain Java has always been complex and error-prone. Fortunately, Kotlin provides coroutines, an easier abstraction for asynchronous processing, and channels, a convenient way to transfer data between coroutines.

I can build a worker with:

  • A coroutine ("task-engine-message-puller") dedicated to pulling messages from Pulsar
  • N coroutines ("task-engine-$i") processing messages in parallel
  • A coroutine ("task-engine-message-acknowledger") acknowledging Pulsar messages after processing

I’ve added a logChannel to centralize logs if I have multiple processes like this one. Note that to be able to acknowledge a Pulsar message in a different coroutine than the one that received it, I need to encapsulate the TaskEngineMessage into a MessageToProcess<TaskEngineMessage> containing a Pulsar messageId:
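A minimal sketch of such a worker, assuming kotlinx.coroutines is available, is shown here. To keep it independent from the Pulsar client, the calls to the broker are passed in as the hypothetical `pull`, `ack`, and `nack` functions, and the message id is a plain string standing in for Pulsar's MessageId. The name `startWorker` and its parameters are likewise assumptions for this example.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

// Keeps the broker's message id next to the decoded message,
// so that another coroutine can acknowledge it later
data class MessageToProcess<T>(
    val message: T,
    val messageId: String,            // stand-in for Pulsar's MessageId
    var exception: Exception? = null  // set when processing failed
)

fun <T> CoroutineScope.startWorker(
    pull: suspend () -> MessageToProcess<T>,   // hypothetical: receives and decodes one message
    handle: suspend (T) -> Unit,               // business logic
    ack: suspend (String) -> Unit,             // hypothetical: acknowledges a message id
    nack: suspend (String) -> Unit,            // hypothetical: negatively acknowledges it
    logChannel: SendChannel<MessageToProcess<T>>? = null,
    enginesNumber: Int = 4
): Job = launch(Dispatchers.IO) {
    val inputChannel = Channel<MessageToProcess<T>>()
    val resultsChannel = Channel<MessageToProcess<T>>()

    // One coroutine dedicated to pulling messages
    launch(CoroutineName("task-engine-message-puller")) {
        while (isActive) inputChannel.send(pull())
    }

    // N coroutines processing messages in parallel
    repeat(enginesNumber) { i ->
        launch(CoroutineName("task-engine-$i")) {
            for (msg in inputChannel) {
                try {
                    handle(msg.message)
                } catch (e: CancellationException) {
                    throw e               // never swallow cancellation
                } catch (e: Exception) {
                    msg.exception = e     // reported to the acknowledger
                }
                resultsChannel.send(msg)
            }
        }
    }

    // One coroutine dedicated to acknowledging processed messages
    launch(CoroutineName("task-engine-message-acknowledger")) {
        for (msg in resultsChannel) {
            if (msg.exception == null) ack(msg.messageId) else nack(msg.messageId)
            logChannel?.send(msg)
        }
    }
}
```

Both channels are rendezvous channels, so the puller suspends whenever all engines are busy, which gives natural back-pressure. Cancelling the returned Job stops the puller, the engines, and the acknowledger together.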

Conclusions

I've briefly shown above how, in Kotlin, to:

  • code messages (including envelopes for Pulsar topics receiving messages of multiple types),
  • create Pulsar producers/consumers,
  • and build a simple worker able to process many messages in parallel.

The code shown here is a simplified version of the code written for Infinitic, a general-purpose framework to orchestrate tasks running on distributed servers. Infinitic can be used to reliably orchestrate microservices, to manage distributed transactions, to operate data pipelines, to build user-facing automation, etc.

Please subscribe here if interested.
