This workflow composes dimension rehydration with raw event ingestion to produce enriched facts suitable for analytics and real-time features, such as adding user or product attributes to clickstream events.

Scope

  • Use cases
    • Join raw events with user/product dimensions.
    • Create enriched fact tables for BI and ML features.
  • Non-use cases
    • Building SCD history (see S001, S008).
    • Heavy aggregations (see S005).

Common steps

1. Build context

  • Identify event streams to enrich (e.g., ProductClicked, CheckoutStarted).
  • Confirm dimension availability and freshness (e.g., dim_user_current, dim_product_current).
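A quick freshness check can be sketched as a batch query against the dimension. This assumes the SCD-style schema used in the Example section (valid_from, valid_to, and the 2099-12-31 sentinel for open-ended rows); adjust the table name to your own catalog.

```sql
-- Run in batch mode so the query terminates after scanning the table.
SET 'execution.runtime-mode' = 'batch';

-- How many current user rows exist, and when did the latest version start?
-- A stale latest_change suggests the upstream dimension job has stopped.
SELECT
  COUNT(*)        AS current_rows,
  MAX(valid_from) AS latest_change
FROM hadoop_iceberg_S004.`default`.dim_user
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';
```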

2. Implement (Flink SQL by default)

  • Define source event tables from Kafka.
  • Rehydrate dimension tables or use current snapshot views.
  • Join event streams with dimensions and write to Iceberg/Paimon.

Implementation notes

  1. Use temporal joins for point-in-time correctness when dimensions change frequently.
  2. If dimensions change rarely, a simple left join on current snapshots is often sufficient and simpler.
  3. Consider projecting only needed columns to keep costs low.
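As a rough illustration of note 1, an event-time temporal join might look like the sketch below. The dim_user_versioned table, its topic name, and the updated_at column are hypothetical; the join needs a versioned table, that is, a changelog source with a primary key and a watermark, which the Iceberg snapshot tables in the example do not provide by themselves.

```sql
-- Hypothetical versioned dimension backed by a compacted Kafka topic.
CREATE TABLE IF NOT EXISTS dim_user_versioned (
  user_id    STRING,
  email      STRING,
  updated_at TIMESTAMP(3),
  WATERMARK FOR updated_at AS updated_at - INTERVAL '10' SECOND,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dim-user-changelog',  -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each event is matched with the user version that was valid at e.event_time.
SELECT
  e.user_id,
  e.product_id,
  u.email AS user_email,
  e.event_time,
  e.event_type
FROM product_events AS e
LEFT JOIN dim_user_versioned FOR SYSTEM_TIME AS OF e.event_time AS u
  ON e.user_id = u.user_id;
```

Results are emitted only as watermarks advance on both inputs, so an idle dimension topic can hold back output.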

RESINK.AI recommendations

  • Start with a minimal set of dimension fields. Expand as downstream consumers request more.
  • Use partitioning that matches query patterns (e.g., daily partitions derived from event_time).
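A minimal sketch of the partitioning recommendation, assuming a plain event_date string column is acceptable to downstream queries. The table name fact_product_event_by_day is hypothetical, and product_events is the Kafka source defined in the Example section.

```sql
-- Daily partitions via an ordinary column declared in the schema.
CREATE TABLE IF NOT EXISTS hadoop_iceberg_S004.`default`.fact_product_event_by_day (
  user_id    STRING,
  product_id STRING,
  event_time TIMESTAMP(3),
  event_type STRING,
  event_date STRING
) PARTITIONED BY (event_date);

-- Derive the partition value from event_time at write time.
INSERT INTO hadoop_iceberg_S004.`default`.fact_product_event_by_day
SELECT
  user_id,
  product_id,
  event_time,
  event_type,
  DATE_FORMAT(event_time, 'yyyy-MM-dd') AS event_date
FROM product_events;
```

Queries that filter on event_date can then prune partitions, whereas a filter on event_time alone will not.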

Example

-- Create S004-specific Iceberg catalog for idempotent execution
DROP CATALOG IF EXISTS hadoop_iceberg_S004;

CREATE CATALOG IF NOT EXISTS hadoop_iceberg_S004 WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg/flink_table_S004'
);

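-- Keep the session in Flink's built-in catalog: Kafka source tables with
-- watermarks generally cannot be registered in an Iceberg catalog.
-- The Iceberg tables below are referenced by their fully qualified names.
USE CATALOG default_catalog;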

-- Event source using available Kafka topic
CREATE TABLE IF NOT EXISTS product_events (
    user_id     STRING,
    product_id  STRING,
    event_time  TIMESTAMP(3),
    event_type  STRING,
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'product-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-s004-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Dimension tables (assume created from S003 or S001)
CREATE TABLE IF NOT EXISTS hadoop_iceberg_S004.`default`.dim_user (
  user_id STRING,
  email STRING,
  valid_from TIMESTAMP(3),
  valid_to   TIMESTAMP(3),
  PRIMARY KEY (user_id, valid_from) NOT ENFORCED
) WITH ('write.upsert.enabled'='true');

CREATE TABLE IF NOT EXISTS hadoop_iceberg_S004.`default`.dim_product (
  product_id STRING,
  name STRING,
  price DECIMAL(18,2),
  valid_from TIMESTAMP(3),
  valid_to   TIMESTAMP(3),
  PRIMARY KEY (product_id, valid_from) NOT ENFORCED
);

-- Enriched product event facts
CREATE TABLE IF NOT EXISTS hadoop_iceberg_S004.`default`.fact_product_event_enriched (
  user_id STRING,
  product_id STRING,
  user_email STRING,
  product_name STRING,
  product_price DECIMAL(18,2),
  event_time TIMESTAMP(3),
  event_type STRING,
  PRIMARY KEY (user_id, product_id, event_time) NOT ENFORCED
) WITH (
  'write.upsert.enabled'='true'
);

-- Enrichment query using current snapshots (active dimension records)
INSERT INTO hadoop_iceberg_S004.`default`.fact_product_event_enriched
SELECT
  e.user_id,
  e.product_id,
  u.email AS user_email,
  p.name AS product_name,
  p.price AS product_price,
  e.event_time,
  e.event_type
FROM product_events e
LEFT JOIN (
  SELECT user_id, email
  FROM hadoop_iceberg_S004.`default`.dim_user
  WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59'
) u ON e.user_id = u.user_id
LEFT JOIN (
  SELECT product_id, name, price
  FROM hadoop_iceberg_S004.`default`.dim_product
  WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59'
) p ON e.product_id = p.product_id;

Variations

  • Enrich checkout funnel events
CREATE TABLE IF NOT EXISTS checkout_events (
  user_id STRING,
  cart_value DECIMAL(18,2),
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '15' SECOND
) WITH (
  'connector'='kafka',
  'topic'='checkout-events',
  'properties.bootstrap.servers'='localhost:9092',
  'properties.group.id' = 'flink-s004-checkout-group',
  'format'='json',
  'scan.startup.mode' = 'latest-offset'
);

CREATE TABLE IF NOT EXISTS hadoop_iceberg_S004.`default`.fact_checkout_enriched (
  user_id STRING,
  user_email STRING,
  cart_value DECIMAL(18,2),
  event_time TIMESTAMP(3),
  PRIMARY KEY (user_id, event_time) NOT ENFORCED
) WITH (
  'write.upsert.enabled'='true'
);

INSERT INTO hadoop_iceberg_S004.`default`.fact_checkout_enriched
SELECT
  c.user_id,
  u.email AS user_email,
  c.cart_value,
  c.event_time
FROM checkout_events c
LEFT JOIN (
  SELECT user_id, email
  FROM hadoop_iceberg_S004.`default`.dim_user
  WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59'
) u ON c.user_id = u.user_id;

Troubleshooting

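Flink SQL has limited support for partitioning syntax in CREATE TABLE statements: it accepts only existing columns in PARTITIONED BY, not Iceberg partition transforms such as days(event_time). If you need a transform, create the table without partitioning and evolve the partition spec outside Flink SQL (for example with Spark SQL or the Iceberg API), or use a simpler pattern based on a derived column: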
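-- Avoid partition transforms in CREATE TABLE
-- PARTITIONED BY (days(event_time))  -- Not supported in Flink SQL

-- Use simple partitioning instead: declare the column, then list it by name
CREATE TABLE fact_enriched (
  -- other columns
  event_time TIMESTAMP(3),
  event_date STRING  -- e.g. DATE_FORMAT(event_time, 'yyyy-MM-dd')
) PARTITIONED BY (event_date);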
Confirm that join keys match in type and format, and check their nullability. Use LEFT JOIN to retain events even when the dimension row is missing. Consider adding default values for downstream consumers:
-- Handle missing dimensions gracefully
SELECT
  e.user_id,
  COALESCE(u.email, 'unknown@example.com') AS user_email,  -- placeholder default address
  COALESCE(p.name, 'Unknown Product') AS product_name
FROM events e
LEFT JOIN dimensions u ON e.user_id = u.user_id
LEFT JOIN products p ON e.product_id = p.product_id;
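To see how often the dimensions fail to match, a batch query over the enriched fact table from the Example section can compare total rows with rows that received dimension attributes. This is only a sketch; it assumes the fact table was written without COALESCE defaults, so unmatched rows carry NULLs.

```sql
-- Run in batch mode so the aggregate produces a single final result.
SET 'execution.runtime-mode' = 'batch';

-- COUNT(column) skips NULLs, so the gap to total_events is the unmatched count.
SELECT
  COUNT(*)            AS total_events,
  COUNT(user_email)   AS events_with_user,
  COUNT(product_name) AS events_with_product
FROM hadoop_iceberg_S004.`default`.fact_product_event_enriched;
```

A large gap usually points to mismatched key formats or dimension rows that arrived after the events.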
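The WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59' pattern selects the current (open-ended) version of each dimension row, which works well when dimensions change rarely; for frequently changing dimensions, prefer temporal joins. For better performance with large dimension tables, consider pre-filtering the current rows into a separate table: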
-- Alternative: Pre-filter current snapshots
CREATE TABLE dim_user_current AS
SELECT user_id, email
FROM dim_user
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';
If a few join keys are very hot, try salting the key or broadcasting the smaller dimension. Select only the columns you need, and consider temporal joins, which typically hold less join state than regular joins on unbounded streams.