为了开始Kafka的项目,我在这个地址中遵循了databricks的指示:
结构化流媒体+kafka集成指南(kafka broker版本0.10.0或更高版本
代码:
# coding: utf-8
import sys
import os,time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession,Row
from pyspark import SparkContext,SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json
spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')
trainingSchema = StructType([
StructField("code",StringType(),True),
StructField("ean",StringType(),True),
StructField("name",StringType(),True),
StructField("description",StringType(),True),
StructField("category",StringType(),True),
StructField("attributes",StringType(),True)
])
trainingDF = spark.createDataFrame(sc.emptyRDD(),trainingSchema)
broker, topic =
['kafka.partner.stg.some.domain:9092','hybris.products']
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
"kafka.partner.stg.some.domain:9092") \
.option("subscribe", "hybris.products") \
.option("startingOffsets", "earliest") \
.load()
我的hadoop版本是2.6,spark版本是2.3.0
命令行 spark-submit
是: spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-002.py
错误消息:
py4jjavaerror:调用o48.load时出错:java.lang.noclassdeffounderror:org/apache/kafka/common/serialization/bytearraydesializer,位于org.apache.spark.sql.kafka010.kafkasourceprovider$(kafkasourceprovider)。scala:413)位于org.apache.spark.sql.kafka010.kafkasourceprovider$(kafkasourceprovider.scala)org.apache.spark.sql.kafka010.kafkasourceprovider.validatestreamoptions(kafkasourceprovider。scala:360)位于org.apache.spark.sql.kafka010.kafkasourceprovider.sourceschema(kafkasourceprovider。scala:64)在org.apache.spark.sql.execution.datasources.datasource.sourceschema(datasource。scala:231)在org.apache.spark.sql.execution.datasources.datasource.sourceinfo$lzycompute(数据源。scala:94)在org.apache.spark.sql.execution.datasources.datasource.sourceinfo(datasource。scala:94)在org.apache.spark.sql.execution.streaming.streamingrelation$.apply(streamingrelation。scala:33)在org.apache.spark.sql.streaming.datastreamreader.load(datastreamreader。scala:170)在sun.reflect.nativemethodaccessorimpl.invoke0(本机方法)在sun.reflect.nativemethodaccessorimpl.invoke(nativemethodaccessorimpl)。java:62)在sun.reflect.delegatingmethodaccessorimpl.invoke(delegatingmethodaccessorimpl。java:43)在java.lang.reflect.method.invoke(方法。java:498)在py4j.reflection.methodinvoker.invoke(methodinvoker。java:244)在py4j.reflection.reflectionengine.invoke(reflectionengine。java:357)在py4j.gateway.invoke(gateway。java:282)在py4j.commands.abstractcommand.invokemethod(abstractcommand。java:132)在py4j.commands.callcommand.execute(callcommand。java:79)在py4j.gatewayconnection.run(网关连接。java:214)在java.lang.thread.run(线程。java:745)原因:java.lang.classnotfoundexception:org.apache.kafka.common.serialization.bytearraydeserializer位于java.net.urlclassloader.findclass(urlclassloader)。java:381)在java.lang.classloader.loadclass(类加载器。java:424)在java.lang.classloader.loadclass(classloader。java:357)
您可以在我上面提到的网站上查看,我导入的jar文件与此完全相同。所以,我不知道为什么会这样。也许还有一个模块没提到?我真的迷路了
1条答案
按热度按时间pjngdqdw1#
提到的jar并不包括kafka客户机的所有依赖项。你应该用
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
(如部署部分的文件中所述:https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html#deploying)