NettyIM

Introduction: 基于 Netty+TCP+Protobuf+okhttp 设计模式的 SDK,让你拥有像 Okhttp 一样的使用体验,完全可定制化,内置断线重连,路由自动切换、消息重复、连接超时、读写超时、可定制化拦截器、消息回执(已读,撤回等)、可定制化 protobuf 等功能。
More: Author   ReportBugs   
Tags:

一款基于 Netty 高度定制化的通讯 SDK,它支持 TCP、UDP 和 WebSocket 协议的通信。

English Document

PRs Welcome License

一、功能介绍

  1. 支持 TCP 协议
  2. 支持 WebSocket 的 ws、wss 协议
  3. 支持 UDP 协议
  4. 内置一套默认私有协议实现
  5. 支持断线重连、连接重试
  6. 地址自动切换
  7. 支持消息重发、消息确认机制
  8. 支持心跳机制
  9. tcp 协议、udp 协议、websocket 都支持握手鉴权
  10. 提供 Netty 消息处理器注册
  11. 支持自定义内容编解码器
  12. 支持自定义的 TCP 装包拆包编解码器
  13. 连接状态、消息状态监听
  14. 支持单个消息设置是否需要确认包
  15. 支持各种参数配置

二、典型应用

  1. 应用 IM 通讯
  2. 嵌入式设备通信

    三、引用库

    1. 添加仓库地址
      allprojects {
        repositories {
            ...
            maven { url 'https://jitpack.io' }
        }
      }
      
    2. 添加依赖
      dependencies {
           implementation 'com.github.CWTakiku:NettyIM:latest'
      }
      

      四、使用方式

      //添加网络权限
      <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
      <uses-permission android:name="android.permission.INTERNET" />
      

      1. 客户端的配置

      说明:所有以 Default 开头的类都是可以被替代的,只要开发者实现相应的接口
    3. 多种协议的公共配置
    IMClient.Builder builder = new IMClient.Builder()
                .setConnectTimeout(10, TimeUnit.SECONDS) //设置连接超时
                .setResendCount(3)//设置失败重试数
                .setConnectRetryInterval(1000,TimeUnit.MILLISECONDS)//连接尝试间隔
                .setConnectionRetryEnabled(true)//是否连接重试
                .setSendTimeout(6,TimeUnit.SECONDS)//设置发送超时
                .setHeartIntervalBackground(30,TimeUnit.SECONDS)//后台心跳间隔
                .setReaderIdleTimeBackground(90,TimeUnit.SECONDS)//后台读空闲触发时间 ,搭配心跳机制使用(指在一定的时间内没收到服务器的任何消息,则认为网络异常或者服务器异常,如果 setReaderIdleReconnectEnabled(true)触发重连)
                .setEventListener(eventListener!=null?eventListener:new DefaultEventListener(userId)) //事件监听,可选
                .setMsgTriggerReconnectEnabled(true)  //如果连接已经断开,消息发送是否触发重连
                .setReaderIdleReconnectEnabled(true) //读空闲是否会触发重连
                .setProtocol(protocol) //哪种协议 IMProtocol.PRIVATE、IMProtocol.WEB_SOCKET、IMProtocol.UDP
                .setOpenLog(true);//是否开启日志
  • TCP 协议配置
        //以下默认提供两种数据格式,一种 protobuf,一种 string 格式
         builder.setCodec(codecType == 0?new DefaultTcpProtobufCodec():new DefaultTcpStringCodec())//默认的编解码,开发者可以使用自己的 protobuf 或者其他格式的编解码
                    .setShakeHands(codecType == 0? new DefaultProtobufMessageShakeHandsHandler(getDefaultTcpHands()):new DefaultStringMessageShakeHandsHandler(getDefaultStringHands())) //设置握手认证,可选
                    .setHeartBeatMsg(codecType == 0? getDefaultProtobufHeart(): getDefaultStringHeart()) //设置心跳,可选
                    .setAckConsumer(codecType == 0?new DefaultProtobufAckConsumer():new DefaultStringAckConsumer()) //设置消息确认机制,如果需要消息回执,必选
                    .registerMessageHandler(codecType == 0?new DefaultProtobufMessageReceiveHandler(onMessageArriveListener):new DefaultStringMessageReceiveHandler(onMessageArriveListener)) //消息接收处理器
                    .registerMessageHandler(codecType == 0?new DefaultReplyReceiveHandler(onReplyListener):new DefaultStringMessageReplyHandler(onReplyListener)) //消息状态接收处理器
                    .registerMessageHandler(codecType == 0?new DefaultProtobufHeartbeatRespHandler():new DefaultStringHeartbeatRespHandler()) //心跳接收处理器
                    .setTCPLengthFieldLength(2)//本库拆包采用消息头包含消息长度的协议,装包拆包的长度字段的占用字节数,默认值为 2
                    .addAddress(new Address(ip,9081,Address.Type.TCP))
                    .setFrameCodec(new DefaultLengthFieldBasedFrameCodec(2,65535));//设置 TCP 装包拆包编解码器,也可以使用自定义的装包拆包编解码器
  • WebSocket 协议配置
     builder.setHeartBeatMsg(getDefaultWsHeart())
                    .setAckConsumer(new DefaultWSAckConsumer())
                    .registerMessageHandler(new DefaultWSMessageReceiveHandler(onMessageArriveListener))
                    .registerMessageHandler(new DefaultWSMessageReplyHandler(onReplyListener))
                    .registerMessageHandler(new DefaultWsHeartbeatRespHandler())
                    .addAddress(new Address(ip,8804,Address.Type.WS))
                    .setMaxFrameLength(65535*100)
                  //  .addAddress(new Address(ip,8804,Address.Type.WSS))//支持 WSS 协议,请在 scheme 带上 wss 标识
                    .addWsHeader("user",userId); //webSocket 特有的,可以用来鉴权使用
  • UDP 协议配置
   builder.setCodec(new DefaultUdpStringCodec(new InetSocketAddress(ip,8804), CharsetUtil.UTF_8)) //String 的编解码,开发者可以设定为自己的格式
                    .setShakeHands(new DefaultStringMessageShakeHandsHandler(getDefaultStringHands())) //设置握手认证,可选
                    .setHeartBeatMsg(getDefaultStringHeart()) //设置心跳,可选
                    .setAckConsumer(new DefaultStringAckConsumer()) //设置确认机制
                    .registerMessageHandler(new DefaultStringMessageReceiveHandler(onMessageArriveListener)) //消息接收处理器
                    .registerMessageHandler(new DefaultStringMessageReplyHandler(onReplyListener)) //消息状态接收处理器
                    .registerMessageHandler(new DefaultStringHeartbeatRespHandler()) //心跳接收处理器
                    .addAddress(new Address(ip, 8804, Address.Type.UDP));

2 构造 IMClient

  IMClient imClient = builder.build();

3. 建立连接

imClient.startConnect();//建立连接

4. 断开连接

imClient.disConnect();//主动断开连接,不会自动重连

5. 发送消息

  Request request=new Request.Builder(). //创建一个消息发送 request           
              setNeedACK(true).//需要 ACK,true 就会触发消息确认机制
              setSendRetry(true). //此消息能否发送重试
              setBody(getMsgPack(appMessage.buildProto())). //body 为解码所支持的对象
              build();
 imClient.newCall(request).enqueue(callback);//发送消息,消息在子线程回调
Disposable disposable=   imClient.newCall(request).enqueue(callback).subscribe(consumer); //发送消息,会订阅特定的消息处理,例如:我发送了一个特别的消息,然后想订阅该特定消息的后续响应

6、接收消息

所有的消息接收都在上述配置中注册的 registerMessageHandler()里,开发者可以自行实现 MessageHandler 接口

public  interface  MessageHandler<message extends Object>  {
    boolean isFocusMsg(Object msg); //是否是该处理器关注的消息类型
    void handleMsg(message message);//接收处理消息
}

7.状态监听

状态监听在上述配置中的 setEventListener()里,开发者可以自行继承 EventListener 类监听回调


    /**
     * 连接开始
     * @param inetSocketAddress
     */
    public  void connectStart( InetSocketAddress inetSocketAddress){

    }

    /**
     * 连接成功
     */
    public  void connectSuccess(){

    }

    /**
     * 连接出现异常
     * @param throwable
     */
    public void connectionException(Throwable throwable){

    }

    /**
     * 连接失败
     * @param inetSocketAddress
     * @param ioe
     */
    public void connectFailed( InetSocketAddress inetSocketAddress, IOException ioe) {

    }

    /**
     * 连接断开
     */
    public void connectionBroken(){

    }


    /**
     * 连接释放
     * @param connection
     */
    public void connectionReleased(Connection connection) {
    }

    /**
     * 发送开始
     * @param call
     */
    public void sendMsgStart(Call call) {
    }

    /**
     * 发送结束
     * @param call
     */
    public void sendMsgEnd(Call call) {
    }

    /**
     * 发送失败
     * @param call
     */
    public void sendMsgFailed(Call call){}

8. 其他一些 API

 imClient.setBackground(background);//设置前后台切换,将会自动切换不同的心跳间隔
 imClient.isConnected();//判断是否连接中
 ...

9. 说明

在 TCP 协议中,FrameCodec 为 TCP 的协议的装包拆包。FrameCodec 为 TCP 特有的, Codec 为内容的编解码。在 TCP、UDP 协议里都支持。

   pipeline.addLast("frameEncoder", frameCodec.Encoder()); // out2
   pipeline.addLast("frameDecoder", frameCodec.Decoder()); //in1

   pipeline.addLast(codec.EnCoder().getClass().getSimpleName(),codec.EnCoder()); //out 1
   pipeline.addLast(codec.DeCoder().getClass().getSimpleName(),codec.DeCoder()); //in2
 ...

1、数据的接收处理是按照上述 in1-->in2 顺序,先对数据进行拆包,然后再将真实数据解码。 2、数据的发送处理是按照上述 out1-->out2 顺序,先对真实数据进行编码,再进行装包。

五、项目结构设计图

image

六、Demo 使用

步骤 1、 修改服务端地址,运行项目

在 IPConfig 类里将 SERVER_ADDRESS 改为自己电脑的 ip 地址,运行项目,将 APP 跑在手机或者模拟器上

步骤 2、开启服务器

APP 模块 test 下含有 tcp 协议的 protobuf 和 string 两种数据格式的服务端 demo、webscoket 协议、udp 协议,服务端的 demo,运行相对应的服务器 demo

步骤 3、使用

在 APP 里点击相应的协议进入 chat 界面

七、项目博客地址

掘金

如果使用过程遇到什么问题或者疑问欢迎提交 issue,也欢迎 star! 联系方式QQ916379012

Android IM 反馈交流群 QQ 群:1051018406

image

Apps
About Me
GitHub: Trinea
Facebook: Dev Tools