Unexpected Tombstone messages when joining with Flink SQL

8yoxcaq7 · posted 2022-12-09 in Apache

We have a Flink SQL job (Table API) that reads Offers from a Kafka topic (8 partitions) as its source, aggregates them with other data sources to calculate the cheapest offer per item and enrich that result with extra data, and sinks the result back to another Kafka topic.
The sink looks like this:

CREATE TABLE cheapest_item_offer (
  `id_offer` VARCHAR(36),
  `id_item` VARCHAR(36),
  `price` DECIMAL(13,2), 
-- ... more offer fields
  PRIMARY KEY (`id_item`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<TOPIC_NAME>',
  'properties.bootstrap.servers' = '<KAFKA_BOOTSTRAP_SERVERS>',
  'properties.group.id' = '<JOBNAME>',
  'sink.buffer-flush.interval' = '1000',
  'sink.buffer-flush.max-rows' = '100',
  'key.format' = 'json',
  'value.format' = 'json'
);
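
For reference, the definition of the source table offer that the query below reads from isn't included here. Since we use the upsert-kafka connector on the source side as well, a minimal sketch of how it might be defined looks like the following; the column names, types and primary key are assumptions inferred from the query:

-- Hypothetical sketch of the source table; column names, types and the
-- primary key are assumptions inferred from the query below
CREATE TABLE offer (
  `id_offer` VARCHAR(36),
  `id_item` VARCHAR(36),
  `price` DECIMAL(13,2),
  `quantity` INT,
-- ... more offer fields
  PRIMARY KEY (`id_offer`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<SOURCE_TOPIC_NAME>',
  'properties.bootstrap.servers' = '<KAFKA_BOOTSTRAP_SERVERS>',
  'key.format' = 'json',
  'value.format' = 'json'
);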

And the upsert looks like this:

INSERT INTO cheapest_item_offer
  WITH offers_with_stock_ordered_by_price AS (
    SELECT *,
           ROW_NUMBER() OVER(
             PARTITION BY id_item
             ORDER BY price ASC
           ) AS n_row
      FROM offer
     WHERE quantity > 0
), cheapest_offer AS (
    SELECT offer.*
      FROM offers_with_stock_ordered_by_price offer
     WHERE offer.n_row = 1
)
SELECT id_offer,
       id_item,
       price,
-- ... item extra fields
  FROM cheapest_offer
-- ... extra JOINS here to aggregate more item data

Given this configuration, the job initially ingests the data, computes the aggregations correctly and sets the cheapest offer as expected. But after some time, whenever events arrive from our data source, they unexpectedly result in a Tombstone in the sink (not always though; sometimes the value is set properly). After checking those records we notice they shouldn't be Tombstones, mainly because there is an actual cheapest offer for that item and the related JOIN rows do exist.
The following images illustrate the issue with some Kafka messages:

Data source

[screenshot of the source topic omitted]

This is the data source we ingest the data from. The latest update for a given Item shows that one of its Offers has changed.

Data Sink

[screenshot of the sink topic omitted]

This is the data sink for the same Item. As we can see, the latest update was generated at the same time, triggered by the data source update, but the resulting value is a Tombstone rather than the actual value derived from the data source.
If we relaunch the Job from scratch (ignoring savepoints), the affected Items are fixed on the first run, but the same issue reappears after some time.
Some considerations:

  • In our Data Source, each Item can have multiple Offers, and they can land in different Partitions
  • The Flink Job is running with parallelism set to 8 (same as the number of Kafka Partitions)
  • We're using Flink 1.13.2 with the upsert-kafka connector in both Source & Sink
  • We're using Kafka 2.8
  • We believe the issue is in the cheapest-offer virtual tables, as the JOINs contain proper data (see the standalone query after this list)
  • We're using rocksdb as state.backend
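
For what it's worth, the cheapest-offer part can be checked in isolation. This is only a sketch reusing the query above: running the top-1 deduplication on its own (e.g. from the SQL client) and comparing its output for an affected Item with what ends up in the sink topic helps tell whether the wrong Tombstones originate in the deduplication or in the later JOINs.

-- Sketch: the top-1 deduplication on its own (same logic as the cheapest_offer CTE above)
WITH offers_with_stock_ordered_by_price AS (
  SELECT *,
         ROW_NUMBER() OVER(
           PARTITION BY id_item
           ORDER BY price ASC
         ) AS n_row
    FROM offer
   WHERE quantity > 0
)
SELECT id_offer,
       id_item,
       price
  FROM offers_with_stock_ordered_by_price
 WHERE n_row = 1;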

We're struggling to find the reason behind this behavior (we're pretty new to Flink) and we don't know where to focus to fix it. Can anybody help here?
Any suggestion will be highly appreciated!

qvk1mo1f1#

Apparently it was a bug in Flink SQL v1.13.2, as noted in Flink's Jira ticket FLINK-25559.
We managed to solve the issue by upgrading to v1.13.6.
