我开发了一个kafka连接器,它只是为从外部api检索的文件中的每一行创建消息。它工作得很好,但是现在我尝试使用消息,在每个值的开头有两个奇怪的字节。我可以用控制台使用者和kafka流处理器重现这个问题。
�168410002,OpenX Market,459980962,OpenX_Bidder_Order_merkur_bidder_800x250,313115722,OpenX_Bidder_ANY_LI_merkur_800x250_550,106800839362,OpenX_Bidder_Creative_merkur_800x250_2,10
源文件很好,甚至在创建sourcerecord之前println也不会显示这两个字节。我以前用过一个带一个字段的结构,现在用的是一个简单的字符串模式,但我仍然有同样的问题:
def convert(line: String, ...) = {
...
val record = new SourceRecord(
Partition.sole(partition),
offset.forConnectApi,
topic,
Schema.STRING_SCHEMA,
line
)
...
所以在上面的代码中,如果我添加println(line),就不会显示奇怪的字符。
1条答案
按热度按时间tpgth1q71#
看起来您在连接器中使用了avroconverter或jsonconverter。尝试在key.converter中使用kafka附带的stringconverter,在worker for connect中使用value.converter。它将数据编码为字符串,不应该包含这些额外的内容。