For business reasons I need to use Druid, but Druid cannot load MySQL data directly. So I first use Confluent (Kafka Connect JDBC) to load the MySQL data into Kafka, and then load the data from Kafka into Druid. Walkthrough I followed: https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/
The data in Kafka:
x@dataserv:~/confluent-4.1.1$ bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --property print.key=true --from-beginning --topic mysql_foobar
null {"c1":{"int":1},"c2":{"string":"foo"},"create_ts":1530778230000,"update_ts":1530778230000}
null {"c1":{"int":2},"c2":{"string":"foo"},"create_ts":1530778309000,"update_ts":1530778309000}
null {"c1":{"int":3},"c2":{"string":"foo1"},"create_ts":1530778675000,"update_ts":1530778675000}
Then I use tranquility-distribution-0.8.2 to load the Kafka data into Druid, but it fails. I have tried many times and modified the JSON file (keeping it as simple as possible, with no aggregations at all), and it still does not work. The log:
2018-07-05 08:18:04,679 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:05,427 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:05,441 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:05,555 [KafkaConsumer-0] INFO c.metamx.emitter.core.LoggingEmitter - Start: started [true]
2018-07-05 08:18:10,491 [KafkaConsumer-0] WARN io.druid.segment.indexing.DataSchema - No metricsSpec has been specified. Are you sure this is what you want?
2018-07-05 08:18:10,635 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=1, sentCount=0, droppedCount=0, unparseableCount=1}} pending messages in 5ms and committed offsets in 25ms.
2018-07-05 08:18:25,637 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
2018-07-05 08:18:40,639 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
2018-07-05 08:18:55,640 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 1ms.
2018-07-05 08:19:10,642 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
2018-07-05 08:19:25,644 [KafkaConsumer-CommitThread] INFO c.m.tranquility.kafka.KafkaConsumer - Flushed {mysql_foobar={receivedCount=0, sentCount=0, droppedCount=0, unparseableCount=0}} pending messages in 0ms and committed offsets in 0ms.
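The only non-zero counter in the log is unparseableCount=1 in the first flush, so the message reaches Tranquility but cannot be parsed. To compare what is actually stored on the topic with the Avro-decoded view above, I can also read the raw messages with the plain (non-Avro) console consumer:

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql_foobar --from-beginning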
Can you help me solve this problem? I can't work it out myself, and it would take me another two weeks of study to get there.
Druid version: druid-0.12.1. The Tranquility JSON config file:
{
  "dataSources" : {
    "kafka" : {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "kafka",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "update_ts",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : ["c1", "c2", "create_ts"],
                "dimensionExclusions" : []
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : []
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT1M",
          "windowPeriod" : "PT2M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "mysql_foobar"
      }
    }
  },
  "properties" : {
    "zookeeper.connect" : "192.168.6.231:2181",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "192.168.6.231:2182",
    "kafka.group.id" : "tranquility-kafka"
  }
}
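I start Tranquility with the standard launcher (assuming the config above is saved as conf/kafka.json; the path is my own choice):

bin/tranquility kafka -configFile conf/kafka.json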