Callable+ThreadPoolExecutor实现多线程并发并获得返回值

缺乏、安全感 2022-05-22 08:10 352阅读 0赞

前言
经常会遇到一些性能问题,比如调用某个接口,可能要循环调用100次,并且需要拿到每一次调用的返回结果,通常我们都是放在for循环中一次次的串行调用,这种方式可想而知道有多慢,那怎么解决这个问题呢?

多线程
为了解决以上问题,我使用的方式是多线程。多线程常规的有两种实现方式,即继承Tread类,实现Runnable接口,但是这两种实现方式,有一个共同的问题,就是没有返回值,对于我们来说,获得每个线程的返回值,是个很困难的问题,因此不能用Tread类或Runnable接口,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允许有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到线程的执行结果

案例
假设需要给100个用户发送邮件,并需要每个用户的返回结果,先看下代码结构
这里写图片描述

CallableTemplate.java

  1. package com.mairuan.base.concurrent;
  2. import java.util.concurrent.Callable;
  3. /** * 多线程模板类 * * @author Administrator * * @param <V> */
  4. public abstract class CallableTemplate<V> implements Callable<V> {
  5. /** * 前置处理,子类可以Override该方法 */
  6. public void beforeProcess() {
  7. System.out.println("before process....");
  8. }
  9. /** * 处理业务逻辑的方法,需要子类去Override * @return */
  10. public abstract V process();
  11. /** * 后置处理,子类可以Override该方法 */
  12. public void afterProcess() {
  13. System.out.println("after process....");
  14. }
  15. @Override
  16. public V call() throws Exception {
  17. beforeProcess();
  18. V result = process();
  19. afterProcess();
  20. return result;
  21. }
  22. }

CallableTemplate类实现了Callable接口,并实现了process方法,该类是一个抽象类,接收任意返回值的类型,beforeProcess方法为前置处理,afterProcess的后置处理,process为具体的业务逻辑抽象方法,该方法在子类中实现

IConcurrentThreadPool.java

  1. package com.mairuan.base.concurrent;
  2. import java.util.List;
  3. import java.util.concurrent.ExecutionException;
  4. public interface IConcurrentThreadPool {
  5. /** * 初始化线程池 */
  6. void initConcurrentThreadPool();
  7. /** * 提交单个任务 * * @param task * @return */
  8. <V> V submit(CallableTemplate<V> task) throws InterruptedException,
  9. ExecutionException;
  10. /** * 提交多个任务 * * @param tasks * @return */
  11. <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
  12. throws InterruptedException, ExecutionException;
  13. }

IConcurrentThreadPool是多线程接口类,声名了三个方法,initConcurrentThreadPool:初始化线程池,submit:提交单个任务的线程,并有返回值,invokeAll:提交多个任务的线程,并有返回值

ConcurrentThreadPool.java

  1. package com.mairuan.base.concurrent.impl;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.concurrent.ExecutionException;
  5. import java.util.concurrent.Future;
  6. import java.util.concurrent.LinkedBlockingDeque;
  7. import java.util.concurrent.ThreadPoolExecutor;
  8. import java.util.concurrent.TimeUnit;
  9. import com.mairuan.base.concurrent.CallableTemplate;
  10. import com.mairuan.base.concurrent.IConcurrentThreadPool;
  11. public class ConcurrentThreadPool implements IConcurrentThreadPool {
  12. private ThreadPoolExecutor threadPoolExecutor;
  13. // 核心线程数
  14. private int corePoolSize = 10;
  15. // 最大线程数
  16. private int maximumPoolSize = 20;
  17. // 超时时间30秒
  18. private long keepAliveTime = 30;
  19. @Override
  20. public void initConcurrentThreadPool() {
  21. threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
  22. maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
  23. new LinkedBlockingDeque<Runnable>());
  24. }
  25. @Override
  26. public <V> V submit(CallableTemplate<V> task) throws InterruptedException,
  27. ExecutionException {
  28. Future<V> result = threadPoolExecutor.submit(task);
  29. return result.get();
  30. }
  31. @Override
  32. public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
  33. throws InterruptedException, ExecutionException {
  34. List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
  35. List<V> resultList = new ArrayList<V>();
  36. for (Future<V> future : tasksResult) {
  37. resultList.add(future.get());
  38. }
  39. return resultList;
  40. }
  41. }

ConcurrentThreadPool是创建线程池的实现类,用到了ThreadPoolExecutor线程池类及这个类的invokeAll方法和submit方法,这两个方法的返回值,都可以通过Future类的get方法获得

ICallableTaskFrameWork.java

  1. public interface ICallableTaskFrameWork {
  2. <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
  3. throws InterruptedException, ExecutionException;
  4. }

ICallableTaskFrameWork是定义的线程任务框架接口,所有的多线程调用,都通过该接口发起

CallableTaskFrameWork.java

  1. package com.mairuan.base.concurrent.impl;
  2. import java.util.List;
  3. import java.util.concurrent.ExecutionException;
  4. import com.mairuan.base.concurrent.CallableTemplate;
  5. import com.mairuan.base.concurrent.ICallableTaskFrameWork;
  6. import com.mairuan.base.concurrent.IConcurrentThreadPool;
  7. public class CallableTaskFrameWork implements ICallableTaskFrameWork {
  8. private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();
  9. @Override
  10. public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
  11. throws InterruptedException, ExecutionException {
  12. concurrentThreadPool.initConcurrentThreadPool();
  13. return concurrentThreadPool.invokeAll(tasks);
  14. }
  15. }

CallableTaskFrameWork是ICallableTaskFrameWork 的实现类,在submitsAll实现方法中,通过调用线程池对象IConcurrentThreadPool接口的invokeAll方法来发起多线程的调用,这里注意一个,在submitAll实现方法中,我手动的调用了初始化线程池的方法concurrentThreadPool.initConcurrentThreadPool(),在真实的项目上,应该在应用启动的时候就调用该方法来初始化线程池

测试类代码
SendMessageService.java,假设这是一个发送邮件信息的服务类

  1. package com.mairuan.base.concurrent.test;
  2. public class SendMessageService {
  3. public void sendMessage(String email,String content){
  4. System.out.println("发送邮件。。。");
  5. }
  6. }

SendMessageHander.java,多线程发送邮件的处理类

  1. package com.mairuan.base.concurrent.test;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import com.mairuan.base.concurrent.CallableTemplate;
  5. public class SendMessageHander extends CallableTemplate<Map<String, String>> {
  6. private String email;
  7. private String content;
  8. public SendMessageHander(String email,String content) {
  9. this.email = email;
  10. this.content = content;
  11. }
  12. @Override
  13. public Map<String, String> process() {
  14. SendMessageService sendMessageService = new SendMessageService();
  15. sendMessageService.sendMessage(email, content);
  16. Map<String, String> map = new HashMap<String, String>();
  17. map.put(email, content);
  18. return map;
  19. }
  20. }

这个类继承了上面的CallableTemplate,我们要的返回值是Map,因此泛型类型是Map,在类中还重写了process方法,在方法中调用发送邮件的业务逻辑接口SendMessageService.sendMessage,并将返回结果组装成Map返回,这里我就简单处理了,将邮件地址及内容放在Map中直接返回了;另外还要注意这个类有个有参构造器,通过构建器可以接收需要传递进来的参数

SendMessageTest.java,测试类

  1. package com.mairuan.base.concurrent.test;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Map.Entry;
  6. import java.util.concurrent.ExecutionException;
  7. import com.mairuan.base.concurrent.CallableTemplate;
  8. import com.mairuan.base.concurrent.ICallableTaskFrameWork;
  9. import com.mairuan.base.concurrent.impl.CallableTaskFrameWork;
  10. public class SendMessageTest {
  11. public static void main(String[] args) throws InterruptedException,
  12. ExecutionException {
  13. ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();
  14. List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();
  15. SendMessageHander sendMessageHander = null;
  16. // 将需要发送邮件的邮件地址及内容组装好,放在一个集合中
  17. for (int i = 0; i < 1000; i++) {
  18. sendMessageHander = new SendMessageHander("email" + i, "content"
  19. + i);
  20. tasks.add(sendMessageHander);
  21. }
  22. //通过多线程一次性发起邮件,并拿到返回结果集
  23. List<Map<String, String>> results = callableTaskFrameWork
  24. .submitsAll(tasks);
  25. // 解析返回结果集
  26. for (Map<String, String> map : results) {
  27. for (Entry<String, String> entry : map.entrySet()) {
  28. System.out.println(entry.getKey() + "\t" + entry.getValue());
  29. }
  30. }
  31. }
  32. }

运行结果
这里写图片描述

发表评论

表情:
评论列表 (有 0 条评论,352人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线返回问题

    > 平时我们写Java程序或者是Android程序的时候有这样一个问题,耗时操作我们需要写到线程里面去,但是一旦写到线程里面去,我们又会发现很难获得返回值,当然天无绝人之路 我

    相关 Java线线返回

    对于Java多线程的理解,我以前仅仅局限于实现Runnable接口或者继承Thread类,然后重写run()方法,最后start()调用就算完事,但是一旦涉及死锁以及对共享资源