cenxt-task-scheduler
Introduction: SpringBoot 轻量级分布式任务调度服务
Tags:
支持任务分布式调度、重跑,以及 UI 界面控制
特性
- 分布式任务调度,避免多节点重复执行任务
- 支持任务重试
- 支持任务异常重跑
支持 UI 界面控制
一、快速使用
1、引入依赖
<dependency> <groupId>cn.cenxt</groupId> <artifactId>cenxt-task-scheduler-core</artifactId> <version>1.0.5</version> </dependency>2、启动类添加@EnableCenxtTask,启用任务调度服务
@SpringBootApplication @EnableCenxtTask public class CenxtTaskSchedulerDemoApplication { public static void main(String[] args) { SpringApplication.run(CenxtTaskSchedulerDemoApplication.class, args); } }3、编写任务代码,实现 CenxtJob 接口,并添加@TaskInfo 描述任务信息(可选)
@Component @TaskInfo(description = "测试任务",paramsDescription = "{}",cron = "0 */1 * * * ?") public class TestJob implements CenxtJob { /** * 开始执行任务 * !!!不要捕获 InterruptedException 异常,否则超时进程无法停止 * * @param task 任务 * @param execReport 执行状态 * @return 执行结果 */ @Override public boolean exec(Task task, ExecReport execReport) throws Exception { //任务业务代码 //任务数据采集(可选) execReport.incrSuccessCount(1); execReport.incrFailCount(1); //返回 true 表示任务成功执行,false 为任务失败 return true; } }4、启动服务,打开控制页面,维护任务
http://localhost:8080/{应用上下文(可选)}/cenxt-task-view/index.html
demo 地址:http://task.cenxt.cn/task-demo/cenxt-task-view/index.html
默认账户:admin/admin、guest/guest
二、进阶
1、参数配置
#(可选)是否启用配置,缺省为 true
cenxt.task.enabled=true
#(可选)任务扫描间隔单位 ms,缺省 3000,最小 500
cenxt.task.scanInterval=3000
#(可选)任务执行线程数,缺省 3,最小 3
cenxt.task.thread=3
#(可选)每次获取待执行任务条数,缺省 3,最小 3,同时不能大于任务执行线程数
cenxt.task.fetchSize=3
#(可选)初始化任务表,如果启用,不存在任务表将自动创建,缺省 true
cenxt.task.initTable=true
#(可选)任务表的前缀
cenxt.task.tableNamePrefix=
#(可选)是否启用控制界面,缺省 true
cenxt.task.view.enabled=true
#(可选)管理员用户名,缺省 admin
cenxt.task.view.adminUsername=admin
#(可选)管理员密码,缺省 admin
cenxt.task.view.adminPassword=admin
#(可选)一般用户名,缺省 normal
cenxt.task.view.normalUsername=normal
#(可选)一般密码,缺省 normal
cenxt.task.view.normalPassword=normal
#(可选)游客用户名,缺省 guest
cenxt.task.view.guestUsername=guest
#(可选)游客密码,缺省 guest
cenxt.task.view.guestPassword=guest
#(可选)单个 IP 最大登录尝试次数,缺省 5
cenxt.task.view.maxTryCount=3000
配置中心地址:http://config.cenxt.cn/ 账号:demo/123456
2、数据统计
在执行批处理任务过程中,设置如下代码,会将执行进度上传到服务端,可以在执行记录看到实时数据变化
//任务数据采集(可选)
//对应执行记录中成功记录数
execReport.incrSuccessCount(1);
//对应执行记录中失败记录数
execReport.incrFailCount(1);
3、任务监听器
实现 CenxtTaskListener 接口,并将其注入到 Ioc 容器中,将会覆盖默认任务监听器,得到任务监听事件。
package cn.cenxt.task.listeners;
import cn.cenxt.task.model.Task;
/**
* 任务监听器
*/
public interface CenxtTaskListener {
/**
* 任务开始
*
* @param task 任务
*/
void begin(Task task);
/**
* 任务执行成果并结束
*
* @param task 任务
* @param cost 耗时,单位 ms
* @param retryTimes 重试次数
*/
void finish(Task task, long cost, int retryTimes);
/**
* 任务执行失败并结束
*
* @param task 任务
* @param cost 耗时,单位 ms
* @param retryTimes 重试次数
*/
void exceptionFinish(Task task, long cost, int retryTimes);
/**
* 任务执行失败
*
* @param task 任务
* @param cost 单次执行耗时,单位 ms
* @param times 执行次数,从 1 开始
* @param e 异常信息
*/
void fail(Task task, long cost, int times, Exception e);
/**
* 任务重试
*
* @param task 任务
* @param retryTimes 重试次数,从 1 开始
*/
void retry(Task task, int retryTimes);
}
4、自定义验证
实现 CenxtSecurityService 接口,并将其注入到 Ioc 容器中,将会覆盖默认身份验证服务,实现自定义验证(默认登陆界面将会失效)。
package cn.cenxt.task.service;
import cn.cenxt.task.enums.RoleEnum;
import javax.servlet.http.HttpServletRequest;
/**
* 安全服务
*/
public interface CenxtSecurityService {
/**
* 获取用户名
*
* @param request 请求
* @return 角色 如果为 null,表示未登录
*/
String getUserName(HttpServletRequest request);
/**
* 获取角色
*
* @param request 请求
* @return 角色 如果为 null,表示未登录
*/
RoleEnum getRole(HttpServletRequest request);
}
三、设计说明
1、业务流程
启动任务扫描线程->查询可执行任务列表->锁定任务->添加执行记录->执行任务->计算下次执行时间,并释放任务
2、数据表设计(程序启动,如有权限会自动创建)
(1)任务表
CREATE TABLE `cenxt_task` (
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '编号',
`name` VARCHAR(100) NOT NULL COMMENT '任务名称',
`description` VARCHAR(256) NOT NULL COMMENT '描述',
`enabled` TINYINT(1) NOT NULL DEFAULT '0' COMMENT '启用状态',
`flag` TINYINT(2) NOT NULL DEFAULT '0' COMMENT '执行状态:0 待执行 1 执行中 2 执行失败',
`cron_str` VARCHAR(256) NOT NULL COMMENT '时间表达式',
`expire` INT(11) NOT NULL COMMENT '超时时间,单位分钟,0 为不超时',
`retry_times` INT(11) NOT NULL COMMENT '重试次数',
`params` VARCHAR(4096) NOT NULL DEFAULT '' COMMENT '参数',
`exec_time` TIMESTAMP NULL DEFAULT NULL COMMENT '执行时间',
`exec_ip` VARCHAR(50) NULL DEFAULT NULL COMMENT '执行机器 IP',
`next_time` TIMESTAMP NOT NULL COMMENT '下次执行时间',
`create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
`creator` VARCHAR(50) NOT NULL COMMENT '创建人',
`updator` VARCHAR(50) NULL DEFAULT NULL COMMENT '更新人',
PRIMARY KEY (`id`),
INDEX `name` (`name`),
INDEX `create_time` (`create_time`),
INDEX `update_time` (`update_time`),
INDEX `exec_time` (`exec_time`),
INDEX `next_time` (`next_time`),
INDEX `expire` (`expire`),
INDEX `exec_ip` (`exec_ip`)
)
COMMENT='任务表'
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=1;
(2)执行记录表
CREATE TABLE `cenxt_exec_history` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`task_id` BIGINT(20) NOT NULL COMMENT '任务编号',
`exec_id` VARCHAR(50) NOT NULL DEFAULT '' COMMENT '执行编号',
`exec_ip` VARCHAR(50) NOT NULL DEFAULT '' COMMENT '执行机器 IP',
`exec_time` TIMESTAMP NOT NULL COMMENT '执行时间',
`finish_time` TIMESTAMP NULL DEFAULT NULL COMMENT '结束时间',
`cost` DOUBLE NOT NULL DEFAULT '0' COMMENT '执行耗时,单位 ms',
`exec_result` TINYINT(2) NOT NULL COMMENT '执行结果:0 待执行 1 执行中 2 执行成功 3 重试中 4 重试成功 5 执行失败 6 超时中断 ',
`retry_times` INT(11) NULL DEFAULT '0' COMMENT '重试次数',
`success_count` BIGINT(20) NULL DEFAULT '0' COMMENT '成功记录数',
`fail_count` BIGINT(20) NULL DEFAULT '0' COMMENT '失败记录数',
`exec_message` VARCHAR(10240) NULL DEFAULT '' COMMENT '执行信息',
PRIMARY KEY (`id`),
UNIQUE INDEX `exec_id` (`exec_id`),
INDEX `exec_time` (`exec_time`),
INDEX `finish_time` (`finish_time`),
INDEX `task_id` (`task_id`),
INDEX `cost` (`cost`),
INDEX `exec_ip` (`exec_ip`),
INDEX `exec_result` (`exec_result`)
)
COMMENT='执行记录表'
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=1;
3、查询可执行任务列表
SELECT * FROM cenxt_task WHERE
##任务是启用状态
enabled=1
AND (
##任务待执行且下次执行时间小于当前时间
(flag=0 AND next_time<NOW())
##任务为执行中,且任务已超时执行
OR (flag=1 AND exec_time<DATE_SUB(NOW(),interval `expire`+1 MINUTE))
OR (
##执行失败
flag=2
AND (
##如果上次执行机器为当前机器,执行时间延后 1 分钟
(exec_ip=? AND next_time<DATE_SUB(NOW(),interval 1 MINUTE))
##如果上次执行机器不是当前,到时间自动执行
OR (exec_ip<>? AND next_time<NOW())
)
)
) LIMIT ?
4、锁定任务
##设置执行状态为执行中,更新执行时间和机器 IP
UPDATE cenxt_task SET flag=1,exec_time=NOW(),exec_ip=? WHERE
##任务编号
id=?
##是否启用
AND enabled=1
##执行时间和查询的结果一致,或者执行时间为空(任务新创建执行时间为空)
AND (exec_time=? OR exec_time IS NULL)
5、释放任务
##更新执行状态和下次执行时间
UPDATE cenxt_task SET flag=?,next_time=? WHERE
##任务编号
id=?
##任务状态为执行中
AND flag=1
