Apache Flume was designed mainly to transfer log data, and there are many posts and guides that show how to use it for that purpose. Under certain circumstances, however, it can also be used to transfer binary data.
We will show a use case with a Cloudera Hadoop cluster and a remote server that stores image files (JPEG), from security cams, for example. We need to monitor a directory on the remote server, transfer the image files to the Cloudera cluster as they appear, and store them in HDFS. To do that we will use an HTTP connection, which can also be SSL-secured (although we will not show that here).
This will be a two-step setup: one agent will run on the remote server, collect the image files, and send them via HTTP to a second agent running on the cluster. That agent will receive the files via HTTP and store them one by one in HDFS.
Flume provides an HTTP source out of the box, but no HTTP sink. There are at least two custom HTTP sinks developed by third parties, and I chose to go with the one from HMRC.
As you can see if you follow the link above, the download and installation are quite simple. However, to save you a headache: after downloading the source, and before running sbt, edit the build.sbt file and change all occurrences of “1.7.0-SNAPSHOT” to “1.7.0”, or it will not find all the dependencies and will fail to compile.
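If you'd rather not edit the file by hand, a one-liner does the same replacement (assuming GNU sed):

sed -i 's/1.7.0-SNAPSHOT/1.7.0/g' build.sbt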
When the new HTTP sink jar is in place, we can use it to send HTTP POST requests.
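Where exactly "in place" is depends on your installation; one conventional option is Flume's standard plugins.d layout on the remote server. A rough sketch (the jar path and name are placeholders, use whatever sbt produced):

mkdir -p $FLUME_HOME/plugins.d/http-sink/lib
cp target/scala-*/flume-http-sink*.jar $FLUME_HOME/plugins.d/http-sink/lib/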
As I mentioned, this is a two-agent setup: a spooldir source feeding an HTTP sink on the remote server, and an HTTP source feeding an HDFS sink on the cluster.
I used Apache Flume 1.7 on the remote server and the built-in Flume 1.6 that ships with my CDH 5.12 cluster.
This is the Flume configuration file on the remote host side:
sample1.sources = source1
sample1.channels = channel1
sample1.sinks = sink1

sample1.sources.source1.type = spooldir
sample1.sources.source1.spoolDir = /flume/incoming
sample1.sources.source1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder

sample1.sinks.sink1.type = uk.gov.hmrc.flume.sink.HttpSink
sample1.sinks.sink1.endpoint = http://cloudera1.lan:9007
sample1.sinks.sink1.contentTypeHeader = application/octet-stream
sample1.sinks.sink1.backoff.200 = false
sample1.sinks.sink1.rollback.200 = false
sample1.sinks.sink1.incrementMetrics.200 = true

sample1.channels.channel1.type = memory
sample1.sources.source1.channels = channel1
sample1.sinks.sink1.channel = channel1
Note that the source uses the BlobDeserializer, which reads an entire file as a single event, and note the configuration of the HTTP sink. I used a memory channel for simplicity, but in production a file channel is worth considering, as it will not lose events if the agent crashes.
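For reference, a minimal file channel would look something like this (the directory paths are assumptions; pick locations on a durable disk):

sample1.channels.channel1.type = file
sample1.channels.channel1.checkpointDir = /flume/channel/checkpoint
sample1.channels.channel1.dataDirs = /flume/channel/data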
This is the configuration on the Cloudera side (you set it via the Flume service configuration in Cloudera Manager):
sample1.sources = source1
sample1.channels = channel1
sample1.sinks = sink1

sample1.sources.source1.type = http
sample1.sources.source1.bind = cloudera1.lan
sample1.sources.source1.port = 9007
sample1.sources.source1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
sample1.sources.source1.channels = channel1

sample1.sinks.sink1.type = hdfs
sample1.sinks.sink1.hdfs.path = /flume/incoming
sample1.sinks.sink1.hdfs.rollInterval = 0
sample1.sinks.sink1.hdfs.rollSize = 0
sample1.sinks.sink1.hdfs.rollCount = 1
sample1.sinks.sink1.hdfs.fileType = DataStream
sample1.sinks.sink1.channel = channel1

sample1.channels.channel1.type = memory
Here the source uses the BlobHandler and the sink uses a rollCount of 1, which means that each incoming event (file) will be written to a separate file in HDFS.
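Before starting the remote agent, you can smoke-test the cluster-side source with a plain HTTP POST. Since the BlobHandler treats the whole request body as one event, a curl call like the following (test.jpg is any local file, a hypothetical example) should end up as a file in HDFS:

curl -X POST -H 'Content-Type: application/octet-stream' --data-binary @test.jpg http://cloudera1.lan:9007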
Now let’s test it
I have four image files on the remote server:
-rw-r--r-- 1 root root  200353 Sep  7 14:41 Aqua.JPG
-rw-r--r-- 1 root root 1021283 Sep  7 10:41 Dune.JPG
-rw-r--r-- 1 root root 2539043 Jun  2  2014 IMG_7095.JPG
-rw-r--r-- 1 root root 2313312 Jun  2  2014 IMG_7155.JPG
I started the Flume agent on the remote server:
export FLUME_HOME=/flume/apache-flume-1.7.0-bin
cd $FLUME_HOME
bin/flume-ng agent --name sample1 --conf-file $FLUME_HOME/conf/flume-conf.properties --conf conf -Dflume.root.logger=INFO,console

Again, the logger parameter is for debugging only and you should omit it in production. One small annoyance is that if there are no new files to send, the HTTP sink just spins and shows a warning every few seconds:

[WARN - uk.gov.hmrc.flume.sink.HttpSink.process(HttpSink.java:216)] Processed empty event

Now that the agents are running on both ends, we just copy the image files to the spool directory:

cp *.JPG /flume/incoming
The remote agent picks up the new files immediately:
2017-09-07 14:36:47,955 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-09-07 14:36:47,956 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /flume/incoming/Dune.JPG to /flume/incoming/Dune.JPG.COMPLETED
2017-09-07 14:36:47,978 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-09-07 14:36:47,979 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /flume/incoming/IMG_7095.JPG to /flume/incoming/IMG_7095.JPG.COMPLETED
2017-09-07 14:36:47,994 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-09-07 14:36:47,995 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /flume/incoming/IMG_7155.JPG to /flume/incoming/IMG_7155.JPG.COMPLETED
After a short while the files appear in HDFS:
[flume@cloudera1 ~]$ hdfs dfs -ls /flume/incoming
Found 4 items
-rw-r--r--   2 flume supergroup     200354 2017-09-10 23:38 /flume/incoming/FlumeData.1505075873883
-rw-r--r--   2 flume supergroup    1021284 2017-09-10 23:38 /flume/incoming/FlumeData.1505075873884
-rw-r--r--   2 flume supergroup    2539044 2017-09-10 23:38 /flume/incoming/FlumeData.1505075873885
-rw-r--r--   2 flume supergroup    2313313 2017-09-10 23:38 /flume/incoming/FlumeData.1505075873886.tmp
The last file has a .tmp extension because it hasn't been rolled yet. When the next file arrives, it will be renamed without the .tmp extension.
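If a lingering .tmp file is a problem (for example, when new files arrive infrequently), the HDFS sink can also close idle files after a period of inactivity; a sketch, with the number of seconds as an example value:

sample1.sinks.sink1.hdfs.idleTimeout = 60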
Sharp-eyed readers will notice that each file has one extra byte compared to the original files. I couldn't find the reason for this, but the files still seem to be usable. To view them in the Linux GUI, I copied them back to the local filesystem:
hdfs dfs -copyToLocal /flume/incoming/FlumeData* /home/hdfs
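A plausible explanation for the extra byte, though I haven't verified it here: with fileType=DataStream the HDFS sink uses its TEXT serializer by default, which appends a newline after each event body. If that is the cause, it can be turned off on the sink:

sample1.sinks.sink1.serializer.appendNewline = false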
From the application's perspective this is a good method: the files reach HDFS exactly as they were stored on the source server, and they are ready to use without any further processing.
From the storage perspective, however, HDFS is optimized for much larger files. So if you have lots of files, it is better to use an Avro sink to group the image files into larger Avro container files. You will then have to write your own code that deserializes the Avro files and extracts the individual images before you run any computations on them. We will try this technique in another post.
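As a rough preview of what that deserialization code could look like, here is a minimal Python sketch. It assumes the HDFS sink was configured with serializer = avro_event, so each Avro record is a {headers, body} pair whose body holds one original image; the input filename is a placeholder.

# Minimal sketch: unpack images from a Flume avro_event container file.
# Assumes `pip install avro` and a file written with serializer = avro_event.
from avro.datafile import DataFileReader
from avro.io import DatumReader

reader = DataFileReader(open("FlumeData.avro", "rb"), DatumReader())
for i, event in enumerate(reader):
    # each record is {"headers": {...}, "body": <bytes>}
    with open("image_%d.jpg" % i, "wb") as out:
        out.write(event["body"])
reader.close()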