thinking in java 05
线程之间的协作:
只能在同步方法或者同步控制块中调用wait() notify() notifyAll() 否则运行时抛出异常
线程调用sleep() yield()时 线程上的锁没有释放,调用wait()时 线程被挂起 锁释放
使用notify()而不是notifyAll()是一种优化
使用notify()的限制 :1 .所有任务必须等待相同的条件
2. 当条件变化时,必须只有一个任务从中受益
notifyAll()因某个特定锁调用时,只能等待这个锁的任务才会被唤醒
而这个锁与底层锁有关。
具体代码如下:
package Number_2103;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* notifyAll() 只能唤醒特定锁 wait()的线程
* 该特定锁与底层对象有关 ---->
*
* @author he
*
*/
class Blocker {
synchronized void waitingforCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.println(Thread.currentThread().hashCode() + "----");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
synchronized void prod() {
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
static Blocker b1;
public Task(Blocker b) {
b1 = b;
}
public synchronized void run() {
// TODO Auto-generated method stub
b1.waitingforCall();
}
}
class Task2 implements Runnable {
static Blocker b2;
public Task2(Blocker b) {
b2 = b;
}
public synchronized void run() {
// TODO Auto-generated method stub
b2.waitingforCall();
}
}
public class P707 {
public static void main(String[] args) throws Exception {
ExecutorService eService = Executors.newCachedThreadPool();
Blocker b = new Blocker();
Blocker b2=new Blocker();
eService.execute(new Task(b));
eService.execute(new Task2(b2));
Timer t = new Timer();
t.schedule(new TimerTask() {
boolean pro = true;
@Override
public void run() {
if (pro) {
Task.b1.prod();
pro = false;
} else {
Task.b1.prodAll();
pro = true;
}
}
}, 400, 400);
TimeUnit.SECONDS.sleep(5);
;
t.cancel();
System.out.println(" timer cancled");
TimeUnit.MICROSECONDS.sleep(500);
Task2.b2.prodAll();
TimeUnit.MICROSECONDS.sleep(5000);
System.out.println("Shutting down");
eService.shutdown();
}
}
代码分析:public void run() {
if (pro) {
Task.b1.prod();
pro = false;
} else {
Task.b1.prodAll();
pro = true;
}
}
这段代码中交替调用Task中Blocker对象b1的prod()方法和prodall()方法
但是
Blocker b = new Blocker();
Blocker b2=new Blocker();
eService.execute(new Task(b));
eService.execute(new Task2(b2));
由于Task()中穿的Blocker对象不同,所以此时b1.prodall()并不能唤醒task2对象
代码结果如下:
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
2059770520——
timer cancled
1824109660——
Shutting down
-———————————————————————————————————
如果将代码
Blocker b = new Blocker();
Blocker b2=new Blocker();
eService.execute(new Task(b));
eService.execute(new Task2(b2));
改为:
eService.execute(new Task(b));
eService.execute(new Task2(b));
代码结果如下:
2059770520——
2059770520——
1824109660——
2059770520——
2059770520——
1824109660——
2059770520——
2059770520——
1824109660——
2059770520——
2059770520——
1824109660——
2059770520——
2059770520——
1824109660——
2059770520——
2059770520——
1824109660——
timer cancled
1824109660——
2059770520——
Shutting down
-——————————————————————-
由于 task 和task2 中传的是相同的Blocker对象具有相同的底层锁
此时 此时b1.prodall()能唤醒task2对象
task2.b2.prodall()也能唤醒task对象
使用同步队列进行同步控制 LinkedBlockingQueue 无界 ArrayBlockingQueue 有固定大小
package Number_2103;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import Number_2103.Toast.Status;
/**
* 使用同步队列进行同步控制 LinkedBlockingQueue 无界 ArrayBlockingQueue 有固定大小
*
* @author he
*
*/
class Toast {
public enum Status {
DRY, BUTTERED, JAMMED
}
private static Status status = Status.DRY;
private final int id;
public Toast(int id) {
this.id = id;
}
public void butter() {
status = Status.BUTTERED;
}
public void jam() {
status = Status.JAMMED;
}
public int getId() {
return id;
}
public static Status getStatus() {
return status;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "Toast" + id + ":" + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> {
}
class Toaster implements Runnable {
ToastQueue toastDry;
private int count = 0;
public Toaster(ToastQueue t) {
toastDry = t;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = new Toast(count++);
System.out.println(t);
toastDry.put(t);
}
} catch (Exception e) {
// TODO: handle exception
}
System.out.println("Toaster off");
}
}
class ToastButter implements Runnable {
ToastQueue dryToast, butterToast;
public ToastButter(ToastQueue dry, ToastQueue butter) {
dryToast = dry;
butterToast = butter;
}
public void run() {
try {
while (!Thread.interrupted()) {
// 获取并移除头部,在元素变得可用之前一直等待
Toast t = dryToast.take();
t.butter();
System.out.println(t);
butterToast.put(t);
}
} catch (Exception e) {
// TODO: handle exception
}
System.out.println("Butter off");
}
}
class ToastJam implements Runnable {
ToastQueue butt, jam;
public ToastJam(ToastQueue buf, ToastQueue j) {
butt = buf;
jam = j;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = butt.take();
t.jam();
System.out.println(t);
jam.put(t);
}
} catch (Exception e) {
// TODO: handle exception
}
System.out.println("Jam off");
}
}
class Eater implements Runnable {
private ToastQueue fin;
private int count = 0;
public Eater(ToastQueue t) {
fin = t;
}
public void run() {
try {
while (!Thread.interrupted()) {
Toast t = fin.take();
if (t.getId() != count++ || t.getStatus() != Status.JAMMED) {
System.out.println("Error" + t);
System.exit(0);
}
else{
System.out.println("Chomp "+t);
}
}
} catch (Exception e) {
// TODO: handle exception
}
}
}
public class P716 {
public static void main(String[] args) throws Exception {
ExecutorService eService = Executors.newCachedThreadPool();
ToastQueue dryQueue = new ToastQueue();
ToastQueue butterQueue = new ToastQueue();
ToastQueue jamQueue = new ToastQueue();
eService.execute(new Toaster(dryQueue));
eService.execute(new ToastButter(dryQueue, butterQueue));
eService.execute(new ToastJam(butterQueue, jamQueue));
eService.execute(new Eater(jamQueue));
TimeUnit.MILLISECONDS.sleep(1);
eService.shutdownNow();
}
}
生产者与消费者:
package Number_2103;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* P709 生产者与消费者
*
* @author he
*
*/
class Meal {
private final int count;
public Meal(int count) {
this.count = count;
}
@Override
public String toString() {
return "Meal " + count;
}
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null) {
wait();
}
}
System.out.println("WaitPerson got:" + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println("WaitPerson interruped");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) {
restaurant = r;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null) {
wait();
}
}
if (++count == 10) {
System.out.println("停止出售");
restaurant.eService.shutdownNow();
}
TimeUnit.MICROSECONDS.sleep(100);
System.out.print("Order up");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
// 唤醒服务员线程
restaurant.waitPerson.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println("退出线程");
}
}
}
public class Restaurant {
Meal meal;
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
ExecutorService eService = Executors.newCachedThreadPool();
public Restaurant() {
eService.execute(chef);
eService.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
还没有评论,来说两句吧...