I followed the tutorial at http://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/ and can insert data from the Avro console producer into Cassandra. Now I'm trying to extend this with Flume: I set up Flume on my machine to pick up a log file and push it to Kafka, so that the data gets inserted into the Cassandra database. In a text file I put this data:
{"id":1,"created":"2016-05-06 13:53:00","product":"op-dax-p-20150201-95.7","price":94.2}
{"id":2,"created":"2016-05-06 13:54:00","product":"op-dax-c-20150201-100","price":99.5}
{"id":3,"created":"2016-05-06 13:55:00","product":"fu-datamountaineer-20150201-100","price":10000}
{"id":4,"created":"2016-05-06 13:56:00","product":"fu-kospi-c-20150201-100","price":150}
Flume picks up the data and pushes it to Kafka, but on the Cassandra sink side I hit this error:
ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-09-28 15:47:00,951] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-09-28 15:47:00,951] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
[2016-09-28 15:47:00,952] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:165)
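For context on the "Unknown magic byte" line: Confluent's AvroConverter expects every message value to start with a one-byte magic marker (0x00) followed by a 4-byte schema-registry ID, and only then the Avro payload. A plain text line, such as the JSON strings above, fails that check immediately. This is a minimal sketch (the function name is my own, not from any library) of the framing check the converter effectively performs:

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format: 1 magic byte + 4-byte schema ID + Avro payload


def looks_like_confluent_avro(payload: bytes) -> bool:
    """Return True if the raw Kafka message bytes start with the
    Confluent Schema Registry framing that AvroConverter expects."""
    if len(payload) < 5:
        return False
    magic, _schema_id = struct.unpack(">bI", payload[:5])
    return magic == MAGIC_BYTE


# A message from kafka-avro-console-producer starts with 0x00 + a schema ID:
assert looks_like_confluent_avro(b"\x00\x00\x00\x00\x01" + b"avro-bytes")
# A plain JSON line starts with '{' (0x7b), so the check fails:
assert not looks_like_confluent_avro(b'{"id":1,"created":"2016-05-06 13:53:00"}')
```

If the bytes on the topic fail this check, AvroConverter raises exactly the SerializationException shown in the log.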
The schema I'm using:
./confluent/bin/kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic orders-topic \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"}]}'
Flume configuration: flume-kafka.conf.properties
agent.sources = spoolDirSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink
agent.sources.spoolDirSrc.type = spooldir
agent.sources.spoolDirSrc.spoolDir = eventlogs
agent.sources.spoolDirSrc.inputCharset = UTF-8
agent.sources.spoolDirSrc.deserializer.maxLineLength = 1048576
agent.sources.spoolDirSrc.channels = memoryChannel
agent.sinks.kafkaSink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 1000
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = orders-topic
agent.sinks.kafkaSink.brokerList = localhost:9092
agent.sinks.kafkaSink.batchSize = 20
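Note that Flume's KafkaSink publishes each spooled line as a plain UTF-8 string, not as Confluent-framed Avro, so a Connect worker configured with AvroConverter cannot decode those messages. One possible direction, sketched below against a hypothetical Connect worker properties file, is to switch the value converter to JsonConverter with schemas disabled; whether the Cassandra sink accepts schemaless JSON depends on the sink version, so treat this as something to test rather than a drop-in fix:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

The alternative is to keep AvroConverter and change the producing side so that properly framed Avro (magic byte, schema ID, Avro payload) lands on the topic instead of raw JSON text.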
Can anyone help me figure out how to fix this error?
1 Answer
Generally, an unknown magic byte means that your Kafka client and server versions are incompatible. Check that your Cassandra sink was built against a Kafka client library whose version is less than or equal to that of your broker.