I am trying to retrieve tweets from my Kafka cluster into Spark Streaming, where I run some analysis on them and store the results in an Elasticsearch index.
Versions: Spark 2.3.0, PySpark 2.3.0, Kafka 2.3.0, Elasticsearch 7.9, Elasticsearch-Hadoop 7.6.2
I am running the following code in a Jupyter environment to write the streaming DataFrame to Elasticsearch.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from nltk.sentiment.vader import SentimentIntensityAnalyzer
def getSqlContextInstance(sparkContext):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def analyze_sentiment(tweet):
    scores = dict([('pos', 0), ('neu', 0), ('neg', 0), ('compound', 0)])
    sentiment_analyzer = SentimentIntensityAnalyzer()
    score = sentiment_analyzer.polarity_scores(tweet)
    for k in sorted(score):
        scores[k] += score[k]
    return json.dumps(scores)

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        if rdd.count() == 0:
            raise Exception('Empty')
        sqlContext = getSqlContextInstance(rdd.context)
        df = sqlContext.read.json(rdd)
        df = df.filter("text not like 'RT @%'")
        if df.count() == 0:
            raise Exception('Empty')
        udf_func = udf(lambda x: analyze_sentiment(x), returnType=StringType())
        df = df.withColumn("Sentiment", lit(udf_func(df.text)))
        print(df.take(10))
        df.writeStream.outputMode('append').format('org.elasticsearch.spark.sql') \
            .option('es.nodes', 'localhost').option('es.port', 9200) \
            .option('checkpointLocation', '/checkpoint') \
            .option('es.spark.sql.streaming.sink.log.enabled', False) \
            .start('PythonSparkStreamingKafka_RM_01').awaitTermination()
    except Exception as e:
        print(e)
        pass
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("INFO")
ssc = StreamingContext(sc, 20)
kafkaStream = KafkaUtils.createDirectStream(ssc, ['kafkaspark'], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'spark-streaming',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.foreachRDD(process)
ssc.start()
ssc.awaitTermination(timeout=180)
But I get this error:
'writeStream' can be called only on streaming Dataset/DataFrame;
Also, it looks like I have to use .readStream, but then how do I read the Kafka stream without createDirectStream?
Could someone help me write this DataFrame to Elasticsearch? I am a beginner with Spark Streaming and Elasticsearch and am finding this quite challenging. I would be glad if someone could guide me through it.
1 Answer
.writeStream is part of the Spark Structured Streaming API, so you need to start reading the data with the corresponding API, spark.readStream, passing the options specific to the Kafka source (described in its own documentation) and adding the extra jar that contains the Kafka implementation. The DataFrame you currently build with sqlContext.read.json(rdd) is a plain batch DataFrame, which is why writeStream fails on it. The corresponding code would look something like this:
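A minimal sketch of that pipeline, assuming the same broker, topic, text field, sentiment UDF and Elasticsearch settings as in the question; the package coordinates, JSON schema and index name below are assumptions and may need to be adjusted for your environment:

import os
# The Structured Streaming Kafka source lives in spark-sql-kafka, not spark-streaming-kafka.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,'
    'org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'
)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StringType, StructType, StructField

spark = SparkSession.builder.appName("PythonSparkStreamingKafka_RM_01").getOrCreate()

# Read the topic as a streaming DataFrame instead of KafkaUtils.createDirectStream.
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "kafkaspark")
       .option("startingOffsets", "latest")
       .load())

# Kafka delivers the payload as bytes; cast it to a string and parse the JSON
# with an explicit schema (only the 'text' field is assumed here).
schema = StructType([StructField("text", StringType(), True)])
tweets = (raw.selectExpr("CAST(value AS STRING) AS json")
          .select(from_json(col("json"), schema).alias("tweet"))
          .select("tweet.*")
          .filter("text not like 'RT @%'"))

# Reuse the analyze_sentiment function defined in the question.
sentiment_udf = udf(analyze_sentiment, StringType())
tweets = tweets.withColumn("Sentiment", sentiment_udf(col("text")))

# writeStream is valid here because 'tweets' is a streaming DataFrame.
# Note: Elasticsearch index names must be lowercase.
query = (tweets.writeStream
         .outputMode("append")
         .format("org.elasticsearch.spark.sql")
         .option("es.nodes", "localhost")
         .option("es.port", "9200")
         .option("checkpointLocation", "/checkpoint")
         .start("pythonsparkstreamingkafka_rm_01"))

query.awaitTermination()

With this approach the filter and sentiment UDF run directly on the streaming DataFrame, so the per-batch foreachRDD/process logic is no longer needed; Spark handles the micro-batching and checkpointing for you.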