Spark Streaming hangs after creating the Kafka consumer

hmtdttj4 · posted 2021-06-07 in Kafka

I'm trying to get a very simple Kafka + Spark Streaming integration working.
On the Kafka side, I cloned this repository (https://github.com/confluentinc/cp-docker-images) and used its docker-compose setup to get ZooKeeper and Kafka running. I created a topic named "foo" and added some messages to it. In this setup, Kafka listens on port 29092.
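For reference, this is roughly how the test messages could also be produced from Scala; it is only a sketch and assumes kafka-clients is on the classpath (it comes in transitively via spark-streaming-kafka-0-10), the broker at localhost:29092, and the topic "foo" from above. The key/value strings are just placeholders.

  // Sketch only: produce a few test records to the "foo" topic with the plain
  // Kafka producer API (the "test-key-*" / "test message *" strings are placeholders).
  import java.util.Properties
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  object ProduceTestMessages {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:29092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      (1 to 5).foreach { i =>
        producer.send(new ProducerRecord[String, String]("foo", s"test-key-$i", s"test message $i"))
      }
      producer.close() // flushes any pending records before exiting
    }
  }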
On the Spark side, my build.sbt file looks like this:

  name := "KafkaSpark"
  version := "0.1"
  scalaVersion := "2.11.12"
  val sparkVersion = "2.2.0"
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion,
    "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
  )

I'm able to run the following snippet and consume data from a terminal (text typed into nc -lk 9999):

  import org.apache.spark._
  import org.apache.spark.streaming._

  object SparkTest {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
      val ssc = new StreamingContext(conf, Seconds(3))
      val lines = ssc.socketTextStream("localhost", 9999)
      val words = lines.flatMap(_.split(" "))
      val pairs = words.map(word => (word, 1))
      val wordCounts = pairs.reduceByKey(_ + _)
      // Print the first ten elements of each RDD generated in this DStream to the console
      wordCounts.print()
      ssc.start()            // Start the computation
      ssc.awaitTermination() // Wait for the computation to terminate
    }
  }

So Spark Streaming itself is working.
Now, for Kafka I created the following:

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.count
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010._
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

  object KafkaTest {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder
        .master("local")
        .appName("Spark Word Count")
        .getOrCreate()
      val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:29092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "stream_group_id",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )
      val topics = Array("foo")
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](topics, kafkaParams)
      )
      stream.foreachRDD { (rdd, time) =>
        val data = rdd.map(record => record.value)
        data.foreach(println)
        println(time)
      }
      ssc.start()            // Start the computation
      ssc.awaitTermination()
    }
  }

When I run it (from IntelliJ), I get the console output below, and the process just hangs at the last line, right after subscribing to the topic. I've also tried subscribing to a topic that doesn't exist and got the same result, i.e. it doesn't seem to throw an error even though the topic isn't there. If I point it at a broker that doesn't exist, I do get an error (Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer), so it must be finding the broker when I use the correct port.
Any suggestions on how to fix this?
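One way to narrow this down (just a sketch, reusing the stream, ssc, and kafkaParams from the code above) would be to print the per-batch offset ranges instead of the record values: if batches complete but the ranges are empty, the job is polling the broker and simply sees no new records (auto.offset.reset is "latest"); if nothing prints at all, no batch ever finishes.

  // Diagnostic sketch only: log the Kafka offset range covered by each batch.
  // Assumes the same `stream` created with KafkaUtils.createDirectStream above.
  import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

  stream.foreachRDD { rdd =>
    val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach { o =>
      // fromOffset == untilOffset means the batch saw no records for that partition
      println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
    }
  }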
Here is the log output:

  Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  17/11/23 05:29:42 INFO SparkContext: Running Spark version 2.2.0
  17/11/23 05:29:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  17/11/23 05:29:48 INFO SparkContext: Submitted application: Spark Word Count
  17/11/23 05:29:48 INFO SecurityManager: Changing view acls to: jonathandick
  17/11/23 05:29:48 INFO SecurityManager: Changing modify acls to: jonathandick
  17/11/23 05:29:48 INFO SecurityManager: Changing view acls groups to:
  17/11/23 05:29:48 INFO SecurityManager: Changing modify acls groups to:
  17/11/23 05:29:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jonathandick); groups with view permissions: Set(); users with modify permissions: Set(jonathandick); groups with modify permissions: Set()
  17/11/23 05:29:48 INFO Utils: Successfully started service 'sparkDriver' on port 59606.
  17/11/23 05:29:48 DEBUG SparkEnv: Using serializer: class org.apache.spark.serializer.JavaSerializer
  17/11/23 05:29:48 INFO SparkEnv: Registering MapOutputTracker
  17/11/23 05:29:48 INFO SparkEnv: Registering BlockManagerMaster
  17/11/23 05:29:48 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
  17/11/23 05:29:48 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
  17/11/23 05:29:48 INFO DiskBlockManager: Created local directory at /private/var/folders/w2/njgz3jnd097cdybxcvp9c2hw0000gn/T/blockmgr-3a3feb00-0fdb-4bc5-867d-808ac65d7c8f
  17/11/23 05:29:48 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
  17/11/23 05:29:48 INFO SparkEnv: Registering OutputCommitCoordinator
  17/11/23 05:29:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
  17/11/23 05:29:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
  17/11/23 05:29:49 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
  17/11/23 05:29:49 INFO Utils: Successfully started service 'SparkUI' on port 4043.
  17/11/23 05:29:49 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.67:4043
  17/11/23 05:29:49 INFO Executor: Starting executor ID driver on host localhost
  17/11/23 05:29:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59613.
  17/11/23 05:29:49 INFO NettyBlockTransferService: Server created on 192.168.1.67:59613
  17/11/23 05:29:49 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
  17/11/23 05:29:49 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.1.67, 59613, None)
  17/11/23 05:29:49 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.67:59613 with 2004.6 MB RAM, BlockManagerId(driver, 192.168.1.67, 59613, None)
  17/11/23 05:29:49 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.1.67, 59613, None)
  17/11/23 05:29:49 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.1.67, 59613, None)
  17/11/23 05:29:49 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/jonathandick/IdeaProjects/KafkaSpark/spark-warehouse/').
  17/11/23 05:29:49 INFO SharedState: Warehouse path is 'file:/Users/jonathandick/IdeaProjects/KafkaSpark/spark-warehouse/'.
  17/11/23 05:29:50 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
  17/11/23 05:29:50 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
  17/11/23 05:29:50 WARN KafkaUtils: overriding enable.auto.commit to false for executor
  17/11/23 05:29:50 WARN KafkaUtils: overriding auto.offset.reset to none for executor
  17/11/23 05:29:50 WARN KafkaUtils: overriding executor group.id to spark-executor-stream_group_id
  17/11/23 05:29:50 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
  17/11/23 05:29:50 INFO DirectKafkaInputDStream: Slide time = 3000 ms
  17/11/23 05:29:50 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
  17/11/23 05:29:50 INFO DirectKafkaInputDStream: Checkpoint interval = null
  17/11/23 05:29:50 INFO DirectKafkaInputDStream: Remember interval = 3000 ms
  17/11/23 05:29:50 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@1a38eb73
  17/11/23 05:29:50 INFO ForEachDStream: Slide time = 3000 ms
  17/11/23 05:29:50 INFO ForEachDStream: Storage level = Serialized 1x Replicated
  17/11/23 05:29:50 INFO ForEachDStream: Checkpoint interval = null
  17/11/23 05:29:50 INFO ForEachDStream: Remember interval = 3000 ms
  17/11/23 05:29:50 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1e801ce2
  17/11/23 05:29:50 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:29092]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = stream_group_id
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest
  17/11/23 05:29:50 DEBUG KafkaConsumer: Starting the Kafka consumer
  17/11/23 05:29:50 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [localhost:29092]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = consumer-1
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 2147483647
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        group.id = stream_group_id
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest
  17/11/23 05:29:50 INFO AppInfoParser: Kafka version : 0.10.0.1
  17/11/23 05:29:50 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
  17/11/23 05:29:50 DEBUG KafkaConsumer: Kafka consumer created
  17/11/23 05:29:50 DEBUG KafkaConsumer: Subscribed to topic(s): foo
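The log stops right after "Subscribed to topic(s): foo". A second sanity check would be to take Spark out of the picture and poll the topic directly with the plain Kafka consumer API. The sketch below assumes the same broker address and topic as above and that kafka-clients is on the classpath (via spark-streaming-kafka-0-10); the group id is a made-up one so it does not interfere with the streaming job's group.

  // Sketch: poll the "foo" topic directly with the plain Kafka consumer API,
  // bypassing Spark entirely, to check that records can actually be fetched.
  import java.util.{Arrays, Properties}
  import org.apache.kafka.clients.consumer.KafkaConsumer

  object PlainConsumerCheck {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:29092")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("group.id", "plain_check_group")  // illustrative group id, separate from the streaming job
      props.put("auto.offset.reset", "earliest")  // read from the beginning so existing messages show up

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Arrays.asList("foo"))
      val records = consumer.poll(10000)          // kafka-clients 0.10.x: timeout in milliseconds
      println(s"fetched ${records.count()} record(s)")
      val it = records.iterator()
      while (it.hasNext) println(it.next().value())
      consumer.close()
    }
  }

If this prints the messages, the broker, port, and topic are reachable and the problem is somewhere on the Spark side; if it also hangs or returns nothing, the broker/Docker setup would be the place to look first.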

No answers yet.
