org.elasticsearch.threadpool.ThreadPool类的使用及代码示例

x33g5p2x  于2022-01-30 转载在 其他  
字(11.7k)|赞(0)|评价(0)|浏览(161)

本文整理了Java中org.elasticsearch.threadpool.ThreadPool类的一些代码示例,展示了ThreadPool类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ThreadPool类的具体详情如下:
包路径:org.elasticsearch.threadpool.ThreadPool
类名称:ThreadPool

ThreadPool介绍

暂无

代码示例

代码示例来源:origin: floragunncom/search-guard

  1. String injectedUserString = threadPool.getThreadContext().getTransient(ConfigConstants.SG_INJECTED_USER);
  2. if (log.isDebugEnabled()) {
  3. log.debug("Injected user string: {}", injectedUserString);
  4. log.error("User string malformed, could not extract parts. User string was '{}.' User injection failed.", injectedUserString);
  5. return false;
  6. InetAddress iAdress = InetAddress.getByName(ipAndPort[0]);
  7. int port = Integer.parseInt(ipAndPort[1]);
  8. threadPool.getThreadContext().putTransient(ConfigConstants.SG_REMOTE_ADDRESS, new TransportAddress(iAdress, port));
  9. } catch (UnknownHostException | NumberFormatException e) {
  10. log.error("Cannot parse remote IP or port: {}, user injection failed.", parts[2], e);
  11. threadPool.getThreadContext().putTransient(ConfigConstants.SG_REMOTE_ADDRESS, xffResolver.resolve(request));
  12. threadPool.getThreadContext().putTransient(ConfigConstants.SG_USER, user);
  13. auditLog.logSucceededLogin(parts[0], true, null, request);
  14. if (log.isTraceEnabled()) {

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. /**
  2. * The executor service for this transport service.
  3. *
  4. * @return the executor service
  5. */
  6. private ExecutorService getExecutorService() {
  7. return threadPool.generic();
  8. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. @Override
  2. public void handleResponse(PingResponse response) {
  3. if (!running()) {
  4. return;
  5. }
  6. retryCount = 0;
  7. threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this);
  8. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. /**
  2. * Returns <code>true</code> if the given service was terminated successfully. If the termination timed out,
  3. * the service is <code>null</code> this method will return <code>false</code>.
  4. */
  5. public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) {
  6. if (service != null) {
  7. service.shutdown();
  8. if (awaitTermination(service, timeout, timeUnit)) return true;
  9. service.shutdownNow();
  10. return awaitTermination(service, timeout, timeUnit);
  11. }
  12. return false;
  13. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. private void notifyPingReceived(final PingRequest pingRequest) {
  2. threadPool.generic().execute(new Runnable() {
  3. @Override
  4. public void run() {
  5. for (Listener listener : listeners) {
  6. listener.onPingReceived(pingRequest);
  7. }
  8. }
  9. });
  10. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. @Override
  2. public void onMaster() {
  3. this.isMaster = true;
  4. if (logger.isTraceEnabled()) {
  5. logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
  6. }
  7. try {
  8. // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
  9. threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
  10. if (clusterService.state().getNodes().getDataNodes().size() > 1) {
  11. // Submit an info update job to be run immediately
  12. threadPool.executor(executorName()).execute(() -> maybeRefresh());
  13. }
  14. } catch (EsRejectedExecutionException ex) {
  15. if (logger.isDebugEnabled()) {
  16. logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
  17. }
  18. }
  19. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
  2. threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() {
  3. @Override
  4. public void onFailure(Exception e) {
  5. listener.onFailure(e);
  6. }
  7. @Override
  8. protected void doRun() throws Exception {
  9. listener.onResponse(shardOperation(request, shardId));
  10. }
  11. });
  12. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. @Override
  2. public void run() {
  3. if (logger.isTraceEnabled()) {
  4. logger.trace("Submitting new rescheduling cluster info update job");
  5. }
  6. try {
  7. threadPool.executor(executorName()).execute(() -> {
  8. try {
  9. maybeRefresh();
  10. } finally { //schedule again after we refreshed
  11. if (isMaster) {
  12. if (logger.isTraceEnabled()) {
  13. logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
  14. }
  15. try {
  16. threadPool.schedule(updateFrequency, executorName(), this);
  17. } catch (EsRejectedExecutionException ex) {
  18. logger.debug("Reschedule cluster info service was rejected", ex);
  19. }
  20. }
  21. }
  22. });
  23. } catch (EsRejectedExecutionException ex) {
  24. if (logger.isDebugEnabled()) {
  25. logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex);
  26. }
  27. }
  28. }
  29. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. final ClusterState previousState = event.previousState();
  2. final ClusterState state = event.state();
  3. final String localNodeId = state.nodes().getLocalNodeId();
  4. assert localNodeId != null;
  5. if (logger.isDebugEnabled()) {
  6. logger.debug("[{}] cleaning index, no longer part of the metadata", index);
  7. indexSettings = indexService.getIndexSettings();
  8. indicesService.removeIndex(index, DELETED, "index no longer part of the metadata");
  9. } else if (previousState.metaData().hasIndex(index.getName())) {
  10. final IndexMetaData metaData = previousState.metaData().index(index);
  11. indexSettings = new IndexSettings(metaData, settings);
  12. indicesService.deleteUnassignedIndex("deleted index was not assigned to local node", metaData, state);
  13. threadPool.generic().execute(new AbstractRunnable() {
  14. @Override
  15. public void onFailure(Exception e) {

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. @Override
  2. public void clusterChanged(ClusterChangedEvent event) {
  3. ClusterState state = event.state();
  4. if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
  5. ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
  6. if (state.nodes().isLocalNodeElectedMaster() == false) {
  7. return;
  8. if (changes.isPresent()) {
  9. if (upgradesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size() + 1)) {
  10. logger.info("Starting template upgrade to version {}, {} templates will be updated and {} will be removed",
  11. Version.CURRENT,
  12. changes.get().v1().size(),
  13. changes.get().v2().size());
  14. final ThreadContext threadContext = threadPool.getThreadContext();
  15. try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
  16. threadContext.markAsSystemContext();
  17. threadPool.generic().execute(() -> upgradeTemplates(changes.get().v1(), changes.get().v2()));

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
  2. if (notifiedMasterFailure.compareAndSet(false, true)) {
  3. try {
  4. threadPool.generic().execute(() -> {
  5. for (Listener listener : listeners) {
  6. listener.onMasterFailure(masterNode, cause, reason);
  7. }
  8. });
  9. } catch (EsRejectedExecutionException e) {
  10. logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
  11. }
  12. stop("master failure, " + reason);
  13. }
  14. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. private void handleException(final TransportResponseHandler handler, Throwable error) {
  2. if (!(error instanceof RemoteTransportException)) {
  3. error = new RemoteTransportException(error.getMessage(), error);
  4. }
  5. final RemoteTransportException rtx = (RemoteTransportException) error;
  6. threadPool.executor(handler.executor()).execute(() -> {
  7. try {
  8. handler.handleException(rtx);
  9. } catch (Exception e) {
  10. logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", handler), e);
  11. }
  12. });
  13. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
  2. if (isThrottling.getAndSet(false)) {
  3. logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}",
  4. numMergesInFlight, maxNumMerges);
  5. deactivateThrottling();
  6. System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) {
  7. engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
  8. @Override
  9. public void onFailure(Exception e) {

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
  2. try {
  3. threadPool.generic().execute(new Runnable() {
  4. @Override
  5. public void run() {
  6. for (Listener listener : listeners) {
  7. listener.onNodeFailure(node, reason);
  8. }
  9. }
  10. });
  11. } catch (EsRejectedExecutionException ex) {
  12. logger.trace(() -> new ParameterizedMessage(
  13. "[node ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", node, reason), ex);
  14. }
  15. }

代码示例来源:origin: org.elasticsearch/elasticsearch

  1. final List<TransportAddress> seedAddresses = new ArrayList<>();
  2. seedAddresses.addAll(hostsProvider.buildDynamicHosts(createHostsResolver()));
  3. final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
  4. for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
  5. seedAddresses.add(masterNode.value.getAddress());
  6. ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
  7. final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
  8. nodes.getLocalNode(), connectionProfile);
  9. activePingingRounds.put(pingingRound.id(), pingingRound);
  10. final AbstractRunnable pingSender = new AbstractRunnable() {
  11. threadPool.generic().execute(pingSender);
  12. threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
  13. threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
  14. threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
  15. @Override
  16. protected void doRun() throws Exception {

代码示例来源:origin: dadoonet/fscrawler

  1. logger.info("Elasticsearch Client for version {}.x connected to a node running version {}", compatibleVersion(), getVersion());
  2. } catch (ElasticsearchStatusException e) {
  3. logger.debug("got an error while trying to connect to elasticsearch cluster");
  4. throw new IOException(e);
  5. } catch (Exception e) {
  6. logger.warn("failed to create elasticsearch client, disabling crawler...");
  7. throw e;
  8. (request, bulkListener) -> client.bulkAsync(request, bulkListener);
  9. threadPool = new ThreadPool(Settings.builder().put("node.name", "fscrawler-client").build());
  10. bulkProcessor = new BulkProcessor.Builder(bulkConsumer, new DebugListener(logger), threadPool)
  11. .setBulkActions(settings.getElasticsearch().getBulkSize())
  12. .setFlushInterval(TimeValue.timeValueMillis(settings.getElasticsearch().getFlushInterval().millis()))
  13. .setBulkSize(new ByteSizeValue(settings.getElasticsearch().getByteSize().getBytes()))
  14. .build();

代码示例来源:origin: floragunncom/search-guard

  1. public User authenticate(final TransportRequest request, final String sslPrincipal, final Task task, final String action) {
  2. if(log.isDebugEnabled() && request.remoteAddress() != null) {
  3. log.debug("Transport authentication request from {}", request.remoteAddress());
  4. log.error("Not yet initialized (you may need to run sgadmin)");
  5. return null;
  6. final String authorizationHeader = threadPool.getThreadContext().getHeader("Authorization");

代码示例来源:origin: apache/servicemix-bundles

  1. static final class Fields {
  2. static final String TYPE = "type";
  3. static final String MIN = "min";
  4. static final String MAX = "max";
  5. static final String KEEP_ALIVE = "keep_alive";
  6. static final String QUEUE_SIZE = "queue_size";
  7. }
  8. }

代码示例来源:origin: floragunncom/search-guard

  1. LOGGER.info("Check if "+searchguardIndex+" index exists ...");
  2. .masterNodeTimeout(TimeValue.timeValueMinutes(1));
  3. final ThreadContext threadContext = threadPool.getThreadContext();
  4. try(StoredContext ctx = threadContext.stashContext()) {
  5. threadContext.putHeader(ConfigConstants.SG_CONF_REQUEST_HEADER, "true");
  6. LOGGER.error("Failure while executing IndicesExistsRequest {}",e2, e2);
  7. bgThread.start();

代码示例来源:origin: dadoonet/fscrawler

  1. @Override
  2. public void close() throws IOException {
  3. logger.debug("Closing Elasticsearch client manager");
  4. if (bulkProcessor != null) {
  5. try {
  6. bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
  7. } catch (InterruptedException e) {
  8. logger.warn("Did not succeed in closing the bulk processor for documents", e);
  9. throw new IOException(e);
  10. }
  11. }
  12. if (threadPool != null) {
  13. threadPool.shutdownNow();
  14. }
  15. if (lowLevelClient != null) {
  16. lowLevelClient.close();
  17. }
  18. }

相关文章