我在postgresql中有一个表,其模式如下:
Table "public.kc_ds"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('kc_ds_id_seq'::regclass) | plain | |
num | integer | | not null | | plain | |
text | character varying(50) | | not null | | extended | |
Indexes:
"kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
"dbz_publication"
当我为这个使用 io.confluent.connect.avro.AvroConverter
和schema registry,它创建一个schema registry schema,该schema如下所示(这里省略了一些字段):
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"num",
"type":"int"
},
{
"name":"text",
"type":"string"
}
],
"connect.name":"xxx.public.kc_ds.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
]
debezium在我的kafka主题中生成的消息如下所示(省略了一些字段):
{
"before": null,
"after": {
"xxx.public.kc_ds.Value": {
"id": 2,
"num": 2,
"text": "text version 1"
}
}
当我插入或更新时, "before"
总是 null
,和 "after"
包含我的数据;当我删除时,相反的情况成立: "after"
为空且 "before"
包含数据(尽管所有字段都设置为默认值)。
问题1:为什么Kafka会用 "before"
以及 "after"
领域?为什么这些领域的行为如此怪异?
问题#2:有没有一种内置的方法可以让kafka connect在仍然使用schema registry的情况下向我的主题发送平面消息?请注意,展平转换不是我所需要的:如果启用,我仍然会有 "before"
以及 "after"
领域。
问题#3(实际上并不希望有什么,但也许有人知道):平展我的信息的必要性来自于这样一个事实,即我需要使用hudideltastreamer从我的主题中读取数据,而且这个工具似乎需要平展的输入数据。这个 "before"
以及 "after"
字段最终成为结果.parquet文件中类似于列的独立对象。有人知道hudideltastreamer应该如何与kafka connect生成的消息集成吗?
暂无答案!
目前还没有任何答案,快来回答吧!