MicroBatchExecution error when integrating Kafka with PySpark

c9qzyr3d, posted 2022-09-21 in Kafka

I am trying to stream data into PySpark, but the query fails with the following error: ERROR MicroBatchExecution: Query [id=d3a1ed30-d223-4da4-9052-189b103afca8, runId=70bfaa84-15c9-4c8b-9058-0f9a04ee4dd0] terminated with error java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
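This particular UnsatisfiedLinkError is typically raised on Windows when Spark cannot find the Hadoop native binaries (winutils.exe and hadoop.dll) matching the Hadoop version in use. A minimal sketch of pointing the process at them before Spark starts, assuming the Hadoop 3.3.0 binaries are unpacked under C:\hadoop (the path is an assumption):

    import os

    # Assumed location of the Hadoop 3.3.0 native binaries (winutils.exe, hadoop.dll)
    os.environ["HADOOP_HOME"] = r"C:\hadoop"
    # hadoop.dll must be discoverable on PATH (or in System32) for NativeIO to load
    os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]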

Producer

    import tweepy
    import json
    import time
    from kafka import KafkaProducer
    import twitterauth as auth   # local module holding the bearer token
    import utils                 # local module holding the JSON serializer

    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        value_serializer=utils.json_serializer,
    )

    class twitterStream(tweepy.StreamingClient):
        def on_connect(self):
            print("Twitter Client Connected")

        def on_tweet(self, raw_data):
            # Forward only original tweets (no retweets/replies) to Kafka
            if raw_data.referenced_tweets is None:
                producer.send(topic="registered_user", value=raw_data.text)
                print("Producer Running")

        def on_errors(self, errors):
            # StreamingClient's error hook receives the error payload
            self.disconnect()

        def adding_rules(self, keywords):
            for terms in keywords:
                self.add_rules(tweepy.StreamRule(terms))

    if __name__ == "__main__":
        stream = twitterStream(bearer_token=auth.bearer_token)
        stream_terms = ['bitcoin', 'luna', 'etherum']
        stream.adding_rules(stream_terms)
        stream.filter(tweet_fields=['referenced_tweets'])
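The twitterauth and utils modules are not shown. For the producer to run, utils.json_serializer presumably turns the tweet text into bytes; a minimal sketch of such a serializer (the implementation is an assumption):

    import json

    def json_serializer(data):
        # Encode the payload as UTF-8 JSON bytes, as kafka-python expects
        return json.dumps(data).encode("utf-8")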

Spark consumer

    from pyspark.sql import SparkSession
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    import findspark
    import json
    import os

    # Pull in the Kafka connector matching Spark 3.3.0 / Scala 2.12
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

    if __name__ == "__main__":
        findspark.init()
        # spark = SparkSession.builder.appName("Kafka Pyspark Streaming Learning").master("local[*]").getOrCreate()
        sc = (SparkSession.builder
              .master("local[*]")
              .appName('SparkByExamples.com')
              .getOrCreate())
        df = (sc.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")
              .option("startingOffsets", "latest")
              .option("subscribe", "registered_user")
              .load())
        query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        query = df.writeStream.format("console").start()
        import time
        time.sleep(10)  # sleep 10 seconds
        query.stop()
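As an aside, the unconditional sleep can be replaced with the streaming query's own bounded wait; a sketch for the same 10-second window:

    # awaitTermination returns True if the query ended within the timeout (seconds)
    finished = query.awaitTermination(10)
    if not finished:
        query.stop()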

Hadoop version: 3.3.0
Spark and PySpark version: 3.3.0
Scala version: 2.12.15

I have tried everything I could find for the past 8 hours, but still no luck. Can anyone help?

Stack trace:

When I run df.printSchema() there is no error; this is the output:
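For reference, a Kafka source DataFrame always carries the fixed Kafka record schema, so the printed output should be:

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)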


o2gm4chl1#

You need to save the cast columns under a different name. DataFrames are immutable: your initial DataFrame is named df and has a fixed schema, so when you cast the columns to strings the result must go into a separate variable.

    castDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    query = castDf.writeStream...
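Applied to the consumer above, the tail of the script would then read as follows (a sketch; the blocking wait is an addition):

    castDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    query = (castDf.writeStream
             .format("console")
             .start())
    query.awaitTermination()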
