使用ksqldb,我创建了一个带有自定义查询的jdbc连接器。然后,根据产生的Kafka主题,我创建了一个表。但是,从表中选择仅返回主键的数据,而对所有其他值返回null。我连接到的postgres数据库的sales表不断更新新的数据,我正在尝试使用ksql流式处理这些数据。
ksql> CREATE SOURCE CONNECTOR con WITH (
'connector.class' ='io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = '....',
'topic.prefix' = 'sales',
...
'key' = 'id',
'query' = 'SELECT id, time, price FROM sales');
Message
Created connector CON
ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased
ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');
Message
Table created
ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID |TIME |PRICE|
+-----+-----+-----+
|123 |null |null |
Limit Reached
Query terminated
如您所见,kafka主题在time和price字段中有具有适当值的条目。但是,当在该主题上创建一个表时,从该表中选择会产生空的时间和价格字段。只有id(主键列)打印正确。
知道为什么会这样吗?
1条答案
按热度按时间7gs2gvoe1#
你用的是
org.apache.kafka.connect.json.JsonConverter
连接器中的转换器schemas.enable=true
,因此您的架构不是(id VARCHAR PRIMARY KEY, time INT, price DOUBLE)
,从而得到空值。最好是使用
io.confluent.connect.avro.AvroConverter
(或protobuf,或json模式),因为这样您甚至不必为您的源连接器键入模式CREATE STREAM
,你只是您可以这样指定替代转换器:
但如果必须使用json,请在源连接器中禁用模式:
裁判:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained