我有FlumeavroFlume和sparkstreaming程序,读取Flume。cdh 5.1、flume 1.5.0、spark 1.0,使用scala作为spark上的程序语言我能够制作spark示例并计算flume avro事件。但是,我无法将flume avro事件反序列化为string\text,然后解析结构行。有没有人举过一个使用scala的例子?
km0tfn4u1#
为了反序列化,可以实现自定义解码器。提供预期的类型信息。
v1l68za42#
请尝试以下代码:
stream.map(e => "Event:header:" + e.event.get(0).toString + "body: " + new String(e.event.getBody.array)).print
ep6jt1vc3#
可以使用以下代码反序列化flume事件:
val eventBody = stream.map(e => new String(e.event.getBody.array))
下面是一个spark流媒体应用程序的示例,该应用程序使用flume twitter源和avro接收器来分析来自twitter的流行标签,从而将事件推送到spark:
import org.apache.spark.streaming.{ Seconds, StreamingContext } import org.apache.spark.SparkContext._ import org.apache.spark.streaming.twitter._ import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.{ SparkContext, SparkConf } import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.flume._ object PopularHashTags { val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags").set("spark.executor.memory", "1g") val sc = new SparkContext(conf) def main(args: Array[String]) { sc.setLogLevel("WARN") System.setProperty("twitter4j.oauth.consumerKey", <consumerKey>) System.setProperty("twitter4j.oauth.consumerSecret", <consumerSecret>) System.setProperty("twitter4j.oauth.accessToken", <accessToken>) System.setProperty("twitter4j.oauth.accessTokenSecret", <accessTokenSecret>) val ssc = new StreamingContext(sc, Seconds(5)) val filter = args.takeRight(args.length) val stream = FlumeUtils.createStream(ssc, <hostname>, <port>) val tweets = stream.map(e => new String(e.event.getBody.array)) val hashTags = tweets.flatMap(status => status.split(" ").filter(_.startsWith("#"))) val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) .map { case (topic, count) => (count, topic) } .transform(_.sortByKey(false)) // Print popular hashtags topCounts60.foreachRDD(rdd => { val topList = rdd.take(10) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) } }) stream.count().map(cnt => "Received " + cnt + " flume events.").print() ssc.start() ssc.awaitTermination() } }
3条答案
按热度按时间km0tfn4u1#
为了反序列化,可以实现自定义解码器。提供预期的类型信息。
v1l68za42#
请尝试以下代码:
ep6jt1vc3#
可以使用以下代码反序列化flume事件:
下面是一个spark流媒体应用程序的示例,该应用程序使用flume twitter源和avro接收器来分析来自twitter的流行标签,从而将事件推送到spark: