How to find common pairs in a PySpark RDD regardless of their order?

kcrjzv8t asked on 2024-01-06 in Spark

I want to find the pairs of people who have ever dealt with each other. Here is the data:

    Input is
    K -> M, H
    M -> K, E
    H -> F
    B -> T, H
    E -> K, H
    F -> K, H, E
    A -> Z

The expected output is:

    Output:
    K, M  // (this means K has supplied goods to M and M has also supplied some goods to K)
    H, F


Here is the code I wrote.

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession, SQLContext
    from pyspark.ml.regression import LinearRegression
    import re
    from itertools import combinations

    spark = SparkContext("local", "DoubleRDD")

    def findpairs(ls):
        lst = []
        for i in range(0, len(ls) - 1):
            for j in range(i + 1, len(ls)):
                if ls[i] == tuple(reversed(ls[j])):
                    lst.append(ls[i])
        return lst

    text = spark.textFile("path to the .txt")
    text = text.map(lambda s: s.replace("->", ","))
    text = text.map(lambda s: s.replace(",", ""))
    text = text.map(lambda s: s.replace(" ", ""))
    pairs = text.flatMap(lambda x: [(x[0], y) for y in x[1:]])
    commonpairs = pairs.filter(lambda x: findpairs(x))
    pairs.collect()

The output is: []


5w9g7ksd 1#

Don't use RDDs; this problem can be solved with native Spark DataFrame functions.

    from pyspark.sql import functions as F

    df = spark.read.csv('data.txt', header=False, sep='-> ').toDF('x', 'y')
    # +---+-------+
    # |  x|      y|
    # +---+-------+
    # |  K|   M, H|
    # |  M|   K, E|
    # |  H|      F|
    # |  B|   T, H|
    # |  E|   K, H|
    # |  F|K, H, E|
    # |  A|      Z|
    # +---+-------+
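Note that multi-character separators for spark.read.csv require Spark 3.0+. On older versions, one alternative is to parse the lines from an RDD instead; a minimal sketch, assuming data.txt holds exactly the lines shown above:

    # Sketch for Spark < 3.0 (no multi-character CSV separators):
    # split each line on '-> ' by hand and build the DataFrame from an RDD.
    rdd = spark.sparkContext.textFile('data.txt')
    df = (rdd.map(lambda line: line.split('-> ', 1))
             .map(lambda parts: (parts[0].strip(), parts[1].strip()))
             .toDF(['x', 'y']))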

Split and explode the recipient column (y):

    df1 = df.withColumn('y', F.explode(F.split('y', r',\s+')))
    # +---+---+
    # |  x|  y|
    # +---+---+
    # |  K|  M|
    # |  K|  H|
    # |  M|  K|
    # |  M|  E|
    # |  H|  F|
    # |  B|  T|
    # |  B|  H|
    # |  E|  K|
    # |  E|  H|
    # |  F|  K|
    # |  F|  H|
    # |  F|  E|
    # |  A|  Z|
    # +---+---+


Self-join the DataFrame so that the recipient on the left side is the sender on the right side, then filter it so that the sender on the left equals the recipient on the right:

    df1 = df1.alias('left').join(df1.alias('right'), on=F.expr("left.y == right.x"))
    df1 = df1.filter("left.x == right.y")
    # +---+---+---+---+
    # |  x|  y|  x|  y|
    # +---+---+---+---+
    # |  K|  M|  M|  K|
    # |  M|  K|  K|  M|
    # |  H|  F|  F|  H|
    # |  F|  H|  H|  F|
    # +---+---+---+---+
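As a side note, the join and the filter can also be folded into a single join condition; a minimal sketch, starting from the exploded DataFrame before the reassignment above (the name mutual is just for illustration):

    # Sketch: express both conditions in one join instead of join + filter.
    mutual = df1.alias('left').join(
        df1.alias('right'),
        on=F.expr('left.y = right.x AND left.x = right.y'),
    )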


Remove duplicate combinations of sender and recipient:

    df1 = df1.select('left.*').withColumn('pairs', F.array_sort(F.array('x', 'y')))
    df1 = df1.dropDuplicates(['pairs']).drop('pairs')
    # +---+---+
    # |  x|  y|
    # +---+---+
    # |  H|  F|
    # |  K|  M|
    # +---+---+
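Putting the steps together, a self-contained sketch of this answer (assuming the input file is named data.txt and Spark 3.0+ for the multi-character separator):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName('mutual-pairs').getOrCreate()

    # Read 'sender-> recipients' lines into two columns.
    df = spark.read.csv('data.txt', header=False, sep='-> ').toDF('x', 'y')

    # One row per (sender, recipient) edge.
    edges = df.withColumn('y', F.explode(F.split('y', r',\s+')))

    # Keep edges whose reverse edge also exists.
    mutual = (edges.alias('left')
                   .join(edges.alias('right'),
                         on=F.expr('left.y = right.x AND left.x = right.y'))
                   .select('left.*'))

    # Deduplicate (K, M) vs (M, K) by sorting each pair into a canonical order.
    result = (mutual.withColumn('pair', F.array_sort(F.array('x', 'y')))
                    .dropDuplicates(['pair'])
                    .drop('pair'))
    result.show()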

woobm2wo 2#

    text = spark.textFile("PATH TO .txt file")
    text = text.map(lambda s: s.replace("->", ","))
    text = text.map(lambda s: s.replace(",", ""))
    text = text.map(lambda s: s.replace(" ", ""))
    pairs = text.flatMap(lambda x: [(tuple(sorted((x[0], y))), 1) for y in x[1:]]) \
                .groupByKey().mapValues(len)
    cm = pairs.filter(lambda x: x[1] == 2).collect()
    for i in range(0, len(cm)):
        print(cm[i][0])

I wrote the code above and it produces the expected output.

    ('K', 'M')
    ('F', 'H')
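The key idea is that tuple(sorted(...)) normalizes each pair into a canonical order, so (K, M) and (M, K) land on the same key, and a count of 2 means both directions exist. A minimal variant sketch using reduceByKey, which scales better than groupByKey().mapValues(len) because it does not shuffle full value lists (assuming, as above, that each directed pair appears at most once in the input):

    # Sketch: count pair occurrences with reduceByKey instead of groupByKey.
    pairs = (text.flatMap(lambda x: [(tuple(sorted((x[0], y))), 1) for y in x[1:]])
                 .reduceByKey(lambda a, b: a + b))
    mutual = pairs.filter(lambda kv: kv[1] == 2).keys().collect()
    print(mutual)  # e.g. [('K', 'M'), ('F', 'H')]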
