SMSGate

Project Url: Lihuanghe/SMSGate
Introduction: 这是一个在 netty4 框架下实现的三网合一短信网关核心框架,支持(cmpp/smpp3.4/sgip1.2/smgp3) 短信协议解析,支持长短信合并和拆分,也支持 wap 短信和闪信。
More: Author   ReportBugs   
Tags:

qq 20180420170449

群名称:cmppGate 短信
群 号:770738500

How To Use

<dependency>
  <groupId>com.chinamobile.cmos</groupId>
  <artifactId>sms-core</artifactId>
  <version>2.1.5</version>
</dependency>

CMPPGate , SMPPGate , SGIPGate, SMGPGate

中移短信 cmpp 协议/smpp 协议 netty 实现编解码

这是一个在 netty4 框架下实现的 cmpp3.0/cmpp2.0 短信协议解析及网关端口管理 (master 分支是依赖于 netty5 的)。 代码 copy 了 huzorro@gmail.com 基于 netty3.7 的 cmpp 协议解析 huzorro@gmail.com 的代码

目前已支持发送和解析长文本短们拆分合并WapPush 短信,以及彩信通知类型的短信。可以实现对彩信或者 wap-push 短信的拦截和加工处理。wap 短信的解析使用 [smsj] (https://github.com/marre/smsj)的短信库

cmpp 协议已经跟华为,东软,亚信的短信网关都做过联调测试,兼容了不同厂家的错误和异常,如果跟网关通信出错,可以打开 trace 日志查看二进制数据。

因要与短信中心对接,新增了对 SMPP 协议的支持。

SMPP 的协议解析代码是从 Twitter-SMPP 的代码 copy 过来的。

新增对 sgip 协议(联通短信协议)的支持

sgip 的协议解析代码是从 huzorro@gmail.com 的代码 copy 过来后改造的。

新增对 smgp 协议(电信短信协议)的支持

smgp 的协议解析代码是从 SMS-China 的代码 copy 过来后改造的。

支持发送彩信通知,WAP 短信以及闪信(Flash Message):

性能测试

在 48core,128G 内存的物理服务器上测试协议解析效率:35K 条/s, cpu 使用率 25%.

Build

执行 mvn package . jdk1.6 以上. (如果用 netty5 则必须使用 jdk1.7)

增加了业务处理 API

业务层实现接口:BusinessHandlerInterface,或者继承 AbstractBusinessHandler 抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。

如何实现自己的 Handler,比如按短短信计费

参考 CMPPChargingDemoTest 里的扩展位置

实体类说明

CMPP 的连接端口

com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个 Tcp 连接的发起端,或者接收端。用来记录连接的 IP.port,以及 CMPP 协议的用户名,密码,业务处理的 ChannelHandler 集合等其它端口参数。包含三个字类:

  1. com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个 List属性。 一个服务端口包含多个 CMPPServerChildEndpointEntity 端口

  2. com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含 CMPP 连接用户名,密码,以及协议版本等信息

  3. com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含 CMPP 连接用户名,密码,以及协议版本,以及服务端 IP.port. 用于连接服务端

端口连接器接口

com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个 EndpointConnector.当 CMPP 连接建立完成,将连接加入连接器管理,并给 pipeLine 上挂载业务处理的 ChannelHandler.

  1. com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的 open()调用 netty 的 ServerBootstrap.bind()开一个服务监听

  2. com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集 CMPPServerChildEndpointEntity 端口下的所有连接。它的 open()方法为空.

  3. com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类 open()调用 netty 的 Bootstrap.connect()开始一个 TCP 连接

端口管理器

com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。

CMPP 协议的连接登陆管理

com.zx.sms.session.cmpp.SessionLoginManager 这是一个 netty 的 ChannelHandler 实现,主要负责 CMPP 连接的建立。当 CMPP 连接建立完成后,会调用 EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给 channel 的 pipeline 上挂载业务处理的 Handler,最后触发 SessionState.Connect 事件,通知业务处理 Handler 连接已建立成功。

CMPP 的连接状态管理器

com.zx.sms.session.cmpp.SessionStateManager 这是一个 netty 的 ChannelHandler 实现。负责每个连接上 CMPP 消息的存储,短信重发,流量窗口控制,过期短信的处理

CMPP 协议解析器

CMPP20MessageCodecAggregator [2.0 协议] CMPPMessageCodecAggregator [这是 3.0 协议] 聚合了 CMPP 主要消息协议的解析,编码,长短信拆分,合并处理。

短信持久化存储实现 StoredMapFactory

使用 BDB 的 StoreMap 实现消息持久化,防止系统意外丢失短信。

程序启动处理流程

  1. 程序启动类 new 一个 CMPPEndpointEntity 的实体类并设置 IP,port,用户名,密码,业务处理的 Handler 等参数,
  2. 程序启动类 调用 EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器
  3. 程序启动类 调用 EndpointManager.openAll()或者 EndpointManager.openEndpoint()方法打开端口。
  4. EndpointManager 会调用 EndpointEntity.buildConnector()创建一个端口连接器,并调用 EndpointConnector.open()方法打开端口。
  5. 如果是 CMPPClientEndpointEntity 的话,就会向服务器发起 TCP 连接请求,如果是 CMPPServerEndpointEntity 则会在本机开启一个服务端口等客户端连接。
  6. TCP 连接建立完成后。netty 会调用 EndpointConnector.initPipeLine()方法初始化 PipeLine,把 CMPP 协议解析器,SessionLoginManager 加到 PipeLine 里去,然后 netty 触发 ChannelActive 事件。
  7. 在 SessionLoginManager 类里,客户端收到 ChannelActive 事件后会发送一个 CMPPConnnect 消息,请求建立 CMPP 连接.
  8. 同样在 SessionLoginManager.channelRead()方法里,服务端会收到 CMPPConnnect 消息,开始对用户名,密码进行鉴权,并给客户端鉴权结果。
  9. 鉴权通过后,SessionLoginManager 调用 EndpointConnector.addChannel(channel)方法,把 channel 加入 ArrayList,并给 pipeLine 上挂载 SessionStateManager 和业务处理的 ChannelHandler。
  10. EndpointConnector.addChannel(channel)完成后,SessionLoginManager 调用 ctx.fireUserEventTriggered()方法,触发 SessionState.Connect 事件。

以上 CMPP 连接建立完成。

  1. 业务处理类收到 SessionState.Connect 事件,开始业务处理,如下发短信。
  2. SessionStateManager 会拦截所有 read()和 write()的消息,进行消息持久化,消息重发,流量控制。

增加同步调用 api

smsgate 自开发以来,一直使用 netty 的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到 response。因此增加一个同步调用的 api。即:发送消息后等接收到对应的响应后才完成。 使用方法如下:


    //因为长短信要拆分,因此返回一个 promiseList.每个拆分后的短信对应一个 promise
    List<Promise> futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
    for(Promise  future: futures){
        //调用 sync()方法,阻塞线程。等待接收 response
        future.sync(); 
        //接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
        if(future.isSuccess()){
            //打印收到的 response 消息
            logger.info("response:{}",future.get());
        }else{
            打印错误原因
            logger.error("response:{}",future.cause());
        }
    }

    //或者不阻塞进程,不调用 sync()方法。
    List<Promise> promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
    for(Promise  promise: promises){
        //接收到 response 后回调 Listener 方法
        promise.addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                //接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
                if(future.isSuccess()){
                    //打印收到的 response 消息
                    logger.info("response:{}",future.get());
                }else{
                    打印错误原因
                    logger.error("response:{}",future.cause());
                }
            }
        });
    }

CMPP Api 使用举例


public class TestCMPPEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);

    @Test
    public void testCMPPEndpoint() throws Exception {

        final EndpointManager manager = EndpointManager.INS;

        CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();
        server.setId("server");
        server.setHost("127.0.0.1");
        server.setPort(7891);
        server.setValid(true);
        //使用 ssl 加密数据流
        server.setUseSSL(false);

        CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();
        child.setId("child");
        child.setChartset(Charset.forName("utf-8"));
        child.setGroupName("test");
        child.setUserName("901782");
        child.setPassword("ICP");

        child.setValid(true);
        child.setWindows((short)16);
        child.setVersion((short)0x20);

        child.setMaxChannels((short)20);
        child.setRetryWaitTimeSec((short)5);
        child.setMaxRetryCnt((short)3);
        child.setReSendFailMsg(false);
        //child.setReadLimit(200);
        List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
        serverhandlers.add(new SessionConnectedHandler(300000));      //在这个类里发送短信
        child.setBusinessHandlerSet(serverhandlers);
        server.addchild(child);


        manager.addEndpointEntity(server);

        CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();
        client.setId("client");
        client.setHost("127.0.0.1");
        client.setPort(7891);
        client.setChartset(Charset.forName("utf-8"));
        client.setGroupName("test");
        client.setUserName("901782");
        client.setPassword("ICP");


        client.setMaxChannels((short)12);
        client.setWindows((short)16);
        client.setVersion((short)0x20);
        client.setRetryWaitTimeSec((short)10);
        client.setUseSSL(false);
        client.setReSendFailMsg(false);
        //client.setWriteLimit(200);
        //client.setReadLimit(200);
        List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
        clienthandlers.add( new MessageReceiveHandler());  //在这个类里接收短信消息
        client.setBusinessHandlerSet(clienthandlers);
        manager.addEndpointEntity(client);

        manager.openAll();
        //LockSupport.park();

        Thread.sleep(300000);
        EndpointManager.INS.close();
    }

SMPP Api 使用举例


public class TestSMPPEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);

    @Test
    public void testSMPPEndpoint() throws Exception {

        final EndpointManager manager = EndpointManager.INS;

        SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();
        server.setId("server");
        server.setHost("127.0.0.1");
        server.setPort(2776);
        server.setValid(true);
        //使用 ssl 加密数据流
        server.setUseSSL(false);

        SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();
        child.setId("child");
        child.setSystemId("901782");
        child.setPassword("ICP");

        child.setValid(true);
        child.setChannelType(ChannelType.DUPLEX);
        child.setMaxChannels((short)20);
        child.setRetryWaitTimeSec((short)30);
        child.setMaxRetryCnt((short)3);
        child.setReSendFailMsg(false);
        child.setIdleTimeSec((short)15);
        List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();
        serverhandlers.add(new SMPP2CMPPBusinessHandler());  //  将 CMPP 的对象转成 SMPP 对象,然后再经 SMPP 解码器处理
        serverhandlers.add( new MessageReceiveHandler());   // 复用 CMPP 的 Handler
        child.setBusinessHandlerSet(serverhandlers);
        server.addchild(child);

        manager.addEndpointEntity(server);
        manager.openAll();

        SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();
        client.setId("client");
        client.setHost("127.0.0.1");
        client.setPort(2776);
        client.setSystemId("901782");
        client.setPassword("ICP");
        client.setChannelType(ChannelType.DUPLEX);

        client.setMaxChannels((short)12);
        client.setRetryWaitTimeSec((short)100);
        client.setUseSSL(false);
        client.setReSendFailMsg(false);
        List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
        clienthandlers.add(new SMPP2CMPPBusinessHandler()); //  将 CMPP 的对象转成 SMPP 对象,然后再经 SMPP 解码器处理
        clienthandlers.add(new SessionConnectedHandler(600000)); //// 复用 CMPP 的 Handler ,在这个类里发送短信
        client.setBusinessHandlerSet(clienthandlers);

        manager.openEndpoint(client);

        //LockSupport.park();
        Thread.sleep(300000);
        EndpointManager.INS.close();
    }

SGIP Api 使用举例

public class TestSgipEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);

    @Test
    public void testsgipEndpoint() throws Exception {
        ResourceLeakDetector.setLevel(Level.ADVANCED);
        final EndpointManager manager = EndpointManager.INS;

        SgipServerEndpointEntity server = new SgipServerEndpointEntity();
        server.setId("sgipserver");
        server.setHost("127.0.0.1");
        server.setPort(8801);
        server.setValid(true);
        //使用 ssl 加密数据流
        server.setUseSSL(false);

        SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();
        child.setId("sgipchild");
        child.setLoginName("333");
        child.setLoginPassowrd("0555");

        child.setValid(true);
        child.setChannelType(ChannelType.DOWN);
        child.setMaxChannels((short)20);
        child.setRetryWaitTimeSec((short)30);
        child.setMaxRetryCnt((short)3);
        child.setReSendFailMsg(false);
        child.setIdleTimeSec((short)15);
        List<BusinessHandlerInterface> serverhandlers = new ArrayList<BusinessHandlerInterface>();

        serverhandlers.add(new SgipReportRequestMessageHandler());
        serverhandlers.add(new Sgip2CMPPBusinessHandler());  //  将 CMPP 的对象转成 sgip 对象,然后再经 sgip 解码器处理
        serverhandlers.add(new MessageReceiveHandler());   // 复用 CMPP 的 Handler
        child.setBusinessHandlerSet(serverhandlers);
        server.addchild(child);

        manager.addEndpointEntity(server);


        SgipClientEndpointEntity client = new SgipClientEndpointEntity();
        client.setId("sgipclient");
        client.setHost("127.0.0.1");
        client.setPort(8001);
        client.setLoginName("333");
        client.setLoginPassowrd("0555");
        client.setChannelType(ChannelType.DOWN);

        client.setMaxChannels((short)12);
        client.setRetryWaitTimeSec((short)100);
        client.setUseSSL(false);
        List<BusinessHandlerInterface> clienthandlers = new ArrayList<BusinessHandlerInterface>();
        clienthandlers.add(new Sgip2CMPPBusinessHandler()); //  将 CMPP 的对象转成 sgip 对象,然后再经 sgip 解码器处理
        clienthandlers.add(new SessionConnectedHandler(1)); //// 复用 CMPP 的 Handler ,在这个类里发送短信
        client.setBusinessHandlerSet(clienthandlers);
        manager.addEndpointEntity(client);
        manager.openAll();
        //LockSupport.park();
        System.out.println("start.....");

        Thread.sleep(300000);
        EndpointManager.INS.close();
    }
}

Demo 执行日志


11:31:52.842 [workGroup2] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df
11:31:52.852 [workGroup1] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be
11:31:52.852 [workGroup1] INFO  c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58
11:31:52.869 [workGroup1] INFO  c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]
11:31:52.867 [workGroup2] INFO  c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]
11:31:53.863 [busiWork-3] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343,   speed : 343/s
11:31:54.872 [busiWork-1] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381,   speed : 1038/s
11:31:55.873 [busiWork-8] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704,   speed : 1323/s
11:31:56.875 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010,   speed : 1306/s
11:31:57.880 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416,   speed : 1406/s
11:31:58.881 [busiWork-7] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442,   speed : 2026/s
11:31:59.882 [busiWork-8] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581,   speed : 2139/s
11:32:00.883 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865,   speed : 3284/s
11:32:01.884 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937,   speed : 3072/s
11:32:02.886 [busiWork-5] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489,   speed : 3552/s
11:32:03.887 [busiWork-6] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065,   speed : 3576/s
11:32:04.888 [busiWork-2] INFO  c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337,   speed : 3272/s
Support Me
Apps
About Me
Google+: Trinea trinea
GitHub: Trinea