This workflow derives user sessions from clickstream events such as searches and product clicks using gap-based session windows.

Scope

  • Use cases
    • Compute sessions and session metrics (duration, counts) for behavioral analytics.
  • Non-use cases
    • Business aggregations beyond the session scope (see S005 for daily aggregations).
    • Enrichment with dimensions (see S004).

Common steps

1. Build context

  • Identify clickstream sources (e.g., Search, ProductClicked).
  • Choose a session gap, the period of inactivity after which a session closes (15–30 minutes is common).

2. Implement (Flink SQL by default)

  • Define sources with watermarks.
  • Use SESSION windows to group events into sessions.
  • Compute session metrics and write them to the warehouse.
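
For Flink versions that lack the SESSION window table-valued function, the grouping step can be sketched with the older group-window syntax. This is a minimal sketch, not a definitive implementation: it assumes the Clickstream view from the Example section and a 30-minute gap, and it computes only the event count.

```sql
-- Legacy group-window form: SESSION(...) appears in GROUP BY, and
-- SESSION_START / SESSION_END repeat the same time column and gap.
SELECT
  user_id,
  SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
  SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
  COUNT(*) AS events
FROM Clickstream
GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE);
```

In this form SESSION_START and SESSION_END are only valid together with a matching SESSION(...) call in the GROUP BY clause.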

Implementation notes

  1. Normalize event timestamps to a single time zone before sessionizing; session boundaries should align with business needs.
  2. Consider deduplicating events within a session if the sources can deliver repeats.
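
One way to approach the time zone note is to carry event time as an absolute instant and fix the session time zone explicitly. The sketch below is an illustration under assumptions: the table name SearchEpoch and the ts_ms column (event time as epoch milliseconds) are hypothetical, and the connector options mirror the abbreviated ones used in the Example.

```sql
-- Interpret TIMESTAMP_LTZ values in UTC for this SQL session
SET 'table.local-time-zone' = 'UTC';

CREATE TABLE SearchEpoch (
  user_id STRING,
  query STRING,
  ts_ms BIGINT,  -- hypothetical: event time as epoch milliseconds
  -- Computed column: an absolute instant, independent of the producer's zone
  event_time AS TO_TIMESTAMP_LTZ(ts_ms, 3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector'='kafka', 'topic'='e00-search', 'format'='json'
);
```

A TIMESTAMP_LTZ time attribute keeps window boundaries tied to instants rather than to wall-clock strings, which avoids sessions shifting when producers run in different zones.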

RESINK.AI recommendations

  • For high-traffic sites, consider computing sessions per channel (web/app) separately to reduce skew.
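
The per-channel recommendation can be sketched by adding the channel to the session partition key. This assumes a hypothetical view ClickstreamByChannel that carries a channel column (for example 'web' or 'app') next to the columns of Clickstream, and a Flink version that provides the SESSION window table-valued function.

```sql
-- ClickstreamByChannel and its channel column are assumptions for illustration.
SELECT
  user_id,
  channel,
  window_start AS session_start,
  window_end AS session_end,
  COUNT(*) AS events
FROM TABLE(
  SESSION(
    TABLE ClickstreamByChannel PARTITION BY (user_id, channel),
    DESCRIPTOR(event_time),
    INTERVAL '30' MINUTE))
GROUP BY user_id, channel, window_start, window_end;
```

Writing these rows to a sink would require the channel in the sink's key as well, since one user can then have overlapping web and app sessions.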

Example

CREATE TABLE Search (
  user_id STRING,
  query STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector'='kafka', 'topic'='e00-search', 'format'='json'
);

CREATE TABLE ProductClicked (
  user_id STRING,
  product_id STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
  'connector'='kafka', 'topic'='e00-product-clicked', 'format'='json'
);

CREATE TABLE fact_sessions (
  user_id STRING,
  session_start TIMESTAMP(3),
  session_end TIMESTAMP(3),
  events BIGINT,
  searches BIGINT,
  clicks BIGINT,
  PRIMARY KEY (user_id, session_start) NOT ENFORCED
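  -- Flink SQL DDL accepts only column names in PARTITIONED BY, so the Iceberg
  -- transform days(session_start) is omitted; to partition by day, add a DATE
  -- column, populate it in the INSERT, and partition by that column.
) WITH (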
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

-- Union the two streams for sessionization
CREATE TEMPORARY VIEW Clickstream AS
SELECT user_id, 'search' AS event_type, event_time FROM Search
UNION ALL
SELECT user_id, 'click' AS event_type, event_time FROM ProductClicked;

-- Session window TVF: PARTITION BY user_id yields one session sequence per user.
-- The TVF exposes window_start and window_end; the SESSION_START/SESSION_END
-- functions belong to the legacy group-window syntax and cannot be mixed in here.
-- Assumes a Flink version that provides the SESSION window TVF.
INSERT INTO fact_sessions
SELECT
  user_id,
  window_start AS session_start,
  window_end AS session_end,
  COUNT(*) AS events,
  SUM(CASE WHEN event_type = 'search' THEN 1 ELSE 0 END) AS searches,
  SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) AS clicks
FROM TABLE(
  SESSION(TABLE Clickstream PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE))
GROUP BY user_id, window_start, window_end;
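
The Scope section lists duration among the session metrics, which the example does not compute. A minimal sketch, assuming the Clickstream view above and a Flink version with the SESSION window TVF; the column name active_seconds is illustrative:

```sql
-- The window end includes the trailing inactivity gap, so the active duration
-- is measured between the first and last event rather than the window bounds.
SELECT
  user_id,
  window_start AS session_start,
  window_end AS session_end,
  TIMESTAMPDIFF(SECOND, MIN(event_time), MAX(event_time)) AS active_seconds
FROM TABLE(
  SESSION(TABLE Clickstream PARTITION BY user_id, DESCRIPTOR(event_time), INTERVAL '30' MINUTE))
GROUP BY user_id, window_start, window_end;
```

A single-event session reports zero active seconds under this definition; measuring from window_start to window_end instead would add the full gap to every session.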

Variations

  • Include ProductDetailsViewed and ProductAdded events in sessionization: declare them as additional sources and add them to the view with UNION ALL as above, giving each its own event_type.
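
This variation could look like the sketch below. It assumes ProductDetailsViewed and ProductAdded are declared like ProductClicked, with user_id and a watermarked event_time; the view name ClickstreamExtended and the event_type labels 'view' and 'add' are illustrative choices.

```sql
-- Every branch must project the same columns in the same order.
CREATE TEMPORARY VIEW ClickstreamExtended AS
SELECT user_id, 'search' AS event_type, event_time FROM Search
UNION ALL
SELECT user_id, 'click' AS event_type, event_time FROM ProductClicked
UNION ALL
SELECT user_id, 'view' AS event_type, event_time FROM ProductDetailsViewed
UNION ALL
SELECT user_id, 'add' AS event_type, event_time FROM ProductAdded;
```

Counting the new event types would also need matching columns in the sink table and matching SUM(CASE ...) expressions in the INSERT.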

Troubleshooting

  • Multi-device behavior: adjust the session gap or split sessions by device/channel.
  • Duplicate events: deduplicate upstream or apply ROW_NUMBER-based deduplication within each session grouping key.
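
The ROW_NUMBER-based approach can be sketched as follows. It assumes a hypothetical event_id column that is unique per logical event, which the Clickstream view in the Example does not carry, so the sources and the view would need to expose it first. The view name ClickstreamDeduped is also illustrative.

```sql
-- Keep the first row per (user_id, event_id); later repeats are dropped.
-- Flink recognizes this pattern as deduplication when ORDER BY uses a
-- time attribute and the outer filter is rn = 1.
CREATE TEMPORARY VIEW ClickstreamDeduped AS
SELECT user_id, event_type, event_time
FROM (
  SELECT
    user_id,
    event_type,
    event_time,
    ROW_NUMBER() OVER (
      PARTITION BY user_id, event_id
      ORDER BY event_time ASC
    ) AS rn
  FROM Clickstream
)
WHERE rn = 1;
```

Whether this view can feed a session window directly depends on the Flink version and on the changelog the deduplication produces, so it is worth verifying on the target cluster before relying on it.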