RxJava+Retrofit使用 rxjava详解

RxJava+Retrofit使用.build();
}
/** 创建相应的服务接口 暴露出接口
*/
public T create(Class service) {
checkNotNull(service, “服务接口不

。建造();

}

/**

创建对应的服务接口

暴露接口

*/

公共T 创建(类服务){

checkNotNull(service, \”服务接口不能为空\”);

返回mRetrofit.create(service);

}

私有T checkNotNull(T 对象,字符串消息) {

如果(对象==空){

抛出一个新的NullPointerException(消息)。

}

返回一个对象。

}

}

封装接口数据

BaseResult 允许您确定返回值是否正确。

公共类BaseResult {

私有int 代码。

私人字符串消息。

公共int getCode() {

返回码;

}

公共无效setCode(int代码){

this.code=代码;

}

公共字符串getMessage() {

回复消息;

}

公共无效setMessage(字符串消息){

this.message=消息;

}

公共布尔isOk() {

返回值200==代码;

}

}

公共类Result 扩展BaseResult {

公共对象数据。

}

为后台返回的数据创建一个Javabean(注意必须继承自BaseResult)。

公共类LoginBean 扩展BaseResult {

私有字符串openId;

私有int hasNickNameHeadUrl;

私有字符串令牌。

私有int 有电话;

@覆盖

公共字符串toString() {

返回“LoginBean{”+

\”openId=\’\” + openId + \”\” +

\”, hasNickNameHeadUrl=\” + hasNickNameHeadUrl +

“, token=’” + token + “”” +

\”, hasPhone=\” + hasPhone +

“}”;

}

公共字符串getOpenId() {

返回openId。

}

公共无效setOpenId(字符串openId){

this.openId=开放ID;

}

公共int getHasNickNameHeadUrl() {

返回hasNickNameHeadUrl。

}

公共无效setHasNickNameHeadUrl(int hasNickNameHeadUrl){

this.hasNickNameHeadUrl=hasNickNameHeadUrl;

}

公共字符串getToken() {

返回令牌。

}

公共无效setToken(字符串令牌){

this.token=令牌;

}

公共int getHasPhone() {

返回hasPhone。

}

公共无效setHasPhone(int hasPhone){

this.hasPhone=hasPhone;

}

}

封装并返回一个通用的回调类。

导入java.lang.reflect.ParameterizedType;

导入io.reactivex.rxjava3.functions.Consumer;

/**

封装返回泛型

@参数

*/

公共抽象类ConsumerCallback 实现ConsumerResult {;

@覆盖

公共无效接受(结果结果)抛出Throwable {

if (!result.isOk()) {

UnifyCallback.handleUnifyCode(result.getCode(),result.getMessage());

onError(result.getCode(), result.getMessage());

返回;

}

//解析并获取类的泛型类型

类dataClass=(class) ((ParameterizedType) this.getClass().getGenericSuperclass())

.getActualTypeArguments()[0];

T data=JSON.parseObject(JSON.toJSONString(result.data), dataClass);

onSucceed(数据);

}

公共抽象无效onSucceed(T 结果);

公共抽象无效onError(int code, String msg);

}

/**

封装返回泛型

@参数

*/

公共抽象类ObserverCallback 实现ObserverResult {;

@覆盖

公共无效onSubscribe(@NonNull一次性d){

//这个一次性对象也需要保存,并且在页面销毁时需要调用unsubscribe方法。

//或者,使用CompositeDisposable 容器将所有一次性物品放入其中并统一处理。

}

@覆盖

公共无效onNext(@NonNull结果){

if (!result.isOk()) {

UnifyCallback.handleUnifyCode(result.getCode(),result.getMessage());

onFail(result.getCode(), result.getMessage());

返回;

}

//解析并获取类的泛型类型

类dataClass=(class) ((ParameterizedType) this.getClass().getGenericSuperclass())

.getActualTypeArguments()[0];

T data=JSON.parseObject(JSON.toJSONString(result.data), dataClass);

onSucceed(数据);

}

@覆盖

公共无效onError(@NonNull Throwable throwable) {

if(异常的可抛出实例){

onFail(ThrowableHandler.handleThrowable(throwable));

} 除此之外{

onFail(new HttpThrowable(HttpThrowable.UNKNOWN, \”未知错误\”, throwable));

}

}

@覆盖

公共无效onComplete() {

}

公共抽象无效onSucceed(T 结果);

公共抽象无效onFail(int 代码, String msg);

//应用程序中的具体实现是下面的onFail方法

私有无效onFail(HttpThrowable httpThrowable) {

onFail(httpThrowable.errorType, httpThrowable.message);

}

}

导入cn.yumakeji.lib_common.global.AppGlobals;

导入io.reactivex.rxjava3.functions.Consumer;

/**

网络恢复、错误解决

*/

公共类ThrowableCallback 实现Consumer {;

@覆盖

公共无效接受(Throwable throwable)抛出Throwable {

if(异常的可抛出实例){

onError(ThrowableHandler.handleThrowable(throwable));

} 除此之外{

onError(new HttpThrowable(HttpThrowable.UNKNOWN, \”未知错误\”, throwable));

}

}

//应用程序中的具体实现是下面的onError方法

公共无效onError(HttpThrowable httpThrowable){

Toast.makeText(AppGlobals.getApplication().getApplicationContext(),

httpThrowable.message, Toast.LENGTH_LONG).show();

}

}

请求后台访问数据的接口类

/**

请求后台访问数据的接口类

*/

公共接口ServiceApi {

/**

获取@Query方法

*/

@GET(\”主页/绘画列表\”)

ObservableResult getSearchContent(@Query(\”searchContent\”) String text, @Query(\”size\”) int size);

/**

获取@Query方法

*/

@GET(\”主页/绘画列表\”)

ObservableResult getSearchContent2(@Query(“searchContent”) String text, @Query(“size”) int size);

@POST(\”小程序/code2Session\”)

ObservableResult postCode2Session(@Body MapString, 目标代码);

}

BaseActivity绑定生命周期

公共抽象类BaseActivity 扩展AppCompatActivity {

/**

绑定生命周期

*/

protected LifecycleProviderLifecycle.Event rxLifecycle=AndroidLifecycle.createLifecycleProvider(this);

@覆盖

protected void onCreate(@Nullable Bundle SavedInstanceState) {

super.onCreate(savedInstanceState);

}

@覆盖

受保护无效onDestroy() {

super.onDestroy();

}

}

转移

包cc.network.rxjava;

导入android.content.Intent。

导入android.nfc.Tag。

导入android.os.Bundle。

导入android.util.ArrayMap。

导入android.util.Log。

导入android.view.View。

导入android.widget.AdapterView。

导入android.widget.ArrayAdapter。

导入android.widget.ListView。

导入android.widget.Toast。

导入androidx.lifecycle.Lifecycle。

导入com.alibaba.fastjson.JSON。

导入com.yumakeji.rxjava.network.RetrofitClient;

导入com.yumakeji.rxjava.network.bean.Result。

导入com.yumakeji.rxjava.network.callback.ConsumerCallback;

导入com.yumakeji.rxjava.network.callback.ObserverCallback;

导入com.yumakeji.rxjava.network.callback.ThrowableCallback;

导入com.yumakeji.rxjava.network.error.HttpThrowable。

导入com.yumakeji.rxjava.network.handler.RetryWithDelay;

导入java.util.ArrayList。

导入java.util.List。

导入java.util.concurrent.CopyOnWriteArrayList;

导入java.util.concurrent.TimeUnit。

导入cc.network.rxjava.rxjava.ServiceApi。

导入cc.network.rxjava.textbean.LoginBean;

导入cc.network.rxjava.textbean.Top250Bean;

导入cc.network.rxjava.viewmodel.RxjavaViewModel;

导入cn.yumakeji.jetpackroomstudy.R。

导入cn.yumakeji.lib_common.global.AppGlobals;

导入cn.yumakeji.lib_common.utils.AppUtils;

导入io.reactivex.rxjava3.android.schedulers.AndroidSchedulers;

导入io.reactivex.rxjava3.annotations.NonNull;

导入io.reactivex.rxjava3.core.Observable;

导入io.reactivex.rxjava3.core.ObservableEmitter;

导入io.reactivex.rxjava3.core.ObservableOnSubscribe;

导入io.reactivex.rxjava3.core.ObservableSource;

导入io.reactivex.rxjava3.core.Observer;

导入io.reactivex.rxjava3.disposables.Disposable;

导入io.reactivex.rxjava3.functions.Action;

导入io.reactivex.rxjava3.functions.BiFunction;

导入io.reactivex.rxjava3.functions.Consumer;

导入io.reactivex.rxjava3.functions.Function;

导入io.reactivex.rxjava3.schedulers.Schedulers;

/**

防止多次点击

https://www.jianshu.com/p/ea45670a364f,

改造

https://blog.csdn.net/huangxiaoguo1/article/details/65627293

Rxjava

https://www.jianshu.com/p/092452f287db

使用RxLifeCycle 而不继承RxAppCompatActivity

https://blog.csdn.net/kevinscsdn/article/details/78980010

*/

公共类RxjavaActivity 扩展BaseActivity {

私有ListView mListView;

私有列表字符串=new CopyOnWriteArrayList();

私有最终字符串TAG=“Rxjava”;

私有RxjavaViewModel 视图模型;

@覆盖

公共无效onDetachedFromWindow() {

super.onDetachedFromWindow();

getLifecycle().removeObserver(viewModel);

}

@覆盖

protected void onCreate(bundle 保存实例状态) {

super.onCreate(savedInstanceState);

setContentView(R.layout.activity_rxjava);

mListView=findViewById(R.id.list_item);

strings.add(\’方法@Query\’);

strings.add(\’Post方法@Body\’);

strings.add(\”使用Rxjava切换线程\”);

strings.add(\”Rxjava(1) 发射器\”);

strings.add(\”Rxjava(2)转换运算符映射\”);

strings.add(\”Rxjava(3)FlatMap\”);

strings.add(\”Rxjava(4)concatMap\”);

strings.add(\”顺序调用Rxjava(5)接口\”);

strings.add(\”顺序调用Rxjava(6)接口(发生内部错误)\”);

strings.add(\”Rxjava(7)压缩到包\”);

}

/**

我们来尝试一下改造

@参数视图

*/

公共无效onRetrofitClick(查看视图){

startActivity(new Intent(this, RetrofitActivity.class));

}

@覆盖

公共无效onAttachedToWindow() {

super.onAttachedToWindow();

viewModel=new RxjavaViewModel(this);

getLifecycle().addObserver(viewModel);

mListView.setAdapter(new ArrayAdapter(this, android.R.layout.simple_list_item_1, strings));

mListView.setOnItem

ClickListener(new AdaptorView.OnItemClickListener() {

@覆盖

公共无效onItemClick(AdapterView?parent,视图视图,int位置,长id){

开关(位置){

case 0://获取方法@Query

RetrofitClient.getInstance()

.create(ServiceApi.class)//创建服务

.getSearchContent(\”\”, 5)//调用接口

.subscribeOn(Schedulers.io())//指定的观察者操作在io线程上完成

.doOnSubscribe(一次性- viewModel.showProgressDialog(“请求”))

.observeOn(AndroidSchedulers.mainThread()) //指定一个观察者来接收数据并在主线程上完成。

.retryWhen(new RetryWithDelay(1, 1))//发生错误时重试。第一个参数是重试次数,第二个参数是重试间隔。

.doAfterTerminate(() – viewModel.hintProgressDialog())

.compose(rxLifecycle.bindUntilEvent(Lifecycle.Event.ON_DESTROY))//生命周期

.subscribe(new ConsumerCallback() {

@覆盖

公共无效onSucceed(Top250Bean结果){

//成功获取数据

Toast.makeText(AppGlobals.getApplication().getApplicationContext(),

\”成功\” + 结果, Toast.LENGTH_LONG).show();

}

@覆盖

公共无效onError(int代码,字符串消息){

//获取数据失败

Toast.makeText(AppGlobals.getAp

plication().getApplicationContext(),

msg, Toast.LENGTH_LONG).show();

}

}, new ThrowableCallback() {

@Override

public void onError(HttpThrowable httpThrowable) {

// 获取数据失败

Log.i(TAG, httpThrowable.message);

}

});

break;

case 1://POST方法 @Body

ArrayMap<String, Object> map = new ArrayMap<>();

map.put(“jsCode”, “033sHaaK03pRwa2Bic9K0EmcaK0sHaaq”);

RetrofitClient.getInstance()

.create(ServiceApi.class)

.postCode2Session(map)

.subscribeOn(Schedulers.io())

.doOnSubscribe(disposable -> viewModel.showProgressDialog(“正在请求中”))

.observeOn(AndroidSchedulers.mainThread())

.compose(rxLifecycle.bindUntilEvent(Lifecycle.Event.ON_DESTROY))

.retryWhen(new RetryWithDelay(1, 1))//遇到错误时重试,第一个参数为重试几次,第二个参数为重试的间隔

.doAfterTerminate(() -> viewModel.hintProgressDialog())

.subscribe(new ObserverCallback() {

@Override

public void onSucceed(LoginBean result) {

// 成功获取数据

Toast.makeText(AppGlobals.getApplication().getApplicationContext(),

“成功了” + result, Toast.LENGTH_LONG).show();

}

@Override

public void onFail(int code, String msg) {

// 获取数据失败

Toast.makeText(AppGlobals.getApplication().getApplicationContext(),

msg, Toast.LENGTH_LONG).show();

}

@Override

public void onComplete() {

super.onComplete();

}

});

break;

case 2://使用Rxjava切换线程

Observable.empty().observeOn(Schedulers.io())

.doOnComplete(new Action() {

@Override

public void run() throws Throwable {

Log.i(TAG, “我是子线程:” + AppUtils.isMainThread());

Observable.timer(3, TimeUnit.SECONDS)

.observeOn(AndroidSchedulers.mainThread())

.doOnComplete(() -> msgManagement(10))

.subscribe();

Observable.empty().observeOn(AndroidSchedulers.mainThread())

.doOnComplete(() -> msgManagement(20)

).subscribe();

}

}).subscribe();

break;

case 3:

//Rxjava(一)emitter 发射器

/**

借鉴:给初学者的RxJava2.0教程(一)
https://www.jianshu.com/p/464fa025229e
规则:
1、上游可以发送无限个onNext, 下游也可以接收无限个onNext.
2、当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
3、当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
4、上游可以不发送onComplete或onError.
5、最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError,
也不能先发一个onComplete, 然后再发一个onError, 反之亦然
注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则,
**并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了,
但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.

*/

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

Log.e(TAG, “是否是主线程=>” + AppUtils.isMainThread());

emitter.onNext(“huangxiaoguo1”);

emitter.onNext(“huangxiaoguo2”);

emitter.onNext(“huangxiaoguo3”);

emitter.onComplete();

}

})

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Observer() {

private Disposable mDisposable;

/**

不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见,
@param d

*/

@Override

public void onSubscribe(@NonNull Disposable d) {

Log.d(TAG, “subscribe”);

mDisposable = d;

}

@Override

public void onNext(@NonNull String s) {

/**

当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.
调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件

*/

Log.e(TAG, “onNext 是否是主线程 ==>” + AppUtils.isMainThread());

Log.d(TAG, “onNext==>” + s);

if (s.equals(“huangxiaoguo2”)) {

mDisposable.dispose();

}

}

@Override

public void onError(@NonNull Throwable e) {

Log.d(TAG, “onError”);

}

@Override

public void onComplete() {

Log.d(TAG, \”onComplete \”);

}

});

break;

case 4:

//Rxjava(二)变换操作符Map

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

Log.e(TAG, “subscribe 是否是主线程 ==>” + AppUtils.isMainThread());

emitter.onNext(1);

emitter.onNext(2);

emitter.onNext(3);

}

})

.subscribeOn(Schedulers.io())

.map(new Function<Integer, String>() {

@Override

public String apply(Integer integer) throws Throwable {

Log.e(TAG, “map 是否是主线程 ==>” + AppUtils.isMainThread());

return “我是在map中处理过了的===>” + String.valueOf(integer);

}

})

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(String s) throws Throwable {

Log.e(TAG, “accept 是否是主线程 ==>” + AppUtils.isMainThread());

Log.i(TAG, “accept===>” + s);

}

});

break;

case 5:

//flatMap 并不保证事件的顺序

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

emitter.onNext(1);

emitter.onNext(2);

emitter.onNext(3);

}

}).flatMap(new Function<Integer, ObservableSource>() {

@Override

public ObservableSource apply(Integer integer) throws Throwable {

ArrayList strings = new ArrayList<>();

for (int i = 0; i < 3; i++) {

strings.add(\”I am value \” + integer);

}

return Observable.fromIterable(strings).delay(2, TimeUnit.SECONDS);

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Throwable {

Log.d(TAG, s);

}

});

break;

case 6:

//concatMap 保证顺序

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

emitter.onNext(1);

emitter.onNext(2);

emitter.onNext(3);

}

}).concatMap(new Function<Integer, ObservableSource>() {

@Override

public ObservableSource apply(Integer integer) throws Throwable {

ArrayList strings = new ArrayList<>();

for (int i = 0; i < 3; i++) {

strings.add(\”I am value \” + integer);

}

return Observable.fromIterable(strings).delay(2, TimeUnit.SECONDS);

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Throwable {

Log.d(TAG, s);

}

});

break;

case 7:

//Rxjava(五)接口顺序调用

RetrofitClient.getInstance()

.create(ServiceApi.class)// 创建服务

.getSearchContent(“”, 5)//调用接口

.subscribeOn(Schedulers.io())// 指定被观察者的操作在io线程中完成

.doOnSubscribe(disposable -> viewModel.showProgressDialog(“正在请求中”))

.observeOn(AndroidSchedulers.mainThread())//指定观察者接收到数据,然后在Main线程中完成

.doOnNext(new ConsumerCallback() {

@Override

public void onSucceed(Top250Bean result) {

Log.e(TAG, result.toString());

}

@Override

public void onError(int code, String msg) {

}

})

.observeOn(Schedulers.io())

.flatMap(new Function<Result, Observable<Result>>() {

@Override

public Observable<Result> apply(Result result) throws Throwable {

Log.e(TAG, result.getMessage());

ArrayMap<String, Object> map = new ArrayMap<>();

map.put(“jsCode”, “033sHaaK03pRwa2Bic9K0EmcaK0sHaaq”);

return RetrofitClient.getInstance().create(ServiceApi.class).postCode2Session(map);

}

})

.retryWhen(new RetryWithDelay(1, 1))//遇到错误时重试,第一个参数为重试几次,第二个参数为重试的间隔

.compose(rxLifecycle.bindUntilEvent(Lifecycle.Event.ON_DESTROY))//生命周期

.observeOn(AndroidSchedulers.mainThread())

.doAfterTerminate(() -> viewModel.hintProgressDialog())

.subscribe(new ConsumerCallback() {

@Override

public void onSucceed(LoginBean result) {

Log.e(TAG, result.toString());

}

@Override

public void onError(int code, String msg) {

Log.e(TAG, msg);

}

}, new ThrowableCallback() {

@Override

public void onError(HttpThrowable httpThrowable) {

super.onError(httpThrowable);

}

});

break;

case 8:

//Rxjava(六)接口顺序调用(内部发生错误)

ArrayMap<String, Object> map1 = new ArrayMap<>();

map1.put(“jsCode”, “033sHaaK03pRwa2Bic9K0EmcaK0sHaaq”);

RetrofitClient.getInstance()

.create(ServiceApi.class)// 创建服务

.postCode2Session(map1)//调用接口

.subscribeOn(Schedulers.io())// 指定被观察者的操作在io线程中完成

.doOnSubscribe(disposable -> viewModel.showProgressDialog(“正在请求中”))

.observeOn(AndroidSchedulers.mainThread())//指定观察者接收到数据,然后在Main线程中完成

.doOnNext(new ConsumerCallback() {

@Override

public void onSucceed(LoginBean result) {

Log.e(TAG, result.toString());

}

@Override

public void onError(int code, String msg) {

Log.e(TAG, msg);

}

})

.observeOn(Schedulers.io())

.flatMap(new Function<Result, Observable<Result>>() {

@Override

public Observable<Result> apply(Result result) throws Throwable {

if (!result.isOk()) {

return null;

}

return RetrofitClient.getInstance().create(ServiceApi.class).getSearchContent(“”, 5);

}

})

.retryWhen(new RetryWithDelay(1, 1))//遇到错误时重试,第一个参数为重试的间隔,第二个参数为重试几次

.compose(rxLifecycle.bindUntilEvent(Lifecycle.Event.ON_DESTROY))//生命周期

.observeOn(AndroidSchedulers.mainThread())

.doAfterTerminate(() -> viewModel.hintProgressDialog())

.subscribe(new ConsumerCallback() {

@Override

public void onSucceed(Top250Bean result) {

Log.e(TAG, result.toString());

}

@Override

public void onError(int code, String msg) {

Log.e(TAG, msg);

}

}, new ThrowableCallback() {

@Override

public void onError(HttpThrowable httpThrowable) {

super.onError(httpThrowable);

}

});

break;

case 9:

//Rxjava(七)Zip来打包请求

Observable.zip(RetrofitClient.getInstance()

.create(ServiceApi.class)

.getSearchContent(“”, 1)

.subscribeOn(Schedulers.io()),

RetrofitClient.getInstance()

.create(ServiceApi.class)

.getSearchContent2(“”, 1)

.subscribeOn(Schedulers.io()),

new BiFunction<Result, Result, Top250Bean>() {

@Override

public Top250Bean apply(Result result, Result result2) throws Throwable {

Top250Bean data1 = JSON.parseObject(JSON.toJSONString(result.data), Top250Bean.class);

Top250Bean data2 = JSON.parseObject(JSON.toJSONString(result.data), Top250Bean.class);

data1.getRecords().addAll(data2.getRecords());

return data1;

}

}).observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Top250Bean result) throws Throwable {

Log.e(TAG, result.toString());

Log.e(TAG, result.getRecords().size()+“—”);

}

});

break;

}

}

});

}

/**

代替Handler
@param what

*/

private void msgManagement(int what) {

if (what == 10) {

Log.i(TAG, “我是代替Handler 有延时” + AppUtils.isMainThread());

} else {

Log.i(TAG, “我是代替Handler 没有延时” + AppUtils.isMainThread());

}

}

}

MVVM中管理Rxjava的生命周期
可直接使用开源框架:rxjava-RxLife

//使用rxlife对viewModel生命周期执行支持(rxlifecycle在google jetpack的videModel中不太好用,针对的是MVP)

api ‘com.ljx.rxlife3:rxlife-rxjava:3.0.0’

注意: 一定要在Activity/Fragment通过以下方式获取ViewModel对象,否则RxLife接收不到生命周期的回调

viewModel = new ViewModelProvider(this).get(RxjavaViewModel.class);

Observable.interval(2, TimeUnit.SECONDS)

.observeOn(AndroidSchedulers.mainThread())

.to(RxLife.to(this))

.subscribe(new Consumer() {

@Override

public void accept(Long aLong) throws Throwable {

Log.e(“huangxiaoguo”, “接口轮询”);

}

});

其他设计到的类

error
package com.yumakeji.rxjava.network.error;

public class HttpThrowable extends Exception {

public int errorType;

public String message;

public Throwable throwable;

/**

未知错误
*/

public static final int UNKNOWN = 1000;

/**

解析错误
*/

public static final int PARSE_ERROR = 1001;

/**

连接错误
*/

public static final int CONNECT_ERROR = 1002;

/**

DNS解析失败(无网络)
*/

public static final int NO_NET_ERROR = 1003;

/**

连接超时错误
*/

public static final int TIME_OUT_ERROR = 1004;

/**

网络(协议)错误
*/

public static final int HTTP_ERROR = 1005;

/**

证书错误
*/

public static final int SSL_ERROR = 1006;

public HttpThrowable(int errorType, String message, Throwable throwable) {

super(throwable);

this.errorType = errorType;

this.message = message;

this.throwable = throwable;

}

}

import com.alibaba.fastjson.JSONException;

import com.google.gson.JsonParseException;

import java.net.ConnectException;

import java.net.SocketTimeoutException;

import java.net.UnknownHostException;

import retrofit2.HttpException;

public class ThrowableHandler {

public static HttpThrowable handleThrowable(Throwable throwable) {

if (throwable instanceof HttpException) {

return new HttpThrowable(HttpThrowable.HTTP_ERROR, “网络(协议)异常”, throwable);

} else if (throwable instanceof JsonParseException || throwable instanceof JSONException || throwable instanceof ParseException) {

return new HttpThrowable(HttpThrowable.PARSE_ERROR, “数据解析异常”, throwable);

} else if (throwable instanceof UnknownHostException) {

return new HttpThrowable(HttpThrowable.NO_NET_ERROR, “网络连接失败,请稍后重试”, throwable);

} else if (throwable instanceof SocketTimeoutException) {

return new HttpThrowable(HttpThrowable.TIME_OUT_ERROR, “连接超时”, throwable);

} else if (throwable instanceof ConnectException) {

return new HttpThrowable(HttpThrowable.CONNECT_ERROR, “连接异常”, throwable);

} else if (throwable instanceof javax.net.ssl.SSLHandshakeException) {

return new HttpThrowable(HttpThrowable.SSL_ERROR, “证书验证失败”, throwable);

} else {

return new HttpThrowable(HttpThrowable.UNKNOWN, throwable.getMessage(), throwable);

}

}

}

误时重试
public class RetryWithDelay implements Function<Observable, ObservableSource<?>> {

private int retryDelaySeconds;//延迟重试的时间

private int retryCount;//记录当前重试次数

private int retryCountMax;//最大重试次数

public RetryWithDelay(int retryDelaySeconds, int retryCountMax) {

this.retryDelaySeconds = retryDelaySeconds;

this.retryCountMax = retryCountMax;

}

@Override

public ObservableSource<?> apply(Observable throwableObservable) throws Exception {

//方案一:使用全局变量来控制重试次数,重试3次后不再重试,通过代码显式回调onError结束请求

return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

@Override

public ObservableSource<?> apply(Throwable throwable) throws Exception {

//如果失败的原因是UnknownHostException(DNS解析失败,当前无网络),则没必要重试,直接回调error结束请求即可

if (throwable instanceof UnknownHostException) {

return Observable.error(throwable);

}

//没超过最大重试次数的话则进行重试

if (++retryCount <= retryCountMax) {

//延迟retryDelaySeconds后开始重试

return Observable.timer(retryDelaySeconds, TimeUnit.SECONDS);

}

return Observable.error(throwable);

}

});

}

}

拦截器
public class CacheIntercepter implements Interceptor {

@NotNull

@Override

public Response intercept(@NotNull Chain chain) throws IOException {

//对request的设置用来指定有网/无网下所走的方式

//对response的设置用来指定有网/无网下的缓存时长

Request request = chain.request();

if (!NetworkUtils.isNetWorkAvailable(AppGlobals.getApplication())) {

//无网络下强制使用缓存,无论缓存是否过期,此时该请求实际上不会被发送出去。

//有网络时则根据缓存时长来决定是否发出请求

request = request.newBuilder()

.cacheControl(CacheControl.FORCE_CACHE).build();

最后

小编这些年深知大多数初中级Android工程师,想要提升自己,往往是自己摸索成长,自己不成体系的自学效果低效漫长且无助。

因此我收集整理了一份《2024年Android移动开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人

都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

资料⬅专栏获取
ss RetryWithDelay implements Function<Observable, ObservableSource<?>> {

private int retryDelaySeconds;//延迟重试的时间

private int retryCount;//记录当前重试次数

private int retryCountMax;//最大重试次数

public RetryWithDelay(int retryDelaySeconds, int retryCountMax) {

this.retryDelaySeconds = retryDelaySeconds;

this.retryCountMax = retryCountMax;

}

@Override

public ObservableSource<?> apply(Observable throwableObservable) throws Exception {

//方案一:使用全局变量来控制重试次数,重试3次后不再重试,通过代码显式回调onError结束请求

return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

@Override

public ObservableSource<?> apply(Throwable throwable) throws Exception {

//如果失败的原因是UnknownHostException(DNS解析失败,当前无网络),则没必要重试,直接回调error结束请求即可

if (throwable instanceof UnknownHostException) {

return Observable.error(throwable);

}

//没超过最大重试次数的话则进行重试

if (++retryCount <= retryCountMax) {

//延迟retryDelaySeconds后开始重试

return Observable.timer(retryDelaySeconds, TimeUnit.SECONDS);

}

return Observable.error(throwable);

}

});

}

}

拦截器
public class CacheIntercepter implements Interceptor {

@NotNull

@Override

public Response intercept(@NotNull Chain chain) throws IOException {

//对request的设置用来指定有网/无网下所走的方式

//对response的设置用来指定有网/无网下的缓存时长

Request request = chain.request();

if (!NetworkUtils.isNetWorkAvailable(AppGlobals.getApplication())) {

//无网络下强制使用缓存,无论缓存是否过期,此时该请求实际上不会被发送出去。

//有网络时则根据缓存时长来决定是否发出请求

request = request.newBuilder()

.cacheControl(CacheControl.FORCE_CACHE).build();

最后

小编这些年深知大多数初中级Android工程师,想要提升自己,往往是自己摸索成长,自己不成体系的自学效果低效漫长且无助。

因此我收集整理了一份《2024年Android移动开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。

[外链图片转存中…(img-Yx3ivktl-1719092595908)]一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人

都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

资料⬅专栏获取

#以上关于RxJava+Retrofit使用的相关内容来源网络仅供参考,相关信息请以官方公告为准!

原创文章,作者:CSDN,如若转载,请注明出处:https://www.sudun.com/ask/91675.html

Like (0)
CSDN的头像CSDN
Previous 2024年6月23日
Next 2024年6月23日

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注