This workflow ingests database changes (INSERT/UPDATE/DELETE) from OLTP sources into an Iceberg table using Flink CDC connectors.

Scope

  • Use cases
    • Mirror operational tables (e.g., users, orders) into the data lake; earlier row versions stay queryable through Iceberg snapshots until those snapshots are expired.
    • Build near-real-time dimensional data without polling.
  • Non-use cases
    • Stream event enrichment (see S004).

Common steps

1. Build context

  • Identify source database and tables (e.g., MySQL orders).
  • Ensure the binlog (MySQL) or WAL (PostgreSQL) is enabled, retained long enough, and readable by the CDC user.
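A minimal sketch of the prerequisite checks, assuming a MySQL 8.0 source and the `user` account from the example further down. The privilege list follows the Flink CDC MySQL connector documentation and may differ between connector versions. The `'%'` host pattern is a placeholder.

```sql
-- Run on the MySQL source (MySQL 8.0 variable names assumed).
SHOW VARIABLES LIKE 'log_bin';                    -- should be ON
SHOW VARIABLES LIKE 'binlog_format';              -- should be ROW
SHOW VARIABLES LIKE 'binlog_row_image';           -- should be FULL
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds'; -- must outlast the longest expected job outage

-- Replication privileges for the CDC account; host pattern is a placeholder.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';

-- PostgreSQL equivalent: logical decoding requires wal_level = logical
-- (changing this setting requires a server restart).
SHOW wal_level;
```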

2. Implement (Flink SQL by default)

  • Define the CDC source tables.
  • Define the Iceberg sink, enabling upsert and row-level deletes if needed.
  • Insert the CDC changelog into the Iceberg table.
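The example further down declares the sink as a connector table. Below is a minimal sketch of the alternative: register an Iceberg catalog and create the table inside it. It assumes a Hadoop catalog. The catalog name `hadoop_iceberg`, the database `shop`, and the warehouse path are placeholders.

```sql
-- Register an Iceberg catalog backed by a Hadoop warehouse (placeholder path).
CREATE CATALOG hadoop_iceberg WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg/warehouse'
);

CREATE DATABASE IF NOT EXISTS hadoop_iceberg.shop;

-- Upsert needs a primary key and an Iceberg format-version 2 table.
CREATE TABLE IF NOT EXISTS hadoop_iceberg.shop.dim_order_cdc (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  total_value DECIMAL(18,2),
  ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

-- Assumes the CDC source table was created in Flink's default in-memory catalog.
INSERT INTO hadoop_iceberg.shop.dim_order_cdc
SELECT order_id, user_id, product_id, quantity, total_value, ts
FROM default_catalog.default_database.orders_cdc;
```

Because the catalog stores the table definition in the warehouse, later Flink sessions and other engines can find the table without redeclaring its DDL.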

Implementation notes

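  1. Declare a primary key on the sink and use an Iceberg format-version 2 table to get upsert semantics.
  2. When CDC data arrives through Kafka, use a changelog format that carries operation types, such as debezium-json. The mysql-cdc and postgres-cdc connectors emit changelog rows directly.
  3. Plan for compaction when updates are frequent. Each upsert writes delete files alongside data files, and these slow down reads until they are compacted.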
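For note 2, here is a minimal sketch of a Kafka-backed source that reads Debezium change events. It assumes a topic named `shop.orders`, a broker at `kafka:9092`, and a consumer group `orders-to-iceberg`; all three are hypothetical. The `debezium-json` format turns each event into insert, update, or delete rows. Column types must match how the Debezium converter encodes fields; for example, DECIMAL columns depend on Debezium's `decimal.handling.mode`.

```sql
-- Hypothetical topic, brokers, and consumer group.
CREATE TABLE orders_cdc_kafka (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  total_value DECIMAL(18,2),
  ts TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'shop.orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'orders-to-iceberg',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  -- Set to 'true' only if the Debezium JSON converter embeds the schema in each message.
  'debezium-json.schema-include' = 'false'
);
```

This table can feed the same INSERT into the Iceberg sink. The sink's primary key supplies the upsert key.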

RESINK.AI recommendations

  • Keep CDC topic and table schemas aligned with the OLTP source to reduce drift, and document your schema evolution policy.

Example

-- MySQL CDC source for orders (hostname, credentials, and names are placeholders)
CREATE TABLE orders_cdc (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  total_value DECIMAL(18,2),
  ts TIMESTAMP_LTZ(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'='mysql-cdc',
  'hostname'='mysql',
  'port'='3306',
  'username'='user',
  'password'='pass',
  'database-name'='shop',
  'table-name'='orders'
);

-- Iceberg sink with upsert and deletes
CREATE TABLE dim_order_cdc (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  total_value DECIMAL(18,2),
  ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (order_id) NOT ENFORCED
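  -- Left unpartitioned: Flink DDL cannot declare Iceberg transforms such as days(ts),
  -- and in upsert mode partition columns must be part of the primary key.
) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table',
  -- Upsert writes equality deletes, which need an Iceberg v2 table.
  'format-version'='2',
  'write.upsert.enabled'='true',
  'write.delete.mode'='merge-on-read'
);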

INSERT INTO dim_order_cdc
SELECT order_id, user_id, product_id, quantity, total_value, ts
FROM orders_cdc;
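To spot-check the mirror, here is a minimal sketch of a batch query over the sink. It assumes it runs in the same SQL client session after the INSERT job has been submitted; the `SET` statements only affect statements issued afterward. Rows appear only after the streaming job's first checkpoint completes.

```sql
-- Batch mode reads the table's latest committed snapshot.
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';

SELECT COUNT(*) AS order_count, SUM(total_value) AS total_revenue
FROM dim_order_cdc;
-- Compare with the same aggregate run directly against the MySQL orders table.
```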

Variations

  • PostgreSQL CDC via the postgres-cdc connector.
  • Source deletes are written as Iceberg row-level (equality) deletes, which require a format-version 2 table.
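Here is a minimal sketch of the PostgreSQL variation. It assumes the same `orders` table lives in schema `public` of a `shop` database. The hostname, credentials, and replication slot name are placeholders. `pgoutput` is PostgreSQL's built-in logical decoding plugin.

```sql
-- PostgreSQL CDC source; connection values and slot name are placeholders.
CREATE TABLE orders_pg_cdc (
  order_id STRING,
  user_id STRING,
  product_id STRING,
  quantity INT,
  total_value DECIMAL(18,2),
  ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgres',
  'port' = '5432',
  'username' = 'user',
  'password' = 'pass',
  'database-name' = 'shop',
  'schema-name' = 'public',
  'table-name' = 'orders',
  -- A replication slot can serve only one consumer, so give each job its own.
  'slot.name' = 'flink_orders',
  'decoding.plugin.name' = 'pgoutput'
);
```

Swap `orders_pg_cdc` into the FROM clause of the INSERT; the Iceberg sink stays unchanged.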

Troubleshooting

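Ensure checkpointing and a state backend are configured: the Iceberg sink commits data files only when a checkpoint completes. Verify that binlog retention covers the offsets stored in the latest checkpoint; if those binlog files are purged, the job cannot resume from that checkpoint.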
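Here is a minimal sketch of session-level checkpoint settings for the Flink SQL client. The interval and checkpoint directory are placeholder values. The `state.backend.type` key assumes a recent Flink release; older releases use `state.backend`. Shorter intervals make data visible sooner but produce more, smaller Iceberg files and snapshots.

```sql
-- Checkpoint every 60 seconds; each completed checkpoint becomes an Iceberg commit.
SET 'execution.checkpointing.interval' = '60s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- RocksDB keeps working state on local disk, which suits large keyed state.
SET 'state.backend.type' = 'rocksdb';
-- Durable location for checkpoint data (placeholder path).
SET 'state.checkpoints.dir' = 'file:///tmp/flink/checkpoints';
```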
Tune Iceberg compaction (rewrite data files) and consider clustering rows with a sort order.
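Compaction usually runs as a separate maintenance job. The sketch below uses Iceberg's Spark SQL procedures. It assumes a Spark session with the Iceberg SQL extensions enabled and a Hadoop catalog named `hadoop_iceberg` that points at the same warehouse as the sink. The identifier `default_database.dim_order_cdc` assumes the connector's default of using Flink's current database name. The sort column is illustrative.

```sql
-- Spark SQL with Iceberg SQL extensions; catalog and table names are assumptions.
-- Bin-pack small data files into larger ones.
CALL hadoop_iceberg.system.rewrite_data_files(
  table => 'default_database.dim_order_cdc'
);

-- Sort-based rewrite that clusters rows by user_id.
CALL hadoop_iceberg.system.rewrite_data_files(
  table => 'default_database.dim_order_cdc',
  strategy => 'sort',
  sort_order => 'user_id ASC NULLS LAST'
);

-- Expire old snapshots so files replaced by compaction can be deleted.
CALL hadoop_iceberg.system.expire_snapshots(
  table => 'default_database.dim_order_cdc',
  retain_last => 10
);
```

Without an explicit `older_than`, `expire_snapshots` only removes snapshots older than five days. Time travel to those versions is lost once they expire.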