spark在阅读kafka时丢失了99.9%的消息

qfe3c7zg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(382)

热释光;博士:我的spark应用程序正在吸收Kafka发送的0.1%的MSG。我的主要怀疑是:对于每个批处理间隔(在这个示例中是1秒),新的jvm被示例化。我正在尝试使用延迟加载的.map()转换来接收数据。驱动程序和执行程序代码相互穷举是否可能与此有关?
详细说明的长版本:
我的事件流如下所示:一个java类生成示例(.json作为字符串)数据并使用kafka的kafka-run-class.sh脚本运行。这些消息是在kafka中收集的,spark使用javadirectstream从中读取它们。为了简洁起见,让我们假设我的数据生产者发送的json消息的值可以是1或0&spark应用程序的目的是区分1和0。数据生成器还向发送的消息追加一个计数值。
问题:在实验中,我从数据生成器发送10000个msg。i'v一个kibana Jmeter 盘显示了9600 msgs的数据(+-0.1%,但始终可以看到)
问题1。剩下的400毫克在哪里丢失了?
现在spark(批处理间隔为1秒,在一个线程上运行)读取这些msg,并在另一个可视化环境下将其输出i'v输入同一个kibana。
它读10(有时20)msgs一致。
如果读取10毫秒,则计数值为1-10;如果读取20毫秒,则计数值为1-10&~3000-3010
问题2。为什么spark只能得到10(或最多20)msgs?
我在spark应用程序中更改了“auto.offset.reset”、“smallest”的设置,但这并没有真正起到作用。它只是从计数1到10读取10毫秒。
问题3。从Kafka的主题开始读起来,需要做些什么?
我能想到的一件事是,我在.map函数中摄取msgs:

JavaDStream<String> lines = stream.map(new Function<Tuple2<String, String>, String>() {
  public String call(Tuple2<String, String> tuple2) {
  my_fn(tuple2._2().toString());
  return tuple2._2();
}

有没有人能对窗口进行一些照明,减少功能&这与spark看到0.1%的msg有什么关系?
注意:我使用logstash示例从spark->kafka,kafka->elastic迁移数据
我需要在数据生成器脚本中操作json obj。我使用maven,使用json依赖,构建得很好。但是在尝试使用kafka-run-class.sh运行类时,它抛出了json对象的classnotfoundexception。
第四季度。如何使用kafka的run class方法运行着色jar,或者它需要作为独立的java程序运行,在这种情况下,它会以与使用kafka的开箱即用脚本运行时相同的速率发出msg,因为我认为它会考虑并行化和排队以保持背压。
使用这个Kafka脚本,我可以在我的机器上输出1.4mps。
编辑:更多关于KafkaSpark分区和代码逻辑的信息
图例:主题1是来自制作者(脚本)的数据->Kafka主题2是来自spark->Kafka的数据

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_1 
          Topic:topic_1 PartitionCount:1    ReplicationFactor:1 Configs: 
          Topic: topic_1 Partition: 0   Leader: 0   Replicas: 0 Isr: 0 

curl 'localhost:9200/_cat/indices?v'
health status index          pri rep docs.count docs.deleted store.size pri.store.size 
yellow open   topic_1     5   1   11002386            0    702.1mb        702.1mb 
yellow open   topic_2     5   1       6307            0    786.4kb        786.4kb 
yellow open   .kibana         1   1          9            0       47kb        47kb

spark,我在4核1线程上运行1个带4gb驱动程序和执行程序内存的1个执行程序和1个驱动程序,以1秒的批处理间隔对10m+MSG进行实验,主题1收到了几乎正确的数字(9600/10000 q1以上),同时主题2只能收到约6k关于分区的消息,一切都是在一个独立的模式在同一台机器上与32gb内存和4核。我的意思是,数据生产者,Kafka,麋鹿,Spark。
data\u gen:简单的javakafka生产者代码,用于以1:1的比率发送值为0或1的json字符串。Spark应用程序:代码在主qn。myfn()获取字符串msg,将其转换为json&查看值是0还是1

fumotvh3

fumotvh31#

map() transformations are lazy so they don’t like to work until asked for.
I was expecting messages in a map() transformation.
put this expectation in rdd.take() which in non-lazy inside foreachRDD()
It worked.

相关问题