In the past, I wrote two posts about moving binary data with Kafka (Moving binary data with Kafka and the more recent Moving binary data with Kafka – a more realistic scenario).
When transferring large files, I had to split them into chunks. While trying to decide on the right chunk size, I found an article by Gwen Shapira (this is the original link, but it no longer works) which said that the optimal message size is around 10KB, so I used that size.
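Chunking itself is straightforward. Here is a minimal sketch of splitting a file into fixed-size chunks before producing each one as a separate message; the function name, the 10KB default, and the temporary-file illustration are mine, not part of the original posts:

```python
import os
import tempfile

def read_chunks(path, chunk_size=10 * 1024):
    """Yield fixed-size chunks of a file; the last chunk may be shorter."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Illustration: a 25 KB file split into 10 KB chunks gives three chunks.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * (25 * 1024))
chunks = list(read_chunks(tmp.name, chunk_size=10 * 1024))
os.remove(tmp.name)
print(len(chunks))        # 3 chunks: 10 KB + 10 KB + 5 KB
print(len(chunks[-1]))    # 5120
```

Each chunk would then be passed to the producer as the message value; the consumer reassembles the chunks in order.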
But I was curious and wanted to try it myself. So I ran a little test. I wrote a Python program that runs a producer and a consumer for 30 seconds with different message sizes and measures how many messages per second it can deliver, i.e. the Kafka cluster throughput.
I did not care about the message content, so the consumer only reads the messages from the topic and then discards them. I used a three-partition topic. I guess that on larger clusters with more partitions the absolute performance will be better, but the message-size-to-throughput ratio will remain roughly the same.
So I wrote a small Python program that generates a dummy message of the desired size and then spawns two threads, one producer and one consumer. The producer sends the same message over and over, and the consumer reads the messages from the topic and counts how many it has read. The main program stops after 30 seconds, but before it stops it prints how many messages were consumed and how many messages per second that is.
Here is the code itself:
```python
from kafka import KafkaProducer, KafkaConsumer
import threading
from threading import Timer
import pickle

kafka_server = "cloudera3:9092"
messagesize_kb = 100
timeout = 30

producer = KafkaProducer(bootstrap_servers=kafka_server)
consumer = KafkaConsumer("mytopic2",
                         bootstrap_servers=kafka_server,
                         auto_offset_reset='latest',
                         enable_auto_commit=False,
                         group_id=None)
consumer.subscription()

# Creates a dummy message of size_kb kilobytes. We run it first.
def create_message(size_kb):
    # "a" is a 1-byte character, so this string is exactly size_kb KB.
    return "a" * (1024 * size_kb)

# This is used by the producer to send the data to Kafka.
def send(data):
    producer.send(topic="mytopic2", value=data)

# Send messages indefinitely.
# It should stop when producer_sem is released, but this does not work.
# However, it does not affect the throughput measurement.
def produce(message):
    while not producer_sem.acquire(False):
        send(message)

# Consume all messages in the topic and count how many were consumed.
# It should stop when consumer_sem is released, but this does not work
# (the inner loop blocks on the consumer, so the check rarely runs).
# However, it does not affect the throughput measurement.
def consume():
    global counter
    counter = 0
    while not consumer_sem.acquire(False):
        for msg in consumer:
            data = msg.value      # Get the message, then discard it
            counter = counter + 1

# Runs after 30 seconds and calculates messages per second.
def finalize():
    total = counter
    print(str(total) + " in 30 seconds.")
    print(str(total / 30) + " messages per second.")
    producer_sem.release()
    consumer_sem.release()
    exit(0)

consumer_sem = threading.Semaphore()
producer_sem = threading.Semaphore()

# Create the message. We send the same message over and over again.
message = create_message(messagesize_kb)

my_producer = threading.Thread(target=produce, args=(message,))
my_consumer = threading.Thread(target=consume)
threads = []
threads.append(my_producer)
threads.append(my_consumer)

consumer_sem.acquire()
producer_sem.acquire()
my_consumer.start()
my_producer.start()

# Wait 30 seconds, then run the finalize procedure.
t = Timer(30, finalize)
t.start()
```
This program is built to send text messages. If you want, you can change it to send binary data by changing the create_message function to this:
```python
def create_message(size_kb):
    # A zeroed byte array of size_kb kilobytes, serialized with pickle.
    # pickle.dumps already returns bytes, so no further encoding is needed.
    message = bytearray(size_kb * 1024)
    return pickle.dumps(message)
```
However, this does not change the results:
| Message size (KB) | Messages per second | MB per second |
| --- | --- | --- |
From the table you can see that there is an inverse relationship between the message size and the number of messages that go through the topic per second. This makes sense, because larger messages take longer to process.
However, what we are really interested in is the overall throughput: how many megabytes we can transfer every second. This is shown in the table and in the chart.
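The conversion between the two columns is just the message rate times the message size. A quick sketch (the sample numbers below are illustrative, not measurements from the test):

```python
def throughput_mb_per_sec(messages_per_sec, message_size_kb):
    """Convert a message rate and a message size in KB into MB per second."""
    return messages_per_sec * message_size_kb / 1024.0

# E.g. 80 messages/s of 100 KB messages is about 7.81 MB/s.
print(round(throughput_mb_per_sec(80, 100), 2))   # 7.81
```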
Here the relationship is not as clear. You can see that the throughput rises as the message size gets bigger, peaks at 100KB, and then drops again; at 750KB messages it goes back up to about 8MB/s. There is no consistent trend: 10KB messages perform worse (in contrast to Gwen Shapira's article) and 100KB messages perform best.
I also tried it with binary data, but the results were so close that they weren't worth a separate table and chart.
First of all, do not take for granted information you find on the internet (including this post). The fact that Gwen Shapira and I got such different results implies that there are other variables affecting throughput that I did not take into account. The bottom line is that you should keep the message size configurable and experiment on your own system until you find the best-performing message size.
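One simple way to keep the size configurable is to take it from the command line instead of hard-coding it. A minimal sketch, assuming a command-line benchmark script; the flag names and defaults are mine:

```python
import argparse

def parse_args(argv=None):
    """Parse benchmark settings so the message size can be tuned per run."""
    parser = argparse.ArgumentParser(description="Kafka throughput test")
    parser.add_argument("--message-size-kb", type=int, default=100,
                        help="size of each test message in kilobytes")
    parser.add_argument("--duration-sec", type=int, default=30,
                        help="how long to run the test")
    return parser.parse_args(argv)

# Example: override the message size, keep the default duration.
args = parse_args(["--message-size-kb", "10"])
print(args.message_size_kb)   # 10
print(args.duration_sec)      # 30
```

You can then sweep a range of sizes from a shell loop and compare the printed throughput numbers.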