我正在编写twitter连接器以从twitter获取数据,但在运行时出现以下异常。
我已经创建了一个打印tweets的应用程序,并学习如何使用twitter进行spark流。
20/09/25 05:53:18 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error starting Twitter stream - java.lang.IllegalStateException: Authentication credentials are missing. See http://twitter4j.org/en/configuration.html for details. See and register at http://apps.twitter.com/
at twitter4j.TwitterBaseImpl.ensureAuthorizationEnabled(TwitterBaseImpl.java:219)
at twitter4j.TwitterStreamImpl.sample(TwitterStreamImpl.java:161)
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:93)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:198)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
下面是此应用程序的代码
package SparkStreaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.io.Source
object Tweets {
Logger.getLogger("org").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {
setTweeter()
val ssc = new StreamingContext("local[*]","Tweets", Seconds(3))
val tweets = TwitterUtils.createStream(ssc,None)
val statuses = tweets.map(status => status.getText)
statuses.print()
ssc.start()
ssc.awaitTermination()
}
def setTweeter() : Unit = {
for ( line <- Source.fromFile("src/data/tweeter.txt").getLines())
{
val fields = line.split(" ")
if(fields.length == 2)
{
System.setProperty("tweeter4j.oauth." + fields(0), fields(1))
}
}
}
}
有人能帮我解决这个问题吗???
暂无答案!
目前还没有任何答案,快来回答吧!