如何反序列化flume的avro事件?

jgwigjjp  于 2021-06-04  发布在  Flume
关注(0)|答案(3)|浏览(343)

我有FlumeavroFlume和sparkstreaming程序,读取Flume。cdh 5.1、flume 1.5.0、spark 1.0,使用scala作为spark上的程序语言
我能够制作spark示例并计算flume avro事件。
但是,我无法将flume avro事件反序列化为string\text,然后解析结构行。
有没有人举过一个使用scala的例子?

km0tfn4u

km0tfn4u1#

为了反序列化,可以实现自定义解码器。提供预期的类型信息。

v1l68za4

v1l68za42#

请尝试以下代码:

stream.map(e => "Event:header:" + e.event.get(0).toString
                + "body: " + new String(e.event.getBody.array)).print
ep6jt1vc

ep6jt1vc3#

可以使用以下代码反序列化flume事件:

val eventBody = stream.map(e => new String(e.event.getBody.array))

下面是一个spark流媒体应用程序的示例,该应用程序使用flume twitter源和avro接收器来分析来自twitter的流行标签,从而将事件推送到spark:

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._

object PopularHashTags {

val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)

def main(args: Array[String]) {

sc.setLogLevel("WARN")

System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>)
System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>)
System.setProperty("twitter4j.oauth.accessToken", <accessToken>)
System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>)

val ssc = new StreamingContext(sc, Seconds(5))
val filter = args.takeRight(args.length)
val stream = FlumeUtils.createStream(ssc, <hostname>, <port>)

val tweets = stream.map(e => new String(e.event.getBody.array))

val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

// Print popular hashtags
topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})

stream.count().map(cnt => "Received " + cnt + " flume events.").print()

ssc.start()
ssc.awaitTermination()
    }

}

相关问题