从斯卡拉的Kafka频道读Flume事件?

oymdgrw7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(404)

你好,我正在使用cloudera flume代理获取推特,并使用kafka作为频道来举办flume活动。kafka channel event(headers json tweets)中编写的事件结构如下´::::´ 标题后面有分隔符吗�m和tweet的json如下:它是一个flume事件对象结构:

event(headers, jsonTweet)

对象如下:

::::�M{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715916168905633792,"in_reply_to_user_id_str":null,"timestamp_ms":"1459522690403","in_reply_to_status_id":null,"created_at":"Fri Apr 01 14:58:10 +0000 2016","favorite_count":0,"place":null,"coordinates":null,"text":"RT @frmikeschmitz: What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American","contributors":null,"retweeted_status":{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715549110728847360,"in_reply_to_user_id_str":null,"in_reply_to_status_id":null,"created_at":"Thu Mar 31 14:39:36 +0000 2016","favorite_count":22,"place":null,"coordinates":null,"text":"What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American icon.  via @bmoviesd","contributors":null,"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"/2016/03/30/superman-and-the-damage-done","indices":[98,121],"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[41,45]}],"user_mentions":[{"id":202668848,"name":"Birth.Movies.Death.","indices":[126,135],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":5,"id_str":"715549110728847360","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":3001,"lang":"en","profile_link_color":"0084B4","profile_banner_url":"","id":565216911,"following":null,"protected":false,"favourites_count":1858,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"fathermikeschmitz","profile_background_color":"C0DEED","created_at":"Sat Apr 28 03:49:18 +0000 2012","default_profile_image":false,"followers_count":17931,"profile_image_url_https":"","geo_enabled":false,"profile_background_image_url":"":"":null,"url":"","utc_offset":-14400,"time_zone":"Eastern Time (US & Canada)","notifications":null,"profile_use_background_image":true,"friends_count":81,"profile_sidebar_fill_color":"DDEEF6","screen_name":"frmikeschmitz","id_str":"565216911","profile_image_url":"","listed_count":138,"is_translator":false}},"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[60,64]}],"user_mentions":[{"id":565216911,"name":"fathermikeschmitz","indices":[3,17],"screen_name":"frmikeschmitz","id_str":"565216911"},{"id":202668848,"name":"Birth.Movies.Death.","indices":[139,140],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":0,"id_str":"715916168905633792","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":25,"lang":"en","profile_link_color":"0084B4","id":2987773015,"following":null,"protected":false,"favourites_count":90,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"Pam Anderson","profile_background_color":"C0DEED","created_at":"Sun Jan 18 02:14:41 +0000 2015","default_profile_image":false,"followers_count":22,"profile_image_ur":"","geo_enabled":false,"profile_background_image_url":"","profile_background_image_url_httpd":"":null,"url":null,"utc_offset":null,"time_zone":null,"notifications":null,"profile_use_background_image":true,"friends_count":59,"profile_sidebar_fill_color":"DDEEF6","screen_name":"pamawah25","id_str":"2987773015","profile_image_url":"","listed_count":0,"is_translator":false}}

我正在工作的Flume代理如下:

agent1.sources.twitter-data.type = com.cloudera.flume.source.TwitterSource
agent1.sources.twitter-data.consumerKey = ""
agent1.sources.twitter-data.consumerSecret = ""
agent1.sources.twitter-data.accessToken = ""
agent1.sources.twitter-data.accessTokenSecret = ""
agent1.sources.twitter-data.keywords = superman, batman, iron man, tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.capacity = 10000
agent1.channels.kafka-channel.transactionCapacity = 1000
agent1.channels.kafka-channel.brokerList = kafka:9092
agent1.channels.kafka-channel.topic = twitter
agent1.channels.kafka-channel.zookeeperConnect = kafka:2181

# agent1.channels.kafka-channel.parseAsFlumeEvent = false

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs:///user/Hadoop/twitter_data
agent1.sinks.hdfs-sink.fileType = DataStream
agent1.sinks.hdfs-sink.writeFormat = Text
agent1.sinks.hdfs-sink.batchSize = 1000
agent1.sinks.hdfs-sink.rollSize = 0
agent1.sinks.hdfs-sink.rollCount = 10000

agent1.sources.twitter-data.channels = kafka-channel

我想在spark streaming/spark sql中读取这些数据来处理和存储它。
因此,我使用了如下代码,但由于flume事件,它没有帮助:

val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext =  new org.apache.spark.sql.hive.HiveContext(sc)//new org.apache.spark.sql.SQLContext(sc)

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Get the data (tweets) from kafka
val tweets = messages.map(_._2)

tweets.foreachRDD { rdd =>
  val jsonRDD = sqlContext.read.json(rdd)
  val tweetTable = jsonRDD.toDF()
  tweetTable.printSchema()
  tweetTable.show(5)
  tweetTable.write.mode("append").saveAsTable("twitterStream")

}
nvbavucw

nvbavucw1#

如果要从kafka流中使用它们,则需要通过分隔符手动解析值:

val tweets = messages.map { case (_, tweet) => { 
    val splitTweet = tweet.split("?M") 
    (splitTweet(0), splitTweet(1))
    }
 }

这将产生连接头作为元组的第一个值,第二个值将包含表示tweet的json。

相关问题