我的传入Kafka消息在Kafka头中包含Auditlog。如何将Header值保存到我的MongoDB接收器?注意事项:1.不应使用HeaderToField SMT,因为我们的连接器没有该SMT。1.无新SMT
bjg7j2ky1#
下面的解决方案为我工作CREATE OR REPLACE STREAM streamName(topicname BYTES HEADER(''__connect.errors.topic'''),errormessage BYTES HEADER(''__connect.errors.exception.message'''),exception BYTES HEADER(''__connect.errors.exception.stacktrace''')WITH(Kafka_TOPIC =''topicname'',VALUE_FORMAT =''JSON''');
1条答案
按热度按时间bjg7j2ky1#
下面的解决方案为我工作
CREATE OR REPLACE STREAM streamName(topicname BYTES HEADER(''__connect.errors.topic'''),errormessage BYTES HEADER(''__connect.errors.exception.message'''),exception BYTES HEADER(''__connect.errors.exception.stacktrace''')WITH(Kafka_TOPIC =''topicname'',VALUE_FORMAT =''JSON''');