Guarded Suspension模式
Guarded Suspension意为保护暂停,其核心思想是仅当服务进程准备好时,才提供服务。设想一种场景,服务器可能会在很短时间内承受大量的客户端请求,客户端请求的数量可能超过服务器本身的即时处理能力,而服务端程序又不能丢弃任何一个客户请求。此时,最佳的处理方案莫过于让客户端请求进行排队,由服务端程序一个接一个处理。这样,既保证了所有的客户端请求均不丢失,同时也避免了服务器由于同时处理太多请求而崩溃。
1. Guarded Suspension模式的结构
public class Request {
private String name;
// 模拟请求内容
public Request(String name) {
super();
this.name = name;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "Request [name=" + name + "]";
}
}
public class RequestQueue {
private LinkedList<Request> queue = new LinkedList<>();
public synchronized Request getRequest() {
while(queue.size() == 0) {
try {
wait();// 等待直到有新的Request加入
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return queue.remove();// 返回Request队列中的第一个请求
}
public synchronized void addRequest(Request request) {
queue.add(request);// 加入新的Request请求
notifyAll();// 通知getRequest()方法,唤醒正在等待对象监视器的所有线程。 线程通过调用wait方法之一等待对象的监视器。
}
}
public class ServerThread extends Thread {
private RequestQueue requestQueue;// 请求队列
public ServerThread(RequestQueue requestQueue, String name) {
super(name);
this.requestQueue = requestQueue;
}
public void run() {
while(true) {
final Request request = requestQueue.getRequest();// 得到请求
try {
Thread.sleep(100);// 模拟请求处理耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "handles" + request);
}
}
}
public class ClientThread extends Thread {
private RequestQueue requestQueue;// 请求队列
public ClientThread(RequestQueue requestQueue, String name) {
super(name);
this.requestQueue = requestQueue;
}
public void run() {
for(int i=0; i<10; i++) {
// 构造请求
Request request = new Request("RequestID: " + i + " Thread_Name:"+Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName()+" requests " + request);
requestQueue.addRequest(request);// 提交请求
try {
Thread.sleep(10);// 客户端请求的速度
} catch (InterruptedException e) {
e.printStackTrace();//
}
System.out.println("ClientThread Name is " + Thread.currentThread().getName());
}
System.out.println(Thread.currentThread().getName() + " request end");
}
public static void main(String[] args) {
RequestQueue requestQueue = new RequestQueue();
for(int i=0; i<10; i++)
new ServerThread(requestQueue, "ServerThread"+i).start();// 服务器进程开启
for(int i=0; i<10; i++)
new ClientThread(requestQueue, "ChientThread"+i).start();// 请求进程开启
}
}
在main函数中,开启了10个Client进程和10个Server处理进程。由于Client进程的请求数高于Server的处理速度,因此RequestQueue发挥了中间缓存的作用。 从结果可以看出,所有的ClientThread陆续运行结束,但是RequestQueue中仍有大量的请求,于是ServerThread便陆续工作,知道所有的Request请求均得到处理,客户端的请求没有丢失。
2.携带返回结果的Guarded Suspension
前面提到的Guarded Suspension模式虽然使用了用户请求列表,从而有序地对客户的请求进行处理。但是,客户进程的Request不能获得服务进程的返回结果。 当客户进程必须使用服务进程的返回值时,这个结构就无法胜任了。因为,客户进程不知道服务进程何时可以处理这个请求,也不知道需要处理多久。对此,需要对它进行加强。结合前文提到的Future模式,便很容易对Guarded Suspension模式进行扩展,构造一个可以携带返回值的Guarded Suspension。
public interface Data {
public String getResult();
}
public class RealData implements Data{
protected final String result;
public RealData(String para) {
StringBuffer sb = new StringBuffer();
for(int i=0; i<10; i++) {
sb.append(para);
try {
// 这里使用sleep来代替逻辑处理
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
result = sb.toString();
}
public String getResult() {
return result;
}
}
public class FutureData implements Data{
protected RealData realData = null;// FutureData是RealDta的包装
protected boolean isReady = false;
public synchronized void setRealData(RealData realData) {
if (isReady) {
return;
}
this.realData = realData;
isReady = true;
notifyAll();// realData已经被注入,通知getResult()
}
// 会等待RealData构造完成
public synchronized String getResult() {
while(!isReady) {
try {
wait();// 一直等待,直到RealData被注入
} catch (InterruptedException e) {
}
}
return realData.result;// 由RealData实现
}
}
public class Request {
private String name;
private Data response;// 请求的返回值
// 模拟请求内容
public Request(String name) {
super();
this.name = name;
}
public String getName() {
return name;
}
public synchronized Data getResponse() {
return response;
}
public synchronized void setResponse(Data response) {
this.response = response;
}
@Override
public String toString() {
return "Request [name=" + name + "]";
}
}
public class RequestQueue {
private LinkedList<Request> queue = new LinkedList<>();
public synchronized Request getRequest() {
while(queue.size() == 0) {
try {
wait();// 等待直到有新的Request加入
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return queue.remove();// 返回Request队列中的第一个请求
}
public synchronized void addRequest(Request request) {
queue.add(request);// 加入新的Request请求
notifyAll();// 通知getRequest()方法
}
}
public class ServerThread extends Thread {
private RequestQueue requestQueue;// 请求队列
public ServerThread(RequestQueue requestQueue, String name) {
super(name);
this.requestQueue = requestQueue;
}
public void run() {
while(true) {
final Request request = requestQueue.getRequest();// 得到请求
final FutureData future = (FutureData) request.getResponse();
// RealData的创建比较耗时
RealData realData = new RealData(request.getName());
future.setRealData(realData);
System.out.println(Thread.currentThread().getName() + " handles " + request);
}
}
}
public class ClientThread extends Thread {
private RequestQueue requestQueue;// 请求队列
private List<Request> myRequest = new ArrayList<Request>();
public ClientThread(RequestQueue requestQueue, String name) {
super(name);
this.requestQueue = requestQueue;
}
public void run() {
for(int i=0; i<10; i++) {
// 构造请求
Request request = new Request("RequestID: " + i + " Thread_Name:"+Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName()+" request is " + request);
request.setResponse(new FutureData());
requestQueue.addRequest(request);// 提交请求
myRequest.add(request);
// 这里可以做一些额外的业务处理,等待服务端装配数据
try {
Thread.sleep(1000);// 客户端请求的速度
} catch (InterruptedException e) {
e.printStackTrace();//
}
for(Request r : myRequest) {
System.out.println("ClientThread Name is " + Thread.currentThread().getName() +" Response is" +
r.getResponse().getResult());
}
}
System.out.println(Thread.currentThread().getName() + " request end");
}
public static void main(String[] args) {
RequestQueue requestQueue = new RequestQueue();
for(int i=0; i<1; i++)
new ServerThread(requestQueue, "ServerThread"+i).start();// 服务器进程开启
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("客户端发起请求...");
List<Request> requests = new ArrayList<>();
for(int i=0; i<1; i++){
new ClientThread(requestQueue, "ChientThread"+i).start();;// 请求进程开启
}
}
转载:https://blog.csdn.net/bas_ball/article/details/82625550
还没有评论,来说两句吧...