How to perform a reduce-side join as a MapReduce job using mrjob in Python

jgovgodb, posted on 2021-07-15 in Hadoop

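I have two datasets that I am trying to join: a transactions dataset and a contract dataset. I want to use address and to_address, respectively, as the join attribute, and value as the value attribute.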

    contract dataset fields:
    address, is_erc20, is_erc721, block_number, block_timestamp

    transactions dataset fields:
    block_number, from_address, to_address, value, gas, gas_price, timestamp

What I want to produce is an output of the form address, value. Example:

    transactions dataset:
    to_address value
    0x412270b1f0f3884 240648550000000000
    0x8d5a0a7c555602f 984699000000000000

    contract dataset:
    address
    0x412270b1f0f3884

    the output should be:
    to_address value
    0x412270b1f0f3884 240648550000000000

    as 0x8d5a0a7c555602f is not present in the contract dataset.

Below is my code. I am not sure what I am doing wrong. Any help?

    from mrjob.job import MRJob

    class repartition_join(MRJob):

        def mapper(self, _, line):
            try:
                if(len(line.split(','))==5): #contracts dataset
                    fields=line.split(',')
                    join_key=fields[0] #key is address
                    yield (join_key, 1) #yield join key given id 1?
                elif(len(line.split(','))==7): #transactions dataset
                    fields=line.split(',')
                    join_key=fields[2] #to_address, which is the key
                    join_value=int(fields[3]) #[3] = value
                    yield (join_key,(join_value,2)) #gives key with value
            except:
                pass

        def reducer(self, key, values):
            val = None
            for value in values:
                if value[1] == 2:
                    val = (value[0])
            yield(key, val)

    if __name__=='__main__':
        repartition_join.run()

zu0ti5jz #1

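Reconsider the MapReduce pipeline for a reduce-side join; it looks like this is the part you are struggling with.
To tell apart the key-value pairs coming from the two relations, you have to add a relation symbol to the value that the mapper yields. Assuming you want an inner join, the reducer must yield a tuple only if the key is present in both your contracts and your transactions dataset. You therefore have to keep the tuples of the two relations in separate lists and identify each tuple by its relation symbol. This approach is easy to adapt to other joins, e.g. (left/right/full) outer joins and semi/anti joins.
In the following example I used the relation symbol 'C' for the contracts dataset and 'T' for the transactions dataset. I could not try it myself because I do not have the datasets, but it should work this way. If you run into any problems, leave me a comment.
I can recommend the book "MapReduce Design Patterns" by Donald Miner and Adam Shook, since it also explains the common join algorithms for MapReduce tasks. Also check the latest mrjob documentation.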

    from mrjob.job import MRJob
    from mrjob.step import MRStep

    class repartition_join(MRJob):

        def mapper(self, _, line):
            fields = line.split(',')
            if len(fields) == 5:  # contracts dataset
                join_key = fields[0]  # key is in attribute address
                yield (join_key, ('C', 1))  # yield join key, value not used
            elif len(fields) == 7:  # transactions dataset
                join_key = fields[2]  # key is in attribute to_address
                join_value = int(fields[3])  # value is in attribute value
                yield (join_key, ('T', join_value))  # yields join key with value
            else:
                pass  # TODO handle error

        def reducer(self, key, values):
            address = key  # the join key
            contracts_tuples = []
            transactions_tuples = []
            for value in values:
                relation_symbol = value[0]  # either 'T' or 'C'
                if relation_symbol == 'C':  # contracts dataset
                    # always 1 - just to know that there is a tuple in contracts
                    contracts_tuples.append(value[1])
                elif relation_symbol == 'T':  # transactions dataset
                    # append the value inside the value attribute
                    transactions_tuples.append(value[1])
                else:
                    pass  # TODO handle error
            # inner join contract and transaction, generalize if needed
            if len(contracts_tuples) > 0 and len(transactions_tuples) > 0:
                for value in transactions_tuples:
                    yield (address, value)

        def steps(self):
            return [MRStep(
                mapper=self.mapper,
                reducer=self.reducer)
            ]

    if __name__=='__main__':
        repartition_join.run()
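
Since the job above was not run against the real datasets, the join logic can be checked locally first. The following is only a minimal sketch in plain Python, without mrjob or Hadoop: it tags records the same way as the mapper, groups them by key to imitate the shuffle, and then applies the reducer logic. The names `map_line`, `reduce_key`, `run_local` and the `how` switch are hypothetical helpers invented for this illustration, and every column in the sample rows other than the addresses and values from the question is a made-up placeholder.

```python
from collections import defaultdict


def map_line(line):
    """Tag each record with a relation symbol, like the mapper above."""
    fields = line.split(',')
    if len(fields) == 5:    # contracts row: address is column 0
        yield fields[0], ('C', 1)
    elif len(fields) == 7:  # transactions row: to_address is column 2
        yield fields[2], ('T', int(fields[3]))


def reduce_key(key, values, how='inner'):
    """Join the tagged values of one key; 'how' is a hypothetical switch."""
    contracts = [v for tag, v in values if tag == 'C']
    transactions = [v for tag, v in values if tag == 'T']
    if how == 'inner' and contracts:
        # key exists in contracts: emit every matching transaction value
        for value in transactions:
            yield key, value
    elif how == 'anti' and not contracts:
        # key is missing from contracts: emit the unmatched transactions
        for value in transactions:
            yield key, value


def run_local(lines, how='inner'):
    """Imitate the shuffle: group mapper output by key, then reduce."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in map_line(line):
            groups[key].append(value)
    results = []
    for key in sorted(groups):
        results.extend(reduce_key(key, groups[key], how))
    return results


sample = [
    # contracts row (only the address comes from the question)
    '0x412270b1f0f3884,false,false,100,1500000000',
    # transactions rows (only to_address and value come from the question)
    '101,0xaaa,0x412270b1f0f3884,240648550000000000,21000,1,1500000100',
    '102,0xbbb,0x8d5a0a7c555602f,984699000000000000,21000,1,1500000200',
]

print(run_local(sample))               # inner join
print(run_local(sample, how='anti'))   # transactions without a contract
```

With the sample rows, the inner join keeps only the transaction sent to 0x412270b1f0f3884, and the anti join keeps only the one sent to 0x8d5a0a7c555602f. Other join types would follow the same pattern by changing the condition on the two lists.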