Picasso(4) -Dispatcher
系列文章
Picasso(1) - 使用(踩坑)
Picasso(2) - 自定义配置
Picasso(3) - 图片加载流程
Picasso(4) - Dispatcher
Picasso(5) - Rocket
前言
前几篇文章是对 Picasso 配置 、图片加载流程的分析 , 是宏观上的分析 。 本篇文章将对 Dispatcher 源码进行研究。
什么是 Dispatcher
Dispatcher 是 Picasso 用于分发事件 、线程切换的类。
Dispatcher 分发的事件
REQUEST_SUBMIT = 1; // 请求提交
REQUEST_CANCEL = 2; // 请求取消
REQUEST_GCED = 3; // 给主线程发送REQUEST_GCED消息,让其清除弱引用并取消请求。
HUNTER_COMPLETE = 4; // BitmapHunter获取了 Bitmap
HUNTER_RETRY = 5; // 失败重试
HUNTER_DECODE_FAILED = 6; // 获取 Bitmap 失败
HUNTER_DELAY_NEXT_BATCH = 7; // 延迟批处理
HUNTER_BATCH_COMPLETE = 8; // 批处理完成
NETWORK_STATE_CHANGE = 9; // 网络变化
AIRPLANE_MODE_CHANGE = 10; // 飞行模式
TAG_PAUSE = 11; // 暂停
TAG_RESUME = 12; // 继续
REQUEST_BATCH_RESUME = 13; // 继续批处理
相信你已经被 HUNTER_COMPLETE 、HUNTER_DELAY_NEXT_BATCH 、HUNTER_BATCH_COMPLETE 搞晕了,先写一下这三个事件的执行顺序。
HUNTER_COMPLETE -> HUNTER_DELAY_NEXT_BATCH -> HUNTER_BATCH_COMPLETE。
构造器
首选来看下 Dispatcher 的构造器 。
Dispatcher(Context context, ExecutorService service, Handler mainThreadHandler,
Downloader downloader, Cache cache, Stats stats) {
this.dispatcherThread = new DispatcherThread(); // 子线程,分发的事件都在该线程执行。
this.dispatcherThread.start(); // 启动线程。
Utils.flushStackLocalLeaks(dispatcherThread.getLooper());
this.context = context;
this.service = service; // 线程池
this.hunterMap = new LinkedHashMap<>(); // 处理结果 Map
this.failedActions = new WeakHashMap<>(); // 请求失败 Map
this.pausedActions = new WeakHashMap<>(); // 请求暂停 Map
this.pausedTags = new LinkedHashSet<>(); // 暂停的 Tag 集合
this.handler = new DispatcherHandler(dispatcherThread.getLooper(), this); // 重点关注
this.downloader = downloader; // 下载器: Picasso 中默认为 OkHttp3Downloader
this.mainThreadHandler = mainThreadHandler; // 主线程 Handler
this.cache = cache; // 内存缓存
this.stats = stats; // Picasso 中的统计类
this.batch = new ArrayList<>(4); // 批处理(每隔 200 ms,将需要处理的结果放到列表中 )
this.airplaneMode = Utils.isAirplaneModeOn(this.context); //是否为飞行模式
this.scansNetworkChanges = hasPermission(context, Manifest.permission.ACCESS_NETWORK_STATE); //是否有监听网络状态变化的权限
this.receiver = new NetworkBroadcastReceiver(this); // 广播接收者,监听网络状态变化
receiver.register(); //注册广播
}
线程切换
构造器中的 DispatcherThread 继承自 HandlerThread 。设置了线程名称 以及线程优先级 。
static class DispatcherThread extends HandlerThread {
DispatcherThread() {
super(Utils.THREAD_PREFIX + DISPATCHER_THREAD_NAME, THREAD_PRIORITY_BACKGROUND);
}
}
构造器中 DispatcherHandler 传入的 Looper 对象为 DispatcherThread 的 Looper , 这意味着 Handler 的
handleMessage () 方法执行是在 HandlerThread 中,也就是说是在子线程执行。
this.handler = new DispatcherHandler(dispatcherThread.getLooper(), this);
我们注意到 Dispatcher 中有非常多以 dispatch 开头命名的方法。而这些方法统统通过 handler 来发送消息。
void dispatchSubmit(Action action) {
handler.sendMessage(handler.obtainMessage(REQUEST_SUBMIT, action));
}
void dispatchCancel(Action action) {
handler.sendMessage(handler.obtainMessage(REQUEST_CANCEL, action));
}
void dispatchPauseTag(Object tag) {
handler.sendMessage(handler.obtainMessage(TAG_PAUSE, tag));
}
void dispatchResumeTag(Object tag) {
handler.sendMessage(handler.obtainMessage(TAG_RESUME, tag));
}
void dispatchComplete(BitmapHunter hunter) {
handler.sendMessage(handler.obtainMessage(HUNTER_COMPLETE, hunter));
}
void dispatchRetry(BitmapHunter hunter) {
handler.sendMessageDelayed(handler.obtainMessage(HUNTER_RETRY, hunter), RETRY_DELAY);
}
void dispatchFailed(BitmapHunter hunter) {
handler.sendMessage(handler.obtainMessage(HUNTER_DECODE_FAILED, hunter));
}
void dispatchNetworkStateChange(NetworkInfo info) {
handler.sendMessage(handler.obtainMessage(NETWORK_STATE_CHANGE, info));
}
void dispatchAirplaneModeChange(boolean airplaneMode) {
handler.sendMessage(handler.obtainMessage(AIRPLANE_MODE_CHANGE, airplaneMode ? AIRPLANE_MODE_ON : AIRPLANE_MODE_OFF, 0));
}
这么做的目的只有一个, 切换线程,在 HandlerThread 中进行逻辑处理。即 Dispatcher 中以 perform 开头的方法都是在 HandlerThred 中执行。
Dispatcher 中有不少的集合类 , 但是 Dispatcher 中并没有使用 synchronized 关键字来保障线程安全。
因为操作集合的方法都在同一个线程 HandlerThread 当中。 Picasso 这样写,性能更好,也更优雅。
事件分发
Dispatcher 分发的事件, 分发的事件大体如下 :
看到上面这个图所分发的事件,大家有没有想到其他功能。
没错 ! 这些分发的事件不就是一个下载框架应该具有的功能吗。
我们完全可以照着 Dispatcher 来写一个项目通用的下载框架 , 支持下载 apk、图片、mp3 等。
下载框架这里暂且不提,我们先来分析 Dispatcher 的源码。
submit
当你提交一个请求到 Dispatcher 中, 首选会调用 dispatchSubmit () , 然后线程切换到 HandlerThread , 调用 performSubmit()。
void performSubmit(Action action) {
performSubmit(action, true);
}
void performSubmit(Action action, boolean dismissFailed) {
// 提交的请求是否需要暂停,当你调用了 Picasso.get().pauseTag() 后 ,如果请求正在下载, Picasso 会暂停下载。
// 如果之后提交的请求 tag 和 暂停的 tag 相等, 那么也将添加到暂停队列中,不会去下载。
if (pausedTags.contains(action.getTag())) {
pausedActions.put(action.getTarget(), action);
if (action.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(),
"because tag '" + action.getTag() + "' is paused");
}
return;
}
//请求是否正在下载
BitmapHunter hunter = hunterMap.get(action.getKey());
if (hunter != null) {
// 避免重复请求多次下载 。
hunter.attach(action);
return;
}
// 线程池是否已经释放
// 通常使用 Picasso 的时候是单例使用 。如果你创建了其他 Picasso 实例, 并且释放其他 Picasso 实例的时候 ,service.isShutdown() 返回值为 true 。
// 通俗一点讲, 线程池已经释放了,那么提交过来的请求,就不要再去执行了。
if (service.isShutdown()) {
if (action.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_IGNORED, action.request.logId(), "because shut down");
}
return;
}
// 创建 BitmapHunter 对象 , 这个对象就相当于OkHttp 中的 Response 。
hunter = forRequest(action.getPicasso(), this, cache, stats, action);
// 提交请求到线程池
hunter.future = service.submit(hunter);
// 将请求放到下载队列中
hunterMap.put(action.getKey(), hunter);
// 是否存下载失败队列中移除请求
if (dismissFailed) {
failedActions.remove(action.getTarget());
}
if (action.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_ENQUEUED, action.request.logId());
}
}
cancel
void performCancel(Action action) {
String key = action.getKey();
BitmapHunter hunter = hunterMap.get(key);
//是否在下载队列中
if (hunter != null) {
//移除回调
hunter.detach(action);
//是否取消成功
if (hunter.cancel()) {
//移除请求
hunterMap.remove(key);
if (action.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_CANCELED, action.getRequest().logId());
}
}
}
//当前请求是否在暂停队列中。
if (pausedTags.contains(action.getTag())) {
// 从暂停队列中移除
pausedActions.remove(action.getTarget());
if (action.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_CANCELED, action.getRequest().logId(),
"because paused request got canceled");
}
}
// 从失败队列中移除
Action remove = failedActions.remove(action.getTarget());
if (remove != null && remove.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_CANCELED, remove.getRequest().logId(), "from replaying");
}
}
重复请求的处理
讲 pause() 方法之前先讲下 Picasso 对重复请求的处理。不然大家会对 pause() 相关代码的时候会有疑惑。
BitmapHunter 与 请求之间的关系如下图 :
我给大家举个例子 :
想要下一张图片显示在 ImageView 中,图片下载地址为 http://i.imgur.com/DvpvklR.png 。
Picasso 将下载结果封装为 BitmapHunter 对象 。将请求和 ImagView 封装到 初始请求 Action 对象中。
图片还没下载好, 我又想把图片显示到 ImageView2中。
Picasso 为了避免重复下载 , 使用刚才创建的 BitmapHunter, 将请求和 ImageView2 添加到另一个 Aciton 对象中 , 添加到 List actions 中。 actions 相当于回调列表。
当我们取消请求的时候, 取消的请求可能为初始请求 Action ,也可能为重复请求列表中的某个请求 。
因此我们可以看到 picasso 分发事件的时候 , 会对 action 和 actions 分别做判断。
有人肯定会问 ? 为什么初始 Action 要单独作为一个对象 , 直接放到 List 不是更简单吗 ?
我认为这样写也可以。
pause
void performPauseTag(Object tag) {
// 是否已经加入到 pauseTags 集合中。
if (!pausedTags.add(tag)) {
return;
}
// 遍历所有的请求,如果请求的 tag 和 暂停的 tag 相同
// 那么取消请求。
for (Iterator<BitmapHunter> it = hunterMap.values().iterator(); it.hasNext();) {
BitmapHunter hunter = it.next();
boolean loggingEnabled = hunter.getPicasso().loggingEnabled;
// 第一个请求
Action single = hunter.getAction();
// 重复请求
List<Action> joined = hunter.getActions();
boolean hasMultiple = joined != null && !joined.isEmpty();
// BitmapHunter 中没有请求要处理
if (single == null && !hasMultiple) {
continue;
}
if (single != null && single.getTag().equals(tag)) {
// 移除请求
hunter.detach(single);
// 放入暂停队列
pausedActions.put(single.getTarget(), single);
if (loggingEnabled) {
log(OWNER_DISPATCHER, VERB_PAUSED, single.request.logId(),
"because tag '" + tag + "' was paused");
}
}
// 如果有重复请求
if (hasMultiple) {
for (int i = joined.size() - 1; i >= 0; i--) {
Action action = joined.get(i);
// 请求 tag 是否相等
if (!action.getTag().equals(tag)) {
continue;
}
// 移除请求
hunter.detach(action);
// 放入暂停队列
pausedActions.put(action.getTarget(), action);
if (loggingEnabled) {
log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(),
"because tag '" + tag + "' was paused");
}
}
}
// 取消请求
if (hunter.cancel()) {
it.remove();
if (loggingEnabled) {
log(OWNER_DISPATCHER, VERB_CANCELED, getLogIdsForHunter(hunter), "all actions paused");
}
}
}
}
真正从线程池中取消请求是执行 hunter.cancel() ,可以看到想要成功取消请求是有条件的。
action : 第一个请求,
actions : 重复请求。
future : ExecutorService.submit() 提交的返回值 。
future.cancel(false) : 如果任务正在线程池中执行,那么不中断任务。如果任务正在线程池队列中, 那么取消任务。
boolean cancel() {
return action == null
&& (actions == null || actions.isEmpty())
&& future != null
&& future.cancel(false);
}
resume
void performResumeTag(Object tag) {
// 是否在暂停队列中
if (!pausedTags.remove(tag)) {
return;
}
List<Action> batch = null;
// 遍历暂停队列
for (Iterator<Action> i = pausedActions.values().iterator(); i.hasNext();) {
Action action = i.next();
if (action.getTag().equals(tag)) {
if (batch == null) {
batch = new ArrayList<>();
}
// 添加到批处理列表当中
batch.add(action);
i.remove();
}
}
// 切换到主线程:如果内存缓存中有 Bimmap ,直接显示 Bitmap ,否则执行 submit 流程。
if (batch != null) {
mainThreadHandler.sendMessage(mainThreadHandler.obtainMessage(REQUEST_BATCH_RESUME, batch));
}
}
从代码中可以看出 , Picasso 中的 pause() 和 resume() , 其实相当于执行了 cancel() 和 submit() 。
retry
下载重试 , 这个功能很实用。
void performRetry(BitmapHunter hunter) {
// 请求是否取消
if (hunter.isCancelled()) return;
// 线程池是否已经释放
if (service.isShutdown()) {
performError(hunter, false);
return;
}
NetworkInfo networkInfo = null;
// 是否有监听网络变化的权限
if (scansNetworkChanges) {
ConnectivityManager connectivityManager = getService(context, CONNECTIVITY_SERVICE);
networkInfo = connectivityManager.getActiveNetworkInfo();
}
// 是否应该重试
if (hunter.shouldRetry(airplaneMode, networkInfo)) {
if (hunter.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_RETRYING, getLogIdsForHunter(hunter));
}
// 如果是 ContentLengthException 则不从磁盘缓存中读取数据。
if (hunter.getException() instanceof NetworkRequestHandler.ContentLengthException) {
hunter.networkPolicy |= NetworkPolicy.NO_CACHE.index;
}
// 线程池提交任务
hunter.future = service.submit(hunter);
} else {
//如果网络变化并且支持重试
boolean willReplay = scansNetworkChanges && hunter.supportsReplay();
// 分发失败事件
performError(hunter, willReplay);
if (willReplay) {
// 添加到失败列表中,如果网络变为可用,下载重试。
markForReplay(hunter);
}
}
}
complete
void performComplete(BitmapHunter hunter) {
// 根据内存策略判断是否写入内存缓存
if (shouldWriteToMemoryCache(hunter.getMemoryPolicy())) {
cache.set(hunter.getKey(), hunter.getResult());
}
// 移除下载队列
hunterMap.remove(hunter.getKey());
//批处理
batch(hunter);
if (hunter.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_BATCHED, getLogIdsForHunter(hunter), "for completion");
}
}
private void batch(BitmapHunter hunter) {
// 请求是否取消
if (hunter.isCancelled()) {
return;
}
if (hunter.result != null) {
hunter.result.prepareToDraw();
}
batch.add(hunter);
// 每隔 200 ms , 对处理一批请求。
if (!handler.hasMessages(HUNTER_DELAY_NEXT_BATCH)) {
// 发送消息后调用了 performBatchComplete() 方法。
handler.sendEmptyMessageDelayed(HUNTER_DELAY_NEXT_BATCH, BATCH_DELAY);
}
}
void performBatchComplete() {
// 将 200 ms 内待处理的请求放入 copy 列表中。
List<BitmapHunter> copy = new ArrayList<>(batch);
batch.clear();
// 给 mainThreadHandler 发送休息,主线程处理返回结果。
mainThreadHandler.sendMessage(mainThreadHandler.obtainMessage(HUNTER_BATCH_COMPLETE, copy));
logBatch(copy);
}
static final Handler HANDLER = new Handler(Looper.getMainLooper()) {
@Override public void handleMessage(Message msg) {
switch (msg.what) {
case HUNTER_BATCH_COMPLETE: {
@SuppressWarnings("unchecked") List<BitmapHunter> batch = (List<BitmapHunter>) msg.obj;
// 遍历待处理列表
for (int i = 0, n = batch.size(); i < n; i++) {
BitmapHunter hunter = batch.get(i);
// 调用 picasso 的 complete 方法。
hunter.picasso.complete(hunter);
}
break;
}
// picasso 的 complete() 方法 。
void complete(BitmapHunter hunter) {
//初始请求是否存在
Action single = hunter.getAction();
List<Action> joined = hunter.getActions();
// 是否有重复请求
boolean hasMultiple = joined != null && !joined.isEmpty();
boolean shouldDeliver = single != null || hasMultiple;
// 如果没有一个请求都没有,直接 return 。这种情况,一般是主动取消所有请求。
if (!shouldDeliver) {
return;
}
Uri uri = hunter.getData().uri;
Exception exception = hunter.getException();
Bitmap result = hunter.getResult();
// 图片加载的位置:内存/磁盘/网络
LoadedFrom from = hunter.getLoadedFrom();
// 分发单个请求
if (single != null) {
deliverAction(result, from, single, exception);
}
// 分发重复请求
if (hasMultiple) {
//noinspection ForLoopReplaceableByForEach
for (int i = 0, n = joined.size(); i < n; i++) {
Action join = joined.get(i);
deliverAction(result, from, join, exception);
}
}
// 下载过程中有一场。 listener 为全局回调。 在 Picasso 的 Builder 中配置。
if (listener != null && exception != null) {
listener.onImageLoadFailed(this, uri, exception);
}
}
// 分发处理 Action
private void deliverAction(Bitmap result, LoadedFrom from, Action action, Exception e) {
// 是否 Action 被取消
if (action.isCancelled()) {
return;
}
// 图片下载是否会重试。
// 如 : 图片因 IO 异常加载失败, 当前网络不可用。
// 此时应用能够监听网络变化,并且当前 RequestHandler 支持重试的时候, willReplay = 值为 true 。
// 表示当网络可用的时候,会重新加载图片。
if (!action.willReplay()) {
targetToAction.remove(action.getTarget());
}
if (result != null) {
if (from == null) {
throw new AssertionError("LoadedFrom cannot be null.");
}
//图片加载成功, 将 Bitmap 交给 Action 处理。
action.complete(result, from);
if (loggingEnabled) {
log(OWNER_MAIN, VERB_COMPLETED, action.request.logId(), "from " + from);
}
} else {
//图片加载失败
action.error(e);
if (loggingEnabled) {
log(OWNER_MAIN, VERB_ERRORED, action.request.logId(), e.getMessage());
}
}
}
error
error 的处理过程比较简单 。直接从结果队列中移除, 然后调用 batch () 方法。
batch() 方法在 complete() 的时候已经分析过了,这里就不再分析了。
void performError(BitmapHunter hunter, boolean willReplay) {
if (hunter.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_BATCHED, getLogIdsForHunter(hunter),
"for error" + (willReplay ? " (will replay)" : ""));
}
hunterMap.remove(hunter.getKey());
batch(hunter);
}
NetworkStateChange
当前网络状态由不可用变为可用的时候, Picasso 会尝试加载之前下载失败的图片。
void performNetworkStateChange(NetworkInfo info) {
// 根据当前网络状态,调整线程池中线程数目。
if (service instanceof PicassoExecutorService) {
((PicassoExecutorService) service).adjustThreadCount(info);
}
// 当前网络可用
if (info != null && info.isConnected()) {
flushFailedActions();
}
}
private void flushFailedActions() {
//是否有下载失败请求
if (!failedActions.isEmpty()) {
Iterator<Action> iterator = failedActions.values().iterator();
// 遍历下载失败请求
while (iterator.hasNext()) {
Action action = iterator.next();
iterator.remove();
if (action.getPicasso().loggingEnabled) {
log(OWNER_DISPATCHER, VERB_REPLAYING, action.getRequest().logId());
}
//提交请求,重新走图片下载的流程。
performSubmit(action, false);
}
}
}
flushStackLocalLeaks
在构造器中有这么一行代码。 Utils.flushStackLocalLeaks(dispatcherThread.getLooper());
Utils.flushStackLocalLeaks(dispatcherThread.getLooper());
// THREAD_LEAK_CLEANING_MS 为 1000 ms 即 1秒
static void flushStackLocalLeaks(Looper looper) {
Handler handler = new Handler(looper) {
@Override public void handleMessage(Message msg) {
sendMessageDelayed(obtainMessage(), THREAD_LEAK_CLEANING_MS);
}
};
handler.sendMessageDelayed(handler.obtainMessage(), THREAD_LEAK_CLEANING_MS);
}
可以看到创建了 Handler , 每隔 1 s 发送了一个空的消息。代码中的注释说 :
这个方法主要用于 Android 5.0 之前 。HandlerThread 总是持有上一个发送 message 的堆栈本地应用 , 因此为了保证上一个 messsage 能够被及时回收 , 每隔一秒发送了一个 message 。
对这里的代码,我有点疑惑。因为我不知道如何才能重现这样的 Bug 。也没看到添加 Android 5.0 之前的判断。(注释写的 Android 5 , 我怀疑作者少写了 .0)。
总结
以上就是对 Dispatcher 所有代码的分析。
如果是第一次阅读 Dispatcher 的源码 , 难免会有很多的疑惑。
静下心细细阅读,会发现作者这样写的好处。
还没有评论,来说两句吧...