问题描述
实现经典同步问题:生产者—消费者,具体要求如下:
① 一个大小为 n的缓冲区,初始状态为空。
② 生产者: 往缓冲区中添加数据,若缓冲区已满,等待消费者取走数据之后再添加
③ 消费者: 从缓冲区中读取数据,若缓冲区为空,等待生产者添加数据之后再读取
下面为java代码实现
设定
- n赋值为10
static final int n=10;
- 往缓冲区中添加的数据为buffer[in]的下标in
buffer[in]= ""+in;
简单实现
package os.prioducer_consumer;
import java.util.concurrent.Semaphore;
public class ProcessTest1 {
static final int n=10;
static int in=0,out=0;
static String[] buffer=new String[n];
static Semaphore mutex=new Semaphore(1,false);
static Semaphore empty=new Semaphore(n,false);
static Semaphore full=new Semaphore(0,false);
public static void main(String[] args) {
Thread producer=new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
empty.acquire();
mutex.acquire();
System.out.println(Thread.currentThread().getName()+"放入"+in);
buffer[in]= ""+in;
in=(in+1)%n;
Thread.sleep(1000);//休眠表示放入的过程
mutex.release();
full.release();
}catch (InterruptedException e) {
System.out.println("生产者获取资源失败!");
e.printStackTrace();
}
}
}
});
Thread consumer=new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
full.acquire();
mutex.acquire();
String o=buffer[out];
System.out.println(Thread.currentThread().getName()+"取出"+o);
Thread.sleep(1000);//休眠表示放入的过程
out=(out+1)%n;
mutex.release();
empty.release();
}catch (InterruptedException e) {
System.out.println("消费者获取资源失败!");
e.printStackTrace();
}
}
}
});
producer.setName("producer");
consumer.setName("consumer");
producer.start();
consumer.start();
}
}
运行结果
producer放入0
producer放入1
producer放入2
producer放入3
producer放入4
producer放入5
consumer取出0
consumer取出1
consumer取出2
consumer取出3
consumer取出4
consumer取出5
producer放入6
producer放入7
producer放入8
consumer取出6
consumer取出7
consumer取出8
producer放入9
producer放入0
consumer取出9
producer放入1
producer放入2
producer放入3
producer放入4
producer放入5
producer放入6
producer放入7
consumer取出0
consumer取出1
producer放入8
如果需要增加生产者和消费者的数量,只能复制一份,增加变量,比较麻烦
所以,下面封装为类。
Thread producer1=new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
empty.acquire();
mutex.acquire();
System.out.println(Thread.currentThread().getName()+"放入"+in);
buffer[in]= ""+in;
in=(in+1)%n;
Thread.sleep(1000);//休眠表示放入的过程
mutex.release();
full.release();
}catch (InterruptedException e) {
System.out.println("生产者获取资源失败!");
e.printStackTrace();
}
}
}
});
Thread consumer1=new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
full.acquire();
mutex.acquire();
String o=buffer[out];
System.out.println(Thread.currentThread().getName()+"取出"+o);
Thread.sleep(1000);//休眠表示放入的过程
out=(out+1)%n;
mutex.release();
empty.release();
}catch (InterruptedException e) {
System.out.println("消费者获取资源失败!");
e.printStackTrace();
}
}
}
});
producer1.setName("producer1");
consumer1.setName("consumer1");
producer1.start();
consumer1.start();
运行结果
producer放入0
producer放入1
producer放入2
producer放入3
producer放入4
producer放入5
producer放入6
producer放入7
producer放入8
producer1放入9
consumer取出0
consumer取出1
consumer取出2
consumer取出3
consumer取出4
consumer取出5
consumer取出6
consumer1取出7
producer放入0
producer放入1
producer放入2
producer1放入3
consumer取出8
consumer1取出9
consumer1取出0
consumer1取出1
封装为类和对象
package os.prioducer_consumer;
import java.util.concurrent.Semaphore;
class MyProcess {
static final int n=5;
static int in = 0, out = 0;
static String[] buffer = new String[n];
static Semaphore mutex = new Semaphore(1, false);
static Semaphore empty = new Semaphore(n, false);
static Semaphore full = new Semaphore(0, false);
public void producerFunc() {
try {
empty.acquire();
mutex.acquire();
System.out.println(Thread.currentThread().getName()+"放入" + in);
buffer[in] = "" + in;
in = (in + 1) % n;
Thread.sleep(1000);//休眠表示放入的过程
mutex.release();
full.release();
} catch (InterruptedException e) {
System.out.println("生产者获取资源失败!");
e.printStackTrace();
}
}
public void consumerFunc() {
try {
full.acquire();
mutex.acquire();
String o = buffer[out];
System.out.println(Thread.currentThread().getName()+"取出" + o);
Thread.sleep(1000);//休眠表示放入的过程
out = (out + 1) % n;
mutex.release();
empty.release();
} catch (InterruptedException e) {
System.out.println("消费者获取资源失败!");
e.printStackTrace();
}
}
}
class Producer extends Thread {
private MyProcess myProcess;
public Producer(MyProcess myProcess) {
this.myProcess = myProcess;
}
@Override
public void run() {
while (true){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
myProcess.producerFunc();
}
}
}
class Consumer extends Thread{
private MyProcess myProcess;
public Consumer(MyProcess myProcess) {
this.myProcess = myProcess;
}
@Override
public void run() {
while (true){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
myProcess.consumerFunc();
}
}
}
public class ProcessTest2 {
public static void main(String[] args) {
MyProcess myProcess=new MyProcess();
Producer producer = new Producer(myProcess);
Producer producer1 = new Producer(myProcess);
Consumer consumer = new Consumer(myProcess);
Consumer consumer1 = new Consumer(myProcess);
producer.setName("producer");
producer1.setName("producer1");
consumer.setName("consumer");
consumer1.setName("consumer1");
producer.start();
producer1.start();
consumer.start();
consumer1.start();
}
}
运行结果
producer放入0
producer1放入1
consumer1取出0
producer放入2
consumer取出1
producer1放入3
consumer取出2
consumer1取出3
producer放入4
producer1放入5
consumer取出4
producer放入6
consumer1取出5
producer1放入7
consumer取出6
producer放入8
consumer1取出7
producer1放入9
consumer取出8
producer放入0
consumer1取出9
producer1放入1
consumer取出0
producer放入2
consumer1取出1
producer1放入3
consumer取出2
如果需要增加生产者和消费者的数量,只能需新建变量即可
Producer producer1 = new Producer(myProcess);
Consumer consumer1 = new Consumer(myProcess);
producer1.setName("producer1");
consumer1.setName("consumer1");
producer1.start();
consumer1.start();