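This article collects code examples for the Java method org.elasticsearch.threadpool.ThreadPool.generic() and shows how ThreadPool.generic() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of ThreadPool.generic() are as follows: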
Package path: org.elasticsearch.threadpool.ThreadPool
Class name: ThreadPool
Method name: generic
Gets the generic ExecutorService. This executor service's Executor#execute(Runnable) method runs the Runnable it is given in the ThreadContext of the thread that queues it.
Warning: this ExecutorService will not throw RejectedExecutionException if you submit a task while it is shut down. It will instead silently queue the task and not run it.
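The statement that a task runs "in the ThreadContext of the thread that queues it" can be illustrated with plain JDK classes. The following is only a sketch under stated assumptions: a ThreadLocal stands in for Elasticsearch's ThreadContext, a cached thread pool stands in for the generic pool, and the names ContextPropagationSketch, preserveContext and contextSeenByWorker are hypothetical. It is not how Elasticsearch implements this internally.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a ThreadLocal plays the role of the thread context.
class ContextPropagationSketch {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Wraps a task so that it runs with the context value of the thread that queued it.
    static Runnable preserveContext(Runnable task) {
        final String captured = CONTEXT.get(); // read on the submitting thread
        return () -> {
            final String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                CONTEXT.set(previous); // restore the worker thread's own value
            }
        };
    }

    // Sets a context value, queues a task, and returns the value the worker thread observed.
    static String contextSeenByWorker(String value) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CONTEXT.set(value);
            String[] seen = new String[1];
            Future<?> done = executor.submit(preserveContext(() -> seen[0] = CONTEXT.get()));
            done.get(5, TimeUnit.SECONDS);
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            executor.shutdownNow();
        }
    }
}
```

The wrapper captures the value at submission time, so the worker sees the submitter's context even though it runs on a different thread.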
Code example source: org.elasticsearch/elasticsearch
/**
 * The executor service for this transport service.
 *
 * @return the executor service
 */
private ExecutorService getExecutorService() {
    return threadPool.generic();
}
Code example source: org.elasticsearch/elasticsearch
private void notifyPingReceived(final PingRequest pingRequest) {
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            for (Listener listener : listeners) {
                listener.onPingReceived(pingRequest);
            }
        }
    });
}
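The listener fan-out pattern above can be tried without Elasticsearch on the classpath. This is a minimal sketch, assuming a cached JDK thread pool as a stand-in for the generic pool; the Listener interface and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ListenerFanOutSketch {
    // Hypothetical listener interface, loosely modelled on the snippet above.
    interface Listener {
        void onPingReceived(String pingId);
    }

    // Registers listenerCount listeners, notifies them on a pool thread, returns what they recorded.
    static List<String> notifyListeners(String pingId, int listenerCount) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        List<Listener> listeners = new CopyOnWriteArrayList<>();
        for (int i = 0; i < listenerCount; i++) {
            final int id = i;
            listeners.add(ping -> received.add("listener-" + id + ":" + ping));
        }
        ExecutorService generic = Executors.newCachedThreadPool(); // stand-in for the generic pool
        try {
            // One task walks all listeners, so the thread that received the ping is not held up.
            generic.execute(() -> {
                for (Listener listener : listeners) {
                    listener.onPingReceived(pingId);
                }
            });
            generic.shutdown();
            if (!generic.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listeners did not finish in time");
            }
            return new ArrayList<>(received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            generic.shutdownNow();
        }
    }
}
```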
Code example source: org.elasticsearch/elasticsearch
@Override
public void onNodeDisconnected(DiscoveryNode node) {
    AbstractRunnable runnable = new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            logger.warn("failed to handle transport disconnect for node: {}", node);
        }

        @Override
        protected void doRun() {
            handleTransportDisconnect(node);
        }
    };
    threadPool.generic().execute(runnable);
}
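The doRun/onFailure split used by AbstractRunnable can be approximated in a few lines. The sketch below is a hypothetical simplification (SafeRunnable is not an Elasticsearch class) and assumes a cached JDK thread pool in place of the generic pool; it only shows the idea that an exception thrown by doRun is routed to onFailure instead of escaping the task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class FailureAwareRunnableSketch {
    // Hypothetical simplification of the doRun/onFailure contract.
    abstract static class SafeRunnable implements Runnable {
        protected abstract void doRun() throws Exception;

        protected abstract void onFailure(Exception e);

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e); // failures are reported, not thrown out of the task
            }
        }
    }

    // Runs a task that always fails and returns the message that onFailure received.
    static String runFailingTask() {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicReference<String> failure = new AtomicReference<>("none");
        try {
            executor.execute(new SafeRunnable() {
                @Override
                protected void doRun() throws Exception {
                    throw new IllegalStateException("disconnect handling failed");
                }

                @Override
                protected void onFailure(Exception e) {
                    failure.set(e.getMessage());
                }
            });
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not finish in time");
            }
            return failure.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```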
Code example source: org.elasticsearch/elasticsearch
@Override
public void accept(final IndexShard.ShardFailure shardFailure) {
    final ShardRouting shardRouting = shardFailure.routing;
    threadPool.generic().execute(() -> {
        synchronized (IndicesClusterStateService.this) {
            failAndRemoveShard(shardRouting, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause,
                clusterService.state());
        }
    });
}
Code example source: org.elasticsearch/elasticsearch
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
    try {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    } catch (EsRejectedExecutionException ex) {
        logger.trace(() -> new ParameterizedMessage(
            "[node ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", node, reason), ex);
    }
}
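The try/catch around execute in the snippet above guards against the executor refusing the task during shutdown. As a rough illustration, assuming a plain JDK fixed thread pool (which rejects tasks after shutdown() with RejectedExecutionException) rather than the Elasticsearch pool, the pattern might look like this; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectionHandlingSketch {
    // Returns true if the task was accepted, false if the executor rejected it.
    static boolean tryNotify(ExecutorService executor, Runnable notification) {
        try {
            executor.execute(notification);
            return true;
        } catch (RejectedExecutionException e) {
            // A JDK pool rejects new tasks once shut down; treat this as "we are stopping".
            return false;
        }
    }

    static boolean acceptedWhileRunning() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            return tryNotify(executor, () -> { });
        } finally {
            executor.shutdown();
        }
    }

    static boolean acceptedAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.shutdown();
        return tryNotify(executor, () -> { });
    }
}
```

Swallowing the rejection is only reasonable when, as in the snippet, the notification no longer matters because the local node is going away.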
Code example source: org.elasticsearch/elasticsearch
@Override
protected void handleMergeException(final Directory dir, final Throwable exc) {
    engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            logger.debug("merge failure action rejected", e);
        }

        @Override
        protected void doRun() throws Exception {
            /*
             * We do this on another thread rather than the merge thread that we are initially called on so that we have complete
             * confidence that the call stack does not contain catch statements that would cause the error that might be thrown
             * here from being caught and never reaching the uncaught exception handler.
             */
            failEngine("merge failed", new MergePolicy.MergeException(exc, dir));
        }
    });
}
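The comment in the snippet above explains why the failure is raised on another thread: no catch block on the calling stack can intercept it, so it reaches the uncaught exception handler. A small sketch of that effect, assuming a JDK cached thread pool with a custom ThreadFactory (all names here are hypothetical), could look like this:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class UncaughtHandlerSketch {
    // Throws an Error on a pool thread and returns the message seen by that thread's handler.
    static String messageSeenByHandler() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, error) -> {
                seen.set(error.getMessage());
                handled.countDown();
            });
            return thread;
        };
        ExecutorService executor = Executors.newCachedThreadPool(factory);
        try {
            try {
                // Had the Error been thrown right here, the catch below would have swallowed it.
                executor.execute(() -> {
                    throw new Error("merge failed");
                });
            } catch (Throwable swallowed) {
                // Not reached for the Error: it is thrown on the pool thread instead.
            }
            if (!handled.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler was not invoked");
            }
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that this relies on execute(); with submit() the throwable would be captured in the returned Future instead of reaching the handler.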
Code example source: org.elasticsearch/elasticsearch
@Override
protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
    threadPool.generic().execute(
        () -> tasks.forEach(
            task -> ((UpdateTask) task).listener.onFailure(task.source,
                new ProcessClusterEventTimeoutException(timeout, task.source))));
}
Code example source: org.elasticsearch/elasticsearch
private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (notifiedMasterFailure.compareAndSet(false, true)) {
        try {
            threadPool.generic().execute(() -> {
                for (Listener listener : listeners) {
                    listener.onMasterFailure(masterNode, cause, reason);
                }
            });
        } catch (EsRejectedExecutionException e) {
            logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
        }
        stop("master failure, " + reason);
    }
}
Code example source: org.elasticsearch/elasticsearch
private void handleResponse(final int responseSlot, final MultiSearchResponse.Item item) {
    responses.set(responseSlot, item);
    if (responseCounter.decrementAndGet() == 0) {
        assert requests.isEmpty();
        finish();
    } else {
        if (thread == Thread.currentThread()) {
            // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread
            threadPool.generic().execute(() -> executeSearch(requests, responses, responseCounter, listener));
        } else {
            // we are on a different thread (we went asynchronous), it's safe to recurse
            executeSearch(requests, responses, responseCounter, listener);
        }
    }
}
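The same-thread check above exists to keep a long chain of synchronous callbacks from growing the stack. The following sketch imitates that idea with JDK classes only; it assumes a single-thread executor in place of the generic pool and a hypothetical "search" whose response always arrives synchronously on the calling thread, so every step takes the fork branch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ForkOnSameThreadSketch {
    // Runs `steps` chained steps and returns how many responses were handled.
    static int runChain(int steps) {
        ExecutorService generic = Executors.newSingleThreadExecutor(); // stand-in for the generic pool
        CountDownLatch finished = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        try {
            executeStep(generic, steps, completed, finished);
            if (!finished.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("chain did not finish in time");
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            generic.shutdownNow();
        }
    }

    static void executeStep(ExecutorService generic, int remaining, AtomicInteger completed, CountDownLatch finished) {
        if (remaining == 0) {
            finished.countDown();
            return;
        }
        final Thread caller = Thread.currentThread();
        Runnable onResponse = () -> {
            completed.incrementAndGet();
            Runnable next = () -> executeStep(generic, remaining - 1, completed, finished);
            if (caller == Thread.currentThread()) {
                generic.execute(next); // same thread: fork so the stack does not keep growing
            } else {
                next.run(); // different thread: continuing directly is safe
            }
        };
        onResponse.run(); // hypothetical search that answers synchronously on the calling thread
    }
}
```

Because each step is handed to the executor, a chain of 100,000 steps completes with a flat stack, whereas direct recursion at that depth would risk a StackOverflowError.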
Code example source: org.elasticsearch/elasticsearch
/** starts a new joining thread if there is no currently active one and join thread controlling is started */
public void startNewThreadIfNotRunning() {
    assert Thread.holdsLock(stateMutex);
    if (joinThreadActive()) {
        return;
    }
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            while (running.get() && joinThreadActive(currentThread)) {
                try {
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    logger.error("unexpected error while joining cluster, trying again", e);
                    // Because we catch any exception here, we want to know in
                    // tests if an uncaught exception got to this point and the test infra uncaught exception
                    // leak detection can catch this. In practise no uncaught exception should leak
                    assert ExceptionsHelper.reThrowIfNotNull(e);
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}
Code example source: org.elasticsearch/elasticsearch
threadPool.generic().execute(() -> {
    closeLock.writeLock().lock();
    try {
Code example source: org.elasticsearch/elasticsearch
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
    // create a new recovery status, and process...
    final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
    threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
Code example source: org.elasticsearch/elasticsearch
private void performStateRecovery(boolean enforceRecoverAfterTime, String reason) {
    final Gateway.GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener();
    if (enforceRecoverAfterTime && recoverAfterTime != null) {
        if (scheduledRecovery.compareAndSet(false, true)) {
            logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason);
            threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, () -> {
                if (recovered.compareAndSet(false, true)) {
                    logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
                    gateway.performStateRecovery(recoveryListener);
                }
            });
        }
    } else {
        if (recovered.compareAndSet(false, true)) {
            threadPool.generic().execute(new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    logger.warn("Recovery failed", e);
                    // we reset `recovered` in the listener don't reset it here otherwise there might be a race
                    // that resets it to false while a new recover is already running?
                    recoveryListener.onFailure("state recovery failed: " + e.getMessage());
                }

                @Override
                protected void doRun() throws Exception {
                    gateway.performStateRecovery(recoveryListener);
                }
            });
        }
    }
}
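Both branches above guard the actual recovery with recovered.compareAndSet(false, true), so it runs at most once regardless of which path gets there first. The sketch below deliberately triggers a delayed and an immediate path together to show that guard; it assumes a JDK ScheduledExecutorService in place of threadPool.schedule and threadPool.generic(), and all names are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class RecoverOnceSketch {
    // Triggers a delayed and an immediate recovery attempt; returns how many actually ran.
    static int recoveriesPerformed(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicBoolean recovered = new AtomicBoolean(false);
        AtomicInteger recoveries = new AtomicInteger();
        Runnable recoverOnce = () -> {
            // Only the first caller flips the flag and performs the (pretend) recovery.
            if (recovered.compareAndSet(false, true)) {
                recoveries.incrementAndGet();
            }
        };
        try {
            Future<?> delayed = scheduler.schedule(recoverOnce, delayMillis, TimeUnit.MILLISECONDS);
            Future<?> immediate = scheduler.submit(recoverOnce);
            immediate.get(5, TimeUnit.SECONDS);
            delayed.get(5, TimeUnit.SECONDS);
            return recoveries.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```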
Code example source: org.elasticsearch/elasticsearch
final CountDownLatch latch = new CountDownLatch(1);
threadPool.generic().execute(() -> {
    closeLock.writeLock().lock();
    try {
Code example source: org.elasticsearch/elasticsearch
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
    threadContext.markAsSystemContext();
    threadPool.generic().execute(() -> upgradeTemplates(changes.get().v1(), changes.get().v2()));
Code example source: org.elasticsearch/elasticsearch
threadPool.generic().execute(pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
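The three calls above start one ping round immediately and two more at one third and two thirds of the schedule duration. The arithmetic can be checked in isolation; the helper below is a hypothetical illustration and uses plain long millisecond values instead of TimeValue.

```java
class PingScheduleSketch {
    // Returns the three start delays in milliseconds: immediately, one third in, two thirds in.
    static long[] pingDelaysMillis(long scheduleDurationMillis) {
        return new long[] {
            0L,
            scheduleDurationMillis / 3,
            scheduleDurationMillis / 3 * 2 // integer division happens first, as in the snippet above
        };
    }
}
```

For a duration of 1000 ms this yields delays of 0, 333 and 666 ms, because the division truncates before the multiplication.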
Code example source: org.elasticsearch/elasticsearch
case EXISTING_STORE:
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(() -> {
try {
final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
assert requiredShards.isEmpty() == false;
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
Code example source: org.elasticsearch/elasticsearch
private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
                                   final Function<ClusterState, ClusterState> executor,
                                   final ClusterApplyListener listener) {
    if (!lifecycle.started()) {
        return;
    }
    try {
        UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
        if (config.timeout() != null) {
            threadPoolExecutor.execute(updateTask, config.timeout(),
                () -> threadPool.generic().execute(
                    () -> listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source))));
        } else {
            threadPoolExecutor.execute(updateTask);
        }
    } catch (EsRejectedExecutionException e) {
        // ignore cases where we are shutting down..., there is really nothing interesting
        // to be done here...
        if (!lifecycle.stoppedOrClosed()) {
            throw e;
        }
    }
}
Code example source: org.elasticsearch/elasticsearch
/**
 * Executed on the node that should be running the task to find and return the running task. Falls back to
 * {@link #getFinishedTaskFromIndex(Task, GetTaskRequest, ActionListener)} if the task isn't still running.
 */
void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
    Task runningTask = taskManager.getTask(request.getTaskId().getId());
    if (runningTask == null) {
        // Task isn't running, go look in the task index
        getFinishedTaskFromIndex(thisTask, request, listener);
    } else {
        if (request.getWaitForCompletion()) {
            // Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
            threadPool.generic().execute(new AbstractRunnable() {
                @Override
                protected void doRun() throws Exception {
                    taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
                    waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
                }

                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        } else {
            TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
            listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
        }
    }
}
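The comment in the snippet above states the motivation: a blocking wait is moved to the generic pool so that the calling thread stays free. A minimal sketch of that hand-off, assuming a JDK cached thread pool as the stand-in, a CountDownLatch as the "running task", and a CompletableFuture as the listener (all hypothetical choices), might be:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class WaitOnGenericPoolSketch {
    // Returns true if the blocking wait happened on a thread other than the caller's.
    static boolean waitedOffCallerThread() {
        ExecutorService generic = Executors.newCachedThreadPool(); // stand-in for the generic pool
        CountDownLatch taskCompleted = new CountDownLatch(1);      // stands in for the running task
        CompletableFuture<Thread> listener = new CompletableFuture<>();
        try {
            generic.execute(() -> {
                try {
                    taskCompleted.await(); // blocks a pool thread, not the caller
                    listener.complete(Thread.currentThread());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    listener.completeExceptionally(e);
                }
            });
            // The caller was never blocked, so it can go on and let the task finish.
            taskCompleted.countDown();
            return listener.get(5, TimeUnit.SECONDS) != Thread.currentThread();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            generic.shutdownNow();
        }
    }
}
```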
Code example source: org.elasticsearch/elasticsearch
threadPool.generic().execute(new AbstractRunnable() {
    @Override
    public void onFailure(Exception e) {