Databricks | Building an ETL Pipeline on Road Accident Data Using PySpark

When I started learning data engineering, I always wanted to try a real-world dataset instead of just “toy” examples. So I picked up the India Road Accident Dataset from Kaggle and built a complete ETL pipeline using PySpark and Delta Lake.

Note: This project is a sample ETL pipeline I built for learning and practice. It’s not production-ready, but it’s a great way to understand how raw data becomes analytics-ready data step by step.

In this blog, I’ll walk you through how I designed the pipeline using the Medallion Architecture (Bronze → Silver → Gold). Don’t worry if the terms sound heavy; I’ll explain everything in plain English.

Bronze Layer – Bringing in Raw Data

The Bronze layer is like your raw storage box. Here we keep the data exactly as it comes, without cleaning or filtering.

  •  First, I defined a schema (column names + data types) for the CSV file. This helps avoid errors when Spark reads messy data.

  • I then renamed the columns to a simpler snake_case format (e.g. State Name → state_name).

  • Finally, I added two useful extra columns:

    • ingestion_timestamp → the time when the row was loaded.

    • file_source → the file path where the data came from.

 Think of the Bronze layer as a safe place where your raw data lands, ready for future processing.

Here’s the DDL for the Bronze layer table:

spark.sql(f""" drop table if exists workspace.bronze.accidents_raw """)

spark.sql(f""" CREATE TABLE IF NOT EXISTS workspace.bronze.accidents_raw (
    state_name STRING,
    city_name STRING,
    year INT,
    month STRING,
    day_of_week STRING,
    time_of_day STRING,
    accident_severity STRING,
    number_of_vehicles_involved INT,
    vehicle_type_involved STRING,
    number_of_casualties INT,
    number_of_fatalities INT,
    weather_conditions STRING,
    road_type STRING,
    road_condition STRING,
    lighting_conditions STRING,
    traffic_control_presence STRING,
    speed_limit_kmph INT,
    driver_age INT,
    driver_gender STRING,
    driver_license_status STRING,
    alcohol_involvement STRING,
    accident_location_details STRING,
    ingestion_timestamp TIMESTAMP,
    file_source STRING
)
          """)

Bronze layer code:

from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

accident_schema = StructType([
    StructField("State Name", StringType(), True),
    StructField("City Name", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Month", StringType(), True),
    StructField("Day of Week", StringType(), True),
    StructField("Time of Day", StringType(), True),   
    StructField("Accident Severity", StringType(), True),
    StructField("Number of Vehicles Involved", IntegerType(), True),
    StructField("Vehicle Type Involved", StringType(), True),
    StructField("Number of Casualties", IntegerType(), True),
    StructField("Number of Fatalities", IntegerType(), True),
    StructField("Weather Conditions", StringType(), True),
    StructField("Road Type", StringType(), True),
    StructField("Road Condition", StringType(), True),
    StructField("Lighting Conditions", StringType(), True),
    StructField("Traffic Control Presence", StringType(), True),
    StructField("Speed Limit (km/h)", IntegerType(), True),
    StructField("Driver Age", IntegerType(), True),
    StructField("Driver Gender", StringType(), True),
    StructField("Driver License Status", StringType(), True),
    StructField("Alcohol Involvement", StringType(), True),
    StructField("Accident Location Details", StringType(), True)
])
# /Volumes/<catalog_name>/<schema_name>/<volume_name>/<folder_name>
df_raw = (spark.read.format("csv")
          .option("header", "true")
          .schema(accident_schema)
          .load("/Volumes/workspace/bronze/landing/raw_data/road_accidents_data_india.csv"))

df_raw_2 = df_raw.withColumnsRenamed({"State Name": "state_name", "City Name": "city_name", "Year": "year", 
"Month": "month", "Day of Week": "day_of_week", "Time of Day": "time_of_day", "Accident Severity": "accident_severity", 
"Number of Vehicles Involved": "number_of_vehicles_involved", "Vehicle Type Involved": "vehicle_type_involved", 
"Number of Casualties": "number_of_casualties", "Number of Fatalities": "number_of_fatalities", "Weather Conditions": "weather_conditions", "Road Type": "road_type", "Road Condition": "road_condition",
 "Lighting Conditions": "lighting_conditions", "Traffic Control Presence": "traffic_control_presence",
 "Speed Limit (km/h)": "speed_limit_kmph", "Driver Age": "driver_age", "Driver Gender": "driver_gender", 
"Driver License Status": "driver_license_status", "Alcohol Involvement": "alcohol_involvement",
 "Accident Location Details": "accident_location_details"})

df_bronze = (df_raw_2
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("file_source", F.col("_metadata.file_path"))
)

(df_bronze.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("workspace.bronze.accidents_raw"))
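
If you want to sanity-check the load, a quick row count and a peek at the new audit columns does the job (optional, and it just reuses the table name from above):

# How many rows landed in Bronze?
print(spark.table("workspace.bronze.accidents_raw").count())

# Peek at the audit columns added during ingestion
spark.table("workspace.bronze.accidents_raw").select("ingestion_timestamp", "file_source").show(5, truncate=False)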

Silver Layer – Cleaning and Organizing

The Silver layer is where the real cleaning magic happens.

  • I created a new column driver_age_group (Minor, Youth, Middle-aged, Senior, Old) by splitting drivers into age ranges.

  • I standardized some fields, such as converting the YES/NO values for alcohol involvement into Y/N and marking unknown license statuses explicitly.

Here’s the DDL for the Silver layer tables:

spark.sql(f""" drop table if exists workspace.silver.accidents_cleaned """)

spark.sql(f""" CREATE TABLE IF NOT EXISTS workspace.silver.accidents_cleaned (
    accident_id BIGINT,
    state_name STRING,
    city_name STRING,
    year INT,
    month STRING,
    day_of_week STRING,
    time_of_day STRING,
    accident_severity STRING,
    number_of_vehicles_involved INT,
    vehicle_type_involved STRING,
    number_of_casualties INT,
    number_of_fatalities INT,
    weather_conditions STRING,
    road_type STRING,
    road_condition STRING,
    lighting_conditions STRING,
    traffic_control_presence STRING,
    speed_limit_kmph INT,
    driver_age INT,
    driver_age_group STRING,
    driver_gender STRING,
    driver_license_status STRING,
    alcohol_involvement STRING,
    accident_location_details STRING
)
""")

spark.sql(f""" drop table if exists workspace.silver.accidents_quarantine """)

spark.sql(f""" CREATE TABLE IF NOT EXISTS workspace.silver.accidents_quarantine (
    accident_id BIGINT,
    state_name STRING,
    city_name STRING,
    year INT,
    month STRING,
    day_of_week STRING,
    time_of_day STRING,
    accident_severity STRING,
    number_of_vehicles_involved INT,
    vehicle_type_involved STRING,
    number_of_casualties INT,
    number_of_fatalities INT,
    weather_conditions STRING,
    road_type STRING,
    road_condition STRING,
    lighting_conditions STRING,
    traffic_control_presence STRING,
    speed_limit_kmph INT,
    driver_age INT,
    driver_age_group STRING,
    driver_gender STRING,
    driver_license_status STRING,
    alcohol_involvement STRING,
    accident_location_details STRING
)
""")

Silver layer code:

# df_bronze comes from the Bronze step above; spark.table("workspace.bronze.accidents_raw") would work just as well here
silver_df = (df_bronze
    .withColumn("driver_age_group", F.when(F.col("driver_age") < 18, "Minor")
                .when((F.col("driver_age") >= 18) & (F.col("driver_age") <= 30), "Youth")
                .when((F.col("driver_age") >= 31) & (F.col("driver_age") <= 45), "Middle-aged")
                .when((F.col("driver_age") >= 46) & (F.col("driver_age") <= 60), "Senior")
                .otherwise("Old"))
    .withColumn("alcohol_involvement", F.when(F.upper(F.col("alcohol_involvement")) == "YES", F.lit("Y"))
                .when(F.upper(F.col("alcohol_involvement")) == "NO", F.lit("N"))
                .otherwise("unknown"))
    .withColumn("driver_license_status", F.when(F.lower(F.col("driver_license_status")).isin("valid","invalid","expired"), F.col("driver_license_status")).otherwise("unknown")))

Now comes the important part:

  • I separated valid data (clean and correct) from bad data (missing or invalid values).

  • Valid data went into a table called accidents_cleaned.

  • Bad data went into a quarantine table — so it’s not lost, but won’t affect analysis either.

I also created a unique accident_id for each record using Spark’s window function. This way, every row has its own stable identifier.

# Split valid vs bad records
valid_silver_df = silver_df.filter(
    (F.col("state_name").isNotNull()) &
    (F.col("city_name").isNotNull()) &
    (F.col("year").isNotNull()) &
    (F.col("year").between(2000, F.year(F.current_date()))) &
    ((F.col("driver_age") >= 18) & (F.col("driver_age") <= 60)) &
    (F.col("number_of_vehicles_involved") > 0) &
    (F.col("number_of_casualties") >= 0) &
    (F.col("number_of_fatalities") >= 0)
)

quarantine_df = silver_df.subtract(valid_silver_df)

# Get the last used ID from the Silver table so new rows continue the sequence
last_id = spark.table("workspace.silver.accidents_cleaned").agg({"accident_id": "max"}).collect()[0][0]
if last_id is None:
    last_id = 0

# Ordering by a constant just assigns sequential numbers; it pulls all rows into one partition,
# which is fine for a small learning dataset
window = Window.orderBy(F.lit(1))

final_valid_silver_df = valid_silver_df.withColumn(
    "accident_id", (F.row_number().over(window) + last_id).cast("bigint")
)

# Write to Silver Clean (append)
(final_valid_silver_df.write 
    .format("delta") 
    .mode("append") 
    .option("mergeSchema", "true")
    .saveAsTable("silver.accidents_cleaned"))

# Write to Silver Quarantine (append)
(quarantine_df.write 
    .format("delta") 
    .mode("append") 
    .option("mergeSchema", "true")
    .saveAsTable("silver.accidents_quarantine"))

Gold Layer – Business-Friendly Insights

Gold is where the data finally becomes useful for analysis. From the cleaned Silver data, I built summary tables:

  • Accident Summary by State/City → total accidents, casualties, fatalities, vehicles involved.

  • Driver Risk Profile → which age groups, genders, or vehicle types see more accidents.

  • Accident Trends → breakdown of accidents by year, month, day of week.

  • Time Dimension Table → a reusable table of year, month, day of week, and time of day.

These Gold tables are the ones you can connect to dashboards, reports, or queries.

Gold layer code:

gold_summary_df = (final_valid_silver_df.groupBy("state_name", "city_name", "year", "month")
    .agg(
        F.count("*").alias("total_accidents"),
        F.sum("number_of_casualties").alias("total_casualties"),
        F.sum("number_of_fatalities").alias("total_fatalities"),
        F.sum("number_of_vehicles_involved").alias("total_vehicles_involved")
    ))

(gold_summary_df.write 
    .format("delta") 
    .mode("overwrite") 
    .option("overwriteSchema", "true") 
    .saveAsTable("gold.accident_summary_by_state_city"))

gold_driver_risk_df = (final_valid_silver_df.groupBy("driver_age_group", "driver_gender", "vehicle_type_involved")
    .agg(
        F.count("*").alias("total_accidents"),
        F.sum("number_of_casualties").alias("total_casualties"),
        F.sum("number_of_fatalities").alias("total_fatalities")
    ))

(gold_driver_risk_df.write 
    .format("delta") 
    .mode("overwrite") 
    .option("overwriteSchema", "true") 
    .saveAsTable("gold.driver_vehicle_risk_profile"))


gold_trends_df = final_valid_silver_df.groupBy("year", "month", "day_of_week").agg(
        F.count("*").alias("accidents"),
        F.sum("number_of_fatalities").alias("fatalities")
    )

# Note: append will add duplicate aggregates if this cell is re-run;
# overwrite (as used for the other Gold tables) may be the safer choice for a full rebuild
(gold_trends_df.write
    .format("delta")
    .mode("append")
    .partitionBy("year", "month")
    .saveAsTable("gold.accident_trends_summary"))

dim_time_df = final_valid_silver_df.select("year", "month", "day_of_week", "time_of_day").distinct()

(dim_time_df.write 
    .format("delta") 
    .mode("append") 
    .saveAsTable("gold.dim_time"))

Key Takeaways

  1. Layered approach matters → Bronze keeps raw data, Silver ensures clean, validated data, Gold gives business-ready summaries.

  2. Don’t throw away bad data → quarantine it so you can check or fix it later (see the short example after this list).

  3. Add metadata like timestamps and source files — it helps with debugging.

  4. Unique IDs are important for tracking each row reliably.

  5. Summaries & dimensions make it easy for analysts and managers to use your data.

Why Is This Project Useful?

If you’re a student or job seeker, this project shows:

  • You can handle real-world messy data.

  • You understand the Bronze/Silver/Gold architecture.

  • You know how to use PySpark transformations and aggregations.

If you’re already in the industry, this project is a great portfolio piece and also a good refresher on data engineering best practices.

Final Thoughts

This ETL pipeline taught me how to think like a data engineer — ingesting raw data, cleaning it systematically, and producing insights people can actually use.

Reminder: This project is a sample for learning. In a real-world production system, you’d add more things like automation, monitoring, error handling, and performance optimizations.

But for students, job seekers, or anyone curious about PySpark and data engineering, this kind of project is a perfect practice exercise. And once you see your clean Gold tables, the learning feels worth it!
