This workflow reads events from your data sources and writes derived entity attributes into entity tables while preserving history for point-in-time queries.

Scope

  • Use cases
    • You have one or more event types that create or update entity properties, such as UserRegistered and UserEmailChanged.
    • You want to extract entity properties from those events and save them into the resink.ai data warehouse with history preserved.
  • Non-use cases
    • Entity rehydration
    • Event property enrichment

Common steps

1. Build context

  • Locate the source system. Create a data source if needed.
  • Inspect sample records and identify fields to extract.
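One way to inspect sample records before choosing fields is to read the topic with Flink's raw format, which exposes each message value as a single string column. This is a minimal sketch that assumes the e00-user-registered topic and local broker from the example below. The table name raw_user_registered and the group id inspect-consumer are hypothetical.

```sql
-- Hypothetical inspection table: each Kafka message value becomes one STRING column
CREATE TEMPORARY TABLE raw_user_registered (
    payload STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'e00-user-registered',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'inspect-consumer',
    'format' = 'raw',
    'scan.startup.mode' = 'earliest-offset',
    'scan.bounded.mode' = 'latest-offset'
);

-- Print a handful of raw JSON payloads to identify the fields to extract
SELECT payload FROM raw_user_registered LIMIT 10;
```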

2. Implement (Flink SQL by default)

  • Define source tables.
  • Define sink (entity) tables.
  • Select from sources and upsert into the sink table.

Implementation notes

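  1. Iceberg supports UPSERT in two ways (see https://iceberg.apache.org/docs/latest/flink-writes/#upsert):
    • Enable the table property write.upsert.enabled.
    • Enable the upsert-enabled write option for a single statement via a Flink SQL hint.
  2. Iceberg upsert requires a primary key on the sink table and Iceberg table format version 2. If the table is partitioned, the partition columns must be part of the primary key.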
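Both upsert options can be sketched in Flink SQL as follows. This assumes a dim_user table with a primary key already exists in an Iceberg catalog that is the current catalog (for example, the hadoop_iceberg catalog from the variations below), and that the UserEmailChanged source table from the example lives in Flink's default catalog. On older Flink versions, dynamic table options may also need to be enabled before the hint takes effect.

```sql
-- Option 1: set the table property once; later writes to dim_user upsert by primary key
ALTER TABLE dim_user SET ('write.upsert.enabled' = 'true');

-- Option 2: enable upsert only for this statement with a dynamic table options hint
-- (on older Flink versions, first run: SET 'table.dynamic-table-options.enabled' = 'true';)
INSERT INTO dim_user /*+ OPTIONS('upsert-enabled' = 'true') */
SELECT user_id, email, event_time AS snapshot_date
FROM default_catalog.default_database.UserEmailChanged;
```

The hint keeps upsert behavior local to one job, while the table property makes it the default for every writer of the table.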

RESINK.AI recommendations

  • Both Paimon and Iceberg catalogs are supported in resink.ai.
  • Default to archiving into an Iceberg catalog for broad engine compatibility. Choose Paimon when you need lower-latency, more real-time reads.
  • Review the “Entity history preserving strategy” and pick a strategy that matches your query needs and cost profile.
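If you choose Paimon, the sink is a primary-key table, and Paimon replaces rows by primary key without a separate upsert option. This is a minimal sketch assuming a local warehouse at file:///tmp/paimon/warehouse and a hypothetical catalog name paimon_local. 'merge-engine' = 'deduplicate' is Paimon's default and is spelled out only for clarity.

```sql
-- Hypothetical local Paimon catalog for trying the workflow
CREATE CATALOG paimon_local WITH (
    'type' = 'paimon',
    'warehouse' = 'file:///tmp/paimon/warehouse'
);

USE CATALOG paimon_local;

-- Same shape as the Iceberg dim_user table: one row per user per change event
CREATE TABLE dim_user (
    user_id         STRING,
    email           STRING,
    snapshot_date   TIMESTAMP(3),
    PRIMARY KEY (user_id, snapshot_date) NOT ENFORCED
) WITH (
    'merge-engine' = 'deduplicate'  -- default: the last row written for a key wins
);
```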

Example

-- Table for user registration events
CREATE TABLE UserRegistered (
    user_id      STRING,
    email        STRING,
    event_time   TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'e00-user-registered',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'user-events-consumer',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    'scan.bounded.mode' = 'latest-offset'
);

-- Table for email change events
CREATE TABLE UserEmailChanged (
    user_id         STRING,
    original_email  STRING,
    email           STRING,
    event_time      TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'e00-user-email-changed',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'user-events-consumer',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset',
    'scan.bounded.mode' = 'latest-offset'
);


-- The target Iceberg table holding user attribute history: one row per user per change event
-- NOTE: you can also create the catalog first and then create the table within the catalog
CREATE TABLE dim_user (
    user_id         STRING,
    email           STRING,
    snapshot_date   TIMESTAMP(3),
    PRIMARY KEY (user_id, snapshot_date) NOT ENFORCED
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_iceberg',
    'catalog-type'='hadoop',
    'warehouse'='file:///tmp/iceberg/flink_table',
    'format-version' = '2',          -- upsert requires Iceberg format v2
    'write.upsert.enabled' = 'true'
);


INSERT INTO dim_user
SELECT
    user_id,
    email,
    event_time AS snapshot_date
FROM (
    SELECT user_id, email, event_time FROM UserRegistered
    UNION ALL
    SELECT user_id, email, event_time FROM UserEmailChanged
);
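Because dim_user keeps one row per change, a point-in-time lookup selects each user's latest row at or before the time of interest. This is a minimal sketch run in batch mode against the dim_user table from the example; the cutoff timestamp is a placeholder.

```sql
-- Read the table as a bounded snapshot
SET 'execution.runtime-mode' = 'batch';

-- Each user's email as of the cutoff: the latest change at or before that instant
SELECT user_id, email, snapshot_date AS effective_from
FROM (
    SELECT user_id, email, snapshot_date,
           ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY snapshot_date DESC) AS rn
    FROM dim_user
    WHERE snapshot_date <= TIMESTAMP '2024-06-01 00:00:00'  -- placeholder cutoff
)
WHERE rn = 1;
```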

Variations

  • S3-backed Iceberg catalog (catalog-first)
CREATE CATALOG ice_s3 WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='s3a://my-bucket/iceberg/warehouse'
  -- optional: S3A credentials and endpoint are read from the Hadoop configuration (fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.endpoint); point 'hadoop-conf-dir' at the directory containing core-site.xml
);
  • Paimon catalog alternative
CREATE CATALOG paimon_s3 WITH (
  'type'='paimon',
  'warehouse'='s3a://my-bucket/paimon/warehouse'
);
  • Create catalog first (local filesystem)
CREATE CATALOG hadoop_iceberg WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

USE CATALOG hadoop_iceberg;

CREATE TABLE dim_user (
  user_id STRING,
  email STRING,
  snapshot_date TIMESTAMP(3),
  PRIMARY KEY (user_id, snapshot_date) NOT ENFORCED
) WITH ('format-version'='2', 'write.upsert.enabled'='true');
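  • Partitioning strategies
    • Iceberg: Flink DDL supports identity partitions only, for example PARTITIONED BY (dt) on a DATE column derived from snapshot_date. Hidden-partition transforms such as days(snapshot_date) or bucket(16, user_id) cannot be declared from Flink SQL; define them from another engine (for example Spark) or the Iceberg API. In upsert mode, partition columns must also be part of the primary key.
    • Paimon: PARTITIONED BY (dt), where dt is a DATE column. For primary-key tables, typically include dt in the primary key.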
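  • Deduplicate late or duplicate copies of the same event
INSERT INTO dim_user
SELECT user_id, email, event_time AS snapshot_date
FROM (
  SELECT user_id, email, event_time,
         -- Keep one row per (user_id, event_time), dropping repeated deliveries of the same event.
         -- All rows in a partition share event_time, so if two different events carry the
         -- same timestamp, which one survives is arbitrary.
         ROW_NUMBER() OVER (PARTITION BY user_id, event_time ORDER BY event_time DESC) AS rn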
  FROM (
    SELECT user_id, email, event_time FROM UserRegistered
    UNION ALL
    SELECT user_id, email, event_time FROM UserEmailChanged
  )
)
WHERE rn = 1;
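Because Flink DDL only creates identity partitions for Iceberg, a day-partitioned history table can be sketched with an explicit DATE column. This assumes the hadoop_iceberg catalog from the catalog-first variation exists in the session and that the source tables from the example live in Flink's default catalog. The table name dim_user_by_day and the column dt are hypothetical. dt is part of the primary key because Iceberg upsert requires partition columns to be included in it.

```sql
USE CATALOG hadoop_iceberg;

-- Hypothetical day-partitioned variant of dim_user (identity partition on dt)
CREATE TABLE dim_user_by_day (
    user_id         STRING,
    email           STRING,
    snapshot_date   TIMESTAMP(3),
    dt              DATE,
    PRIMARY KEY (user_id, snapshot_date, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'format-version' = '2',
    'write.upsert.enabled' = 'true'
);

-- Derive dt from the event time; sources are referenced by their full default-catalog names
INSERT INTO dim_user_by_day
SELECT user_id, email, event_time AS snapshot_date, CAST(event_time AS DATE) AS dt
FROM (
    SELECT user_id, email, event_time FROM default_catalog.default_database.UserRegistered
    UNION ALL
    SELECT user_id, email, event_time FROM default_catalog.default_database.UserEmailChanged
);
```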

Troubleshooting

java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema
This error occurs when the table schema declared in Flink differs from the schema already stored in the catalog.

CREATE TABLE dim_user ... creates a Flink table definition that exists only in the current Flink session, so in a new session you must declare the table again. The table metadata itself is stored in the Iceberg catalog at /tmp/iceberg/flink_table. If you re-declare the table differently in a new session, the Flink table definition no longer matches the actual table metadata.

To fix this, do one of the following:
  1. Drop the table and re-create it with the correct schema.
  2. Use a different catalog.
  3. Use a different warehouse path for the catalog.
  4. Re-declare the table with the schema that is already stored in the catalog.
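To see what is actually stored before choosing a fix, one approach is to open the same warehouse as an Iceberg catalog and describe the table. This sketch assumes the local Hadoop warehouse from the example and that 'catalog-database' was not set in the connector-style declaration, in which case the table is written to Flink's current database (default_database). DROP TABLE removes the existing table, so treat it as irreversible.

```sql
CREATE CATALOG hadoop_iceberg WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hadoop',
    'warehouse' = 'file:///tmp/iceberg/flink_table'
);

USE CATALOG hadoop_iceberg;
SHOW DATABASES;

-- Database used by the connector-style declaration (assumption: 'catalog-database' not set)
USE default_database;

-- Compare the stored schema with your CREATE TABLE statement
DESCRIBE dim_user;

-- Fix option 1: drop the stored table, then re-create it with the correct schema
DROP TABLE dim_user;
```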