VOOZH about

URL: https://dev.to/datanestdigital/databricks-analytics-engineering-module-4-data-transformation-patterns-kp9

⇱ Databricks Analytics Engineering: Module 4: Data Transformation Patterns - DEV Community


Module 4: Data Transformation Patterns

Learning Objectives

By the end of this module, you will be able to:

  1. Apply common transformation patterns: deduplication, pivot, unpivot, gap-fill, and sessionization
  2. Write complex joins including semi-joins, anti-joins, and inequality joins
  3. Parse and transform semi-structured data (JSON, XML) with Databricks SQL
  4. Use array and map operations for nested data transformations
  5. Implement higher-order functions and SQL UDFs for custom logic

4.1 Deduplication Patterns

Duplicate records are one of the most common data quality issues. The approach depends on whether duplicates are exact or fuzzy.

Exact Deduplication with ROW_NUMBER

-- Retain only the newest version of each customer.
-- "Newest" means the greatest updated_at; rows sharing the same updated_at
-- are separated by the most recent _ingested_at.
WITH customer_versions AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY
                updated_at DESC,
                _ingested_at DESC
        ) AS version_rank
    FROM silver.stg_customers
)

-- Drop the helper ranking column so the output has the source table's shape.
SELECT * EXCEPT (version_rank)
FROM customer_versions
WHERE version_rank = 1;

Deduplication with QUALIFY (Databricks SQL)

-- Cleaner syntax using QUALIFY (supported in Databricks SQL).
-- QUALIFY filters directly on the window function result, so no CTE or helper
-- column is required.
-- _ingested_at is included as a tie-breaker: ordering by updated_at alone
-- leaves the surviving row arbitrary when two versions of a customer share
-- the same updated_at, which makes reruns non-deterministic.
SELECT *
FROM silver.stg_customers
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY customer_id
    ORDER BY
        updated_at DESC,
        _ingested_at DESC
) = 1;

Hash-Based Change Detection

-- Use an MD5 hash to tell real attribute changes apart from duplicate ingestion.
-- The hash covers only the tracked attributes, so a re-ingested record with the
-- same attribute values produces the same _row_hash.
WITH fingerprinted AS (
    SELECT
        *,
        -- CONCAT_WS skips NULL arguments; coalescing to '' keeps every
        -- attribute in a fixed position inside the hashed string.
        MD5(
            CONCAT_WS(
                '|',
                COALESCE(customer_name, ''),
                COALESCE(email, ''),
                COALESCE(phone, ''),
                COALESCE(segment, '')
            )
        ) AS _row_hash
    FROM silver.stg_customers
)

-- One row per (customer, attribute combination): the most recently ingested.
SELECT *
FROM fingerprinted
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY customer_id, _row_hash
    ORDER BY _ingested_at DESC
) = 1;

Deduplication as a dbt Macro

-- macros/deduplicate.sql
{#
    Render a query that keeps a single row per partition key.

    relation     : the relation to read from (for example a ref() result)
    partition_by : SQL text placed in PARTITION BY; rows sharing these values
                   compete with each other
    order_by     : SQL text placed in ORDER BY; the row ranked first survives
                   (defaults to the most recently ingested row)
#}
{% macro deduplicate(relation, partition_by, order_by='_ingested_at DESC') %}
SELECT *
FROM {{ relation }}
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY {{ partition_by }}
    ORDER BY {{ order_by }}
) = 1
{% endmacro %}

-- Usage in a model:
-- {{ deduplicate(ref('stg_shopify__orders'), 'order_id') }}

4.2 Pivot and Unpivot

PIVOT: Rows to Columns

Transform row-level category values into separate columns.

-- Monthly revenue by product category, pivoted into one column per category.
-- The input is narrowed to three columns first: month is the only column that
-- is neither aggregated nor pivoted, so it acts as the implicit grouping key.
WITH monthly_category_revenue AS (
    SELECT
        DATE_FORMAT(order_date, 'yyyy-MM') AS month,
        product_category,
        net_revenue
    FROM gold.fact_sales AS sales
    INNER JOIN gold.dim_product AS product
        ON sales.product_key = product.product_key
)

SELECT *
FROM monthly_category_revenue
PIVOT (
    SUM(net_revenue)
    -- Only the categories listed here become columns.
    FOR product_category IN (
        'Electronics' AS electronics,
        'Clothing' AS clothing,
        'Home & Garden' AS home_garden,
        'Sports' AS sports
    )
)
ORDER BY month;

UNPIVOT: Columns to Rows

Normalize wide tables into a long format suitable for analysis.

-- Unpivot monthly metric columns into one row per product and month.
WITH monthly_revenue_wide AS (
    SELECT
        product_id,
        jan_revenue,
        feb_revenue,
        mar_revenue
    FROM reports.quarterly_product_summary
)

SELECT *
FROM monthly_revenue_wide
UNPIVOT (
    -- revenue receives the cell value; month receives the label given below.
    revenue FOR month IN (
        jan_revenue AS 'January',
        feb_revenue AS 'February',
        mar_revenue AS 'March'
    )
)
-- month is a string label here, so this sort is alphabetical, not chronological.
ORDER BY product_id, month;

Dynamic Pivot with dbt

-- macros/pivot_values.sql
{#
    Return the distinct values of `column` in `table`, sorted ascending.
    Outside of execute mode no query is run and an empty list is returned,
    so models calling this macro can still be parsed.
#}
{% macro get_column_values(table, column) %}
    {% if not execute %}
        {{ return([]) }}
    {% endif %}

    {% set distinct_values_sql %}
        SELECT DISTINCT {{ column }}
        FROM {{ table }}
        ORDER BY 1
    {% endset %}

    {% set result_table = run_query(distinct_values_sql) %}
    {{ return(result_table.columns[0].values()) }}
{% endmacro %}

-- Usage in a model:
-- {% set categories = get_column_values(ref('dim_product'), 'category_level_1') %}

4.3 Gap-Fill (Missing Data Interpolation)

Gap-filling creates rows for missing time periods, critical for accurate time series analysis and dashboards.

Date Spine Gap-Fill

-- Fill missing dates in daily revenue with zero values.
-- calendar supplies one row per day in the reporting window; days with no
-- matching revenue row come through the LEFT JOIN as NULL and are zero-filled.
WITH calendar AS (
    SELECT full_date AS date_day
    FROM gold.dim_date
    WHERE full_date BETWEEN '2026-01-01' AND '2026-03-31'
),

revenue_by_day AS (
    SELECT
        order_date AS date_day,
        SUM(net_revenue) AS revenue
    FROM gold.fact_sales AS sales
    INNER JOIN gold.dim_date AS dates
        ON sales.date_key = dates.date_key
    GROUP BY order_date
)

SELECT
    calendar.date_day,
    COALESCE(revenue_by_day.revenue, 0) AS revenue,
    -- Flags rows whose revenue was filled in rather than observed.
    revenue_by_day.revenue IS NULL AS is_imputed
FROM calendar
LEFT JOIN revenue_by_day
    ON calendar.date_day = revenue_by_day.date_day
ORDER BY calendar.date_day;

Gap-Fill with Forward Fill (Last Known Value)

-- Forward-fill missing inventory levels.
-- Every product gets a row for every day in the window; days without a
-- snapshot inherit the most recent earlier quantity for that product.
WITH calendar AS (
    SELECT full_date AS date_day
    FROM gold.dim_date
    WHERE full_date BETWEEN '2026-01-01' AND '2026-03-31'
),

snapshots AS (
    SELECT
        snapshot_date,
        product_id,
        quantity_on_hand
    FROM silver.inventory_snapshots
),

-- Full day x product grid, so gaps exist as rows that can be filled.
product_days AS (
    SELECT
        calendar.date_day,
        products.product_id
    FROM calendar
    CROSS JOIN (
        SELECT DISTINCT product_id
        FROM snapshots
    ) AS products
),

-- quantity_on_hand is NULL on days that have no snapshot.
observed AS (
    SELECT
        product_days.date_day,
        product_days.product_id,
        snapshots.quantity_on_hand
    FROM product_days
    LEFT JOIN snapshots
        ON product_days.date_day = snapshots.snapshot_date
        AND product_days.product_id = snapshots.product_id
)

SELECT
    date_day,
    product_id,
    -- Latest non-NULL quantity up to and including the current day.
    -- Days before a product's first snapshot have nothing to carry forward
    -- and stay NULL.
    LAST_VALUE(quantity_on_hand) IGNORE NULLS OVER (
        PARTITION BY product_id
        ORDER BY date_day
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS quantity_on_hand
FROM observed
ORDER BY product_id, date_day;

4.4 Sessionization

Sessionization groups a stream of events into logical sessions based on time gaps.

-- Web analytics sessionization with a 30-minute inactivity threshold.
-- Output: one row per session, with its time bounds, event count, and the
-- first (landing) and last (exit) page viewed.
WITH events AS (
    SELECT
        user_id,
        event_timestamp,
        page_url,
        event_type,
        -- Previous event of the same user; NULL for the user's first event.
        LAG(event_timestamp) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
        ) AS prev_event_timestamp
    FROM silver.stg_web_events
),

session_boundaries AS (
    SELECT
        *,
        CASE
            WHEN prev_event_timestamp IS NULL THEN 1
            -- Compare timestamps directly. TIMESTAMPDIFF(MINUTE, ...) counts
            -- whole minutes only, so a gap of 30m59s would evaluate to 30 and
            -- not start a new session.
            WHEN event_timestamp > prev_event_timestamp + INTERVAL 30 MINUTES THEN 1
            ELSE 0
        END AS is_new_session
    FROM events
),

session_numbered AS (
    SELECT
        *,
        -- Running count of session starts = per-user session sequence number.
        SUM(is_new_session) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS session_number
    FROM session_boundaries
)

SELECT
    CONCAT(user_id, '-', session_number) AS session_id,
    user_id,
    session_number,
    MIN(event_timestamp) AS session_start,
    MAX(event_timestamp) AS session_end,
    TIMESTAMPDIFF(
        SECOND,
        MIN(event_timestamp),
        MAX(event_timestamp)
    ) AS session_duration_seconds,
    COUNT(*) AS event_count,
    -- MIN_BY / MAX_BY pick the page of the earliest / latest event as plain
    -- aggregates, so the query can group by session alone. Grouping by
    -- page_url and event_timestamp (needed for window functions here) would
    -- return one row per event instead of one row per session.
    MIN_BY(page_url, event_timestamp) AS landing_page,
    MAX_BY(page_url, event_timestamp) AS exit_page
FROM session_numbered
GROUP BY user_id, session_number;

Simplified Sessionization as dbt Model

-- models/intermediate/int_web_sessions.sql
-- Incremental session table: one row per user session (30-minute inactivity gap).
--
-- session_id is derived from the session's first event timestamp rather than
-- from a running counter. A counter restarts at 1 on every incremental run
-- (only new events are scanned), so its ids would collide with sessions that
-- are already stored and the merge would overwrite them.
-- Switching an existing table to this id format requires a full refresh.
--
-- Known limitation: only events newer than the latest stored session_end are
-- read, so a session that continues across two runs is stored as two sessions
-- and late-arriving events are skipped.
{{
    config(
        materialized='incremental',
        unique_key='session_id',
        incremental_strategy='merge'
    )
}}

WITH events_with_prev AS (
    SELECT
        user_id,
        event_timestamp,
        page_url,
        event_type,
        LAG(event_timestamp) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
        ) AS prev_event_timestamp
    FROM {{ ref('stg_web_events') }}
    {% if is_incremental() %}
    -- COALESCE guards the empty-target case: MAX over zero rows is NULL, and
    -- comparing against NULL would filter out every event.
    WHERE event_timestamp > (
        SELECT COALESCE(MAX(session_end), TIMESTAMP '1900-01-01 00:00:00')
        FROM {{ this }}
    )
    {% endif %}
),

events_with_gap AS (
    SELECT
        *,
        CASE
            WHEN prev_event_timestamp IS NULL THEN 1
            -- Direct comparison; TIMESTAMPDIFF(MINUTE, ...) counts whole
            -- minutes only and would treat a 30m59s gap as 30.
            WHEN event_timestamp > prev_event_timestamp + INTERVAL 30 MINUTES THEN 1
            ELSE 0
        END AS is_session_start
    FROM events_with_prev
),

with_session_start AS (
    SELECT
        *,
        -- Timestamp of the most recent session-start event at or before this
        -- row; every event of a session therefore carries the same value.
        MAX(CASE WHEN is_session_start = 1 THEN event_timestamp END) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS session_start_ts
    FROM events_with_gap
)

SELECT
    CONCAT(user_id, '-', CAST(UNIX_MICROS(session_start_ts) AS STRING)) AS session_id,
    user_id,
    MIN(event_timestamp) AS session_start,
    MAX(event_timestamp) AS session_end,
    COUNT(*) AS event_count,
    COUNT(DISTINCT page_url) AS distinct_pages
FROM with_session_start
GROUP BY user_id, session_start_ts

4.5 Complex Joins

Semi-Join (EXISTS)

Return rows from one table that have at least one match in another, without duplicating a row when it has several matches.

-- Customers who have placed at least one order.
-- EXISTS only tests whether a matching sale is present, so a customer with
-- many sales rows is not repeated the way a plain JOIN would repeat them.
SELECT customer.*
FROM gold.dim_customer AS customer
WHERE EXISTS (
    SELECT 1
    FROM gold.fact_sales AS sale
    WHERE sale.customer_key = customer.customer_key
);

Anti-Join (NOT EXISTS)

Return rows from one table that have no matches in another.

-- Products that have never been sold.
SELECT product.*
FROM gold.dim_product AS product
WHERE NOT EXISTS (
    SELECT 1
    FROM gold.fact_sales AS sale
    WHERE sale.product_key = product.product_key
);

-- Alternative using LEFT JOIN / IS NULL: returns the same rows, because the
-- join key on the sales side can only be NULL when no sale matched.
-- NOTE(review): the optimizer is not guaranteed to plan both forms the same
-- way; compare them with EXPLAIN before relying on equal performance.
SELECT product.*
FROM gold.dim_product AS product
LEFT JOIN gold.fact_sales AS sale
    ON product.product_key = sale.product_key
WHERE sale.product_key IS NULL;

Inequality Join (Range Join)

-- Map events to time-based pricing tiers.
-- Each event picks up the pricing row whose validity window contains the
-- event date. The window is half-open: effective_from is included and
-- effective_to is excluded, so an event dated exactly on a changeover takes
-- the newer price.
-- A pricing row whose effective_to is NULL never satisfies the "<" predicate,
-- so it matches no events.
SELECT
    event.event_id,
    event.event_date,
    event.product_id,
    price.price_tier,
    price.unit_price
FROM silver.events AS event
INNER JOIN silver.pricing_history AS price
    ON event.product_id = price.product_id
    AND event.event_date >= price.effective_from
    AND event.event_date < price.effective_to;

LATERAL Join (Correlated Subqueries)

-- Get the top 3 products per customer using LATERAL.
-- The LATERAL subquery is evaluated per customer row and may reference that
-- row's columns (customer.customer_key below).
-- Customers for whom the subquery returns no rows are absent from the result.
SELECT
    customer.customer_id,
    customer.customer_name,
    top_products.*
FROM gold.dim_customer AS customer
CROSS JOIN LATERAL (
    SELECT
        product.product_name,
        SUM(sale.net_revenue) AS total_spend
    FROM gold.fact_sales AS sale
    INNER JOIN gold.dim_product AS product
        ON sale.product_key = product.product_key
    WHERE sale.customer_key = customer.customer_key
    GROUP BY product.product_name
    ORDER BY total_spend DESC
    LIMIT 3
) AS top_products
WHERE customer.is_current;

4.6 Semi-Structured Data: JSON

Databricks SQL has rich support for JSON data stored in STRING or VARIANT columns.

Extracting JSON Fields

-- Parse the JSON payload of raw events into typed columns.
-- The ":" operator extracts a field from the JSON column; "." then walks into
-- nested objects. Each extracted value is cast to its target type explicitly.
SELECT
    event_id,
    event_timestamp,

    -- Top-level and page fields
    CAST(raw_payload:user_id AS STRING) AS user_id,
    CAST(raw_payload:event_type AS STRING) AS event_type,
    CAST(raw_payload:page.url AS STRING) AS page_url,
    CAST(raw_payload:page.referrer AS STRING) AS referrer,

    -- Nested device object
    CAST(raw_payload:device.type AS STRING) AS device_type,
    CAST(raw_payload:device.os AS STRING) AS device_os,
    CAST(raw_payload:device.browser AS STRING) AS browser,

    -- Numeric metrics
    CAST(raw_payload:metrics.load_time_ms AS INT) AS load_time_ms,
    CAST(raw_payload:metrics.scroll_depth AS DOUBLE) AS scroll_depth

FROM bronze.raw_web_events;

JSON with get_json_object and json_tuple

-- Alternative extraction functions.
-- GET_JSON_OBJECT pulls one value per call using a JSONPath string;
-- JSON_TUPLE pulls several top-level keys in a single pass.
-- Note: the result contains two columns named user_id, one from each method.
SELECT
    event_id,
    GET_JSON_OBJECT(raw_payload, '$.user_id') AS user_id,
    GET_JSON_OBJECT(raw_payload, '$.page.url') AS page_url,

    -- Fields produced by JSON_TUPLE, in the order they were requested
    jt.user_id,
    jt.event_type,
    jt.event_ts
FROM bronze.raw_web_events
LATERAL VIEW JSON_TUPLE(
    raw_payload,
    'user_id',
    'event_type',
    'timestamp'
) jt AS user_id, event_type, event_ts;

Flattening JSON Arrays

-- Explode order line items from a JSON array: one output row per line item.
-- FROM_JSON parses the string into an array of structs using the given schema;
-- EXPLODE then emits one row per array element, exposed as the struct `item`.
-- LATERAL VIEW attaches directly to the FROM relation: there is no comma
-- before it, and the generator is given a table alias (items) followed by
-- its column alias (item).
SELECT
    order_id,
    order_date,
    item.product_id,
    item.quantity,
    item.unit_price
FROM bronze.raw_orders
LATERAL VIEW EXPLODE(
    FROM_JSON(
        line_items,
        'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
    )
) items AS item;

-- Alternative with INLINE for struct arrays.
-- INLINE expands an array of structs into one row per element and one column
-- per struct field, so it needs one column alias for each field. inline_item
-- is the table alias that inline_item.* refers to.
-- As with any LATERAL VIEW, there is no comma between the FROM relation and
-- the LATERAL VIEW clause.
SELECT
    order_id,
    order_date,
    inline_item.*
FROM (
    SELECT
        order_id,
        order_date,
        FROM_JSON(
            line_items,
            'ARRAY<STRUCT<product_id: STRING, quantity: INT, unit_price: DOUBLE>>'
        ) AS parsed_items
    FROM bronze.raw_orders
) AS parsed
LATERAL VIEW INLINE(parsed_items) inline_item AS product_id, quantity, unit_price;

4.7 Semi-Structured Data: XML

-- Parse XML using xpath functions.
-- The typed variants (xpath_string, xpath_double) return a single scalar;
-- plain xpath returns every matching node as an array.
SELECT
    -- Order header fields
    xpath_string(xml_payload, '/order/order_id') AS order_id,
    xpath_string(xml_payload, '/order/customer/name') AS customer_name,
    xpath_double(xml_payload, '/order/total') AS order_total,

    -- One array entry per <item> element
    xpath(xml_payload, '/order/items/item/product_id/text()') AS product_ids,
    xpath(xml_payload, '/order/items/item/quantity/text()') AS quantities
FROM bronze.raw_xml_orders;

4.8 Array and Map Operations

Array Functions

-- Common array operations in Databricks SQL, computed per customer.
SELECT
    customer_id,

    -- Array literal
    ARRAY('Electronics', 'Clothing', 'Home') AS categories,

    -- Aggregating rows into arrays: COLLECT_LIST keeps duplicates,
    -- COLLECT_SET removes them
    COLLECT_LIST(product_category) AS purchased_categories,
    COLLECT_SET(product_category) AS unique_categories,

    -- Inspecting and reshaping arrays
    SIZE(COLLECT_SET(product_category)) AS category_count,
    ARRAY_CONTAINS(
        COLLECT_SET(product_category),
        'Electronics'
    ) AS bought_electronics,
    ARRAY_DISTINCT(COLLECT_LIST(product_category)) AS distinct_cats,
    ARRAY_SORT(COLLECT_SET(product_category)) AS sorted_cats,

    -- Array to delimited string (sorted first so the text is stable)
    ARRAY_JOIN(
        ARRAY_SORT(COLLECT_SET(product_category)),
        ', '
    ) AS categories_csv

FROM gold.fact_sales AS sales
INNER JOIN gold.dim_product AS product
    ON sales.product_key = product.product_key
GROUP BY customer_id;

Map Functions

-- Build and query maps: one category -> revenue map per customer.
WITH category_revenue AS (
    -- Revenue per customer and category; each row becomes one map entry.
    SELECT
        customer_id,
        product_category,
        SUM(net_revenue) AS total_revenue
    FROM gold.fact_sales f
    JOIN gold.dim_product p ON f.product_key = p.product_key
    GROUP BY customer_id, product_category
),

customer_maps AS (
    -- COLLECT_LIST is an aggregate, so the query must group by customer_id;
    -- without the GROUP BY, selecting customer_id next to it is an error.
    -- The map is built once here and reused below instead of being rebuilt
    -- for every derived column.
    -- Map keys cannot be NULL: a NULL product_category would make
    -- MAP_FROM_ENTRIES fail.
    SELECT
        customer_id,
        MAP_FROM_ENTRIES(
            COLLECT_LIST(STRUCT(product_category, total_revenue))
        ) AS category_revenue_map
    FROM category_revenue
    GROUP BY customer_id
)

SELECT
    customer_id,

    -- The map itself
    category_revenue_map,

    -- Access a map value by key
    category_revenue_map['Electronics'] AS electronics_revenue,

    -- Map keys and values as arrays
    MAP_KEYS(category_revenue_map) AS categories,
    MAP_VALUES(category_revenue_map) AS revenues

FROM customer_maps;

4.9 Higher-Order Functions

Higher-order functions operate on arrays with lambda expressions. They avoid the need to EXPLODE an array into rows and then COLLECT the results back into an array.

TRANSFORM

-- Apply a function to each element of an array.
-- TRANSFORM returns a new array of the same length; the lambda parameter
-- stands for one element at a time.
SELECT
    customer_id,
    tags,
    TRANSFORM(tags, t -> UPPER(t)) AS tags_upper,
    TRANSFORM(tags, t -> CONCAT('category:', t)) AS tags_prefixed,
    TRANSFORM(
        monthly_revenues,
        amount -> ROUND(amount * 1.1, 2)
    ) AS revenues_with_10pct_uplift
FROM silver.customer_profiles;

FILTER

-- Filter array elements matching a condition.
-- FILTER keeps only the elements for which the lambda returns true.
SELECT
    order_id,
    line_items,
    FILTER(line_items, li -> li.quantity > 5) AS bulk_items,
    FILTER(line_items, li -> li.unit_price > 100.00) AS premium_items,
    SIZE(
        FILTER(line_items, li -> li.quantity > 5)
    ) AS bulk_item_count
FROM silver.parsed_orders;

AGGREGATE (REDUCE)

-- Reduce an array to a single value.
-- AGGREGATE takes the array, a starting value, and a lambda that folds each
-- element into the running result. The starting value is cast explicitly so
-- its type is stated up front.
SELECT
    order_id,
    line_items,

    -- Sum of quantity * unit_price across all line items
    AGGREGATE(
        line_items,
        CAST(0.0 AS DOUBLE),
        (running_total, li) -> running_total + (li.quantity * li.unit_price)
    ) AS computed_total,

    -- Sum of quantities across all line items
    AGGREGATE(
        line_items,
        CAST(0 AS INT),
        (running_qty, li) -> running_qty + li.quantity
    ) AS total_quantity
FROM silver.parsed_orders;

EXISTS (Array Predicate)

-- Check if any element matches a condition.
-- EXISTS with a lambda tests the elements of an array; it is unrelated to the
-- EXISTS (subquery) predicate.
SELECT
    customer_id,
    purchase_categories,
    EXISTS(
        purchase_categories,
        category -> category = 'Electronics'
    ) AS is_tech_buyer,
    EXISTS(
        purchase_categories,
        category -> category LIKE '%Premium%'
    ) AS is_premium_buyer
FROM gold.customer_summary;

4.10 SQL UDFs

When built-in functions are insufficient, SQL UDFs provide reusable custom logic without the serialization overhead of Python UDFs.

Scalar UDF

-- Calculate customer lifetime value tier.
-- Branches are checked top-down and the first match wins, so they run from
-- the most to the least demanding tier. Every tier except Bronze requires
-- both a revenue and an order-count threshold to be exceeded.
-- tenure_months is accepted but not used by the classification.
-- A NULL total_revenue fails every comparison and yields 'Prospect'.
CREATE OR REPLACE FUNCTION gold.classify_clv(
    total_revenue DOUBLE,
    total_orders INT,
    tenure_months INT
)
RETURNS STRING
RETURN
    CASE
        WHEN total_revenue > 10000 AND total_orders > 50 THEN 'Platinum'
        WHEN total_revenue > 5000 AND total_orders > 20 THEN 'Gold'
        WHEN total_revenue > 1000 AND total_orders > 5 THEN 'Silver'
        WHEN total_revenue > 0 THEN 'Bronze'
        ELSE 'Prospect'
    END;

-- Usage: tag every customer with a lifetime value tier.
SELECT
    customer_id,
    customer_name,
    gold.classify_clv(
        lifetime_revenue,
        total_orders,
        tenure_months
    ) AS clv_tier
FROM gold.customer_summary;

Table-Valued UDF

... [content trimmed for length — full version in the complete kit]


This is 1 of 5 resources in the Datanest Academy toolkit. Get the complete [Databricks Analytics Engineering] with all files, templates, and documentation for $129.

Get the Full Kit →

Or grab the entire Datanest Academy bundle (5 products) for $XXX — save XX%.

Get the Complete Bundle →


Related Articles