Ingesting Kafka stream data into Druid from the C# client library

rn0zuynd · posted 2021-06-04 in Kafka

For the past few days I have been trying to ingest some data from Kafka into Druid, and so far I have had no success.
I am producing some basic data with Confluent's C# client library, like this:

// Requires the Confluent.Kafka NuGet package; config (a ProducerConfig) and i come from the surrounding code.
using Confluent.Kafka;

using (var p = new ProducerBuilder<string, string>(config).Build())
{
    try
    {
        // Produce one message to the "Test" topic, stamped with the current UTC time.
        var dr = await p.ProduceAsync("Test", new Message<string, string> { Key = "Random", Value = i.ToString(), Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime) });
        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
        p.Flush(TimeSpan.FromSeconds(10));
    }
    catch (ProduceException<string, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}

This adds the data successfully. But when I try to get the data into Druid, nothing happens after the task is created, and the task reports that the data is unparseable.
This is my ingestion spec:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "Test",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            {
              "name": "Random",
              "type": "string"
            }
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "Test",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT600S",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1200S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "Test",
    "useEarliestSequenceNumber": false
  },
  "context": null,
  "suspended": false
}
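
For reference, given the parseSpec above ("format": "json", a "time" timestamp column and a "Random" dimension), each Kafka message value would need to be a JSON document shaped roughly like the following; the timestamp value here is only an illustration:

{
  "time": "2021-06-04T12:00:00Z",
  "Random": "42"
}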

I hope someone can spot the mistake.
Thanks in advance!
Edit:
Log created by the ingestion task:

{
  "0": {
    "index_kafka_Test_4b0ecd3ab842a29_bebmfiod": {
      "movingAverages": {
        "buildSegments": {
          "5m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          },
          "15m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          },
          "1m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          }
        }
      },
      "totals": {
        "buildSegments": {
          "processed": 0,
          "processedWithError": 0,
          "thrownAway": 0,
          "unparseable": 8
        }
      }
    }
  }
}
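
One thing that might help with debugging, assuming the goal is to see which records are being rejected: the tuningConfig above already contains the logParseExceptions and maxSavedParseExceptions fields, and changing them as sketched below should make the task log and report the unparseable rows instead of only counting them (only these two fields differ from the spec above):

"logParseExceptions": true,
"maxSavedParseExceptions": 10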
