How to receive messages from multiple Kafka topics in parallel with the Spark Streaming DStream API

jdg4fx2g · posted 2021-06-08 in Kafka

I use a MySQL CDC (change data capture) system that captures new records inserted into MySQL tables and sends them to Kafka, with one topic per table. In Spark Streaming I then want to receive messages from these multiple Kafka topics in parallel with the DStream API, so the change data from those MySQL tables can be processed further.

The CDC setup works fine, and testing with kafka-consume-topic.sh shows that messages for all tables arrive correctly. In Spark Streaming, however, I only receive the messages of one table. Yet if I create a stream for just one topic at a time in the application and test each table separately, every table can be read by Spark Streaming on its own. I searched the Spark GitHub project for a long time for related issues, articles and examples, but found no solution. There are plenty of examples that union non-direct streams, but those Spark Streaming APIs are old and I am reluctant to adopt them, since I suspect a lot of rework would be needed later. For clarity, a sketch of the union variant I have in mind is included right below, before my full code.
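This is only a sketch of the kafka-010 union approach, not a working solution; ssc, topicsArr and kafkaParam are the same values as in my full code below:

// Sketch only: one kafka-010 direct stream per topic, unioned into a single DStream.
// ssc, topicsArr and kafkaParam are defined exactly as in the full program below.
val streams = topicsArr.map { topic =>
  KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Array(topic), kafkaParam))
}
val unifiedStream = ssc.union(streams.toSeq)
unifiedStream.map(r => (r.key(), r.value())).print()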
Here is my full code:

package com.fm.data.fmtrade

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession

object TestKafkaSparkStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf() //.setMaster("spark://192.168.177.120:7077")
      .setAppName("SparkStreamKaflaWordCount Demo")
      .set("spark.streaming.concurrentJobs", "8")
    val ss = SparkSession
      .builder()
      .config(conf)
      .appName(args.mkString(" "))
      .getOrCreate()

    // One CDC topic per MySQL table
    val topicsArr: Array[String] = Array(
      "betadbserver1.copytrading.t_trades",
      "betadbserver1.copytrading.t_users",
      "betadbserver1.account.s_follower",
      "betadbserver1.copytrading.t_followorder",
      "betadbserver1.copytrading.t_follow",
      "betadbserver1.copytrading.t_activefollow",
      "betadbserver1.account.users",
      "betadbserver1.account.user_accounts"
    )

    val group = "con-consumer-group111" + (new util.Random).nextInt(10000)
    val kafkaParam = Map(
      "bootstrap.servers" -> "beta-hbase02:9092,beta-hbase03:9092,beta-hbase04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "earliest", //"latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val ssc = new StreamingContext(ss.sparkContext, Seconds(4))

    // One direct stream per topic; in practice only one table's messages ever arrive
    //val streams =
    topicsArr.foreach { //.slice(0,1)
      topic =>
        val newTopicsArr = Array(topic)
        val stream =
          KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](newTopicsArr, kafkaParam))
        stream.map(s => (s.key(), s.value())).print()
    }

    /*
    val unifiedStream = ssc.union(streams)
    unifiedStream.repartition(2)
    unifiedStream.map(s => (s.key(), s.value())).print()
    */
    /*
    unifiedStream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach { record =>
        }
      }
    }
    */

    ssc.start()
    ssc.awaitTermination()
  }
}
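One alternative I am also considering, since ConsumerStrategies.Subscribe accepts a whole collection of topic names: subscribe a single direct stream to all topics at once and split the records by their topic afterwards. A rough, untested sketch, again reusing ssc, topicsArr and kafkaParam from the code above:

// Sketch only: a single direct stream subscribed to all CDC topics.
// Records from different tables can be told apart via record.topic().
val allTablesStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topicsArr, kafkaParam))
allTablesStream.map(r => (r.topic(), r.key(), r.value())).print()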

No answers yet.

