该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
long stamp = lock.readLock();
lock.unlockRead(stamp);
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
乐观读,StampedLock
支持 tryOptimisticRead() 方法(乐观读)
,读取完毕后需要做一次 戳校验
如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}
提供一个 数据容器类
内部分别使用读锁保护数据的 read()
方法,写锁保护数据的 write()
方法
package com;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.StampedLock;
import static java.lang.Thread.sleep;
@Slf4j
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
@SneakyThrows
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}
@SneakyThrows
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}
测试 读-读
可以优化
package com;
public class Main {
@SneakyThrows
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep((long) 0.5);
new Thread(() -> {
dataContainer.read(0);
}, "t2").start();
}
}
输出结果,可以看到实际没有加读锁
15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
测试 读-写
时优化读补加读锁
public class Main {
@SneakyThrows
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep((long) 0.5);
new Thread(() -> {
dataContainer.write(100);
}, "t2").start();
}
}
输出结果
15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256
15:57:00.717 c.DataContainerStamped [t2] - write lock 384
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384
15:57:02.719 c.DataContainerStamped [t1] - read lock 513
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513
注意
[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);
// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 3. 获取许可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
[当前线程: Thread-1][DEBUG] 2022年01月08日 17时46分39秒981毫秒-消息:running...
[当前线程: Thread-2][DEBUG] 2022年01月08日 17时46分39秒981毫秒-消息:running...
[当前线程: Thread-0][DEBUG] 2022年01月08日 17时46分39秒981毫秒-消息:running...
[当前线程: Thread-2][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:end...
[当前线程: Thread-0][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:end...
[当前线程: Thread-1][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:end...
[当前线程: Thread-4][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:running...
[当前线程: Thread-3][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:running...
[当前线程: Thread-5][DEBUG] 2022年01月08日 17时46分39秒984毫秒-消息:running...
[当前线程: Thread-5][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:end...
[当前线程: Thread-4][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:end...
[当前线程: Thread-3][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:end...
[当前线程: Thread-7][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:running...
[当前线程: Thread-6][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:running...
[当前线程: Thread-8][DEBUG] 2022年01月08日 17时46分39秒986毫秒-消息:running...
[当前线程: Thread-7][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:end...
[当前线程: Thread-8][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:end...
[当前线程: Thread-6][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:end...
[当前线程: Thread-9][DEBUG] 2022年01月08日 17时46分39秒988毫秒-消息:running...
[当前线程: Thread-9][DEBUG] 2022年01月08日 17时46分39秒990毫秒-消息:end...
package com;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicIntegerArray;
@Slf4j(topic = "c.Pool")
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
private Semaphore semaphore;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i + 1));
}
}
// 5. 借连接
public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if (states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}
}
@RestController
public class TestController {
private RateLimiter limiter = RateLimiter.create(50);
@GetMapping("/test")
public String test() {
// limiter.acquire();
return "ok";
}
}
没有限流之前—ab压测
ab -c 10 -t 10 http://localhost:8080/test
结果
This is ApacheBench, Version 2.3 <$Revision: 1843412 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Completed 5000 requests
Completed 10000 requests
Completed 15000 requests
Completed 20000 requests
Finished 24706 requests
Server Software:
Server Hostname: localhost
Server Port: 8080
Document Path: /test
Document Length: 2 bytes
Concurrency Level: 10
Time taken for tests: 10.005 seconds
Complete requests: 24706
Failed requests: 0
Total transferred: 3311006 bytes
HTML transferred: 49418 bytes
Requests per second: 2469.42 [#/sec] (mean)
Time per request: 4.050 [ms] (mean)
Time per request: 0.405 [ms] (mean, across all concurrent requests)
Transfer rate: 323.19 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 1.4 0 16
Processing: 0 4 7.6 0 323
Waiting: 0 3 6.9 0 323
Total: 0 4 7.6 0 323
Percentage of the requests served within a certain time (ms)
50% 0
66% 2
75% 8
80% 8
90% 10
95% 16
98% 16
99% 16
100% 323 (longest request)
限流之后
This is ApacheBench, Version 2.3 <$Revision: 1843412 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Finished 545 requests
Server Software:
Server Hostname: localhost
Server Port: 8080
Document Path: /test
Document Length: 2 bytes
Concurrency Level: 10
Time taken for tests: 10.007 seconds
Complete requests: 545
Failed requests: 0
Total transferred: 73030 bytes
HTML transferred: 1090 bytes
Requests per second: 54.46 [#/sec] (mean)
Time per request: 183.621 [ms] (mean)
Time per request: 18.362 [ms] (mean, across all concurrent requests)
Transfer rate: 7.13 [Kbytes/sec] received
Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 1.1 0 16
Processing: 0 179 57.0 199 211
Waiting: 0 178 57.6 198 211
Total: 0 179 56.9 199 211
Percentage of the requests served within a certain time (ms)
50% 199
66% 200
75% 200
80% 200
90% 201
95% 201
98% 202
99% 203
100% 211 (longest request)
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一
刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,
Semaphore 的构造方法
获取资源的acquire方法
acquireSharedInterruptibly(int arg)
而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
doAcquireInterruptibly(arg);
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,state再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 state是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
// permits 即 state
super(permits);
}
// Semaphore 方法, 方便阅读, 放在此处
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 尝试获得共享锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync 继承过来的方法, 方便阅读, 放在此处
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
int remaining = available - acquires;
if (
// 如果许可已经用完, 返回负数, 表示获取失败, 进入 doAcquireSharedInterruptibly
remaining < 0 ||
// 如果 cas 重试成功, 返回正数, 表示获取成功
compareAndSetState(available, remaining)
) {
return remaining;
}
}
}
// AQS 继承过来的方法, 方便阅读, 放在此处
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
// 再次尝试获取许可
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功后本线程出队(AQS), 所在 Node设置为 head
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
// r 表示可用资源数, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Semaphore 方法, 方便阅读, 放在此处
public void release() {
sync.releaseShared(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
早期有 bug
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//返回剩余许可数
if (r >= 0) {
// 这里会有空档
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
// 有空闲资源
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
// 下一个
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
修复前版本执行流程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置自己为 head
setHead(node);
// propagate 表示有共享资源(例如共享读锁或信号量)
// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点
if (s == null || s.isShared()) {
doReleaseShared();
}
}
}
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (; ; ) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
log.debug("begin...");
try {
sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
new Thread(() -> {
log.debug("begin...");
try {
sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
new Thread(() -> {
log.debug("begin...");
try {
sleep((long) 1.5);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();
log.debug("waiting...");
latch.await();
log.debug("wait end...");
}
}
结果
[Thread-0][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:begin...
[Thread-2][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:begin...
[Thread-1][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:begin...
[main][DEBUG][2022年01月08日 20时32分20秒755毫秒]消息:waiting...
[Thread-2][DEBUG][2022年01月08日 20时32分20秒758毫秒]消息:end...2
[Thread-0][DEBUG][2022年01月08日 20时32分20秒758毫秒]消息:end...1
[Thread-1][DEBUG][2022年01月08日 20时32分20秒759毫秒]消息:end...0
[main][DEBUG][2022年01月08日 20时32分20秒759毫秒]消息:wait end...
可以配合线程池使用,改进如下
package com;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.Thread.sleep;
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(4);
service.submit(() -> {
log.debug("begin...");
try {
sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
try {
sleep((long) 1.5);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
try {
sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
try {
log.debug("waiting...");
latch.await();
log.debug("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
输出
[pool-1-thread-2] [DEBUG] [2022年01月08日 20时34分31秒823毫秒] 消息:begin...
[pool-1-thread-4] [DEBUG] [2022年01月08日 20时34分31秒824毫秒] 消息:waiting...
[pool-1-thread-3] [DEBUG] [2022年01月08日 20时34分31秒823毫秒] 消息:begin...
[pool-1-thread-1] [DEBUG] [2022年01月08日 20时34分31秒823毫秒] 消息:begin...
[pool-1-thread-4] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:wait end...
[pool-1-thread-2] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:end...2
[pool-1-thread-3] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:end...0
[pool-1-thread-1] [DEBUG] [2022年01月08日 20时34分31秒827毫秒] 消息:end...1
package com;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Thread.sleep;
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
AtomicInteger num = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {
//传入线程工厂,给每个线程起一个名字
return new Thread(r, "t" + num.getAndIncrement());
});
CountDownLatch latch = new CountDownLatch(10);
String[] all = new String[10];
Random r = new Random();
for (int j = 0; j < 10; j++) {
//lambda中只能传入局部常量
int x = j;
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
try {
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
}
all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";
//\r是回车符,回到行首
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始...");
service.shutdown();
}
}
等待所有玩家准备完毕,然后开始游戏
@RestController
public class TestCountDownlatchController {
@GetMapping("/order/{id}")
public Map<String, Object> order(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("total", "2300.00");
sleep(2000);
return map;
}
@GetMapping("/product/{id}")
public Map<String, Object> product(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
if (id == 1) {
map.put("name", "小爱音箱");
map.put("price", 300);
} else if (id == 2) {
map.put("name", "小米手机");
map.put("price", 2000);
}
map.put("id", id);
sleep(1000);
return map;
}
@GetMapping("/logistics/{id}")
public Map<String, Object> logistics(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("name", "中通快递");
sleep(2500);
return map;
}
private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
rest 远程调用
CountDownLatch 实现
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
latch.countDown();
});
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
latch.countDown();
});
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
latch.countDown();
});
service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
latch.countDown();
});
latch.await();
log.debug("执行完毕");
service.shutdown();
}
}
future实现:
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String, Object>> f1 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
log.debug("执行完毕");
service.shutdown();
}
}
执行结果
19:51:39.711 c.TestCountDownLatch [main] - begin
{total=2300.00, id=1}
{price=300, name=小爱音箱, id=1}
{price=2000, name=小米手机, id=2}
{name=中通快递, id=1}
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕
CountDownLatch和Future都可以实现,但是各有各的好处,future更适合需要获取结果,进行合并操作的业务逻辑
当然CountDownLatch也可以实现
[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
new Thread(() -> {
System.out.println("线程1开始.." + new Date());
try {
cb.await(); // 当个数不足时,等待,count--,当count==0的时候,恢复运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..." + new Date());
}).start();
new Thread(() -> {
System.out.println("线程2开始.." + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
try {
cb.await(); // 2 秒后,线程个数够2,继续运行,count--,当count==0的时候,恢复运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..." + new Date());
}).start();
}
}
输出
线程2开始..Sat Jan 08 21:37:19 CST 2022
线程1开始..Sat Jan 08 21:37:19 CST 2022
线程2继续向下运行...Sat Jan 08 21:37:21 CST 2022
线程1继续向下运行...Sat Jan 08 21:37:21 CST 2022
注意 CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』
CyclicBarrier 可重用性演示
package com;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.concurrent.*;
/** * @author 大忽悠 * @create 2022/1/8 15:20 */
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
for(int i=0;i<3;i++)
{
//当第一次count变为0后,再次调用await方法,count恢复为2
new Thread(() -> {
System.out.println("线程1开始.." + new Date());
try {
cb.await(); // 当个数不足时,等待,count--,当count==0的时候,恢复运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..." + new Date());
}).start();
new Thread(() -> {
System.out.println("线程2开始.." + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
try {
cb.await(); // 2 秒后,线程个数够2,继续运行,count--,当count==0的时候,恢复运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..." + new Date());
}).start();
}
}
}
输出:
线程1开始..Sat Jan 08 21:34:49 CST 2022
线程1开始..Sat Jan 08 21:34:49 CST 2022
线程2开始..Sat Jan 08 21:34:49 CST 2022
线程2开始..Sat Jan 08 21:34:49 CST 2022
线程2开始..Sat Jan 08 21:34:49 CST 2022
线程1开始..Sat Jan 08 21:34:49 CST 2022
线程1继续向下运行...Sat Jan 08 21:34:49 CST 2022
线程1继续向下运行...Sat Jan 08 21:34:49 CST 2022
线程2继续向下运行...Sat Jan 08 21:34:51 CST 2022
线程1继续向下运行...Sat Jan 08 21:34:51 CST 2022
线程2继续向下运行...Sat Jan 08 21:34:51 CST 2022
线程2继续向下运行...Sat Jan 08 21:34:51 CST 2022
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://cjdhy.blog.csdn.net/article/details/122380872
内容来源于网络,如有侵权,请联系作者删除!