I get the following error when running a Flink job:
ClassCastException: cannot assign instance of java.util.LinkedHashMap to field org.apache.flink.runtime.jobgraph.JobVertex.results of type java.util.ArrayList in instance of org.apache.flink.runtime.jobgraph.InputOutputFormatVertex
Here is the source code:
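import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;

// The enclosing class and the main signature were not shown in the post; the class name is assumed.
public class WordCount
{
    public static void main(String[] args) throws Exception
    {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs(args);
        // Make the command-line parameters visible in the web UI and in rich functions.
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text = env.readTextFile(params.get("input"));
        // Keep only the lines that start with "N".
        DataSet<String> filtered = text.filter(new FilterFunction<String>()
        {
            @Override
            public boolean filter(String value)
            {
                return value.startsWith("N");
            }
        });
        DataSet<Tuple2<String, Integer>> tokenized = filtered.map(new Tokenizer());
        // Group by the string (field 0) and sum the counts (field 1).
        DataSet<Tuple2<String, Integer>> counts = tokenized.groupBy(0).sum(1);
        if (params.has("output"))
        {
            counts.writeAsText(params.get("output"));
            env.execute("WordCount Example");
        }
    }

    // Maps each line to a (line, 1) pair.
    public static final class Tokenizer
        implements MapFunction<String, Tuple2<String, Integer>>
    {
        @Override
        public Tuple2<String, Integer> map(String value)
        {
            return new Tuple2<>(value, 1);
        }
    }
}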
1 Answer
You are right, David Anderson: this was a version mismatch on my local machine. I resolved it by upgrading my local Flink cluster to the latest version.
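The ClassCastException on JobVertex.results is consistent with a job graph serialized by one Flink version being deserialized by another, so it can help to compare the two versions before submitting. Below is a minimal sketch of such a check. The class FlinkVersionCheck and its methods are hypothetical helpers, not part of Flink, and the two version strings are assumed to be copied by hand from the project's Flink dependency and from the cluster (for example its web UI or startup log). Treating a matching major.minor pair as compatible is also an assumption, not a guarantee from the post.

```java
/** Hypothetical helper, not part of Flink: compares two Flink version strings. */
final class FlinkVersionCheck {

    /** Returns the "major.minor" prefix of a version such as "1.4.2" or "1.5-SNAPSHOT". */
    static String majorMinor(String version) {
        // Drop any qualifier such as "-SNAPSHOT" before splitting on dots.
        String numeric = version.trim().split("-")[0];
        String[] parts = numeric.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected at least major.minor, got: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** True when the client dependency and the cluster share the same major.minor release. */
    static boolean sameRelease(String clientVersion, String clusterVersion) {
        return majorMinor(clientVersion).equals(majorMinor(clusterVersion));
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: FlinkVersionCheck <client-version> <cluster-version>");
            return;
        }
        if (sameRelease(args[0], args[1])) {
            System.out.println("Client and cluster are on the same major.minor release.");
        } else {
            System.out.println("Version mismatch: client " + args[0] + " vs cluster " + args[1]);
        }
    }
}
```

If the check reports a mismatch, either upgrade the cluster (as was done here) or align the Flink dependency version in the project build with the cluster's version.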