java Spring WebClient使用/_bulk API将Flux发送到ElasticSearch

ehxuflar  于 2023-04-04  发布在  Java
关注(0)|答案(1)|浏览(138)

bounty将在2天后过期。回答此问题可获得+50声望奖励。PatPanda希望引起更多人对此问题的关注:一个工作示例,其中对象的流量将以ElasticSearch内部的React方式结束

我想达到的目标:

  • 使用Spring WebClient发送Flux到ElasticSearch,利用/_bulk API。

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
我尝试过的:
使用以下代码:

public class BulkInsertOfFluxUsingWebClientBulkRestApi {

    public static void main(String[] args) throws InterruptedException {
        WebClient client = WebClient.create("http://127.0.0.1:9200/").mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().wiretap(true))).build();
        Flux<String> createCommandFlux = Flux.interval(Duration.ofMillis(100))
                .map(i -> {
                    try {
                        Foo onePojo = new Foo(LocalDateTime.now().toString(), String.valueOf(i));
                        String jsonStringOfOnePojo = new ObjectMapper().writeValueAsString(onePojo);
                        String bulkCreateCommande = "{ \"create\" : {} }\n" + jsonStringOfOnePojo + "\n";
                        return bulkCreateCommande;
                    } catch (Exception e) {
                        e.printStackTrace();
                        return "";
                    }
                });
        
        Disposable disposable = createCommandFlux
                .window(100) 
                .flatMap(windowFlux -> client
                        .post()
                        .uri("my_index/_bulk")
                        .contentType(MediaType.APPLICATION_NDJSON)
                        .body(windowFlux, Foo.class)
                        .exchange()
                        .doOnNext(response -> System.out.println(response))
                        .flatMap(clientResponse -> clientResponse.bodyToMono(String.class)))
                .subscribe();
        Thread.sleep(1000000);
        disposable.dispose();
    }

注意事项:

  • 这只是使用响应式Spring WebClient,而不是另一个http客户端,而不是ElsaticSearch java客户端等。
  • 它试图在ElasticSearch中保存一个Flux(可以是无限的)
  • 我想避免对每个对象发出一个http请求,因此,利用/_bulk API将它们“分组”并批量发送。

问题:
不幸的是,此代码当前返回HTTP 400错误请求。

ckx4rj1h

ckx4rj1h1#

既然你使用的是webclient,我假设你也使用spring
Checkout Spring API用于响应式ElasticSearch操作:https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#elasticsearch.reactive.operations
使用Reactive Elasticsearch模板:https://docs.spring.io/spring-data/elasticsearch/docs/4.2.0-M3/api/index.html?org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.html

protected reactor.core.publisher.Flux<org.elasticsearch.action.bulk.BulkItemResponse> doBulkOperation(List<?> queries,
                                                                                                  BulkOptions bulkOptions,
                                                                                                  IndexCoordinates index)

以下是示例
x一个一个一个一个x一个一个二个x

相关问题