I have a data stream coming in from HDFS:
nyu,-0.0,1.36,0.64,1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.494,1.506,0.0,-1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
cynthia,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
rachel,-0.0,1.322,0.6779999999999999,1.8496000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
haha,-0.0,0.921,1.079,1.5928,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.16499999999999998,1.419,0.417,1.6442999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.076,1.608,0.317,1.334,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.142,1.363,0.497,1.8187,15131c3f-7ad9-489d-bbc4-0f99305b7db0
american,-0.028,1.888,0.084,0.8658,15131c3f-7ad9-489d-bbc4-0f99305b7db0
middleburi,-0.148,1.6880000000000002,0.164,0.5698000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
metro,-0.096,1.654,0.249,1.3209,15131c3f-7ad9-489d-bbc4-0f99305b7db0
simon,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
korea,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anthoni,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
anderson,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
madrid,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.047,1.7349999999999999,0.217,1.6118999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
spain,-0.20700000000000002,1.6949999999999998,0.097,0.3662000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
america,-0.047,1.338,0.614,1.679,15131c3f-7ad9-489d-bbc4-0f99305b7db0
I want to sum the scores of rows that share the same word (first column) and document ID (last column).
This is the code I have so far:
from pyspark.streaming import StreamingContext
import time
import pprint
from pyspark.sql import functions as F
ssc = StreamingContext(sc, 60)
lines = ssc.textFileStream("hdfs://thesis:9000/user/kush/data/")
data = lines.map(lambda x: x.split(','))
# trying to do the task here -- this does not aggregate yet: each record is a
# plain list of strings, so there is no (key, value) pair for reduceByKey to use
m_Data = data.reduceByKey(lambda x, y: (x[1] + y[1], x[2] + y[2], x[3] + y[3], x[4] + y[4]))
m_Data.pprint()
ssc.start()
time.sleep(5)
How can this be done in PySpark Streaming?
1 Answer
To use reduceByKey you actually need to help Spark figure out the key, so I created a key/value (pair) RDD. The key is built from the word and the document ID; the value is a tuple of the scores, each cast to float (or whatever type suits your data set) so they can be summed.
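A minimal sketch of that approach against the stream from the question (assuming sc is the SparkContext from the shell; column 0 is the word, column 5 the document ID, columns 1-4 the scores; the variable names are my own):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 60)
lines = ssc.textFileStream("hdfs://thesis:9000/user/kush/data/")
data = lines.map(lambda x: x.split(','))

# Pair RDD: key = (word, document id), value = the four scores cast to float
pairs = data.map(lambda f: ((f[0], f[5]),
                            (float(f[1]), float(f[2]), float(f[3]), float(f[4]))))

# Sum the score tuples element-wise for each (word, document id) key
summed = pairs.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1],
                                         a[2] + b[2], a[3] + b[3]))
summed.pprint()

ssc.start()
ssc.awaitTermination()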
Sample output:
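With the batch shown above, the madrid entry, for example, would come out roughly as follows (floating-point rounding noise aside; the Time header depends on the actual batch):

-------------------------------------------
Time: ...
-------------------------------------------
(('madrid', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.617, 4.969, 0.414, 0.8935))
...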
As a caveat, the case where a key k (word/document) shows up at time t and a second entry for it only arrives, say, 30 minutes later is out of scope here: reduceByKey aggregates within a single micro-batch, so entries that land in different batches are summed separately.
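If running totals across batches were needed, one option (my assumption, not part of the original answer) is stateful aggregation with updateStateByKey, which requires checkpointing; the checkpoint path below is hypothetical, and pairs is the pair DStream from the sketch above:

ssc.checkpoint("hdfs://thesis:9000/user/kush/checkpoint/")  # hypothetical path; required for stateful ops

def update_totals(new_scores, running):
    # new_scores: score tuples seen for this key in the current batch
    # running: the running total carried over from earlier batches (None at first)
    totals = running or (0.0, 0.0, 0.0, 0.0)
    for s in new_scores:
        totals = (totals[0] + s[0], totals[1] + s[1],
                  totals[2] + s[2], totals[3] + s[3])
    return totals

running_totals = pairs.updateStateByKey(update_totals)
running_totals.pprint()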