I am trying to send data to PySpark, but I get the following error: ERROR MicroBatchExecution: Query [id = d3a1ed30-d223-4da4-9052-189b103afca8, runId = 70bfaa84-15c9-4c8b-9058-0f9a04ee4dd0] terminated with error java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
Producer
import tweepy
import json
import time
from kafka import KafkaProducer
import twitterauth as auth
import utils

producer = KafkaProducer(bootstrap_servers=["localhost:9092"], value_serializer=utils.json_serializer)

class twitterStream(tweepy.StreamingClient):
    def on_connect(self):
        print("Twitter Client Connected")

    def on_tweet(self, raw_data):
        # Forward only original tweets (no retweets, replies or quotes)
        if raw_data.referenced_tweets is None:
            producer.send(topic="registered_user", value=raw_data.text)
            print("Producer Running")

    def on_error(self):
        self.disconnect()

    def adding_rules(self, keywords):
        for terms in keywords:
            self.add_rules(tweepy.StreamRule(terms))

if __name__ == "__main__":
    stream = twitterStream(bearer_token=auth.bearer_token)
    stream_terms = ['bitcoin','luna','etherum']
    stream.adding_rules(stream_terms)
    stream.filter(tweet_fields=['referenced_tweets'])
Spark source
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import findspark
import json
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

if __name__ == "__main__":
    findspark.init()
    # spark = SparkSession.builder.appName("Kafka Pyspark Streaming Learning").master("local[*]").getOrCreate()
    sc = (SparkSession.builder.master("local[*]")
          .appName('SparkByExamples.com')
          .getOrCreate())
    df = (sc
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("startingoffsets", "latest")
          .option("subscribe", "registered_user")
          .load())
    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    query = df.writeStream.format("console").start()
    import time
    time.sleep(10) # sleep 10 seconds
    query.stop()
Hadoop version: 3.3.0
Spark and PySpark version: 3.3.0
Scala version: 2.12.15
I have tried everything I could think of over the past 8 hours, but still no luck. Can anyone help?
Stack trace:
When I run df.printSchema(), there is no error, and this is the output:
1 Answer
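You need to store the converted fields under a different name. DataFrames are immutable: your initial DataFrame is named df and has a specific schema, and selectExpr does not change it but returns a new DataFrame. So when you cast the fields to strings, assign the result to a separate name and start the stream from that one.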
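A minimal sketch of what that could look like, assuming df is the Kafka streaming DataFrame from the question. The helper names cast_kafka_columns and start_console_query, and the variable string_df, are made up here for illustration; only selectExpr, writeStream, format and start come from the question's code.

```python
def cast_kafka_columns(df):
    """Return a NEW DataFrame with key/value cast to strings.

    The input df is left untouched, because DataFrames are immutable.
    """
    return df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


def start_console_query(df):
    """Cast the Kafka columns, then stream the casted DataFrame to the console."""
    # Keep the transformed DataFrame under its own name (hypothetical: string_df)
    string_df = cast_kafka_columns(df)
    # Start the query from string_df, not from the original df
    return string_df.writeStream.format("console").start()
```

In the question's script this would replace the two consecutive `query = ...` lines with a single `query = start_console_query(df)`, so that the console sink receives the string columns rather than the raw binary key and value.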