For the past few days I have been trying to ingest some data from Kafka into Druid, and so far I have not succeeded.
I am producing some basic data with Confluent's C# client, like this:
using (var p = new ProducerBuilder<string, string>(config).Build())
{
    try
    {
        // 'i' is the counter from the surrounding loop (not shown in the question)
        var dr = await p.ProduceAsync("Test", new Message<string, string> { Key = "Random", Value = i.ToString(), Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime) });
        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
        p.Flush(TimeSpan.FromSeconds(10));
    }
    catch (ProduceException<string, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}
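The config object is not shown in the question; for a local broker it would presumably be a minimal ProducerConfig along these lines (an assumption, matching the bootstrap.servers value in the ingestion spec below):

// Assumed producer configuration (not part of the original question); the
// broker address matches "bootstrap.servers" from the ingestion spec below.
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };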
This adds the data successfully. But when I try to get the data into Druid, nothing happens after the task is created, and the task reports that the data is unparseable.
This is my ingestion spec:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "Test",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            {
              "name": "Random",
              "type": "string"
            }
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "Test",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT600S",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1200S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "Test",
    "useEarliestSequenceNumber": false
  },
  "context": null,
  "suspended": false
}
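For reference, the parseSpec above expects each message value to be a JSON object carrying a "time" column for the timestampSpec and a "Random" field for the dimension. A minimal sketch of a message in that shape (the BuildDruidMessage helper is hypothetical, and System.Text.Json is assumed for serialization; this is not the original producer code):

// Hypothetical helper (not from the question): serializes a payload whose
// shape matches the parseSpec above, i.e. a "time" timestamp column plus a
// "Random" dimension. Assumes System.Text.Json.
using System;
using System.Text.Json;
using Confluent.Kafka;

static Message<string, string> BuildDruidMessage(int i)
{
    var payload = JsonSerializer.Serialize(new
    {
        time = DateTime.UtcNow.ToString("o"), // ISO-8601, which "format": "auto" accepts
        Random = i.ToString()
    });
    return new Message<string, string> { Key = "Random", Value = payload };
}

Such a message would then be sent with await p.ProduceAsync("Test", BuildDruidMessage(i));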
I hope someone can spot the mistake.
Thanks in advance!
Edit:
Log created by the ingestion task:
{
  "0": {
    "index_kafka_Test_4b0ecd3ab842a29_bebmfiod": {
      "movingAverages": {
        "buildSegments": {
          "5m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          },
          "15m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          },
          "1m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          }
        }
      },
      "totals": {
        "buildSegments": {
          "processed": 0,
          "processedWithError": 0,
          "thrownAway": 0,
          "unparseable": 8
        }
      }
    }
  }
}