SMSGate

Project Url: Lihuanghe/SMSGate
Introduction: 这是一个在 netty4 框架下实现的三网合一短信网关核心框架,支持(cmpp/smpp3.4/sgip1.2/smgp3) 短信协议解析,支持长短信合并和拆分,也支持 wap 短信和闪信。
More: Author   ReportBugs   
Tags:

qq 20180420170449

群名称:cmppGate 短信
群 号:770738500

How To Use

<dependency>
  <groupId>com.chinamobile.cmos</groupId>
  <artifactId>sms-core</artifactId>
  <version>2.1.13.6</version>
</dependency>

商用短信网关平台推荐

  • 平台名称:SMSWG 短信网关 系统官网:www.smswg.com 系统有 3 个 PC 端:管理员端+用户端+代理商端,支持代理分销短信,日发送支持上亿,实时精准计费,不阻塞高并发,单机每秒可支持 1.5 万条短信下发,更多系统功能请进官网查看演示系统。

常见问题

  • 纯客户端发送短信

    可以使用sms-client, 一个纯发送短信的客户端 jar 包,Api 简单。【sgip 协议用 sms-client 无法接收上行和状态报告】

    也可以参考htt2cmpp 实现将一个短信长连接协议封装成 http 接口。

    或者参考smsServer用 SpringBoot 实现一个能支持 http,cmpp,sgip,smgp,smpp 等多种协议的网关服务。

  • 没看懂如何发送短信?

    短信协议是 tcp 长连接,类似数据库连接,如 jdbc-connection. 所以发送短信前必须要先有一个短信连接。因此你需要在程序启动时建立短信连接。参考 demo 里的 client,调用 manager.openEntity()方法,,调用 manager.startConnectionCheckTask()开启断线重连。 然后就像调用其它库一样,在需要发送短信的地方,new 一个对应的 Message,调用

    List< Future > f = ChannelUtil.syncWriteLongMsgToEntity([clientEntityId],message)方法发送,要判断 f 是否为 Null,为 Null 表示发送失败,一条短信可能拆分成多条,因此返回 List

  • 关闭默认超速错误自动重发功能

    如 CMPP 协议接收到错误码为 8 的响应(超速错误),系统默认会再次重发直到成功,最大重试次数默认是 30 次。如果要关闭默认重试功能,须设置 entity.overSpeedSendCountLimit0

    SGIP、SMPP 的超速错误码是 88,于 CMPP 协议相同,也会超速重发。

    SMGP 协议因为未定义超速错误码,不会超速重试。

  • 如何发送长短信?

    smsgate 默认已经处理好长短信了,就像发送普通短信一样。长短信发送的时候,框架内部自动拆分成短短信分片发送(一般按 67 个汉字拆分)。

  • 如何发送闪信?

  //创建一个闪信对象,跟发送普通短信一样
  CmppSubmitRequestMessage msg = CmppSubmitRequestMessage.create(phone, "10690021", "");
  msg.setMsgContent(new SmsTextMessage("你好,我是闪信!",SmsAlphabet.UCS2,SmsMsgClass.CLASS_0));  //class0 是闪信
  • 如何接收短信?

    如果你了解 netty 的 handler,那么请看 AbstractBusinessHandler 的源码即可,这是一个 netty 的 handler.

    如果你不了解 netty, 你只需知道:

    当连接刚刚建立时[指登陆验证成功],smsgate 会自动调用 handler 里的 userEventTriggered 方法,因此在此方法中可以开启一个 Consumer 去消费 MQ 里的消息发送到网络连接上;

    当对方发送任意一个消息给你时[包括 request,response 消息],smsgate 会自动调用 handler 里的 channelRead 方法,因此可在此方法内接收消息并作处理业务,但避免作非常耗时的操作,会影响 netty 的处理效率,甚至完全耗完 netty 的 io 线程造成消息不响应。在 channelRead 方法里能获取接收到的消息对象,同时通过本 Handler 的 getEndpointEntity()方法,或者 ctx.channel().attr(GlobalConstance.entityPointKey).get();能够获取该消息的发送方账号实体 Entity 对象。

    当连接关闭时,smsgate 会自动调用 handler 里的 channelInactive 方法,可在此方法中实现连接关闭后的一些清理操作。

  • 如何不改源码,实现修改框架默认的 handler

    比如 SGIP 协议要设置 NodeId;你需要这样做:

    1、写一个扩展的 SgipClientEndpointEntity 子类,如:MySgipClientEndpointEntity,重写 buildConnector()方法

    2、再写一个 SgipClientEndpointConnector 子类,如:MySgipClientEndpointConnector,重写 doinitPipeLine()方法

    3、最后再写一个 SgipSessionLoginManager 子类,如:MySgipSessionLoginManager,重写 doLogin 方法,实现登陆方法的重写,在方法里创建自己定义的实现。

    4、最后在 openEntity 通道里,new MySgipClientEndpointEntity 就可以了

  • 使用 http 或者 socks 代理

    SmsGate 支持 HTTP、SOCKS 代理以方便在使用代理访问服务器的情况。代理设置方式:

    // 无 username 和 password 可写为  http://ipaddress:port
    client.setProxy("http://username:password@ipaddress:port");  //http 代理
    client.setProxy("https://username:password@ipaddress:port");  //https 代理
    client.setProxy("socks://username@ipaddress:port");  //socks4 代理
    client.setProxy("socks4://username@ipaddress:port");  //socks4 代理
    client.setProxy("socks5://username:password@ipaddress:port");  //socks5 代理
  • 抓包,打印二进制的收发日志

    框架使用 entity.[EntityId]的 loggerName 打印该 EntityID 上所有的收发记录。

    Debug 级别打印短信消息对象的toString内容。

    Trace 级别打印短信消息对象的二进制内容。

    如:针对cmppclientEntityId 通道的 logback.xml , log4j2.xml 配置

    <logger name="entity.cmppclientEntityId" level ="debug" additivity="false">
    </logger>
  如: log4j.properties 配置 
   log4j.logger.entity.cmppclientEntityId=debug
  • 在 Java9 以上版上运行
    在 java9 以上运行,启动 java 进程要增加以下参数:

--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.math=ALL-UNNAMED  
--add-opens java.base/java.util=ALL-UNNAMED  
--add-opens java.base/java.util.concurrent=ALL-UNNAMED  
--add-opens java.base/java.net=ALL-UNNAMED   
--add-opens java.base/java.text=ALL-UNNAMED 
--add-exports java.base/sun.security.x509=ALL-UNNAMED

新手指引

  • 先看 doc 目录下的CMPP 接口协议 V3.0.0.doc文档 (看不懂的到群里咨询)
  • 再看 readme 里的说明 (看不懂的到群里咨询)
  • 导入工程后,运行测试 demo: TestCMPPEndPoint,学会配置账号密码等参数
  • 由于代码是基于 netty 网络框架,您有必要先有一些 Netty 的基础

开发短信网关的常见问题

  • 长短信拆分合并原理

短信支持长短信功能是在手机终端实现的,即:手机陆续收到多个短信片断后,会根据短信 PDU 里的前 6 个 byte 信息进行合并。最终在手机上显示为一条短信,但实际却是接收了多条短信(因此收多条的费用)。

因此,长短信在发送时要进行拆分。在开发短信网关时,由于要对短信内容进行校验,拼签名等处理,因此在接收到短信分片后,要进行合并成一条处理,之后发送时再拆分为多条(当然有可能始终只收到一个片断,造成永远无法合并成一条完整的短信)。

短信内容(PDU)字段的前 6 字节是长短信的协议头(其余内容才是短信文本),前 3 个字节固定是 0x050003,后 3 个字节用来做长短信合并的依据(类似 IP 包的分片)

1 字节 包 ID[最大 255],
1 字节 包总分片数
1 字节 分片序号

如:45,03,01 表示 ID 为 45 的第 1 个分片,总共 3 个分片。45,03,02 表示 ID 为 45 的第 2 个分片,总共 3 个分片。 当手机收到完整的 3 个分片后,手机才进行合并显示。

  • 使用 redis 实现集群长短信合并

框架内部自带一个 JVM 内存缓存(Guava Cache)的 LongMessageFrameCache 类,用于保存未完成合并的短信片断。 但集群(多进程,多节点)部署服务时,有可能从不同的节点上(主机上)接收到同一个长短信的不同片断,此时框架默认的 JVM 内存缓存无法完成长短信合并。 为解决此问题,框架使用 SPI 机制加载 LongMessageFrameCache 的实现类,业务侧以 SPI 方式提供 Redis 版的 LongMessageFrameProvider 实现类。 为了让业务自制的 LongMessageFrameProvider 实现类生效, 要确保业务自制的 LongMessageFrameProvider 实现类 order() 大于 0 。框架优先使用 order 最大的实现。

具体为:

1) 打开该通道账号的配置 EndpointEntity.isRecvLongMsgOnMultiLink属性,用于标识该通道的长短信要使用集群部署的长短信合并能力(由于只有少量系统有此问题,不需要所有账号打开该特性,会影响合并性能)。

2) 提供一个 Redis 的合并实现类,可以参考测试包中的代码:RedisLongMessageFrameCache , RedisLongMessageFrameProvider

  • 网关服务前边有 nginx,haproxy 代理的时候如何获取真实的客户端 IP?

首先感谢群友 狠人 提供了使用proxy protocol 协议支持从代理服务器获取真实客户 IP 的思路。

针对 ServerEndpoint ,通过设置setProxyProtocol(true) 开启 proxy protocol 协议开关。框架从 channel 上第一个消息(HAProxyMessage)获取真实的客户端 IP 后,设置到 channel 的 Attribute 属性上。业务代码可以从 channel.attr(GlobalConstance.proxyProtocolKey) 获取该信息,从而拿到真实的客户 IP. 该特性使得短信网关的集群部署架构更为灵活,比如:服务入口使用 nginx,haproxy 等代理,真实网关服务以集群的方式部署在后端,横向扩展。

  • 如何关联状态报告【即短信回执,以下都称为状态报告】和 submit 消息?

运宽商网关响应submitRequest消息时,你会收到submitResponse消息。在response里会有msgId。通过这个msgId跟之后收到的状态报告(reportMessage)里的msgId关联。

  • 如何记录每个消息的发送日志,并向我的客户发送状态报告【即短信回执,以下都称为状态报告】?

当接收到来源客户的submitRequest消息后,要回复response,注意此时要记录回复response时所使用的msgId,即你回复给来源客户的msgId

将消息转发给通道后,当接收到submitResponse 后,通过response.getRequest()获取对应的request 。注意此时有两个msgID,一个是通道给你的msgID,一个是你给来源客户的。在数据库里记录相关信息(至少包括消息来源客户,消息出去的通道,两个msgId,消息详情)。之后在接收到状态报告后,通过通道给你的msgId更新消息状态报告里的msgId,并根据来源客户将状态报告回传给客户,注意回传reportMessage里的msgId要使用你给客户回复response时用的msgId. 详见流程图

  • 关于长短信类 LongSMSMessage 中 UniqueLongMsgId 的使用

由于 cmpp,sgip 等短信协议的异步化特点,框架默认实现长短信的拆分与合并,接收 Sp 发送的 MT 消息并匹配上游状态报告【即短信回执,以下都称为状态报告】时,由于缺少短信唯一标识,从 Sp 接收的短信和最终发送给运营商的短信之间没有关联标识, 造成状态报告回来时难以匹配,实现起来很复杂。为了解决 cmpp 协议的接收的短信与发送出去的短信关联问题,给长短信增加了这个UniqueLongMsgId。 对 http 协议接收的短信同样可以使用UniqueLongMsgId: 通过 http 接收的长短信对象在发送到 cmpp 协议的短信通道连接以前是没有UniqueLongMsgId的,发送以后框架会设置 UniqueLongMsgId 的值 。因此可以在发送完成收到 response 后通过response.getRequest()获取 Request 对象从而拿到UniqueLongMsgId

UniqueLongMsgId 中 id 是唯一标识,即使在极短时间内收到相同手机号端口号的短信也能保持唯一性。该 ID 当短信从网络上接收到还未合并时进行设置,直到转发给运营商通道都不会变化,并且相同长短信的不同分片的 ID 也相同。

UniqueLongMsgId 除了 id 以外,还包含其它信息如:从消息从哪个通道账号 Id 提交的,从哪个 IP 端口提交的、长短信的分片 ID、总分片数、分片序号以及消息序列号、时间戳。 在 Test 包里有一个模拟的匹配状态报告的测试用例用是用 UniqueLongMsgId 实现的,并且经过相同手机号、端口号在极限并发压力下的匹配测试,单 JVM 多线程安全。逻辑供参考: com.zx.sms.transgate.TestReportForward

  • 集群环境如何平均分配上游连接数?

网关平台通常会有多个服务节点,而对接的通道给的连接数通常不是服务节点数的整倍数,极端情况连接数小于服务节点数,这样如何平均分配连接数就成了一个问题。

这里介绍一个算法:通过在 redis 里记录 {全局的服务节点列表},来计算每个服务节点连几个 tcp 连接。

var curNodeIndex = getCurNodeIndexFromRedis(thisNode); // 当前节点在全局服务节点的排序号,{0,1,2,3,...}
var cntNode ; // 从 Redis 里获取的总的服务进程节点数
//所有短信通道,逐一计算每一个通道,在当前节点上最大允许的 tcp 连接数, 
allEntityPointList.foreach(e->{
    var curEntityIndex = getCurEntityIndex(e); //所有短信通道根据 Id 排序后,当前通道的排序号,{0,1,2,3,4,5,6,7,...}
    var curMaxChannel = e.getMaxChannel(); //当前通道全局允许的最大连接数

    //连接数不是服务节点数的整倍数,按服务节点数平均分配后一定会有余数, 按当前节点的排序号先后把余下的连接数分完。
    //但是服务节点排序号是固定不变的,这样排序号靠前的节点总是优先分到余下的连接数,造成全局通道总连接数分配不均,因此要结合"当前通道的排序号" 对 "服务节点排序号"进行位移
    //因此,当"服务节点"或者"全局通道账号"有任一个变化时,都会影响连接的分配。
    var shiftNodeIndex = (curNodeIndex + curEntityIndex) % cntNode;
    //平均分配后,余下的连接数
    var remainderChannel = curMaxChannel % cntNode;
    //平均分配连接数
    var hostMaxChannel = curMaxChannel / cntNode;
    //余数处理
    if(remainderChannel > 0 && shiftNodeIndex < remainderChannel){
        hostMaxChannel = hostMaxChannel + 1;
    }
    var hostChannel = getConnectionCountAtCurrentNode(e);   //当前通道在本节点上的连接数
    if(hostChannel < hostMaxChannel){
        openChannel(e); //新建一个连接
    }else{
        //关闭该通道超过数量的连接
        closeSomeChannel(e,hostMaxChannel - hostChannel);
    }
});
  • 框架内部的 netty 的 Handler 前后顺序 如图:

CMPPGate , SMPPGate , SGIPGate, SMGPGate

中移短信 cmpp 协议/smpp 协议 netty 实现编解码

这是一个在 netty4 框架下实现的 cmpp3.0/cmpp2.0 短信协议解析及网关端口管理。 代码 copy 了 huzorro@gmail.com 基于 netty3.7 的 cmpp 协议解析 huzorro@gmail.com 的代码

目前已支持发送和解析长文本短信拆分合并WapPush 短信,以及彩信通知类型的短信。可以实现对彩信或者 wap-push 短信的拦截和加工处理。wap 短信的解析使用 smsj 的短信库

cmpp 协议已经跟华为,东软,亚信的短信网关都做过联调测试,兼容了不同厂家的错误和异常,如果跟网关通信出错,可以打开 trace 日志查看二进制数据。

因要与短信中心对接,新增了对 SMPP 协议的支持。

SMPP 的协议解析代码是从 Twitter-SMPP 的代码 copy 过来的。

新增对 sgip 协议(联通短信协议)的支持

sgip 的协议解析代码是从 huzorro@gmail.com 的代码 copy 过来后改造的。

新增对 smgp 协议(电信短信协议)的支持

smgp 的协议解析代码是从 SMS-China 的代码 copy 过来后改造的。

支持发送彩信通知,WAP 短信以及闪信(Flash Message):

性能测试

在 48core,128G 内存的物理服务器上测试协议解析效率:35K 条/s, cpu 使用率 25%.

Build

执行 mvn package . jdk1.6 以上.

增加了业务处理 API

业务层实现接口:BusinessHandlerInterface,或者继承 AbstractBusinessHandler 抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。

如何实现自己的 Handler,比如按短短信计费

参考 CMPPChargingDemoTest 里的扩展位置

实体类说明

CMPP 的连接端口

com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个 Tcp 连接的发起端,或者接收端。用来记录连接的 IP.port,以及 CMPP 协议的用户名,密码,业务处理的 ChannelHandler 集合等其它端口参数。包含三个子类:

  1. com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个 List属性。 一个服务端口包含多个 CMPPServerChildEndpointEntity 端口

  2. com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含 CMPP 连接用户名,密码,以及协议版本等信息

  3. com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含 CMPP 连接用户名,密码,以及协议版本,以及服务端 IP.port. 用于连接服务端

端口连接器接口

com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个 EndpointConnector.当 CMPP 连接建立完成,将连接加入连接器管理,并给 pipeLine 上挂载业务处理的 ChannelHandler.

  1. com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的 open()调用 netty 的 ServerBootstrap.bind()开一个服务监听

  2. com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集 CMPPServerChildEndpointEntity 端口下的所有连接。它的 open()方法为空.

  3. com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类 open()调用 netty 的 Bootstrap.connect()发起一个 TCP 连接

端口管理器

com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。

CMPP 协议的连接登陆管理

com.zx.sms.session.cmpp.SessionLoginManager 这是一个 netty 的 ChannelHandler 实现,主要负责 CMPP 连接的建立。当 CMPP 连接登陆成功、会话建立完成后,会调用 EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给 channel 的 pipeline 上挂载业务处理的 Handler,最后触发 SessionState.Connect 事件,通知业务处理 Handler 连接已建立成功。

CMPP 的连接状态管理器

com.zx.sms.session.cmpp.SessionStateManager 这是一个 netty 的 ChannelHandler 实现。负责每个连接上 CMPP 消息的存储,短信重发,流量窗口控制,过期短信的处理

CMPP 协议解析器

CMPP20MessageCodecAggregator [2.0 协议] CMPPMessageCodecAggregator [这是 3.0 协议] 聚合了 CMPP 主要消息协议的解析,编码,长短信拆分,合并处理。

短信持久化存储实现 StoredMapFactory

使用 BDB 的 StoreMap 实现消息持久化,防止系统意外丢失短信。

程序启动处理流程

  1. 程序启动类 new 一个 CMPPEndpointEntity 的实体类并设置 IP,port,用户名,密码,业务处理的 Handler 等参数,
  2. 程序启动类 调用 EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器
  3. 程序启动类 调用 EndpointManager.openAll()或者 EndpointManager.openEndpoint()方法打开端口。
  4. EndpointManager 会调用 EndpointEntity.buildConnector()创建一个端口连接器,并调用 EndpointConnector.open()方法打开端口。
  5. 如果是 CMPPClientEndpointEntity 的话,就会向服务器发起 TCP 连接请求,如果是 CMPPServerEndpointEntity 则会在本机开启一个服务端口等客户端连接。
  6. TCP 连接建立完成后。netty 会调用 EndpointConnector.initPipeLine()方法初始化 PipeLine,把 CMPP 协议解析器,SessionLoginManager 加到 PipeLine 里去,然后 netty 触发 ChannelActive 事件。
  7. 在 SessionLoginManager 类里,客户端收到 ChannelActive 事件后会发送一个 CMPPConnnect 消息,请求建立 CMPP 连接.
  8. 同样在 SessionLoginManager.channelRead()方法里,服务端会收到 CMPPConnnect 消息,开始对用户名,密码进行鉴权,并给客户端返回鉴权结果。
  9. 鉴权通过后,SessionLoginManager 调用 EndpointConnector.addChannel(channel)方法,把 channel 加入 ArrayList,并给 pipeLine 上挂载 SessionStateManager 和业务处理的 ChannelHandler,如心跳处理,日志记录,长短信合并拆分处理类。
  10. EndpointConnector.addChannel(channel)完成后,SessionLoginManager 调用 ctx.fireUserEventTriggered()方法,触发 SessionState.Connect 事件。

以上 CMPP 连接建立完成。

  1. 业务处理类收到 SessionState.Connect 事件,开始业务处理,如从 MQ 获取短信下发,或开启 Consumer 接收 MQ 推送的消息。
  2. SessionStateManager 会拦截所有 read()和 write()的消息,进行消息持久化,消息重发,流量控制。

增加同步调用 api

smsgate 自开发以来,一直使用 netty 的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到 response。因此增加一个同步调用的 api。即:发送消息后等接收到对应的响应后才返回。 使用方法如下:


    //因为长短信要拆分,因此返回一个 promiseList.每个拆分后的短信对应一个 promise
    List<Promise> futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
    for(Promise  future: futures){
        //调用 sync()方法,阻塞线程。等待接收 response
        future.sync(); 
        //接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
        if(future.isSuccess()){
            //打印收到的 response 消息
            logger.info("response:{}",future.get());
        }else{
            打印错误原因
            logger.error("response:{}",future.cause());
        }
    }

    //或者不阻塞进程,不调用 sync()方法。
    List<Promise> promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
    for(Promise  promise: promises){
        //接收到 response 后回调 Listener 方法
        promise.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                //接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
                if(future.isSuccess()){
                    //打印收到的 response 消息
                    logger.info("response:{}",future.get());
                }else{
                    打印错误原因
                    logger.error("response:{}",future.cause());
                }
            }
        });
    }

CMPP Api 使用举例

public class TestCMPPEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);

    @Test
    public void testCMPPEndpoint() throws Exception {
        ResourceLeakDetector.setLevel(Level.ADVANCED);
        final EndpointManager manager = EndpointManager.INS;

        CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();
        server.setId("server");
        server.setHost("127.0.0.1");
        server.setPort(7890);
        server.setValid(true);
        //使用 ssl 加密数据流
        server.setUseSSL(false);

        CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();
        child.setId("child");  //自定义通道账号 ID,保持全局唯一
        child.setChartset(Charset.forName("utf-8"));
        child.setGroupName("test");   //自定义通道账号分组 ID,用于对通道标识不同组,方便路由实现
        child.setUserName("901783");  //通道账号,可能和企业代码相同
        child.setPassword("ICP001");  //密码

        child.setValid(true);
        child.setVersion((short)0x30);   //协议版本号,48 是 3.0 协议,32 是 2.0 协议

        child.setMaxChannels((short)4);
        child.setRetryWaitTimeSec((short)30);
        child.setMaxRetryCnt((short)3);
        child.setReSendFailMsg(true);
//        child.setWriteLimit(200);
//        child.setReadLimit(200);
        List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
        serverhandlers.add(new CMPPMessageReceiveHandler()); //在这个 handler 里接收短信
        child.setBusinessHandlerSet(serverhandlers);
        server.addchild(child);

        manager.addEndpointEntity(server);

        CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();
        client.setId("client");   //自定义通道账号 ID,保持全局唯一
        client.setHost("127.0.0.1");
//        client.setLocalhost("127.0.0.1");
//        client.setLocalport(65521);
        client.setPort(7890);
        client.setChartset(Charset.forName("utf-8"));
        client.setGroupName("test"); //自定义通道账号分组 ID,用于对通道标识不同组,方便路由实现
        client.setUserName("901783"); //通道账号,可能和企业代码相同
        client.setPassword("ICP001"); //密码
        client.setMsgSrc("902176");  //企业代码 ,可能和 UserName 相同
        client.setSpCode("10658762"); //服务代码,即显示到手机上的号码
        client.setMaxChannels((short)10);  //最大连接数
        client.setVersion((short)0x30);    //协议版本号
        client.setRetryWaitTimeSec((short)30);//发送 request 后 等待 N 秒后没有收到 response,则重发消息
        client.setMaxRetryCnt((short)3);  // 发送消息的最大次数,如果为 3,则表示连带第一次发送,再重试 2 次,一共发送 3 次
        client.setCloseWhenRetryFailed(false);  // 当发送消息次数达到最大(MaxRetryCnt)后 ,是否关闭连接。默认是 true 关闭连接
        client.setUseSSL(false);          //是否使用 SSL 加密连接,默认为 false,不加密
        client.setWriteLimit(100);        //发送 request 消息的最大速度(单位条数)
        client.setReadLimit(100);         //接收 request 的最大速度(单位条数),当消息量超过一定限制后,消息将积压在 TCP 网络协议栈的接收缓冲区
        client.setWindow(16);         //设置发送消息的滑动窗口,滑动窗口默认为 16,该大小根据网络时延不同,会影响发送速度

        //默认为 false ,发送 request 是否保存在本地磁盘。如果为 true,当进程关闭后,本地磁盘会保存未收到 response 的消息,当进程再次启动框架自动读取消息并发送。可能造成消息重复发送
        client.setReSendFailMsg(true);  

        client.setSupportLongmsg(SupportLongMessage.BOTH);  //是否支持长短信的自动拆分和拼接

        List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
        clienthandlers.add( new CMPPSessionConnectedHandler(10000));  //在这个 handler 里发送短信
        client.setBusinessHandlerSet(clienthandlers);

        manager.addEndpointEntity(client);

        manager.openEndpoint(server);

        Thread.sleep(1000);
        for(int i=0;i<=child.getMaxChannels()+1;i++)
            manager.openEndpoint(client);

        System.out.println("start.....");

//        Thread.sleep(300000);
        LockSupport.park();
        EndpointManager.INS.close();
    }
}

SMPP Api 使用举例


public class TestSMPPEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);

    @Test
    public void testSMPPEndpoint() throws Exception {

        final EndpointManager manager = EndpointManager.INS;

        SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();
        server.setId("smppserver");
        server.setHost("127.0.0.1");
        server.setPort(2776);
        server.setValid(true);
        //使用 ssl 加密数据流
        server.setUseSSL(false);

        SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();
        child.setId("smppchild");
        child.setSystemId("901782");
        child.setPassword("ICP");

        child.setValid(true);
        child.setChannelType(ChannelType.DUPLEX);
        child.setMaxChannels((short)3);
        child.setRetryWaitTimeSec((short)30);
        child.setMaxRetryCnt((short)3);
        child.setReSendFailMsg(true);
        child.setIdleTimeSec((short)15);
//        child.setWriteLimit(200);
//        child.setReadLimit(200);
        List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
        serverhandlers.add(new SMPPSessionConnectedHandler(10000));   
        child.setBusinessHandlerSet(serverhandlers);
        server.addchild(child);

        SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();
        client.setId("smppclient");
        client.setHost("127.0.0.1");
        client.setPort(2776);
        client.setSystemId("901782");
        client.setPassword("ICP");
        client.setChannelType(ChannelType.DUPLEX);

        client.setMaxChannels((short)12);
        client.setRetryWaitTimeSec((short)100);
        client.setUseSSL(false);
        client.setReSendFailMsg(true);
//        client.setWriteLimit(200);
//        client.setReadLimit(200);
        client.setSupportLongmsg(SupportLongMessage.SEND);  //接收长短信时不自动合并
        List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
        clienthandlers.add( new SMPPMessageReceiveHandler()); 
        client.setBusinessHandlerSet(clienthandlers);


        manager.addEndpointEntity(server);
        manager.addEndpointEntity(client);
        manager.openAll();
        manager.startConnectionCheckTask();
        Thread.sleep(1000);
        for(int i=0;i<child.getMaxChannels();i++)
            manager.openEndpoint(client);
        System.out.println("start.....");
        LockSupport.park();
        EndpointManager.INS.close();
    }
}

SGIP Api 使用举例

public class TestSgipEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);

    @Test
    public void testsgipEndpoint() throws Exception {
        ResourceLeakDetector.setLevel(Level.ADVANCED);
        final EndpointManager manager = EndpointManager.INS;

        SgipServerEndpointEntity server = new SgipServerEndpointEntity();
        server.setId("sgipserver");
        server.setHost("127.0.0.1");
        server.setPort(8001);
        server.setValid(true);
        //使用 ssl 加密数据流
        server.setUseSSL(false);

        SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();
        child.setId("sgipchild");
        child.setLoginName("333");
        child.setLoginPassowrd("0555");

        child.setValid(true);
        child.setChannelType(ChannelType.DUPLEX);
        child.setMaxChannels((short)3);
        child.setRetryWaitTimeSec((short)30);
        child.setMaxRetryCnt((short)3);
        child.setReSendFailMsg(false);
        child.setIdleTimeSec((short)30);
//        child.setWriteLimit(200);
//        child.setReadLimit(200);
        child.setSupportLongmsg(SupportLongMessage.SEND);  //接收长短信时不自动合并
        List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();

        serverhandlers.add(new SgipReportRequestMessageHandler());
        serverhandlers.add(new SGIPMessageReceiveHandler());   
        child.setBusinessHandlerSet(serverhandlers);
        server.addchild(child);

        manager.addEndpointEntity(server);


        SgipClientEndpointEntity client = new SgipClientEndpointEntity();
        client.setId("sgipclient");
        client.setHost("127.0.0.1");
        client.setPort(8001);
        client.setLoginName("333");
        client.setLoginPassowrd("0555");
        client.setChannelType(ChannelType.DUPLEX);

        client.setMaxChannels((short)10);
        client.setRetryWaitTimeSec((short)100);
        client.setUseSSL(false);
        client.setReSendFailMsg(true);
//        client.setWriteLimit(200);
//        client.setReadLimit(200);
        List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
        clienthandlers.add(new SGIPSessionConnectedHandler(10000));
        client.setBusinessHandlerSet(clienthandlers);
        manager.addEndpointEntity(client);
        manager.openAll();
        Thread.sleep(1000);
        for(int i=0;i<child.getMaxChannels();i++)
            manager.openEndpoint(client);
        System.out.println("start.....");

        LockSupport.park();

        EndpointManager.INS.close();
    }
}

Demo 执行日志


11:31:52.842 [workGroup2] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df
11:31:52.852 [workGroup1] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be
11:31:52.852 [workGroup1] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58
11:31:52.869 [workGroup1] INFO  c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]
11:31:52.867 [workGroup2] INFO  c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]
11:31:53.863 [busiWork-3] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343,   speed : 343/s
11:31:54.872 [busiWork-1] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381,   speed : 1038/s
11:31:55.873 [busiWork-8] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704,   speed : 1323/s
11:31:56.875 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010,   speed : 1306/s
11:31:57.880 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416,   speed : 1406/s
11:31:58.881 [busiWork-7] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442,   speed : 2026/s
11:31:59.882 [busiWork-8] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581,   speed : 2139/s
11:32:00.883 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865,   speed : 3284/s
11:32:01.884 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937,   speed : 3072/s
11:32:02.886 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489,   speed : 3552/s
11:32:03.887 [busiWork-6] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065,   speed : 3576/s
11:32:04.888 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337,   speed : 3272/s