博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程间的协作(2)——生产者与消费者模式
阅读量:6291 次
发布时间:2019-06-22

本文共 5856 字,大约阅读时间需要 19 分钟。

1.何为生产者与消费者

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。

import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;/** * @ClassName:Restraurant * @Description:何为生产者与消费者 * @author:  * @date:2018年5月3日 */public class Restraurant {	Meal m=null;	Chef chef=new Chef(this);	WaitPerson wait=new WaitPerson(this);	ExecutorService service=Executors.newCachedThreadPool();	public Restraurant() {		service.execute(chef);		service.execute(wait);	}	public static void main(String[] args) {		new Restraurant();	}}/** * @ClassName:Meal * @Description:生产者生成的数据 * @author:  * @date:2018年5月3日 */class Meal{	private final int orderNum;//食物订单编号	public Meal(int num){		orderNum=num;	}	public String toString(){		return "Meal"+orderNum;	}}/** * @ClassName:Chef * @Description:厨师类,及生产者 * @author:  * @date:2018年5月3日 */class Chef implements Runnable{	Restraurant r;	int count=0;	public Chef(Restraurant r) {		this.r=r;	}	@Override	public void run() {		try{			while(!Thread.interrupted()){				synchronized (this) {					while(r.m!=null){						System.out.println("厨师等待中");						wait();//等待服务员取餐					}				}				if(count++==10){					System.out.println("今日已售完");					r.service.shutdownNow();				}				System.out.println("订单完成,服务员取餐");				synchronized (r.wait) {					r.m=new Meal(count);					r.wait.notifyAll();									}				TimeUnit.SECONDS.sleep(1);			}		}catch (InterruptedException e) {			System.out.println("生产者线程强制中断");		}			}}/** * @ClassName:WaitPerson * @Description:服务员类,即消费者 * @author:  * @date:2018年5月3日 */class WaitPerson implements Runnable{	Restraurant r;	public WaitPerson(Restraurant r) {		this.r=r;	}	@Override	public void run() {		try {			while (!Thread.interrupted()) {				synchronized (this) {					while (r.m == null) {						System.out.println("服务员等待中");						wait();// 等待厨师生成食物					}				}				System.out.println("服务员以取餐" + r.m);				synchronized (r.chef) {					r.m = null;					r.chef.notifyAll();				}			}		} catch (InterruptedException e) {			System.out.println("消费者线程强制中断");		}			}	}

2.生产者与消费者模式

    1)产生原因:在多线程开发 中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理 完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须 等待生产者。wait与notify方法以一种非常低级的方式解决了任务互相通知的问题,即每次交互都要进行一次握手,极大影响的效率以及性能,为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

    2)原理:生产者和消费者模式是通过一个容器(比如同步阻塞队列)来解决生产者和消费者的强耦合问题。生产者和消 费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用 等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。java.util.concurrent.BlockingQueue接口提供了这个队列,通常使用其实现子类ArrayBlockingQueue,LinkedBlockingQueue。当消费者任务试图从同步队列中获取对象,如果队列为空时,那么队列则会挂起消费者任务,并且当拥有足够多的元素可用时才会恢复消费者任务。

import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class UseBlockingQueue {	public static void main(String[] args) throws InterruptedException {		LinkedBlockingQueue
dry=new LinkedBlockingQueue
(), butter=new LinkedBlockingQueue
(), jam=new LinkedBlockingQueue
(), con=new LinkedBlockingQueue
(); ExecutorService exec=Executors.newCachedThreadPool(); exec.execute(new MakeToast(dry));//制作初始吐司任务 exec.execute(new Butter(dry,butter));//吐司抹黄油任务 exec.execute(new Jam(butter,jam));//吐司抹果酱任务 exec.execute(new Consumer(jam));//消费者任务,食用吐司 TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); }}class Toast{ private int status;//吐司状态:0代表制作吐司,1代表抹黄油,2代表向抹了黄油的吐司抹果酱 private final int id; public Toast(int id1) { id=id1; } public void butter(){ status=1; }; public void jam(){ status=2; } public int getStatus(){ return status; } public int getId(){ return id; } public String toString(){ return "toast "+id+":"+status; }}/** * @Description:制作初始吐司 */class MakeToast implements Runnable{ private LinkedBlockingQueue
queue=new LinkedBlockingQueue
(); private int count=0; public MakeToast(LinkedBlockingQueue
q) { queue=q; } @Override public void run() { try{ while(!Thread.interrupted()){ Thread.sleep(1000);//制作时间 Toast t=new Toast(count); System.out.println(t); queue.put(t);//添加到同步队列 count++; } }catch (InterruptedException e) { System.out.println("make process interrupted"); } System.out.println("make process off"); }}/** * @Description:涂抹黄油 */class Butter implements Runnable{ private LinkedBlockingQueue
queue1,queue2;//未加料吐司队列,抹黄油后吐司队列 public Butter(LinkedBlockingQueue
q1,LinkedBlockingQueue
q2) { queue1=q1; queue2=q2; } @Override public void run() { try{ while(!Thread.interrupted()){ Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加 t.butter(); System.out.println(t); queue2.put(t); } }catch (InterruptedException e) { System.out.println("butter process interrupted"); } System.out.println("butter process off"); }}/** * @Description:涂抹果酱 */class Jam implements Runnable{ private LinkedBlockingQueue
queue1,queue2;//抹黄油后吐司队列,抹果酱吐司队列 public Jam(LinkedBlockingQueue
q1,LinkedBlockingQueue
q2) { queue1=q1; queue2=q2; } @Override public void run() { try{ while(!Thread.interrupted()){ Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加 t.jam(); System.out.println(t); queue2.put(t); } }catch (InterruptedException e) { System.out.println("jam process interrupted"); } System.out.println("jam process off"); }}/** * @Description:被食用 */class Consumer implements Runnable{ private LinkedBlockingQueue
finished;//抹黄油后吐司队列,抹果酱吐司队列 int count=0; public Consumer(LinkedBlockingQueue
q) { finished=q; } @Override public void run() { try{ while(!Thread.interrupted()){ Toast t=finished.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加 if(t.getId()!=count++||t.getStatus()!=2){ System.out.println("过程出现错误"); return; }else{ System.out.println("所有过程正确实现"+"toast "+t.getId()+"被食用"); } } }catch (InterruptedException e) { System.out.println("eat process interrupted"); } System.out.println("eat process off"); }}

 

转载地址:http://ngcta.baihongyu.com/

你可能感兴趣的文章
索引失效 ORA-01502
查看>>
Oracle取月份,不带前面的0
查看>>
Linux Network Device Name issue
查看>>
IP地址的划分实例解答
查看>>
如何查看Linux命令源码
查看>>
运维基础命令
查看>>
入门到进阶React
查看>>
SVN 命令笔记
查看>>
检验手机号码
查看>>
重叠(Overlapped)IO模型
查看>>
Git使用教程
查看>>
使用shell脚本自动监控后台进程,并能自动重启
查看>>
Flex&Bison手册
查看>>
solrCloud+tomcat+zookeeper集群配置
查看>>
/etc/fstab,/etc/mtab,和 /proc/mounts
查看>>
Apache kafka 简介
查看>>
socket通信Demo
查看>>
技术人员的焦虑
查看>>
js 判断整数
查看>>
mongodb $exists
查看>>