Moving binary data with Kafka – a more realistic scenario

Back in September 2016 I published a post named “Moving binary data with Kafka“. It was a Java program that monitored a directory for changes, and whenever a new file arrived it read it, split it into chunks, and sent it via Kafka to a consumer that reassembled the file and wrote it to a target directory. It was a very simple example that used only one partition, so it transferred the data serially and did not scale. The post received many comments suggesting ways to rewrite it so it would fit real-world use, so I decided to write a better version that addresses the scalability problem and makes better use of Kafka’s capabilities. I used the Kafka that comes with Cloudera CDH 5.12 for this post.

First of all, let’s create a topic, this time with 4 partitions (I only have four nodes on my test cluster; on larger clusters you would use more partitions).

cd /opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin
./kafka-topics.sh --zookeeper cloudera1:2181 --create --topic filestopic --partitions 4 --replication-factor 1

Lately I have been using Python much more than Java, so I decided to write the programs in Python this time (Python 2.7).

There are several choices of Kafka libraries for Python, such as kafka-python, confluent-kafka and pykafka. I decided to stick with kafka-python. We have to make sure kafka-python is installed (I use Anaconda, but standard Python 2.7 will do fine):

D:\ProgramData\Anaconda2\Scripts\pip install kafka-python
Collecting kafka-python
Downloading kafka_python-1.3.5-py2.py3-none-any.whl (207kB)
100% |################################| 215kB 1.1MB/s
Installing collected packages: kafka-python
Successfully installed kafka-python-1.3.5
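
With kafka-python installed we can already run a quick sanity check (a throwaway snippet of mine, not part of the producer or consumer below) to confirm that the filestopic topic really got its 4 partitions:

from kafka import KafkaConsumer

# Connect without subscribing, just to query the topic's metadata
consumer = KafkaConsumer(bootstrap_servers="cloudera1:9092")
# Should print the partition ids, e.g. set([0, 1, 2, 3])
print consumer.partitions_for_topic("filestopic")
consumer.close()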

In order to watch a directory for changes, we need to install watchdog. In this case pip did not work for me, but easy_install did:

easy_install watchdog

These programs are just meant to demonstrate Kafka’s abilities, so I took a “quick and dirty” approach and they may not follow all Python best practices. I also used many hard-coded parameters, but in real life you should use a config file or command-line parameters instead.
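
For example, here is a minimal sketch of what that could look like with Python 2.7’s ConfigParser (the file name, sections and keys are made up for illustration only):

from ConfigParser import ConfigParser  # this module is called configparser in Python 3

# Hypothetical settings file; none of these names come from the programs below
config = ConfigParser()
config.read("filestream.ini")

kafka_server = config.get("kafka", "bootstrap_servers")
basepath = config.get("producer", "watch_dir")
chunksize_kb = config.getint("producer", "chunksize_kb")
num_of_threads = config.getint("producer", "num_of_threads")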

The Producer

We have to split each file into chunks according to the chunksize_kb parameter, so I created a datum class that represents such a chunk. It holds the name of the file the chunk belongs to, the chunk’s ordinal number and the actual binary data.

The program uses watchdog to listen for new files; when a file arrives it finds its size and calculates how many chunks it will be broken into. Then it spawns 4 threads (the same as the number of Kafka partitions and the number of Kafka servers) that read chunks from the file by offset. The “latest” global variable is used as a high-water mark, protected by a lock, that makes sure two threads will not process the same chunk.

from kafka import KafkaProducer
import os
import time
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import pickle

# These are hard-coded parameters. In the real world you would want to put them in a config file or pass them
# as command-line parameters.

# Bootstrap servers
kafka_server = "cloudera1:9092"

# The directory to watch for new files
basepath = "d:\\source"

# Chunk size
chunksize_kb = 10

# Number of worker threads per file (same as the number of partitions)
num_of_threads = 4

# High-water mark of the latest chunk claimed by any thread; a lock makes sure
# two threads never claim the same chunk
latest = 0
chunk_lock = threading.Lock()
file_length = 0
filename = ""

# Utility variable that holds the number of chunks that this file should have
num_of_chunks = 0


# A class that represents a single chunk of data
class datum:
    # The binary data itself
    data = bytearray()
    # The file name
    filename = ""
    # Ordinal number of this chunk in the file
    serial = 0

    def __init__(self, data1, name1, serial1):
        self.data = data1
        self.filename = name1
        self.serial = serial1

    def getData(self):
        return self.data

    def getFileName(self):
        return self.filename

    def getSerial(self):
        return self.serial

# This is a worker thread that reads chunks of the file and sends them to Kafka
class worker(threading.Thread):
    # We pass in the send function so we do not have to define it again in the threads
    def __init__(self, file, send):
        threading.Thread.__init__(self)
        self.file = file
        self.send = send
        self.filename = os.path.basename(file.name)

    def run(self):
        self.dowork(self.file, self.filename)

    def dowork(self, file, filename):
        # "latest" is the file's high-water mark: the number of the latest chunk claimed by any thread
        global latest
        global num_of_chunks
        # Each thread opens its own handle, so one thread's seek cannot interfere with another thread's read
        myfile = open(file.name, 'rb')
        while True:
            # Claim the next chunk under the lock so no other thread works on the same chunk
            with chunk_lock:
                if latest >= num_of_chunks:
                    break
                latest += 1
                mychunk = latest
            # Seek to (chunk number - 1) x chunk size to get to the chunk we want to read
            myfile.seek((mychunk - 1) * chunksize_kb * 1024)
            # Read one chunk
            piece = myfile.read(chunksize_kb * 1024)
            # Construct a datum with the filename, the ordinal number and the actual data, and send it to Kafka
            onedatum = datum(piece, filename, mychunk)
            self.send(onedatum)
        myfile.close()

class CreatedHandler(FileSystemEventHandler):
    producer = None

    def __init__(self, bootstrap):
        # Create the Kafka producer (all worker threads share the same producer, which is thread safe)
        self.producer = KafkaProducer(bootstrap_servers=bootstrap)

    # Sends a datum down the stream to Kafka
    def send(self, mydatum):
        self.producer.send(topic="filestopic", value=pickle.dumps(mydatum), key=str(mydatum.getSerial()).encode('utf-8'))

    # Before sending each file we send a header that tells the consumer what the file name is and how many chunks to expect.
    # We also have to serialize the datum object.
    def send_header(self, mydatum, mykey):
        self.producer.send(topic="filestopic", value=pickle.dumps(mydatum), key=mykey.encode('utf-8'))

    # This gets fired every time a new file is created in the directory. It then calls read_split_file.
    def on_created(self, event):
        # Give the new file a moment to settle on disk before reading it
        time.sleep(0.5)
        if event.is_directory:
            return
        filepath = event.src_path
        # I use TeraCopy to copy files and it creates a temporary file while copying. You probably don't need this check.
        if 'TeraCopyTestFile' not in filepath:
            self.read_split_file(filepath)

    def read_split_file(self, filepath):
        global latest
        global num_of_chunks
        # We use a dictionary to hold pointers to the spawned threads
        threadmap = {}
        f = open(filepath, 'rb')
        self.filename = os.path.basename(filepath)
        self.file_length = os.path.getsize(filepath)
        # Calculate the number of chunks
        num_of_chunks = self.file_length / (chunksize_kb * 1024)
        some_more = self.file_length % (chunksize_kb * 1024)
        if some_more > 0:
            num_of_chunks += 1
        # The header is sent once per file in the format "Header:filename:num_of_chunks" so that the consumer
        # knows how many chunks to expect.
        header_string = "Header:" + self.filename + ":" + str(num_of_chunks)
        header = datum(None, self.filename, header_string)
        self.send_header(header, header_string)
        # Reset the high water mark for this file, then start the threads
        latest = 0
        for i in range(1, num_of_threads + 1):
            mythread = worker(f, self.send)
            mythread.setName("Thread " + str(i))
            threadmap[i] = mythread
            mythread.start()

        # Wait for all the threads to finish
        for i in range(1, num_of_threads + 1):
            threadmap[i].join()

        f.close()
        self.producer.flush()


if __name__ == "__main__":
    event_handler = CreatedHandler(kafka_server)
    observer = Observer()
    observer.schedule(event_handler, basepath, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

It processes files in a serial manner, but each file is processed in parallel.

Here is how the Kafka topic looks after the producer has pushed all the file chunks into it:

[Screenshot: the contents of the filestopic topic]

Note the header entry and the keys that contain each chunk’s ordinal number.
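
If you don’t have a GUI to browse the topic, a throwaway kafka-python snippet like the one below (mine, not part of the consumer that follows) prints the partition and key of every message, which is enough to see the header and the numbered chunks:

from kafka import KafkaConsumer

# consumer_timeout_ms makes the iterator stop 5 seconds after the last message, so the script exits on its own
peek = KafkaConsumer("filestopic", group_id=None, auto_offset_reset='earliest',
                     bootstrap_servers="cloudera1:9092", consumer_timeout_ms=5000)
for msg in peek:
    # Keys are either "Header:<filename>:<num_of_chunks>" or the chunk's ordinal number
    print msg.partition, msg.key
peek.close()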

The Consumer

The consumer is more challenging, since it receives the chunks in no particular order but has to write them to the file in a very specific order.

It also has to handle the possibility of chunks belonging to more than one file arriving interleaved. To address these problems I used two Python dictionaries. The first dictionary is keyed by the chunk’s ordinal number within the file, and the value is the actual byte array. Each arriving chunk is entered into the dictionary until the dictionary is complete (the number of chunks in it equals the expected number of chunks in the file, which is taken from the header record). Then a loop runs from 1 to the number of chunks in the file and writes the chunks to the output file in the right order.

The second dictionary has file names as keys and those per-file dictionaries as values, so we can handle several files at once.

There is, however, one unresolved issue:

I thought that if we have a multi-threaded producer and a multi-partition topic, we obviously also need a multi-threaded consumer to keep the whole thing robust and balanced. But after coding a multi-threaded consumer, it just didn’t work. Then I found that the kafka-python consumer is not thread safe, so I had to rewrite it to use a single thread. This, of course, limits scalability, and in times of high volume of incoming files it can put pressure on the consumer, which will not be able to keep up with the producer. I looked at the documentation of the other Kafka libraries and none of them state that they are thread safe (but they also do not say they’re not). Eventually I decided to leave my code as it is and still use kafka-python, but I should check the other libraries.
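
One common way around this limitation is to run several consumer processes, each with its own KafkaConsumer instance in the same consumer group, so that Kafka splits the partitions between them. Below is a rough sketch of that pattern (the group name is made up, and it only parallelizes consumption: since chunks of one file may land in different processes, reassembling files would need shared state that is not shown here):

from multiprocessing import Process
from kafka import KafkaConsumer

def consume(worker_id):
    # Each process gets its own KafkaConsumer; Kafka assigns each one a subset of the 4 partitions
    consumer = KafkaConsumer("filestopic", group_id="filesgroup",
                             auto_offset_reset='earliest',
                             bootstrap_servers="cloudera1:9092")
    for msg in consumer:
        print "worker", worker_id, "partition", msg.partition, "key", msg.key

if __name__ == "__main__":
    workers = [Process(target=consume, args=(i,)) for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()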

Another limitation of this design is that the whole file must fit in memory, which makes it unsuitable for very large files. It could be changed to store the actual chunks on disk, holding only pointers in memory, and then join all the chunks on disk into one file. This way it could support very large files at the cost of some performance.
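
To make the idea concrete, here is a small sketch of my own (not code from the programs in this post) of that disk-spilling variant: each chunk is written straight to a numbered part file, and once all the parts have arrived they are concatenated into the final file.

import os

def spill_chunk(tempdir, filename, serial, data):
    # Store one chunk on disk instead of keeping it in memory
    with open(os.path.join(tempdir, "%s.part%d" % (filename, serial)), "wb") as part:
        part.write(data)

def assemble(tempdir, outdir, filename, num_of_chunks):
    # Join the numbered part files into the final file, in order, then clean up
    with open(os.path.join(outdir, filename), "wb") as out:
        for serial in range(1, num_of_chunks + 1):
            partpath = os.path.join(tempdir, "%s.part%d" % (filename, serial))
            with open(partpath, "rb") as part:
                out.write(part.read())
            os.remove(partpath)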

from kafka import KafkaConsumer
import os
import pickle

outdir="d:\\target"
filename=""
# The files dictionary holds filedata objects, which are dictionaries that contain file chunks.
files={}

# This class represents a binary file chunk. It contains the actual bytes, along
# with the file name and the serial number of this chunk in the chunk list.
class datum:
    data = bytearray()
    filename = ""
    serial = 0

    def __init__(self, data1, name1, serial1):
        self.data = data1
        self.filename = name1
        self.serial = serial1

    def getData(self):
        return self.data

    def getFileName(self):
        return self.filename

    def getSerial(self):
        return self.serial

# This class represents a whole file. It has a chunks dictionary that contains all the file's chunks, an add method that adds new chunks,
# an isComplete method that determines whether all the chunks of this file have arrived, and a writeFile method that
# writes all the chunks to the output file.
class filedata:
    name = ""
    outdir = ""
    # Target is the number of chunks that this file is supposed to have
    target = 0

    def __init__(self, out):
        self.outdir = out
        self.target = 0
        # The chunks dictionary must be created per instance; a class-level dict would be shared by every file
        self.chunks = {}
    def add(self, key, value):
        #If this is the header entry, extract the target and the filename and ignore the binary data
        if key.startswith("Header"):
            dummy, self.name, itarget = key.split(":")
            self.target=int(itarget)
        else:
            self.chunks[key]=value

    def getFileName(self):
        return self.name

    def getTarget(self):
        return self.target

    def getValueAt(self, pointer):
        return self.chunks[pointer]

    def isComplete(self):
        if len(self.chunks)==self.target:
            return True
        else:
            return False

    def writeFile(self):
        f = open(self.outdir + "\\" + self.name, "wb")
        # Write the chunks in order; range is exclusive at the top, so go up to target + 1
        for i in range(1, self.target + 1):
            data = self.getValueAt(str(i)).getData()
            f.write(data)
        f.close()


consumer = KafkaConsumer("filestopic", group_id=None, auto_offset_reset='earliest',
                             bootstrap_servers="cloudera1:9092")

while True:
    # Read the messages from the Kafka topic
    for msg in consumer:
        # Get the chunk
        mydatum = pickle.loads(msg.value)
        # Get the file name
        filename = mydatum.getFileName()
        # Add the file to the files dictionary if it's not already there
        if filename not in files:
            myfiledata = filedata(outdir)
            files[filename] = myfiledata
        # Add the chunk to the appropriate file data object
        files[filename].add(msg.key, mydatum)
        # Check if we already have all the chunks of the file. If yes, write the file out and drop it from the dictionary.
        if files[filename].isComplete():
            files[filename].writeFile()
            del files[filename]
            print "Finished " + filename

It is difficult to show this in action in a static medium like a blog post (I can only show that the same file exists in both the source and target directories), so you will just have to believe me or try it yourself.

One final note: as in the original post from September 2016, I followed an article by Gwen Shapira which said that the optimal message size in Kafka is about 10 KB and that larger files should be split into chunks of that size. It looks like that article is offline now, but I will try to check whether this is still valid by running with different chunk sizes, and I will post the results.
