18 Nov 2021 Integrating Apache MiNiFi with Apache NiFi for Collecting Data from the Edge
Business Intelligence (BI) is now a very well-known term among decision-makers. We could say that the concepts, methodologies, and paradigms behind the term are popping up almost every day, and these new additions are not only related to technology, but also to a way of thinking and to the agreements between people working together on projects. Bearing this in mind, there is one term which is, we could say, complementary to BI, and that is Operational Intelligence (OI).
Operational Intelligence is an ecosystem of business rules, technology and concepts which can answer the question “What is going on right now, at this very moment?”, while BI, on the contrary, answers the question “What happened before, in the past?” in order to obtain insights into the future.
To be able to answer this question, OI needs to use fast technology which can ingest data from the very place where it is generated (data on the edge) and serve that data for consumption in a real-time or near real-time manner – the data is then processed in streams and time windows. There are also some general terms emerging from OI which are very popular today: Internet Of Things (IoT), Smart Cities, and Smart Industry.
In this article, we will demonstrate how to build an IoT ecosystem for collecting data from the edge in near real time, using Apache NiFi, Mosquitto message broker and Apache MiNiFi running on a Raspberry Pi 3B+. So let’s get started with a short introduction to these technologies.
Apache NiFi is a robust and scalable solution to automate data movement and transformation across various systems. Alongside Apache NiFi, we will use Apache MiNiFi: a subproject of Apache NiFi and a lightweight version of it, which means most of the processors are not preloaded and there are some internal differences that make it more convenient for running on the edge, like in embedded computers close to the data source (sensors, signal processors, etc.). Both NiFi and MiNiFi are offered by Cloudera as part of Cloudera Data Flow, but for this article we will use open-source versions.
For our demonstration, MiNiFi is running on a Raspberry Pi model 3B+ and acting as a data gateway, i.e. the point where data is collected and routed to a higher domain for further processing. Note that in this article we are demonstrating data collection, but not further processing.
We are also using Mosquitto message broker, a lightweight service to send messages securely from one point to another following a publish/subscribe model, ensuring the messages are not lost. Mosquitto is an implementation of the MQTT protocol, which is used for communication on low bandwidth networks and where resource consumption needs to be low, like embedded computers, phones or microcontrollers.
1. Demo Architecture
Our demo environment consists of the components represented in the picture below:
Figure 1: Demo architecture
On the right we can see the data source (a radio tower belonging to an imaginary telecommunications company). In that tower, signals are processed and captured, and from these signals data about phone calls is generated, as well as data from SMS traffic and from people surfing the internet.
For this article, we will simulate SMS traffic data on the radio tower, and we will capture that data using the previously mentioned technologies. SMS traffic data is generated by a JSON data generator running on the Raspberry PI microcomputer; this simulates operational (OT) data generated by hardware on real telecommunications towers. The JSON data is then forwarded to the Mosquitto message broker which forwards the messages on to the MiNiFi agent. This agent is the real data mediator: it sends the received data to the NiFi instance running on a Linux Virtual Machine (VM) on a server in the same network using Site-to-Site protocol (in a real scenario, the NiFi instance would be on a separate remote server, in a different geographical location).
On that server, represented on the left of the picture, data can be further processed and prepared for storing into various analytics storages or time-series databases for real-time analysis and dashboarding, or even for alerting purposes. Note that in real life there would also be some sort of streaming platform, such as Kafka, for buffering the messages from multiple towers. However, to simplify our demonstration, we are putting everything in the same location, without a streaming platform in the middle.
2. Preparing the Services
Before the actual demonstration of streaming data from the Raspberry Pi to the NiFi instance, we will go through the installation and configuration steps for the services. After these steps, we will demonstrate how to integrate Apache MiNiFi and NiFi service with the rest of the components: Mosquitto message broker and the JSON data generator.
2.1. Installing and Configuring Apache NiFi
Apache NiFi can be downloaded from the official project website in the form of binary files. At the time of writing, the latest stable version is 1.14.0. After the download, the NiFi files can be unpacked and installed in the desired folder on the Linux VM, which we will refer to as $NIFI_HOME. In the installation folder there are other important folders, like $NIFI_HOME/bin and $NIFI_HOME/conf. An important requirement to run NiFi is to have Java version 1.8+, and to set the Java home path as an environment variable; to do so, use the following commands:
[user@host]# apt-get install openjdk-8-jdk [user@host]# nano /etc/environment # set $JAVA_HOME=”path/to/jvm/installation/folder” [user@host]# source /etc/environment [user@host]# java -version #check installed Java version
To configure NiFi to start on the desired localhost port, edit the file nifi.properties in $NIFI_HOME/conf and set the following lines under the section #web properties, as shown below:
nifi.web.http.host=0.0.0.0 nifi.web.http.port=8081
To ensure that NiFi is able to receive the data from another NiFi or MiNiFi instance using the Site-to-Site protocol, change the following lines in the same file:
# Site to Site properties nifi.remote.input.host= nifi.remote.input.secure=false nifi.remote.input.socket.port=1026 nifi.remote.input.http.enabled=true nifi.remote.input.http.transaction.ttl=30 sec nifi.remote.contents.cache.expiration=30 secs
After this, we can go to the folder $NIFI_HOME/bin and install NiFi as a service on the Linux VM; to do so, use the following command:
[user@host]# ./nifi.sh install nifi
After this command, NiFi is installed on the Linux VM as a service named “nifi”. We can now start NiFi and check that everything has gone well by opening the URL http://localhost:8081/nifi in our web browser. If the NiFi UI is shown, NiFi is ready for flow development.
To check the status or to start/stop the NiFi service, use the following commands:
[user@host]# service nifi status [user@host]# service nifi start [user@host]# service nifi stop
2.2. Installing Apache MiNiFi
Apache MiNiFi is installed on the Raspberry Pi microcomputer. The binaries can be downloaded from the official project website. At the time of writing, the latest stable version is 1.14.0. After the download, files can be unpacked in the desired installation folder, which we will refer to as $MINIFI_HOME. In the installation folder there are also other important folders such as $MINIFI_HOME/bin and $MINIFI_HOME/conf. The requirements to have Java 1.8+ installed and the Java home path set as an environment variable also apply to MiNiFi, and we can do so using the same commands as shown in the previous step.
To install MiNiFi as a service on the Raspberry Pi, open the folder $MINIFI_HOME/bin and use the following command:
[user@host]# ./minifi.sh install minifi
After the service has been installed, we can use the same commands as for NiFi to check the status of the MiNiFi service, or to start/stop it:
[user@host]# service minifi status [user@host]# service minifi start [user@host]# service minifi stop
The MiNiFi agent is now ready to use, but we are not going to start it yet as we first need to develop the flow which will run on it.
Generally, there are two ways of developing MiNiFi flows: the first one is to manually create a YAML file, which is basically the flow configuration and the MiNiFi instance configuration used at the MiNiFi agent start-up. To create such a YAML file manually is cumbersome, so it is usually better to pick the second option, and that is to develop the flow using the NiFi UI and to save it as a template in XML format. After that, the MiNiFi toolkit can be used to convert the template XML file into a configuration YAML file which can be deployed on the MiNiFi instance on the Raspberry Pi. This YAML file is basically the definition of the flow, which will start automatically after starting the MiNiFi agent.
2.3. Installing Mosquitto Broker and the JSON Generator
To handle the generated JSON data, we will install the Mosquitto message broker on the Raspberry Pi. To do so, we can use a standard Linux package manager to fetch the latest stable version of the broker from the internet and to install it (at the time of writing, the latest version of Mosquitto message broker is 3.1.1 and the MQTT protocol is version 3.1). We use the following command:
[user@host]# apt-get install mosquitto
After the installation, the Mosquitto service starts automatically and listens on port 1883 for incoming messages.
To simulate SMS traffic data, we use the JSON data generator, which can be downloaded from the GitHub repository. After unpacking the generator files in the desired installation folder, we use the JAR file “json-data-generator-1.4.0.jar” to run the generator.
Before running the generator, we must create two configuration files to define the details of the stream simulation, including what data to generate and how fast. Some configuration file examples are located in the ./conf folder, which is where we put the two configuration files for our demo.
The first configuration file defines the type of data that will be generated and its frequency. This file looks like the one shown below:
{ "eventFrequency": 40, "varyEventFrequency": true, "repeatWorkflow": true, "timeBetweenRepeat": 200, "varyRepeatFrequency": true, "steps": [ { "config": [ { "SendDateTime": "nowTimestamp()", "SMSChannelId": "random(260987,261000,261383,90922,202007,203417)", "NetworkId": "random(540,5663,847,2502,4822,5664,1213,1428,2450,3204)", "GatewayId": "random(13,15,29,92,94,96,136,138,163,170,172,180)", "SMSCount": "random(1,2)" } ], "duration": 0 } ] }
In this configuration file we have some JSON fields defining the data generator workflow:
- eventFrequency – the time in milliseconds between the steps (our generator has only one step)
- varyEventFrequency – if true, a random amount (between 0 and half the eventFrequency) of time will be added/subtracted to the eventFrequency
- repeatWorkflow – if true, the workflow will repeat after it finishes
- timeBetweenRepeat – the time in milliseconds to wait before the workflow is restarted
- varyRepeatFrequency – if true, a random amount (between 0 and half the eventFrequency) of time will be added/subtracted to the timeBewteenRepeat
- steps – the data that will be generated in the form of a one-line JSON string; for every field in the JSON string a random value will be picked and the step will run only once, defined by the field “duration” with value 0
The second file serves as definition of the workflow name and of the data sink. The configuration file looks like this:
{ "workflows": [ { "workflowName": "SMSTraffic", "workflowFilename": "sms_traffic_generator.json" } ], "producers": [ { "type": "mqtt", "broker.server": "tcp://localhost", "broker.port": 1883, "topic": "/rpi/sensors/basetower1", "clientId": "BaseTower1", "qos": 2 } ] }
The field “workflowName” is an internal field of the JSON generator and it is arbitrary, just to give a name to the workflow. The next field is “workflowFilename”, which refers to the first configuration file, where we defined what data will be generated and how fast. The rest of the fields define the remaining details of the data sink:
- type – type of sink: in our case, Mosquitto broker which implements the MQTT protocol
- server – the hostname or IP address of the machine where Mosquitto broker is running
- port – the Mosquitto broker port
- topic – the topic where the data is sent
- clientId – arbitrary name of the client which sends the data
- qos – Quality of Service, an important aspect of the MQTT protocol which defines the quality of message transfer. In our case it is level 2, which means every message will be transferred exactly once and is guaranteed not to be lost
With this configuration, the JSON data generator will send JSON strings like the one represented below to the Mosquitto message broker every 200 milliseconds; since there is only one step in the workflow, the “eventFrequency” field is ignored:
{"SendDateTime":"2021-09-23T11:57:22.882Z","SMSChannelId":289109,"NetworkId":3965,"GatewayId":1594,"SMSCount":2}
3. Preparing the Flows for NiFi and MiNiFi
Before starting the MiNiFi agent and generating the SMS traffic data, we need to develop the NiFi and MiNiFi flows which will handle the data stream. As we said earlier, we will develop the MiNiFi flow using NiFi running on a Linux VM because it is the easiest way.
In the picture below there are two process groups: on the right, we can see the process group that will run on the MiNiFi agent on the Raspberry Pi, collecting data from the Mosquitto message broker. On the left, we can see the process group that will run on NiFi on the Linux VM: it consists of one simple flow containing one input port and one dummy process group in which logic for further processing can be developed (like routing the data to multiple destinations, filtering it, or storing it to the final storage like time-series databases). The input port listens for data coming from other NiFi or MiNiFi instances on port 1026, as configured before. In our case, data will be coming from MiNiFi running on the Raspberry Pi.
Figure 2: NiFi and MiNiFi flows
Once again, note that the process group on the right is developed in NiFi, but will run on the MiNiFi agent, not on the NiFi instance.
3.1 The MiNiFi Flow
The picture below depicts the MiNiFi flow. Before data is sent to the NiFi instance, every flowfile is enriched with some additional attributes like timestamp, clientId, agent and flow version.
Figure 3: MiNiFi flow
The first processor in the pipeline is ConsumeMQTT, which pulls JSON data from the Mosquitto message broker. The properties of the processor are shown in the pictures below. The processor connects to the localhost Mosquitto service, running on port 1883.
Figure 4: ConsumeMQTT properties (1/2)
Figure 5: ConsumeMQTT properties (2/2)
The topic to which the processor is subscribed is located on the path “/rpi/sensors” and it is called “basetower1”. Here we are using the wildcard “#” to receive data from all the topics on that path, although we have just one topic. Quality of service is set to level 2 to guarantee that messages will not be lost and will be delivered exactly once. To ensure this, the processor uses a queue into which it buffers the messages if the processor’s run schedule is behind the rate messages are coming in.
The next processor in the pipeline is UpdateAttribute, which adds some additional flowfile attributes to every message. This is an example of how the data stream can be enriched with some arbitrary data that has meaning for us. In the picture below, we can see the details of the additional attributes: agent – name of the MiNiFi agent, clientId – name of the radio tower, timestamp – date and time when a message entered MiNiFI flow and version – the MiNiFi flow version.
Figure 6: Attributes added to flowfiles
4. Configuring MiNiFi Agent
As mentioned earlier, we need to convert the NiFi flow template, developed in NiFi, to a YAML file that the MiNiFi agent will use as the flow configuration. To do so, we use MiNiFi Toolkit v1.14.0 and the script called “config.sh”, located in the ./bin folder of the toolkit installation:
[user@host]# ./bin/config.sh transform /path/to/template.xml /output/path/config.yml
The first argument of the script is the path to the exported NiFi flow template in XML; the second argument is the path to which the script saves the converted YAML file. Note that the filename is “config.yml” and it must not be changed.
After converting it, we copy the YAML file into the $MINIFI_HOME/conf folder and replace the old “config.yml” file. This is the file that the MiNiFi agent uses for initial flow and instance configuration.
Before starting the MiNiFi agent, we open the “config.yml” file and add some additional properties, which are crucial for communication between MiNiFi and NiFi. As we configured NiFi running on a Linux VM with Site-to-Site port 1026, we need to do the same in the configuration of the MiNiFi agent. We add the port and the hostname to which MiNiFi is sending data, right under the field “Input Ports”, as shown below:
Input Ports: - id: AUTOGENERATED_NIFI_PORT_ID_HERE name: MiNiFi-input comment: '' max concurrent tasks: 1 use compression: false Properties: Port: 1026 Host Name: 192.168.1.9 #ip address of the Linux VM running on separate server
5. Starting the Data Stream
After completing the necessary setup, we can finally start the JSON data generator and the MiNiFi agent to stream the data into NiFi. To start the MiNiFi agent, use the following command on the Raspberry Pi:
[user@host]# service minifi start
After a few minutes, we use the “minifi.sh” script from the $MINIFI_HOME/bin folder to check the MiNiFi instance status, confirming that everything is running correctly:
[user@host]# minifi.sh flowStatus instance:health,stats,bulletins
The response from the script shows that everything is running smoothly, without any error bulletin:
Figure 7: Status of MiNiFi instance
After successfully starting the MiNiFi agent, we also start the JSON data generator on the Raspberry Pi, using the following command:
[user@host]# java -jar json-data-generator-1.4.0.jar sms_traffic_config.json
The generator starts to produce JSON messages and sends them to Mosquitto message broker, as shown in the picture below:
Figure 8: JSON data generator running
Immediately after starting the JSON generator, we can see the messages coming into the Input Port in NiFi.
Figure 9: JSON flowfiles collected by NiFi
We will now list the relationship to inspect the contents of the flowfiles and flowfile attributes, as generated by the MiNiFi flow. As we can see in the picture below, the payload of every flowfile is the JSON string message generated by the JSON data generator on the Raspberry Pi. Note that it also includes the timestamp (field “SendDateTime”), to simulate when the event occurred in the source (i.e. in the radio tower).
Figure 10: JSON message string collected by NiFi
In the NiFi flow (on the Linux VM server), we can also add an UpdateAttribute processor to capture the timestamp when the flowfiles actually entered the flow.
Figure 11: Message timestamp in NiFi
Now we have both the timestamp when the message was created in the simulated radio tower (SendDateTime field: 2021-09-23T15:09:12.740 Z), and the timestamp when the message entered the first processor in NiFi (nifi_timestamp attribute: 2021/09/23 15:09:13.318Z). This is a further example of enriching the data using Apache NiFi: we can add arbitrary flowfile attributes with some information or values that can be generated dynamically as data flows through the pipeline.
Conclusion
This article shows how we can leverage Apache NiFi and MiNiFi in combination with Mosquitto message broker and a JSON data generator to collect data from a simulated radio communications tower in near real time. The radio tower was simulated using a JSON data generator and a Raspberry Pi microcomputer, which was also where the MiNiFi agent was running: today, this paradigm is called Edge Computing.
With these tools, data can be streamed and processed in near real time, which is crucial to monitoring events and processes exactly when they are happening. This enables fast responses to some critical events, to establish predictive maintenance, or to carry out system optimisations at the right time.
At ClearPeaks we are experts on solutions like the one demonstrated in this article. If you have interesting use cases or any questions related to data streaming, simply contact us, and we will be happy to discuss them with you!