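I have a standard mechanism for streaming data from a Kafka topic into ClickHouse: table (Kafka engine) -> materialized view (TO) -> destination table (MergeTree). It worked well until I needed to add two columns to this chain. When I drop the chain of tables and recreate it with the new columns, data does not arrive in the destination table, and there are no specific errors or signals in the logs related to the streaming. When I recreate the tables without the new columns, it works well again.

I found out that the new columns are present in only one message; the other messages do not have them. I don't have much experience with Kafka, but as I understand it, the problem could be that no new data is loaded into the destination table because not every Kafka message has the new columns. What can be done to resolve this? I had the idea of using the JSONEachRow format to stream the raw data into a single column and process it with ClickHouse JSON functions, but that did not work either: it returned an error saying the input data could not be parsed. Should some settings be changed on the Kafka side? (Again, I have not seen how the data looks in Kafka itself; I use Offset Explorer to view it.)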
/* -------------------------------------- CREATE DESTINATION TABLE LOCAL -------------------------------------- */
CREATE TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
(
    `event_type` String,
    `source` String,
    `timestamp` UInt32,
    `master_id` String,
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    --`event_params_item_list_name` Nullable(String),
    --`event_params_promo_name` Nullable(String),
    `deleted` UInt8 -- MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/ga_events', '{replica}', timestamp, `deleted`)
ORDER BY `master_id`
SETTINGS index_granularity = 8192;
/* -------------------------------------- CREATE DESTINATION TABLE DISTRIBUTED -------------------------------------- */
CREATE TABLE database_name.ga_events ON CLUSTER 'cluste_name'
(
    `event_type` String,
    `source` String,
    `timestamp` UInt32,
    `master_id` String,
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    --`event_params_item_list_name` Nullable(String),
    --`event_params_promo_name` Nullable(String),
    `deleted` UInt8 -- MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
)
ENGINE = Distributed('cluste_name', 'database_name', 'ga_events_local', javaHash(`master_id`));
/* -------------------------------------- CREATE KAFKA TABLE -------------------------------------- */
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    `data.event_params_app` Nullable(String),
    `data.items_item_list_name` Nullable(String),
    `data.event_params_parameter` Nullable(String),
    `data.items_affiliation` Nullable(String),
    `data.event_params_item_cat` Nullable(String),
    `data.event_params_item_id` Nullable(String),
    `data.event_params_step` Nullable(String),
    `data.event_params_item_name` Nullable(String),
    `data.event_params_dest` Nullable(String),
    `data.items_promotion_id` Nullable(String),
    `data.step_number` Nullable(String),
    `data.items_item_name` Nullable(String),
    `data.event_params_item_variant` Nullable(String),
    `data.event_params_origin` Nullable(String),
    `data.event_params_way` Nullable(String),
    `data.items_item_brand` Nullable(String),
    `data.items_promotion_name` Nullable(String),
    `data.event_params_item_cat2` Nullable(String),
    `data.event` Nullable(String),
    `data.event_params_ux_ui` Nullable(String),
    `data.items_item_cat2` Nullable(String),
    `data.items_item_cat3` Nullable(String),
    `data.event_params_type` Nullable(String),
    `data.items_item_cat4` Nullable(String),
    `data.event_params_flow` Nullable(String),
    `data.event_params_segment` Nullable(String),
    `data.items_item_id` Nullable(String),
    `data.items_item_cat5` Nullable(String),
    `data.itemslocation_id` Nullable(String),
    `data.event_params_error` Nullable(String),
    `data.event_params_result` Nullable(String),
    `data.items_item_cat` Nullable(String),
    `data.event_params_quantity` Nullable(String),
    `data.event_params_id` Nullable(String),
    `data.items_item_variant` Nullable(String),
    `data.event_name` Nullable(String),
    `data.items_item_list_index` Nullable(String) --,
    -- `data.event_params_item_list_name` Nullable(String),
    -- `data.event_params_promo_name` Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup( I change this every time when I recreate tables ',
    format_avro_schema_registry_url = 'registry_url',
    kafka_client_id = 'someclient( I change this every time when I recreate tables ';
/* -------------------------------------- CREATE MATERIALIZED VIEW ------------------ */
CREATE MATERIALIZED VIEW database_name.ga_events_kafka_mv TO database_name.ga_events
(
    `event_type` String,
    `source` String,
    `timestamp` DateTime64(3),
    `master_id` Nullable(String),
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name` Nullable(String),
    `deleted` UInt8
)
AS
SELECT
    `event_type`,
    `source`,
    `timestamp`,
    `data.master_id` AS master_id,
    `data.event_params_app` AS event_params_app,
    `data.items_item_list_name` AS items_item_list_name,
    `data.event_params_parameter` AS event_params_parameter,
    `data.items_affiliation` AS items_affiliation,
    `data.event_params_item_cat` AS event_params_item_cat,
    `data.event_params_item_id` AS event_params_item_id,
    `data.event_params_step` AS event_params_step,
    `data.event_params_item_name` AS event_params_item_name,
    `data.event_params_dest` AS event_params_dest,
    `data.items_promotion_id` AS items_promotion_id,
    `data.step_number` AS step_number,
    `data.items_item_name` AS items_item_name,
    `data.event_params_item_variant` AS event_params_item_variant,
    `data.event_params_origin` AS event_params_origin,
    `data.event_params_way` AS event_params_way,
    `data.items_item_brand` AS items_item_brand,
    `data.items_promotion_name` AS items_promotion_name,
    `data.event_params_item_cat2` AS event_params_item_cat2,
    `data.event` AS event,
    `data.event_params_ux_ui` AS event_params_ux_ui,
    `data.items_item_cat2` AS items_item_cat2,
    `data.items_item_cat3` AS items_item_cat3,
    `data.event_params_type` AS event_params_type,
    `data.items_item_cat4` AS items_item_cat4,
    `data.event_params_flow` AS event_params_flow,
    `data.event_params_segment` AS event_params_segment,
    `data.items_item_id` AS items_item_id,
    `data.items_item_cat5` AS items_item_cat5,
    `data.itemslocation_id` AS itemslocation_id,
    `data.event_params_error` AS event_params_error,
    `data.event_params_result` AS event_params_result,
    `data.items_item_cat` AS items_item_cat,
    `data.event_params_quantity` AS event_params_quantity,
    `data.event_params_id` AS event_params_id,
    `data.items_item_variant` AS items_item_variant,
    `data.event_name` AS event_name,
    `data.items_item_list_index` AS items_item_list_index,
    -- `data.event_params_promo_name` AS event_params_promo_name,
    -- `data.event_params_item_list_name` AS event_params_item_list_name,
    CASE
        WHEN event_type = 'UPDATE' THEN 0
        WHEN event_type = 'DELETE' THEN 1
        ELSE -1000
    END AS deleted
FROM database_name.ga_events_kafka
WHERE `data.master_id` IS NOT NULL AND `data.master_id` <> '';
Columns that should be added:
-- `data.event_params_item_list_name` Nullable(String),
-- `data.event_params_promo_name` Nullable(String)
1 Answer
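Setting input_format_avro_allow_missing_fields = 1 fixed this issue.
It enables using fields that are not specified in the Avro or AvroConfluent format schema. When a field is not found in the schema, ClickHouse uses the default value instead of throwing an exception.
Possible values:
0: disabled. 1: enabled. Default value: 0.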
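
The answer does not show where the setting goes. Below is a minimal sketch of one way it might be applied, assuming the ClickHouse version in use accepts format settings in the Kafka table's SETTINGS clause (the question already passes format_avro_schema_registry_url this way). The column list is abbreviated for illustration, the two new columns are uncommented, and the broker, topic, group, and registry values are the question's placeholders.

```sql
-- Sketch only: the column list is shortened; keep the full list from the question.
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    -- ... the remaining `data.*` columns from the question go here ...
    `data.event_params_item_list_name` Nullable(String),
    `data.event_params_promo_name` Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup',
    format_avro_schema_registry_url = 'registry_url',
    -- Columns missing from a message's Avro schema get the column default
    -- (NULL for Nullable columns) instead of raising an exception.
    input_format_avro_allow_missing_fields = 1;
```

If the server rejects the setting in this position, it may need to be set at the profile level instead; that depends on the ClickHouse version and is not confirmed by the answer. The destination tables and the materialized view also need the two new columns for the values to reach the target table.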