MicroBatchExecution error when integrating Kafka with PySpark

Posted by c9qzyr3d on 2022-09-21 in Kafka

I am trying to send data to PySpark, but I get the following error: ERROR MicroBatchExecution: Query [id = d3a1ed30-d223-4da4-9052-189b103afca8, runId = 70bfaa84-15c9-4c8b-9058-0f9a04ee4dd0] terminated with error java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'

Producer

import tweepy
import json
import time
from kafka import KafkaProducer
import twitterauth as auth
import utils

producer = KafkaProducer(bootstrap_servers=["localhost:9092"], value_serializer=utils.json_serializer)

class twitterStream(tweepy.StreamingClient):
    def on_connect(self):
        print("Twitter Client Connected")

    def on_tweet(self, raw_data):
        # Forward only original tweets (no retweets, replies, or quotes)
        if raw_data.referenced_tweets is None:
            producer.send(topic="registered_user", value=raw_data.text)
            print("Producer Running")

    def on_error(self):
        self.disconnect()

    def adding_rules(self, keywords):
        for terms in keywords:
            self.add_rules(tweepy.StreamRule(terms))

if __name__ == "__main__":
    stream = twitterStream(bearer_token=auth.bearer_token)
    stream_terms = ['bitcoin','luna','etherum']
    stream.adding_rules(stream_terms)
    stream.filter(tweet_fields=['referenced_tweets'])
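
The `utils` module is not shown in the question. For anyone trying to reproduce this, a minimal sketch of what `utils.json_serializer` might look like follows. It assumes the function simply JSON-encodes the value and returns UTF-8 bytes, which is the kind of callable `KafkaProducer` accepts as `value_serializer`. The body is a guess, not the asker's actual code.

```python
import json


def json_serializer(data):
    """Hypothetical stand-in for utils.json_serializer (not shown in the question).

    Assumption: the value is JSON-encoded and returned as UTF-8 bytes,
    since Kafka message values must be bytes.
    """
    return json.dumps(data).encode("utf-8")
```

With this version, the tweet text sent by the producer arrives in Kafka as a JSON string literal (quotes included), so the consumer side would need to decode it as JSON rather than treat it as raw text.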

Spark source

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

import findspark

import json
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

if __name__ == "__main__":

    findspark.init()

    # spark = SparkSession.builder.appName("Kafka Pyspark Streaming Learning").master("local[*]").getOrCreate()

    sc = SparkSession.builder.master("local[*]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

    df = sc \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("startingoffsets","latest") \
        .option("subscribe", "registered_user") \
        .load()

    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df.writeStream.format("console").start()
    import time
    time.sleep(10) # sleep 10 seconds
    query.stop()

Hadoop version: 3.3.0
Spark and PySpark version: 3.3.0
Scala version: 2.12.15

I have tried everything I could think of over the past 8 hours, with no success. Can anyone help?

Stack trace:

When I run df.printSchema(), there is no error, and this is the output:

Answer #1, by o2gm4chl

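You need to store the cast fields under a different name. DataFrames are immutable: your initial DataFrame is named df and has a specific schema, and selectExpr returns a new DataFrame instead of changing df. In your code, query is first assigned the result of selectExpr and then reassigned from df.writeStream, so the cast columns never reach the sink. Give the cast DataFrame its own name and start the stream from it: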

castDf = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 

query = castDf.writeStream...
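
To spell that out a bit more, here is a rough sketch of the same idea wrapped in two small functions. The names `string_cast_exprs` and `start_console_query` are made up for illustration, and `df` is assumed to be the streaming DataFrame returned by `readStream.format("kafka")...load()` in the question. The console sink is the one the question already uses.

```python
def string_cast_exprs(columns):
    """Build selectExpr strings that cast each named column to STRING.

    Hypothetical helper, introduced here only for illustration.
    """
    return [f"CAST({name} AS STRING)" for name in columns]


def start_console_query(df):
    """Cast key/value to strings and stream the result to the console.

    Assumption: `df` is the Kafka streaming DataFrame from the question.
    """
    # selectExpr returns a new DataFrame; df itself is left unchanged.
    cast_df = df.selectExpr(*string_cast_exprs(["key", "value"]))
    # Start the sink from cast_df, not from the original df.
    return cast_df.writeStream.format("console").start()
```

In the question's script this would be called as `query = start_console_query(df)`, followed by the existing `time.sleep(10)` and `query.stop()`.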
