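spark-submit fails when using Kafka structured streaming in PySpark 2.3.1
The same code succeeds when entered in the pyspark shell, so I would like to know how to fix the spark-submit case.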
from pyspark.sql.types import *
from pyspark.sql import SparkSession
topic = "topicname"
spark = SparkSession\
    .builder\
    .appName("test_{}".format(topic))\
    .getOrCreate()
source_df = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "ip:6667")\
    .option("subscribe", topic)\
    .option("failOnDataLoss", "false")\
    .option("maxOffsetsPerTrigger", 30000)\
    .load()
query = source_df.selectExpr("CAST(key AS STRING)")\
    .writeStream\
    .format("json")\
    .option("checkpointLocation", "/data/testdata_test")\
    .option("path", "/data/testdata_test_checkpoint")\
    .start()
The commands are as follows:
// this command fails
spark-submit --master yarn --jars hdfs:///vcrm_data/spark-sql-kafka-0-10_2.11-2.3.1.jar,hdfs:///vcrm_data/kafka-clients-1.1.1.3.2.0.0-520.jar test.py
// start the shell with this command, then enter the code above: this succeeds
pyspark --jars hdfs:///vcrm_data/spark-sql-kafka-0-10_2.11-2.3.1.jar,hdfs:///vcrm_data/kafka-clients-2.6.0.jar
My Spark environment:
HDP (Hortonworks) 3.0.1 (Spark 2.3.1), Kafka 1.1.1
spark-submit log:
20/08/24 20:19:01 INFO AppInfoParser: Kafka version: 2.6.0
20/08/24 20:19:01 INFO AppInfoParser: Kafka commitId: 62abe01bee039651
20/08/24 20:19:01 INFO AppInfoParser: Kafka startTimeMs: 1598267941674
20/08/24 20:19:01 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-6d5eb2af-8039-4073-a3f1-3ba44d01fedc--47946854-driver-0-2, groupId=spark-kafka-source-6d5eb2af-8039-4073-a3f1-3ba44d01fedc--47946854-driver-0] Subscribed to topic(s): test
20/08/24 20:19:01 INFO SparkContext: Invoking stop() from shutdown hook
20/08/24 20:19:01 INFO MicroBatchExecution: Starting new streaming query.
20/08/24 20:19:01 INFO AbstractConnector: Stopped Spark@644a2858{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/08/24 20:19:01 INFO SparkUI: Stopped Spark web UI at http://p0gn001.io:4040
20/08/24 20:19:01 INFO YarnClientSchedulerBackend: Interrupting monitor thread
20/08/24 20:19:01 INFO YarnClientSchedulerBackend: Shutting down all executors
20/08/24 20:19:01 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/08/24 20:19:01 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
20/08/24 20:19:01 INFO YarnClientSchedulerBackend: Stopped
20/08/24 20:19:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/08/24 20:19:01 INFO MemoryStore: MemoryStore cleared
20/08/24 20:19:01 INFO BlockManager: BlockManager stopped
20/08/24 20:19:01 INFO BlockManagerMaster: BlockManagerMaster stopped
20/08/24 20:19:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/08/24 20:19:01 INFO SparkContext: Successfully stopped SparkContext
20/08/24 20:19:01 INFO ShutdownHookManager: Shutdown hook called
20/08/24 20:19:01 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-ea374033-f3fc-4bca-9c15-4cccaa7da8ac
20/08/24 20:19:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-075415ca-8e98-4bb0-916c-a89c4d4f9d1f
20/08/24 20:19:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-83270204-3330-4361-bf56-a82c47d8c96f
20/08/24 20:19:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-075415ca-8e98-4bb0-916c-a89c4d4f9d1f/pyspark-eb26535b-1bf6-495e-83a1-4bbbdc658c7a
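One possible reading of the log: "Invoking stop() from shutdown hook" appears in the same second as "Starting new streaming query", which is what you would expect if the driver script simply reached its last line right after start() returned. The pyspark shell keeps the driver alive, while a script run through spark-submit does not, unless it blocks on the query. A minimal sketch of that idea follows, assuming this is indeed the cause; the function names run_stream and main are hypothetical, and the options and paths are copied unchanged from the question.

```python
def run_stream(spark, topic):
    """Start the Kafka-to-JSON query from the question and block until it stops.

    `spark` is expected to be a SparkSession; `run_stream` itself is a
    hypothetical helper name, not part of PySpark.
    """
    source_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "ip:6667")
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .option("maxOffsetsPerTrigger", 30000)
        .load()
    )
    query = (
        source_df.selectExpr("CAST(key AS STRING)")
        .writeStream
        .format("json")
        .option("checkpointLocation", "/data/testdata_test")
        .option("path", "/data/testdata_test_checkpoint")
        .start()
    )
    # start() returns immediately. Block the driver here so the script does
    # not run off its end while the query is still starting up.
    query.awaitTermination()
    return query


def main():
    # Imported inside main() so run_stream can be exercised without a cluster.
    from pyspark.sql import SparkSession

    topic = "topicname"
    spark = SparkSession.builder.appName("test_{}".format(topic)).getOrCreate()
    run_stream(spark, topic)
```

To try it, call main() as the last line of test.py and submit with the same spark-submit command as before. If the job still stops right away, the cause lies elsewhere and the full driver log would be the next thing to check.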