Spark: 'writeStream' can be called only on streaming Dataset/DataFrame

s2j5cfk0 · published 2021-05-18 in Spark

I am trying to retrieve tweets from my Kafka cluster into Spark Streaming, where I perform some analysis and store them in an Elasticsearch index.
Versions: Spark 2.3.0, PySpark 2.3.0, Kafka 2.3.0, Elasticsearch 7.9, elasticsearch-hadoop 7.6.2
I run the following code in a Jupyter environment to write the streaming DataFrame to Elasticsearch.

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'

from pyspark import SparkContext

# Spark Streaming

from pyspark.streaming import StreamingContext

# Kafka

from pyspark.streaming.kafka import KafkaUtils

# json parsing

import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from nltk.sentiment.vader import SentimentIntensityAnalyzer

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def analyze_sentiment(tweet):

    scores = dict([('pos', 0), ('neu', 0), ('neg', 0), ('compound', 0)])
    sentiment_analyzer = SentimentIntensityAnalyzer()
    score = sentiment_analyzer.polarity_scores(tweet)
    for k in sorted(score):
        scores[k] += score[k]

    return json.dumps(scores)

def process(time, rdd):

    print("========= %s =========" % str(time))

    try:
        if rdd.count() == 0:
            raise Exception('Empty')

        sqlContext = getSqlContextInstance(rdd.context)

        df = sqlContext.read.json(rdd)
        df = df.filter("text not like 'RT @%'")

        if df.count() == 0:
            raise Exception('Empty')

        udf_func = udf(lambda x: analyze_sentiment(x), returnType=StringType())
        df = df.withColumn("Sentiment", lit(udf_func(df.text)))
        print(df.take(10))

        df.writeStream.outputMode('append').format('org.elasticsearch.spark.sql').option('es.nodes', 'localhost').option('es.port', 9200)\
        .option('checkpointLocation', '/checkpoint').option('es.spark.sql.streaming.sink.log.enabled', False).start('PythonSparkStreamingKafka_RM_01').awaitTermination()

    except Exception as e:
        print(e)

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("INFO")

ssc = StreamingContext(sc, 20)

kafkaStream = KafkaUtils.createDirectStream(ssc, ['kafkaspark'], {
                        'bootstrap.servers':'localhost:9092', 
                        'group.id':'spark-streaming', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})

parsed = kafkaStream.map(lambda v: json.loads(v[1]))

parsed.foreachRDD(process)

ssc.start()
ssc.awaitTermination(timeout=180)

But I get this error:

'writeStream' can be called only on streaming Dataset/DataFrame;

Also, it looks like I have to use .readStream, but how do I use it to read the Kafka stream without createDirectStream?
Can someone please help me write this DataFrame into Elasticsearch? I am a beginner with Spark Streaming and Elasticsearch and find it quite challenging. I would be glad if someone could guide me through this.

6tqwzwtp (answer 1)

.writeStream is part of the Spark Structured Streaming API, so you need to use the corresponding API to start reading the data, spark.readStream, pass the options specific to the Kafka source (these are described in separate documentation), and include the additional jar that contains the Kafka implementation. The corresponding code looks like this:

val streamingInputDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.0.10:9092")
      .option("subscribe", "tweets-txt")
      .load()
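
For completeness, here is a rough PySpark equivalent of that Scala snippet, using the broker and topic from your code. Note that Structured Streaming needs the spark-sql-kafka-0-10 package (e.g. --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0) instead of spark-streaming-kafka-0-8:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PythonSparkStreamingKafka_RM_01") \
    .getOrCreate()

# load() returns a *streaming* DataFrame, so .writeStream is allowed on it.
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kafkaspark") \
    .load()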

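From there, a minimal sketch of the rest of your pipeline in Structured Streaming terms, reusing your analyze_sentiment function, the RT filter, and your Elasticsearch options; the single-field tweet_schema is an assumption, since only the text field is used in your code:

from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StringType, StructType, StructField

# Assumed minimal schema: extend it with any other tweet fields you need.
tweet_schema = StructType([StructField("text", StringType())])

sentiment_udf = udf(analyze_sentiment, StringType())

# Kafka delivers the payload in the binary 'value' column; parse it as JSON.
tweets = streaming_df \
    .select(from_json(col("value").cast("string"), tweet_schema).alias("tweet")) \
    .select("tweet.*") \
    .filter("text not like 'RT @%'") \
    .withColumn("Sentiment", sentiment_udf(col("text")))

query = tweets.writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "localhost") \
    .option("es.port", "9200") \
    .option("checkpointLocation", "/checkpoint") \
    .start("PythonSparkStreamingKafka_RM_01")  # target Elasticsearch index, as in your code

query.awaitTermination()

This replaces foreachRDD/process entirely: in Structured Streaming the query runs continuously, so there is no per-batch RDD to convert into a DataFrame.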