Concept of SCD-2
Slowly Changing Dimension (SCD)
SCD Type 1: Overwrites the existing value with the new value and does not retain history.
SCD Type 2: Adds a new row for the new value and keeps the existing row, marked inactive, for historical and reporting purposes.
SCD Type 3: Creates a new current value column in the existing record but retains the original column.
SCD Type 2 (SCD-2) is the usual choice when full history must be kept: when a value changes, the existing row is closed by setting its valid_to date and clearing its is_active flag, and a new row is added for the new value. This pattern is common in data warehousing whenever a dimension table must keep its history together with a validity date range. Typical examples are price, VAT (tax), or discount tables in an e-commerce data lake. Joining multiple SCD-2 tables correctly is fundamental to keeping a complete and accurate view of historical data.
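To make the Type 2 behaviour concrete, here is a minimal plain-Python sketch (no Spark required) of what happens to a dimension when a value changes. The helper name apply_scd2_change, the unit_price column, and the sample row are assumptions made up for illustration only; they are not part of the example used later in this article.

```python
from datetime import date

def apply_scd2_change(rows, key, new_values, change_date):
    """Close the active row for `key` and append a new active row (hypothetical helper)."""
    updated = []
    for row in rows:
        if row["item_code"] == key and row["is_active"] == 1:
            # Keep the old row for history, but close its validity window.
            row = {**row, "valid_to": change_date, "is_active": 0}
        updated.append(row)
    # The new value becomes the only active row; its valid_to stays open (None).
    updated.append({"item_code": key, **new_values,
                    "valid_from": change_date, "valid_to": None, "is_active": 1})
    return updated

history = [{"item_code": "A1", "unit_price": 10.0,
            "valid_from": date(2023, 1, 1), "valid_to": None, "is_active": 1}]
history = apply_scd2_change(history, "A1", {"unit_price": 12.0}, date(2023, 6, 1))
```

After the call, history holds two rows for A1: the closed historical row and the new active one.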
Use Case:
The first step of data modeling is to identify the business objective and the relationships between the data. To meet business requirements, we sometimes need to join multiple tables, which can be fact or dimension tables.
In some situations, however, we need to join several SCD-2 dimension tables into a single SCD-2 dimension table, for example to build a star schema. Such a join is challenging for data engineers: it is a many-to-many join restricted by dates, because the join keys are not unique when every table is SCD-2.
Using PySpark, we will walk through an example of how to meet such a requirement.
Example Business Requirement
Consider two SCD-2 tables, item and discount, with the following schema. We need to create a single consolidated item table in order to calculate the price after the discount.
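As a hedged illustration, the two tables could look like the following. The column names are taken from the code later in this article; the rows themselves are hypothetical values chosen only to show overlapping validity windows. The sketch also assumes that a record's valid_to equals the valid_from of its successor and that None marks the currently active record.

```python
# Hypothetical sample rows; dates are "yyyy-MM-dd" strings, None marks the active record.
item_table = [
    {"item_code": "A1", "item_name": "Notebook", "unit_price($)": 10.0,
     "valid_from": "2023-01-01", "valid_to": "2023-06-01"},
    {"item_code": "A1", "item_name": "Notebook", "unit_price($)": 12.0,
     "valid_from": "2023-06-01", "valid_to": None},
]
discount_table = [
    {"discount_id": "D1", "item_code": "A1", "discount_percentage": 10.0,
     "valid_from": "2023-03-01", "valid_to": "2023-09-01"},
    {"discount_id": "D2", "item_code": "A1", "discount_percentage": 20.0,
     "valid_from": "2023-09-01", "valid_to": None},
]
```

Note that item_code appears more than once in both tables, which is why a plain join on item_code alone would multiply rows.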


Steps to Join SCD2 tables:
Step 1: Load the SCD-2 tables as DataFrames (or as CTEs in SQL), selecting only the required columns with the correct data types.
Take care points: Make sure the date columns have the date data type; if they do not, cast them to date.
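from pyspark.sql import Window
from pyspark.sql.functions import asc_nulls_last, coalesce, col, current_date, lead, lit, to_date, udf, when
from pyspark.sql.types import DoubleType

# Assumes an active SparkSession named spark; dates arrive as "yyyy-MM-dd" strings.
item_df = spark.createDataFrame(item_table) \
    .withColumn("valid_from", to_date(col("valid_from"), "yyyy-MM-dd")) \
    .withColumn("valid_to", to_date(col("valid_to"), "yyyy-MM-dd"))
discount_df = spark.createDataFrame(discount_table) \
    .withColumn("valid_from", to_date(col("valid_from"), "yyyy-MM-dd")) \
    .withColumn("valid_to", to_date(col("valid_to"), "yyyy-MM-dd"))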
Step 2: Collect the distinct dates available for each join key. A date can be any valid_from or valid_to from either table; combine them all into a single DataFrame, distinct_dates.
# union() matches columns by position, so every select uses the same (date, item_code) layout.
item_from_dates = item_df.select(col("valid_from").alias("date"), "item_code")
item_to_dates = item_df.select(col("valid_to").alias("date"), "item_code")
discount_from_dates = discount_df.select(col("valid_from").alias("date"), "item_code")
discount_to_dates = discount_df.select(col("valid_to").alias("date"), "item_code")
distinct_dates = item_from_dates \
    .union(item_to_dates) \
    .union(discount_from_dates) \
    .union(discount_to_dates) \
    .dropDuplicates() \
    .orderBy("item_code", "date")
Step 3: Build consecutive date ranges from the distinct dates by pairing each date with the next date for the same join key.

# Order each item's dates chronologically, with null (open-ended) dates last.
partition = Window.partitionBy("item_code").orderBy(asc_nulls_last("date"))
# Pair each date with the next one to form consecutive date ranges.
date_range = distinct_dates \
    .select(col("date").alias("valid_from"), lead("date", 1).over(partition).alias("valid_to"), "item_code") \
    .filter(col("valid_from").isNotNull())
# Replace a null valid_to (still-active record) with a far-future maximum date.
max_date = to_date(lit("9999-12-31"), "yyyy-MM-dd")
date_range = date_range.withColumn("valid_to", coalesce(col("valid_to"), max_date))
item_df = item_df.withColumn("valid_to", coalesce(col("valid_to"), max_date))
discount_df = discount_df.withColumn("valid_to", coalesce(col("valid_to"), max_date))
Take care points:
- Sort null valid_to values last within each join key, so that the open-ended date closes the final range.
- Replace null valid_to with a maximum date (here 9999-12-31) in all DataFrames so that the date comparisons in the later join work.

Benefit: This produces a single consolidated table of date ranges that can be used as the driving table for joining the other SCD-2 tables. The join that was many-to-many now becomes many-to-one.
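The date-range construction can also be sketched in plain Python, which makes the logic easy to check by hand. This is only an illustration under stated assumptions: the function name build_date_ranges and the sample rows are hypothetical, and a null valid_to is mapped to the maximum date before sorting rather than afterwards.

```python
from datetime import date

MAX_DATE = date(9999, 12, 31)  # stand-in for a null (open-ended) valid_to

def build_date_ranges(*tables):
    """Return {item_code: [(valid_from, valid_to), ...]} built from all boundary dates."""
    dates_by_key = {}
    for table in tables:
        for row in table:
            dates = dates_by_key.setdefault(row["item_code"], set())
            dates.add(row["valid_from"])
            dates.add(row["valid_to"] or MAX_DATE)
    ranges = {}
    for key, dates in dates_by_key.items():
        ordered = sorted(dates)
        # Pair each date with its successor, like lead() over the window.
        ranges[key] = list(zip(ordered, ordered[1:]))
    return ranges

# Hypothetical rows: two item versions and one open-ended discount.
items = [
    {"item_code": "A1", "valid_from": date(2023, 1, 1), "valid_to": date(2023, 6, 1)},
    {"item_code": "A1", "valid_from": date(2023, 6, 1), "valid_to": None},
]
discounts = [
    {"item_code": "A1", "valid_from": date(2023, 3, 1), "valid_to": None},
]
ranges = build_date_ranges(items, discounts)
```

For item A1 this gives three ranges: 2023-01-01 to 2023-03-01, 2023-03-01 to 2023-06-01, and 2023-06-01 to 9999-12-31. Within each range, at most one item row and one discount row can apply.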
Step 4: Once we have the distinct date ranges, we can treat them as the primary (driving) table and add the required fields from the source tables (item and discount).
Take care points:
- The join should be on the join key and the date range.
- For the date range, match each item/discount record to the calculated date-range rows that its validity period overlaps.
compiled_data = date_range.join(item_df,
        (item_df["item_code"] == date_range["item_code"]) &
        # Strict comparisons: a record that starts exactly where a range ends belongs to the next range.
        (item_df["valid_from"] < date_range["valid_to"]) &
        (item_df["valid_to"] > date_range["valid_from"]),
        "left") \
    .join(discount_df,
        (discount_df["item_code"] == date_range["item_code"]) &
        (discount_df["valid_from"] < date_range["valid_to"]) &
        (discount_df["valid_to"] > date_range["valid_from"]),
        "left") \
    .select(date_range["valid_from"], date_range["valid_to"], date_range["item_code"], "discount_id", "item_name", "unit_price($)", "discount_percentage") \
    .withColumn("is_active", when(col("valid_to") > current_date(), 1).otherwise(0))
Benefit: After joining the source tables to the consolidated date ranges, we have the desired result and can perform any calculation that depends on both SCD-2 tables.
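The range-restricted left join can be sketched in plain Python as follows. This is an illustration only: the helper covering_row and the sample rows are hypothetical, and the sketch assumes half-open validity windows (a record is valid from valid_from up to, but not including, valid_to), which is why both comparisons are strict.

```python
from datetime import date

MAX_DATE = date(9999, 12, 31)  # stand-in for a null valid_to

def covering_row(rows, item_code, range_from, range_to):
    """Return the first row for item_code whose window overlaps the range, else None."""
    for row in rows:
        row_to = row["valid_to"] or MAX_DATE
        if (row["item_code"] == item_code
                and row["valid_from"] < range_to
                and row_to > range_from):
            return row
    return None

# Hypothetical rows and the date ranges derived from their boundary dates.
items = [
    {"item_code": "A1", "unit_price": 10.0,
     "valid_from": date(2023, 1, 1), "valid_to": date(2023, 6, 1)},
    {"item_code": "A1", "unit_price": 12.0,
     "valid_from": date(2023, 6, 1), "valid_to": None},
]
discounts = [
    {"item_code": "A1", "discount_percentage": 10.0,
     "valid_from": date(2023, 3, 1), "valid_to": None},
]
date_ranges = [
    (date(2023, 1, 1), date(2023, 3, 1)),
    (date(2023, 3, 1), date(2023, 6, 1)),
    (date(2023, 6, 1), MAX_DATE),
]

compiled = []
for range_from, range_to in date_ranges:
    item = covering_row(items, "A1", range_from, range_to)
    discount = covering_row(discounts, "A1", range_from, range_to)
    compiled.append({
        "valid_from": range_from,
        "valid_to": range_to,
        "unit_price": item["unit_price"] if item else None,
        # A missing discount stays None, like the null produced by a left join.
        "discount_percentage": discount["discount_percentage"] if discount else None,
    })
```

Each date range ends up with exactly one item row and at most one discount row, which is the many-to-one behaviour described in the benefit above. The first range has no discount because the discount only starts on 2023-03-01.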

Step 5: Implement the business requirement on the consolidated SCD-2 table. In our example, that means calculating the price after the discount.
def calculate_price_after_discount(price, discount_percentage):
    # No discount (null after the left join) leaves the price unchanged.
    if price is None or discount_percentage is None:
        return price
    return price - (discount_percentage / 100.0) * price
calculate_price = udf(calculate_price_after_discount, DoubleType())
# Cast to double: DecimalType() defaults to scale 0 and would round the price to a whole number.
compiled_data = compiled_data.withColumn("price_after_discount($)", calculate_price(col("unit_price($)").cast("double"), col("discount_percentage").cast("double")))
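If exact cents matter, the same arithmetic can be checked with Python's decimal module instead of floating point. This is a plain-Python sketch rather than part of the Spark job; the function name price_after_discount and the rounding rule (half up, to two decimal places) are assumptions chosen for illustration.

```python
from decimal import Decimal, ROUND_HALF_UP

def price_after_discount(price, discount_percentage=None):
    """Apply a percentage discount to a Decimal price and round to cents."""
    if discount_percentage is None:
        # No discount applies, so the price is returned unchanged.
        return price
    discounted = price * (Decimal(100) - discount_percentage) / Decimal(100)
    # Assumed rounding rule: two decimal places, rounding half up.
    return discounted.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

full = price_after_discount(Decimal("19.99"))
reduced = price_after_discount(Decimal("19.99"), Decimal("15"))
```

Here full stays at 19.99, while reduced is 19.99 × 0.85 = 16.9915, rounded to 16.99.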
You can find the working PySpark code for the above example in SCD2_Join_pyspark.