scala—在spark流上下文Map中使用spark上下文检索kafka事件后的文档

xriantvc  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(363)

我是新来的。我要做的是从一个couchbase视图中检索所有相关文档,并使用SparkKafka流媒体中给定的id。
当我试图从spark上下文中获取这些文档时,总是会出现错误 Task not serializable .
从那以后,我明白了我不能在同一个jvm中使用嵌套rdd,也不能在多个spark上下文中使用,但是我想找到一个解决方法。
以下是我目前的做法:

package xxx.xxx.xxx

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.ViewQuery
import com.couchbase.spark._

import org.apache.spark.broadcast.Broadcast
import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object Streaming {
  // Method to create a Json document from Key and Value
  def CreateJsonDocument(s: (String, String)): JsonDocument = {
    //println("- Parsing document")
    //println(s._1)
    //println(s._2)
    val return_doc = JsonDocument.create(s._1, JsonObject.fromJson(s._2))
    (return_doc)
    //(return_doc.content().getString("click"), return_doc)
  }

  def main(args: Array[String]): Unit = {
    // get arguments as key value
    val arguments = args.grouped(2).collect { case Array(k,v) => k.replaceAll("--", "") -> v }.toMap

    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // If the length of the passed arguments is less than 4
    if (arguments.get("brokers") == null || arguments.get("topics") == null) {
      // Provide system error
      System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
    }

    // Create the Spark configuration with app name
    val conf = new SparkConf().setAppName("Streaming")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the Spark Streaming Context
    val ssc = new StreamingContext(sc, Seconds(2))

    // Setup the broker list
    val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
    // Setup the topic list
    val topics = arguments.getOrElse("topics", "").split(",").toSet
    // Get the message stream from kafka
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    docs
      // Separate the key and the content
      .map({ case (key, value) => (key, value) })
      // Parse the content to transform in JSON Document
      .map(s => CreateJsonDocument(s))
      // Call the view to all related Review Application Documents
      //.map(messagedDoc => RetrieveAllReviewApplicationDocs(messagedDoc, sc))
      .map(doc => {

        sc.couchbaseView(ViewQuery.from("my-design-document", "stats").key(messagedDoc.content.getString("id"))).collect()
      })
      .foreachRDD(
          rdd => {
             //Create a report of my documents and store it in Couchbase
             rdd.foreach( println )
          }
        )

    // Start the streaming context
    ssc.start()
    // Wait for termination and catch error if there is a problem in the process
    ssc.awaitTermination()
  }
}
xu3bshqb

xu3bshqb1#

通过使用couchbase客户端而不是couchbase spark上下文找到了解决方案。
我不知道这是否是性能方面的最佳方法,但我可以检索计算所需的文档。

package xxx.xxx.xxx

import com.couchbase.client.java.{Bucket, Cluster, CouchbaseCluster}
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.view.{ViewResult, ViewQuery}

import _root_.kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object Streaming {
  // Method to create a Json document from Key and Value
  def CreateJsonDocument(s: (String, String)): JsonDocument = {
    //println("- Parsing document")
    //println(s._1)
    //println(s._2)
    val return_doc = JsonDocument.create(s._1, JsonObject.fromJson(s._2))
    (return_doc)
    //(return_doc.content().getString("click"), return_doc)
  }

  // Method to retrieve related documents
  def RetrieveDocs (doc: JsonDocument, arguments: Map[String, String]): ViewResult = {
    val cbHosts = arguments.getOrElse("couchbase-hosts", "")
    val cbBucket = arguments.getOrElse("couchbase-bucket", "")
    val cbPassword = arguments.getOrElse("couchbase-password", "")

    val cluster: Cluster = CouchbaseCluster.create(cbHosts)

    val bucket: Bucket = cluster.openBucket(cbBucket, cbPassword)
    val docs : ViewResult = bucket.query(ViewQuery.from("my-design-document", "my-view").key(doc.content().getString("id")))
    cluster.disconnect()
    println(docs)

    (docs)
  }

  def main(args: Array[String]): Unit = {
    // get arguments as key value
    val arguments = args.grouped(2).collect { case Array(k,v) => k.replaceAll("--", "") -> v }.toMap

    println("----------------------------")
    println("Arguments passed to class")
    println("----------------------------")
    println("- Arguments")
    println(arguments)
    println("----------------------------")

    // If the length of the passed arguments is less than 4
    if (arguments.get("brokers") == null || arguments.get("topics") == null) {
      // Provide system error
      System.err.println("Usage: --brokers <broker1:9092> --topics <topic1,topic2,topic3>")
    }

    // Create the Spark configuration with app name
    val conf = new SparkConf().setAppName("Streaming")
    // Create the Spark context
    val sc = new SparkContext(conf)
    // Create the Spark Streaming Context
    val ssc = new StreamingContext(sc, Seconds(2))

    // Setup the broker list
    val kafkaParams = Map("metadata.broker.list" -> arguments.getOrElse("brokers", ""))
    // Setup the topic list
    val topics = arguments.getOrElse("topics", "").split(",").toSet
    // Get the message stream from kafka
    val docs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // Get broadcast arguments
    val argsBC = sc.broadcast(arguments)

    docs
      // Separate the key and the content
      .map({ case (key, value) => (key, value) })
      // Parse the content to transform in JSON Document
      .map(s => CreateJsonDocument(s))
      // Call the view to all related Review Application Documents
      .map(doc => RetrieveDocs(doc, argsBC))
      .foreachRDD(
          rdd => {
             //Create a report of my documents and store it in Couchbase
             rdd.foreach( println )
          }
        )

    // Start the streaming context
    ssc.start()
    // Wait for termination and catch error if there is a problem in the process
    ssc.awaitTermination()
  }
}

相关问题