No output in the Kafka topic: Spark Structured Streaming and Kafka integration

nwsw7zdq · posted 2021-06-06 · in Kafka

I am trying to send the streaming output of Apache Spark 2.3.1 to Apache Kafka using the Kafka sink:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions.udf
    import org.apache.kafka.clients
    import org.apache.spark.streaming
    import java.sql.Timestamp
    import java.util.Properties

    object CQ3D {
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder
          .appName("test")
          .getOrCreate()

        // Schema of the incoming CSV files
        val predictionStreamSchema = new StructType()
          .add("production_id", "long")
          .add("type", "string")

        // Stream CSV files from a local directory
        val lines = spark
          .readStream
          .option("sep", ",")
          .schema(predictionStreamSchema)
          .csv("/path/to/directory/")

        // Write key/value pairs to the Kafka topic "test"
        val query = lines
          .selectExpr("CAST(production_id AS STRING) AS key", "type AS value")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "test")
          .option("checkpointLocation", "/local/directory")
          .outputMode("complete")
          .start()

        query.awaitTermination()
      }
    }

My build.sbt file looks like this:

    name := "CQ3D"
    version := "0.1"
    scalaVersion := "2.11.8"

    val sparkVersion = "2.3.1"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      "org.apache.spark" %% "spark-streaming" % sparkVersion,
      "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
    )

    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion

My code produces the correct output with the console sink, but with the Kafka sink no output is produced or delivered to the Kafka topic. My Kafka ZooKeeper and Kafka server are running on the same machine. The console messages are shown below (a sketch of the console-sink variant follows the log):

    ./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class CQ3D --master local[4] /home/salman/Development/SparkStreaming/Scala/target/scala-2.11/cq3d_2.11-0.1.jar
    Ivy Default Cache set to: /home/salman/.ivy2/cache
    The jars for the packages stored in: /home/salman/.ivy2/jars
    :: loading settings :: url = jar:file:/home/salman/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
    org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in spark-list
        found org.apache.kafka#kafka-clients;0.10.0.1 in spark-list
        found net.jpountz.lz4#lz4;1.3.0 in spark-list
        found org.xerial.snappy#snappy-java;1.1.2.6 in spark-list
        found org.slf4j#slf4j-api;1.7.21 in central
        found org.spark-project.spark#unused;1.0.0 in spark-list
    :: resolution report :: resolve 247ms :: artifacts dl 4ms
        :: modules in use:
        net.jpountz.lz4#lz4;1.3.0 from spark-list in [default]
        org.apache.kafka#kafka-clients;0.10.0.1 from spark-list in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from spark-list in [default]
        org.slf4j#slf4j-api;1.7.21 from central in [default]
        org.spark-project.spark#unused;1.0.0 from spark-list in [default]
        org.xerial.snappy#snappy-java;1.1.2.6 from spark-list in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
        ---------------------------------------------------------------------
    :: retrieving :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888
        confs: [default]
        0 artifacts copied, 6 already retrieved (0kB/5ms)
    2018-09-14 20:14:58 WARN Utils:66 - Your hostname, salman-ubuntu-desktop resolves to a loopback address: 127.0.1.1; using 150.82.219.122 instead (on interface enp4s0)
    2018-09-14 20:14:58 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
    2018-09-14 20:14:59 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    2018-09-14 20:14:59 INFO SparkContext:54 - Running Spark version 2.3.1
    2018-09-14 20:14:59 INFO SparkContext:54 - Submitted application: CQ3D
    2018-09-14 20:14:59 INFO SecurityManager:54 - Changing view acls to: salman
    2018-09-14 20:14:59 INFO SecurityManager:54 - Changing modify acls to: salman
    2018-09-14 20:14:59 INFO SecurityManager:54 - Changing view acls groups to:
    2018-09-14 20:14:59 INFO SecurityManager:54 - Changing modify acls groups to:
    2018-09-14 20:14:59 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(salman); groups with view permissions: Set(); users with modify permissions: Set(salman); groups with modify permissions: Set()
    2018-09-14 20:14:59 INFO Utils:54 - Successfully started service 'sparkDriver' on port 36805.
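
For reference, the console-sink variant that does print output is roughly the following (a minimal sketch, assuming the same `lines` DataFrame as in the code above):

    // Same streaming DataFrame as above, written to the built-in console sink,
    // which prints each micro-batch to stdout instead of sending it to Kafka.
    val debugQuery = lines
      .selectExpr("CAST(production_id AS STRING) AS key", "type AS value")
      .writeStream
      .format("console")
      .outputMode("append")   // append is the default for non-aggregated queries
      .start()

    debugQuery.awaitTermination()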

Am I using the correct imports and/or library dependencies?
Sometimes at compile time I get the following warning:

    [warn] There may be incompatibilities among your library dependencies.
    [warn] Run 'evicted' to see detailed eviction warnings
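
For reference, Structured Streaming only needs spark-sql plus spark-sql-kafka-0-10; spark-streaming and spark-streaming-kafka-0-10 target the older DStream API and are not used by this job, so a trimmed build.sbt along these lines should be enough (a sketch that keeps the same project settings):

    name := "CQ3D"
    version := "0.1"
    scalaVersion := "2.11.8"

    val sparkVersion = "2.3.1"

    libraryDependencies ++= Seq(
      // Structured Streaming and the DataFrame API live in spark-sql
      "org.apache.spark" %% "spark-sql" % sparkVersion,
      // Kafka source/sink for Structured Streaming (also passed via --packages at submit time)
      "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
    )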

However, the code still compiles with `sbt package`. When I run it with the command below, I get no output in the Kafka topic:

    ./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class testClass --master local[4] /home/user/Dev/Scala/target/scala-2.11/testClass_2.11-0.1.jar

Answer 1 (23c0lvtd):

The Spark documentation mentions that when Spark streams files from the local filesystem, each file must be moved atomically into the source directory. There may be a configuration option for picking up files that already exist, but I can't recall it.
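
In other words, each CSV file should be written somewhere outside the watched directory first and then renamed into it in one atomic step, for example (a minimal sketch; the paths and file name are placeholders):

    import java.nio.file.{Files, Paths, StandardCopyOption}

    // Stage the file outside the directory Spark is watching...
    val staged = Paths.get("/tmp/staging/part-0001.csv")
    Files.createDirectories(staged.getParent)
    Files.write(staged, "42,typeA\n".getBytes("UTF-8"))

    // ...then move it into the watched directory atomically, so the file
    // source never picks up a half-written file. Both paths should live on
    // the same filesystem for ATOMIC_MOVE to succeed.
    val target = Paths.get("/path/to/directory/part-0001.csv")
    Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE)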
In the comments I mentioned Kafka Connect, a framework that ships with Kafka for getting data into it; you only need to build the linked project and run Kafka Connect.
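
As a rough illustration (the connector name and file path below are placeholders), the FileStreamSource connector bundled with Kafka can push a file into a topic when run with the standalone worker:

    # my-file-source.properties (hypothetical file name)
    name=local-file-source
    connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
    tasks.max=1
    file=/path/to/directory/input.csv
    topic=test

    # start it with the standalone worker config that ships with Kafka
    bin/connect-standalone.sh config/connect-standalone.properties my-file-source.properties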
Otherwise, if you are already on Hadoop I would suggest Flume, or Filebeat/Fluentd if you are using Elasticsearch, for getting files into Kafka. Basically, Spark is far too much overhead for a simple program like this that reads from the local filesystem, and no parallelism is needed to read each file.
