RxUtil2

Project Url: xuexiangjys/RxUtil2
Introduction: 一个实用的 RxJava2 工具类库
More: Author   ReportBugs   
Tags:

I Star

一个实用的 RxJava2 工具类库。

如果你习惯 RxJava1,请移步RxUtil

关于我

公众号 掘金 知乎 CSDN 简书 思否 哔哩哔哩 今日头条
我的 Android 开源之旅 点我 点我 点我 点我 点我 点我 点我

特征

  • RxBus 支持多事件定义,支持数据携带,支持全局和局部的事件订阅和注销。
  • 订阅池管理。
  • 支持非侵入式的订阅生命周期绑定。
  • 线程调度辅助工具。
  • RxBinding 使用工具类。
  • RxJava 常用方法工具类。

1、演示(请 star 支持)

1.1、RxBus

2、如何使用

目前支持主流开发工具 AndroidStudio 的使用,直接配置 build.gradle,增加依赖即可.

2.1、Android Studio 导入方法,添加 Gradle 依赖

1.先在项目根目录的 build.gradle 的 repositories 添加:

allprojects {
     repositories {
        ...
        maven { url "https://jitpack.io" }
    }
}

2.然后在应用项目(一般是 app)的 build.gradle 的 dependencies 添加:

dependencies {
   ...
   implementation 'io.reactivex.rxjava2:rxjava:2.2.20'
   implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
   //rxbinding 的 sdk
   implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'

   implementation 'com.github.xuexiangjys:RxUtil2:1.2.0'
}

3.1、RxBus 使用

3.1.1、事件注册订阅

1.使用RxBusUtils.get().onMainThread方法注册事件,并指定订阅发生在主线程。

RxBusUtils.get().onMainThread(EventKey.EVENT_HAVE_DATA, Event.class, new Consumer<Event>() {
    @Override
    public void accept(Event event) throws Exception {
        showContent(EventKey.EVENT_HAVE_DATA, event.toString());
    }
});

2.使用RxBusUtils.get().on方法注册事件,订阅所在线程为事件发生线程,也可指定订阅发生的线程。

RxBusUtils.get().on(EventKey.EVENT_BACK_NORMAL, String.class, new Consumer<String>() {
    @Override
    public void accept(String eventName) throws Exception {
        final String msg = "事件 Key:" + EventKey.EVENT_BACK_NORMAL + "\n   EventName:" + eventName + ", 当前线程状态: " + Event.getLooperStatus();
        showContent(msg);
    }
});

3.与 RxBus1 相比,使用 RxJava2 的 RxBus2 需要指定接收数据的类型,但如果使用默认的RxEvent进行事件注册, 就不需要指定类型了。

RxBusUtils.get().onMainThread(EventKey.EVENT_NO_DATA, new Consumer<RxEvent>() {
    @Override
    public void accept(RxEvent rxEvent) throws Exception {
        showContent(rxEvent.toString());
    }
});

3.1.2、事件发送

1.使用RxBusUtils.get().post(Object eventName)发送不带数据的事件。

RxBusUtils.get().post(EventKey.EVENT_NO_DATA);

RxBusUtils.get().postRxEvent(EventKey.EVENT_NO_DATA); //发送使用 RxEvent 注册的事件

2.使用RxBusUtils.get().post(Object eventName, Object content)发送携带数据的事件。

RxBusUtils.get().post(EventKey.EVENT_HAVE_DATA, new Event(EventKey.EVENT_HAVE_DATA, "这里携带的是数据"));
RxBusUtils.get().post(EventKey.EVENT_HAVE_DATA, true);

3.1.3、事件注销

1.使用RxBusUtils.get().unregisterAll(Object eventName)取消事件的所有订阅并注销事件。

RxBusUtils.get().unregisterAll(EventKey.EVENT_HAVE_DATA);

2.使用RxBusUtils.get().unregister(Object eventName, SubscribeInfo subscribeInfo)取消事件的某个指定订阅。 SubscribeInfo 是事件注册订阅后返回的订阅信息。如果在取消该订阅后,该事件如无其他订阅,便自动注销该事件。

RxBusUtils.get().unregister(EventKey.EVENT_CLEAR, mSubscribeInfo);

3.2、RxJavaUtils 使用

3.2.1、线程任务

1.RxIOTask:在 io 线程中操作的任务

RxJavaUtils.doInIOThread(new RxIOTask<String>("我是入参 123") {
    @Override
    public Void doInIOThread(String s) {
        Log.e(TAG, "[doInIOThread]  " + getLooperStatus() + ", 入参:" + s);
        return null;
    }
});

2.RxUITask:在 UI 线程中操作的任务

RxJavaUtils.doInUIThread(new RxUITask<String>("我是入参 456") {
    @Override
    public void doInUIThread(String s) {
        Log.e(TAG, "[doInUIThread]  " + getLooperStatus() + ", 入参:" + s);
    }
});

3.RxAsyncTask:在 IO 线程中执行耗时操作 执行完成后在 UI 线程中订阅的任务。

RxJavaUtils.executeAsyncTask(new RxAsyncTask<String, Integer>("我是入参 789") {
    @Override
    public Integer doInIOThread(String s) {
        Log.e(TAG, "[doInIOThread]  " + getLooperStatus() + ", 入参:" + s);
        return 12345;
    }

    @Override
    public void doInUIThread(Integer integer) {
        Log.e(TAG, "[doInUIThread]  " + getLooperStatus() + ", 入参:" + integer);
    }
});

4.RxIteratorTask:遍历集合或者数组的任务,在 IO 线程中执行耗时操作 执行完成后在 UI 线程中订阅的任务。

RxJavaUtils.executeRxIteratorTask(new RxIteratorTask<String, Integer>(new String[]{"123", "456", "789"}) {
    @Override
    public Integer doInIOThread(String s) {
        RxLog.e("[doInIOThread]" + getLooperStatus() + ", 入参:" + s);
        return Integer.parseInt(s);
    }

    @Override
    public void doInUIThread(Integer integer) {
        RxLog.e("[doInUIThread]  " + getLooperStatus() + ", 入参:" + integer);
    }
});

3.2.2、订阅者 Subscriber

1.SimpleSubscriber:简单的订阅者,已对错误进行捕获处理,并对生命周期进行日志记录。可设置 IExceptionHandler 接口自定义错误处理,设置 ILogger 接口自定义日志记录。

2.ProgressLoadingSubscriber:带进度条加载的订阅者,实现IProgressLoader接口可自定义加载方式。

Observable.just("加载完毕!")
        .delay(3, TimeUnit.SECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new ProgressLoadingSubscriber<String>(mProgressLoader) {
            @Override
            public void onNext(String s) {
                Toast.makeText(RxJavaActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        });

3.3、DisposablePool 使用

DisposablePool:RxJava 的订阅池

1.增加订阅:add(@NonNull Object tagName, Disposable disposable) 或者 add(Disposable disposable, @NonNull Object tagName)

DisposablePool.get().add(RxJavaUtils.polling(5, new Consumer<Long>() {
    @Override
    public void accept(Long o) throws Exception {
        Toast.makeText(RxJavaActivity.this, "正在监听", Toast.LENGTH_SHORT).show();
    }
}), "polling");

2.取消订阅:remove(@NonNull Object tagName)remove(@NonNull Object tagName, Disposable disposable)removeAll()

DisposablePool.get().remove("polling");

3.4、RxBindingUtils 使用

1.setViewClicks:设置点击事件

RxBindingUtils.setViewClicks(mBtnClick, 5, TimeUnit.SECONDS, new Action1<Object>() {
    @Override
    public void call(Object o) {
        toast("触发点击");
    }
});

2.setItemClicks:设置条目点击事件

3.5、RxSchedulerUtils 使用

  1. 订阅发生在主线程 ( -> -> main)
.compose(RxSchedulerUtils.<T>_main_o())  //Observable 使用
.compose(RxSchedulerUtils.<T>_main_f())  //Flowable 使用
  1. 订阅发生在 io 线程 ( -> -> io)
.compose(RxSchedulerUtils.<T>_io_o())  //Observable 使用
.compose(RxSchedulerUtils.<T>_io_f())  //Flowable 使用
  1. 处理在 io 线程,订阅发生在主线程( -> io -> main)
.compose(RxSchedulerUtils.<T>_io_main_o()) //Observable 使用
.compose(RxSchedulerUtils.<T>_io_main_f()) //Flowable 使用
  1. 处理在 io 线程,订阅也发生在 io 线程( -> io -> io)
.compose(RxSchedulerUtils.<T>_io_io_o()) //Observable 使用
.compose(RxSchedulerUtils.<T>_io_io_f()) //Flowable 使用
  1. 自定义线程池

由于Schedulers.io()内部使用的是CachedWorkerPool,而他最终创建线程池的方法是newScheduledThreadPool,它是一个核心只有 1 个线程,有效时间为 60s,但是线程池的线程容纳数量是Integer.MAX_VALUE的线程池。

如果线程在执行的过程中发生了长时间的阻塞,导致线程一直在工作状态的话,线程池将无法回收线程,只能不断地创建线程,最终有可能造成线程创建的数量过多,导致程序 OOM。

使用RxSchedulerUtils.setIOExecutor可以替换工具类中使用的 io 线程:

RxSchedulerUtils.setIOExecutor(AppExecutors.get().poolIO()); //全局替换

.compose(new SchedulerTransformer<Integer>(AppExecutors.get().poolIO())) //局部替换

3.6、RxLifecycle 使用

1.使用RxLifecycle.injectRxLifecycle进行生命周期的绑定。

(1)在 Activity 的onCreate方法中进行注入和生命周期绑定

@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(getLayoutId());
    RxLifecycle.injectRxLifecycle(this);
}

(2)当然,如果你嫌麻烦,可以在 Application 的onCreate方法中进行注入和生命周期绑定。

RxLifecycle.injectRxLifecycle(this);

2.使用compose将订阅绑定至生命周期。

使用RxLifecycle.with可以获取生命周期管理者LifecycleManager,通过它我们可以将订阅绑定至生命周期。

  • bindToActivityLifecycle:绑定到特定的 Activity 生命周期进行订阅注销

  • bindToLifecycle:自动绑定 Activity 生命周期进行订阅注销

  • bindOnDestroy:绑定到 Activity 的 OnDestroy 进行订阅注销

RxJavaUtils.polling(5)
        .compose(RxLifecycle.with(this).<Long>bindToLifecycle())
        .subscribe(new SimpleSubscriber<Long>() {
            @Override
            public void onNext(Long aLong) {
                toast(" 正在监听 :" + aLong);
            }
        });

公众号

更多资讯内容,欢迎扫描关注我的个人微信公众号:【我的 Android 开源之旅】

gzh_weixin.jpg

Apps
About Me
GitHub: Trinea
Facebook: Dev Tools