How do I add two rows with the same key (column value) in Spark Streaming?

lndjwyie  asked on 2021-05-27  in Spark

I have a data stream coming in from HDFS:

  nyu,-0.0,1.36,0.64,1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  madrid,-0.494,1.506,0.0,-1.3616,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  cynthia,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  rachel,-0.085,1.4300000000000002,0.485,1.6916,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  rachel,-0.0,1.322,0.6779999999999999,1.8496000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  haha,-0.0,0.921,1.079,1.5928,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  spain,-0.16499999999999998,1.419,0.417,1.6442999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  madrid,-0.076,1.608,0.317,1.334,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  spain,-0.142,1.363,0.497,1.8187,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  american,-0.028,1.888,0.084,0.8658,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  middleburi,-0.148,1.6880000000000002,0.164,0.5698000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  metro,-0.096,1.654,0.249,1.3209,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  simon,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  korea,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  spain,-0.047,1.797,0.155,1.2171,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  anthoni,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  anderson,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  madrid,-0.047,1.855,0.097,0.9211,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  spain,-0.047,1.7349999999999999,0.217,1.6118999999999999,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  spain,-0.20700000000000002,1.6949999999999998,0.097,0.3662000000000001,15131c3f-7ad9-489d-bbc4-0f99305b7db0
  america,-0.047,1.338,0.614,1.679,15131c3f-7ad9-489d-bbc4-0f99305b7db0

I want to sum the scores of rows that have the same word (first column) and document number (last column).
This is the code I have so far:

  from pyspark.streaming import StreamingContext
  import time
  import pprint
  from pyspark.sql import functions as F

  ssc = StreamingContext(sc, 60)  # sc is the SparkContext provided by the pyspark shell
  lines = ssc.textFileStream("hdfs://thesis:9000/user/kush/data/")
  data = lines.map(lambda x: x.split(','))
  # trying to do the task here
  m_Data = data.reduceByKey(lambda x, y: (x[1] + y[1], x[2] + y[2], x[3] + y[3], x[4] + y[4]))
  m_Data.pprint()
  ssc.start()
  time.sleep(5)

How can this be done in PySpark Streaming?

gwo2fgha

To use reduceByKey, you actually need to help Spark figure out what the key is. I created a key/value RDD named pair.
The key is the combination of the word and the document number; the value is a tuple holding the four scores. The scores are also cast to float (or whatever type suits your dataset) so they can be summed.

  data = lines.map(lambda x: x.split(','))
  pair = data.map(lambda x: ((x[0], x[5]), (float(x[1]), float(x[2]), float(x[3]), float(x[4]))))
  aggregation = pair.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3]))
  aggregation.pprint(20)

Sample output:

  (('haha', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 0.921, 1.079, 1.5928))
  (('american', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.028, 1.888, 0.084, 0.8658))
  (('madrid', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.617, 4.968999999999999, 0.41400000000000003, 0.8935000000000002))
  (('middleburi', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.148, 1.6880000000000002, 0.164, 0.5698000000000001))
  (('cynthia', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 1.4300000000000002, 0.485, 1.6916))
  (('metro', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.096, 1.654, 0.249, 1.3209))
  (('korea', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
  (('anthoni', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.855, 0.097, 0.9211))
  (('anderson', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.855, 0.097, 0.9211))
  (('spain', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.6549999999999999, 9.806, 1.538, 7.8753))
  (('nyu', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.0, 1.36, 0.64, 1.3616))
  (('rachel', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.085, 2.7520000000000002, 1.1629999999999998, 3.5412))
  (('simon', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.797, 0.155, 1.2171))
  (('america', '15131c3f-7ad9-489d-bbc4-0f99305b7db0'), (-0.047, 1.338, 0.614, 1.679))

One caveat: this only aggregates within a single micro-batch. The case where a key K (word/document) appears at time t and a second entry for the same key arrives 30 minutes later, in a different batch, is out of scope here; those two rows will not be summed together.
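If you do need running totals across batches, one option is updateStateByKey, which keeps per-key state between micro-batches. The following is a minimal sketch, not from the original answer: it reuses the pair DStream defined above, and the checkpoint path is a placeholder you would replace with your own.

  # Sketch only: carry a running score sum per (word, document) key across batches.
  # updateStateByKey requires checkpointing; this HDFS path is a placeholder.
  ssc.checkpoint("hdfs://thesis:9000/user/kush/checkpoint")

  def update_scores(new_values, running_total):
      # new_values: list of score tuples for this key seen in the current batch
      # running_total: the tuple accumulated so far (None the first time the key appears)
      total = running_total or (0.0, 0.0, 0.0, 0.0)
      for v in new_values:
          total = tuple(a + b for a, b in zip(total, v))
      return total

  running = pair.updateStateByKey(update_scores)
  running.pprint(20)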
