CDC with Apache NiFi

Nikhil Suthar
6 min read · Oct 12, 2021


What is CDC?

CDC is short for Change Data Capture. Change Data Capture (CDC) is an ideal solution for near-real-time data movement from a relational database (such as MySQL, Oracle or Microsoft SQL Server) to a data warehouse, data lake or another relational database system.

CDC tracks all the changes that occur at the source database (inserts, updates and deletes) and replicates these changes to the target system.

Tools available for CDC

When it comes to tooling for CDC, there are many options available on the market.

The list of such tools keeps growing, since many other CDC products are available as well. Most of them are enterprise-licensed versions that support multiple features. When it comes to an open-source tool for CDC, however, Apache NiFi is one of the few that offers CDC support, albeit limited to the MySQL database.

CDC with Apache NiFi

Apache NiFi is a powerful tool that comes with a web-based user interface and supports ETL, orchestration, scheduling, data provenance and also CDC with MySQL. Apache NiFi ships with a bundle of processors, each supporting different functionality. For change data capture, we will use the CaptureChangeMySQL processor.

CDC with NiFi — Published at https://nikhil-suthar-bigdata.medium.com/

Environment

  • Apache NiFi (> version 1.11.0)
  • MySQL

For the tutorial below, I have used NiFi version 1.11.4 and MySQL version 5.7.35.

CDC Pipeline Setup

This section walks through configuring the CaptureChangeMySQL processor and setting up the complete CDC pipeline.

MySQL Configuration

Following are the steps needed to configure MySQL.

  1. Install MySQL Server if it is not already installed.
  2. Enable binary logging: add the configuration below to the [mysqld] section of the my.cnf config file. The my.cnf file is usually located at /usr/local/etc/mysql or /etc/mysql.
[mysqld]
server-id = 1
binlog_format = row
log_bin = /var/log/mysql/mysql-bin.log

This sets server-id to 1, points log_bin to the path above and enables row-level binlog events. MySQL will create all log files at the path given in the log_bin property.

3. Restart the MySQL server.

4. Validate the binary log configuration with the command below in the MySQL shell; the log_bin variable should show ON.

mysql> show variables like '%log_bin%';

5. Create a database and a table. Just for reference, we have created a database named "cdc_db" and a table named "employee".

mysql> create database cdc_db;
Query OK, 1 row affected (0.01 sec)
mysql> use cdc_db;
Database changed
mysql> create table employee (Id int, Name varchar(30), Salary int);
Query OK, 0 rows affected (0.14 sec)

NiFi Configuration

  1. Start the NiFi server.
  2. Download the MySQL JDBC driver (mysql-connector-java-8.0.19.jar) and store the jar on the NiFi host server.
  3. We have created a NiFi template for CDC; you can download it from here. The template contains a flow that moves data from MySQL to a local directory. The destination can be changed as per the use case.
  4. Import the downloaded template into your NiFi server.
  5. Modify the configuration of the CaptureChangeMySQL processor as per your environment: replace the MySQL Hosts, Driver Location and Username properties (the user should be an admin or root user).

6. Create a Distributed Map Cache Client controller service as below, with the NiFi server IP or localhost as the hostname, and enable it.

7. The complete flow will look like the one below:

Processor Details

The image above shows the complete CDC pipeline from the MySQL source to a local directory as the destination. The destination can be anything, such as an HDFS path, an S3 path or another database.

  1. Processor #1 (CaptureChangeMySQL) — This processor continuously tracks the MySQL binary log and identifies each operation.
  2. Processor #2 (EvaluateJsonPath) — This processor parses the incoming JSON flow file and captures the database name, table name and timestamp (a UNIX timestamp).
  3. Processor #3 (ExecuteScript) — This processor executes a custom script that classifies each event by its operation and adds a new column "op": "I" for insert, "U" for update and "D" for delete. A minimal sketch of such a script appears after this list.
  4. Processor #4 (UpdateRecord) — This processor converts the input Unix timestamp to "yyyy-MM-dd HH:mm:ss" format and adds it as a new column (cdc_timestamp).
  5. Processor #5 (ConvertAvroToParquet) [Optional] — Converts the Avro data into Parquet.
  6. Processor #6 (UpdateAttribute) [Optional] — Declares the attribute holding the parent path used for putting the file in the next processor. Change the attribute value as per your environment.
  7. Processor #7 (PutFile) [Optional] — Puts the file into a local directory at the path <parentpath>/<databasename>/<tablename>.
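For reference, here is a minimal Jython sketch of what the ExecuteScript body (Processor #3) might look like. It assumes the flow file content is still a single JSON event as emitted by CaptureChangeMySQL, with a "type" field carrying the operation name ("insert", "update" or "delete"); treat this as a sketch of the idea, not the exact script shipped in the template.

# Jython body for ExecuteScript (Script Engine: python) -- a sketch, not the
# template's exact script. Adds an "op" field derived from the event type.
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class AddOpColumn(StreamCallback):
    def process(self, inputStream, outputStream):
        # Read the incoming CDC event as JSON
        event = json.loads(IOUtils.toString(inputStream, StandardCharsets.UTF_8))
        # Map the binlog event type to a single-letter "op" value
        op_map = {"insert": "I", "update": "U", "delete": "D"}
        event["op"] = op_map.get(event.get("type"), "?")
        outputStream.write(bytearray(json.dumps(event).encode("utf-8")))

flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, AddOpColumn())
    session.transfer(flowFile, REL_SUCCESS)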

Finally, it’s time to execute the pipeline

  • Insert a new record into the table cdc_db.employee.
mysql> insert into employee values (1, "Ram", 5000);
  • The insert event will be replicated, and a Parquet file will be written to the destination path in near real-time.
  • To verify the data, we have created a Spark DataFrame over the destination path to display the data (a minimal PySpark sketch appears after this walkthrough). Below is the complete process.
Insert operation replication (gif)
  • Let’s insert a few more below records and verify the same at Destination Source:
mysql> insert into employee values (2, "Shyam", 15000),
-> (3, "Nikhil", 25000),
-> (4, "John",20000);
Multiple Insert operation replication (gif)
  • Next, let's update the Name to “Ramesh” where Id = 1.
mysql> update employee set Name = "Ramesh" where Id = 1;
Update operation replication (gif)

Now, the final operation is delete. Let’s delete the records where the Id value is more than 2.

mysql> delete from employee where id > 2;
Delete operation replication (gif)
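For reference, the verification step shown in the GIFs might look like the following PySpark sketch. The path below is hypothetical; point it at the <parentpath>/<databasename>/<tablename> directory configured in the UpdateAttribute and PutFile processors.

# Minimal PySpark sketch: read the replicated Parquet files and display them.
# The path below is hypothetical; use the directory written by PutFile.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cdc-verify").getOrCreate()

df = spark.read.parquet("/tmp/cdc/cdc_db/employee")
df.orderBy("Id", "cdc_timestamp").show(truncate=False)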

We have tested all the row-level operations and can see that the NiFi pipeline replicates every event to the destination. Since the destination is a file-based system, it keeps all records, and we can identify the latest version of each row by using the primary key (“Id”) and the CDC timestamp column, as sketched below.
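A minimal sketch of that idea, assuming the DataFrame df from the verification step above with the columns Id, op and cdc_timestamp:

# Sketch: reconstruct the latest state of each row from the CDC event history.
# Assumes df has the columns Id, op and cdc_timestamp produced by the pipeline.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("Id").orderBy(F.col("cdc_timestamp").desc())

latest = (df.withColumn("rn", F.row_number().over(w))
            .filter("rn = 1")          # keep only the most recent event per Id
            .filter("op != 'D'")       # drop rows whose latest event is a delete
            .drop("rn"))
latest.show()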

Note: the blog above contains multiple GIF images that might take time to load (use the web version if they are slow in the mobile app). So, just for reference, the final table view in MySQL and the data view at the destination after all the above operations are shown below:

At MySQL System

At Destination System

Conclusion

As we can see, Apache NiFi replicates all changes in near real-time and also tracks every operation. The limitation of Apache NiFi is that it currently supports CDC only for the MySQL database, but for MySQL it works efficiently. For any other database, we have to use other, paid CDC tools.

P.S.: Please comment below if there is any other open-source tool that supports CDC for other databases.
