如何组合两个数据流(pyspark)?

ffdz8vbo  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(514)

我有一个Kafka流进来一些输入主题。这是我为接受Kafka流而写的代码。

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc) 
kvs = KafkaUtils.createDirectStream(ssc, topics,\ 
            {"metadata.broker.list": brokers})

然后我创建两个原始流的键和值的数据流。

keys = kvs.map(lambda x: x[0].split(" ")) 
values = kvs.map(lambda x: x[1].split(" "))

然后我在值dstream中执行一些计算。例如,

val = values.flatMap(lambda x: x*2)

现在,我需要将keys和valdstream组合起来,并以kafka流的形式返回结果。
如何将val组合到相应的键?

zbq4xfa0

zbq4xfa01#

你可以用 join 运算符来合并它们。当您进行Map时,实际上是在创建另一个流。因此,join将帮助您将它们合并在一起。
如:

Joined_Stream = keys.join(values).(any operation like map, flatmap...)

相关问题