Scope
- Use cases
  - You have one or more events that create or update entity properties, such as UserRegistered and UserEmailChanged.
  - You want to extract entity properties from those events and save them into the resink.ai data warehouse with history preserved.
- Non-use cases
  - Entity rehydration
  - Event property enrichment
Common steps
Build context
- Locate the source system. Create a data source if needed.
- Inspect sample records and identify fields to extract.
Implementation notes
- Iceberg supports UPSERT in two ways (see https://iceberg.apache.org/docs/latest/flink-writes/#upsert):
  - Enable the table property write.upsert.enabled.
  - Use upsert-enabled write options via Flink SQL hints.
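The two UPSERT options can be sketched in Flink SQL roughly as follows. This is a minimal illustration, not the recommended resink.ai setup: it assumes the current catalog is an Iceberg catalog, and the columns (user_id, email) and the source table user_events are hypothetical. The Iceberg documentation linked above states that upsert works on format-version 2 tables with a primary key.

```sql
-- Assumes the current catalog is an Iceberg catalog; column names are hypothetical.

-- Option 1: enable upsert through a table property.
CREATE TABLE dim_user (
  user_id STRING,
  email   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

-- Option 2: enable upsert for a single statement with a SQL hint.
-- user_events is a hypothetical source table.
INSERT INTO dim_user /*+ OPTIONS('upsert-enabled' = 'true') */
SELECT user_id, email
FROM user_events;
```

The same documentation notes that OVERWRITE and UPSERT cannot be combined, and that on a partitioned table the partition fields should be part of the primary key.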
RESINK.AI recommendations
Example
Variations
- S3-backed Iceberg catalog (catalog-first)
- Paimon catalog alternative
- Create catalog first (local filesystem)
- Partitioning strategies
  - Iceberg: PARTITIONED BY (days(snapshot_date)), or bucket by user: PARTITIONED BY (bucket(16, user_id))
  - Paimon: PARTITIONED BY (dt), where dt is a DATE column
- Deduplicate or handle late/duplicate events
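For the deduplication variation, one common approach is Flink SQL's ROW_NUMBER() pattern, which keeps a single row per key. The sketch below is an assumption about how it might be applied here, not the page's own example: user_events, user_id, email, and event_time are hypothetical names.

```sql
-- Keep only the most recent event per user (hypothetical table and columns).
-- Ordering by event_time DESC means a late-arriving older event
-- does not replace a newer value that was already seen.
SELECT user_id, email, event_time
FROM (
  SELECT
    user_id,
    email,
    event_time,
    ROW_NUMBER() OVER (
      PARTITION BY user_id
      ORDER BY event_time DESC
    ) AS row_num
  FROM user_events
)
WHERE row_num = 1;
```

Flink plans this as a deduplication query when the ORDER BY column is a time attribute, and as a Top-N query otherwise. Either way, keeping the last row produces an updating result, so it pairs naturally with an upsert-enabled sink.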
Troubleshooting
Cannot write incompatible dataset to table with schema
java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema
This error occurs when you try to write to a table with a schema that differs from the one already saved in the catalog.
CREATE TABLE dim_user... creates a Flink table instance in the current Flink session. In a new session you must declare the table again. The table metadata is stored in the Iceberg catalog at /tmp/iceberg/flink_table. If you re-declare the table differently in a new session, the Flink table definition may not match the actual table metadata.
To fix this, you can either:
- Drop the table and re-create it with the correct schema.
- Use a different catalog.
- Use a different catalog saving path.
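As a rough sketch of the last option, the statements below register an Iceberg catalog over a fresh warehouse path and create the table through that catalog. The catalog name iceberg_fresh, the path file:///tmp/iceberg_fresh, the database analytics, and the columns are all hypothetical; only dim_user comes from this page.

```sql
-- Register an Iceberg catalog over a fresh warehouse path (hypothetical name and path).
CREATE CATALOG iceberg_fresh WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg_fresh'
);
USE CATALOG iceberg_fresh;

-- Tables created through the catalog live in Iceberg metadata, so a later
-- session can read the schema back instead of re-declaring it by hand.
CREATE DATABASE IF NOT EXISTS analytics;
CREATE TABLE analytics.dim_user (
  user_id STRING,
  email   STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

-- In a new session: re-run CREATE CATALOG and USE CATALOG, then check the stored schema.
DESCRIBE analytics.dim_user;

-- Only if the stored schema is wrong: drop the table, then create it again.
-- DROP TABLE IF EXISTS analytics.dim_user;
```

Comparing the DESCRIBE output with the columns your INSERT produces is usually enough to see which side of the mismatch needs to change.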

