Spring Boot 当我发现某些内容时如何停止ExecutorService

kulphzqa  于 2023-11-17  发布在  Spring
关注(0)|答案(1)|浏览(181)

我正在使用Java 21和Sping Boot 3.2.0(SNAPSHOT)。我有更多的类,但我只显示这种情况下的基本类。我试图使用ExecutorService在多线程环境中检查一些记录,并根据检查结果继续或停止ExecutorService。
SecurityCheckc类:

  1. private ExecutorService pool;
  2. private final SendToDatabse sendToDatabse;
  3. private final UserCheck userCheck;
  4. private final SynchronisedCheckedMap synchronisedCheckedMap;
  5. public SecurityCheck(SendToDatabse sendToDatabse,
  6. UserCheck userCheck,
  7. SynchronisedCheckedMap synchronisedCheckedMap) {
  8. this.sendToDatabse = sendToDatabse;
  9. this.userCheck = userCheck;
  10. this.synchronisedCheckedMap = synchronisedCheckedMap;
  11. }
  12. public void securityCheck(List<String> list) throws ExecutionException, InterruptedException, TimeoutException {
  13. List<Future<?>> futures = new ArrayList<>();
  14. this.pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  15. try {
  16. list.forEach(line -> futures.add(pool.submit(
  17. new Security(line,
  18. this.userCheck,
  19. this.synchronisedCheckedMap
  20. ))));
  21. for (Future<?> future : futures) {
  22. future.get(Long.MAX_VALUE, TimeUnit.SECONDS);
  23. }
  24. }catch (Exception e){
  25. throw new RuntimeException(e);
  26. }
  27. finally {
  28. print();
  29. pool.shutdown();
  30. while (!pool.awaitTermination(24L,TimeUnit.SECONDS)){
  31. System.out.println("Not yet. Still in waiting for termination");
  32. }
  33. // this.sendToDatabse.sendToDatabse(synchronisedCheckedMap.synchronisedCheckedMap);
  34. }
  35. }
  36. public void stopPool(Short code, Customer customer){
  37. System.out.println("oprire");
  38. Map<Short, List<Customer>> map = new HashMap<>();
  39. map.put(code, (List<Customer>) customer);
  40. map.forEach((k,v)->{
  41. System.out.println(k + "\t" + v);
  42. });
  43. this.pool.shutdownNow();
  44. // this.sendToDatabse.sendToDatabse(map);
  45. Thread.currentThread().interrupt();
  46. }

字符串
安全类:

  1. public class Security implements SQLInjections, Runnable{
  2. private final String line;
  3. private final UserCheck userCheck;
  4. private final SynchronisedCheckedMap synchronisedCheckedMap;
  5. public Security(String line,
  6. UserCheck userCheck,
  7. SynchronisedCheckedMap synchronisedCheckedMap){
  8. this.line = line;
  9. this.userCheck = userCheck;
  10. this.synchronisedCheckedMap = synchronisedCheckedMap;
  11. }
  12. @Override
  13. public void run() {
  14. try {
  15. if (Thread.interrupted()) {
  16. return;
  17. }
  18. JsonToLine jsonToLine = new JsonToLine();
  19. String newLine = jsonToLine.jsonToLine(line);
  20. StringToValues stringToValues = new StringToValues();
  21. List<String> values = stringToValues.stringToValues(newLine);
  22. // values.forEach(string -> {System.out.println("line: " + string);});
  23. Short validID = validID(values.get(0));
  24. Short validUser = null;
  25. Short validPassword = null;
  26. Short validMessage = null;
  27. List<String> message = new ArrayList<>();
  28. Customer customer;
  29. switch (validID) {
  30. case 200:
  31. // values.forEach(string -> {System.out.println("code" + "\t" + "line: " + string);});
  32. validUser = validUser(values.get(1));
  33. validPassword = validPassword(values.get(2));
  34. message = Arrays.asList(values.get(3).split(";"));
  35. validMessage = validMesage(message);
  36. customer =
  37. new Customer(Long.parseLong(values.get(0)),
  38. values.get(1),
  39. values.get(2),
  40. message);
  41. if(validUser==200 &&
  42. validMessage==200 &&
  43. validPassword==200){
  44. this.synchronisedCheckedMap.synchronisedCheckedMap((short) 200, customer);
  45. }else{
  46. this.synchronisedCheckedMap.synchronisedCheckedMap(
  47. getHighestCode(List.of(validUser,validID,validMessage,validPassword)), customer);
  48. }
  49. break;
  50. case 600:
  51. validUser = validUser(values.get(0));
  52. validPassword = validPassword(values.get(1));
  53. message = Arrays.asList(values.get(3).split(";"));
  54. validMessage = validMesage(message);
  55. customer =
  56. new Customer(null,
  57. values.get(1),
  58. values.get(2),
  59. message);
  60. this.synchronisedCheckedMap.synchronisedCheckedMap((short) 600, customer);
  61. break;
  62. }
  63. }catch (Exception e){
  64. Thread.currentThread().interrupt(); // Preserve interruption status
  65. return;
  66. }
  67. }
  68. }


我还有SyncroniseMap

  1. @Component
  2. public class SynchronisedCheckedMap {
  3. public Map<Short, List<Customer>> synchronisedCheckedMap = new HashMap<>();
  4. private SecurityCheck securityCheck;
  5. public SynchronisedCheckedMap(SecurityCheck securityCheck) {
  6. this.securityCheck = securityCheck;
  7. }
  8. public SynchronisedCheckedMap() {}
  9. protected synchronized void synchronisedCheckedMap(Short code,
  10. Customer customer){
  11. if(code>=800){
  12. this.securityCheck.stopPool(code,customer);
  13. }
  14. if(this.synchronisedCheckedMap.containsKey(code)){
  15. this.synchronisedCheckedMap.get(code).add(customer);
  16. }else {
  17. List<Customer> list = new ArrayList<>();
  18. list.add(customer);
  19. synchronisedCheckedMap.put(code,list);
  20. }
  21. }
  22. public void print(){
  23. this.synchronisedCheckedMap.forEach((k,v)->{
  24. System.out.println(k + "\t" + v);
  25. });
  26. }
  27. }


我把这个类中的依赖注入搞砸了,因为它根本不工作。线程仍然在运行。但是我试图在“stopPool”方法上放置调试器,并且没有停止。
我知道我将不得不在这些方面做更多的工作,但我想从SecurityCheck类中停止ExecutorService。当它是一个发送到“this. synchronisedBikedMap. synchronisedBikedMap”的特定代码时,我试图调用“stopPool”方法。
我已经尝试过这样做,但我需要帮助才能使它工作,因为线程继续下去。
在这种情况下,我应该如何正确地停止ExecutorService?

cmssoen2

cmssoen21#

私有ExecutorService池;
关于这个名字,不要把你的executor服务看作是一个线程池。
一个executor service * 可以 * 由一个线程池支持。或者它可以由单个线程支持。或者它可以没有线程支持,可以在当前线程上执行它继承的execute方法。在Java 21+中,执行器服务可以为每个提交的任务创建一个新的virtual thread,没有池。
因此,可以将执行器服务看作是代表您以某种方式执行代码的服务。
我有更多的类,但我只显示了这种情况下的基本类。
你展示了太多的代码,没有缩小到你的特定问题的焦点。所以我可以给予一些提示和建议来指导你。但是如果你想要一个直接的答案,你需要提供一个直接的问题。
我正试图在多线程环境中检查一些记录
当你有一堆任务要执行时,你可以通过invokeAll将它们一次性提交给你的执行器服务。这只是一个方便,与一次提交一个任务的效果相同。
看起来你只是有一个单独处理的文本行列表。你似乎试图通过停止executor服务来停止任务,这可能很笨拙。线程不能直接停止。在Java中,线程是通过设计你的代码来测试中断状态或其他条件来合作停止的。Search了解更多信息。
让我们定义一个简单的示例任务类。
如果你想从每个任务返回一个结果,使用Callable而不是Runnable。在这种情况下,我们返回一个Boolean对象来指示我们的行是否成功处理。

  1. package work.basil.example;
  2. import java.time.Instant;
  3. import java.util.concurrent.Callable;
  4. public class LineProcessor implements Callable < Boolean > {
  5. private final String line;
  6. public LineProcessor ( final String line ) {
  7. this.line = line;
  8. }
  9. @Override
  10. public Boolean call ( ) {
  11. System.out.println ( "In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " processing line: " + this.line );
  12. Boolean success = Boolean.TRUE; // If we successfully processed this line.
  13. return success;
  14. }
  15. }

字符串
ExecutorService现在是AutoCloseable。因此,您可以使用try-with-resources语法在执行程序服务完成一批任务时自动关闭它。
在这里,我们模拟获取一个行列表。我们将每个行发送到LineProcessor任务Callable类的构造函数。我们将这些构造的任务对象传递给executor服务。我们得到一个Future对象的列表。通过使用try-with-resources语法,我们的代码阻塞,直到executor服务执行完所有任务或在一天后过期。

  1. List < String > lines = List.of ( "x" , "y" , "z" );
  2. List < LineProcessor > lineProcessors = lines.stream ( ).map ( LineProcessor :: new ).toList ( );
  3. List < Future < Boolean > > futures = null;
  4. try (
  5. ExecutorService executorService = Executors.newFixedThreadPool ( Runtime.getRuntime ( ).availableProcessors ( ) ) ;
  6. ) {
  7. futures = executorService.invokeAll ( lineProcessors );
  8. } catch ( InterruptedException e ) {
  9. throw new RuntimeException ( e );
  10. }
  11. // At this point, the executor service has closed after all its tasks are done or canceled.


如果我们想检查结果,我们可以添加更多的代码来处理Future对象。

  1. for ( Future < Boolean > future : futures ) {
  2. if ( future.isDone ( ) ) {
  3. System.out.println ( "done" );
  4. } else if ( future.isCancelled ( ) ) {
  5. System.out.println ( "cancelled" );
  6. }
  7. System.out.println ( "Call future.get here to get result" );
  8. }
  9. System.out.println ( "INFO - Demo done. " + Instant.now ( ) );


运行时:

  1. In thread ID: 22 at 2023-11-01T05:56:56.499449Z processing line: y
  2. In thread ID: 23 at 2023-11-01T05:56:56.499442Z processing line: z
  3. In thread ID: 21 at 2023-11-01T05:56:56.499460Z processing line: x
  4. done
  5. Call future.get here to get result
  6. done
  7. Call future.get here to get result
  8. done
  9. Call future.get here to get result
  10. INFO - Demo done. 2023-11-01T05:56:56.512539Z


回到你的问题的要点,它似乎是问如果其中一个任务遇到特定的情况,如何中断兄弟任务。
在Java中,线程是协同中断的。你可以要求executor服务提前关闭,它会尝试中断它的每个任务的线程。但是中断处理有点棘手。我建议你设置一个特定的标志,让所有的任务共享。这个标志必须是线程-安全。所以我将使用AtomicBoolean作为任务之间的标志,表明一切正常,它们应该继续工作。让我们初始化该标志,并在构造任务对象时将其作为参数传递。

  1. List < String > lines = List.of ( "x" , "y" , "z" );
  2. AtomicBoolean flagToContinueProcessingLines = new AtomicBoolean ( true );
  3. List < LineProcessor > lineProcessors =
  4. lines
  5. .stream ( )
  6. .map ( ( String line ) -> new LineProcessor ( line , flagToContinueProcessingLines ) )
  7. .toList ( );
  8. List < Future < Boolean > > futures = null;
  9. try (
  10. ExecutorService executorService = Executors.newFixedThreadPool ( Runtime.getRuntime ( ).availableProcessors ( ) ) ;
  11. ) {
  12. futures = executorService.invokeAll ( lineProcessors );
  13. } catch ( InterruptedException e ) {
  14. throw new RuntimeException ( e );
  15. }


我们必须修改我们的任务类LineProcessor,以接受AtomicBoolean参数并在内部存储引用。

  1. public class LineProcessor implements Callable < Boolean > {
  2. private final String line;
  3. private final AtomicBoolean flagToContinueExecution;
  4. public LineProcessor ( final String line , final AtomicBoolean flagToContinueExecution ) {
  5. this.line = line;
  6. this.flagToContinueExecution = flagToContinueExecution;
  7. }
  8. @Override
  9. public Boolean call ( ) {
  10. if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
  11. System.out.println ( "In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " processing line: " + this.line );
  12. if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;
  13. boolean badSituationArose = ThreadLocalRandom.current ( ).nextBoolean ( );
  14. if ( badSituationArose ) {
  15. System.out.println ( "OOPS! In thread ID: " + Thread.currentThread ( ).threadId ( ) + " at " + Instant.now ( ) + " funky situation arose while processing line: " + this.line );
  16. this.flagToContinueExecution.set ( false ); // Flip the flag to signal the other sibling tasks to halt.
  17. return Boolean.FALSE;
  18. }
  19. Boolean success = true; // If we successfully processed this line.
  20. return success;
  21. }
  22. }


请注意我们是如何在if ( ! this.flagToContinueExecution.get ( ) ) return Boolean.FALSE;的多次出现中进行丢弃的。如果您想让该任务在被另一个任务标记时自动停止其工作,则必须在整个任务代码中散布这样的调用。
运行时:

  1. In thread ID: 21 at 2023-11-01T06:28:12.682969Z processing line: x
  2. In thread ID: 23 at 2023-11-01T06:28:12.682936Z processing line: z
  3. In thread ID: 22 at 2023-11-01T06:28:12.682935Z processing line: y
  4. OOPS! In thread ID: 22 at 2023-11-01T06:28:12.695232Z funky situation arose while processing line: y
  5. done
  6. Call future.get here to get result
  7. done
  8. Call future.get here to get result
  9. done
  10. Call future.get here to get result


虽然与您的问题无关,但我建议在Java 21+中使用虚拟线程以提高任务的吞吐量。只有当您的任务涉及阻塞并且不完全受CPU限制时才这样做。阻塞伴随着数据库调用,文件存储调用,网络调用等。
为了与当前的虚拟线程实现更兼容,对于在任务中执行的任何长时间运行的代码,请将synchronized的使用替换为Reentrant锁对象。
顺便说一句,当做类似这样的开关:

  1. switch (validID) {
  2. case 200:
  3. case 600:
  4. }


.始终包含default大小写以捕获意外值。

展开查看全部

相关问题