I want to retrieve a data stream from a website using Spark Streaming. I think I have to use a custom receiver, so I tried this:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    val spark = SparkSession.builder.config(conf).getOrCreate()

    // Attach the custom receiver and print each batch
    val lines = ssc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))
    lines.print()
    val words: DStream[String] = lines.flatMap(_.split(","))

    ssc.start()
    ssc.awaitTermination()
  }
}
import java.io.{BufferedReader, InputStreamReader}
import java.net.{URL, URLConnection}

import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  override def onStart(): Unit = {
    // Read the URL on a separate thread so onStart() returns immediately
    new Thread("Url Receiver") {
      override def run(): Unit = {
        val urlConnection: URLConnection = new URL(urlStr).openConnection
        urlConnection.setRequestProperty(
          "User-Agent",
          "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:37.0) Gecko/20100101 Firefox/37.0")
        println(urlConnection)
        val bufferedReader: BufferedReader = new BufferedReader(
          new InputStreamReader(urlConnection.getInputStream)
        )
        // Push every non-empty line into Spark until the stream ends
        var msg = bufferedReader.readLine
        while (msg != null) {
          if (!msg.isEmpty) {
            store(msg)
            println("msg" + msg)
          }
          msg = bufferedReader.readLine
        }
        bufferedReader.close()
      }
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up here; the reading thread is not tracked
  }
}
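In case the connection is failing silently inside the background thread, here is a sketch of a variant where any error restarts the receiver instead of killing the thread quietly. restart, isStopped and store come from the Receiver base class; UrlReceiverWithRestart and receive are just illustrative names, and this is not necessarily what is going wrong here:

// Sketch only: same idea as UrlReceiver, but the read loop is wrapped so that
// exceptions trigger a receiver restart rather than dying silently.
// Uses the same imports as above.
class UrlReceiverWithRestart(urlStr: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  override def onStart(): Unit = {
    new Thread("Url Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* the read loop checks isStopped() */ }

  private def receive(): Unit = {
    try {
      val conn = new URL(urlStr).openConnection
      conn.setRequestProperty("User-Agent", "Mozilla/5.0")
      val reader = new BufferedReader(new InputStreamReader(conn.getInputStream))
      var msg = reader.readLine
      while (!isStopped() && msg != null) {
        if (!msg.isEmpty) store(msg)
        msg = reader.readLine
      }
      reader.close()
      // The server closed the stream: ask Spark to start the receiver again
      restart("Stream ended, restarting")
    } catch {
      case e: Exception => restart("Error reading from " + urlStr, e)
    }
  }
}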
The console output looks like this, with no data:
-------------------------------------------
Time: 1602750320000 ms
-------------------------------------------
I want to retrieve the data from the website as a stream: as soon as new data appears on the site, I want to pick it up. The streaming application should keep running at all times (and only stop if I or a user decides to stop it).
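For the "keep running until someone decides to stop it" part, this is roughly the pattern I have in mind; awaitTermination(), stop(...) and the spark.streaming.stopGracefullyOnShutdown setting are standard Spark Streaming APIs, but the shutdown trigger itself is just a placeholder:

// Sketch of the run-forever / stop-on-demand pattern (not a full program)
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("NetworkWordCount")
  // let a SIGTERM / Ctrl-C stop the job gracefully
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(1))
ssc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps")).print()

ssc.start()
ssc.awaitTermination()  // blocks the driver until the context is stopped

// elsewhere, when the user decides to stop:
// ssc.stop(stopSparkContext = true, stopGracefully = true)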
Thanks for your help.