bounty将在2天后过期。回答此问题可获得+50声望奖励。PatPanda希望引起更多人对此问题的关注:一个工作示例,其中对象的流量将以ElasticSearch内部的React方式结束
我想达到的目标:
- 使用Spring WebClient发送Flux到ElasticSearch,利用
/_bulk
API。
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
我尝试过的:
使用以下代码:
public class BulkInsertOfFluxUsingWebClientBulkRestApi {
public static void main(String[] args) throws InterruptedException {
WebClient client = WebClient.create("http://127.0.0.1:9200/").mutate().clientConnector(new ReactorClientHttpConnector(HttpClient.create().wiretap(true))).build();
Flux<String> createCommandFlux = Flux.interval(Duration.ofMillis(100))
.map(i -> {
try {
Foo onePojo = new Foo(LocalDateTime.now().toString(), String.valueOf(i));
String jsonStringOfOnePojo = new ObjectMapper().writeValueAsString(onePojo);
String bulkCreateCommande = "{ \"create\" : {} }\n" + jsonStringOfOnePojo + "\n";
return bulkCreateCommande;
} catch (Exception e) {
e.printStackTrace();
return "";
}
});
Disposable disposable = createCommandFlux
.window(100)
.flatMap(windowFlux -> client
.post()
.uri("my_index/_bulk")
.contentType(MediaType.APPLICATION_NDJSON)
.body(windowFlux, Foo.class)
.exchange()
.doOnNext(response -> System.out.println(response))
.flatMap(clientResponse -> clientResponse.bodyToMono(String.class)))
.subscribe();
Thread.sleep(1000000);
disposable.dispose();
}
注意事项:
- 这只是使用响应式Spring WebClient,而不是另一个http客户端,而不是ElsaticSearch java客户端等。
- 它试图在ElasticSearch中保存一个Flux(可以是无限的)
- 我想避免对每个对象发出一个http请求,因此,利用
/_bulk
API将它们“分组”并批量发送。
问题:
不幸的是,此代码当前返回HTTP 400错误请求。
1条答案
按热度按时间ckx4rj1h1#
既然你使用的是webclient,我假设你也使用spring
Checkout Spring API用于响应式ElasticSearch操作:https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#elasticsearch.reactive.operations
使用Reactive Elasticsearch模板:https://docs.spring.io/spring-data/elasticsearch/docs/4.2.0-M3/api/index.html?org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.html
以下是示例
x一个一个一个一个x一个一个二个x