RxJava2+Retrofit网络轮询(绑定生命周期)
有这样一个需求,需要间隔时间轮询服务器接口,但是当不在当前界面时,就要停止轮询,当回到这个界面时,再次开始轮询。
得益于RxJava2强大的操作符,和它本身提供的生命周期控制,可以帮助我们较优雅的实现这个功能。
// 创建Retrofit
Retrofit mRetrofit = new Retrofit.Builder()
.client(builder.build())
.addConverterFactory(ResponseConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.baseUrl(baseUrl)
.build();
// 创建ApiService 实例,用于Retrofit 网络请求
ApiService apiService = mRetrofit.create(ApiService.class);
// 网络请求
apiService.warnTotal()
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
// 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
// 以此决定是否重新订阅 & 发送原来的 Observable,即轮询 // 此处有2种情况:
// 1. 若返回1个Complete() / Error()事件,则不重新订阅 & 发送原来的 Observable,即轮询结束
// 2. 若返回其余事件,则重新订阅 & 发送原来的 Observable,即继续轮询
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
// 延时一分钟轮询
return Observable.timer(1, TimeUnit.MINUTES);
}
});
}
})
.compose(((RxFragmentActivity)context).<BeanWarnTotal>bindUntilEvent(ActivityEvent.PAUSE)) // 绑定生命周期,当PAUSE的时候接触绑定,停止请求。RxJava提供的生命周期管理,是不是很方便,不过需要我们的activity继承自RxFragmentActivity
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BeanWarnTotal>() {
@Override
public void accept(BeanWarnTotal beanWarnTotal) throws Exception {
// TODO: 2018/11/15 拿到我们想要的bean
}
});
贴出另外两个类:
public class BeanWarnTotal extends BeanBase {
public int total;
}
public interface ApiService {
@POST("org/warn/total")
Observable<BeanWarnTotal> warnTotal();
}
只要在activity的onResume方法中调用上述方法即可,这样,当每次进入到页面的时候,都会重新进行轮询。
我们可以把上面的方法进行封装,使用起来简洁又方便:
Api.getInstance().warnTotal()
.repeatWhen(new RepeatFunction()) // 轮询
.compose(RetrofitUtils.<BeanWarnTotal>bindLifeCircle(context, ActivityEvent.PAUSE)) // 绑定生命周期
.subscribe(new ApiObserver<BeanWarnTotal>(context, callBack)); // 重写Observer
public class RepeatFunction implements Function<Observable<Object>, ObservableSource<?>> {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
return Observable.timer(1, TimeUnit.MINUTES);
}
});
}
}
public class RetrofitUtils {
public static <T> ObservableTransformer<T, T> bindLifeCircle(final Context context,
final ActivityEvent activityEvent) {
final ObservableTransformer<T, T> observableTransformer = new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> upstream) {
if (context instanceof RxFragmentActivity) {
RxFragmentActivity fragmentActivity = (RxFragmentActivity) context;
return (ObservableSource<T>) upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(fragmentActivity.bindUntilEvent(activityEvent));
}
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
return observableTransformer;
}
}
public class ApiObserver<T> implements Observer<T> {
Context context;
ApiCallBack callBack;
public ApiObserver(Context context, ApiCallBack callBack) {
this.context = context;
this.callBack = callBack;
}
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(T value) {
// todo
}
@Override
public void onError(Throwable t) {
// todo
}
@Override
public void onComplete() {
}
}
参考:
Android:RxJava 结合 Retrofit 优雅实现 网络请求轮询
还没有评论,来说两句吧...