我有两个数据集,我试图结合,即 transactions
数据集和 contract
数据集,我想用的地方 address
负责。 to_address
作为join属性和 value
值的属性。
contract dataset fields:
address, is_erc20, is_erc721, block_number, block_timestamp
transactions dataset fields:
block_number, from_address, to_address, value, gas, gas_price, timestamp
所以我要做的是创建一个输出为: address, value
例子:
transactions dataset:
to_address value
0x412270b1f0f3884 240648550000000000
0x8d5a0a7c555602f 984699000000000000
contract dataset:
address
0x412270b1f0f3884
the output should be:
to_address value
0x412270b1f0f3884 240648550000000000
as 0x8d5a0a7c555602f is not present in the contract dataset.
下面是我的代码,我不知道我做错了什么。有什么帮助吗??
from mrjob.job import MRJob
class repartition_join(MRJob):
def mapper(self, _, line):
try:
if(len(line.split(','))==5): #contracts dataset
fields=line.split(',')
join_key=fields[0] #key is address
yield (join_key, 1) #yield join key given id 1?
elif(len(line.split(','))==7): #transactions dataset
fields=line.split(',')
join_key=fields[2] #to_address, which is the key
join_value=int(fields[3]) #[3] = value
yield (join_key,(join_value,2)) #gives key with value
except:
pass
def reducer(self, key, values):
val = None
for value in values:
if value[1] == 2:
val = (value[0])
yield(key, val)
if __name__=='__main__':
repartition_join.run()
1条答案
按热度按时间zu0ti5jz1#
再考虑一下reduce侧连接的map reduce管道。看来你很难理解。
为了从两个关系中区分一个键-值对,必须在Map器产生的值上添加一个关系符号。假设你想做一个内部连接,你必须
yield
在reduce中的一个元组,用于reduce-side连接,只有在contracts
还有你的transactions
数据集。因此,必须将这些关系的元组保存在单独的列表中,并通过关系符号标识元组。这可以很容易地调整其他连接-例如(左/右/全)外部连接,半/反连接。在下面的示例中,我使用了关系符号
'C'
对于contracts
以及'T'
对于transactions
数据集。我不能亲自尝试,因为我缺乏数据集,但它应该这样工作。如果你有任何问题,请给我一个评论。我可以建议您看看“donald miner,adam Shake的mapreduce设计模式”一书,因为它还解释了mapreduce任务的常见连接算法。同时查看最新的mrjob文档。