本文共 5856 字,大约阅读时间需要 19 分钟。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;/** * @ClassName:Restraurant * @Description:何为生产者与消费者 * @author: * @date:2018年5月3日 */public class Restraurant { Meal m=null; Chef chef=new Chef(this); WaitPerson wait=new WaitPerson(this); ExecutorService service=Executors.newCachedThreadPool(); public Restraurant() { service.execute(chef); service.execute(wait); } public static void main(String[] args) { new Restraurant(); }}/** * @ClassName:Meal * @Description:生产者生成的数据 * @author: * @date:2018年5月3日 */class Meal{ private final int orderNum;//食物订单编号 public Meal(int num){ orderNum=num; } public String toString(){ return "Meal"+orderNum; }}/** * @ClassName:Chef * @Description:厨师类,及生产者 * @author: * @date:2018年5月3日 */class Chef implements Runnable{ Restraurant r; int count=0; public Chef(Restraurant r) { this.r=r; } @Override public void run() { try{ while(!Thread.interrupted()){ synchronized (this) { while(r.m!=null){ System.out.println("厨师等待中"); wait();//等待服务员取餐 } } if(count++==10){ System.out.println("今日已售完"); r.service.shutdownNow(); } System.out.println("订单完成,服务员取餐"); synchronized (r.wait) { r.m=new Meal(count); r.wait.notifyAll(); } TimeUnit.SECONDS.sleep(1); } }catch (InterruptedException e) { System.out.println("生产者线程强制中断"); } }}/** * @ClassName:WaitPerson * @Description:服务员类,即消费者 * @author: * @date:2018年5月3日 */class WaitPerson implements Runnable{ Restraurant r; public WaitPerson(Restraurant r) { this.r=r; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (this) { while (r.m == null) { System.out.println("服务员等待中"); wait();// 等待厨师生成食物 } } System.out.println("服务员以取餐" + r.m); synchronized (r.chef) { r.m = null; r.chef.notifyAll(); } } } catch (InterruptedException e) { System.out.println("消费者线程强制中断"); } } }
1)产生原因:在多线程开发 中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理 完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须 等待生产者。wait与notify方法以一种非常低级的方式解决了任务互相通知的问题,即每次交互都要进行一次握手,极大影响的效率以及性能,为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。
2)原理:生产者和消费者模式是通过一个容器(比如同步阻塞队列)来解决生产者和消费者的强耦合问题。生产者和消 费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用 等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。java.util.concurrent.BlockingQueue接口提供了这个队列,通常使用其实现子类ArrayBlockingQueue,LinkedBlockingQueue。当消费者任务试图从同步队列中获取对象,如果队列为空时,那么队列则会挂起消费者任务,并且当拥有足够多的元素可用时才会恢复消费者任务。
import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class UseBlockingQueue { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueuedry=new LinkedBlockingQueue (), butter=new LinkedBlockingQueue (), jam=new LinkedBlockingQueue (), con=new LinkedBlockingQueue (); ExecutorService exec=Executors.newCachedThreadPool(); exec.execute(new MakeToast(dry));//制作初始吐司任务 exec.execute(new Butter(dry,butter));//吐司抹黄油任务 exec.execute(new Jam(butter,jam));//吐司抹果酱任务 exec.execute(new Consumer(jam));//消费者任务,食用吐司 TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); }}class Toast{ private int status;//吐司状态:0代表制作吐司,1代表抹黄油,2代表向抹了黄油的吐司抹果酱 private final int id; public Toast(int id1) { id=id1; } public void butter(){ status=1; }; public void jam(){ status=2; } public int getStatus(){ return status; } public int getId(){ return id; } public String toString(){ return "toast "+id+":"+status; }}/** * @Description:制作初始吐司 */class MakeToast implements Runnable{ private LinkedBlockingQueue queue=new LinkedBlockingQueue (); private int count=0; public MakeToast(LinkedBlockingQueue q) { queue=q; } @Override public void run() { try{ while(!Thread.interrupted()){ Thread.sleep(1000);//制作时间 Toast t=new Toast(count); System.out.println(t); queue.put(t);//添加到同步队列 count++; } }catch (InterruptedException e) { System.out.println("make process interrupted"); } System.out.println("make process off"); }}/** * @Description:涂抹黄油 */class Butter implements Runnable{ private LinkedBlockingQueue queue1,queue2;//未加料吐司队列,抹黄油后吐司队列 public Butter(LinkedBlockingQueue q1,LinkedBlockingQueue q2) { queue1=q1; queue2=q2; } @Override public void run() { try{ while(!Thread.interrupted()){ Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加 t.butter(); System.out.println(t); queue2.put(t); } }catch (InterruptedException e) { System.out.println("butter process interrupted"); } System.out.println("butter process off"); }}/** * @Description:涂抹果酱 */class Jam implements Runnable{ private LinkedBlockingQueue queue1,queue2;//抹黄油后吐司队列,抹果酱吐司队列 public Jam(LinkedBlockingQueue q1,LinkedBlockingQueue q2) { queue1=q1; queue2=q2; } @Override public void run() { try{ while(!Thread.interrupted()){ Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加 t.jam(); System.out.println(t); queue2.put(t); } }catch (InterruptedException e) { System.out.println("jam process interrupted"); } System.out.println("jam process off"); }}/** * @Description:被食用 */class Consumer implements Runnable{ private LinkedBlockingQueue finished;//抹黄油后吐司队列,抹果酱吐司队列 int count=0; public Consumer(LinkedBlockingQueue q) { finished=q; } @Override public void run() { try{ while(!Thread.interrupted()){ Toast t=finished.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加 if(t.getId()!=count++||t.getStatus()!=2){ System.out.println("过程出现错误"); return; }else{ System.out.println("所有过程正确实现"+"toast "+t.getId()+"被食用"); } } }catch (InterruptedException e) { System.out.println("eat process interrupted"); } System.out.println("eat process off"); }}
转载地址:http://ngcta.baihongyu.com/