VOOZH about

URL: https://dev.to/datanestdigital/medallion-architecture-in-databricks-a-complete-implementation-guide-2lb9

⇱ Medallion Architecture in Databricks: A Complete Implementation Guide - DEV Community


Every data team eventually hits the same wall: raw data scattered across landing zones, inconsistent transformations bolted on over time, and no clear lineage when something breaks at 2 AM. The Medallion Architecture exists to solve exactly this — and while the Bronze-Silver-Gold concept is simple, most guides skip the production details that actually matter.

This guide goes further. We'll build a complete, production-ready Medallion Architecture implementation in Databricks with PySpark — including schema enforcement, data quality gates, incremental processing, and metadata tracking.

What Is the Medallion Architecture?

The Medallion Architecture is a data design pattern that organizes your lakehouse into three logical layers:

  • Bronze (Raw) — Ingests data as-is from source systems. Append-only, schema-on-read. Your insurance policy.
  • Silver (Cleaned) — Deduplicates, validates, and conforms data. Schema-on-write with enforced types.
  • Gold (Business) — Aggregated, business-level datasets optimized for analytics, ML, and reporting.

Each layer serves a distinct purpose, and data flows progressively from raw to refined.

Why This Pattern Works

  1. Debuggability — When something breaks in Gold, you trace back through Silver to Bronze. Raw data is never lost.
  2. Reprocessing — Schema changes or business logic updates? Replay from Bronze without re-ingesting from source.
  3. Separation of concerns — Ingestion engineers own Bronze, data engineers own Silver, analytics engineers own Gold.
  4. Data quality — Each layer transition is an explicit checkpoint where you validate and enforce quality.

Setting Up the Foundation

Before writing pipeline code, you need a solid project structure. Here's what a production Databricks workspace looks like:

lakehouse/
├── bronze/
│ ├── ingest_orders.py
│ ├── ingest_customers.py
│ └── ingest_events.py
├── silver/
│ ├── clean_orders.py
│ ├── clean_customers.py
│ └── clean_events.py
├── gold/
│ ├── agg_daily_revenue.py
│ ├── agg_customer_lifetime.py
│ └── dim_product_catalog.py
├── lib/
│ ├── quality_checks.py
│ ├── schema_registry.py
│ └── metadata_tracker.py
├── tests/
│ ├── test_bronze_orders.py
│ └── test_silver_orders.py
└── config/
 ├── schemas.yaml
 └── pipeline_config.yaml

Catalog and Schema Setup with Unity Catalog

-- Create catalogs for each environment
CREATE CATALOG IF NOT EXISTS prod;
CREATE CATALOG IF NOT EXISTS staging;

-- Create schemas for each medallion layer
CREATE SCHEMA IF NOT EXISTS prod.bronze;
CREATE SCHEMA IF NOT EXISTS prod.silver;
CREATE SCHEMA IF NOT EXISTS prod.gold;

-- Set default permissions
GRANT USE CATALOG ON CATALOG prod TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.bronze TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod.silver TO `data-engineers`;
GRANT SELECT ON SCHEMA prod.gold TO `data-analysts`;

Bronze Layer: Raw Ingestion

The Bronze layer is your data landing zone. The rules are simple: ingest everything, lose nothing, track metadata.

Core Bronze Ingestion Pattern

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
 current_timestamp, input_file_name, lit, col
)
from pyspark.sql.types import StructType, StringType
from delta.tables import DeltaTable


class BronzeIngester:
 """Production Bronze layer ingestion with metadata tracking."""

 def __init__(self, spark: SparkSession, source_name: str):
 self.spark = spark
 self.source_name = source_name
 self.target_table = f"prod.bronze.{source_name}"

 def ingest_from_landing(
 self,
 landing_path: str,
 file_format: str = "json",
 schema: StructType | None = None,
 ) -> DataFrame:
 """Ingest raw files from landing zone into Bronze."""

 reader = self.spark.read.format(file_format)

 if schema:
 reader = reader.schema(schema)
 else:
 # For Bronze, infer schema — we'll enforce in Silver
 reader = reader.option("inferSchema", "true")

 if file_format == "json":
 reader = reader.option("multiLine", "true") \
 .option("mode", "PERMISSIVE") \
 .option("columnNameOfCorruptRecord", "_corrupt_record")

 raw_df = reader.load(landing_path)

 # Add Bronze metadata columns
 enriched_df = raw_df \
 .withColumn("_bronze_ingested_at", current_timestamp()) \
 .withColumn("_bronze_source_file", input_file_name()) \
 .withColumn("_bronze_source_system", lit(self.source_name)) \
 .withColumn("_bronze_batch_id", lit(self._generate_batch_id()))

 return enriched_df

 def write_bronze(self, df: DataFrame, mode: str = "append") -> int:
 """Write DataFrame to Bronze Delta table."""

 df.write \
 .format("delta") \
 .mode(mode) \
 .option("mergeSchema", "true") \
 .saveAsTable(self.target_table)

 row_count = df.count()
 self._log_ingestion(row_count)
 return row_count

 def ingest_incremental(
 self, landing_path: str, file_format: str = "json"
 ) -> int:
 """Auto-Loader based incremental ingestion."""

 stream_df = self.spark.readStream \
 .format("cloudFiles") \
 .option("cloudFiles.format", file_format) \
 .option("cloudFiles.inferColumnTypes", "true") \
 .option("cloudFiles.schemaLocation",
 f"/mnt/checkpoints/{self.source_name}/schema") \
 .load(landing_path)

 enriched_df = stream_df \
 .withColumn("_bronze_ingested_at", current_timestamp()) \
 .withColumn("_bronze_source_file", input_file_name()) \
 .withColumn("_bronze_source_system", lit(self.source_name))

 query = enriched_df.writeStream \
 .format("delta") \
 .outputMode("append") \
 .option("checkpointLocation",
 f"/mnt/checkpoints/{self.source_name}/checkpoint") \
 .option("mergeSchema", "true") \
 .trigger(availableNow=True) \
 .toTable(self.target_table)

 query.awaitTermination()
 return query.lastProgress["numInputRows"]

 def _generate_batch_id(self) -> str:
 from datetime import datetime
 return f"{self.source_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"

 def _log_ingestion(self, row_count: int):
 print(f"[Bronze] {self.source_name}: ingested {row_count} rows")

Running Bronze Ingestion

# In a Databricks notebook or job
ingester = BronzeIngester(spark, "orders")

# Batch ingestion
df = ingester.ingest_from_landing("/mnt/landing/orders/2026-03-10/")
count = ingester.write_bronze(df)
print(f"Ingested {count} order records to Bronze")

# Or use Auto Loader for incremental
count = ingester.ingest_incremental("/mnt/landing/orders/")

Silver Layer: Clean and Conform

Silver is where the real engineering happens. You deduplicate, enforce schemas, validate data quality, and create a "single source of truth."

Data Quality Framework

Before any record crosses into Silver, it passes through a quality gate. This framework lets you define rules declaratively and quarantine failures instead of silently dropping data.

from dataclasses import dataclass
from typing import Callable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, when, sum as spark_sum


@dataclass
class QualityRule:
 name: str
 check: Callable[[DataFrame], DataFrame]
 threshold: float # Minimum pass rate (0.0 to 1.0)
 is_critical: bool = False # Critical = halt pipeline on failure


class DataQualityGate:
 """Validates data between medallion layers."""

 def __init__(self, table_name: str, rules: list[QualityRule]):
 self.table_name = table_name
 self.rules = rules
 self.results: list[dict] = []

 def validate(self, df: DataFrame) -> tuple[bool, DataFrame]:
 """Run all quality checks. Returns (passed, quarantined_df)."""

 total_rows = df.count()
 quarantine_condition = None
 all_passed = True

 for rule in self.rules:
 # Apply the check — returns DF with boolean column
 checked_df = rule.check(df)
 pass_count = checked_df.filter(
 col("_quality_passed")
 ).count()
 pass_rate = pass_count / total_rows if total_rows > 0 else 1.0

 passed = pass_rate >= rule.threshold

 result = {
 "rule": rule.name,
 "pass_rate": round(pass_rate, 4),
 "threshold": rule.threshold,
 "passed": passed,
 "critical": rule.is_critical,
 "failed_rows": total_rows - pass_count,
 }
 self.results.append(result)

 if not passed:
 all_passed = False
 if rule.is_critical:
 raise DataQualityError(
 f"Critical quality check failed: {rule.name}"
 f"(pass_rate={pass_rate:.2%}, "
 f"threshold={rule.threshold:.2%})"
 )

 # Build quarantine filter
 fail_condition = ~col("_quality_passed")
 if quarantine_condition is None:
 quarantine_condition = fail_condition
 else:
 quarantine_condition = quarantine_condition | fail_condition

 df = checked_df.drop("_quality_passed")

 # Separate clean and quarantined records
 if quarantine_condition is not None:
 quarantined_df = df.filter(quarantine_condition)
 else:
 quarantined_df = df.limit(0)

 return all_passed, quarantined_df

 def print_report(self):
 print(f"\n{'='*60}")
 print(f"Data Quality Report: {self.table_name}")
 print(f"{'='*60}")
 for r in self.results:
 status = "PASS" if r["passed"] else "FAIL"
 print(
 f" [{status}] {r['rule']}: "
 f"{r['pass_rate']:.2%}"
 f"(threshold: {r['threshold']:.2%}, "
 f"failed: {r['failed_rows']})"
 )
 print(f"{'='*60}\n")


class DataQualityError(Exception):
 pass

Silver Transformation Pipeline

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
 col, trim, lower, to_timestamp, row_number, sha2, concat_ws
)
from pyspark.sql.window import Window


class SilverTransformer:
 """Production Silver layer transformation pipeline."""

 def __init__(self, spark: SparkSession, entity_name: str):
 self.spark = spark
 self.entity_name = entity_name
 self.source_table = f"prod.bronze.{entity_name}"
 self.target_table = f"prod.silver.{entity_name}"

 def read_bronze_incremental(
 self, last_processed_ts: str | None = None
 ) -> DataFrame:
 """Read new records from Bronze since last processing."""
 df = self.spark.table(self.source_table)

 if last_processed_ts:
 df = df.filter(
 col("_bronze_ingested_at") > last_processed_ts
 )

 return df.filter(col("_corrupt_record").isNull()) \
 .drop("_corrupt_record")

 def deduplicate(
 self, df: DataFrame, key_columns: list[str],
 order_column: str = "_bronze_ingested_at"
 ) -> DataFrame:
 """Deduplicate using key columns, keeping latest record."""
 window = Window.partitionBy(
 *key_columns
 ).orderBy(col(order_column).desc())

 return df.withColumn("_row_num", row_number().over(window)) \
 .filter(col("_row_num") == 1) \
 .drop("_row_num")

 def apply_schema(
 self, df: DataFrame, column_types: dict[str, str]
 ) -> DataFrame:
 """Cast columns to target types with error handling."""
 for col_name, target_type in column_types.items():
 if col_name in df.columns:
 df = df.withColumn(col_name, col(col_name).cast(target_type))
 return df

 def add_silver_metadata(self, df: DataFrame) -> DataFrame:
 """Add Silver-layer tracking columns."""
 from pyspark.sql.functions import current_timestamp, lit

 key_cols = [c for c in df.columns if not c.startswith("_")]
 return df \
 .withColumn("_silver_processed_at", current_timestamp()) \
 .withColumn(
 "_silver_hash",
 sha2(concat_ws("||", *[col(c) for c in key_cols]), 256)
 )

 def merge_into_silver(
 self, source_df: DataFrame, key_columns: list[str]
 ):
 """SCD Type 1 merge into Silver table."""
 if not self.spark.catalog.tableExists(self.target_table):
 source_df.write.format("delta").saveAsTable(self.target_table)
 return

 target = DeltaTable.forName(self.spark, self.target_table)
 merge_condition = " AND ".join(
 [f"target.{k} = source.{k}" for k in key_columns]
 )

 target.alias("target") \
 .merge(source_df.alias("source"), merge_condition) \
 .whenMatchedUpdateAll(
 condition="source._silver_hash != target._silver_hash"
 ) \
 .whenNotMatchedInsertAll() \
 .execute()

Putting Silver Together

# Define quality rules for orders
quality_rules = [
 QualityRule(
 name="order_id_not_null",
 check=lambda df: df.withColumn(
 "_quality_passed", col("order_id").isNotNull()
 ),
 threshold=1.0,
 is_critical=True,
 ),
 QualityRule(
 name="order_amount_positive",
 check=lambda df: df.withColumn(
 "_quality_passed", col("total_amount") > 0
 ),
 threshold=0.99,
 ),
 QualityRule(
 name="valid_order_date",
 check=lambda df: df.withColumn(
 "_quality_passed",
 col("order_date").between("2020-01-01", "2027-12-31")
 ),
 threshold=0.995,
 ),
]

# Run the Silver pipeline
transformer = SilverTransformer(spark, "orders")
bronze_df = transformer.read_bronze_incremental()
deduped_df = transformer.deduplicate(bronze_df, ["order_id"])

# Quality gate
gate = DataQualityGate("silver.orders", quality_rules)
passed, quarantined = gate.validate(deduped_df)
gate.print_report()

# Apply schema and write
typed_df = transformer.apply_schema(deduped_df, {
 "order_id": "long",
 "total_amount": "decimal(18,2)",
 "order_date": "date",
 "customer_id": "long",
})
enriched_df = transformer.add_silver_metadata(typed_df)
transformer.merge_into_silver(enriched_df, ["order_id"])

Gold Layer: Business-Ready Datasets

Gold tables are pre-aggregated datasets optimized for specific consumption patterns: dashboards, ML features, or API serving.

Gold Aggregation Example

from pyspark.sql.functions import (
 sum as spark_sum, count, avg, max as spark_max,
 min as spark_min, datediff, current_date
)


def build_customer_lifetime_value(spark: SparkSession) -> DataFrame:
 """Gold table: Customer Lifetime Value metrics."""

 orders = spark.table("prod.silver.orders")
 customers = spark.table("prod.silver.customers")

 clv = orders.groupBy("customer_id").agg(
 spark_sum("total_amount").alias("total_revenue"),
 count("order_id").alias("total_orders"),
 avg("total_amount").alias("avg_order_value"),
 spark_min("order_date").alias("first_order_date"),
 spark_max("order_date").alias("last_order_date"),
 ).withColumn(
 "customer_tenure_days",
 datediff(current_date(), col("first_order_date"))
 ).withColumn(
 "days_since_last_order",
 datediff(current_date(), col("last_order_date"))
 )

 # Join with customer details
 result = clv.join(
 customers.select("customer_id", "customer_name", "segment", "region"),
 on="customer_id",
 how="left"
 )

 # Write as Gold table with Z-ORDER for query optimization
 result.write \
 .format("delta") \
 .mode("overwrite") \
 .option("overwriteSchema", "true") \
 .saveAsTable("prod.gold.customer_lifetime_value")

 # Optimize for query performance
 spark.sql("""
 OPTIMIZE prod.gold.customer_lifetime_value
 ZORDER BY (segment, region)
 """)

 return result

Daily Revenue Gold Table

def build_daily_revenue(spark: SparkSession) -> DataFrame:
 """Gold table: Daily revenue aggregations."""

 orders = spark.table("prod.silver.orders")
 products = spark.table("prod.silver.products")

 daily = orders \
 .join(products, on="product_id", how="left") \
 .groupBy("order_date", "product_category", "region") \
 .agg(
 spark_sum("total_amount").alias("revenue"),
 count("order_id").alias("order_count"),
 avg("total_amount").alias("avg_order_value"),
 spark_sum("quantity").alias("units_sold"),
 )

 daily.write \
 .format("delta") \
 .mode("overwrite") \
 .partitionBy("order_date") \
 .saveAsTable("prod.gold.daily_revenue")

 return daily

Production Considerations

Table Maintenance Automation

Delta tables accumulate small files and old versions over time. Schedule this maintenance to run daily or after large batch loads.

def maintain_delta_tables(spark: SparkSession, catalog: str = "prod"):
 """Run maintenance on all Delta tables in the catalog."""

 schemas = ["bronze", "silver", "gold"]

 for schema in schemas:
 tables = spark.sql(
 f"SHOW TABLES IN {catalog}.{schema}"
 ).collect()

 for table in tables:
 table_name = f"{catalog}.{schema}.{table.tableName}"
 print(f"Maintaining {table_name}...")

 # Optimize small files
 spark.sql(f"OPTIMIZE {table_name}")

 # Vacuum old versions (retain 7 days)
 spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")

 # Analyze for query optimization
 spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")

Monitoring and Lineage

def get_table_health(spark: SparkSession, table_name: str) -> dict:
 """Get health metrics for a Delta table."""

 detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
 history = spark.sql(
 f"DESCRIBE HISTORY {table_name} LIMIT 10"
 ).collect()

 return {
 "table": table_name,
 "size_gb": round(detail.sizeInBytes / (1024**3), 2),
 "num_files": detail.numFiles,
 "last_modified": str(detail.lastModified),
 "recent_operations": [
 {
 "version": h.version,
 "operation": h.operation,
 "timestamp": str(h.timestamp),
 }
 for h in history
 ],
 }

Common Pitfalls and How to Avoid Them

1. Schema drift in Bronze — Use Auto Loader's mergeSchema option and track schema evolution. Don't let unknown columns silently appear in Silver.

2. Over-aggregating in Gold — Build Gold tables at the right granularity. Too aggregated = inflexible. Too granular = just another Silver table.

3. Skipping data quality — Every Bronze-to-Silver transition needs validation. Quarantine bad records instead of dropping them.

4. Not partitioning intelligently — Partition Bronze by ingestion date, Silver by business date, Gold by the most common filter column.

5. Ignoring table maintenance — OPTIMIZE and VACUUM should run on schedule. Small files kill query performance.

Summary

The Medallion Architecture is straightforward in concept but requires disciplined implementation:

Layer Purpose Write Pattern Quality
Bronze Raw landing Append-only Schema-on-read
Silver Cleaned, deduped Merge (SCD) Enforced schema + quality gates
Gold Business aggregations Overwrite/Merge Pre-validated from Silver

The code patterns in this article are extracted from real production implementations running at scale on Databricks. Take them, adapt them to your domain, and build with confidence.


If you found this useful and want production-ready pipeline templates you can drop straight into your Databricks workspace, check out DataStack Pro — it includes a Medallion Architecture accelerator, metadata-driven pipeline configs, and 20+ data engineering tools built from real enterprise implementations.


Related Articles