Connecting from Spark to Elasticsearch using elasticsearch-hadoop does not work


I am having trouble connecting to a locally running Elasticsearch node from Java code that runs as a job submitted to Spark (also running locally). However, when I connect without Spark there is no problem, and running a Python job submitted to Spark works fine as well.
I assumed that for Java I need to connect through port 9300 rather than 9200 (the HTTP port). However, I always get the same exception, and it makes no difference whether I read or write:

16/08/04 16:51:55 ERROR NetworkClient: Node [The server localhost failed to respond with a valid HTTP response] failed (localhost:9300); no other nodes left - aborting...
Exception in thread "main" org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9300]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:102)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:282)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:266)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:270)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:108)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:90)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:415)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
    at org.apache.spark.api.java.JavaPairRDD.first(JavaPairRDD.scala:211)
    at com.dd.mediaforce.spark.most_popular.ExecutorMostPopular.main(ExecutorMostPopular.java:564)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

We are running Spark and Elasticsearch on a number of nodes. The Python code runs fine there, but trying the Java code against that ES setup did not get me any further either.
The Java code I am using:

SparkConf _sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("Test");
JavaSparkContext jsc = new JavaSparkContext(_sparkConf);
Configuration conf = new Configuration();
conf.set("cluster.name", "our_clustername");
conf.set("es.nodes", "localhost");
conf.setInt("es.port", 9300);
conf.set("es.resource", index_and_type);
JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf,
        org.elasticsearch.hadoop.mr.EsInputFormat.class,
        org.apache.hadoop.io.NullWritable.class,
        org.elasticsearch.hadoop.mr.LinkedMapWritable.class);
System.out.println(readRdd.first());
jsc.stop();

The following Java code, which uses the TransportClient (without Spark), connects to ES without any problem as mentioned before; both reading and writing work fine:

Client client = TransportClient.builder().settings(settings).build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

ImmutableOpenMap<String, IndexMetaData> indices = client.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
for (ObjectCursor<IndexMetaData> value : indices.values()) {
    log.info("Index: " + value.index + " : " + value.toString());
}

GetResponse response = client.prepareGet("index_name", "type_name", "1").get();
log.info(response.getIndex() + " : " + response.getId() + " : " + response.isExists());

String field_id = "6";
IndexRequest indexRequest = new IndexRequest("index_name", "type", "2")
        .source(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject());

UpdateRequest updateRequest = new UpdateRequest("index_name", "type_name", article_id)
        .doc(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject())
        .upsert(indexRequest);

UpdateResponse responseUpdate = client.update(updateRequest).get();
log.info(responseUpdate.getIndex() + " : " + responseUpdate.getGetResult() + " : " + responseUpdate.getType());
client.close();

Any suggestion is welcome, as I have been stuck on this for days without getting any further. Obviously I have googled the problem and searched Stack Overflow, but so far I have not found an answer.
For completeness, here is some Python code that reads from and writes to ES via Spark without any issues:

conf = SparkConf()
conf = conf.setAppName('Test')
sc = SparkContext(conf=conf)

# Omitting some of the code in creating some_rdd on Spark:

index_and_type = index_name + '/type_name'
groovy_script = "if (ctx._source.%s) { ctx._source.%s+=value } else { ctx._source.%s=value }" % (field, field, field)

es_db_connection_dictionary = {
    "es.nodes": db_hosts,
    "es.port": db_port,
    "es.resource": index_and_type,
    "es.write.operation": "upsert",
    "es.mapping.id": "field_id",
    "es.update.script": groovy_script,
    "es.update.script.params": "value:%s" % integer_field,
    "es.http.timeout": "10s"
}

es_input = views_tuple_rdd.map(lambda item: (item[0],
        {
            'field_id': item[0],
            "integer_field": item[1],
            "another_field": client_name,
        }))

es_input.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_db_connection_dictionary)

Answer:

In general, if you are using the elasticsearch-spark connector you do not need to use port 9300; 9200 (the default) is the one to use. The connector does not behave like the regular Elasticsearch Java API. A minimal sketch of the adjusted configuration is shown below.
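
For example, here is a minimal sketch of the read path from the question, pointed at the HTTP port instead of the transport port. It assumes a single local node and reuses the index_and_type string from the snippets above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

SparkConf sparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("Test");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

Configuration conf = new Configuration();
conf.set("es.nodes", "localhost");
// es-hadoop talks to Elasticsearch over HTTP/REST, so point it at the
// HTTP port (9200 by default), not at the transport port 9300 used by
// the native Java TransportClient.
conf.set("es.port", "9200");
conf.set("es.resource", index_and_type); // e.g. "index_name/type_name"

JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf,
        EsInputFormat.class,
        NullWritable.class,
        LinkedMapWritable.class);
System.out.println(readRdd.first());
jsc.stop();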
Also, it looks like the version of the connector you are using is not compatible with your Elasticsearch version. This is a common mistake, because most of them target the 2.x line.
I believe this is no longer the case from Elasticsearch 5.x onward, since Elastic aligned the version numbers of all their other products with it.
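
One quick way to see which Elasticsearch version you are actually running (so you can pick a matching elasticsearch-hadoop/elasticsearch-spark artifact) is to hit the HTTP root endpoint, which returns the node's version.number. A small sketch, assuming the node from the question on localhost:9200:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class EsVersionCheck {
    public static void main(String[] args) throws Exception {
        // The root endpoint of an Elasticsearch node returns a small JSON
        // document that includes "version" : { "number" : "..." }.
        URL url = new URL("http://localhost:9200/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}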
