Handling Schema Drift with Apache NiFi

In the ever-evolving world of technology, it’s crucial to ensure data quality and maintain governance over dynamic data schemas. Schema drift, characterised by changes in the structure, distribution, or quality of data over time, can have far-reaching implications on downstream systems and processes.

 

To overcome these challenges, we’ve implemented a comprehensive, customised solution that leverages the power of Cloudera Flow Management (a.k.a. Apache NiFi) to provide solid schema-drift detection and monitoring capabilities.

 

In this blog post, we’ll delve into the details of our approach and explore the powerful tools and techniques we employed.

 

 

The NiFi Flow

 

Apache NiFi, a powerful data flow automation tool, played a pivotal role in our solution: its flexibility and extensibility allowed us to develop custom logic and integrate it seamlessly into our data pipelines.

 

Within NiFi, we implemented the flow from the ingestion to the detection of the schema changes, in a solution called Schema Changes Detector.

 

Our solution operates through a flow comprising three main stages, each represented by a NiFi process group:

    1. Ingest Data
      • Data is ingested and inserted into a target table.
      • If the insertion fails, indicating a potential issue with the data or a schema mismatch, further analysis is triggered.

 

    1. Detect Schema Changes
      • Upon failed insertion, the system initiates schema change detection.
      • The incoming data schema is compared with the schema of the target table to identify any discrepancies or modifications.

 

  1. Store Schema Changes
    • Detected schema changes are logged into a dedicated table, allowing for convenient monitoring and analysis of these events.

 

Schema changes detector flow

Figure 1: Schema changes detector flow

 

 

Example Use Case

 

To better understand the functionality of our flow, let’s look at a case in which we attempt to ingest a CSV file containing an airline’s customer satisfaction data into a Hive table, but the «age» field is mistakenly passed as a string instead of the intended integer type:

 

CSV file with age field as a string instead of an integer

Figure 2: CSV file with age field as a string instead of an integer

 

In the figure above we can see a sample of the dataset, highlighting the two incorrect records. Below, we find the original target table schema:

 

Hive table schema

Figure 3: Hive table schema

 

Note that this – and what we will describe next – is just an example to present our solution. The concept can be applied to any combination of sources and targets, provided that the schemas are correctly detected and compared at the right points of the ingestion flow.

 

1. Ingest Data

 

The first step of the Schema Changes Detector flow is to ingest the file into the corresponding Hive table: we do so in the Ingest Data process group. If the insertion is successful, it means there were no issues, but if it fails it might be due to a difference in the schema, so we route the FlowFile to the Detect Schema Changes process group to address the issue:

 

Route the file to check for changes

Figure 4: Route the file to check for changes

 

2. Detect Schema Changes

 

As mentioned, upon failing to upload the file to the Hive table, our flow process proceeds to check for potential changes between the schema of the incoming data and the actual schema of the target table. This happens in the Detect Schema Changes process group. To do so, we take the schema from the incoming file, and we compare it with the schema of the corresponding Hive table using a query. This allows us to identify any discrepancies or changes that may need to be addressed.

 

In some cases, we may need to normalise schemas before comparing them. This is required for schema structure validation, ensuring that the schemas are correctly defined and compatible for accurate comparison, particularly with respect to how null values are handled. This step is crucial for maintaining data integrity whilst ensuring compatibility between the incoming data and the target table schema. In our case, we use the Execute Script processor (Normalize avro schema in the picture below) where we run an ad hoc Python script:

 

Schema normalisation

Figure 5: Schema normalisation

 

Below, we can see both schemas after the normalisation process. As we can see, the age field is supposed to be an int, whereas it’s a string in the incoming file:

 

Normalised schema in data

Figure 6: Normalised schema in data

 

Normalised schema in table

Figure 7: Normalised schema in table

 

The actual comparison is automated in NiFi using another Execute Script processor: detect_schema_change. This script will generate an attribute called comparison_output, containing the description of the detected discrepancy:

 

Changes between the schemas

Figure 8: Changes between the schemas

 

This information can also be stored to track the changes in the schema and to facilitate monitoring and further analysis.

 

By using a custom attribute to indicate whether a schema change was detected or not, we can use the Route On Attribute processor to send our file to the last process group of our flow:

 

PROCESSOR DETECTING CHANGES

Figure 9: Processor detecting changes

 

3. Store Schema Changes

 

In this final process group, Store Schema Changes, we extract some metadata from the file that we failed to ingest, and store it in a dedicated log/event table:

 

Store changes in hive table

Figure 10: Store changes in hive table

 

First, using the replaceText processor, we generate a record that contains key information about the detected schema change. Then, as we can see in the image below, we store the information in the track_changes Hive table, which includes the table name, a description of the changes, and the timestamp:

 

Example of changes in hive table

Figure 11: Example of changes in hive table

 

This table can then be used to feed any monitoring dashboard, alert mechanism, or as a source of useful information for data stewards who might have to take actions based on the detected drift.

 

 

Conclusions

 

By using NiFi as our data movement and orchestration tool, with a touch of custom code, we’ve created a comprehensive schema drift detection and monitoring solution, allowing us to mitigate the potential impacts of schema changes on downstream systems and processes, enabling prompt remediation when necessary and in turn, ensuring data integrity.

 

It’s important to note that while we’ve illustrated this with a CSV to Hive scenario, our approach is versatile and adaptable to virtually any data integration combination with minimal adjustments. This flexibility highlights the broad potential of our solution across different use cases.

 

What’s more, thanks to the final event table, we can easily embed the drift information into existing monitoring or alerting apps or ecosystems.

 

If you would like further information about this solution or other NiFi integration projects, don’t hesitate to contact us – we’ll be happy to help!

 

Dario B
dario.buccafusco@clearpeaks.com