This workflow rebuilds in-session Flink tables over entity data already stored in Iceberg or Paimon. It enables low-latency lookups and stream enrichment without re-ingesting data from the original sources.

Scope

  • Use cases
    • Rebuild in-memory/table-view dimension tables from warehouse storage.
    • Use with stream enrichment jobs that JOIN against entity tables.
  • Non-use cases
    • Initial ingestion of raw events.
    • Complex SCD history creation (see S008).

Common steps

1. Build context

  • Confirm that the entities were persisted previously (e.g., dim_user, dim_product).
  • Decide whether lookups need point-in-time or current-snapshot semantics.
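The checks in this step can be run from the Flink SQL client. The sketch below assumes the hadoop_iceberg_S003 catalog from the Example section and its convention that open records carry valid_to = TIMESTAMP '2099-12-31 23:59:59'; adapt the names to your own catalog.

```sql
-- Sketch: pre-flight checks against the Iceberg catalog from the Example section
USE CATALOG hadoop_iceberg_S003;
USE `default`;

-- Confirm the entity tables exist
SHOW TABLES;

-- Inspect columns and primary key of a persisted entity
DESCRIBE dim_user;

-- If total_versions > current_versions, history exists and point-in-time lookups are possible
SELECT
  COUNT(*) AS total_versions,
  SUM(CASE WHEN valid_to = TIMESTAMP '2099-12-31 23:59:59' THEN 1 ELSE 0 END) AS current_versions
FROM dim_user;
```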

2. Implement (Flink SQL by default)

  • Create the catalogs (Iceberg/Paimon) and define the dimension tables in Flink.
  • If needed, expose the current snapshot as a view or materialize it into its own table.
  • Choose temporal joins or regular joins depending on latency and consistency needs.
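As a sketch of the optional snapshot step, the statements below show two ways to expose the current rows of the SCD2 dim_user table. The names dim_user_current_v and dim_user_current are hypothetical, and the filter assumes the sentinel valid_to used in the Example section.

```sql
-- Hypothetical names: dim_user_current_v (view) and dim_user_current (table).
-- Assumes open records carry valid_to = TIMESTAMP '2099-12-31 23:59:59'.

-- Option 1: a temporary view, re-evaluated by every query that uses it
CREATE TEMPORARY VIEW IF NOT EXISTS dim_user_current_v AS
SELECT user_id, email
FROM hadoop_iceberg_S003.`default`.dim_user
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';

-- Option 2: materialize the snapshot into its own table (INSERT OVERWRITE needs batch mode)
SET 'execution.runtime-mode' = 'batch';

CREATE TABLE IF NOT EXISTS hadoop_iceberg_S003.`default`.dim_user_current (
  user_id STRING,
  email STRING
);

INSERT OVERWRITE hadoop_iceberg_S003.`default`.dim_user_current
SELECT user_id, email FROM dim_user_current_v;
```

The view costs nothing to create but rescans dim_user on each use; the materialized table makes reads cheaper but must be refreshed whenever the dimension changes.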

Implementation notes

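  1. For SCD Type 2, build the current snapshot with ROW_NUMBER() OVER (PARTITION BY ... ORDER BY valid_from DESC) in a subquery and filter rn = 1 in the outer query, so Flink can plan it as a Top-N query.
  2. For static or infrequently updated dimensions, broadcast the dimension table to avoid shuffling the fact side (in batch mode, via the BROADCAST join hint available since Flink 1.16).
  3. For point-in-time correctness, use event-time temporal table joins: the versioned table needs a primary key and an event-time attribute, and the join condition must include that key.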
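Notes 1 and 2 might look like this in Flink SQL. This is a sketch that assumes the Iceberg catalog from the Example section; orders is a hypothetical fact table with order_id and user_id columns. The ROW_NUMBER() query keeps the row number in a subquery and filters it in the outer query, which is the shape Flink plans as a Top-N query, and the BROADCAST hint is intended for batch mode.

```sql
-- Sketch: assumes the Iceberg catalog from the Example section.
-- orders is a hypothetical fact table (order_id, user_id) in the same database.
USE CATALOG hadoop_iceberg_S003;
USE `default`;

-- Note 1: current snapshot as a Top-N query (row number in a subquery, rn = 1 outside)
SELECT user_id, email, valid_from
FROM (
  SELECT
    user_id,
    email,
    valid_from,
    ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY valid_from DESC) AS rn
  FROM dim_user
)
WHERE rn = 1;

-- Note 2: broadcast the small dimension side (join hints are a batch-mode feature)
SET 'execution.runtime-mode' = 'batch';

SELECT /*+ BROADCAST(dim_user) */ o.order_id, o.user_id, dim_user.email
FROM orders AS o
JOIN dim_user
  ON o.user_id = dim_user.user_id
WHERE dim_user.valid_to = TIMESTAMP '2099-12-31 23:59:59';
```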

RESINK.AI recommendations

  • Keep entity tables small (wide is fine, but limit the number of rows) for faster lookups; archive deep history separately if enrichment does not need it.
  • Consider co-locating dimensions and facts in the same catalog to simplify access control and lineage.

Example

-- Create S003-specific Iceberg catalog for idempotent execution
DROP CATALOG IF EXISTS hadoop_iceberg_S003;

CREATE CATALOG IF NOT EXISTS hadoop_iceberg_S003 WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg/flink_table_S003'
);

USE CATALOG hadoop_iceberg_S003;

-- Assumes dim_user was already written by S001 or another process
-- SCD2-style table: user attributes over time
CREATE TABLE IF NOT EXISTS hadoop_iceberg_S003.`default`.dim_user (
  user_id STRING,
  email STRING,
  valid_from TIMESTAMP(3),
  valid_to   TIMESTAMP(3),
  PRIMARY KEY (user_id, valid_from) NOT ENFORCED
) WITH (
  'format-version'='2',          -- Iceberg upsert writes require format v2
  'write.upsert.enabled'='true'
);

-- Build current snapshot (latest active records)
-- Assumes open (current) records carry the sentinel valid_to value below
SELECT user_id, email
FROM hadoop_iceberg_S003.`default`.dim_user
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';

-- Example: Point-in-time lookup for a specific timestamp
-- SELECT u.user_id, u.email
-- FROM hadoop_iceberg_S003.`default`.dim_user u
-- WHERE u.valid_from <= TIMESTAMP '2024-06-15 12:00:00'
--   AND u.valid_to > TIMESTAMP '2024-06-15 12:00:00';
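To enrich a batch of facts with the attributes that were valid at each event's time, the same half-open interval can serve as a join condition. This sketch assumes a hypothetical orders table (order_id, user_id, order_time) in the same catalog; running it in batch mode avoids keeping regular-join state for an unbounded stream.

```sql
-- Sketch: point-in-time enrichment; orders is a hypothetical fact table
SET 'execution.runtime-mode' = 'batch';

SELECT o.order_id, o.user_id, o.order_time, u.email
FROM hadoop_iceberg_S003.`default`.orders AS o
JOIN hadoop_iceberg_S003.`default`.dim_user AS u
  ON o.user_id = u.user_id
 AND u.valid_from <= o.order_time   -- version started at or before the event
 AND u.valid_to > o.order_time;     -- and had not yet ended
```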

Variations

  • Paimon catalog alternative
CREATE CATALOG IF NOT EXISTS paimon_s3 WITH (
  'type'='paimon',
  'warehouse'='s3a://my-bucket/paimon/warehouse'
);

USE CATALOG paimon_s3;
  • Building current product snapshot
CREATE TABLE IF NOT EXISTS hadoop_iceberg_S003.`default`.dim_product (
  product_id STRING,
  name STRING,
  price DECIMAL(18,2),
  valid_from TIMESTAMP(3),
  valid_to   TIMESTAMP(3),
  PRIMARY KEY (product_id, valid_from) NOT ENFORCED
);

-- Direct query for current product snapshot
SELECT product_id, name, price
FROM hadoop_iceberg_S003.`default`.dim_product
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';
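For the Paimon variation, primary-key tables can also serve a processing-time lookup join, so each streaming event reads the latest dimension values. In the sketch below, only the paimon_s3 catalog comes from this page; dim_user_current, the orders source, and all Kafka options are hypothetical placeholders.

```sql
-- Sketch: processing-time lookup join against a Paimon primary-key table
USE CATALOG paimon_s3;

CREATE TABLE IF NOT EXISTS dim_user_current (
  user_id STRING,
  email STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
);

-- Kafka source registered as TEMPORARY, so it is not persisted in the Paimon catalog
CREATE TEMPORARY TABLE orders (
  order_id STRING,
  user_id STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'dim-lookup-demo',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Each order looks up the dimension row current at processing time
SELECT o.order_id, o.user_id, u.email
FROM orders AS o
JOIN dim_user_current FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.user_id;
```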

Troubleshooting

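In streaming mode, Flink only supports OVER aggregation windows ordered by an ascending time attribute. ROW_NUMBER() ... ORDER BY valid_from DESC is accepted only when Flink can plan it as a Top-N query, that is, when the row number is computed in a subquery and the outer query filters on it (e.g., WHERE rn = 1). If planning fails with an OVER-window ordering error, use one of these alternatives: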
-- Alternative 1: Use MAX() to find latest timestamp, then filter
WITH latest_dates AS (
  SELECT user_id, MAX(valid_from) as max_valid_from
  FROM hadoop_iceberg_S003.`default`.dim_user
  GROUP BY user_id
)
SELECT u.user_id, u.email
FROM hadoop_iceberg_S003.`default`.dim_user u
JOIN latest_dates l ON u.user_id = l.user_id AND u.valid_from = l.max_valid_from;

-- Alternative 2: Use WHERE clause with valid_to for current records
SELECT user_id, email
FROM hadoop_iceberg_S003.`default`.dim_user
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';
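Verify that event-time attributes and watermarks are defined on both the streaming side and the versioned table, that the versioned table declares a primary key, and that the join condition uses exactly that key with matching types. If temporal joins are more complex than you need, query the current snapshot directly instead.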
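A minimal event-time temporal join that meets these requirements could look like the following. The upsert-kafka versioned table, the orders source, topic names, and broker address are all hypothetical; what matters is the primary key on the versioned side, a watermark on both sides, and a join condition that includes that key.

```sql
-- Versioned side: primary key + event-time attribute with a watermark
CREATE TEMPORARY TABLE dim_user_versioned (
  user_id STRING,
  email STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED,
  WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'dim_user_changelog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Streaming (probe) side: event-time attribute with a watermark
CREATE TEMPORARY TABLE orders (
  order_id STRING,
  user_id STRING,
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'temporal-join-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- The equality condition includes the versioned table's primary key (user_id)
SELECT o.order_id, o.user_id, o.order_time, u.email
FROM orders AS o
JOIN dim_user_versioned FOR SYSTEM_TIME AS OF o.order_time AS u
  ON o.user_id = u.user_id;
```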
`default` is a reserved keyword in Flink SQL, so always quote it with backticks when referencing tables in the default database:
-- Correct
SELECT * FROM hadoop_iceberg_S003.`default`.dim_user;

-- Incorrect (will fail)
SELECT * FROM hadoop_iceberg_S003.default.dim_user;
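Alternatively, set the current catalog and database once and use unqualified table names afterwards; the database name still needs backticks in the USE statement.

```sql
-- Set the session context once, then refer to tables by their simple names
USE CATALOG hadoop_iceberg_S003;
USE `default`;   -- backticks still required: DEFAULT is a reserved keyword

SELECT * FROM dim_user;
```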
If dimension updates are frequent, consider materializing a changelog stream from the dimension table and using upsert-kafka or Paimon changelogs to keep lookups fresh.
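One way to apply this with Paimon is a current-snapshot primary-key table that produces a complete changelog, which streaming jobs can then read incrementally. This is a sketch that assumes both catalogs on this page are registered in the session; dim_user_current is a hypothetical table name.

```sql
-- Hypothetical current-snapshot table; 'lookup' makes Paimon produce a complete
-- changelog (including update-before records) for streaming readers
CREATE TABLE IF NOT EXISTS paimon_s3.`default`.dim_user_current (
  user_id STRING,
  email STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'lookup'
);

-- Load current rows from the Iceberg SCD2 table (re-run when the dimension changes)
INSERT INTO paimon_s3.`default`.dim_user_current
SELECT user_id, email
FROM hadoop_iceberg_S003.`default`.dim_user
WHERE valid_to = TIMESTAMP '2099-12-31 23:59:59';

-- Downstream streaming job: read only changes committed from now on
SET 'execution.runtime-mode' = 'streaming';

SELECT user_id, email
FROM paimon_s3.`default`.dim_user_current /*+ OPTIONS('scan.mode' = 'latest') */;
```

Because the load merges by primary key, users that disappear from the source snapshot are not deleted from dim_user_current; handle deletes separately if lookups depend on them.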