No query response from Flink queryable state [version-1.7.2]

ltqd579y  posted on 2021-07-15 in Flink

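I am querying the proxy server of a Flink cluster, which is located at 127.0.1.1:9069, but I get no query response. The job reads its input from a server I created on port 9000 and computes the sum of all the numbers entered. I also store the sum in a ValueState.
Flink job: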

private transient ValueState<Tuple2<String, Long>> sum;

@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<String,Long>> out) throws Exception {
    if (input.f1==-1){
        sum.clear();
        return;
    }
    Tuple2<String, Long> currentSum = sum.value();
    currentSum.f1 += input.f1;

    sum.update(currentSum);
    System.out.println("Current Sum: "+(sum.value().f1)+"\nCurrent Count: "+(sum.value().f0));
    out.collect(new Tuple2<>("sum", sum.value().f1));
}

@Override
public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<String, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "sum", // the state name
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    Tuple2.of("sum", 0L)); // default value of the state, if nothing was set
    sum = getRuntimeContext().getState(descriptor);

}

inp.flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
    @Override
    public void flatMap(String inpstr, Collector<Tuple2<Long, Long>> out) throws Exception {
        for (String word : inpstr.split("\\s")) {
            try {
                if (word.equals("quit")) {
                    throw new QuitValueState("Stopping!!!", hostname, port);
                }
                // "clear" is encoded as -1, which resets the state downstream
                if (word.equals("clear")) {
                    word = "-1";
                }
                out.collect(Tuple2.of(1L, Long.valueOf(word)));
            } catch (NumberFormatException e) {
                System.out.println("Enter valid number: " + e.getMessage());
            } catch (QuitValueState ex) {
                System.out.println("Quitting!!!");
            }
        }
    }
}).keyBy(0).flatMap(new StreamingJob())
        .keyBy(0).asQueryableState("query-name");

On the Flink cluster, I can see that the proxy server is at 127.0.1.1:9069.
Client:

public static void main(String[] args) throws IOException, InterruptedException, Exception {
    QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);

    System.out.println("Querying on "+args[0]);
    JobID jobId = JobID.fromHexString(args[0]);
    ValueStateDescriptor<Tuple2<String, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "sum",
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                    }));

    CompletableFuture<ValueState<Tuple2<String, Long>>> resultFuture =
            client.getKvState(jobId, "query-name", "sum", BasicTypeInfo.STRING_TYPE_INFO, descriptor);
    System.out.println(resultFuture);
    resultFuture.thenAccept(response -> {
        try {
            Tuple2<String, Long> res = response.value();
            System.out.println("Queried sum value: " + res);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Exiting future ...");
    });
}
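
One thing that may be worth checking in the client above: `thenAccept` only runs when the future completes successfully, and `main` returns without waiting for the result. If the lookup fails, or the process ends before the response arrives, nothing is printed. Whether that is what happens here is not confirmed by the post. The sketch below uses only `java.util.concurrent` (no Flink classes) to show how waiting with a timeout makes a failed or missing response visible. The class name `FutureOutcomeSketch`, the helper `awaitAndDescribe`, and the sample values are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureOutcomeSketch {

    // Hypothetical helper: waits up to timeoutMillis and reports the outcome as text.
    static <T> String awaitAndDescribe(CompletableFuture<T> future, long timeoutMillis) {
        try {
            T value = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "OK: " + value;
        } catch (ExecutionException e) {
            // The asynchronous work failed; the original exception is the cause.
            return "FAILED: " + e.getCause();
        } catch (TimeoutException e) {
            // No result and no failure arrived within the timeout.
            return "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        // Stand-in for a query that fails (assumption: any exception type would do).
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("hypothetical lookup failure"));

        // With thenAccept alone, this callback is skipped and the failure stays silent.
        failed.thenAccept(v -> System.out.println("never printed: " + v));

        // Blocking with a timeout surfaces all three outcomes.
        System.out.println(awaitAndDescribe(failed, 1000));
        System.out.println(awaitAndDescribe(CompletableFuture.completedFuture("sum=42"), 1000));
        System.out.println(awaitAndDescribe(new CompletableFuture<String>(), 50));
    }
}
```

Applied to the client, the same idea would mean calling `resultFuture.get(...)` with a timeout (or attaching `whenComplete`) instead of relying on `thenAccept` alone, so that any exception from the query shows up in the output.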
