Ingesting log data with NiFi, MiNiFi and Hive

Apache NiFi is a very good data transfer and ETL system, but it has no built-in remote components that can collect data at its origin. For example, it has an FTP adapter, but you still have to run an FTP server on the remote host and make sure the files end up in the server's directory. It has an HTTP adapter, but you have to write the code that sends the data over HTTP to NiFi.

So the people who develop NiFi created a companion component named MiNiFi that does just that: it is an agent that runs on the data source and sends the data to NiFi.

We will use MiNiFi and NiFi to monitor a remote log file and send any newly added lines directly into a Hive table using Hive streaming.

Preparing Hive

In order for the Hive sink to work, our table has to satisfy some rather strict prerequisites, as documented here. It must be bucketed, stored in ORC format and have the transactional table property set. Here is the demo table I created in Hive:

create table logdata (
  time1 timestamp,
  value1 int,
  message string)
clustered by (value1) into 5 buckets
stored as orc
tblproperties("transactional"="true");
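
If you want to sanity-check the table before wiring anything up, you can ask Hive to describe it and confirm that the bucketing and the transactional property made it in (just a quick check, nothing more):

describe formatted logdata;
show create table logdata;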

There are also three parameters that have to be set in the Hive server itself:

hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads > 0

You can set them in the advanced configuration (safety valve) in the Hive service configuration section in Cloudera Manager.
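
If you are not using Cloudera Manager, the same three settings can go directly into hive-site.xml; here is a sketch of the entries (the worker thread count of 5 is just an example value):

<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>5</value>
</property>
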
Also give the /tmp/hive directory full permissions in HDFS:
hdfs dfs -chmod -R 777 /tmp/hive

The Log producer

I did not have a real log-producing system, so I simulated one: I ran a small bash script that writes dummy log messages, each consisting of a timestamp, a random number and a text message, to a file:

#!/bin/bash

# seed bash's RANDOM generator with the script's PID
RANDOM=$$
PAUSE=0.1
i="0"

# write 1000 dummy log lines: <date> <time> <random number> <message>
while [ $i -lt 1000 ]; do
  ((i++))
  CURRDATE=`date +%D" "%T`
  RANDOM_NUM=$RANDOM
  echo $CURRDATE $RANDOM_NUM "This is my message." >> /tmp/original.log
  sleep $PAUSE
done
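
To run it I just saved the script to a file and launched it in the background; the file name and path here are only an example:

chmod +x /tmp/loggen.sh
nohup /tmp/loggen.sh &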

The problem is that, as in real life, the log is plain text, which is not an easily ingestible format like CSV or JSON.
I could not find a NiFi processor that converts the log file to CSV line by line (I tried a shell script with an ExecuteStreamCommand processor, but it could not convert it properly).
So I ended up running this command on the original log file to create a CSV-formatted version of it, on which I ran a TailFile processor (the awk command is specific to this log format, so you will have to adapt it to yours):

tail -f original.log | awk '{print $1" "$2 "," $3 "," substr($0,index($0,$4))}' >> guy.log

This produces nice CSV lines like this:

2017-11-06 23:21:44,11780,This is my message.
2017-11-06 23:21:44,11058,This is my message.
2017-11-06 23:21:44,15444,This is my message.
2017-11-06 23:21:44,27018,This is my message.
2017-11-06 23:21:44,1961,This is my message.
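
One caveat: when awk writes to a file instead of a terminal it usually block-buffers its output, so lines may show up in guy.log with some delay. If that matters to you, here is a line-buffered variant of the same command (it relies on awk's fflush(), which most awk implementations provide, so treat it as a sketch):

tail -f original.log | awk '{print $1" "$2 "," $3 "," substr($0,index($0,$4)); fflush()}' >> guy.log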

Setting up MiNiFi

The MiNiFi agent comes in two flavors: a Java agent, which is more comprehensive and supports all NiFi processors but has a larger footprint, and a C++ agent, which supports a smaller subset of processors but is very lightweight. For this demo I will use the Java agent, which requires Java 1.8 to be installed.

Download the Java agent from the download page, unpack it on the data source server and point MINIFI_HOME to the directory where you unpacked MiNiFi.
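
For example, if you unpacked the archive under /opt (a hypothetical location, adjust to your own):

export MINIFI_HOME=/opt/minifi-0.2.0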

Now install MiNiFi as a service (optional):

cd $MINIFI_HOME

./bin/minifi.sh install

Configure the remote port in nifi.properties on all NiFi nodes:

nifi.remote.input.socket.port=1026

MiNiFi uses a yml file to store its configuration, and the recommended way to create this file is via a NiFi template. To do so, open NiFi and place a processor on the canvas (in our case it is a TailFile processor that points to /tmp/guy.log). Then place an input port (we will call it From MiNiFi) and a remote process group. Double-click the remote process group and set the URL to one of the NiFi nodes (here it is http://cloudera2:8080/nifi). Make sure you check the "on success" checkbox.

Now create a template, then go to the main menu -> Templates and save it as an XML file.

Copy the XML file to the MiNiFi server and download the Converter Toolkit (this link is for the Linux version).

Now run it to convert your XML file to YML:

tar -xzvf minifi-toolkit-0.2.0-bin.tar.gz

cd minifi-toolkit-0.2.0

./bin/config.sh transform Logger.xml config.yml

config.sh: JAVA_HOME not set; results may vary

Java home:
MiNiFi Toolkit home: /root/Downloads/minifi-toolkit-0.2.0

No validation errors found in converted configuration.

cp config.yml minifi-0.2.0/conf

service minifi start

Finally, copy the newly created config.yml file to $MINIFI_HOME/conf and start the minifi service. The whole process is described here.
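
A quick way to watch the agent while it starts up is to tail its application log (the file name below is what a default MiNiFi install uses, so verify it on your version):

tail -f $MINIFI_HOME/logs/minifi-app.log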

If you look at the MiNiFi log under $MINIFI_HOME/logs you can see that it connects to NiFi and detects that the NiFi cluster has 3 nodes (in my case):

2017-10-29 22:13:28,890 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled TailFile[id=77e08c49-983a-3661-0000-000000000000] to run with 1 threads
2017-10-29 22:13:28,972 INFO [main] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled RemoteGroupPort[name=From MiNiFi,targets=http://cloudera2:8080/nifi] to run with 1 threads
2017-10-29 22:13:28,973 INFO [main] o.apache.nifi.controller.FlowController Started 1 Remote Group Ports transmitting
2017-10-29 22:13:28,988 INFO [main] org.apache.nifi.minifi.MiNiFiServer Flow loaded successfully.
2017-10-29 22:13:28,992 INFO [main] org.apache.nifi.BootstrapListener Successfully initiated communication with Bootstrap
2017-10-29 22:13:29,000 INFO [main] org.apache.nifi.minifi.MiNiFi Controller initialization took 4411667394 nanoseconds.
2017-10-29 22:13:30,404 INFO [NiFi Site-to-Site Connection Pool Maintenance] o.apache.nifi.remote.client.PeerSelector [email protected] Successfully refreshed Peer Status; remote instance consists of 3 peers

Configure NiFi
First we will look at the whole flow and then we will drill down and see each component’s details.
This is only a demo, so I did not follow all best practices and did not handle every error situation. In the detailed view of each component I will not discuss the parameters that were left at their default values.

First, there is an input port that receives the data from MiNiFi and passes it to a process group.

Inside the process group, the data passes through an InferAvroSchema processor, a ConvertCSVToAvro processor and finally a PutHiveStreaming processor, with a single PutFile processor catching failures along the way.

The data (which is CSV lines) first arrives at an InferAvroSchema processor that tries to infer the schema of the CSV data. Here are its properties:

The content type is, of course, csv, and the CSV Header Definition corresponds to the Hive table columns (there should be no spaces around the commas). Also give the Avro Record Name a value (any value will do). All the rest of the properties were left at their default values.
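
For reference, the properties I touched here boil down to something like the following. Property names may differ slightly between NiFi versions, so double-check them against the InferAvroSchema documentation; the record name is just an example value, and Schema Output Destination is left at its flowfile-attribute default so the next processor can read the schema from an attribute:

Input Content Type        : csv
CSV Header Definition     : time1,value1,message
Avro Record Name          : logdata
Schema Output Destination : flowfile-attribute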

In the Settings tab I directed failures and unsupported formats to the PutFile processor we will see next, and success to the ConvertCSVToAvro processor.

I added a single PutFile processor that receives erroneous records from all the other processors and writes them to files.

I just gave it an output directory.
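
In other words, the configuration amounts to a single property (the path is just an example):

Directory : /tmp/nifi_failures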

Next in the flow is the ConvertCSVToAvro processor that, well, converts the CSV lines into Avro records.

The only thing I had to change here is the record schema, which I take from the previous processor, InferAvroSchema, using the Expression Language.
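
Concretely, the Record schema property just references the attribute that InferAvroSchema writes when its schema output destination is flowfile-attribute (attribute name taken from that processor's documentation, so verify it on your version):

Record schema : ${inferred.avro.schema}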

Finally, there is the PutHiveStreaming processor, which takes the Avro records and pushes them directly into the Hive table.

Here you have to enter the thrift URI of the Hive metastore (you can see it in the Hive parameter …), and the database name and table name should point to the target table.
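
As a sketch, the properties I would expect to fill in look roughly like this (host and port are placeholders; 9083 is the usual Hive metastore thrift port, and the property names should be verified against your NiFi version):

Hive Metastore URI : thrift://<metastore-host>:9083
Database Name      : default
Table Name         : logdata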

On success the flow simply ends here, but on failure it continues to the PutFile processor.

After starting all the components and the MiNiFi service, I ran the tail -f original.log pipeline and then the log-generating script. After a few seconds you can see the data flowing through NiFi.

You can see that the PutFile component is empty, which means we had no failures. You can now check the Hive table we created earlier and see that it contains data.
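
A couple of quick queries to confirm that the data really landed, using the table from the beginning of the post:

select count(*) from logdata;
select * from logdata limit 5;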
