Worker-Thread 模式实现

x33g5p2x  于2022-05-05 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(424)

一 点睛

Worker-Thread 的设计模式有如下几个角色。

流水线工人:流水线工人主要用来对传送带上的产品进行加工。

流水线传送带:用于传送来自上线的产品。

产品组装说明书:用来说明该产品如何组装。

Worker-Thread 模式关键角色的关系图。

左侧的线程,也就是传送带上游的线程,不断地往传送带(Queue)中生产数据,而当 Channel 被启动,就会同时创建并启动若干数量的 Worker 线程,因此,可以看出,Worker 于 Channel 来说并不是单纯的依赖关系,而是聚合关系,Channel 必须知道 Worker 的存在。

二 实战

1 产品及组装说明书

  1. package concurrent.workrtthread;
  2. /**
  3. * @className: InstructionBook
  4. * @description: 产品及组装说明书,在流水线上需要加工的产品,create 作为一个模板方法,提供了用工产品的说明书
  5. * @date: 2022/5/3
  6. * @author: cakin
  7. */
  8. public abstract class InstructionBook {
  9. /**
  10. * 功能描述:加工产品的第1个步骤
  11. *
  12. * @author cakin
  13. * @date 2022/5/3
  14. */
  15. protected abstract void firstProcess();
  16. /**
  17. * 功能描述:加工产品的第2个步骤
  18. *
  19. * @author cakin
  20. * @date 2022/5/3
  21. */
  22. protected abstract void secondProcess();
  23. /**
  24. * 功能描述:经过流水线传送带的产品通过该方法进行加工
  25. *
  26. * @author cakin
  27. * @date 2022/5/3
  28. */
  29. public final void create() {
  30. this.firstProcess();
  31. this.secondProcess();
  32. }
  33. }

2 产品

  1. package concurrent.workrtthread;
  2. /**
  3. * @className: Production
  4. * @description: 产品
  5. * @date: 2022/5/3
  6. * @author: cakin
  7. */
  8. public class Production extends InstructionBook {
  9. // 产品编号
  10. private final int prodID;
  11. public Production(int prodID) {
  12. this.prodID = prodID;
  13. }
  14. @Override
  15. protected void firstProcess() {
  16. System.out.println("execute the " + prodID + " first process");
  17. }
  18. @Override
  19. protected void secondProcess() {
  20. System.out.println("execute the " + prodID + " second process");
  21. }
  22. }

3 流水线传送带

  1. package concurrent.workrtthread;
  2. /**
  3. * @className: ProductionChannel
  4. * @description: 产品传送带,在传送带上除了负责产品加工的工人之外,还有在传送带上等待加工的产品
  5. * @date: 2022/5/3
  6. * @author: cakin
  7. */
  8. public class ProductionChannel {
  9. // 传送带上最多可以有多少个待加工的产品
  10. private final static int MAX_PROD = 100;
  11. // 用于存放待加工的产品,也就是传送带
  12. private final Production[] productionQueue;
  13. // 队列尾
  14. private int tail;
  15. // 队列头
  16. private int head;
  17. // 当前在流水线上有多少个待加工的产品
  18. private int total;
  19. // 流水线工人
  20. private final Worker[] workers;
  21. public ProductionChannel(int workerSize) {
  22. this.workers = new Worker[workerSize];
  23. this.productionQueue = new Production[MAX_PROD];
  24. // 实例化每一个工人并且启动
  25. for (int i = 0; i < workerSize; i++) {
  26. workers[i] = new Worker("Worker-" + i, this);
  27. workers[i].start();
  28. }
  29. }
  30. /**
  31. * 功能描述:接受来自上游的半成品
  32. *
  33. * @author cakin
  34. * @date 2022/5/3
  35. */
  36. public void offerProduction(Production production) {
  37. synchronized (this) {
  38. // 当传送带上加工的产品超过了最大值时需要阻塞上游再次传送产品
  39. while (total > productionQueue.length) {
  40. try {
  41. this.wait();
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. // 将产品放到传送带,并且通知工人线程工作
  47. productionQueue[tail] = production;
  48. tail = (tail + 1) % productionQueue.length;
  49. total++;
  50. this.notifyAll();
  51. }
  52. }
  53. /**
  54. * 功能描述:工人线程从传送带上获取产品,并且进行加工
  55. *
  56. * @author 贝医
  57. * @date 2022/5/3
  58. */
  59. public Production takeProduction() {
  60. synchronized (this) {
  61. // 当传送带上没有产品,工人等待着从上游输送代传送带上
  62. while (total <= 0) {
  63. try {
  64. this.wait();
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. // 获取产品
  70. Production prod = productionQueue[head];
  71. head = (head + 1) % productionQueue.length;
  72. total--;
  73. this.notifyAll();
  74. return prod;
  75. }
  76. }
  77. }

4 工人

  1. package concurrent.workrtthread;
  2. import java.util.Random;
  3. import java.util.concurrent.TimeUnit;
  4. /**
  5. * @className: Worker
  6. * @description: 流水线工人
  7. * @date: 2022/5/3
  8. * @author: cakin24
  9. */
  10. public class Worker extends Thread {
  11. private final ProductionChannel channel;
  12. private final static Random random = new Random(System.currentTimeMillis());
  13. public Worker(String workerName, ProductionChannel channel) {
  14. super(workerName);
  15. this.channel = channel;
  16. }
  17. @Override
  18. public void run() {
  19. while (true) {
  20. try {
  21. // 从传送带上获取产品
  22. Production production = channel.takeProduction();
  23. System.out.println(getName() + " process the " + production);
  24. // 对产品进行加工
  25. production.create();
  26. TimeUnit.SECONDS.sleep(random.nextInt(10));
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. }

5 测试代码

  1. package concurrent.workrtthread;
  2. import java.util.Random;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. import java.util.stream.IntStream;
  6. public class Test {
  7. public static void main(String[] args) {
  8. // 流水线上有 5 个工人
  9. final ProductionChannel channel = new ProductionChannel(5);
  10. AtomicInteger prouctionNo = new AtomicInteger();
  11. // 流水线上有8个工作人员往传送带上不断地放置等待加工的半成品
  12. IntStream.range(1, 8).forEach(i ->
  13. new Thread(() -> {
  14. while (true) {
  15. channel.offerProduction(new Production(prouctionNo.getAndIncrement()));
  16. try {
  17. TimeUnit.SECONDS.sleep(new Random().nextInt(10));
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }).start()
  23. );
  24. }
  25. }

三 测试

Worker-4 process the concurrent.workrtthread.Production@57a380d2

Worker-1 process the concurrent.workrtthread.Production@3d33b971

execute the 0 first process

execute the 0 second process

execute the 1 first process

execute the 1 second process

Worker-3 process the concurrent.workrtthread.Production@71f66139

execute the 2 first process

execute the 2 second process

Worker-0 process the concurrent.workrtthread.Production@414fec4b

execute the 3 first process

execute the 3 second process

Worker-2 process the concurrent.workrtthread.Production@3a237724

execute the 4 first process

execute the 4 second process

Worker-4 process the concurrent.workrtthread.Production@59647456

execute the 5 first process

execute the 5 second process

Worker-1 process the concurrent.workrtthread.Production@33f440a7

Worker-2 process the concurrent.workrtthread.Production@34e49e10

execute the 6 first process

execute the 6 second process

execute the 7 first process

Worker-2 process the concurrent.workrtthread.Production@30db1c54

execute the 7 second process

execute the 8 first process

execute the 8 second process

Worker-1 process the concurrent.workrtthread.Production@37cbd201

Worker-0 process the concurrent.workrtthread.Production@4f881a12

execute the 10 first process

execute the 9 first process

execute the 9 second process

execute the 10 second process

Worker-0 process the concurrent.workrtthread.Production@4bb7873b

Worker-3 process the concurrent.workrtthread.Production@7f319177

execute the 11 first process

execute the 11 second process

execute the 15 first process

execute the 15 second process

在测试中,假设上游的流水线上有8个工人将产品放到传送带上,我们的传送带上定义了5个工人,运行上面的程序,Worker 将根据产品的使用说明书对产品进行再次加工。

相关文章