如何从网站检索spark流数据?

6ss1mwsb  于 2021-05-22  发布在  Spark
关注(0)|答案(0)|浏览(233)

我想检索数据流从一个网站使用Spark流。我想我必须使用定制接收器。所以,我试过这个:

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingMain {

 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
 val ssc = new StreamingContext(conf, Seconds(1))
 val spark = SparkSession.builder.config(conf).getOrCreate()
 val lines = ssc.receiverStream(new UrlReceiver("http://stream.meetup.com/2/rsvps"))

 println("lines value" +  lines.print())

 val words: DStream[String] = lines.flatMap(_.split(","))
}

import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import java.io.InputStreamReader
import java.io.BufferedReader

import java.net.{URL, URLConnection}

import org.apache.spark.internal.Logging

 class UrlReceiver(urlStr: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2)   with Logging {

override def onStart() = {
    new Thread("Url Receiver") {
        override def run() = {
            val urlConnection: URLConnection = new URL(urlStr).openConnection
            urlConnection.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:37.0) Gecko/20100101 Firefox/37.0")
            println(urlConnection)
            val bufferedReader: BufferedReader = new BufferedReader(
                new InputStreamReader(urlConnection.getInputStream)
            )
            var msg = bufferedReader.readLine
            while (msg != null) {
                if (!msg.isEmpty) {
                    store(msg)
                    println("msg" + msg)
                }
                msg = bufferedReader.readLine
            }
            bufferedReader.close()
        }
    }.start()
}

override def onStop() = {

   }
}

控制台上的结果如下,没有数据:

-------------------------------------------
Time: 1602750320000 ms
-------------------------------------------

我想检索数据流从网站上。一旦有新的数据在网站上,我想跟踪它。流应用程序应该一直在运行(如果我或用户决定停止应用程序,它应该停止)
谢谢你的帮助。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题