消费者
package consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Consumer implements Runnable {
/*
* 用util.concurrent.BlockingQueue沟通生产者和消费者的桥梁
*/
BlockingQueue<String> queue;
String id;
@SuppressWarnings("unused")
private volatile boolean isRunning = true;
public Consumer(BlockingQueue<String> queue, String id) {
this.queue = queue;
this.id = id;
}
public void stop() {
isRunning = false;
}
@Override
public void run() {
System.out.println("Thread: " + id + " Consumer thread is running...");
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("Thread: " + id + " fetch data from linkedQueue..." + " queue size: " + queue.size());
/*
* 从队列里取出一个元素,2秒超时,如果两秒之后还没有东西可以取,则poll返回null
*/
String data = queue.poll(2, TimeUnit.SECONDS);
if (null != data) {
System.out.println("Thread: " + id + " has consumed one data from queue: " + data
+ " Queue sise: " + queue.size());
// simulate data consumption
Thread.sleep(1000);
} else {
isRunning = false;
// 消费者准备退出
System.out.println("Thread: " + id + " Consumer read queue timeout");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("Thread: " + id + " consumer thread ends");
}
}
}
生产者
package consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable {
BlockingQueue<String> queue;
String id;
public Producer(BlockingQueue<String> queue, String id) {
this.queue = queue;
this.id = id;
}
@Override
public void run() {
String data = null;
try {
while (isRunning) {
System.out.println("PRODUCER: " + id + " is running");
Thread.sleep(100);
data = "data:" + count.incrementAndGet();
System.out.println("Thread: " + id + " procedued data into queue: " + data + " ...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("failed to put data into queue: " + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("Thread: " + id + " quit from producer thread");
}
}
public void stop() {
isRunning = false;
}
private volatile boolean isRunning = true;
private static AtomicInteger count = new AtomicInteger();
}
测试代码
package consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class ConsumerProducerTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(15);
Producer producer1 = new Producer(queue, "PROD1");
Producer producer2 = new Producer(queue, "PROD2");
Producer producer3 = new Producer(queue, "PROD3");
Consumer consumer1 = new Consumer(queue, "CONSUMER1");
Consumer consumer2 = new Consumer(queue, "CONSUMER2");
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1);
service.execute(consumer2);
Thread.sleep(3 * 1000);
producer1.stop(); // 一定要先关闭生产者
producer2.stop();
producer3.stop();
consumer1.stop();
consumer2.stop();
Thread.sleep(2000);
service.shutdown();
}
}