This workflow shows how to capture malformed or schema-incompatible events, route them to a dead-letter queue (DLQ) topic or table, and persist failures for auditing and replay.

Scope

  • Use cases
    • Capture JSON parse errors, schema mismatches, or business validation failures.
    • Persist failure payloads with reasons and timestamps for later fixes and replays.
  • Non-use cases
    • Complex semantic validation pipelines.

Common steps

1. Build context

  • Identify critical streams (e.g., OrderCompleted, PaymentInfoAdded).
  • Define failure categories and DLQ storage (Kafka topic or Iceberg table).

2. Implement (Flink SQL by default)

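  • Catch failures instead of failing the job: use an error-tolerant format, or read the raw payload and parse it with NULL-returning functions (JSON_VALUE, TRY_CAST) or a UDF.
  • Route successfully parsed rows to the fact table and failures to the DLQ.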
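If losing unparseable records is acceptable, the error-tolerant route can be as simple as a format option. The sketch below assumes Flink's built-in JSON format; the table name OrderCompletedTolerant and the broker address are placeholders. Note that json.ignore-parse-errors skips rows that cannot be decoded and sets fields that fail to parse to NULL, so skipped rows never reach a DLQ. That is why the Example section reads the raw payload instead.

```sql
-- Sketch: let the JSON format tolerate bad records instead of failing the job.
-- OrderCompletedTolerant and the broker address are hypothetical placeholders.
CREATE TABLE OrderCompletedTolerant (
  order_id STRING,
  user_id STRING,
  total_value DECIMAL(18, 2),
  event_ts BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'e00-order-completed-raw',
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',     -- skip undecodable rows, NULL out bad fields
  'json.fail-on-missing-field' = 'false'   -- absent fields become NULL
);
```

Rows with NULL required fields can still be routed to a DLQ from this table, but rows the format skipped are gone.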

Implementation notes

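  1. Some Flink SQL formats (for example json and csv) offer an ignore-parse-errors option, but it drops or nulls out bad records, so they never reach the DLQ. Flink SQL has no side outputs: to capture failures in SQL, read the raw payload and split it into sinks with complementary filters (as in the example below), or implement parsing in a DataStream job and emit failures to a side output.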
  2. Store original payload and error details for replay.

RESINK.AI recommendations

  • Keep DLQ retention long enough to cover detecting, fixing, and replaying upstream issues. Compress DLQ storage to reduce costs.
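For a Kafka DLQ, compression can be enabled on the producer side by passing a Kafka producer property through the connector's properties.* prefix. The sketch below uses a hypothetical table name and a placeholder broker address. Retention itself is a topic-level Kafka setting (retention.ms) managed on the broker or topic, not in Flink DDL. Iceberg DLQ tables are written as compressed columnar files (Parquet by default), so they usually need no extra step.

```sql
-- Sketch: Kafka DLQ sink with producer-side compression.
-- Table name and broker address are hypothetical placeholders.
CREATE TABLE dlq_order_completed_compressed (
  error_type STRING,
  error_message STRING,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'e00-dlq-order-completed',
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
  'properties.compression.type' = 'zstd',             -- forwarded to the Kafka producer
  'format' = 'json'
);
```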

Example

-- Assumes Flink 1.15+ (TRY_CAST, IS JSON, EXECUTE STATEMENT SET).
-- Raw topic that may contain malformed JSON. The 'raw' format reads each
-- Kafka message value as a single STRING, so unparseable records are not lost.
CREATE TABLE RawOrderCompleted (
  payload STRING,
  -- Kafka record timestamp, used as the ingestion time
  ingest_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR ingest_time AS ingest_time - INTERVAL '10' SECOND
) WITH (
  'connector'='kafka', 'topic'='e00-order-completed-raw', 'format'='raw',
  'properties.bootstrap.servers'='localhost:9092',  -- placeholder broker address
  'scan.startup.mode'='earliest-offset'
);

-- Parsed view. JSON_VALUE returns NULL for malformed JSON or missing paths,
-- and TRY_CAST returns NULL instead of failing the job on bad values.
CREATE TEMPORARY VIEW ParsedOrderCompleted AS
SELECT
  JSON_VALUE(payload, '$.order_id') AS order_id,
  JSON_VALUE(payload, '$.user_id') AS user_id,
  TRY_CAST(JSON_VALUE(payload, '$.total_value') AS DECIMAL(18,2)) AS total_value,
  CAST(TO_TIMESTAMP_LTZ(TRY_CAST(JSON_VALUE(payload, '$.event_ts') AS BIGINT), 3) AS TIMESTAMP(3)) AS event_time,
  payload
FROM RawOrderCompleted;

-- Success sink. Iceberg hidden-partition transforms such as days(event_time)
-- cannot be declared in Flink DDL; create the table with that partition spec
-- from Spark or the Iceberg API if needed.
CREATE TABLE fact_order_ingested (
  order_id STRING,
  user_id STRING,
  total_value DECIMAL(18,2),
  event_time TIMESTAMP(3)
) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

-- DLQ sink
CREATE TABLE dlq_order_completed (
  error_type STRING,
  error_message STRING,
  payload STRING,
  failure_time TIMESTAMP(3)
) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_iceberg',
  'catalog-type'='hadoop',
  'warehouse'='file:///tmp/iceberg/flink_table'
);

-- Route valid rows to the fact table and everything else to the DLQ.
-- The two filters are exact complements, so every record lands in one sink.
-- The statement set runs both INSERTs as one job that reads the topic once.
EXECUTE STATEMENT SET
BEGIN

INSERT INTO fact_order_ingested
SELECT order_id, user_id, total_value, event_time
FROM ParsedOrderCompleted
WHERE order_id IS NOT NULL AND user_id IS NOT NULL AND total_value IS NOT NULL AND event_time IS NOT NULL;

INSERT INTO dlq_order_completed
SELECT CASE WHEN payload IS JSON OBJECT THEN 'SCHEMA_MISMATCH' ELSE 'PARSE_ERROR' END AS error_type,
       CASE WHEN payload IS JSON OBJECT THEN 'Missing or invalid required fields'
            ELSE 'Payload is not a valid JSON object' END AS error_message,
       payload,
       LOCALTIMESTAMP AS failure_time
FROM ParsedOrderCompleted
WHERE order_id IS NULL OR user_id IS NULL OR total_value IS NULL OR event_time IS NULL;

END;
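The DLQ INSERT above writes one fixed message per category. A possible refinement, sketched below, records which required fields were missing or invalid, relying on CONCAT_WS skipping NULL arguments. It is meant to replace that DLQ INSERT, not to run alongside it, otherwise every failure would be written twice.

```sql
-- Sketch: list the failing required fields in error_message.
-- Each CASE without ELSE yields NULL, which CONCAT_WS skips.
INSERT INTO dlq_order_completed
SELECT
  CASE WHEN payload IS JSON OBJECT THEN 'SCHEMA_MISMATCH' ELSE 'PARSE_ERROR' END AS error_type,
  CONCAT('Missing or invalid fields: ',
         CONCAT_WS(',',
           CASE WHEN order_id IS NULL THEN 'order_id' END,
           CASE WHEN user_id IS NULL THEN 'user_id' END,
           CASE WHEN total_value IS NULL THEN 'total_value' END,
           CASE WHEN event_time IS NULL THEN 'event_time' END)) AS error_message,
  payload,
  LOCALTIMESTAMP AS failure_time
FROM ParsedOrderCompleted
WHERE order_id IS NULL OR user_id IS NULL OR total_value IS NULL OR event_time IS NULL;
```

For a payload that has a valid order_id and user_id but a non-numeric total_value, error_message would read "Missing or invalid fields: total_value".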

Variations

  • DLQ to Kafka topic
CREATE TABLE dlq_order_completed_kafka (
  error_type STRING,
  error_message STRING,
  payload STRING
) WITH (
  'connector'='kafka', 'topic'='e00-dlq-order-completed', 'format'='json',
  'properties.bootstrap.servers'='localhost:9092'  -- placeholder broker address
);
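Routing to the Kafka DLQ only changes the target table; the failure filter stays the same. A sketch, assuming ParsedOrderCompleted from the Example. This schema has no failure_time column; the Kafka record timestamp, typically assigned when the record is produced, can stand in for it.

```sql
-- Sketch: same failure filter as the Iceberg DLQ, written to the Kafka DLQ.
INSERT INTO dlq_order_completed_kafka
SELECT
  CASE WHEN payload IS JSON OBJECT THEN 'SCHEMA_MISMATCH' ELSE 'PARSE_ERROR' END AS error_type,
  'Missing or invalid required fields' AS error_message,
  payload
FROM ParsedOrderCompleted
WHERE order_id IS NULL OR user_id IS NULL OR total_value IS NULL OR event_time IS NULL;
```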

Troubleshooting

Alert on the DLQ rate (failed records relative to input volume); during incidents, throttle ingestion or temporarily buffer raw events in object storage.
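One way to produce a rate signal is a windowed count over the raw topic. The sketch below assumes RawOrderCompleted from the Example and a one-minute tumbling window (an arbitrary choice); it only counts payloads that are not valid JSON objects, so schema mismatches would need the full validation filter. The output is meant to feed whatever alerting system you use.

```sql
-- Sketch: per-minute input count and malformed-payload count.
-- CASE sends a NULL predicate result to ELSE, so NULL payloads count as malformed.
SELECT
  window_start,
  window_end,
  COUNT(*) AS total_events,
  SUM(CASE WHEN payload IS JSON OBJECT THEN 0 ELSE 1 END) AS malformed_events
FROM TABLE(
  TUMBLE(TABLE RawOrderCompleted, DESCRIPTOR(ingest_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
```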
Store the original payload and a canonical timestamp; build a replay job that reuses the same parsing logic.
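A replay job can be a batch query over the DLQ table that applies the same parsing expressions as ParsedOrderCompleted. The sketch below assumes the Example tables are registered in the session and that the payloads (or the parser) were fixed upstream; the view name ReplayOrderCompleted and the replay window are hypothetical. It does not deduplicate, so replaying the same window twice writes the rows twice.

```sql
-- Sketch: batch replay of DLQ records that now pass validation.
SET 'execution.runtime-mode' = 'batch';

-- Same expressions as ParsedOrderCompleted, applied to stored payloads.
CREATE TEMPORARY VIEW ReplayOrderCompleted AS
SELECT
  JSON_VALUE(payload, '$.order_id') AS order_id,
  JSON_VALUE(payload, '$.user_id') AS user_id,
  TRY_CAST(JSON_VALUE(payload, '$.total_value') AS DECIMAL(18,2)) AS total_value,
  CAST(TO_TIMESTAMP_LTZ(TRY_CAST(JSON_VALUE(payload, '$.event_ts') AS BIGINT), 3) AS TIMESTAMP(3)) AS event_time
FROM dlq_order_completed
-- Hypothetical replay window; narrow it to the incident being repaired.
WHERE failure_time >= TIMESTAMP '2024-01-01 00:00:00';

INSERT INTO fact_order_ingested
SELECT order_id, user_id, total_value, event_time
FROM ReplayOrderCompleted
WHERE order_id IS NOT NULL AND user_id IS NOT NULL AND total_value IS NOT NULL AND event_time IS NOT NULL;
```

Keeping the parsing expressions in one place (a shared view definition or UDF) avoids the live and replay paths drifting apart.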