CADS Blog

Build Kafka Producer For Scalability

A general picture for future directions

by Xuan Nguyen Thai
04/02/2026
in Engineering
Reading Time: 4 mins read

[Image: A mimosa flower seen along a road on the outskirts of Ho Chi Minh City, Vietnam.]


At CADS, we handle a business case in which multiple clients send REST API requests that insert and update data in near real time in tables they have created earlier. The modifications to those tables are then reflected in the content recommendations served on the clients' websites.

In our design, a dedicated service receives client requests and forwards them to a Kafka cluster, from which a second service pulls the data for real-time upserts. Kafka decouples these two processing components, allowing them to scale independently.

The design was shaped by the following requirements:

  • Guaranteeing that messages arrive in the same order the client sent them.
  • Ensuring no message loss.
  • Scaling dynamically as the number of clients and the volume of incoming data grow.

To clarify the context: a client may need to update multiple tables. One approach is to create a Kafka topic per table, in which case each API request has to include both the message and the target Kafka topic in its body. In addition, the requests are moderated by our client-facing component.
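To make the topic-per-table idea concrete, the request body might look like the sketch below. The case class and the topic-naming convention here are illustrative assumptions, not our actual schema:

```scala
// Illustrative request shape: each request names the table it targets and
// carries the JSON records to upsert. (Hypothetical names, not our real API.)
case class UpsertRequest(clientId: String, table: String, messages: Seq[String])

// One topic per table: derive the topic name from client and table.
// The "client.table" convention is an assumption for illustration.
def topicFor(req: UpsertRequest): String = s"${req.clientId}.${req.table}"

val req = UpsertRequest("acme", "products", Seq("""{"id":1,"price":9.9}"""))
// topicFor(req) == "acme.products"
```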

The first service mentioned above, the one that receives requests from this component, runs on a Netty server. We chose the Netty server-client framework for two reasons: (i) the service needs to run alongside the others to save on resources; (ii) the framework delivers high throughput and low latency while staying resource-efficient for the internal gRPC communication between the client-facing component and the first service.

The service currently runs on a single powerful machine, but in the future we may move it to a Kubernetes cluster for horizontal autoscaling.

Now comes another important part of the design: building the Kafka producer.

  import java.util.Properties

  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
  import org.apache.kafka.common.serialization.StringSerializer

  def buildBatchProducer(): KafkaProducer[String, String] = {
    val props: Properties = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS)
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") // no duplicates on retry
    props.setProperty(ProducerConfig.ACKS_CONFIG, "all") // wait until messages are persisted on all in-sync brokers
    props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) // retry until delivery.timeout.ms
    props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "60000") // upper bound on send + retries
    props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") // preserve ordering when sending
    // multiple records (in a for loop) with the same KafkaProducer instance.
    // See https://kafka.apache.org/32/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
//    props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, topic) // appears as {client_id; ip; port} in server-side logs
    props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000") // bytes to accumulate per partition before sending
    props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "500") // wait up to 500 ms for a batch to fill
    // TODO: buffer memory for batching
    props.setProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "90000000") // keep idle connections open
    props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "86400000") // topic metadata cache expires every 24 hours

    new KafkaProducer[String, String](props)
  }

The configuration involves many settings, but the crucial ones are ACKS_CONFIG set to "all", which makes sure messages are persisted on all in-sync Kafka brokers, and MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION set to 1, which works together with the previous setting to preserve message ordering. BATCH_SIZE_CONFIG also matters: the producer accumulates enough messages before sending them to Kafka.
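The interplay of BATCH_SIZE_CONFIG and LINGER_MS_CONFIG can be pictured as "send a batch when it fills up or when the linger deadline passes, whichever comes first". A simplified conceptual model (Kafka's real batching logic is richer than this):

```scala
// Simplified model of the producer's batching decision: flush when the
// accumulated bytes reach batch.size OR the batch has lingered for linger.ms.
// Defaults mirror the configuration above.
def shouldFlush(accumulatedBytes: Int, elapsedMs: Long,
                batchSize: Int = 32000, lingerMs: Long = 500): Boolean =
  accumulatedBytes >= batchSize || elapsedMs >= lingerMs

val full    = shouldFlush(40000, 10)  // true: batch is full
val expired = shouldFlush(100, 600)   // true: linger deadline passed
val waiting = shouldFlush(100, 10)    // false: keep accumulating
```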

Naively, each client request would require its own Kafka producer instance, which is highly inefficient. To tackle this, we instantiate the producer only once, at server startup, and it lives for the whole server lifecycle. The singleton instance is then passed into the message-sending method:

import scala.collection.mutable.ArrayBuffer

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def produceMessageBatch(producer: KafkaProducer[String, String], topic: String, jsons: Seq[String]): String = {
  val status: ArrayBuffer[String] = ArrayBuffer(topic)
  for (json <- jsons) {
    try {
      // send() is asynchronous; this try/catch only captures failures raised
      // before the record is handed to the I/O thread (e.g. buffer exhaustion).
      producer.send(new ProducerRecord[String, String](topic, json))
      status += s"OK:${json}."
    } catch {
      case e: Exception =>
        e.printStackTrace()
        status += s"Error:${json}:${e.getMessage}."
    }
  }

  status.mkString("")
}
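The create-once-reuse-everywhere pattern can be expressed with a lazy val in a singleton object: Scala guarantees the initializer runs exactly once, even under concurrent first access. A sketch with a stand-in value in place of the real KafkaProducer (the object name is ours, not from our codebase):

```scala
import java.util.concurrent.atomic.AtomicInteger

object ProducerHolder {
  val initCount = new AtomicInteger(0)
  // In the real service this would be: lazy val producer = buildBatchProducer()
  lazy val producer: String = {
    initCount.incrementAndGet()
    "kafka-producer-instance" // stand-in for KafkaProducer[String, String]
  }
}

// However many request handlers touch it, initialization happens exactly once.
(1 to 8).foreach(_ => ProducerHolder.producer)
// ProducerHolder.initCount.get == 1
```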

The Kafka documentation notes that "the producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances." So we can configure a thread pool whose workers all use the same producer instance, which gives a huge boost in handling the request load. (We also experimented with the Alpakka Producer, but it offered no benefit since the use cases don't match.)
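The thread-pool-plus-shared-producer shape can be sketched as below, with a counting stub standing in for the thread-safe KafkaProducer (names are illustrative):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for the shared, thread-safe KafkaProducer: it just counts sends.
class SharedProducer {
  private val sent = new AtomicInteger(0)
  def send(record: String): Unit = sent.incrementAndGet()
  def total: Int = sent.get
}

val producer = new SharedProducer          // one instance for the whole pool
val pool = Executors.newFixedThreadPool(4) // handler threads share it
(1 to 100).foreach { i =>
  pool.execute(new Runnable { override def run(): Unit = producer.send(s"msg-$i") })
}
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
// producer.total == 100 once the pool drains
```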

How is the message order maintained once it reaches Kafka? We have two partitions per topic, and Kafka guarantees message ordering only within a partition, not across partitions. So the question really becomes: how does the consumer get the messages and upsert the table properly?
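For background, per-partition ordering is why keyed records keep their relative order: records with the same key always land in the same partition. Kafka's default partitioner actually applies murmur2 to the serialized key bytes; the hashCode version below is a simplification for illustration only:

```scala
// Sketch of key-based partition selection. Same key -> same partition ->
// updates to that key stay ordered. (Kafka really uses murmur2, not hashCode.)
def partitionFor(key: String, numPartitions: Int = 2): Int =
  math.abs(key.hashCode % numPartitions)

val p1 = partitionFor("record-42")
val p2 = partitionFor("record-42")
// p1 == p2, always
```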

Fortunately, an update latency of a few minutes is acceptable, so the micro-batch processing of Spark Structured Streaming lets us select the latest record per record ID for each upsert. The Spark streaming job is the second service in the architecture; it runs on a Kubernetes cluster with dynamic resource allocation, consuming records from the Kafka topics and applying the changes to the tables, which are created in the Delta format.
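The latest-record-wins selection the streaming job performs per micro-batch can be sketched in plain Scala. In the real service this is a Spark aggregation feeding a Delta merge; the field names here are assumptions:

```scala
// Each consumed record carries an ID and an ordering field (offset/timestamp).
// Within a micro-batch, only the newest record per ID is upserted.
case class Record(id: String, offset: Long, payload: String)

def latestPerId(batch: Seq[Record]): Map[String, Record] =
  batch.groupBy(_.id).map { case (id, recs) => id -> recs.maxBy(_.offset) }

val batch = Seq(
  Record("42", 1, """{"price": 10}"""),
  Record("42", 3, """{"price": 12}"""), // newer update to the same row wins
  Record("7",  2, """{"price": 5}""")
)
// latestPerId(batch)("42").payload == """{"price": 12}"""
```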

Looking back, it seems we could relax the message-ordering constraint on the producer side, since the streaming service takes care of it. But keeping these configurations eases maintenance in case the streaming logic is modified.



© 2026 CADS - The Center of Applied Data Science by FPT DC5