我有一个文件,我需要Map/减少那里的输出需要一个日期的总和和最大值。我有总和部分的工作,但是,我不知道如何包括最大日期作为减少输出的一部分。
输入数据如下所示:
ID1, ID2, date, count
3000, 001, 2014-12-30 18:00:00, 2
3000, 001, 2015-01-01 10:00:00, 1
3000, 002, 2014-11-18 12:53:00, 5
3000, 002, 2014-12-20 20:14:00, 3
我的Map器将id1+id2连接起来,以便对它们进行分组。其输出如下所示:
key (ID1|ID2), value (count)
3000|001, 2
3000|001, 1
3000|002, 5
3000|002, 3
减速器输出如下所示:
key (ID1|ID2), value (sum)
3000|001, 3
3000|002, 8
我真正需要的是这样的输出:
key (ID1|ID2), value (sum), date (max)
3000|001, 3, 2015-01-01 10:00:00
3000|002, 8, 2014-12-20 20:14:00
mapper和reducer是用ruby编写的,但是,我将使用python编写的一个工作示例(我将把它翻译成ruby)。
以下是Map器代码:
require 'csv'
pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')
Dir.glob(pattern).each do |file|
CSV.foreach(file, {col_sep: "\t", headers: false}) do |row|
puts [
"#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
row[7] # value = count
].join("\t")
end
end
以及减速器:
prev_key = nil
key_total = 0
ARGF.each do |line|
line = line.chomp
next unless line
(key, value) = line.split("\t")
# check for new key
if prev_key && key != prev_key && key_total > 0
# output total for previous key
puts [prev_key, key_total].join("\t")
# reset key total for new key
prev_key = key
key_total = 0
elsif !prev_key
prev_key = key
end
# add to count for this current key
key_total += value.to_i
end
# this is to catch the final counts after all records have been received
puts [prev_key, key_total].join("\t")
更新
以下是基于公认答案的建议的新Map器和还原器:
Map器:
require 'csv'
pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')
Dir.glob(pattern).each do |file|
CSV.foreach(file, {col_sep: "\t", headers: false}) do |row|
date_time = "#{row[0]} #{row[1]}:00:00#{row[2]}" # %Y-%m-%d %H:%M:%S%z
puts [
"#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
"#{row[7]}|#{date_time}", # value = count | date_time
].join("\t")
end
end
减速器:
require 'date'
prev_key = nil
key_total = 0
dates = []
ARGF.each do |line|
line = line.chomp
next unless line
(key, values) = line.split("\t")
(value, date_time) = values.split('|')
# check for new key
if prev_key && key != prev_key && key_total > 0
# output total for previous key
puts [prev_key.split('|'), key_total, dates.max].join("\t")
# reset key total for new key
prev_key = key
key_total = 0
# reset dates array for new key
dates.clear
elsif !prev_key
prev_key = key
end
# add date to array for this current key
dates << DateTime.strptime(date_time, '%Y-%m-%d %H:%M:%S%z')
# add to count for this current key
key_total += value.to_i
end
# this is to catch the final counts after all records have been received
puts [prev_key.split('|'), key_total, dates.max].join("\t")
1条答案
按热度按时间uqdfh47h1#
您只需将date和count放入一对<date,count>中,并将其作为一个值从Map器中发出。然后在reducer中从对值中提取日期和计数。sum的计数方式与之前相同,并跨输入值(每个键)跟踪最大日期。