RxWebSocket

Introduction: An auto reconnection-webSocket build with okhttp and rxJava
More: Author   ReportBugs   OfficialWebsite   
Tags:
Auto-Proxy-Socket-Web-Net-

此项目不再维护,请慎重使用,可查看源码学习!

Download API License

RxWebSocket 是一个基于 okhttp 和 RxJava(RxJava1 和 RxJava2 都有)封装的 WebSocket 客户端,此库的核心特点是 除了手动关闭 WebSocket(就是 RxJava 取消订阅),WebSocket 在异常关闭的时候(onFailure,发生异常,如 WebSocketException 等等),会自动重连,永不断连.其次,对 WebSocket 做的缓存处理,同一个 URL,共享一个 WebSocket.

原理解析: 戳我戳我戳我

RxJava2 版本点我(RxJava2 和 RxJava1 调用方式一样)

查看 changeLog

效果图

断网重连测试

断网重连测试

how to use

添加依赖:

在项目 module 下 gradle 加入:

    implementation 'com.dhh:websocket:2.1.2'

init


        //init config 在使用 RxWebSocket 之前设置即可,推荐在 application 里初始化
        Config config = new Config.Builder()
                .setShowLog(true)           //show  log
                .setClient(yourClient)   //if you want to set your okhttpClient
                .setShowLog(true, "your logTag")
                .setReconnectInterval(2, TimeUnit.SECONDS)  //set reconnect interval
                .setSSLSocketFactory(yourSSlSocketFactory, yourX509TrustManager) // wss support
                .build();
        RxWebSocket.setConfig(config);

WSS support,其实就是设置 okhttp 的 SSL,请参照 okhttp 的设置,请参照上面 Config 配置

心跳检测:需要设置自己的 okhttpClient,在上面的 Config 里设置心跳间隔:


        Config config = new Config.Builder()
                .setClient(new OkHttpClient.Builder()
                        .pingInterval(3, TimeUnit.SECONDS) // 设置心跳间隔,这个是 3 秒检测一次
                        .build())  //if you want to set your okhttpClient
                .build();

open WebSocket:和 RxJava 调用一样,回调请使用项目里提供的 WebSocketSubscriber,WebSocketSubscriber 是一个没有抽象方法的抽象类,根据业务需求,重写你想使用的回调


        RxWebSocket.get("url")
                .subscribe(new WebSocketSubscriber() {
                    @Override
                    protected void onMessage(@NonNull String text) {

                    }
                });

        RxWebSocket.get("your url")
                //RxLifecycle : https://github.com/dhhAndroid/RxLifecycle
                .compose(RxLifecycle.with(this).<WebSocketInfo>bindToLifecycle())
                .subscribe(new WebSocketSubscriber() {
                    @Override
                    public void onOpen(@NonNull WebSocket webSocket) {
                        Log.d("MainActivity", "onOpen1:");
                    }

                    @Override
                    public void onMessage(@NonNull String text) {
                        Log.d("MainActivity", "返回数据:" + text);
                    }

                    @Override
                    public void onMessage(@NonNull ByteString byteString) {

                    }

                    @Override
                    protected void onReconnect() {
                        Log.d("MainActivity", "重连:");
                    }

                    @Override
                    protected void onClose() {
                        Log.d("MainActivity", "onClose:");
                    }
                });

如果你想将 String 类型的 text 解析成具体的实体类,请使用 WebSocketSubscriber2

        /**
         *
         *如果你想将 String 类型的 text 解析成具体的实体类,比如{@link List<String>},
         * 请使用 {@link  WebSocketSubscriber2},仅需要将泛型传入即可
         */
        RxWebSocket.get("your url")
                .compose(RxLifecycle.with(this).<WebSocketInfo>bindToLifecycle())
                .subscribe(new WebSocketSubscriber2<List<String>>() {
                    @Override
                    protected void onMessage(List<String> strings) {

                    }
                });

发送消息

        //用 WebSocket 的引用直接发
        mWebSocket.send("hello word");
        //url 对应的 WebSocket 必须打开,否则报错
        RxWebSocket.send(url, "hello");
        RxWebSocket.send(url, ByteString.EMPTY);
        //异步发送,若 WebSocket 已经打开,直接发送,若没有打开,打开一个 WebSocket 发送完数据,直接关闭.
        RxWebSocket.asyncSend(url, "hello");
        RxWebSocket.asyncSend(url, ByteString.EMPTY);

注销

RxJava 的注销方式,就可以取消订阅.


    Subscription subscription = RxWebSocket.get("ws://sdfs").subscribe();
    //注销
    if(subscription!=null&&!subscription.isUnsubscribed()) {
        subscription.unsubscribe();
    }

更优雅的注销处理方式,请看我的另一个项目: RxLife,优雅地处理 RxJava 注销问题,和 Activity 生命周期绑定.

License

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Apps
About Me
GitHub: Trinea
Facebook: Dev Tools