无法使用spark shell上的scala从tweet获取特定内容

mxg2im7a  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(245)

我在hortonworks上工作。我将twitter上的推文存储到kafka主题中。我使用spark-shell上的scala对kafka作为制作人和spark作为消费者的推文进行情绪分析。但我只想从推文中获取特定内容,如文本、标签、推文是正面还是负面,我从tweets中选择了一个肯定词或否定词。我的训练数据是data.txt。
我添加了依赖项:org.apache。spark:spark-streaming-kafka_2.10:1.6.2,org.apache。spark:spark-streaming_2.10:1.6.2
这是我的密码:

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.kafka._

val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(5))
val zkQuorum="sandbox.hortonworks.com:2181"
val group="test-consumer-group"
val topics="test"
val numThreads=5
val args=Array(zkQuorum, group, topics, numThreads)
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val hashTags = lines.flatMap(_.split(" ")).filter(_.startsWith("#"))
val wordSentimentFilePath = "hdfs://sandbox.hortonworks.com:8020/TwitterData/Data.txt"
val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
      val Array(word, happiness) = line.split("\t")
      (word, happiness)
    } cache()
    val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).transform{topicCount => wordSentiments.join(topicCount)}.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false))
happiest60.print()
ssc.start()

我得到了这样的结果,
(消极,恐惧)(积极,健康)
我想要这样的输出,
(#体育,推文,健身,正面)
但是我没有得到像上面那样存储文本和标签的解决方案。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题