noclassdeffounderror:org/apache/spark/sq/sources/v2/streamingwritesupportprovider试图从scala中的kafka主题中提取

piok6c0g  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(555)

我用的是 spark-shell 示例来测试从客户机的kafka源中提取数据。要启动示例,我使用以下命令 spark-shell --jars spark-sql-kafka-0-10_2.11-2.5.0-palantir.8.jar, kafka_2.12-2.5.0.jar, kafka-clients-2.5.0.jar (所有jar都在工作目录中)。
但是,当我运行命令 val df = spark.read.format("kafka")........... 几秒钟后,它崩溃了,如下所示:

java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/StreamingWriteSupportProvider
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
  at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
  at java.net.URLClassLoader.defineClass(URLClassLoader.java:455)
  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:367)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:344)
  at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
  at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
  at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
  at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:533)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
  at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:304)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 79 more

但是,如果我将sparkshell命令中jar的顺序改为 spark-shell --jars kafka_2.12-2.5.0.jar, kafka-clients-2.5.0.jar, spark-sql-kafka-0-10_2.11-2.5.0-palantir.8.jar ,而不是崩溃:

java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:376)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateBatchOptions(KafkaSourceProvider.scala:330)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:113)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
  at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 55 more

我正在开发一个非常严格的代理,由我们的客户管理,无法使用 --packages 相反,我在这里有点不知所措,我不能在shell启动时加载所有3个依赖项吗?我是不是又错过了一步?

9ceoxa92

9ceoxa921#

《结构化流媒体+Kafka集成指南》中写道:
为了在sparkshell上进行实验,您需要在调用sparkshell时添加上述库及其依赖项。
您正在使用的库似乎是定制的,在maven中央存储库中不公开。这意味着,我不能研究它的依赖关系。
不过,看看最新的稳定版本 2.4.5 根据maven central repository,依赖项是 kafka-clients 版本 2.0.0 .

9bfwbjaz

9bfwbjaz2#

在用户应用程序和spark本身都依赖于同一个库的情况下,处理依赖性冲突是一个偶尔出现的破坏性问题。这种情况很少出现,但一旦出现,可能会让用户感到烦恼。通常,当在执行spark作业期间引发nosuchmethoderror、classnotfoundexception或其他与类加载相关的jvm异常时,这将显示自己。这个问题有两种解决办法。第一种方法是修改应用程序,使其与spark使用的第三方库版本相同。第二种方法是使用一个通常被称为“着色”的过程来修改应用程序的打包。maven构建工具通过示例7-5中所示的插件的高级配置来支持着色(事实上,着色功能是插件名为maven shade plugin的原因)。着色允许您在不同的命名空间下创建冲突包的第二个副本,并重写应用程序的代码以使用重命名的版本。这种有点暴力的技术在解决运行时依赖冲突时非常有效。有关如何隐藏依赖项的具体说明,请参阅构建工具的文档。
我想知道spark shell的scala版本,因为它可能是scala版本的问题

scala> util.Properties.versionString
res3: String = version 2.11.8

如果没有,请检查您使用的spark版本和作为依赖项使用的第三方库版本,因为我确信您的spark版本不支持最新或最旧的版本。
希望对你有帮助。

vc9ivgsu

vc9ivgsu3#

您正在尝试导入多个scala版本 2.11 & 2.12 不同的图书馆。
请添加相同版本的scala库并检查下面如何导入到 spark-shell .

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,org.apache.kafka:kafka_2.11:2.4.1,org.apache.kafka:kafka-clients:2.4.1

相关问题