我试图访问一组过滤后的数据流,就像这个问题的解决方案:spark streaming——基于filter param分割输入流的最佳方法
我创建集合如下:
val statuCodes = Set("200","500", "404")
spanTagStream.cache()
val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
我试着进入 statusCodeStreams
按以下方式:
for(streamTuple <- statusCodeStreams){
streamTuple._2.foreachRDD(rdd =>
rdd.foreachPartition(
partitionOfRecords =>
{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
partitionOfRecords.foreach
{
x=>{
/* Code Writing to Kafka using streamTuple._1 as the topic-String */
}
}
})
)
}
执行此操作时,我收到以下错误:java.io.notserializableexception: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects
如何访问流以串行方式写入kafka?
1条答案
按热度按时间e4eetjau1#
例外情况表明
DStream
正在通过闭包捕获定义。一个简单的选择是声明DStream
transient :spamTagStream.foreachRDD{ rdd =>
rdd.cache()
statuCodes.map{code =>
val matchingCodes = rdd.filter(...)
matchingCodes.foreachPartition{write to kafka}
}
rdd.unpersist(true)
}