concurrent 工具类
一、CountDownLatch
经常用于监听某些初始化操作,等初始化执行完毕后,通知主线程继续工作。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest extends Thread {
private final static CountDownLatch countDown = new CountDownLatch(2); // (1)
@Override
public void run() {
// 唤醒线程线程
countDown.countDown(); // (2)
System.out.println(Thread.currentThread().getName() + "执行完毕...");
}
public static void main(String[] args) {
new Thread(new CountDownLatchTest()).start();
new Thread(new CountDownLatchTest()).start();
try {
Thread.sleep(1000);
countDown.await(); // (3)
System.out.println(Thread.currentThread().getName() + "继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
二、CyclicBarrier
假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个没有准备了,大家都等待。
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name + " 准备OK.");
barrier.await(); //(1)
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(2); // (2)
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(new Thread(new Runner(barrier, "Thread-1")));
executor.submit(new Thread(new Runner(barrier, "Thread-2")));
executor.shutdown();
}
}
三、Future
四、Semaphore
Semaphore 信号量非常适合高并发访问。
public class UseSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5); // (1)
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire(); // (2)
System.out.println("Accessing: " + NO);
//模拟实际业务逻辑
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release(); // (3)
} catch (InterruptedException e) {
;
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(semp.getQueueLength());
// 退出线程池
exec.shutdown();
}
}
补充:
对系统进行峰值评估,采用所谓的80/20原则,即80%的请求20%的时间到达:
QRS = (PV * 80%) / (24 * 60 * 60 * 20%)
五、ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest implements Runnable {
private Lock lock = new ReentrantLock(); // (1)
public void run(){
try {
lock.lock(); // (2)
System.out.println(Thread.currentThread().getName() + "进入..");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "退出..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // (3)
}
}
public static void main(String[] args) throws InterruptedException {
final ReentrantLockTest ur = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
new Thread(ur).start();
}
}
}
如果文章对您有帮助,欢迎移至上方按钮打赏,非常感谢你的支持!
全部评论