Adding columns to a ClickHouse Kafka table chain

vuktfyat posted on 2024-01-06 in Apache

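I have a standard pipeline for streaming data from a Kafka topic into ClickHouse: Kafka engine table -> materialized view (TO) -> destination table (MergeTree). It works well. I now need to add two columns to this chain. When I drop the table chain and recreate it with the new columns, data stops arriving in the destination table, and nothing in the logs points to a specific streaming error. When I recreate the tables without the new columns, it works again.

I found that the new columns are present in only one message; the other messages do not have them. I don't have much experience with Kafka, but as I understand it, the problem may be that no new data is loaded into the destination table because not every Kafka message contains the new columns. What can be done to fix this?

I had the idea of streaming the raw data into a single column using the JSONEachRow format and processing it with ClickHouse JSON functions, but that did not work either: it returned an error saying the input data could not be parsed. Should some setting be changed on the Kafka side? (Again, I have not seen what the data looks like in Kafka itself; I use Offset Explorer to view it.)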

/* -------------------------------------- CREATE DESTINATION TABLE LOCAL -------------------------------------- */

CREATE TABLE database_name.ga_events_local ON CLUSTER 'cluste_name'
(
    `event_type` String,
    `source` String,
    `timestamp` UInt32,
    `master_id` String,
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name` Nullable(String),
    `deleted` UInt8 -- MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/ga_events', '{replica}', timestamp, `deleted`)
ORDER BY `master_id`
SETTINGS index_granularity = 8192;

/* -------------------------------------- CREATE DESTINATION TABLE DISTRIBUTED -------------------------------------- */

CREATE TABLE database_name.ga_events ON CLUSTER 'cluste_name'
(
    `event_type` String,
    `source` String,
    `timestamp` UInt32,
    `master_id` String,
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name` Nullable(String),
    `deleted` UInt8 -- MATERIALIZED CASE WHEN event_type = 'UPDATE' THEN 0 WHEN event_type = 'DELETE' THEN 1 ELSE -1000 END
)
ENGINE = Distributed('cluste_name', 'database_name', 'ga_events_local', javaHash(`master_id`));

/* -------------------------------------- CREATE KAFKA TABLE -------------------------------------- */

CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    `data.event_params_app` Nullable(String),
    `data.items_item_list_name` Nullable(String),
    `data.event_params_parameter` Nullable(String),
    `data.items_affiliation` Nullable(String),
    `data.event_params_item_cat` Nullable(String),
    `data.event_params_item_id` Nullable(String),
    `data.event_params_step` Nullable(String),
    `data.event_params_item_name` Nullable(String),
    `data.event_params_dest` Nullable(String),
    `data.items_promotion_id` Nullable(String),
    `data.step_number` Nullable(String),
    `data.items_item_name` Nullable(String),
    `data.event_params_item_variant` Nullable(String),
    `data.event_params_origin` Nullable(String),
    `data.event_params_way` Nullable(String),
    `data.items_item_brand` Nullable(String),
    `data.items_promotion_name` Nullable(String),
    `data.event_params_item_cat2` Nullable(String),
    `data.event` Nullable(String),
    `data.event_params_ux_ui` Nullable(String),
    `data.items_item_cat2` Nullable(String),
    `data.items_item_cat3` Nullable(String),
    `data.event_params_type` Nullable(String),
    `data.items_item_cat4` Nullable(String),
    `data.event_params_flow` Nullable(String),
    `data.event_params_segment` Nullable(String),
    `data.items_item_id` Nullable(String),
    `data.items_item_cat5` Nullable(String),
    `data.itemslocation_id` Nullable(String),
    `data.event_params_error` Nullable(String),
    `data.event_params_result` Nullable(String),
    `data.items_item_cat` Nullable(String),
    `data.event_params_quantity` Nullable(String),
    `data.event_params_id` Nullable(String),
    `data.items_item_variant` Nullable(String),
    `data.event_name` Nullable(String),
    `data.items_item_list_index` Nullable(String) --,
    -- `data.event_params_item_list_name` Nullable(String),
    -- `data.event_params_promo_name` Nullable(String)
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093,somebroker:9093,somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup', -- I change this every time I recreate the tables
    format_avro_schema_registry_url = 'registry_url',
    kafka_client_id = 'someclient'; -- I change this every time I recreate the tables
            
/* -------------------------------------- CREATE MATERIALIZED VIEW -------------------------------------- */

CREATE MATERIALIZED VIEW database_name.ga_events_kafka_mv TO database_name.ga_events
(
    `event_type` String,
    `source` String,
    `timestamp` DateTime64(3),
    `master_id` Nullable(String),
    `event_params_app` Nullable(String),
    `items_item_list_name` Nullable(String),
    `event_params_parameter` Nullable(String),
    `items_affiliation` Nullable(String),
    `event_params_item_cat` Nullable(String),
    `event_params_item_id` Nullable(String),
    `event_params_step` Nullable(String),
    `event_params_item_name` Nullable(String),
    `event_params_dest` Nullable(String),
    `items_promotion_id` Nullable(String),
    `step_number` Nullable(String),
    `items_item_name` Nullable(String),
    `event_params_item_variant` Nullable(String),
    `event_params_origin` Nullable(String),
    `event_params_way` Nullable(String),
    `items_item_brand` Nullable(String),
    `items_promotion_name` Nullable(String),
    `event_params_item_cat2` Nullable(String),
    `event` Nullable(String),
    `event_params_ux_ui` Nullable(String),
    `items_item_cat2` Nullable(String),
    `items_item_cat3` Nullable(String),
    `event_params_type` Nullable(String),
    `items_item_cat4` Nullable(String),
    `event_params_flow` Nullable(String),
    `event_params_segment` Nullable(String),
    `items_item_id` Nullable(String),
    `items_item_cat5` Nullable(String),
    `itemslocation_id` Nullable(String),
    `event_params_error` Nullable(String),
    `event_params_result` Nullable(String),
    `items_item_cat` Nullable(String),
    `event_params_quantity` Nullable(String),
    `event_params_id` Nullable(String),
    `items_item_variant` Nullable(String),
    `event_name` Nullable(String),
    `items_item_list_index` Nullable(String),
    -- `event_params_item_list_name` Nullable(String),
    -- `event_params_promo_name` Nullable(String),
    `deleted` UInt8
)
AS
SELECT
    `event_type`,
    `source`,
    `timestamp`,
    `data.master_id` AS master_id,
    `data.event_params_app` AS event_params_app,
    `data.items_item_list_name` AS items_item_list_name,
    `data.event_params_parameter` AS event_params_parameter,
    `data.items_affiliation` AS items_affiliation,
    `data.event_params_item_cat` AS event_params_item_cat,
    `data.event_params_item_id` AS event_params_item_id,
    `data.event_params_step` AS event_params_step,
    `data.event_params_item_name` AS event_params_item_name,
    `data.event_params_dest` AS event_params_dest,
    `data.items_promotion_id` AS items_promotion_id,
    `data.step_number` AS step_number,
    `data.items_item_name` AS items_item_name,
    `data.event_params_item_variant` AS event_params_item_variant,
    `data.event_params_origin` AS event_params_origin,
    `data.event_params_way` AS event_params_way,
    `data.items_item_brand` AS items_item_brand,
    `data.items_promotion_name` AS items_promotion_name,
    `data.event_params_item_cat2` AS event_params_item_cat2,
    `data.event` AS event,
    `data.event_params_ux_ui` AS event_params_ux_ui,
    `data.items_item_cat2` AS items_item_cat2,
    `data.items_item_cat3` AS items_item_cat3,
    `data.event_params_type` AS event_params_type,
    `data.items_item_cat4` AS items_item_cat4,
    `data.event_params_flow` AS event_params_flow,
    `data.event_params_segment` AS event_params_segment,
    `data.items_item_id` AS items_item_id,
    `data.items_item_cat5` AS items_item_cat5,
    `data.itemslocation_id` AS itemslocation_id,
    `data.event_params_error` AS event_params_error,
    `data.event_params_result` AS event_params_result,
    `data.items_item_cat` AS items_item_cat,
    `data.event_params_quantity` AS event_params_quantity,
    `data.event_params_id` AS event_params_id,
    `data.items_item_variant` AS items_item_variant,
    `data.event_name` AS event_name,
    `data.items_item_list_index` AS items_item_list_index,
    -- `data.event_params_promo_name` AS event_params_promo_name,
    -- `data.event_params_item_list_name` AS event_params_item_list_name,
    CASE
        WHEN event_type = 'UPDATE' THEN 0
        WHEN event_type = 'DELETE' THEN 1
        ELSE -1000
    END AS deleted
FROM database_name.ga_events_kafka
WHERE `data.master_id` IS NOT NULL AND `data.master_id` <> ''

Columns that should be added:
-- data.event_params_item_list_name Nullable(String),
-- data.event_params_promo_name Nullable(String)

wj8zmpe1 #1

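Setting input_format_avro_allow_missing_fields = 1 fixed the problem.
This setting enables the use of fields that are not specified in the Avro or AvroConfluent format schema. When a field is not found in the schema, ClickHouse uses the default value instead of throwing an exception.
Possible values:
0 - disabled. 1 - enabled. Default value: 0.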
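
As a rough sketch of how this setting might be applied to the Kafka table from the question: the DDL below is abbreviated to a few columns plus the two new ones, with the remaining `data.*` columns left out for brevity. It assumes your ClickHouse version accepts format settings in the Kafka engine's SETTINGS clause, which the question's own DDL already relies on for format_avro_schema_registry_url. The broker, topic, group, and registry values are the placeholders from the question, not real endpoints.

```sql
-- Abbreviated sketch: only a subset of the original columns is shown.
CREATE TABLE database_name.ga_events_kafka
(
    `event_type` String,
    `source` String,
    `timestamp` UInt64,
    `data.master_id` Nullable(String),
    -- the other data.* columns from the question's table go here, unchanged
    `data.event_params_item_list_name` Nullable(String), -- new column
    `data.event_params_promo_name` Nullable(String)      -- new column
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'somebroker:9093',
    kafka_topic_list = 'SOMETOPIC',
    kafka_format = 'AvroConfluent',
    kafka_num_consumers = 1,
    kafka_group_name = 'somegroup',
    format_avro_schema_registry_url = 'registry_url',
    -- use default values (NULL for Nullable columns) when a message's
    -- Avro schema does not contain a column declared above
    input_format_avro_allow_missing_fields = 1;
```

With this in place, messages written with the older schema should yield NULL in the two new columns instead of failing to parse. The destination tables and the materialized view still need the matching columns and SELECT expressions, which are the lines commented out in the question's DDL.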
