I am using a MySQL CDC (change data capture) system that captures new records inserted into MySQL tables and sends them to Kafka, with one topic per table. In Spark Streaming I then want to consume several of these Kafka topics in parallel with the DStream API, so the change data from these MySQL tables can be processed further.
The CDC setup itself is fine: testing with kafka-consume-topic.sh shows that messages for all tables are received correctly. In Spark Streaming, however, I only receive messages for one table. Yet if I create a topic/stream for one table at a time in the application, every table can be read by Spark Streaming when tested individually. I have searched the Spark GitHub project, related questions, articles, and examples for a long time without finding a solution. There are plenty of examples that union non-direct streams, but those use very old Spark Streaming APIs, and I am reluctant to adopt them because I suspect a lot of rework would be needed later.
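From what I understand of the 0-10 direct-stream API, ConsumerStrategies.Subscribe also accepts the whole topic list, so a single direct stream over all topics looks like another option. A minimal sketch of that variant (untested on my side; ssc, topicsArr, and kafkaParam are the same values defined in my code below):

// Sketch: one direct stream subscribed to every CDC topic at once.
val allTopicsStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topicsArr, kafkaParam) // topicsArr and kafkaParam as defined below
)
// Tag each record with its topic so the tables can be told apart downstream.
allTopicsStream.map(r => (r.topic(), r.key(), r.value())).print()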
Here is my code:
package com.fm.data.fmtrade

import scala.util.Random

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object TestKafkaSparkStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf() //.setMaster("spark://192.168.177.120:7077")
      .setAppName("SparkStreamKaflaWordCount Demo")
      .set("spark.streaming.concurrentJobs", "8")

    val ss = SparkSession
      .builder()
      .config(conf)
      .appName(args.mkString(" "))
      .getOrCreate()

    // One Kafka topic per CDC-captured MySQL table.
    val topicsArr: Array[String] = Array(
      "betadbserver1.copytrading.t_trades",
      "betadbserver1.copytrading.t_users",
      "betadbserver1.account.s_follower",
      "betadbserver1.copytrading.t_followorder",
      "betadbserver1.copytrading.t_follow",
      "betadbserver1.copytrading.t_activefollow",
      "betadbserver1.account.users",
      "betadbserver1.account.user_accounts"
    )

    // Randomized group id so each run uses a fresh consumer group and reads from "earliest".
    val group = "con-consumer-group111" + new Random().nextInt(10000)
    val kafkaParam = Map[String, Object](
      "bootstrap.servers" -> "beta-hbase02:9092,beta-hbase03:9092,beta-hbase04:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "earliest", //"latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val ssc = new StreamingContext(ss.sparkContext, Seconds(4))

    // One direct stream per topic; only one of these ever prints records.
    //val streams =
    topicsArr.foreach { //.slice(0,1)
      topic =>
        val newTopicsArr = Array(topic)
        val stream =
          KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](newTopicsArr, kafkaParam))
        stream.map(s => (s.key(), s.value())).print()
    }

    /*
    val unifiedStream = ssc.union(streams)
    unifiedStream.repartition(2)
    unifiedStream.map(s => (s.key(), s.value())).print()
    */

    /*
    unifiedStream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        partitionOfRecords.foreach { record =>
        }
      }
    }
    */

    ssc.start()
    ssc.awaitTermination()
  }
}
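For completeness, this is roughly how I expected the commented-out union variant to look if the per-topic loop collected the streams with map instead of foreach (an untested sketch using the same names as above):

// Sketch: one direct stream per topic, collected and unioned into a single DStream.
val perTopicStreams = topicsArr.toSeq.map { topic =>
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Array(topic), kafkaParam)
  )
}
val unified = ssc.union(perTopicStreams)
unified.map(r => (r.key(), r.value())).print()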