JUC学习之共享模型工具之JUC并发工具包下

x33g5p2x  于2022-01-09 转载在 其他  
字(22.7k)|赞(0)|评价(0)|浏览(289)

StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

加解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
 // 锁升级
}

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

package com;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.StampedLock;

import static java.lang.Thread.sleep;

@Slf4j
class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) {
        this.data = data;
    }

    @SneakyThrows
    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        sleep(readTime);
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        }
        // 锁升级 - 读锁
        log.debug("updating to read lock... {}", stamp);
        try {
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            sleep(readTime);
            log.debug("read finish...{}, data:{}", stamp, data);
            return data;
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }

    @SneakyThrows
    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            sleep(2);
            this.data = newData;
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}

测试 读-读 可以优化

package com;

public class Main {
    @SneakyThrows
    public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);

        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();

        sleep((long) 0.5);

        new Thread(() -> {
            dataContainer.read(0);
        }, "t2").start();
    }
}

输出结果,可以看到实际没有加读锁

15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

public class Main {
    @SneakyThrows
    public static void main(String[] args) {
        DataContainerStamped dataContainer = new DataContainerStamped(1);

        new Thread(() -> {
            dataContainer.read(1);
        }, "t1").start();

        sleep((long) 0.5);

        new Thread(() -> {
            dataContainer.write(100);
        }, "t2").start();
    }
}

输出结果

15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384 
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 
15:57:02.719 c.DataContainerStamped [t1] - read lock 513 
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513

注意

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

Semaphore

基本使用

[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。

// 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(3);
        // 2. 10个线程同时运行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                // 3. 获取许可
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.debug("running...");
                    sleep(1);
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
[当前线程: Thread-1][DEBUG] 2022年01月08日 17时46分39秒981毫秒-消息:running...
[当前线程: Thread-2][DEBUG] 2022年01月08日 17时46分39秒981毫秒-消息:running...
[当前线程: Thread-0][DEBUG] 2022年01月08日 17时46分39秒981毫秒-消息:running...
[当前线程: Thread-2][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:end...
[当前线程: Thread-0][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:end...
[当前线程: Thread-1][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:end...
[当前线程: Thread-4][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:running...
[当前线程: Thread-3][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:running...
[当前线程: Thread-5][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:running...
[当前线程: Thread-5][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:end...
[当前线程: Thread-4][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:end...
[当前线程: Thread-3][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:end...
[当前线程: Thread-7][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:running...
[当前线程: Thread-6][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:running...
[当前线程: Thread-8][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:running...
[当前线程: Thread-7][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:end...
[当前线程: Thread-8][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:end...
[当前线程: Thread-6][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:end...
[当前线程: Thread-9][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:running...
[当前线程: Thread-9][DEBUG] 2022年01月08日 17时46分39秒990毫秒-消息:end...

限制对共享资源的使用

semaphore 实现

  • 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)
  • 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的
package com;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerArray;

@Slf4j(topic = "c.Pool")
class Pool {
    // 1. 连接池大小
    private final int poolSize;
    // 2. 连接对象数组
    private Connection[] connections;
    // 3. 连接状态数组 0 表示空闲, 1 表示繁忙
    private AtomicIntegerArray states;

    private Semaphore semaphore;

    // 4. 构造方法初始化
    public Pool(int poolSize) {
        this.poolSize = poolSize;
        // 让许可数与资源数一致
        this.semaphore = new Semaphore(poolSize);
        this.connections = new Connection[poolSize];
        this.states = new AtomicIntegerArray(new int[poolSize]);
        for (int i = 0; i < poolSize; i++) {
            connections[i] = new MockConnection("连接" + (i + 1));
        }
    }

    // 5. 借连接
    public Connection borrow() {// t1, t2, t3
        // 获取许可
        try {
            semaphore.acquire(); // 没有许可的线程,在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < poolSize; i++) {
            // 获取空闲连接
            if (states.get(i) == 0) {
                if (states.compareAndSet(i, 0, 1)) {
                    log.debug("borrow {}", connections[i]);
                    return connections[i];
                }
            }
        }
        // 不会执行到这里
        return null;
    }

    // 6. 归还连接
    public void free(Connection conn) {
        for (int i = 0; i < poolSize; i++) {
            if (connections[i] == conn) {
                states.set(i, 0);
                log.debug("free {}", conn);
                semaphore.release();
                break;
            }
        }
    }
}

单位时间内限流

guava 实现

@RestController
    public class TestController {
        private RateLimiter limiter = RateLimiter.create(50);

        @GetMapping("/test")
        public String test() {
       // limiter.acquire();
            return "ok";
        }
        
    }

没有限流之前—ab压测

ab -c 10 -t 10 http://localhost:8080/test

结果

This is ApacheBench, Version 2.3 <$Revision: 1843412 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 5000 requests
Completed 10000 requests
Completed 15000 requests
Completed 20000 requests
Finished 24706 requests

Server Software:
Server Hostname: localhost
Server Port: 8080

Document Path: /test
Document Length: 2 bytes

Concurrency Level: 10
Time taken for tests: 10.005 seconds
Complete requests: 24706
Failed requests: 0
Total transferred: 3311006 bytes
HTML transferred: 49418 bytes
Requests per second: 2469.42 [#/sec] (mean)
Time per request: 4.050 [ms] (mean)
Time per request: 0.405 [ms] (mean, across all concurrent requests)
Transfer rate: 323.19 [Kbytes/sec] received

Connection Times (ms)
          min mean[+/-sd] median max
Connect: 0 0 1.4 0 16
Processing: 0 4 7.6 0 323
Waiting: 0 3 6.9 0 323
Total: 0 4 7.6 0 323

Percentage of the requests served within a certain time (ms)
 50% 0
 66% 2
 75% 8
 80% 8
 90% 10
 95% 16
 98% 16
 99% 16
 100% 323 (longest request)

限流之后

This is ApacheBench, Version 2.3 <$Revision: 1843412 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Finished 545 requests

Server Software:
Server Hostname: localhost
Server Port: 8080

Document Path: /test
Document Length: 2 bytes

Concurrency Level: 10
Time taken for tests: 10.007 seconds
Complete requests: 545
Failed requests: 0
Total transferred: 73030 bytes
HTML transferred: 1090 bytes
Requests per second: 54.46 [#/sec] (mean)
Time per request: 183.621 [ms] (mean)
Time per request: 18.362 [ms] (mean, across all concurrent requests)
Transfer rate: 7.13 [Kbytes/sec] received

Connection Times (ms)
         min mean[+/-sd] median max
Connect: 0 0 1.1 0 16
Processing: 0 179 57.0 199 211
Waiting: 0 178 57.6 198 211
Total: 0 179 56.9 199 211

Percentage of the requests served within a certain time (ms)
 50% 199
  66% 200
 75% 200
 80% 200
 90% 201
 95% 201
 98% 202
 99% 203
 100% 211 (longest request)

Semaphore 原理

1. 加锁流程

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一

刚开始,permits(state)为 3,这时 5 个线程来获取资源

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,
Semaphore 的构造方法

获取资源的acquire方法

acquireSharedInterruptibly(int arg)

而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

doAcquireInterruptibly(arg);

2.解锁流程

这时 Thread-4 释放了 permits,状态如下

接下来 Thread-0 竞争成功,state再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 state是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

源码

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        // permits 即 state
        super(permits);
    }

    // Semaphore 方法, 方便阅读, 放在此处
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    // 尝试获得共享锁
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }

    // Sync 继承过来的方法, 方便阅读, 放在此处
    final int nonfairTryAcquireShared(int acquires) {
        for (; ; ) {
            int available = getState();
            int remaining = available - acquires;
            if (
                // 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
                    remaining < 0 ||
                            // 如果 cas 重试成功, 返回正数, 表示获取成功
                            compareAndSetState(available, remaining)
            ) {
                return remaining;
            }
        }
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 再次尝试获取许可
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 成功后本线程出队(AQS), 所在 Node设置为 head
                        // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
                        // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
                        // r 表示可用资源数, 为 0 则不会继续传播
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Semaphore 方法, 方便阅读, 放在此处
    public void release() {
        sync.releaseShared(1);
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    // Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryReleaseShared(int releases) {
        for (; ; ) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
}

为什么要有 PROPAGATE

早期有 bug

  • releaseShared 方法
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • doAcquireShared 方法
private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回剩余许可数
                    if (r >= 0) {
                        // 这里会有空档
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • setHeadAndPropagate 方法
private void setHeadAndPropagate(Node node, int propagate) {
        setHead(node);
        // 有空闲资源
        if (propagate > 0 && node.waitStatus != 0) {
            Node s = node.next;
            // 下一个
            if (s == null || s.isShared())
                unparkSuccessor(node);
        }
    }
  • 假设存在某次循环中队列里排队的结点情况为 head(-1)->t1(-1)->t2(-1)
  • 假设存在将要信号量释放的 T3 和 T4,释放顺序为先 T3 后 T4

正常流程

产生 bug 的情况

修复前版本执行流程

  • T3 调用 releaseShared(1),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0
  • T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,假设返回值为 0(获取锁成功,但没有剩余资源量)
  • T4 调用 releaseShared(1),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个head),不满足条件,因此不调用 unparkSuccessor(head)
  • T1 获取信号量成功,调用 setHeadAndPropagate 时,因为不满足 propagate > 0(2 的返回值也就是propagate(剩余资源量) == 0),从而不会唤醒后继结点, T2 线程得不到唤醒

bug 修复后

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 设置自己为 head
        setHead(node);
        // propagate 表示有共享资源(例如共享读锁或信号量)
        // 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
        // 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 如果是最后一个节点或者是等待共享读锁的节点
            if (s == null || s.isShared()) {
                doReleaseShared();
            }
        }
    }

    private void doReleaseShared() {
        // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
        // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
        for (; ; ) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue; // loop to recheck cases
                    unparkSuccessor(h);
                } else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; // loop on failed CAS
            }
            if (h == head) // loop if head changed
                break;
        }
    }

  • T3 调用 releaseShared(),直接调用了 unparkSuccessor(head),head 的等待状态从 -1 变为 0
  • T1 由于 T3 释放信号量被唤醒,调用 tryAcquireShared,假设返回值为 0(获取锁成功,但没有剩余资源量)
  • T4 调用 releaseShared(),此时 head.waitStatus 为 0(此时读到的 head 和 1 中为同一个 head),调用doReleaseShared() 将等待状态置为 PROPAGATE(-3)
  • T1 获取信号量成功,调用 setHeadAndPropagate 时,读到 h.waitStatus < 0,从而调用doReleaseShared() 唤醒 T2

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。

其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {

        CountDownLatch latch = new CountDownLatch(3);

        new Thread(() -> {
            log.debug("begin...");
            try {
                sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();

        new Thread(() -> {
            log.debug("begin...");
            try {
                sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();

        new Thread(() -> {
            log.debug("begin...");
            try {
                sleep((long) 1.5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        }).start();
        log.debug("waiting...");
        latch.await();
        log.debug("wait end...");
    }
}

结果

[Thread-0][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:begin...
[Thread-2][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:begin...
[Thread-1][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:begin...
[main][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:waiting...
[Thread-2][DEBUG][2022年01月08日 20时32分20秒758毫秒]消息:end...2
[Thread-0][DEBUG][2022年01月08日 20时32分20秒758毫秒]消息:end...1
[Thread-1][DEBUG][2022年01月08日 20时32分20秒759毫秒]消息:end...0
[main][DEBUG][2022年01月08日 20时32分20秒759毫秒]消息:wait end...

可以配合线程池使用,改进如下

package com;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.Thread.sleep;

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService service = Executors.newFixedThreadPool(4);
        
        service.submit(() -> {
            log.debug("begin...");
            try {
                sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        
        service.submit(() -> {
            log.debug("begin...");
            try {
                sleep((long) 1.5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        
        service.submit(() -> {
            log.debug("begin...");
            try {
                sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            log.debug("end...{}", latch.getCount());
        });
        
        service.submit(() -> {
            try {
                log.debug("waiting...");
                latch.await();
                log.debug("wait end...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

输出

[pool-1-thread-2] [DEBUG] [2022年01月08日 20时34分31秒823毫秒] 消息:begin...
[pool-1-thread-4] [DEBUG] [2022年01月08日 20时34分31秒824毫秒] 消息:waiting...
[pool-1-thread-3] [DEBUG] [2022年01月08日 20时34分31秒823毫秒] 消息:begin...
[pool-1-thread-1] [DEBUG] [2022年01月08日 20时34分31秒823毫秒] 消息:begin...
[pool-1-thread-4] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:wait end...
[pool-1-thread-2] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:end...2
[pool-1-thread-3] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:end...0
[pool-1-thread-1] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:end...1

应用之同步等待多线程准备完毕

package com;

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Thread.sleep;

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger num = new AtomicInteger(0);

        ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
            //传入线程工厂,给每个线程起一个名字
            return new Thread(r, "t" + num.getAndIncrement());
        });

        CountDownLatch latch = new CountDownLatch(10);

        String[] all = new String[10];

        Random r = new Random();

        for (int j = 0; j < 10; j++) {
            //lambda中只能传入局部常量
            int x = j;
            service.submit(() -> {
                for (int i = 0; i <= 100; i++) {
                    try {
                        Thread.sleep(r.nextInt(100));
                    } catch (InterruptedException e) {
                    }
                    all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
                    //\r是回车符,回到行首
                    System.out.print("\r" + Arrays.toString(all));
                }
                latch.countDown();
            });
        }
        latch.await();
        System.out.println("\n游戏开始...");
        service.shutdown();
    }
}

等待所有玩家准备完毕,然后开始游戏

应用之同步等待多个远程调用结束

@RestController
public class TestCountDownlatchController {
    @GetMapping("/order/{id}")
    public Map<String, Object> order(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("total", "2300.00");
        sleep(2000);
        return map;
    }

    @GetMapping("/product/{id}")
    public Map<String, Object> product(@PathVariable int id) {

        HashMap<String, Object> map = new HashMap<>();
        if (id == 1) {
            map.put("name", "小爱音箱");
            map.put("price", 300);
        } else if (id == 2) {
            map.put("name", "小米手机");
            map.put("price", 2000);
        }
        map.put("id", id);
        sleep(1000);
        return map;
    }

    @GetMapping("/logistics/{id}")
    public Map<String, Object> logistics(@PathVariable int id) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("name", "中通快递");
        sleep(2500);
        return map;
    }

    private void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

rest 远程调用

CountDownLatch 实现

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("begin");
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(4);
          service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
              latch.countDown();
        });
        service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            latch.countDown();
        });
        service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
            latch.countDown();
        });
         service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);

             latch.countDown();
        });
         latch.await();
        log.debug("执行完毕");
        service.shutdown();
    }
}

future实现:

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        RestTemplate restTemplate = new RestTemplate();
        log.debug("begin");
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(4);
        Future<Map<String, Object>> f1 = service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
            return r;
        });
        Future<Map<String, Object>> f2 = service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
            return r;
        });
        Future<Map<String, Object>> f3 = service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
            return r;
        });
        Future<Map<String, Object>> f4 = service.submit(() -> {
            Map<String, Object> r =
                    restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
            return r;
        });
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());
        log.debug("执行完毕");
        service.shutdown();
    }
}

执行结果

19:51:39.711 c.TestCountDownLatch [main] - begin 
{total=2300.00, id=1} 
{price=300, name=小爱音箱, id=1} 
{price=2000, name=小米手机, id=2} 
{name=中通快递, id=1} 
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

CountDownLatch和Future都可以实现,但是各有各的好处,future更适合需要获取结果,进行合并操作的业务逻辑
当然CountDownLatch也可以实现

CyclicBarrier

[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行

        new Thread(() -> {
            System.out.println("线程1开始.." + new Date());
            try {
                cb.await(); // 当个数不足时,等待,count--,当count==0的时候,恢复运行
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程1继续向下运行..." + new Date());
        }).start();

        new Thread(() -> {
            System.out.println("线程2开始.." + new Date());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
            try {
                cb.await(); // 2 秒后,线程个数够2,继续运行,count--,当count==0的时候,恢复运行
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("线程2继续向下运行..." + new Date());
        }).start();
    }
}

输出

线程2开始..Sat Jan 08 21:37:19 CST 2022
线程1开始..Sat Jan 08 21:37:19 CST 2022
线程2继续向下运行...Sat Jan 08 21:37:21 CST 2022
线程1继续向下运行...Sat Jan 08 21:37:21 CST 2022

注意 CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』

CyclicBarrier 可重用性演示

package com;

import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.concurrent.*;

/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行

        for(int i=0;i<3;i++)
        {
        //当第一次count变为0后,再次调用await方法,count恢复为2
            new Thread(() -> {
                System.out.println("线程1开始.." + new Date());
                try {
                    cb.await(); // 当个数不足时,等待,count--,当count==0的时候,恢复运行
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("线程1继续向下运行..." + new Date());
            }).start();

            new Thread(() -> {
                System.out.println("线程2开始.." + new Date());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                try {
                    cb.await(); // 2 秒后,线程个数够2,继续运行,count--,当count==0的时候,恢复运行
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("线程2继续向下运行..." + new Date());
            }).start();
        }
    }
}

输出:

线程1开始..Sat Jan 08 21:34:49 CST 2022
线程1开始..Sat Jan 08 21:34:49 CST 2022
线程2开始..Sat Jan 08 21:34:49 CST 2022
线程2开始..Sat Jan 08 21:34:49 CST 2022
线程2开始..Sat Jan 08 21:34:49 CST 2022
线程1开始..Sat Jan 08 21:34:49 CST 2022
线程1继续向下运行...Sat Jan 08 21:34:49 CST 2022
线程1继续向下运行...Sat Jan 08 21:34:49 CST 2022
线程2继续向下运行...Sat Jan 08 21:34:51 CST 2022
线程1继续向下运行...Sat Jan 08 21:34:51 CST 2022
线程2继续向下运行...Sat Jan 08 21:34:51 CST 2022
线程2继续向下运行...Sat Jan 08 21:34:51 CST 2022
  • CyclicBarrier也是以count作为计数,当count为0时,被暂停的所有线程恢复运行,但是与CountDownLatch 不同在于,当count=0时:

相关文章