SMSGate

群名称:cmppGate 短信
群 号:770738500
How To Use
<dependency>
<groupId>com.chinamobile.cmos</groupId>
<artifactId>sms-core</artifactId>
<version>2.1.13.6</version>
</dependency>
商用短信网关平台推荐
平台名称:SMSWG 短信网关系统官网:www.smswg.com 系统有 3 个 PC 端:管理员端+用户端+代理商端,支持代理分销短信,日发送支持上亿,实时精准计费,不阻塞高并发,单机每秒可支持 1.5 万条短信下发,更多系统功能请进官网查看演示系统。
常见问题
纯客户端发送短信可以使用sms-client, 一个纯发送短信的客户端 jar 包,Api 简单。【sgip 协议用 sms-client 无法接收上行和状态报告】
也可以参考htt2cmpp 实现将一个短信长连接协议封装成 http 接口。
或者参考smsServer用 SpringBoot 实现一个能支持 http,cmpp,sgip,smgp,smpp 等多种协议的网关服务。
没看懂如何发送短信?短信协议是 tcp 长连接,类似数据库连接,如 jdbc-connection. 所以发送短信前必须要先有一个短信连接。因此你需要在程序启动时建立短信连接。参考 demo 里的 client,调用 manager.openEntity()方法,,调用 manager.startConnectionCheckTask()开启断线重连。 然后就像调用其它库一样,在需要发送短信的地方,new 一个对应的 Message,调用
List< Future > f = ChannelUtil.syncWriteLongMsgToEntity([clientEntityId],message)方法发送,
要判断 f 是否为 Null,为 Null 表示发送失败,一条短信可能拆分成多条,因此返回 List。关闭默认超速错误自动重发功能如 CMPP 协议接收到错误码为 8 的响应(超速错误),系统默认会再次重发直到成功,最大重试次数默认是 30 次。如果要关闭默认重试功能,须设置
entity.overSpeedSendCountLimit为0SGIP、SMPP 的超速错误码是 88,于 CMPP 协议相同,也会超速重发。
SMGP 协议因为未定义超速错误码,不会超速重试。
如何发送长短信?smsgate 默认已经处理好长短信了,就像发送普通短信一样。长短信发送的时候,框架内部自动拆分成短短信分片发送(一般按 67 个汉字拆分)。
如何发送闪信?
//创建一个闪信对象,跟发送普通短信一样
CmppSubmitRequestMessage msg = CmppSubmitRequestMessage.create(phone, "10690021", "");
msg.setMsgContent(new SmsTextMessage("你好,我是闪信!",SmsAlphabet.UCS2,SmsMsgClass.CLASS_0)); //class0 是闪信
如何接收短信?如果你了解 netty 的 handler,那么请看 AbstractBusinessHandler 的源码即可,这是一个 netty 的 handler.
如果你不了解 netty, 你只需知道:
当连接刚刚建立时[指登陆验证成功],smsgate 会自动调用 handler 里的 userEventTriggered 方法,因此在此方法中可以开启一个 Consumer 去消费 MQ 里的消息发送到网络连接上;
当对方发送任意一个消息给你时[包括 request,response 消息],smsgate 会自动调用 handler 里的 channelRead 方法,因此可在此方法内接收消息并作处理业务,但避免作非常耗时的操作,会影响 netty 的处理效率,甚至完全耗完 netty 的 io 线程造成消息不响应。在 channelRead 方法里能获取接收到的消息对象,同时通过本 Handler 的
getEndpointEntity()方法,或者ctx.channel().attr(GlobalConstance.entityPointKey).get();能够获取该消息的发送方账号实体 Entity 对象。当连接关闭时,smsgate 会自动调用 handler 里的 channelInactive 方法,可在此方法中实现连接关闭后的一些清理操作。
如何不改源码,实现修改框架默认的 handler比如 SGIP 协议要设置 NodeId;你需要这样做:
1、写一个扩展的 SgipClientEndpointEntity 子类,如:MySgipClientEndpointEntity,重写 buildConnector()方法
2、再写一个 SgipClientEndpointConnector 子类,如:MySgipClientEndpointConnector,重写 doinitPipeLine()方法
3、最后再写一个 SgipSessionLoginManager 子类,如:MySgipSessionLoginManager,重写 doLogin 方法,实现登陆方法的重写,在方法里创建自己定义的实现。
4、最后在 openEntity 通道里,new MySgipClientEndpointEntity 就可以了
使用 http 或者 socks 代理SmsGate 支持 HTTP、SOCKS 代理以方便在使用代理访问服务器的情况。代理设置方式:
// 无 username 和 password 可写为 http://ipaddress:port
client.setProxy("http://username:password@ipaddress:port"); //http 代理
client.setProxy("https://username:password@ipaddress:port"); //https 代理
client.setProxy("socks://username@ipaddress:port"); //socks4 代理
client.setProxy("socks4://username@ipaddress:port"); //socks4 代理
client.setProxy("socks5://username:password@ipaddress:port"); //socks5 代理
抓包,打印二进制的收发日志框架使用
entity.[EntityId]的 loggerName 打印该 EntityID 上所有的收发记录。Debug 级别打印短信消息对象的
toString内容。Trace 级别打印短信消息对象的
二进制内容。如:针对
cmppclientEntityId通道的 logback.xml , log4j2.xml 配置
<logger name="entity.cmppclientEntityId" level ="debug" additivity="false">
</logger>
如: log4j.properties 配置
log4j.logger.entity.cmppclientEntityId=debug
在 Java9 以上版上运行
在 java9 以上运行,启动 java 进程要增加以下参数:
--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.math=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.util.concurrent=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED
--add-opens java.base/java.text=ALL-UNNAMED
--add-exports java.base/sun.security.x509=ALL-UNNAMED
新手指引
- 先看 doc 目录下的
CMPP 接口协议 V3.0.0.doc文档 (看不懂的到群里咨询) - 再看 readme 里的说明 (看不懂的到群里咨询)
- 导入工程后,运行测试 demo: TestCMPPEndPoint,学会配置账号密码等参数
- 由于代码是基于 netty 网络框架,您有必要先有一些 Netty 的基础
开发短信网关的常见问题
长短信拆分合并原理
短信支持长短信功能是在手机终端实现的,即:手机陆续收到多个短信片断后,会根据短信 PDU 里的前 6 个 byte 信息进行合并。最终在手机上显示为一条短信,但实际却是接收了多条短信(因此收多条的费用)。
因此,长短信在发送时要进行拆分。在开发短信网关时,由于要对短信内容进行校验,拼签名等处理,因此在接收到短信分片后,要进行合并成一条处理,之后发送时再拆分为多条(当然有可能始终只收到一个片断,造成永远无法合并成一条完整的短信)。
短信内容(PDU)字段的前 6 字节是长短信的协议头(其余内容才是短信文本),前 3 个字节固定是 0x050003,后 3 个字节用来做长短信合并的依据(类似 IP 包的分片)
1 字节 包 ID[最大 255],
1 字节 包总分片数
1 字节 分片序号
如:45,03,01 表示 ID 为 45 的第 1 个分片,总共 3 个分片。45,03,02 表示 ID 为 45 的第 2 个分片,总共 3 个分片。 当手机收到完整的 3 个分片后,手机才进行合并显示。
使用 redis 实现集群长短信合并
框架内部自带一个 JVM 内存缓存(Guava Cache)的 LongMessageFrameCache 类,用于保存未完成合并的短信片断。 但集群(多进程,多节点)部署服务时,有可能从不同的节点上(主机上)接收到同一个长短信的不同片断,此时框架默认的 JVM 内存缓存无法完成长短信合并。 为解决此问题,框架使用 SPI 机制加载 LongMessageFrameCache 的实现类,业务侧以 SPI 方式提供 Redis 版的 LongMessageFrameProvider 实现类。 为了让业务自制的 LongMessageFrameProvider 实现类生效, 要确保业务自制的 LongMessageFrameProvider 实现类 order() 大于 0 。框架优先使用 order 最大的实现。
具体为:
1) 打开该通道账号的配置 EndpointEntity.isRecvLongMsgOnMultiLink属性,用于标识该通道的长短信要使用集群部署的长短信合并能力(由于只有少量系统有此问题,不需要所有账号打开该特性,会影响合并性能)。
2) 提供一个 Redis 的合并实现类,可以参考测试包中的代码:RedisLongMessageFrameCache , RedisLongMessageFrameProvider
网关服务前边有 nginx,haproxy 代理的时候如何获取真实的客户端 IP?
首先感谢群友 狠人 提供了使用proxy protocol 协议支持从代理服务器获取真实客户 IP 的思路。
针对 ServerEndpoint ,通过设置setProxyProtocol(true) 开启 proxy protocol 协议开关。框架从 channel 上第一个消息(HAProxyMessage)获取真实的客户端 IP 后,设置到 channel 的 Attribute 属性上。业务代码可以从 channel.attr(GlobalConstance.proxyProtocolKey) 获取该信息,从而拿到真实的客户 IP.
该特性使得短信网关的集群部署架构更为灵活,比如:服务入口使用 nginx,haproxy 等代理,真实网关服务以集群的方式部署在后端,横向扩展。
如何关联状态报告【即短信回执,以下都称为状态报告】和 submit 消息?
运宽商网关响应submitRequest消息时,你会收到submitResponse消息。在response里会有msgId。通过这个msgId跟之后收到的状态报告(reportMessage)里的msgId关联。
如何记录每个消息的发送日志,并向我的客户发送状态报告【即短信回执,以下都称为状态报告】?
当接收到来源客户的submitRequest消息后,要回复response,注意此时要记录回复response时所使用的msgId,即你回复给来源客户的msgId。
将消息转发给通道后,当接收到submitResponse 后,通过response.getRequest()获取对应的request 。注意此时有两个msgID,一个是通道给你的msgID,一个是你给来源客户的。在数据库里记录相关信息(至少包括消息来源客户,消息出去的通道,两个msgId,消息详情)。之后在接收到状态报告后,通过通道给你的msgId更新消息状态报告里的msgId,并根据来源客户将状态报告回传给客户,注意回传reportMessage里的msgId要使用你给客户回复response时用的msgId. 详见流程图
关于长短信类 LongSMSMessage 中 UniqueLongMsgId 的使用
由于 cmpp,sgip 等短信协议的异步化特点,框架默认实现长短信的拆分与合并,接收 Sp 发送的 MT 消息并匹配上游状态报告【即短信回执,以下都称为状态报告】时,由于缺少短信唯一标识,从 Sp 接收的短信和最终发送给运营商的短信之间没有关联标识,
造成状态报告回来时难以匹配,实现起来很复杂。为了解决 cmpp 协议的接收的短信与发送出去的短信关联问题,给长短信增加了这个UniqueLongMsgId。 对 http 协议接收的短信同样可以使用UniqueLongMsgId: 通过 http 接收的长短信对象在发送到 cmpp 协议的短信通道连接以前是没有UniqueLongMsgId的,发送以后框架会设置 UniqueLongMsgId 的值 。因此可以在发送完成收到 response 后通过response.getRequest()获取 Request 对象从而拿到UniqueLongMsgId。
UniqueLongMsgId 中 id 是唯一标识,即使在极短时间内收到相同手机号端口号的短信也能保持唯一性。该 ID 当短信从网络上接收到还未合并时进行设置,直到转发给运营商通道都不会变化,并且相同长短信的不同分片的 ID 也相同。
UniqueLongMsgId 除了 id 以外,还包含其它信息如:从消息从哪个通道账号 Id 提交的,从哪个 IP 端口提交的、长短信的分片 ID、总分片数、分片序号以及消息序列号、时间戳。
在 Test 包里有一个模拟的匹配状态报告的测试用例用是用 UniqueLongMsgId 实现的,并且经过相同手机号、端口号在极限并发压力下的匹配测试,单 JVM 多线程安全。逻辑供参考: com.zx.sms.transgate.TestReportForward
集群环境如何平均分配上游连接数?
网关平台通常会有多个服务节点,而对接的通道给的连接数通常不是服务节点数的整倍数,极端情况连接数小于服务节点数,这样如何平均分配连接数就成了一个问题。
这里介绍一个算法:通过在 redis 里记录 {全局的服务节点列表},来计算每个服务节点连几个 tcp 连接。
var curNodeIndex = getCurNodeIndexFromRedis(thisNode); // 当前节点在全局服务节点的排序号,{0,1,2,3,...}
var cntNode ; // 从 Redis 里获取的总的服务进程节点数
//所有短信通道,逐一计算每一个通道,在当前节点上最大允许的 tcp 连接数,
allEntityPointList.foreach(e->{
var curEntityIndex = getCurEntityIndex(e); //所有短信通道根据 Id 排序后,当前通道的排序号,{0,1,2,3,4,5,6,7,...}
var curMaxChannel = e.getMaxChannel(); //当前通道全局允许的最大连接数
//连接数不是服务节点数的整倍数,按服务节点数平均分配后一定会有余数, 按当前节点的排序号先后把余下的连接数分完。
//但是服务节点排序号是固定不变的,这样排序号靠前的节点总是优先分到余下的连接数,造成全局通道总连接数分配不均,因此要结合"当前通道的排序号" 对 "服务节点排序号"进行位移
//因此,当"服务节点"或者"全局通道账号"有任一个变化时,都会影响连接的分配。
var shiftNodeIndex = (curNodeIndex + curEntityIndex) % cntNode;
//平均分配后,余下的连接数
var remainderChannel = curMaxChannel % cntNode;
//平均分配连接数
var hostMaxChannel = curMaxChannel / cntNode;
//余数处理
if(remainderChannel > 0 && shiftNodeIndex < remainderChannel){
hostMaxChannel = hostMaxChannel + 1;
}
var hostChannel = getConnectionCountAtCurrentNode(e); //当前通道在本节点上的连接数
if(hostChannel < hostMaxChannel){
openChannel(e); //新建一个连接
}else{
//关闭该通道超过数量的连接
closeSomeChannel(e,hostMaxChannel - hostChannel);
}
});
框架内部的 netty 的 Handler 前后顺序如图:
CMPPGate , SMPPGate , SGIPGate, SMGPGate
中移短信 cmpp 协议/smpp 协议 netty 实现编解码
这是一个在 netty4 框架下实现的 cmpp3.0/cmpp2.0 短信协议解析及网关端口管理。 代码 copy 了
huzorro@gmail.com基于 netty3.7 的 cmpp 协议解析 huzorro@gmail.com 的代码目前已支持发送和解析
长文本短信拆分合并,WapPush 短信,以及彩信通知类型的短信。可以实现对彩信或者 wap-push 短信的拦截和加工处理。wap 短信的解析使用 smsj 的短信库cmpp 协议已经跟华为,东软,亚信的短信网关都做过联调测试,兼容了不同厂家的错误和异常,如果跟网关通信出错,可以打开 trace 日志查看二进制数据。
因要与短信中心对接,新增了对 SMPP 协议的支持。
SMPP 的协议解析代码是从 Twitter-SMPP 的代码 copy 过来的。
新增对 sgip 协议(联通短信协议)的支持
sgip 的协议解析代码是从 huzorro@gmail.com 的代码 copy 过来后改造的。
新增对 smgp 协议(电信短信协议)的支持
smgp 的协议解析代码是从 SMS-China 的代码 copy 过来后改造的。
支持发送彩信通知,WAP 短信以及闪信(Flash Message):
性能测试
在 48core,128G 内存的物理服务器上测试协议解析效率:35K 条/s, cpu 使用率 25%.
Build
执行 mvn package . jdk1.6 以上.
增加了业务处理 API
业务层实现接口:BusinessHandlerInterface,或者继承 AbstractBusinessHandler 抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。
如何实现自己的 Handler,比如按短短信计费
参考 CMPPChargingDemoTest 里的扩展位置
实体类说明
CMPP 的连接端口
com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity表示一个 Tcp 连接的发起端,或者接收端。用来记录连接的 IP.port,以及 CMPP 协议的用户名,密码,业务处理的 ChannelHandler 集合等其它端口参数。包含三个子类:com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个 List
属性。 一个服务端口包含多个 CMPPServerChildEndpointEntity 端口 com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含 CMPP 连接用户名,密码,以及协议版本等信息
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含 CMPP 连接用户名,密码,以及协议版本,以及服务端 IP.port. 用于连接服务端
端口连接器接口
com.zx.sms.connect.manager.EndpointConnector负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个 EndpointConnector.当 CMPP 连接建立完成,将连接加入连接器管理,并给 pipeLine 上挂载业务处理的 ChannelHandler.com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的 open()调用 netty 的 ServerBootstrap.bind()开一个服务监听
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集 CMPPServerChildEndpointEntity 端口下的所有连接。它的 open()方法为空.
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类 open()调用 netty 的 Bootstrap.connect()发起一个 TCP 连接
端口管理器
com.zx.sms.connect.manager.EndpointManager该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。CMPP 协议的连接登陆管理
com.zx.sms.session.cmpp.SessionLoginManager这是一个 netty 的 ChannelHandler 实现,主要负责 CMPP 连接的建立。当 CMPP 连接登陆成功、会话建立完成后,会调用 EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给 channel 的 pipeline 上挂载业务处理的 Handler,最后触发 SessionState.Connect 事件,通知业务处理 Handler 连接已建立成功。CMPP 的连接状态管理器
com.zx.sms.session.cmpp.SessionStateManager这是一个 netty 的 ChannelHandler 实现。负责每个连接上 CMPP 消息的存储,短信重发,流量窗口控制,过期短信的处理CMPP 协议解析器
CMPP20MessageCodecAggregator [2.0 协议] CMPPMessageCodecAggregator [这是 3.0 协议] 聚合了 CMPP 主要消息协议的解析,编码,长短信拆分,合并处理。
短信持久化存储实现 StoredMapFactory
使用 BDB 的 StoreMap 实现消息持久化,防止系统意外丢失短信。
程序启动处理流程
- 程序启动类 new 一个 CMPPEndpointEntity 的实体类并设置 IP,port,用户名,密码,业务处理的 Handler 等参数,
- 程序启动类 调用 EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器
- 程序启动类 调用 EndpointManager.openAll()或者 EndpointManager.openEndpoint()方法打开端口。
- EndpointManager 会调用 EndpointEntity.buildConnector()创建一个端口连接器,并调用 EndpointConnector.open()方法打开端口。
- 如果是 CMPPClientEndpointEntity 的话,就会向服务器发起 TCP 连接请求,如果是 CMPPServerEndpointEntity 则会在本机开启一个服务端口等客户端连接。
- TCP 连接建立完成后。netty 会调用 EndpointConnector.initPipeLine()方法初始化 PipeLine,把 CMPP 协议解析器,SessionLoginManager 加到 PipeLine 里去,然后 netty 触发 ChannelActive 事件。
- 在 SessionLoginManager 类里,客户端收到 ChannelActive 事件后会发送一个 CMPPConnnect 消息,请求建立 CMPP 连接.
- 同样在 SessionLoginManager.channelRead()方法里,服务端会收到 CMPPConnnect 消息,开始对用户名,密码进行鉴权,并给客户端返回鉴权结果。
- 鉴权通过后,SessionLoginManager 调用 EndpointConnector.addChannel(channel)方法,把 channel 加入 ArrayList,并给 pipeLine 上挂载 SessionStateManager 和业务处理的 ChannelHandler,如心跳处理,日志记录,长短信合并拆分处理类。
- EndpointConnector.addChannel(channel)完成后,SessionLoginManager 调用 ctx.fireUserEventTriggered()方法,触发 SessionState.Connect 事件。
以上 CMPP 连接建立完成。
- 业务处理类收到 SessionState.Connect 事件,开始业务处理,如从 MQ 获取短信下发,或开启 Consumer 接收 MQ 推送的消息。
- SessionStateManager 会拦截所有 read()和 write()的消息,进行消息持久化,消息重发,流量控制。
增加同步调用 api
smsgate 自开发以来,一直使用 netty 的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到 response。因此增加一个同步调用的 api。即:发送消息后等接收到对应的响应后才返回。 使用方法如下:
//因为长短信要拆分,因此返回一个 promiseList.每个拆分后的短信对应一个 promise List<Promise> futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage); for(Promise future: futures){ //调用 sync()方法,阻塞线程。等待接收 response future.sync(); //接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等 if(future.isSuccess()){ //打印收到的 response 消息 logger.info("response:{}",future.get()); }else{ 打印错误原因 logger.error("response:{}",future.cause()); } } //或者不阻塞进程,不调用 sync()方法。 List<Promise> promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage); for(Promise promise: promises){ //接收到 response 后回调 Listener 方法 promise.addListener(new GenericFutureListener() { @Override public void operationComplete(Future future) throws Exception { //接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等 if(future.isSuccess()){ //打印收到的 response 消息 logger.info("response:{}",future.get()); }else{ 打印错误原因 logger.error("response:{}",future.cause()); } } }); }CMPP Api 使用举例
public class TestCMPPEndPoint { private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class); @Test public void testCMPPEndpoint() throws Exception { ResourceLeakDetector.setLevel(Level.ADVANCED); final EndpointManager manager = EndpointManager.INS; CMPPServerEndpointEntity server = new CMPPServerEndpointEntity(); server.setId("server"); server.setHost("127.0.0.1"); server.setPort(7890); server.setValid(true); //使用 ssl 加密数据流 server.setUseSSL(false); CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity(); child.setId("child"); //自定义通道账号 ID,保持全局唯一 child.setChartset(Charset.forName("utf-8")); child.setGroupName("test"); //自定义通道账号分组 ID,用于对通道标识不同组,方便路由实现 child.setUserName("901783"); //通道账号,可能和企业代码相同 child.setPassword("ICP001"); //密码 child.setValid(true); child.setVersion((short)0x30); //协议版本号,48 是 3.0 协议,32 是 2.0 协议 child.setMaxChannels((short)4); child.setRetryWaitTimeSec((short)30); child.setMaxRetryCnt((short)3); child.setReSendFailMsg(true); // child.setWriteLimit(200); // child.setReadLimit(200); List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>(); serverhandlers.add(new CMPPMessageReceiveHandler()); //在这个 handler 里接收短信 child.setBusinessHandlerSet(serverhandlers); server.addchild(child); manager.addEndpointEntity(server); CMPPClientEndpointEntity client = new CMPPClientEndpointEntity(); client.setId("client"); //自定义通道账号 ID,保持全局唯一 client.setHost("127.0.0.1"); // client.setLocalhost("127.0.0.1"); // client.setLocalport(65521); client.setPort(7890); client.setChartset(Charset.forName("utf-8")); client.setGroupName("test"); //自定义通道账号分组 ID,用于对通道标识不同组,方便路由实现 client.setUserName("901783"); //通道账号,可能和企业代码相同 client.setPassword("ICP001"); //密码 client.setMsgSrc("902176"); //企业代码 ,可能和 UserName 相同 client.setSpCode("10658762"); //服务代码,即显示到手机上的号码 client.setMaxChannels((short)10); //最大连接数 client.setVersion((short)0x30); //协议版本号 client.setRetryWaitTimeSec((short)30);//发送 request 后 等待 N 秒后没有收到 response,则重发消息 client.setMaxRetryCnt((short)3); // 发送消息的最大次数,如果为 3,则表示连带第一次发送,再重试 2 次,一共发送 3 次 client.setCloseWhenRetryFailed(false); // 当发送消息次数达到最大(MaxRetryCnt)后 ,是否关闭连接。默认是 true 关闭连接 client.setUseSSL(false); //是否使用 SSL 加密连接,默认为 false,不加密 client.setWriteLimit(100); //发送 request 消息的最大速度(单位条数) client.setReadLimit(100); //接收 request 的最大速度(单位条数),当消息量超过一定限制后,消息将积压在 TCP 网络协议栈的接收缓冲区 client.setWindow(16); //设置发送消息的滑动窗口,滑动窗口默认为 16,该大小根据网络时延不同,会影响发送速度 //默认为 false ,发送 request 是否保存在本地磁盘。如果为 true,当进程关闭后,本地磁盘会保存未收到 response 的消息,当进程再次启动框架自动读取消息并发送。可能造成消息重复发送 client.setReSendFailMsg(true); client.setSupportLongmsg(SupportLongMessage.BOTH); //是否支持长短信的自动拆分和拼接 List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>(); clienthandlers.add( new CMPPSessionConnectedHandler(10000)); //在这个 handler 里发送短信 client.setBusinessHandlerSet(clienthandlers); manager.addEndpointEntity(client); manager.openEndpoint(server); Thread.sleep(1000); for(int i=0;i<=child.getMaxChannels()+1;i++) manager.openEndpoint(client); System.out.println("start....."); // Thread.sleep(300000); LockSupport.park(); EndpointManager.INS.close(); } }SMPP Api 使用举例
public class TestSMPPEndPoint { private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class); @Test public void testSMPPEndpoint() throws Exception { final EndpointManager manager = EndpointManager.INS; SMPPServerEndpointEntity server = new SMPPServerEndpointEntity(); server.setId("smppserver"); server.setHost("127.0.0.1"); server.setPort(2776); server.setValid(true); //使用 ssl 加密数据流 server.setUseSSL(false); SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity(); child.setId("smppchild"); child.setSystemId("901782"); child.setPassword("ICP"); child.setValid(true); child.setChannelType(ChannelType.DUPLEX); child.setMaxChannels((short)3); child.setRetryWaitTimeSec((short)30); child.setMaxRetryCnt((short)3); child.setReSendFailMsg(true); child.setIdleTimeSec((short)15); // child.setWriteLimit(200); // child.setReadLimit(200); List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>(); serverhandlers.add(new SMPPSessionConnectedHandler(10000)); child.setBusinessHandlerSet(serverhandlers); server.addchild(child); SMPPClientEndpointEntity client = new SMPPClientEndpointEntity(); client.setId("smppclient"); client.setHost("127.0.0.1"); client.setPort(2776); client.setSystemId("901782"); client.setPassword("ICP"); client.setChannelType(ChannelType.DUPLEX); client.setMaxChannels((short)12); client.setRetryWaitTimeSec((short)100); client.setUseSSL(false); client.setReSendFailMsg(true); // client.setWriteLimit(200); // client.setReadLimit(200); client.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并 List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>(); clienthandlers.add( new SMPPMessageReceiveHandler()); client.setBusinessHandlerSet(clienthandlers); manager.addEndpointEntity(server); manager.addEndpointEntity(client); manager.openAll(); manager.startConnectionCheckTask(); Thread.sleep(1000); for(int i=0;i<child.getMaxChannels();i++) manager.openEndpoint(client); System.out.println("start....."); LockSupport.park(); EndpointManager.INS.close(); } }SGIP Api 使用举例
public class TestSgipEndPoint { private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class); @Test public void testsgipEndpoint() throws Exception { ResourceLeakDetector.setLevel(Level.ADVANCED); final EndpointManager manager = EndpointManager.INS; SgipServerEndpointEntity server = new SgipServerEndpointEntity(); server.setId("sgipserver"); server.setHost("127.0.0.1"); server.setPort(8001); server.setValid(true); //使用 ssl 加密数据流 server.setUseSSL(false); SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity(); child.setId("sgipchild"); child.setLoginName("333"); child.setLoginPassowrd("0555"); child.setValid(true); child.setChannelType(ChannelType.DUPLEX); child.setMaxChannels((short)3); child.setRetryWaitTimeSec((short)30); child.setMaxRetryCnt((short)3); child.setReSendFailMsg(false); child.setIdleTimeSec((short)30); // child.setWriteLimit(200); // child.setReadLimit(200); child.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并 List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>(); serverhandlers.add(new SgipReportRequestMessageHandler()); serverhandlers.add(new SGIPMessageReceiveHandler()); child.setBusinessHandlerSet(serverhandlers); server.addchild(child); manager.addEndpointEntity(server); SgipClientEndpointEntity client = new SgipClientEndpointEntity(); client.setId("sgipclient"); client.setHost("127.0.0.1"); client.setPort(8001); client.setLoginName("333"); client.setLoginPassowrd("0555"); client.setChannelType(ChannelType.DUPLEX); client.setMaxChannels((short)10); client.setRetryWaitTimeSec((short)100); client.setUseSSL(false); client.setReSendFailMsg(true); // client.setWriteLimit(200); // client.setReadLimit(200); List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>(); clienthandlers.add(new SGIPSessionConnectedHandler(10000)); client.setBusinessHandlerSet(clienthandlers); manager.addEndpointEntity(client); manager.openAll(); Thread.sleep(1000); for(int i=0;i<child.getMaxChannels();i++) manager.openEndpoint(client); System.out.println("start....."); LockSupport.park(); EndpointManager.INS.close(); } }Demo 执行日志
11:31:52.842 [workGroup2] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df 11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be 11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58 11:31:52.869 [workGroup1] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776] 11:31:52.867 [workGroup2] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481] 11:31:53.863 [busiWork-3] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343, speed : 343/s 11:31:54.872 [busiWork-1] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381, speed : 1038/s 11:31:55.873 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704, speed : 1323/s 11:31:56.875 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010, speed : 1306/s 11:31:57.880 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416, speed : 1406/s 11:31:58.881 [busiWork-7] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442, speed : 2026/s 11:31:59.882 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581, speed : 2139/s 11:32:00.883 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865, speed : 3284/s 11:32:01.884 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937, speed : 3072/s 11:32:02.886 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489, speed : 3552/s 11:32:03.887 [busiWork-6] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065, speed : 3576/s 11:32:04.888 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337, speed : 3272/s
