如何修复“不兼容类型:org.apache.beam.sdk.options.valueprovider< java.lang.string>无法转换为java.lang.string”

kfgdxczn  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(723)

我按照这个链接创建了一个模板,它构建了一个梁管道来读取kafkaio的内容。但我总是遇到“不兼容类型:org.apache.beam.sdk.options.valueprovider无法转换为java.lang.string”。导致错误的是“.WithBootsTapServers(options.getkafkaserver())”行。beam版本是2.9.0,这是我代码的一部分。

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();

    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata() 
    )

下面是我如何运行代码:

mvn compile exec:java \
-Dexec.mainClass=${MyClass} \
-Pdataflow-runner -Dexec.args=" \
--project=${MyClass} \
--stagingLocation=gs://${MyBucket}/staging \
--tempLocation=gs://${MyBucket}/temp \
--templateLocation=gs://${MyBucket}/templates/${MyClass} \
--runner=DataflowRunner"
pvabu6sv

pvabu6sv1#

我可能会发现问题,那就是Kafka约不受支持。以下是来自谷歌创建模板。
“某些i/o连接器包含接受valueprovider对象的方法。要确定对特定连接器和方法的支持,请参阅i/o连接器的api参考文档。支持的方法具有valueprovider的重载。如果方法没有重载,则该方法不支持运行时参数。以下i/o连接器至少具有部分valueprovider支持:
基于文件的ios:textio、avroio、fileio、tfrecordio、xmlio bigqueryio*bigtableio(需要sdk 2.3.0或更高版本)pubsubio“

vql8enpb

vql8enpb2#

为了通过 ValueProvider ,您需要使用 get 方法,然后获得具有其具体类型的值。
例如:有选项时: ValueProvider<String> getKafkaServer(); 您可以通过以下方式访问: getKafkaServer().get() 这将返回字符串对象。
似乎kafkaio api需要获取字符串参数而不是valueprovider,您必须从valueprovider Package 器中提取值。

相关问题