Incremental Data Load
Incremental data load is the process of integrating new or updated data into an existing dataset or database without reloading everything from scratch. It is commonly combined with techniques like change data capture (CDC), which identify and extract only the data that has changed.
Example 1 - Incremental Data Load with PySpark Using Timestamp Comparisons
We have two files, existing_data.csv and new_data.csv.
Load Existing Data
existing_data_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dbfs:/test/raw/existing_data.csv")
Load New Data
new_data_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dbfs:/test/raw/new_data.csv")
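Both reads assume the two files share the same schema, including a timestamp column; a quick sanity check can confirm this (the exact column names are an assumption here, not given by the files):

# Confirm both DataFrames expose the same columns,
# in particular the timestamp column used below
existing_data_df.printSchema()
new_data_df.printSchema()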
Identify new records based on the latest timestamp in the existing data
max_date_timestamp = existing_data_df.selectExpr("max(timestamp)").collect()[0][0]
new_records_df = new_data_df.filter(new_data_df.timestamp > max_date_timestamp)
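If the files are read without inferSchema, the timestamp column arrives as a string and the comparison above is lexicographic; an explicit cast keeps the comparison chronological (a sketch, assuming the column parses as a standard timestamp format):

from pyspark.sql.functions import col, lit

# Cast both sides so the filter compares timestamps, not strings
new_records_df = new_data_df.filter(
    col("timestamp").cast("timestamp") > lit(max_date_timestamp).cast("timestamp")
)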
Append new records to the existing data
updated_data_df = existing_data_df.union(new_records_df)
updated_data_df.show()
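To persist the result rather than just display it, write the combined DataFrame back out; the output path below is a placeholder:

# Write the combined dataset back to storage (placeholder path)
updated_data_df.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/test/raw/updated_data")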
Example 2 - Incremental Data Pipeline with Delta Lake and PySpark Using the MERGE Operation
Initialize the Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Delta lake for incremental") \
    .getOrCreate()
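On Databricks the Delta Lake integration is preconfigured; on a plain Spark installation the session usually needs the Delta extensions enabled, roughly like this (assuming the delta-spark pip package is installed):

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder \
    .appName("Delta lake for incremental") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()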
Load the Existing Delta table
existing_data = spark.read.format("delta").table("existing_delta_table")
Load New Data
new_data = spark.read.format("parquet").load("path_to_new_data")
Identify Changes
changes = new_data.subtract(existing_data)
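Note that subtract compares entire rows, so changes contains rows that are new or modified but cannot reveal deletions. If you only want rows whose key is absent from the existing table, a left anti-join on the key is a common alternative (primaryKey is a hypothetical column name, matching the merge condition below):

# Keep only rows whose key does not yet exist in the Delta table
changes = new_data.join(
    existing_data.select("primaryKey"),
    on="primaryKey",
    how="left_anti"
)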
Apply merge or union operations
# USING UNION
combined_data = existing_data.union(changes)
combined_data.write.format("delta") \
    .mode("overwrite") \
    .save("path_to_existing_delta_table")

# USING MERGE
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "path_to_existing_delta_table")
delta_table.alias("old_data") \
    .merge(
        source=changes.alias("new_data"),
        condition="old_data.primaryKey = new_data.primaryKey"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
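MERGE is usually preferable to union-plus-overwrite: it updates matched rows in place instead of rewriting the whole table, and re-running it on the same input does not create duplicates. The result can be verified afterwards:

# Show recent operations on the table, including the MERGE
delta_table.history().select("version", "operation").show()

# Read the merged table back as a DataFrame
delta_table.toDF().show()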
Optimize Delta Table - To improve performance and reduce storage overhead (z-ordering, optimizing file layout, vacuuming old files)
# Compact small files into larger ones
delta_table.optimize().executeCompaction()

# Remove files no longer referenced by the table (default retention applies)
delta_table.vacuum()
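If queries frequently filter on one column, z-ordering co-locates related rows in the same files (available in Delta Lake 2.0+; timestamp is just an illustrative choice of column):

# Cluster the data files by the chosen column
delta_table.optimize().executeZOrderBy("timestamp")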
Close Spark session
spark.stop()
We discussed two ways to implement an incremental data load.
We can adjust the code to a specific use case, including handling different data formats, database connections, and additional data validation and transformations as needed.
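As one example of such validation, a minimal pre-merge check might drop records that would break a key-based merge (again using the hypothetical primaryKey column):

from pyspark.sql.functions import col

# Drop records that cannot participate in a key-based merge:
# null keys would never match, and duplicate keys make MERGE ambiguous
valid_changes = changes \
    .filter(col("primaryKey").isNotNull()) \
    .dropDuplicates(["primaryKey"])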
Thanks and Happy Learning!