org.elasticsearch.threadpool.ThreadPool类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.7k)|赞(0)|评价(0)|浏览(148)

本文整理了Java中org.elasticsearch.threadpool.ThreadPool类的一些代码示例,展示了ThreadPool类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。ThreadPool类的具体详情如下:
包路径:org.elasticsearch.threadpool.ThreadPool
类名称:ThreadPool

ThreadPool介绍

暂无

代码示例

代码示例来源:origin: floragunncom/search-guard

String injectedUserString = threadPool.getThreadContext().getTransient(ConfigConstants.SG_INJECTED_USER);
if (log.isDebugEnabled()) {
  log.debug("Injected user string: {}", injectedUserString);
  log.error("User string malformed, could not extract parts. User string was '{}.' User injection failed.", injectedUserString);
  return false;
      InetAddress iAdress = InetAddress.getByName(ipAndPort[0]);
      int port = Integer.parseInt(ipAndPort[1]);
      threadPool.getThreadContext().putTransient(ConfigConstants.SG_REMOTE_ADDRESS, new TransportAddress(iAdress, port));
    } catch (UnknownHostException | NumberFormatException e) {
      log.error("Cannot parse remote IP or port: {}, user injection failed.", parts[2], e);
  threadPool.getThreadContext().putTransient(ConfigConstants.SG_REMOTE_ADDRESS, xffResolver.resolve(request));
threadPool.getThreadContext().putTransient(ConfigConstants.SG_USER, user);
auditLog.logSucceededLogin(parts[0], true, null, request);
if (log.isTraceEnabled()) {

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Provides the executor service used by this transport service.
 *
 * @return the executor obtained from {@code threadPool.generic()}
 */
private ExecutorService getExecutorService() {
  final ExecutorService genericExecutor = threadPool.generic();
  return genericExecutor;
}

代码示例来源:origin: org.elasticsearch/elasticsearch

// On a ping response: while running() is true, clear the retry counter and
// schedule NodeFD.this again after pingInterval. Does nothing otherwise.
@Override
public void handleResponse(PingResponse response) {
  if (running()) {
    retryCount = 0;
    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this);
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

/**
 * Shuts down the given service and waits for it to terminate.
 * <p>
 * An orderly {@code shutdown()} is attempted first; if the service has not terminated within
 * the timeout, {@code shutdownNow()} is invoked and the termination is awaited once more with
 * the same timeout.
 *
 * @param service  the service to terminate, may be <code>null</code>
 * @param timeout  how long to wait for each termination attempt
 * @param timeUnit the unit of {@code timeout}
 * @return <code>true</code> if the service terminated within one of the two waits;
 *         <code>false</code> if it did not, or if the service is <code>null</code>
 */
public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) {
  if (service == null) {
    return false;
  }
  service.shutdown();
  if (awaitTermination(service, timeout, timeUnit)) {
    return true;
  }
  // Orderly shutdown did not finish in time: escalate, then wait again.
  service.shutdownNow();
  return awaitTermination(service, timeout, timeUnit);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

// Hands the received ping request to every registered listener, running the
// notification loop on the thread pool's generic executor.
private void notifyPingReceived(final PingRequest pingRequest) {
  threadPool.generic().execute(() -> {
    for (Listener registered : listeners) {
      registered.onPingReceived(pingRequest);
    }
  });
}

代码示例来源:origin: org.elasticsearch/elasticsearch

// Marks this instance as master, schedules the self-rescheduling cluster info job and,
// when there is more than one data node, also triggers an immediate refresh.
@Override
public void onMaster() {
  this.isMaster = true;
  if (logger.isTraceEnabled()) {
    logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
  }
  try {
    // The rescheduling job is scheduled with a delay of updateFrequency
    threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
    final boolean multipleDataNodes = clusterService.state().getNodes().getDataNodes().size() > 1;
    if (multipleDataNodes) {
      // Run one info update right away instead of waiting for the scheduled job
      final Runnable immediateRefresh = () -> maybeRefresh();
      threadPool.executor(executorName()).execute(immediateRefresh);
    }
  } catch (EsRejectedExecutionException rejected) {
    if (logger.isDebugEnabled()) {
      logger.debug("Couldn't schedule cluster info update task - node might be shutting down", rejected);
    }
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

// Runs shardOperation(request, shardId) on the executor chosen by getExecutor(request, shardId)
// and reports the result, or any failure, to the listener.
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
  final AbstractRunnable shardTask = new AbstractRunnable() {
    @Override
    protected void doRun() throws Exception {
      listener.onResponse(shardOperation(request, shardId));
    }

    @Override
    public void onFailure(Exception failure) {
      listener.onFailure(failure);
    }
  };
  threadPool.executor(getExecutor(request, shardId)).execute(shardTask);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
  public void run() {
    if (logger.isTraceEnabled()) {
      logger.trace("Submitting new rescheduling cluster info update job");
    }
    // Refresh first; afterwards, if still master, schedule this job again.
    // The reschedule lives in a finally block so it also happens when maybeRefresh() throws.
    final Runnable refreshThenReschedule = () -> {
      try {
        maybeRefresh();
      } finally {
        if (isMaster) {
          if (logger.isTraceEnabled()) {
            logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
          }
          try {
            threadPool.schedule(updateFrequency, executorName(), this);
          } catch (EsRejectedExecutionException rejected) {
            logger.debug("Reschedule cluster info service was rejected", rejected);
          }
        }
      }
    };
    try {
      threadPool.executor(executorName()).execute(refreshThenReschedule);
    } catch (EsRejectedExecutionException rejected) {
      if (logger.isDebugEnabled()) {
        logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", rejected);
      }
    }
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

final ClusterState previousState = event.previousState();
final ClusterState state = event.state();
final String localNodeId = state.nodes().getLocalNodeId();
assert localNodeId != null;
  if (logger.isDebugEnabled()) {
    logger.debug("[{}] cleaning index, no longer part of the metadata", index);
    indexSettings = indexService.getIndexSettings();
    indicesService.removeIndex(index, DELETED, "index no longer part of the metadata");
  } else if (previousState.metaData().hasIndex(index.getName())) {
    final IndexMetaData metaData = previousState.metaData().index(index);
    indexSettings = new IndexSettings(metaData, settings);
    indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, state);
    threadPool.generic().execute(new AbstractRunnable() {
      @Override
      public void onFailure(Exception e) {

代码示例来源:origin: org.elasticsearch/elasticsearch

@Override
public void clusterChanged(ClusterChangedEvent event) {
  ClusterState state = event.state();
  if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
  ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
  if (state.nodes().isLocalNodeElectedMaster() == false) {
    return;
  if (changes.isPresent()) {
    if (upgradesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size() + 1)) {
      logger.info("Starting template upgrade to version {}, {} templates will be updated and {} will be removed",
        Version.CURRENT,
        changes.get().v1().size(),
        changes.get().v2().size());
      final ThreadContext threadContext = threadPool.getThreadContext();
      try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
        threadContext.markAsSystemContext();
        threadPool.generic().execute(() -> upgradeTemplates(changes.get().v1(), changes.get().v2()));

代码示例来源:origin: org.elasticsearch/elasticsearch

// Notifies all listeners of a master failure at most once (guarded by notifiedMasterFailure),
// then stops. The listener callbacks run on the thread pool's generic executor.
private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
  if (!notifiedMasterFailure.compareAndSet(false, true)) {
    // A failure notification has already been issued
    return;
  }
  try {
    threadPool.generic().execute(() -> {
      for (Listener registered : listeners) {
        registered.onMasterFailure(masterNode, cause, reason);
      }
    });
  } catch (EsRejectedExecutionException rejected) {
    logger.error("master failure notification was rejected, it's highly likely the node is shutting down", rejected);
  }
  stop("master failure, " + reason);
}

代码示例来源:origin: org.elasticsearch/elasticsearch

// Delivers an error to the handler on the handler's own executor. Anything that is not already
// a RemoteTransportException is wrapped in one, keeping the original as its cause.
private void handleException(final TransportResponseHandler handler, Throwable error) {
  final RemoteTransportException rtx = error instanceof RemoteTransportException
    ? (RemoteTransportException) error
    : new RemoteTransportException(error.getMessage(), error);
  threadPool.executor(handler.executor()).execute(() -> {
    try {
      handler.handleException(rtx);
    } catch (Exception handlerFailure) {
      logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", handler), handlerFailure);
    }
  });
}

代码示例来源:origin: org.elasticsearch/elasticsearch

if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
  if (isThrottling.getAndSet(false)) {
    logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}",
      numMergesInFlight, maxNumMerges);
    deactivateThrottling();
    System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
  engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
    @Override
    public void onFailure(Exception e) {

代码示例来源:origin: org.elasticsearch/elasticsearch

// Tells every registered listener that the given node failed, running the callbacks on the
// thread pool's generic executor. A rejected submission is only logged at trace level.
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
  try {
    threadPool.generic().execute(() -> {
      for (Listener registered : listeners) {
        registered.onNodeFailure(node, reason);
      }
    });
  } catch (EsRejectedExecutionException rejected) {
    logger.trace(() -> new ParameterizedMessage(
        "[node  ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", node, reason), rejected);
  }
}

代码示例来源:origin: org.elasticsearch/elasticsearch

final List<TransportAddress> seedAddresses = new ArrayList<>();
seedAddresses.addAll(hostsProvider.buildDynamicHosts(createHostsResolver()));
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
  seedAddresses.add(masterNode.value.getAddress());
  ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
  nodes.getLocalNode(), connectionProfile);
activePingingRounds.put(pingingRound.id(), pingingRound);
final AbstractRunnable pingSender = new AbstractRunnable() {
threadPool.generic().execute(pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
  @Override
  protected void doRun() throws Exception {

代码示例来源:origin: dadoonet/fscrawler

logger.info("Elasticsearch Client for version {}.x connected to a node running version {}", compatibleVersion(), getVersion());
} catch (ElasticsearchStatusException e) {
  logger.debug("got an error while trying to connect to elasticsearch cluster");
  throw new IOException(e);
} catch (Exception e) {
  logger.warn("failed to create elasticsearch client, disabling crawler...");
  throw e;
    (request, bulkListener) -> client.bulkAsync(request, bulkListener);
threadPool = new ThreadPool(Settings.builder().put("node.name", "fscrawler-client").build());
bulkProcessor = new BulkProcessor.Builder(bulkConsumer, new DebugListener(logger), threadPool)
    .setBulkActions(settings.getElasticsearch().getBulkSize())
    .setFlushInterval(TimeValue.timeValueMillis(settings.getElasticsearch().getFlushInterval().millis()))
    .setBulkSize(new ByteSizeValue(settings.getElasticsearch().getByteSize().getBytes()))
    .build();

代码示例来源:origin: floragunncom/search-guard

public User authenticate(final TransportRequest request, final String sslPrincipal, final Task task, final String action) {
  if(log.isDebugEnabled() && request.remoteAddress() != null) {
    log.debug("Transport authentication request from {}", request.remoteAddress());
    log.error("Not yet initialized (you may need to run sgadmin)");
    return null;
  final String authorizationHeader = threadPool.getThreadContext().getHeader("Authorization");

代码示例来源:origin: apache/servicemix-bundles

// Holder for string constants: "type", "min", "max", "keep_alive" and "queue_size".
// NOTE(review): presumably these are the field names used when serializing thread pool
// info - confirm against the enclosing class, which is not visible in this excerpt.
static final class Fields {
    static final String TYPE = "type";
    static final String MIN = "min";
    static final String MAX = "max";
    static final String KEEP_ALIVE = "keep_alive";
    static final String QUEUE_SIZE = "queue_size";
  }
}

代码示例来源:origin: floragunncom/search-guard

LOGGER.info("Check if "+searchguardIndex+" index exists ...");
  .masterNodeTimeout(TimeValue.timeValueMinutes(1));
  final ThreadContext threadContext = threadPool.getThreadContext();
  try(StoredContext ctx = threadContext.stashContext()) {
    threadContext.putHeader(ConfigConstants.SG_CONF_REQUEST_HEADER, "true");
  LOGGER.error("Failure while executing IndicesExistsRequest {}",e2, e2);
  bgThread.start();

代码示例来源:origin: dadoonet/fscrawler

/**
 * Closes the Elasticsearch client manager: waits up to 30 seconds for the bulk processor to
 * finish, then shuts down the thread pool and closes the low-level client.
 * <p>
 * The thread pool and the low-level client are released in {@code finally} blocks, so they are
 * cleaned up even when waiting for the bulk processor is interrupted or fails.
 *
 * @throws IOException if the wait on the bulk processor is interrupted (the thread's interrupt
 *                     flag is restored before throwing), or if closing the low-level client fails
 */
@Override
public void close() throws IOException {
  logger.debug("Closing Elasticsearch client manager");
  try {
    if (bulkProcessor != null) {
      try {
        // awaitClose returns false when the timeout elapses before all bulk requests completed
        if (!bulkProcessor.awaitClose(30, TimeUnit.SECONDS)) {
          logger.warn("Timed out waiting for the bulk processor to complete pending requests");
        }
      } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe it
        Thread.currentThread().interrupt();
        logger.warn("Did not succeed in closing the bulk processor for documents", e);
        throw new IOException(e);
      }
    }
  } finally {
    try {
      if (threadPool != null) {
        threadPool.shutdownNow();
      }
    } finally {
      if (lowLevelClient != null) {
        lowLevelClient.close();
      }
    }
  }
}

相关文章