Incremental Data Load

Incremental data load refers to the process of integrating new or updated data into an existing dataset or database without the need to reload all the data from the beginning. This method is commonly employed in combination with techniques like change data capture (CDC), which helps identify and extract only the modified data.
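
For example, when the source system emits CDC-style change records with an operation flag, only the changed rows need to be extracted and applied downstream. The sketch below illustrates the idea in PySpark; the file path and the column names (op, id) are hypothetical placeholders rather than the output of any particular CDC tool, and an existing Spark session is assumed.

     # Hypothetical CDC feed: each row carries an "op" flag (insert/update/delete) and a key "id"
     cdc_df = spark.read.format("csv").option("header", "true").load("dbfs:/test/raw/cdc_feed.csv")

     # Keep only the rows that represent changes to apply (deletes would be handled separately)
     changed_rows_df = cdc_df.filter(cdc_df.op.isin("insert", "update"))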

Example 1 - Incremental Data Load with PySpark Using Timestamp Comparisons

We have two files, existing_data.csv and new_data.csv.

  1. Load Existing Data

existing_data_df = spark.read \
         .format("csv") \
         .option("header", "true") \
         .load("dbfs:/test/raw/existing_data.csv")
    
  2. Load New Data

new_data_df = spark.read \
         .format("csv") \
         .option("header", "true") \
         .load("dbfs:/test/raw/new_data.csv")
    
  3. Identify new records based on the latest timestamp in the existing data

# Latest timestamp already present in the existing data
     max_date_timestamp = existing_data_df.selectExpr("max(timestamp)") \
         .collect()[0][0]

     # Keep only rows newer than that watermark
     new_records_df = new_data_df.filter(new_data_df.timestamp > max_date_timestamp)
    
  4. Append new records to the existing data

     updated_data_df = existing_data_df.union(new_records_df)
     updated_data_df.show()
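
     The example stops at show(); in practice the combined result is usually written back so the next run can treat it as the new baseline. A minimal sketch, assuming a hypothetical output path and keeping CSV as the format:

     # Persist the combined dataset (output path is a placeholder)
     updated_data_df.write \
         .format("csv") \
         .option("header", "true") \
         .mode("overwrite") \
         .save("dbfs:/test/raw/existing_data_updated")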
    

Example 2 - Incremental Data Pipeline with Delta Lake and PySpark Using the MERGE Operation

  1. Initialize the Spark session

     from pyspark.sql import SparkSession

     spark = SparkSession.builder \
         .appName("Delta lake for incremental") \
         .getOrCreate()
    
  2. Load the Existing Delta table

# Read the existing Delta table registered in the metastore
     existing_data = spark.read.table("existing_delta_table")
    
  3. Load New Data

new_data = spark.read \
         .format("parquet") \
         .load("path_to_new_data")
    
  4. Identify Changes

# Rows present in the new data but not already in the existing table
     changes = new_data.subtract(existing_data)
    
  5. Apply merge or union operations

# USING UNION
     combined_data = existing_data.union(changes)
     combined_data.write.format("delta") \
         .mode("overwrite") \
         .save("path_to_existing_delta_table")

     # USING MERGE
     from delta.tables import DeltaTable

     delta_table = DeltaTable.forPath(spark, "path_to_existing_delta_table")

     delta_table.alias("old_data") \
         .merge(
             source=changes.alias("new_data"),
             condition="old_data.primaryKey = new_data.primaryKey"
         ) \
         .whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    
  6. Optimize the Delta table - to improve performance and reduce storage (Z-ordering, compacting the file layout, vacuuming old files)

# Compact small files, then remove files no longer referenced (default 7-day retention)
     delta_table.optimize().executeCompaction()
     delta_table.vacuum()
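
     # Optionally, Z-ORDER by a frequently filtered column to co-locate related data
     # ("primaryKey" is a hypothetical column name; requires Delta Lake 2.0 or later)
     delta_table.optimize().executeZOrderBy("primaryKey")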
    
  7. Close Spark session

spark.stop()
    

We discussed two ways to implement an incremental data load.

We can adjust the code to our specific use case, including handling different data formats, database connections, and additional data validation and transformations as needed.
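
As one illustration of adapting it to a database connection, the watermark idea from Example 1 also works against a JDBC source. The connection URL, table, column names, and credentials below are placeholders, and the appropriate JDBC driver is assumed to be available.

     # Pull only rows newer than the last processed watermark (all identifiers are hypothetical)
     incremental_query = "(SELECT * FROM orders WHERE updated_at > '2024-01-01 00:00:00') AS incr"
     jdbc_df = spark.read \
         .format("jdbc") \
         .option("url", "jdbc:postgresql://host:5432/db") \
         .option("dbtable", incremental_query) \
         .option("user", "db_user") \
         .option("password", "db_password") \
         .load()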

Thanks and Happy Learning!