带超时功能的 Latch 设计模式

x33g5p2x  于2022-04-27 转载在 其他  
字(3.8k)|赞(0)|评价(0)|浏览(643)

一 点睛

对 Latch 设计模式增加超时功能

二 实战

1 Latch

  1. package concurrent.latch;
  2. import java.util.concurrent.TimeUnit;
  3. public abstract class Latch {
  4. // 用于控制多少个线程完成任务时才能打开阀门
  5. protected int limit;
  6. // 通过构造函数传入 limit
  7. public Latch(int limit) {
  8. this.limit = limit;
  9. }
  10. // 该方法会使得当前线程一直等待,直到所有的线程都完成工作,被阻塞的线程是允许被中断的
  11. public abstract void await() throws InterruptedException;
  12. // 当任务线程完成工作之后调用该方法使得计数器减一
  13. public abstract void countDown();
  14. // 获取当前还有多少个线程没有完成任务
  15. public abstract int getUnarriveed();
  16. // 超时等待
  17. public abstract void await(TimeUnit unit, long time) throws InterruptedException, WaitTimeoutException;
  18. }

2 CountDownLatch

  1. package concurrent.latch;
  2. import java.util.concurrent.TimeUnit;
  3. /**
  4. * @className: CountDownLatch
  5. * @description: 无限等待门栓实现
  6. * @date: 2022/4/25
  7. * @author: cakin
  8. */
  9. public class CountDownLatch extends Latch {
  10. public CountDownLatch(int limit) {
  11. super(limit);
  12. }
  13. @Override
  14. public void await() throws InterruptedException {
  15. synchronized (this) {
  16. // 当 limit > 0 时,当前线程进入阻塞状态
  17. while (limit > 0) {
  18. this.wait();
  19. }
  20. }
  21. }
  22. @Override
  23. public void countDown() {
  24. synchronized (this) {
  25. if (limit <= 0) {
  26. throw new IllegalStateException("all of task already arrived");
  27. }
  28. // 使 limit 减一,并且通知阻塞线程
  29. limit--;
  30. this.notifyAll();
  31. }
  32. }
  33. @Override
  34. public int getUnarriveed() {
  35. // 返回有多少线程还未完成任务
  36. return limit;
  37. }
  38. @Override
  39. public void await(TimeUnit unit, long time) throws InterruptedException, WaitTimeoutException {
  40. if (time <= 0) {
  41. throw new IllegalArgumentException("The time is invalid");
  42. }
  43. long remainingNanos = unit.toNanos(time); // 将秒转换为纳秒
  44. // 等待任务将在 endNanos 纳秒后超时
  45. final long endNanos = System.nanoTime() + remainingNanos;
  46. synchronized (this) {
  47. while (limit > 0) {
  48. if (TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0) {
  49. throw new WaitTimeoutException("The wait time over specify time。");
  50. }
  51. // 等待 remainingNanos,在等待的过程中可能会被中断,需要重新计算 remainingNanos
  52. this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
  53. remainingNanos = endNanos - System.nanoTime();
  54. }
  55. }
  56. }
  57. }

3 ProgrammerTravel

  1. package concurrent.latch;
  2. import java.util.concurrent.ThreadLocalRandom;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * @className: ProgrammerTravel
  6. * @description: 程序员旅游线程
  7. * @date: 2022/4/25
  8. * @author: cakin
  9. */
  10. public class ProgrammerTravel extends Thread {
  11. // 门栓
  12. private final Latch latch;
  13. // 程序员
  14. private final String programmer;
  15. // 交通工具
  16. private final String transportation;
  17. public ProgrammerTravel(Latch latch, String programmer, String transportation) {
  18. this.latch = latch;
  19. this.programmer = programmer;
  20. this.transportation = transportation;
  21. }
  22. @Override
  23. public void run() {
  24. System.out.println(programmer + " start take the transportation [" + transportation + "]");
  25. try {
  26. TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. System.out.println(programmer + " arrived by" + transportation);
  31. // 完成任务时使计数器减一
  32. latch.countDown();
  33. }
  34. }

4 WaitTimeoutException

  1. package concurrent.latch;
  2. public class WaitTimeoutException extends Exception {
  3. public WaitTimeoutException(String message) {
  4. super(message);
  5. }
  6. }

5 Test1

  1. package concurrent.latch;
  2. import java.util.concurrent.TimeUnit;
  3. public class Test1 {
  4. public static void main(String[] args) {
  5. Latch latch = new CountDownLatch(4);
  6. new ProgrammerTravel(latch, "Alex", "Bus").start();
  7. new ProgrammerTravel(latch, "Gavin", "Walking").start();
  8. new ProgrammerTravel(latch, "Jack", "Subway").start();
  9. new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
  10. // 当前线程(main 线程会进入阻塞,直到四个程序员全部都达到目的地)
  11. try {
  12. latch.await(TimeUnit.SECONDS,5);
  13. System.out.println("== all of programmer arrvied ==");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } catch (WaitTimeoutException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }

三 测试结果

Gavin start take the transportation [Walking]

Alex start take the transportation [Bus]

Jack start take the transportation [Subway]

Dillon start take the transportation [Bicycle]

Jack arrived bySubway

Gavin arrived byWalking

Alex arrived byBus

concurrent.latch.WaitTimeoutException: The wait time over specify time。

at concurrent.latch.CountDownLatch.await(CountDownLatch.java:56)

at concurrent.latch.Test1.main(Test1.java:14)

Dillon arrived byBicycle

相关文章