Usage of the org.elasticsearch.client.RestHighLevelClient.bulkAsync() method, with code examples


This article collects Java code examples of the org.elasticsearch.client.RestHighLevelClient.bulkAsync method and shows how RestHighLevelClient.bulkAsync is used in practice. The examples are taken from selected open-source projects found on GitHub, Stack Overflow, Maven and similar sources, and should serve as useful references. Details of the RestHighLevelClient.bulkAsync method:
Package: org.elasticsearch.client
Class: RestHighLevelClient
Method: bulkAsync

About RestHighLevelClient.bulkAsync

Asynchronously executes a bulk request using the Bulk API. See the Bulk API documentation on elastic.co.
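
Before the project excerpts, here is a minimal sketch (not taken from any of the projects below) of what a direct call looks like. It assumes an already initialized RestHighLevelClient named client, a hypothetical index my-index, and the overload that takes RequestOptions; the two-argument overload without RequestOptions, used by some of the older excerpts below, works the same way.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;

// Build one bulk request containing two index operations
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("my-index").id("1")
    .source("{\"field\":\"value1\"}", XContentType.JSON));
request.add(new IndexRequest("my-index").id("2")
    .source("{\"field\":\"value2\"}", XContentType.JSON));

// Send it asynchronously; the listener is called back when the whole bulk request completes
client.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse response) {
        // Individual operations may still have failed even if the request itself succeeded
        if (response.hasFailures()) {
            System.err.println(response.buildFailureMessage());
        }
    }

    @Override
    public void onFailure(Exception e) {
        // The bulk request as a whole could not be executed
        e.printStackTrace();
    }
});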

Code examples

Example source: apache/incubator-gobblin

@Override
public Future<WriteResponse> write(final Batch<Object> batch, @Nullable WriteCallback callback) {
 // Turn the batch into a BulkRequest plus a matching listener/future pair
 Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
 try {
  // Fire the bulk request asynchronously; this is the older two-argument overload
  // without RequestOptions. The listener completes the returned future.
  client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
  return preparedBatch.getSecond().getFuture();
 }
 catch (Exception e) {
  throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
 }
}

Example source: dadoonet/fscrawler

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
    (request, bulkListener) -> client.bulkAsync(request, bulkListener);
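
This lambda is what gets handed to BulkProcessor.builder as the consumer that actually sends each flushed batch; the next example shows the full wiring.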

Example source: dadoonet/fscrawler

@Override
public void start() throws IOException {
  if (client != null) {
    // The client has already been initialized. Let's skip this again
    return;
  }
  try {
    // Create an elasticsearch client
    client = new RestHighLevelClient(buildRestClient(settings.getElasticsearch()));
    checkVersion();
    logger.info("Elasticsearch Client for version {}.x connected to a node running version {}", compatibleVersion(), getVersion());
  } catch (Exception e) {
    logger.warn("failed to create elasticsearch client, disabling crawler...");
    throw e;
  }
  if (settings.getElasticsearch().getPipeline() != null) {
    // Check that the pipeline exists
    if (!isExistingPipeline(settings.getElasticsearch().getPipeline())) {
      throw new RuntimeException("You defined pipeline:" + settings.getElasticsearch().getPipeline() +
          ", but it does not exist.");
    }
  }
  BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
      (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
  bulkProcessor = BulkProcessor.builder(bulkConsumer, new DebugListener(logger))
      .setBulkActions(settings.getElasticsearch().getBulkSize())
      .setFlushInterval(TimeValue.timeValueMillis(settings.getElasticsearch().getFlushInterval().millis()))
      .setBulkSize(new ByteSizeValue(settings.getElasticsearch().getByteSize().getBytes()))
      .build();
}
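
As a hedged usage sketch (not part of fscrawler's code; the index name, document and timeout are assumptions): once start() has built the BulkProcessor, callers queue documents with add(), and the processor flushes them through bulkAsync according to the configured bulk actions, byte size and flush interval.

// Queue a document; the processor batches it and eventually sends it via bulkAsync
bulkProcessor.add(new IndexRequest("documents").id("doc-1")
    .source("{\"path\":\"/tmp/file.txt\"}", XContentType.JSON));

// On shutdown, let in-flight bulk requests finish (awaitClose throws InterruptedException)
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
client.close();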

Example source: org.nuxeo.elasticsearch/nuxeo-elasticsearch-core

@Override
public BulkProcessor.Builder bulkProcessorBuilder(BulkProcessor.Listener bulkListener) {
  return BulkProcessor.builder((request, listener) -> client.bulkAsync(request, RequestOptions.DEFAULT, listener),
      bulkListener);
}
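
Here BulkProcessor.builder receives both the bulkAsync-backed BiConsumer and a BulkProcessor.Listener, whose beforeBulk/afterBulk callbacks let the caller log or react to each flushed batch and its outcome.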

Example source: io.reactiverse/elasticsearch-client

@Override()
public void bulkAsync(BulkRequest bulkRequest, RequestOptions options, Handler<AsyncResult<BulkResponse>> handler) {
  Context context = vertx.getOrCreateContext();
  delegate.bulkAsync(bulkRequest, options, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse value) {
      context.runOnContext(v -> handler.handle(Future.succeededFuture(value)));
    }
    @Override
    public void onFailure(Exception e) {
      context.runOnContext(v -> handler.handle(Future.failedFuture(e)));
    }
  });
}
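
A hedged usage sketch of the wrapper above (the client and request variable names are assumptions, not from the project): the Handler<AsyncResult<BulkResponse>> is invoked back on the caller's Vert.x context, so the result can be consumed without extra synchronization.

rxClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, ar -> {
    if (ar.succeeded()) {
        System.out.println("Bulk took " + ar.result().getTook());
    } else {
        ar.cause().printStackTrace();
    }
});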

Example source: org.apache.gobblin/gobblin-elasticsearch

@Override
public Future<WriteResponse> write(final Batch<Object> batch, @Nullable WriteCallback callback) {
 Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
 try {
  client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
  return preparedBatch.getSecond().getFuture();
 }
 catch (Exception e) {
  throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
 }
}

Example source: cognitree/flume-elasticsearch-sink

private BulkProcessor build(final RestHighLevelClient client) {
  logger.trace("Bulk processor name: [{}]  bulkActions: [{}], bulkSize: [{}], flush interval time: [{}]," +
          " concurrent Request: [{}], backoffPolicyTimeInterval: [{}], backoffPolicyRetries: [{}] ",
      new Object[]{bulkProcessorName, bulkActions, bulkSize, flushIntervalTime,
          concurrentRequest, backoffPolicyTimeInterval, backoffPolicyRetries});
  BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
      (request, bulkListener) -> client
          .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
  return BulkProcessor.builder(bulkConsumer, getListener())
      .setBulkActions(bulkActions)
      .setBulkSize(bulkSize)
      .setFlushInterval(flushIntervalTime)
      .setConcurrentRequests(concurrentRequest)
      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
          Util.getTimeValue(backoffPolicyTimeInterval,
              DEFAULT_ES_BACKOFF_POLICY_START_DELAY),
          backoffPolicyRetries))
      .build();
}
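
The backoff policy governs how the BulkProcessor retries bulk requests that Elasticsearch rejects when its write queue is full: retries start after backoffPolicyTimeInterval (falling back to DEFAULT_ES_BACKOFF_POLICY_START_DELAY) and are capped at backoffPolicyRetries attempts.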

Example source: fr.pilato.elasticsearch.crawler/fscrawler-elasticsearch-client-v5

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
    (request, bulkListener) -> client.bulkAsync(request, bulkListener);
