DataStax Apache Kafka Connector: UDT field issue with nested JSON

gv8xihay · published 2021-06-04 in Kafka
Follow (0) | Answers (1) | Views (511)

I am unable to read from a JSON topic and write to a Cassandra table that has UDT-typed columns (my topic and table have many columns, so I used UDT columns). I get the warning below, and the Flatten transform does not help either; nothing gets flattened. Removing value.call.onnet from the mapping makes the connector work.

    [2020-12-08 00:17:12,415] WARN Error decoding/mapping Kafka record SinkRecord{kafkaOffset=358, timestampType=CreateTime} ConnectRecord{topic='account_usage', kafkaPartition=0, key={"number":943821834,"usageDate":20201108}, keySchema=Schema{STRING}, value={"startTime":20201108001019,"endTime":20201108160902,"count":142,"call": { "onnet": { "volume": 3, "unit": "SECOND", "amount": 12.5 }, "offnet": { "volume": 1, "unit": "SECOND", "amount": 2.0 } }, "message": { "roaming": { "volume": 1, "unit": "MSG", "amount": 1.5 }, "local": { "volume": 12, "unit": "MSG", "amount": 3.0 } }, valueSchema=Schema{STRING}, timestamp=1607370857363, headers=ConnectHeaders(headers=)}: Required field 'value.call.onnet' (mapped to column onnet) was missing from record (or may refer to an invalid function). Please remove it from the mapping. (com.datastax.oss.kafka.sink.CassandraSinkTask)

Sample Kafka topic - account_usage

Key

    {
      "number": 943821834,
      "usageDate": 20201108
    }

Value

    {
      "startTime": 20201108001019,
      "endTime": 20201108160902,
      "count": 142,
      "call": {
        "onnet": {
          "volume": 3,
          "unit": "SECOND",
          "amount": 12.5
        },
        "offnet": {
          "volume": 1,
          "unit": "SECOND",
          "amount": 2.0
        }
      },
      "message": {
        "roaming": {
          "volume": 1,
          "unit": "MSG",
          "amount": 1.5
        },
        "local": {
          "volume": 12,
          "unit": "MSG",
          "amount": 3.0
        }
      }
    }

Cassandra UDT (user-defined type) definition

    CREATE TYPE ks_usage.usage_type (
        volume bigint,
        amount decimal
    );

Cassandra table definition

    CREATE TABLE ks_usage.usage_call
    (
        number bigint,
        usage_date int,
        entry_date timeuuid,
        onnet usage_type,
        offnet usage_type,
        primary key (number, usage_date)
    )
    WITH CLUSTERING ORDER BY (usage_date DESC);

Connector mapping

    POST /connectors HTTP/1.1
    Host: localhost:8083
    Content-Type: application/json
    Content-Length: 568

    {
      "name": "usage-sink",
      "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "account_usage",
        "contactPoints": "10.0.153.27",
        "loadBalancing.localDc": "datacenter1",
        "port": 9042,
        "auth.provider": "PLAIN",
        "auth.username": "testusr",
        "auth.password": "test",
        "topic.account_usage.ks_usage.usage_call.mapping": "number=key.number, usage_date=key.usageDate, entry_date=now(), onnet=value.call.onnet, offnet=value.call.offnet"
      }
    }

Connector configuration - connect-distributed.properties

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # key.converter=org.apache.kafka.connect.storage.StringConverter
    # value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    transforms=flatten
    transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
    transforms.flatten.delimiter=.
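As an aside, the Flatten transform does not have to live in the worker-wide connect-distributed.properties: Kafka Connect also accepts transform settings inside an individual connector's config, which scopes the transform to that one sink. A sketch of the equivalent keys added to the connector's "config" object above (same values, just JSON form):

```json
{
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "."
}
```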

bvjveswy · answer #1

Thanks for the detailed description. I was debugging your problem, and it seems that Connect does not inspect (propagate) the inner fields. The record sent to the Kafka connector contains the following fields:

    value.count, key.usageDate, value.endTime, value.startTime, value.call, value.message, key.number

You may notice the call field, but nothing like onnet or offnet. Because of that, there is nothing to flatten: those fields are not present, so they cannot be flattened. To solve your problem, you can consider one of the following:
1. Move both onnet and offnet one level higher and remove the call wrapper. If you do that, the record will contain value.onnet and value.offnet, and you will be able to keep using the ks_usage.usage_type columns as mapped today.
2. Create a call UDT that contains both onnet and offnet. That way you will have a single column holding all the call values, and in the mapping you can map the whole nested object at once, e.g. call=value.call.
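A minimal CQL sketch of the second option. The call_type name and the ALTER statements below are illustrative assumptions to show the shape of the change, not part of the original answer; adjust them to your schema:

```sql
-- Assumed name: a UDT grouping both call legs, reusing the existing usage_type
CREATE TYPE ks_usage.call_type (
    onnet frozen<usage_type>,
    offnet frozen<usage_type>
);

-- Replace the two separate columns with a single "call" column
ALTER TABLE ks_usage.usage_call DROP onnet;
ALTER TABLE ks_usage.usage_call DROP offnet;
ALTER TABLE ks_usage.usage_call ADD call frozen<call_type>;
```

The connector mapping would then bind the whole nested object in one step, along the lines of:

    "topic.account_usage.ks_usage.usage_call.mapping": "number=key.number, usage_date=key.usageDate, entry_date=now(), call=value.call"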
