访问数据流集合

wixjitnu  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(325)

我试图访问一组过滤后的数据流,就像这个问题的解决方案:spark streaming——基于filter param分割输入流的最佳方法
我创建集合如下:

val statuCodes = Set("200","500", "404")
    spanTagStream.cache()
    val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))

我试着进入 statusCodeStreams 按以下方式:

for(streamTuple <- statusCodeStreams){
      streamTuple._2.foreachRDD(rdd =>
  rdd.foreachPartition(
      partitionOfRecords =>
        {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String,String](props)

            partitionOfRecords.foreach
            {
                 x=>{ 
                 /* Code Writing to Kafka using streamTuple._1 as the topic-String */
                 }
            }
      })
   )
}

执行此操作时,我收到以下错误:java.io.notserializableexception: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects 如何访问流以串行方式写入kafka?

e4eetjau

e4eetjau1#

例外情况表明 DStream 正在通过闭包捕获定义。一个简单的选择是声明 DStream transient :

@transient val spamTagStream = //KafkaUtils.create...
``` `@transient` 标记要从某个对象的对象图的java序列化中删除的某些对象。这种情况的关键是 `val` 在与 `DStream` ( `statusCodeStreams` 在这种情况下)在闭包内使用。它的实际引用 `val` 从内部关闭是 `outer.statusCodeStreams` ,导致序列化进程“拉”的所有上下文 `outer` 进入关闭状态。与 `@transient` 我们将dstream(以及streamingcontext)声明标记为不可序列化,并避免序列化问题。取决于代码结构(如果它是一个线性的 `main` 函数(错误的做法,顺便说一句)可能需要将所有dstream声明+streamingcontext示例标记为 `@transient` . 
如果初始筛选的唯一目的是将内容“路由”到单独的kafka主题,那么在 `foreachRDD` . 这将使程序结构更简单。

spamTagStream.foreachRDD{ rdd =>
rdd.cache()
statuCodes.map{code =>
val matchingCodes = rdd.filter(...)
matchingCodes.foreachPartition{write to kafka}
}
rdd.unpersist(true)
}

相关问题