At CADS, we handle REST API requests from multiple clients that insert and update data, in near real time, in tables they have previously created. Changes to these tables are then reflected in the content recommendations for the clients’ websites.
Our architecture uses one service dedicated to handling client requests and sending them to a Kafka cluster, and another service that pulls the data from Kafka for real-time upserts. Kafka decouples these two processing components, allowing them to scale independently.
We designed the architecture with the following requirements in mind:
- Preserving the order in which the client sent the messages.
- Ensuring no message loss.
- Scaling dynamically as the number of clients and the volume of incoming data grow.

For more context, a client may need to update multiple tables. One approach is to create one Kafka topic per table, in which case the API request must include both the messages and the target Kafka topic in the request body. In addition, all requests pass through, and are moderated by, our client-facing component.
The first service mentioned above, the one that receives requests from this component, runs on a Netty server. We chose the Netty client-server framework for two reasons: (i) the service needs to run alongside the others to save resources; (ii) the framework offers high throughput and low latency, and it is resource-efficient for the internal gRPC communication between the client-facing component and the first service.
The service currently runs on a single powerful server; in the future we may need to move it to a Kubernetes cluster for horizontal autoscaling.
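To make the topic-per-table approach concrete, here is a minimal sketch of a request shape and a topic-name check that could run before anything reaches the producer. `UpsertRequest`, `TopicNaming` and the `<client>.<table>` naming convention are hypothetical, not taken from our service; the naming rules themselves (at most 249 characters from `[a-zA-Z0-9._-]`, and neither `.` nor `..`) are Kafka's.

```scala
// Hypothetical request shape: the client names the target topic (one per table)
// and sends a batch of JSON messages to upsert into that table.
final case class UpsertRequest(topic: String, jsons: Seq[String])

object TopicNaming {
  // Kafka limits topic names to 249 characters drawn from [a-zA-Z0-9._-].
  val MaxLength = 249

  // Hypothetical convention: one topic per (client, table) pair.
  def topicFor(clientId: String, table: String): String = s"$clientId.$table"

  def isValidTopic(name: String): Boolean =
    name.nonEmpty &&
      name.length <= MaxLength &&
      name.matches("[a-zA-Z0-9._-]+") &&
      name != "." && name != ".." // Kafka also rejects these two names

  // Reject a malformed request before it is handed to the producer.
  def validate(req: UpsertRequest): Either[String, UpsertRequest] =
    if (!isValidTopic(req.topic)) Left(s"invalid topic: ${req.topic}")
    else if (req.jsons.isEmpty) Left("empty batch")
    else Right(req)
}
```

Validating at the edge keeps obviously bad requests from ever occupying the shared producer's buffer.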
Now comes another important part of the design: building the Kafka producer.
```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

def buildBatchProducer(): KafkaProducer[String, String] = {
  val props: Properties = new Properties()
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOSTRAP_SERVERS) // constant defined elsewhere
  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // brokers discard duplicates caused by retries
  props.setProperty(ProducerConfig.ACKS_CONFIG, "all") // wait until all in-sync replicas have the record
  props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) // retry until the delivery timeout
  props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "60000") // upper bound on a send including retries
  // Significant when sending multiple records (in a for loop) with the same KafkaProducer instance.
  // See https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
  props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
  // props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, topic) // found in the {client_id; ip; port} name of server-side logs
  props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")
  props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000") // max bytes per partition batch
  props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500") // wait up to 500 ms to fill a batch
  // TODO: buffer memory for batching
  props.setProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "90000000") // 25 hours
  props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "86400000") // topic metadata cache expires every 24 hours
  new KafkaProducer[String, String](props)
}
```
Of these settings, the crucial ones are ACKS_CONFIG set to “all”, so that a send is acknowledged only after all in-sync replicas have persisted the message, and MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION set to 1, which, combined with retries, prevents a retried batch from overtaking a later one and thus preserves message ordering. BATCH_SIZE_CONFIG also matters: together with LINGER_MS_CONFIG, it lets the producer accumulate messages into batches before sending them to Kafka.
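These settings interact, so it can help to check them together. The sketch below encodes a few documented Kafka producer rules: the idempotent producer needs `acks=all`, `retries > 0` and at most 5 in-flight requests; without idempotence, strict ordering under retries needs a single in-flight request; and an explicit `delivery.timeout.ms` must be at least `linger.ms + request.timeout.ms`. `ProducerSettingsCheck` is a hypothetical helper working on the raw config keys that the `ProducerConfig` constants stand for; it treats a missing `enable.idempotence` as disabled and a missing `linger.ms` as 0, which are simplifications.

```scala
object ProducerSettingsCheck {
  // Kafka producer default for request.timeout.ms.
  val DefaultRequestTimeoutMs = 30000L

  /** Returns the problems found in a producer config (empty list = consistent). */
  def problems(cfg: Map[String, String]): List[String] = {
    def num(key: String): Option[Long] = cfg.get(key).map(_.trim.toLong)
    val idempotent = cfg.get("enable.idempotence").contains("true")
    val inFlight = num("max.in.flight.requests.per.connection")
    val errors = List.newBuilder[String]

    // The idempotent producer requires acks=all, retries > 0 and <= 5 in-flight requests.
    if (idempotent && !cfg.get("acks").contains("all"))
      errors += "enable.idempotence=true requires acks=all"
    if (idempotent && num("retries").contains(0L))
      errors += "enable.idempotence=true requires retries > 0"
    if (idempotent && inFlight.exists(_ > 5))
      errors += "enable.idempotence=true requires max.in.flight.requests.per.connection <= 5"
    // Without idempotence, a retried batch can overtake a later one unless only
    // one request is in flight (an unset value means the default of 5).
    if (!idempotent && !inFlight.contains(1L))
      errors += "ordering with retries requires max.in.flight.requests.per.connection=1"

    // An explicit delivery.timeout.ms must cover linger.ms + request.timeout.ms.
    for (delivery <- num("delivery.timeout.ms")) {
      val floor = num("linger.ms").getOrElse(0L) + // unset linger.ms treated as 0 in this sketch
        num("request.timeout.ms").getOrElse(DefaultRequestTimeoutMs)
      if (delivery < floor) errors += s"delivery.timeout.ms must be >= $floor"
    }
    errors.result()
  }
}
```

With the values from `buildBatchProducer` (idempotence on, `acks=all`, one in-flight request, 60000 ms delivery timeout, 500 ms linger), the check reports no problems, since 60000 is at least 500 + 30000.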
A naive implementation would create a new Kafka producer for every client request, which is highly inefficient. To avoid this, we instantiate the Kafka producer only once, at server startup, and keep it for the whole server lifecycle. This singleton instance is then passed into the message-sending method:
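```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def produceMessageBatch(producer: KafkaProducer[String, String], topic: String, jsons: Seq[String]): String = {
  val status: ArrayBuffer[String] = ArrayBuffer(topic)
  for (json <- jsons) {
    try {
      // send() is asynchronous: "OK" means the record was handed to the
      // producer's buffer, not yet that the brokers acknowledged it.
      producer.send(new ProducerRecord[String, String](topic, json))
      status += s"OK:${json}."
    } catch {
      case e: Exception =>
        e.printStackTrace()
        status += s"Error:${json}:${e.getMessage}."
    }
  }
  status.mkString("")
}
```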
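One way to realize the “create once at startup, keep for the whole lifecycle” idea is a small holder that builds the resource on first use and closes it when the JVM shuts down. `SharedResource` is a hypothetical name; the sketch relies only on Scala's thread-safe `lazy val` initialization and `sys.addShutdownHook`, and on the fact that `KafkaProducer` implements `Closeable`.

```scala
/** Creates one shared instance on first use and closes it when the JVM shuts down. */
final class SharedResource[A <: AutoCloseable](create: () => A) {
  // A Scala lazy val is initialized at most once, even under concurrent access.
  lazy val instance: A = {
    val resource = create()
    // e.g. KafkaProducer.close() waits for already-sent records before releasing resources
    sys.addShutdownHook(resource.close())
    resource
  }
}
```

In the service, this could hold the producer, for example `val producerHolder = new SharedResource(() => buildBatchProducer())`, with each request handler calling `produceMessageBatch(producerHolder.instance, topic, jsons)`.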
The Kafka documentation notes that “the producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.” We can therefore serve requests from a thread pool whose threads all share the same producer instance, which we expect to considerably increase the request load the service can handle. (We also experimented with the Alpakka Producer, but it offered no benefit, since its intended use cases do not match ours.)
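The thread-pool pattern can be sketched without a Kafka cluster by swapping the producer for a thread-safe stand-in. `RecordSink`, `InMemorySink` and `RequestHandling` are hypothetical names; in the real service the shared object would be the single `KafkaProducer`. The sketch shows a fixed-size pool in which every worker uses the same instance, and that records within one request keep their order because one thread sends them sequentially.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the shared producer; like KafkaProducer, it must be thread-safe.
trait RecordSink {
  def send(topic: String, json: String): Unit
}

// In-memory sink used only to make the sketch runnable without Kafka.
final class InMemorySink extends RecordSink {
  val sent = new ConcurrentLinkedQueue[(String, String)]()
  def send(topic: String, json: String): Unit = { sent.add((topic, json)); () }
}

object RequestHandling {
  /** Handles each (topic, jsons) request on a fixed-size pool that shares one sink. */
  def handleAll(sink: RecordSink, requests: Seq[(String, Seq[String])], poolSize: Int): Unit = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val work = requests.map { case (topic, jsons) =>
        // Records of one request are sent sequentially by a single thread,
        // so their relative order is kept; different requests may interleave.
        Future(jsons.foreach(json => sink.send(topic, json)))
      }
      Await.result(Future.sequence(work), 30.seconds)
    } finally pool.shutdown()
  }
}
```

Because requests may interleave across threads, this pattern preserves order per request, not globally; the consumer-side handling discussed next is what makes that acceptable.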
How is message order maintained once the messages reach Kafka? Each topic has two partitions, and Kafka guarantees ordering only within a partition, not across partitions. Since the producer sends records without a key, consecutive records for the same table can land on different partitions. The real question is therefore: how does the consumer read the messages and upsert the table correctly?
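To illustrate the per-partition guarantee: if records were sent with the record ID as the key, all updates for one ID would go to the same partition and stay in send order there. The sketch simulates this with a stand-in partitioner. Kafka's default partitioner hashes keyed records with murmur2, whereas `String.hashCode` is used here only to keep the example self-contained; `PartitionOrdering` is a hypothetical name, and the producer shown earlier does not set keys.

```scala
object PartitionOrdering {
  // Stand-in partitioner for keyed records. Kafka's default partitioner uses a
  // murmur2 hash of the key bytes; String.hashCode only keeps this sketch self-contained.
  def partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  /** Groups (key, value) records by partition; groupBy keeps send order within each group. */
  def assign(records: Seq[(String, String)], numPartitions: Int): Map[Int, Seq[(String, String)]] =
    records.groupBy { case (key, _) => partitionFor(key, numPartitions) }
}
```

Whatever partition key `"42"` maps to, its updates `v1` and `v2` end up there in the order they were sent, while records for other keys may sit in the other partition and be read in any interleaving relative to them.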
Fortunately, an update latency of several minutes is acceptable for us, so the micro-batch processing of Spark Structured Streaming lets us select the latest record for each record ID before upserting it. The Spark Structured Streaming job is the second service in the architecture: it runs on a Kubernetes cluster with dynamic resource allocation and consumes records from the Kafka topics to apply the changes to the tables, which are stored in the Delta format.
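The “latest record per ID” step can be sketched on plain Scala collections. `Change` and `LatestPerId` are hypothetical, and so is the `ts` ordering field, which stands for whatever the records carry to order updates (for example a client-supplied update time or the Kafka record timestamp). Note that Kafka offsets are only comparable within one partition, so they cannot order updates that landed on different partitions.

```scala
// Hypothetical change record: the row ID plus a field that orders updates.
final case class Change(id: String, ts: Long, payload: String)

object LatestPerId {
  /** Keeps only the newest change for each ID in one micro-batch, sorted by ID. */
  def latest(batch: Seq[Change]): Seq[Change] =
    batch.groupBy(_.id).values.map(_.maxBy(_.ts)).toSeq.sortBy(_.id)
}
```

In a Spark job, the equivalent step would typically sit inside `foreachBatch`, for instance keeping the newest row per ID with a window function before a Delta Lake `MERGE`.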
In hindsight, it seems we could relax the message-ordering constraint on the producer side, since ordering is handled by the streaming service. However, keeping these settings makes maintenance safer in case the streaming logic changes.
