KafkaMap消息(使用java spark)

xcitsw88  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(278)

我有一个包含json的kafka主题,例如:

{"jsonCode":"1234", "jsonData":{.....}}
{"jsonCode":"1234", "jsonData":{.....}}
{"jsonCode":"1235", "jsonData":{.....}}
{"jsonCode":"1235", "jsonData":{.....}}
{"jsonCode":"1236", "jsonData":{.....}}

我的问题是,是否可以在read-from主题期间创建以下哈希Map:

["1234", [list of jsonCode 1234 jsons]
["1235", [list of jsonCode 1235 jsons]
["1236", [list of jsonCode 1236 jsons]

有可能吗?如何进行Map?
我想使用sparkstreaming阅读kafka的文章,获取所有关于主题的未读消息并创建散列图
谢谢。

yqyhoc1h

yqyhoc1h1#

您的代码中是否有任何使用者配置设置。使用者配置通常需要键和值对。
检查在阅读主题时,您能够以键值对的形式读取值。通常,您的消费者应该是这样的:

final Consumer<yourKey,yourValue> consumer;  //consumer with consumer config
final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollvals);
   consumerRecords.forEach(record -> {
                System.out.printf("[Consumer Record:(key - %s,value- %s,partition- %d, offset %d)]\n", record.key(),
                        record.value(), record.partition(), record.offset());

                //parse your json from either key or from value
                String value=null;
                 .....     
               value = jsonparser(record.value()); // lets parse from value.
                 ...

相关问题