Guarded Suspension模式

喜欢ヅ旅行 2022-03-07 18:10 283阅读 0赞

Guarded Suspension意为保护暂停,其核心思想是仅当服务进程准备好时,才提供服务。设想一种场景,服务器可能会在很短时间内承受大量的客户端请求,客户端请求的数量可能超过服务器本身的即时处理能力,而服务端程序又不能丢弃任何一个客户请求。此时,最佳的处理方案莫过于让客户端请求进行排队,由服务端程序一个接一个处理。这样,既保证了所有的客户端请求均不丢失,同时也避免了服务器由于同时处理太多请求而崩溃。

1. Guarded Suspension模式的结构

  1. public class Request {
  2. private String name;
  3. // 模拟请求内容
  4. public Request(String name) {
  5. super();
  6. this.name = name;
  7. }
  8. public String getName() {
  9. return name;
  10. }
  11. @Override
  12. public String toString() {
  13. return "Request [name=" + name + "]";
  14. }
  15. }
  16. public class RequestQueue {
  17. private LinkedList<Request> queue = new LinkedList<>();
  18. public synchronized Request getRequest() {
  19. while(queue.size() == 0) {
  20. try {
  21. wait();// 等待直到有新的Request加入
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. return queue.remove();// 返回Request队列中的第一个请求
  27. }
  28. public synchronized void addRequest(Request request) {
  29. queue.add(request);// 加入新的Request请求
  30. notifyAll();// 通知getRequest()方法,唤醒正在等待对象监视器的所有线程。 线程通过调用wait方法之一等待对象的监视器。
  31. }
  32. }
  33. public class ServerThread extends Thread {
  34. private RequestQueue requestQueue;// 请求队列
  35. public ServerThread(RequestQueue requestQueue, String name) {
  36. super(name);
  37. this.requestQueue = requestQueue;
  38. }
  39. public void run() {
  40. while(true) {
  41. final Request request = requestQueue.getRequest();// 得到请求
  42. try {
  43. Thread.sleep(100);// 模拟请求处理耗时
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. System.out.println(Thread.currentThread().getName() + "handles" + request);
  48. }
  49. }
  50. }
  51. public class ClientThread extends Thread {
  52. private RequestQueue requestQueue;// 请求队列
  53. public ClientThread(RequestQueue requestQueue, String name) {
  54. super(name);
  55. this.requestQueue = requestQueue;
  56. }
  57. public void run() {
  58. for(int i=0; i<10; i++) {
  59. // 构造请求
  60. Request request = new Request("RequestID: " + i + " Thread_Name:"+Thread.currentThread().getName());
  61. System.out.println(Thread.currentThread().getName()+" requests " + request);
  62. requestQueue.addRequest(request);// 提交请求
  63. try {
  64. Thread.sleep(10);// 客户端请求的速度
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();//
  67. }
  68. System.out.println("ClientThread Name is " + Thread.currentThread().getName());
  69. }
  70. System.out.println(Thread.currentThread().getName() + " request end");
  71. }
  72. public static void main(String[] args) {
  73. RequestQueue requestQueue = new RequestQueue();
  74. for(int i=0; i<10; i++)
  75. new ServerThread(requestQueue, "ServerThread"+i).start();// 服务器进程开启
  76. for(int i=0; i<10; i++)
  77. new ClientThread(requestQueue, "ChientThread"+i).start();// 请求进程开启
  78. }
  79. }

在main函数中,开启了10个Client进程和10个Server处理进程。由于Client进程的请求数高于Server的处理速度,因此RequestQueue发挥了中间缓存的作用。 从结果可以看出,所有的ClientThread陆续运行结束,但是RequestQueue中仍有大量的请求,于是ServerThread便陆续工作,知道所有的Request请求均得到处理,客户端的请求没有丢失。

2.携带返回结果的Guarded Suspension

前面提到的Guarded Suspension模式虽然使用了用户请求列表,从而有序地对客户的请求进行处理。但是,客户进程的Request不能获得服务进程的返回结果。 当客户进程必须使用服务进程的返回值时,这个结构就无法胜任了。因为,客户进程不知道服务进程何时可以处理这个请求,也不知道需要处理多久。对此,需要对它进行加强。结合前文提到的Future模式,便很容易对Guarded Suspension模式进行扩展,构造一个可以携带返回值的Guarded Suspension。

  1. public interface Data {
  2. public String getResult();
  3. }
  4. public class RealData implements Data{
  5. protected final String result;
  6. public RealData(String para) {
  7. StringBuffer sb = new StringBuffer();
  8. for(int i=0; i<10; i++) {
  9. sb.append(para);
  10. try {
  11. // 这里使用sleep来代替逻辑处理
  12. Thread.sleep(100);
  13. } catch (InterruptedException e) {
  14. }
  15. }
  16. result = sb.toString();
  17. }
  18. public String getResult() {
  19. return result;
  20. }
  21. }
  22. public class FutureData implements Data{
  23. protected RealData realData = null;// FutureData是RealDta的包装
  24. protected boolean isReady = false;
  25. public synchronized void setRealData(RealData realData) {
  26. if (isReady) {
  27. return;
  28. }
  29. this.realData = realData;
  30. isReady = true;
  31. notifyAll();// realData已经被注入,通知getResult()
  32. }
  33. // 会等待RealData构造完成
  34. public synchronized String getResult() {
  35. while(!isReady) {
  36. try {
  37. wait();// 一直等待,直到RealData被注入
  38. } catch (InterruptedException e) {
  39. }
  40. }
  41. return realData.result;// 由RealData实现
  42. }
  43. }
  44. public class Request {
  45. private String name;
  46. private Data response;// 请求的返回值
  47. // 模拟请求内容
  48. public Request(String name) {
  49. super();
  50. this.name = name;
  51. }
  52. public String getName() {
  53. return name;
  54. }
  55. public synchronized Data getResponse() {
  56. return response;
  57. }
  58. public synchronized void setResponse(Data response) {
  59. this.response = response;
  60. }
  61. @Override
  62. public String toString() {
  63. return "Request [name=" + name + "]";
  64. }
  65. }
  66. public class RequestQueue {
  67. private LinkedList<Request> queue = new LinkedList<>();
  68. public synchronized Request getRequest() {
  69. while(queue.size() == 0) {
  70. try {
  71. wait();// 等待直到有新的Request加入
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. return queue.remove();// 返回Request队列中的第一个请求
  77. }
  78. public synchronized void addRequest(Request request) {
  79. queue.add(request);// 加入新的Request请求
  80. notifyAll();// 通知getRequest()方法
  81. }
  82. }
  83. public class ServerThread extends Thread {
  84. private RequestQueue requestQueue;// 请求队列
  85. public ServerThread(RequestQueue requestQueue, String name) {
  86. super(name);
  87. this.requestQueue = requestQueue;
  88. }
  89. public void run() {
  90. while(true) {
  91. final Request request = requestQueue.getRequest();// 得到请求
  92. final FutureData future = (FutureData) request.getResponse();
  93. // RealData的创建比较耗时
  94. RealData realData = new RealData(request.getName());
  95. future.setRealData(realData);
  96. System.out.println(Thread.currentThread().getName() + " handles " + request);
  97. }
  98. }
  99. }
  100. public class ClientThread extends Thread {
  101. private RequestQueue requestQueue;// 请求队列
  102. private List<Request> myRequest = new ArrayList<Request>();
  103. public ClientThread(RequestQueue requestQueue, String name) {
  104. super(name);
  105. this.requestQueue = requestQueue;
  106. }
  107. public void run() {
  108. for(int i=0; i<10; i++) {
  109. // 构造请求
  110. Request request = new Request("RequestID: " + i + " Thread_Name:"+Thread.currentThread().getName());
  111. System.out.println(Thread.currentThread().getName()+" request is " + request);
  112. request.setResponse(new FutureData());
  113. requestQueue.addRequest(request);// 提交请求
  114. myRequest.add(request);
  115. // 这里可以做一些额外的业务处理,等待服务端装配数据
  116. try {
  117. Thread.sleep(1000);// 客户端请求的速度
  118. } catch (InterruptedException e) {
  119. e.printStackTrace();//
  120. }
  121. for(Request r : myRequest) {
  122. System.out.println("ClientThread Name is " + Thread.currentThread().getName() +" Response is" +
  123. r.getResponse().getResult());
  124. }
  125. }
  126. System.out.println(Thread.currentThread().getName() + " request end");
  127. }
  128. public static void main(String[] args) {
  129. RequestQueue requestQueue = new RequestQueue();
  130. for(int i=0; i<1; i++)
  131. new ServerThread(requestQueue, "ServerThread"+i).start();// 服务器进程开启
  132. // try {
  133. // Thread.sleep(10000);
  134. // } catch (InterruptedException e) {
  135. // e.printStackTrace();
  136. // }
  137. System.out.println("客户端发起请求...");
  138. List<Request> requests = new ArrayList<>();
  139. for(int i=0; i<1; i++){
  140. new ClientThread(requestQueue, "ChientThread"+i).start();;// 请求进程开启
  141. }
  142. }

转载:https://blog.csdn.net/bas_ball/article/details/82625550

发表评论

表情:
评论列表 (有 0 条评论,283人围观)

还没有评论,来说两句吧...

相关阅读