我试图用一个数据集做一个简单的处理。
考虑一个具有两列类型的数据集 String
. 我想在这个数据集中添加第三列 Long
,它累计到目前为止在数据集中看到的记录数。
例子:
输入:
a、 b类
b、 c级
c、 d级
输出:
a、 b,1号
b、 c,2个
c、 d,3个
我尝试了以下解决方案,但得到了一个奇怪的结果:
DataSet<Tuple2<String, String>> csvInput = env.readCsvFile("src/main/resources/data_file")
.ignoreFirstLine()
.includeFields("11")
.types(String.class,String.class);
long cnt=0;
DataSet<Tuple3<String, String, Long>> csvOut2 = csvInput.map(new MyMapFunction(cnt));
private static class MyMapFunction implements MapFunction<Tuple2<String, String>, Tuple3<String, String, Long>> {
long cnt;
public MyMappingFunction(long cnt) {
this.cnt = cnt;
}
@Override
public Tuple3<String, String, Long> map(Tuple2<String, String> m) throws Exception {
Tuple3 <String ,String, Long> resultTuple = new Tuple3(m.f0,m.f1, Long.valueOf(cnt));
cnt++;
return resultTuple;
}
}
当我将此解决方案应用于一个包含100个条目的文件时,我得到的计数是47而不是100。计数器在53处重新启动。类似地,当我将它应用于一个更大的文件时,计数器会不时地重置,这样我就不会得到行的总数。
你能解释一下为什么我的实现是这样的吗?还有,有什么可能解决我的问题?
谢谢!
1条答案
按热度按时间7z5jn7bk1#
这是一个多线程问题。你有多少任务?
我必须在运行之前清理你的代码-我建议以后发布完整的工作示例,这样你就有机会得到更多的答案。
跟踪计数的方式不是线程安全的,因此如果有多个任务槽,则会出现计数值不准确的问题。
正确的计数方法,如dataartisans单词计数示例所示,是使用元组中的第3个槽来简单地存储值1,然后对数据集求和。
然后
其中2是包含值1的元组的索引。