I have defined a filter as the termination criterion for my k-means job. When I run the application, it always computes only a single iteration.
I think the problem is this line:
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
or the filter function:
public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
    private static final long serialVersionUID = 5868635346889117617L;

    public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
        if (tuple.f0.equals(tuple.f1)) {
            return true;
        } else {
            return false;
        }
    }
}
Best regards, Paul
My full code is here:
public void run() {
    // load properties
    Properties pro = new Properties();
    FileSystem fs = null;
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
        fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new org.apache.hadoop.conf.Configuration());
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager(this.getBenchmarkCounter()));
    // feed new centroids back into next iteration with termination condition
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); //print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
    private static final long serialVersionUID = 5868635346889117617L;

    public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
        if (tuple.f0.equals(tuple.f1)) {
            return true;
        } else {
            return false;
        }
    }
}
1 Answer
I think the problem is the filter function (modulo the code you have not posted). Flink's termination criterion works the following way: the iteration terminates if the provided termination DataSet is empty; otherwise, the next iteration is started, provided that the maximum number of iterations has not been exceeded.
Flink's filter function keeps only those elements for which the FilterFunction returns true. Thus, with your MyFilter implementation you only keep the centroids which are identical before and after an iteration. This implies that the termination DataSet becomes empty as soon as all centroids have changed, and the iteration then terminates. That is exactly the opposite of the actual termination criterion, which should be: continue with k-means as long as at least one centroid has changed. Note that simply negating the filter would not help either, because the join on "*" only produces pairs whose fields are all equal, so a negated filter would always yield an empty DataSet.
You can do this with a coGroup function in which you emit an element whenever a new centroid has no matching centroid in the preceding DataSet. This is similar to a left outer join, except that the non-null matches are discarded.
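For illustration, a minimal sketch of that coGroup-based termination criterion could look like the following. It makes the same assumption as your join, namely that GeoTimeDataCenter supports the "*" key expression on all of its fields; the class name ChangedCentroids is just a placeholder:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

// Emits every new centroid that has no equal counterpart among the previous
// centroids, i.e. every centroid that moved in this iteration. As long as at
// least one centroid moved, the termination DataSet is non-empty and the
// iteration continues.
public static final class ChangedCentroids
        implements CoGroupFunction<GeoTimeDataCenter, GeoTimeDataCenter, GeoTimeDataCenter> {
    private static final long serialVersionUID = 1L;

    @Override
    public void coGroup(Iterable<GeoTimeDataCenter> current, Iterable<GeoTimeDataCenter> previous,
            Collector<GeoTimeDataCenter> out) {
        // An empty previous side means this centroid has no equal predecessor,
        // i.e. it changed in this iteration.
        if (!previous.iterator().hasNext()) {
            for (GeoTimeDataCenter c : current) {
                out.collect(c);
            }
        }
    }
}

The closeWith call would then become:

DataSet<GeoTimeDataCenter> changedCentroids = newCentroids
    .coGroup(loop).where("*").equalTo("*")
    .with(new ChangedCentroids());
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, changedCentroids);

With this criterion the iteration stops either when no centroid has changed or when maxIteration is reached.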