This workflow ingests raw e-commerce events from Kafka (or another streaming bus) and lands them in warehouse tables (Apache Iceberg or Apache Paimon) partitioned by event time, for efficient querying and lifecycle management.

Scope

  • Use cases
    • Landing raw topic data into cost-efficient, queryable storage.
    • Preserving event time and watermarks while applying only minimal transformations.
  • Non-use cases
    • Entity enrichment or SCD history building.
    • Business-level aggregations or feature generation.

Common steps

1. Build context

  • Identify source topics and sample events (e.g., Search, ProductClicked).
  • Decide target table format and partitioning strategy (e.g., Iceberg daily partitions).

2. Implement (Flink SQL by default)

  • Define source Kafka tables with proper watermarks.
  • Define raw sink tables in Iceberg or Paimon with partitions.
  • Insert from sources into sinks, mapping columns 1:1 apart from the derived partition column.

Implementation notes

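  1. Prefer event-time partitioning at daily granularity: coarse partitions keep the number of small files down while still giving predictable pruning. In Flink SQL, Iceberg tables support only identity partitioning, so derive a column such as dt DATE from event_time and declare PARTITIONED BY (dt); hidden transforms such as days(event_time) must be declared outside Flink (e.g., in Spark SQL or through the Iceberg Java API).
  2. Declare a WATERMARK on each source to bound the expected out-of-orderness (10 seconds in the example). The raw INSERT itself does not drop late events; the watermark only takes effect in event-time operations, such as windows or interval joins, built on these source tables.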
  3. Keep the raw zone schema close to the source; avoid lossy casts or complex transformations.
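Because dt is an ordinary identity-partitioned column rather than a hidden transform of event_time, Iceberg can only prune partitions from a predicate on dt itself; a predicate on event_time alone falls back to per-file column statistics. A sketch of a pruned read against the fact_search_raw table from the example below, assuming an ad-hoc batch session in the Flink SQL client; the dates are hypothetical values:

```sql
-- Assumption: a bounded, ad-hoc query in the SQL client.
SET 'execution.runtime-mode' = 'batch';

-- Filtering on the partition column lets Iceberg skip whole partitions.
SELECT COUNT(*) AS search_count
FROM hadoop_iceberg_S002.`default`.fact_search_raw
WHERE dt = DATE '2024-05-01';

-- Pair the dt predicate with an event_time range to narrow results within the day.
SELECT user_id, query, event_time
FROM hadoop_iceberg_S002.`default`.fact_search_raw
WHERE dt = DATE '2024-05-01'
  AND event_time >= TIMESTAMP '2024-05-01 09:00:00'
  AND event_time <  TIMESTAMP '2024-05-01 10:00:00';
```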

RESINK.AI recommendations

  • Default to Iceberg for broad engine compatibility and snapshot isolation; Paimon is a good fit for low-latency reads and streaming updates.
  • Start with daily partitions, and consider hourly partitions only for very high-volume topics whose query patterns require them.

Example

-- 1) Create Iceberg catalog (local warehouse for demo); a unique catalog name and warehouse path plus the DROP/IF NOT EXISTS guards keep the script re-runnable

DROP CATALOG IF EXISTS hadoop_iceberg_S002;

CREATE CATALOG IF NOT EXISTS hadoop_iceberg_S002 WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///tmp/iceberg/flink_table_S002'
);

-- 2) Define Kafka sources (event-time + watermark)
CREATE TABLE `Search` (
    user_id     STRING,
    query       STRING,
    event_time  TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'search-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'search-consumer',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'json.timestamp-format.standard' = 'SQL',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE ProductClicked (
    user_id     STRING,
    product_id  STRING,
    event_time  TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'product-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'clicks-consumer',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'json.timestamp-format.standard' = 'SQL',
    'scan.startup.mode' = 'earliest-offset'
);

-- 3) Define Iceberg raw landing tables (daily partitions on a dt DATE column filled from event_time at insert time)
USE CATALOG hadoop_iceberg_S002;

CREATE TABLE IF NOT EXISTS fact_search_raw (
    user_id     STRING,
    query       STRING,
    event_time  TIMESTAMP(3),
    dt          DATE,
    PRIMARY KEY (user_id, event_time) NOT ENFORCED
) PARTITIONED BY (dt);

CREATE TABLE IF NOT EXISTS fact_product_click_raw (
    user_id     STRING,
    product_id  STRING,
    event_time  TIMESTAMP(3),
    dt          DATE,
    PRIMARY KEY (user_id, product_id, event_time) NOT ENFORCED
) PARTITIONED BY (dt);

-- 4) Ingest raw events into Iceberg (fully qualified sources)
INSERT INTO hadoop_iceberg_S002.`default`.fact_search_raw
SELECT user_id, query, event_time, CAST(event_time AS DATE) as dt
FROM default_catalog.default_database.`Search`;

INSERT INTO hadoop_iceberg_S002.`default`.fact_product_click_raw
SELECT user_id, product_id, event_time, CAST(event_time AS DATE) as dt
FROM default_catalog.default_database.ProductClicked;
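The Iceberg Flink sink commits data files only when a checkpoint completes, so with checkpointing disabled the streaming inserts above never make rows visible. In addition, each standalone INSERT INTO submits its own job. A sketch, assuming the Flink SQL client on Flink 1.15 or later, that enables checkpointing and submits both inserts as a single job; the 60-second interval is an arbitrary example value:

```sql
-- Iceberg commits on completed checkpoints; the interval is an example value.
SET 'execution.checkpointing.interval' = '60 s';

-- Submit both raw ingestions as one Flink job (Flink 1.15+ syntax).
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO hadoop_iceberg_S002.`default`.fact_search_raw
  SELECT user_id, query, event_time, CAST(event_time AS DATE) AS dt
  FROM default_catalog.default_database.`Search`;

  INSERT INTO hadoop_iceberg_S002.`default`.fact_product_click_raw
  SELECT user_id, product_id, event_time, CAST(event_time AS DATE) AS dt
  FROM default_catalog.default_database.ProductClicked;
END;
```

Use the statement set in place of the two standalone INSERT statements, not in addition to them; otherwise each table would be written by two jobs.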

Variations

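  • S3-backed Iceberg catalog (catalog-first). The s3a:// scheme requires the Hadoop S3A filesystem (hadoop-aws) on the classpath, and because a Hadoop catalog depends on atomic renames that S3 does not provide, a Hive, Glue, or REST catalog is safer when several jobs write to the same tables.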
CREATE CATALOG ice_s3 WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='s3a://my-bucket/iceberg/warehouse'
);

USE CATALOG ice_s3;
  • Paimon catalog alternative
CREATE CATALOG paimon_s3 WITH (
  'type'='paimon',
  'warehouse'='s3a://my-bucket/paimon/warehouse'
);
  • Partitioning strategies
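    • Iceberg: Flink SQL supports identity partitions only, so use PARTITIONED BY (dt) on a DATE derived from event_time, adding an hour column for very high volume. Hidden transforms such as days(event_time) or hours(event_time) must be declared outside Flink (e.g., in Spark SQL or through the Iceberg Java API).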
    • Paimon: PARTITIONED BY (dt) where dt is a DATE derived from event_time.
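Flink SQL cannot declare Iceberg's hours(event_time) transform, but an hourly layout can be approximated with identity partitions on two derived columns. A sketch for a high-volume topic, reusing the Search source and the Iceberg catalog from the example above; the table name fact_search_raw_hourly and the hr column are hypothetical:

```sql
USE CATALOG hadoop_iceberg_S002;

-- Hypothetical hourly table: identity partitions on event date and hour of day.
CREATE TABLE IF NOT EXISTS fact_search_raw_hourly (
    user_id     STRING,
    query       STRING,
    event_time  TIMESTAMP(3),
    dt          DATE,
    hr          INT
) PARTITIONED BY (dt, hr);

INSERT INTO hadoop_iceberg_S002.`default`.fact_search_raw_hourly
SELECT
    user_id,
    query,
    event_time,
    CAST(event_time AS DATE)      AS dt,
    CAST(HOUR(event_time) AS INT) AS hr  -- 0..23, cast to match the INT column
FROM default_catalog.default_database.`Search`;
```

Queries against this table should filter on both dt and hr to benefit from the finer pruning.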

Troubleshooting

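Ensure the format and JSON field names match what the producers emit. Declare optional fields as nullable columns: with 'json.fail-on-missing-field' left at its default (false), events that omit a field read it as NULL instead of failing the job. Because the example sets 'json.ignore-parse-errors' = 'true', malformed fields are silently set to NULL (and unparseable rows skipped), so disable it temporarily when investigating missing data.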
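A sketch for checking what actually arrives from Kafka before looking at the sink: a debug source over the same topic that tolerates an optional field but fails loudly on parse errors. The table name SearchDebug, the session_id field, and the consumer group are hypothetical. With 'json.timestamp-format.standard' = 'SQL' the parser expects values like 2024-05-01 09:00:00.000; producers emitting 2024-05-01T09:00:00.000 need 'ISO-8601' instead.

```sql
-- Hypothetical session-scoped debug source over the same topic.
CREATE TEMPORARY TABLE default_catalog.default_database.SearchDebug (
    user_id     STRING,
    query       STRING,
    session_id  STRING,          -- hypothetical optional field; NULL when absent
    event_time  TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'search-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'search-debug',   -- hypothetical separate group
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',   -- default: missing fields become NULL
    'json.ignore-parse-errors' = 'false',     -- fail on malformed records while debugging
    'json.timestamp-format.standard' = 'SQL',
    'scan.startup.mode' = 'earliest-offset'
);

-- Inspect parsed rows; a timestamp-format mismatch now surfaces as an error.
SELECT * FROM default_catalog.default_database.SearchDebug;
```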