This workflow builds a canonical fact_order table for BI, joining order events with user and product dimensions.

Scope

  • Use cases
    • Create a star schema with a central fact table and conformed dimensions.
    • Support BI tools and warehouse analytics.
  • Non-use cases
    • Real-time feature joins for serving (see S004).
    • SCD management logic (see S008).

Common steps

  1. Build context
    • Confirm available dimensions: dim_user_current, dim_product_current.
    • Identify order events: OrderCompleted and, optionally, OrderCancelled.

  2. Implement (Flink SQL by default)
    • Define order event sources.
    • Rehydrate dimensions or use current snapshots.
    • Create fact_order and upsert enriched rows.

Implementation notes

  1. Consider surrogate keys for dimensions if required by downstream tools.
  2. Store natural keys (e.g., user_id, product_id) alongside surrogate keys (e.g., user_key) when the dimensions provide them.
  3. Partition by order date and cluster by high-cardinality keys if supported.
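A minimal sketch of notes 2 and 3 follows. It assumes a hypothetical user_key surrogate column on dim_user_current; the example view later on this page only exposes user_id and email. Flink's PARTITIONED BY clause accepts only plain column names, so the order date is derived as an explicit column rather than through a hidden partition transform.

```sql
-- Sketch only: user_key is a hypothetical surrogate-key column on dim_user_current.
SELECT
  o.order_id,
  o.user_id,                                 -- natural key, always stored
  u.user_key,                                -- surrogate key; NULL when no dimension row matches
  o.total_value,
  CAST(o.event_time AS DATE) AS order_date   -- explicit partition column for PARTITIONED BY (order_date)
FROM OrderCompleted o
LEFT JOIN dim_user_current u ON o.user_id = u.user_id;
```

Keeping the natural key means the fact row can still be joined or re-keyed later, even if the surrogate key is missing or reassigned.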

RESINK.AI recommendations

  • Start with a narrow column set focused on analytics needs; avoid over-widening the fact table.
  • Check that monetary columns use DECIMAL with sufficient precision and scale, not FLOAT or DOUBLE.
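One way to check the monetary columns is a bounded batch query over the fact table, sketched below. It assumes the Iceberg table is readable from the SQL client in batch mode. The daily grouping and the "suspicious" rule (NULL or negative totals) are illustrative choices, not requirements from this workflow.

```sql
-- Sketch: read the Iceberg fact table as a bounded batch job.
SET 'execution.runtime-mode' = 'batch';

-- Revenue is aggregated directly over DECIMAL(18,2), with no cast to DOUBLE.
-- Rows with NULL or negative totals are counted so they can be investigated.
SELECT
  CAST(order_time AS DATE) AS order_date,
  SUM(total_value) AS revenue,
  SUM(CASE WHEN total_value IS NULL OR total_value < 0 THEN 1 ELSE 0 END) AS suspicious_rows
FROM fact_order
GROUP BY CAST(order_time AS DATE);
```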

Example

-- Order events
CREATE TABLE OrderCompleted (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  total_value DECIMAL(18,2),
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
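) WITH (
  'connector'='kafka',
  'topic'='e00-order-completed',
  'properties.bootstrap.servers'='localhost:9092',
  'scan.startup.mode'='earliest-offset',  -- read from the beginning when no committed group offsets exist
  'format'='json'
);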

-- Example snapshots (replace with S003-derived views)
CREATE TEMPORARY VIEW dim_user_current AS SELECT user_id, email FROM hadoop_iceberg.dim_user_snapshot; -- placeholder
CREATE TEMPORARY VIEW dim_product_current AS SELECT product_id, name, price FROM hadoop_iceberg.dim_product_snapshot; -- placeholder

CREATE TABLE fact_order (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  user_email STRING,
  product_name STRING,
  product_price DECIMAL(18,2),
  quantity INT,
  total_value DECIMAL(18,2),
  order_time TIMESTAMP(3),
  order_date DATE,                               -- Flink DDL has no hidden partition transforms such as days()
  PRIMARY KEY (order_id, order_date) NOT ENFORCED  -- Iceberg upsert mode requires partition fields in the key
) PARTITIONED BY (order_date) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table',
  'format-version'='2',                          -- upsert (equality deletes) needs format v2
  'write.upsert.enabled'='true'
);

INSERT INTO fact_order
SELECT
  o.order_id,
  o.user_id,
  o.product_id,
  u.email AS user_email,
  p.name AS product_name,
  p.price AS product_price,
  o.quantity,
  o.total_value,
  o.event_time AS order_time,
  CAST(o.event_time AS DATE) AS order_date
FROM OrderCompleted o
LEFT JOIN dim_user_current u ON o.user_id = u.user_id
LEFT JOIN dim_product_current p ON o.product_id = p.product_id;

Variations

  • Handle cancellations by adjusting facts
CREATE TABLE OrderCancelled (
  order_id STRING,
  reason STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
  'connector'='kafka',
  'topic'='e00-order-cancelled',
  'properties.bootstrap.servers'='localhost:9092',
  'scan.startup.mode'='earliest-offset',
  'format'='json'
);

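-- Approach: append cancellations to a separate `fact_order_adjustment` table, or upsert the original
-- order row with quantity and total_value set to zero (the upsert must carry the full row, because it replaces the existing one)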
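The first approach from the comment above can be sketched as an append-only table. The column names other than order_id and reason (adjustment_type, adjusted_at, adjusted_date) are hypothetical. The Iceberg options reuse the placeholder catalog settings from the main example.

```sql
-- Sketch: append-only adjustment facts, one row per cancellation event.
CREATE TABLE fact_order_adjustment (
  order_id STRING,
  adjustment_type STRING,   -- hypothetical column, e.g. 'CANCELLED'
  reason STRING,
  adjusted_at TIMESTAMP(3), -- hypothetical column
  adjusted_date DATE        -- hypothetical explicit partition column
) PARTITIONED BY (adjusted_date) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

INSERT INTO fact_order_adjustment
SELECT
  c.order_id,
  'CANCELLED' AS adjustment_type,
  c.reason,
  c.event_time AS adjusted_at,
  CAST(c.event_time AS DATE) AS adjusted_date
FROM OrderCancelled c;
```

With this approach, fact_order is never rewritten for cancellations. BI queries can LEFT JOIN fact_order_adjustment on order_id to flag or exclude cancelled orders.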

Troubleshooting

Make the producer idempotent, or deduplicate on order_id with ROW_NUMBER() and keep the latest event.
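A minimal sketch of that deduplication uses Flink's ROW_NUMBER() pattern, ordering by the event_time rowtime attribute in descending order to keep the latest row. The view name order_completed_dedup is hypothetical. Because "keep latest" can revise a previously emitted row, the result is an updating stream, so the downstream sink must accept upserts (as fact_order does).

```sql
-- Keep only the most recent OrderCompleted event per order_id.
-- The ORDER BY column must be a time attribute; event_time is the rowtime declared by the WATERMARK.
CREATE TEMPORARY VIEW order_completed_dedup AS
SELECT order_id, user_id, product_id, quantity, total_value, event_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY event_time DESC) AS row_num
  FROM OrderCompleted
)
WHERE row_num = 1;
```

To use it, read from order_completed_dedup instead of OrderCompleted in the INSERT INTO fact_order statement.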
Treat the order event's own values (total_value) as the source of truth; product_price holds the current product price from the dimension and is stored for context only.
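To see where the two prices diverge, the query below derives the unit price from the order event and sets it against the current dimension price. It is a sketch that assumes total_value equals quantity times the unit price, with no separate discounts, taxes, or shipping. The price_drift column is a hypothetical name.

```sql
-- Sketch: run as a bounded batch job over the fact table.
SET 'execution.runtime-mode' = 'batch';

SELECT
  order_id,
  total_value / quantity AS unit_price_at_order,                -- from the order event (source of truth)
  product_price AS current_product_price,                       -- from dim_product_current (context only)
  product_price - total_value / quantity AS price_drift         -- nonzero when the product price changed since the order
FROM fact_order
WHERE quantity > 0;                                             -- avoid division by zero (e.g. zeroed cancellations)
```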