This workflow aggregates raw events into higher-level facts such as per-session engagement and per-day activity metrics.

Scope

  • Use cases
    • Compute session-level metrics (e.g., pageviews per session).
    • Compute daily metrics (e.g., product clicks per day, orders per day).
  • Non-use cases
    • Joining with slow-changing dimensions (see S004).
    • Building SCD entity history (see S001, S008).

Common steps

1. Build context

  • Select input event streams (e.g., ProductClicked, OrderCompleted).
  • Decide on session gap (e.g., 30 minutes) and windowing strategy.

2. Implement (Flink SQL by default)

  • Define event sources with watermarks.
  • Use session windows or gap-based sessionization for session metrics.
  • Use tumbling windows for daily aggregates.

Implementation notes

  1. Choose appropriate watermarks to balance latency and correctness.
  2. For daily aggregates, prefer TUMBLE with INTERVAL '1' DAY, aligned to UTC or to the business time zone.
  3. For session metrics, use Flink's SESSION windows with an explicit gap.
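
As a minimal sketch of note 2, the optional fourth argument of TUMBLE can shift the daily window boundary away from 00:00 UTC. The query below assumes that event_time carries UTC values and that the business day starts at midnight in UTC+8; both are illustrative assumptions rather than part of this workflow. It also assumes a Flink version whose TUMBLE table function accepts the offset argument, and it reuses the ProductClicked source defined in the Example section.

```sql
-- Sketch: daily click counts with windows shifted to a UTC+8 business day.
-- Assumption: event_time is in UTC. INTERVAL '16' HOUR moves each window
-- start from 00:00 UTC to 16:00 UTC, which is 00:00 in UTC+8.
SELECT
  window_start,
  window_end,
  product_id,
  COUNT(*) AS clicks
FROM TABLE(
  TUMBLE(TABLE ProductClicked, DESCRIPTOR(event_time), INTERVAL '1' DAY, INTERVAL '16' HOUR))
GROUP BY window_start, window_end, product_id;
```

With a shifted window, window_start falls on the previous UTC calendar date, so casting it directly to DATE would label the business day incorrectly; derive the date column with that shift in mind.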

RESINK.AI recommendations

  • Store aggregated outputs in Iceberg with daily partitions for efficient downstream queries.
  • Keep aggregate schemas narrow and well-named for BI usability.

Example

-- ProductClicked source
CREATE TABLE ProductClicked (
  user_id STRING,
  product_id STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector'='kafka',
  'topic'='e00-product-clicked',
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
);

-- Session-level aggregates (30-minute gap)
-- Flink DDL for Iceberg does not support hidden partition transforms such as
-- days(...), so the table is partitioned by an explicit DATE column.
CREATE TABLE agg_click_session (
  user_id STRING,
  session_start TIMESTAMP(3),
  session_end TIMESTAMP(3),
  session_date DATE,
  clicks BIGINT,
  PRIMARY KEY (user_id, session_start) NOT ENFORCED
) PARTITIONED BY (session_date) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

-- PARTITION BY user_id gives each user independent sessions (this SESSION
-- table function syntax needs a recent Flink release).
-- The window table function exposes window_start and window_end; the legacy
-- SESSION_START/SESSION_END functions only work with GROUP BY SESSION(...).
INSERT INTO agg_click_session
SELECT
  user_id,
  window_start AS session_start,
  window_end AS session_end,
  CAST(window_start AS DATE) AS session_date,
  COUNT(*) AS clicks
FROM TABLE(SESSION(TABLE ProductClicked PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE))
GROUP BY user_id, window_start, window_end;

-- Daily product clicks
CREATE TABLE agg_click_daily (
  dt DATE,
  product_id STRING,
  clicks BIGINT,
  PRIMARY KEY (dt, product_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

-- Window table functions expose window_start/window_end; group by both.
INSERT INTO agg_click_daily
SELECT
  CAST(window_start AS DATE) AS dt,
  product_id,
  COUNT(*) AS clicks
FROM TABLE(TUMBLE(TABLE ProductClicked, DESCRIPTOR(event_time), INTERVAL '1' DAY))
GROUP BY window_start, window_end, product_id;

Variations

  • Daily orders
CREATE TABLE OrderCompleted (
  order_id STRING,
  user_id STRING,
  total_value DECIMAL(18,2),
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
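) WITH (
  'connector'='kafka',
  'topic'='e00-order-completed',
  -- Required by the Kafka connector; assumed to be the same broker as ProductClicked.
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
);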

CREATE TABLE agg_orders_daily (
  dt DATE,
  orders BIGINT,
  revenue DECIMAL(18,2)
) PARTITIONED BY (dt) WITH (
  'connector'='paimon',
  'warehouse'='s3a://my-bucket/paimon/warehouse'
);

-- One row per day: group by the window columns of the TUMBLE table function.
INSERT INTO agg_orders_daily
SELECT CAST(window_start AS DATE) AS dt,
       COUNT(*) AS orders,
       SUM(total_value) AS revenue
FROM TABLE(TUMBLE(TABLE OrderCompleted, DESCRIPTOR(event_time), INTERVAL '1' DAY))
GROUP BY window_start, window_end;

Troubleshooting

If a significant share of events arrives late (after the watermark has passed), increase the watermark delay or switch to updatable aggregates that emit retractions.
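
One way to read the updatable-aggregate option is a plain, non-windowed GROUP BY: it keeps state per key and emits an updated count whenever another event for that key arrives, late or not. The sketch below is an illustration under assumptions, not a prescribed setup: the sink must accept an updating changelog (for example an upsert-capable table keyed on dt and product_id), and the 48-hour state TTL is an arbitrary value chosen for the example.

```sql
-- Bound the state so that finished days are eventually dropped.
-- The TTL value is an assumption; size it to the lateness you expect.
SET 'table.exec.state.ttl' = '48 h';

-- Non-windowed aggregate: every incoming row updates its day's count,
-- so late events are counted instead of being discarded by a closed window.
SELECT
  CAST(event_time AS DATE) AS dt,
  product_id,
  COUNT(*) AS clicks
FROM ProductClicked
GROUP BY CAST(event_time AS DATE), product_id;
```

The trade-off is that results are never final: downstream consumers see counts that change until the state for that day expires.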
If aggregation state grows too large or throughput suffers, pre-aggregate upstream or reduce the number of group keys. Consider hourly aggregates that are rolled up downstream.
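
The hourly roll-up idea can be sketched in two steps. The table agg_click_hourly and its columns are hypothetical names introduced only for this illustration; the source is the ProductClicked table from the Example section.

```sql
-- Step 1 (streaming): hourly pre-aggregate.
-- agg_click_hourly is a hypothetical sink with columns
-- (hour_start TIMESTAMP(3), product_id STRING, clicks BIGINT).
INSERT INTO agg_click_hourly
SELECT
  window_start AS hour_start,
  product_id,
  COUNT(*) AS clicks
FROM TABLE(TUMBLE(TABLE ProductClicked, DESCRIPTOR(event_time), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, product_id;

-- Step 2 (downstream, e.g. a batch or BI query): roll hours up to days.
SELECT
  CAST(hour_start AS DATE) AS dt,
  product_id,
  SUM(clicks) AS clicks
FROM agg_click_hourly
GROUP BY CAST(hour_start AS DATE), product_id;
```

Hourly windows close sooner than daily ones, so the streaming job holds each window's state for a shorter time, and the daily totals are computed from at most 24 rows per product.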