Eclipse: local Pubsub emulator does not work with Dataflow

x7yiwoj4  posted on 2022-11-04 in Eclipse

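I am developing a Dataflow pipeline in Java whose input comes from Pubsub. Later, I came across a guide here on how to use the local Pubsub emulator, so that I would not need to deploy to GCP just to test.
Here is my simple code: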

private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {

    @Description("Pub/Sub topic to read messages from")
    String getTopic();
    void setTopic(String topic);

    @Description("Pub/Sub subscription to read messages from")
    String getSubscription();
    void setSubscription(String subscription);

    @Description("Local file output")
    String getOutput();
    void setOutput(String output);
}

public static void main(String[] args) {

    Options options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(Options.class);
    options.setStreaming(true);
    options.setPubsubRootUrl("localhost:8085");

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // other .apply's

    pipeline.run();

}

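I was able to follow the guide, including the part where I use the sample Python code to create a topic, a subscription, a publisher, and even publish messages. When I interact with the Pubsub emulator through the Python code, I notice the message Detected HTTP/2 connection in the command line where the emulator is running: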

Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.

I compile and run the code in Eclipse using a Dataflow Pipeline Run Configuration, but I ran into a problem.
First attempt:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription: 
...
Caused by: java.lang.RuntimeException: Failed to create subscription: 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost

When I tried adding http to the options.setPubsubRootUrl("localhost:8085") line, I got an exception that repeats endlessly:

com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.net.PlainSocketImpl.connect(Unknown Source)
    at java.net.SocksSocketImpl.connect(Unknown Source)

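It seems to reach the Pubsub emulator but fails to connect, because the command line where I run the emulator also keeps printing this output endlessly: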

[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.

How can I get my Dataflow pipeline to work with the Pubsub emulator?

8ulbf1ek  #1

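You are trying to connect to the Pubsub emulator from the Beam Direct Runner, using the Dataflow fork of the Beam 2.5 SDK. The Dataflow 2.5 SDK and the Eclipse plugin were deprecated on June 6, 2019. But this should work.
As you found, you need to prefix the PubsubRootUrl with "http://" in Beam. The second problem you are seeing indicates that nothing is listening on localhost:8085. This is probably because there are actually two localhosts: IPv4 and IPv6. The Pubsub emulator only listens on IPv4, and Windows tries IPv6 first. Try replacing localhost with 127.0.0.1 to force IPv4. You should end up with the following: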

options.setPubsubRootUrl("http://127.0.0.1:8085");
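
Both points above can be checked outside Beam with the JDK alone. The following is only a rough diagnostic sketch, not part of Beam or the emulator: the class name EmulatorEndpointCheck and its two helper methods are made up for illustration, and port 8085 is simply the port from the question. It shows how java.net.URL treats a root URL without a scheme, and probes each address that "localhost" resolves to with a plain TCP connect.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;

// Hypothetical diagnostic helper; not part of Beam or the Pub/Sub emulator.
final class EmulatorEndpointCheck {

    // Returns null when java.net.URL accepts the string, otherwise the parser's message.
    static String urlProblem(String rootUrl) {
        try {
            new URL(rootUrl);
            return null;
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    // Tries a plain TCP connect; true only if something accepts the connection in time.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws UnknownHostException {
        int port = 8085; // emulator port taken from the question

        // Without "http://", the text before the first colon is read as the scheme.
        System.out.println("localhost:" + port + " -> " + urlProblem("localhost:" + port));
        System.out.println("http://127.0.0.1:" + port + " -> "
                + urlProblem("http://127.0.0.1:" + port));

        // "localhost" may resolve to both an IPv4 and an IPv6 address; probe each one.
        for (InetAddress address : InetAddress.getAllByName("localhost")) {
            String ip = address.getHostAddress();
            System.out.println(ip + " accepts connections: " + canConnect(ip, port, 1000));
        }
    }
}
```

If the IPv4 address accepts connections while the IPv6 one does not, that would be consistent with the explanation above, and using 127.0.0.1 in the root URL sidesteps the question of which address gets tried first.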

kmbjn2e3  #2

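In addition to setting the root URL, you also need to provide a credential factory. When using the emulator, you do not need any credentials. You can do this either in code (by setting the options manually) or by passing command-line options directly. The latter keeps the code clean.
In code: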

options.setPubsubRootUrl("http://127.0.0.1:8085");
options.setCredentialFactoryClass(NoCredentialsFactory.class);

Command-line options:

--pubsubRootUrl=http://127.0.0.1:8085
--credentialFactoryClass=ca.dataedu.dataflow.otlpdemo.NoCredentialsFactory

The NoCredentialsFactory code looks something like this:

import com.google.auth.Credentials;
import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

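// Credential factory for use with the emulator, which needs no credentials.
public class NoCredentialsFactory implements CredentialFactory {

    private static final NoCredentialsFactory INSTANCE = new NoCredentialsFactory();

    // Static factory method; receives the pipeline options but ignores them.
    public static NoCredentialsFactory fromOptions(PipelineOptions options) {
        return INSTANCE;
    }

    // Always returns null, meaning "no credentials".
    @Override
    public @Nullable Credentials getCredential() {
        return null;
    }
}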
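
As far as I understand, the static fromOptions method matters because Beam creates the factory reflectively from the class given in credentialFactoryClass, by calling a static fromOptions(PipelineOptions) method on it. The sketch below only imitates that kind of lookup with plain java.lang.reflect. Every type in it (FactoryLookupSketch, Options, CredentialSource, NoCreds) is a hypothetical stand-in rather than a Beam class, and Beam's real lookup code may differ in detail.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Illustration only: imitates a "static fromOptions" factory lookup without Beam.
final class FactoryLookupSketch {

    // Stand-in for PipelineOptions (hypothetical).
    interface Options {}

    // Stand-in for CredentialFactory (hypothetical).
    interface CredentialSource {
        Object getCredential();
    }

    // Same shape as the NoCredentialsFactory shown above.
    static final class NoCreds implements CredentialSource {
        private static final NoCreds INSTANCE = new NoCreds();

        public static NoCreds fromOptions(Options options) {
            return INSTANCE;
        }

        @Override
        public Object getCredential() {
            return null; // no credentials
        }
    }

    // Finds a public static fromOptions(Options) method on the class and calls it.
    static CredentialSource build(Class<?> factoryClass, Options options) {
        try {
            Method method = factoryClass.getMethod("fromOptions", Options.class);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new IllegalArgumentException("fromOptions must be static");
            }
            return (CredentialSource) method.invoke(null, options);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "no usable fromOptions on " + factoryClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        CredentialSource factory = build(NoCreds.class, new Options() {});
        System.out.println("credential: " + factory.getCredential());
    }
}
```

Under that assumption, a factory class without a matching static fromOptions method would fail at the lookup step, before any request is sent to the emulator.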
