"Client is not connected to any Elasticsearch nodes!" when running in Flink standalone cluster mode

col17t5w  posted on 2021-06-25 in Flink

I can run the code below from Eclipse, but when I run it on a Flink cluster I get the error shown after it. Can anyone point me in the right direction?
My code:

public static void main(String[] args) throws Exception {

    /**
     * Getting the execution Environment
     */
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "11.71.10.13:9092");
    properties.setProperty("group.id", "Bitfinex");
    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "TRADE-ES");
    config.put("bulk.flush.max.actions", "1");
    config.put("node.name", "node-1");

    List<InetSocketAddress> transportAddresses = new ArrayList<>();
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("19.18.1.55"), 9300));
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("19.18.1.78"), 9300));

    /**
     * Adding the BitFinex-ETHBTC-Order source to the execution environment
     */
    DataStream<String> ethbtc_OrderStream = env.addSource(
            new FlinkKafkaConsumer010<String>("BitFinex-ETHBTC-Order", new SimpleStringSchema(), properties),
            "Kafka_BitFinex-ETHBTC-Order_Source").setParallelism(1);

    ethbtc_OrderStream.addSink(new BitfinexEthbtcOrderADLSink<String>()).name("BitfinexEthbtcOrderADLSink")
            .setParallelism(10);

    ethbtc_OrderStream
            .addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    return Requests.indexRequest().index("ethbtc_order").type("order").source(element);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    System.out.println(element);
                    indexer.add(createIndexRequest(element));
                }

            })).name("BitfinexEthbtcOrderESSink").setParallelism(10);
}

The error:

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
    at java.lang.Thread.run(Thread.java:748)
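
Since the job works from Eclipse but fails on the cluster, one thing that may be worth ruling out is whether the machines running the Flink TaskManagers can reach the Elasticsearch transport port at all. The following is only a minimal diagnostic sketch under that assumption; nothing in the post confirms that reachability is the cause. `TransportPortCheck` and `canConnect` are hypothetical names made up for illustration (they are not part of Flink or Elasticsearch), the 3000 ms timeout is an arbitrary choice, and the addresses are the ones from the code above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration only; not part of Flink or Elasticsearch.
final class TransportPortCheck {

    /**
     * Returns true if a plain TCP connection to host:port can be opened
     * within timeoutMillis, false on timeout, refusal, or resolution failure.
     */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Addresses and port copied from the transportAddresses list in the question.
        String[] hosts = {"19.18.1.55", "19.18.1.78"};
        for (String host : hosts) {
            System.out.println(host + ":9300 reachable = " + canConnect(host, 9300, 3000));
        }
    }
}
```

This would need to be run on each TaskManager host, not on the development machine. A successful connection only shows that the port is reachable; it does not show that `cluster.name` matches the cluster or that the client and server versions are compatible.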
