NettyIM
Introduction: 基于 Netty+TCP+Protobuf+okhttp 设计模式的 SDK,让你拥有像 Okhttp 一样的使用体验,完全可定制化,内置断线重连,路由自动切换、消息重复、连接超时、读写超时、可定制化拦截器、消息回执(已读,撤回等)、可定制化 protobuf 等功能。
Tags:
一款基于 Netty 高度定制化的通讯 SDK,它支持 TCP、UDP 和 WebSocket 协议的通信。
一、功能介绍
- 支持 TCP 协议
- 支持 WebSocket 的 ws、wss 协议
- 支持 UDP 协议
- 内置一套默认私有协议实现
- 支持断线重连、连接重试
- 地址自动切换
- 支持消息重发、消息确认机制
- 支持心跳机制
- tcp 协议、udp 协议、websocket 都支持握手鉴权
- 提供 Netty 消息处理器注册
- 支持自定义内容编解码器
- 支持自定义的 TCP 装包拆包编解码器
- 连接状态、消息状态监听
- 支持单个消息设置是否需要确认包
- 支持各种参数配置
二、典型应用
- 应用 IM 通讯
- 嵌入式设备通信
三、引用库
- 添加仓库地址
allprojects { repositories { ... maven { url 'https://jitpack.io' } } } - 添加依赖
dependencies { implementation 'com.github.CWTakiku:NettyIM:latest' }四、使用方式
//添加网络权限 <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> <uses-permission android:name="android.permission.INTERNET" />1. 客户端的配置
说明:所有以 Default 开头的类都是可以被替代的,只要开发者实现相应的接口 - 多种协议的公共配置
- 添加仓库地址
IMClient.Builder builder = new IMClient.Builder()
.setConnectTimeout(10, TimeUnit.SECONDS) //设置连接超时
.setResendCount(3)//设置失败重试数
.setConnectRetryInterval(1000,TimeUnit.MILLISECONDS)//连接尝试间隔
.setConnectionRetryEnabled(true)//是否连接重试
.setSendTimeout(6,TimeUnit.SECONDS)//设置发送超时
.setHeartIntervalBackground(30,TimeUnit.SECONDS)//后台心跳间隔
.setReaderIdleTimeBackground(90,TimeUnit.SECONDS)//后台读空闲触发时间 ,搭配心跳机制使用(指在一定的时间内没收到服务器的任何消息,则认为网络异常或者服务器异常,如果 setReaderIdleReconnectEnabled(true)触发重连)
.setEventListener(eventListener!=null?eventListener:new DefaultEventListener(userId)) //事件监听,可选
.setMsgTriggerReconnectEnabled(true) //如果连接已经断开,消息发送是否触发重连
.setReaderIdleReconnectEnabled(true) //读空闲是否会触发重连
.setProtocol(protocol) //哪种协议 IMProtocol.PRIVATE、IMProtocol.WEB_SOCKET、IMProtocol.UDP
.setOpenLog(true);//是否开启日志
- TCP 协议配置
//以下默认提供两种数据格式,一种 protobuf,一种 string 格式
builder.setCodec(codecType == 0?new DefaultTcpProtobufCodec():new DefaultTcpStringCodec())//默认的编解码,开发者可以使用自己的 protobuf 或者其他格式的编解码
.setShakeHands(codecType == 0? new DefaultProtobufMessageShakeHandsHandler(getDefaultTcpHands()):new DefaultStringMessageShakeHandsHandler(getDefaultStringHands())) //设置握手认证,可选
.setHeartBeatMsg(codecType == 0? getDefaultProtobufHeart(): getDefaultStringHeart()) //设置心跳,可选
.setAckConsumer(codecType == 0?new DefaultProtobufAckConsumer():new DefaultStringAckConsumer()) //设置消息确认机制,如果需要消息回执,必选
.registerMessageHandler(codecType == 0?new DefaultProtobufMessageReceiveHandler(onMessageArriveListener):new DefaultStringMessageReceiveHandler(onMessageArriveListener)) //消息接收处理器
.registerMessageHandler(codecType == 0?new DefaultReplyReceiveHandler(onReplyListener):new DefaultStringMessageReplyHandler(onReplyListener)) //消息状态接收处理器
.registerMessageHandler(codecType == 0?new DefaultProtobufHeartbeatRespHandler():new DefaultStringHeartbeatRespHandler()) //心跳接收处理器
.setTCPLengthFieldLength(2)//本库拆包采用消息头包含消息长度的协议,装包拆包的长度字段的占用字节数,默认值为 2
.addAddress(new Address(ip,9081,Address.Type.TCP))
.setFrameCodec(new DefaultLengthFieldBasedFrameCodec(2,65535));//设置 TCP 装包拆包编解码器,也可以使用自定义的装包拆包编解码器
- WebSocket 协议配置
builder.setHeartBeatMsg(getDefaultWsHeart())
.setAckConsumer(new DefaultWSAckConsumer())
.registerMessageHandler(new DefaultWSMessageReceiveHandler(onMessageArriveListener))
.registerMessageHandler(new DefaultWSMessageReplyHandler(onReplyListener))
.registerMessageHandler(new DefaultWsHeartbeatRespHandler())
.addAddress(new Address(ip,8804,Address.Type.WS))
.setMaxFrameLength(65535*100)
// .addAddress(new Address(ip,8804,Address.Type.WSS))//支持 WSS 协议,请在 scheme 带上 wss 标识
.addWsHeader("user",userId); //webSocket 特有的,可以用来鉴权使用
- UDP 协议配置
builder.setCodec(new DefaultUdpStringCodec(new InetSocketAddress(ip,8804), CharsetUtil.UTF_8)) //String 的编解码,开发者可以设定为自己的格式
.setShakeHands(new DefaultStringMessageShakeHandsHandler(getDefaultStringHands())) //设置握手认证,可选
.setHeartBeatMsg(getDefaultStringHeart()) //设置心跳,可选
.setAckConsumer(new DefaultStringAckConsumer()) //设置确认机制
.registerMessageHandler(new DefaultStringMessageReceiveHandler(onMessageArriveListener)) //消息接收处理器
.registerMessageHandler(new DefaultStringMessageReplyHandler(onReplyListener)) //消息状态接收处理器
.registerMessageHandler(new DefaultStringHeartbeatRespHandler()) //心跳接收处理器
.addAddress(new Address(ip, 8804, Address.Type.UDP));
2 构造 IMClient
IMClient imClient = builder.build();
3. 建立连接
imClient.startConnect();//建立连接
4. 断开连接
imClient.disConnect();//主动断开连接,不会自动重连
5. 发送消息
Request request=new Request.Builder(). //创建一个消息发送 request
setNeedACK(true).//需要 ACK,true 就会触发消息确认机制
setSendRetry(true). //此消息能否发送重试
setBody(getMsgPack(appMessage.buildProto())). //body 为解码所支持的对象
build();
imClient.newCall(request).enqueue(callback);//发送消息,消息在子线程回调
Disposable disposable= imClient.newCall(request).enqueue(callback).subscribe(consumer); //发送消息,会订阅特定的消息处理,例如:我发送了一个特别的消息,然后想订阅该特定消息的后续响应
6、接收消息
所有的消息接收都在上述配置中注册的 registerMessageHandler()里,开发者可以自行实现 MessageHandler 接口
public interface MessageHandler<message extends Object> {
boolean isFocusMsg(Object msg); //是否是该处理器关注的消息类型
void handleMsg(message message);//接收处理消息
}
7.状态监听
状态监听在上述配置中的 setEventListener()里,开发者可以自行继承 EventListener 类监听回调
/**
* 连接开始
* @param inetSocketAddress
*/
public void connectStart( InetSocketAddress inetSocketAddress){
}
/**
* 连接成功
*/
public void connectSuccess(){
}
/**
* 连接出现异常
* @param throwable
*/
public void connectionException(Throwable throwable){
}
/**
* 连接失败
* @param inetSocketAddress
* @param ioe
*/
public void connectFailed( InetSocketAddress inetSocketAddress, IOException ioe) {
}
/**
* 连接断开
*/
public void connectionBroken(){
}
/**
* 连接释放
* @param connection
*/
public void connectionReleased(Connection connection) {
}
/**
* 发送开始
* @param call
*/
public void sendMsgStart(Call call) {
}
/**
* 发送结束
* @param call
*/
public void sendMsgEnd(Call call) {
}
/**
* 发送失败
* @param call
*/
public void sendMsgFailed(Call call){}
8. 其他一些 API
imClient.setBackground(background);//设置前后台切换,将会自动切换不同的心跳间隔
imClient.isConnected();//判断是否连接中
...
9. 说明
在 TCP 协议中,FrameCodec 为 TCP 的协议的装包拆包。FrameCodec 为 TCP 特有的, Codec 为内容的编解码。在 TCP、UDP 协议里都支持。
pipeline.addLast("frameEncoder", frameCodec.Encoder()); // out2
pipeline.addLast("frameDecoder", frameCodec.Decoder()); //in1
pipeline.addLast(codec.EnCoder().getClass().getSimpleName(),codec.EnCoder()); //out 1
pipeline.addLast(codec.DeCoder().getClass().getSimpleName(),codec.DeCoder()); //in2
...
1、数据的接收处理是按照上述 in1-->in2 顺序,先对数据进行拆包,然后再将真实数据解码。 2、数据的发送处理是按照上述 out1-->out2 顺序,先对真实数据进行编码,再进行装包。
五、项目结构设计图

六、Demo 使用
步骤 1、 修改服务端地址,运行项目
在 IPConfig 类里将 SERVER_ADDRESS 改为自己电脑的 ip 地址,运行项目,将 APP 跑在手机或者模拟器上
步骤 2、开启服务器
APP 模块 test 下含有 tcp 协议的 protobuf 和 string 两种数据格式的服务端 demo、webscoket 协议、udp 协议,服务端的 demo,运行相对应的服务器 demo
步骤 3、使用
在 APP 里点击相应的协议进入 chat 界面
七、项目博客地址
如果使用过程遇到什么问题或者疑问欢迎提交 issue,也欢迎 star! 联系方式QQ916379012
Android IM 反馈交流群 QQ 群:1051018406

