设置和访问flink可查询状态(nullpointerexception)

3ks5zfa0  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(487)

我在用Flink v1.4.0 我已经安排了两份不同的工作。第一个是一个管道,它使用来自kafka主题的数据并将其存储到可查询状态(qs)中。数据按日期键入。第二个提交一个查询到qs作业并处理返回的数据。
两份工作都和Flink相处得很好 v.1.3.2 . 但随着新的更新,一切都坏了。以下是第一个作业的部分代码:

private void runPipeline() throws Exception {
    StreamExecutionEnvironment env = configurationEnvironment();

    QueryableStateStream<String, DataBucket> dataByDate = env.addSource(sourceDataFromKafka())
        .map(NewDataClass::new)
        .keyBy(data.date)
        .asQueryableState("QSName", reduceIntoSingleDataBucket());
}

下面是客户端的代码:

QueryableStateClient client = new QueryableStateClient("localhost", 6123);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
    "QSName",
    TypeInformation.of(new TypeHint<DataBucket>() {}));

jobId = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
String key = "2017-01-06";

CompletableFuture<ValueState<DataBucket> resultFuture = client.getKvState(
    jobId, 
    "QSName", 
    key, 
    BasicTypeInfo.STRING_TYPE_INFO, 
    descriptor);

try {

    ValueState<DataBucket>  valueState = resultFuture.get();
    DataBucket bucket = valueState.value();          
    System.out.println(bucket.getLabel());

} catch (IOException | InterruptionException | ExecutionException e) {
    throw new RunTimeException("Unable to query bucket key: " + key , e);
}

我已按照以下链接的说明进行操作:https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html
确保通过包含 flink-queryable-state-runtime_2.11-1.4.0.jaropt/ 将flink发行版的文件夹 lib/ 并选中它在任务管理器中运行。
我不断得到以下错误:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:84)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
    at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:210)
    at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:174)
    at com.company.dept.query.QuerySubmitter.main(QuerySubmitter.java:37)

知道发生了什么吗?我想我的请求根本没有到达qs。。。我真的不知道我该不该改变什么。谢谢。

mnemlml8

mnemlml81#

所以,事实证明,是两个因素导致了这个错误。第一个是使用错误的构造函数来创建 descriptor 在客户端。我没有使用只接受qs名称和typehint作为输入的方法,而是使用另一种方法 keySerialiser 以及默认值,如下所示:

ValueStateDescriptor<DataBucket> descriptor = new ValueStateDescriptor<>(
    "QSName",
    TypeInformation.of(new TypeHint<DataBucket>() {}).createSerializer(new ExecutionConfig()),
    DataBucket.emptyBucket());    // or anything that can be used as a default value

第二个与主机和端口值有关。港口不同于 v1.3.2 现在设置为9069,localhost在我的例子中也不同。您可以通过检查该行的任何任务管理器的日志来验证这两者: Started the Queryable State Proxy Server @ ... .
最后,如果您在这里是因为希望允许可查询状态客户端代理的端口范围,我建议您在这里遵循相应的问题(flink-7788):https://issues.apache.org/jira/browse/flink-7788.

相关问题