Java 并发工具类:深入理解 CountDownLatch 等
一、并发工具类简介
Java并发API提供了多种工具类来帮助开发者处理并发问题,CountDownLatch
是其中之一,用于同步一个或多个线程的进度。
二、CountDownLatch 的使用
CountDownLatch
允许一个或多个线程等待一组操作完成。
import java.util.concurrent.CountDownLatch;
public class LatchExample {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
new Thread(new Worker(latch, i)).start();
}
try {
latch.await(); // 等待所有线程完成
System.out.println("All tasks completed.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class Worker implements Runnable {
private final CountDownLatch latch;
private final int id;
Worker(CountDownLatch latch, int id) {
this.latch = latch;
this.id = id;
}
@Override
public void run() {
try {
// 模拟工作
Thread.sleep((long) (Math.random() * 10000));
System.out.println("Worker " + id + " completed.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数减一
}
}
}
}
三、CyclicBarrier 的使用
CyclicBarrier
与CountDownLatch
类似,但它可以重复使用,用于等待一组线程在某个点上达到一致状态。
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
public class CyclicBarrierExample {
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
int workerCount = 5;
CyclicBarrier barrier = new CyclicBarrier(workerCount);
for (int i = 0; i < workerCount; i++) {
new Thread(new Worker(barrier, i)).start();
}
}
static class Worker implements Runnable {
private final CyclicBarrier barrier;
private final int id;
Worker(CyclicBarrier barrier, int id) {
this.barrier = barrier;
this.id = id;
}
@Override
public void run() {
try {
// 模拟工作
Thread.sleep((long) (Math.random() * 10000));
System.out.println("Worker " + id + " waiting at the barrier.");
barrier.await(); // 等待其他线程
System.out.println("Worker " + id + " passed the barrier.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
四、Semaphore 的使用
Semaphore
是一种基于计数的同步工具,用于控制同时访问某个特定资源的线程数量。
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
new Thread(new Worker(semaphore, i)).start();
}
}
static class Worker implements Runnable {
private final Semaphore semaphore;
private final int id;
Worker(Semaphore semaphore, int id) {
this.semaphore = semaphore;
this.id = id;
}
@Override
public void run() {
try {
semaphore.acquire(); // 获取许可
System.out.println("Worker " + id + " is working.");
Thread.sleep((long) (Math.random() * 5000));
semaphore.release(); // 释放许可
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
五、Exchanger 的使用
Exchanger
用于两个线程之间的数据交换。
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
System.out.println("Thread 1 is ready to exchange.");
String data1 = "Hello";
System.out.println("Thread 1 exchanged: " + data1);
try {
String newData = exchanger.exchange(data1);
System.out.println("Thread 1 received: " + newData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
System.out.println("Thread 2 is ready to exchange.");
String data2 = "World";
System.out.println("Thread 2 exchanged: " + data2);
try {
String newData = exchanger.exchange(data2);
System.out.println("Thread 2 received: " + newData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
六、并发工具类的使用场景
并发工具类适用于各种并发场景,如任务协调、资源共享、线程通信等。
七、并发工具类的选择
根据具体的并发需求选择合适的并发工具类,以实现高效且安全的并发控制。