我正在使用properties/json文件配置我的连接器,我试图添加一个包含kafka时间戳的timestamp列,当它从源连接器读取消息时没有任何成功。
我试图补充 transforms
,但它总是空的,我的接收器连接器“大查询”返回一个错误
未能更新表架构
我确实在bigquery连接器属性中放置了这些配置
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
我的源配置sap连接器
{
"name": "sap",
"config": {
"connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
"tasks.max": "10",
"topics": "mytopic",
"connection.url": "jdbc:sap://IP:30015/",
"connection.user": "user",
"connection.password": "pass",
"group.id":"589f5ff5-1c43-46f4-bdd3-66884d61m185",
"mytopic.table.name": "\"schema\".\"mytable\""
}
}
我的Flume连接器bigquery
name=bigconnect
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
tasks.max=1
sanitizeTopics=true
autoCreateTables=true
autoUpdateSchemas=true
schemaRetriever=com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
schemaRegistryLocation=http://localhost:8081
bufferSize=100000
maxWriteSize=10000
tableWriteWait=1000
project=kafka-test-217517
topics=mytopic
datasets=.*=sap_dataset
keyfile=/opt/bgaccess.json
transforms=InsertField
transforms.InsertField.timestamp.field=fieldtime
transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
2条答案
按热度按时间euoag5mw1#
旧答案我想我已经理解了背后的问题
首先,您不能在任何源连接器中使用transform insertfield,因为msg的时间戳值是在写入主题时分配的,所以连接器不能知道它,
对于jdbc连接器,有一个问题https://github.com/confluentinc/kafka-connect-jdbc/issues/311
在sap中,源连接器也不能正常工作。
第二个bigquery连接器有一个bug,它不允许使用insertfield将时间戳添加到这里提到的每个表中
https://github.com/wepay/kafka-connect-bigquery/issues/125#issuecomment-439102994
因此,如果您想使用bigquery作为输出,目前唯一的解决方案是在加载cink连接器之前手动编辑每个表的模式以添加列
更新2018-12-03最终解决方案,以便始终在接收器连接器中添加消息时间戳。假设您希望将时间戳添加到sink connector的每个表中
在源连接器中放入此配置
这将为每个源表添加一个名为“fieldtime”的列名
在你的Flume里放上这些配置
这实际上会删除fieldtime列,并使用消息的时间戳再次添加它
此解决方案将自动添加具有正确值的列,而无需任何加法操作
bmp9r5qi2#
我猜你的错误来自bigquery,而不是kafka connect。
例如,在独立模式下启动connect console consumer,您将看到如下消息
Struct{...,fieldtime=Fri Nov 16 07:38:19 UTC 2018}
测试connect-standalone ./connect-standalone.properties ./connect-console-sink.properties
我有一个关于avro数据的输入主题。。。相应地更新您自己的设置connect-standalone.properties属性
connect-console-sink.properties连接