This workflow demonstrates strategies to handle late-arriving and duplicate events to ensure accurate aggregations and facts.

Scope

  • Use cases
    • Deduplicate events with identical keys/timestamps.
    • Include late events within a bounded lateness.
  • Non-use cases
    • Complex reorder buffering beyond watermark tolerance.

Common steps

1. Build context

  • Identify natural keys and timestamps for dedup (e.g., user_id, event_time).
  • Choose acceptable lateness (e.g., 2–10 minutes).
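
A quick way to check a candidate dedup key is to count how many rows share it. The sketch below assumes the ProductClicked table from the Example section and treats (user_id, product_id, event_time) as the natural key; substitute your own columns.

```sql
-- Rows whose candidate key appears more than once are duplicates.
-- In streaming mode this is a continuously updating result,
-- so it is meant for ad hoc inspection rather than for a sink.
SELECT user_id, product_id, event_time, COUNT(*) AS copies
FROM ProductClicked
GROUP BY user_id, product_id, event_time
HAVING COUNT(*) > 1;
```

If the query returns nothing over a representative sample, the key may be too fine-grained (for example, timestamps that differ by a few milliseconds between retries).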

2. Implement (Flink SQL by default)

  • Define watermarks to control lateness.
  • Use ROW_NUMBER() for within-key deduplication.
  • Optionally apply upsert sinks to collapse duplicates downstream.

Implementation notes

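  1. ROW_NUMBER() OVER (PARTITION BY <keys> ORDER BY <time attribute> DESC) filtered to rn = 1 is a common pattern that keeps the latest row per key; ordering ASC keeps the first row instead.
  2. For aggregations, bound lateness with the watermark delay, and write window results to an updatable (upsert) table so that recomputed results overwrite earlier ones.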
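
As an illustration of the first note, the following sketch keeps only the most recent click per (user_id, product_id). It assumes the ProductClicked table from the Example section; the choice of key is illustrative, not a recommendation.

```sql
-- Keep the latest row per (user_id, product_id), ordered by the event-time attribute.
-- Because a newer row can replace an older one, the result is an updating stream
-- and needs a sink that accepts updates (for example, an upsert sink).
SELECT user_id, product_id, event_time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, product_id
           ORDER BY event_time DESC) AS rn
  FROM ProductClicked
)
WHERE rn = 1;
```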

RESINK.AI recommendations

  • Start with conservative (longer) watermark delays; shorten them as needed to reduce latency.
  • Embrace idempotent sinks (Iceberg upsert) to simplify retries.
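
One possible way to adjust the watermark delay later without recreating the table is ALTER TABLE ... MODIFY. This sketch assumes Flink 1.17 or later (where ALTER TABLE can modify watermarks), a catalog that allows altering the table, and the ProductClicked table from the Example section; the 5-minute value is only an illustration.

```sql
-- Widen the tolerated lateness from 2 to 5 minutes (illustrative value).
-- Running jobs keep the old strategy until they are restarted.
ALTER TABLE ProductClicked
  MODIFY WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE;
```

On older Flink versions, dropping and recreating the table with a different INTERVAL in the WATERMARK clause has the same effect.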

Example

CREATE TABLE ProductClicked (
  user_id STRING,
  product_id STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE
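) WITH (
  'connector' = 'kafka',
  'topic' = 'e00-product-clicked',
  'format' = 'json'
  -- The Kafka connector also requires 'properties.bootstrap.servers'; set it for your cluster.
);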

-- Deduplicate exact duplicates by (user_id, product_id, event_time)
CREATE TEMPORARY VIEW ProductClicked_dedup AS
SELECT user_id, product_id, event_time
FROM (
  SELECT *, ROW_NUMBER() OVER (
            PARTITION BY user_id, product_id, event_time
            ORDER BY event_time DESC) AS rn
  FROM ProductClicked
)
WHERE rn = 1;

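-- Sink with upsert
-- Flink DDL for Iceberg does not accept partition transforms such as days(event_time),
-- so the table is partitioned by an explicit event_date column. In upsert mode the
-- partition column must also be part of the primary key.
CREATE TABLE fact_product_click_raw_dedup (
  user_id STRING,
  product_id STRING,
  event_time TIMESTAMP(3),
  event_date DATE,
  PRIMARY KEY (user_id, product_id, event_time, event_date) NOT ENFORCED
) PARTITIONED BY (event_date) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table',
  'format-version'='2',  -- upsert requires Iceberg table format v2
  'write.upsert.enabled'='true'
);

INSERT INTO fact_product_click_raw_dedup
SELECT user_id, product_id, event_time, CAST(event_time AS DATE) AS event_date
FROM ProductClicked_dedup;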

Variations

  • Deduplicate with sequence numbers
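-- If producers send a monotonically increasing seq per key, keep the row with the highest seq.
-- seq is not a time attribute, so Flink plans this as a Top-N (N = 1) query with an updating result.
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY user_id, product_id ORDER BY seq DESC) AS rn
  FROM ProductClickedWithSeq
) WHERE rn = 1;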
  • Late event tolerant daily aggregates
INSERT INTO agg_click_daily
SELECT
  CAST(window_start AS DATE) AS dt,
  product_id,
  COUNT(*) AS clicks
FROM TABLE(TUMBLE(TABLE ProductClicked, DESCRIPTOR(event_time), INTERVAL '1' DAY))
-- Window TVFs expose window_start and window_end; group by both.
GROUP BY window_start, window_end, product_id;
-- The watermark delay decides which late events are still counted:
-- an event is dropped once the watermark has passed the end of its window.
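
To get a feel for how many events arrive behind the watermark, the built-in CURRENT_WATERMARK function can be used for ad hoc inspection. This is a sketch against the ProductClicked table from the Example section; a row listed here is behind the watermark, but it is only dropped by a window aggregate if its window has already closed.

```sql
-- List rows whose event time is at or behind the current watermark.
-- CURRENT_WATERMARK returns NULL until a watermark has been emitted,
-- in which case the comparison is NULL and the row is not listed.
SELECT user_id, product_id, event_time,
       CURRENT_WATERMARK(event_time) AS wm
FROM ProductClicked
WHERE event_time <= CURRENT_WATERMARK(event_time);
```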

Troubleshooting

Check that partition keys exactly match the natural dedup key and that sources are not reformatting timestamps.
Reduce watermark delay or split streams by key ranges. Consider compaction on sinks.