使用spark streaming从twitter流式传输数据时出错

ryevplcw  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(323)

我正在编写twitter连接器以从twitter获取数据,但在运行时出现以下异常。
我已经创建了一个打印tweets的应用程序,并学习如何使用twitter进行spark流。

20/09/25 05:53:18 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error starting Twitter stream - java.lang.IllegalStateException: Authentication credentials are missing. See http://twitter4j.org/en/configuration.html for details. See and register at http://apps.twitter.com/
    at twitter4j.TwitterBaseImpl.ensureAuthorizationEnabled(TwitterBaseImpl.java:219)
    at twitter4j.TwitterStreamImpl.sample(TwitterStreamImpl.java:161)
    at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:93)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:198)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

下面是此应用程序的代码

package SparkStreaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.io.Source

object Tweets {

  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {

    setTweeter()

    val ssc = new StreamingContext("local[*]","Tweets", Seconds(3))

    val tweets = TwitterUtils.createStream(ssc,None)
    val statuses = tweets.map(status => status.getText)

    statuses.print()

    ssc.start()
    ssc.awaitTermination()

  }

  def setTweeter() : Unit = {

      for ( line <- Source.fromFile("src/data/tweeter.txt").getLines())
        {
          val fields = line.split(" ")
          if(fields.length == 2)
            {
              System.setProperty("tweeter4j.oauth." + fields(0), fields(1))
            }
        }
  }
}

有人能帮我解决这个问题吗???

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题