db-elasticsearch-tool

Introduction: 一个数据库导入 Elasticsearch 工具,支持的数据库: mysql,maridb,postgress,oracle ,sqlserver,db2 等 支持的 Elasticsearch 版本: 1.x,2.x,5.x,6.x,+ 支持海量 PB 级数据同步导入功能 使用参考文档 https://esdoc.bbossgroups.com/#/db-es-tool
More: Author   ReportBugs   
Tags:

通过本工具可以非常方便地实现数据库和 Elasticsearch 之间的数据同步功能,数据库与数据库之间的数据同步功能

BBoss Environmental requirements

JDK requirement: JDK 1.8+

Elasticsearch version requirements: 1.x,2.X,5.X,6.X,7.x,8,x,+

Spring booter 1.x,2.x,+

bboss elasticsearch 数据导入工具 demo

使用本 demo 所带的应用程序运行容器环境,可以快速编写,打包发布可运行的数据导入工具

支持的数据库: mysql,maridb,postgress,oracle ,sqlserver,db2 等

支持的 Elasticsearch 版本: 1.x,2.X,5.X,6.X,7.x,8,x,+

支持海量 PB 级数据同步导入功能

使用参考文档

建表 sql

mysql :
CREATE TABLE
    batchtest
    (
        id bigint NOT NULL AUTO_INCREMENT,
        name VARCHAR(4000),
        author VARCHAR(1000),
        content longtext,
        title VARCHAR(1000),
        optime DATETIME,
        oper VARCHAR(1000),
        subtitle VARCHAR(1000),
        collecttime DATETIME,
        ipinfo VARCHAR(2000),
        PRIMARY KEY (id)
    )
    ENGINE=InnoDB DEFAULT CHARSET=utf8;
postgresql:

CREATE TABLE
    batchtest
    (
        id bigint ,
        name VARCHAR(4000),
        author VARCHAR(1000),
        content text,
        title VARCHAR(1000),
        optime timestamp,
        oper VARCHAR(1000),
        subtitle VARCHAR(1000),
        collecttime timestamp,
        ipinfo VARCHAR(2000),
        PRIMARY KEY (id)
    )

构建部署

准备工作

需要通过 gradle 构建发布版本,gradle 安装配置参考文档:

https://esdoc.bbossgroups.com/#/bboss-build

下载源码工程-基于 gradle

https://gitee.com/bboss/db-elasticsearch-tool

从上面的地址下载源码工程,然后导入 idea 或者 eclipse,根据自己的需求,修改导入程序逻辑

org.frameworkset.elasticsearch.imp.Dbdemo

如果需要测试和调试导入功能,运行 Dbdemo 的 main 方法即可即可:

public class Dbdemo {
    public static void main(String args[]){

        long t = System.currentTimeMillis();
        Dbdemo dbdemo = new Dbdemo();
        String repsonse = ElasticSearchHelper.getRestClientUtil().getIndice("dbdemo");
        boolean dropIndice = true;//CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值
        dbdemo.scheduleImportData(  dropIndice);//定时增量导入
//        dbdemo.scheduleFullImportData(dropIndice);//定时全量导入

//        dbdemo.scheduleFullAutoUUIDImportData(dropIndice);//定时全量导入,自动生成 UUID
//        dbdemo.scheduleDatePatternImportData(dropIndice);//定时增量导入,按日期分表 yyyy.MM.dd
    }
    .....
}

修改 es 和数据库配置-db-elasticsearch-tool\src\main\resources\application.properties

db-elasticsearch-tool 工程已经内置 mysql jdbc 驱动,如果有依赖的第三方 jdbc 包(比如 oracle 驱动),可以将第三方 jdbc 依赖包放入 db-elasticsearch-tool\lib 目录下

修改完毕配置后,就可以进行功能调试了。

测试调试通过后,就可以构建发布可运行的版本了:进入命令行模式,在源码工程根目录 db-elasticsearch-tool 下运行以下 gradle 指令打包发布版本

release.bat

运行作业

gradle 构建成功后,在 build/distributions 目录下会生成可以运行的 zip 包,解压运行导入程序

linux:

chmod +x restart.sh

./restart.sh

windows: restart.bat

作业 jvm 配置

修改 jvm.options,设置内存大小和其他 jvm 参数

-Xms1g

-Xmx1g

在工程中添加多个表同步作业

默认的作业任务是 Dbdemo,同步表 td_sm_log 的数据到索引 dbdemo/dbdemo 中

现在我们在工程中添加另外一张表 td_cms_document 的同步到索引 cms_document/cms_document 的作业步骤:

1.首先,新建一个带 main 方法的类 org.frameworkset.elasticsearch.imp.CMSDocumentImport,实现同步的逻辑

如果需要测试调试,就在 test 目录下面编写 src\test\java\org\frameworkset\elasticsearch\imp\CMSDocumentImportTest.java 测试类,然后 debug 即可

2.然后,在 runfiles 目录下新建 CMSDocumentImport 作业主程序和作业进程配置文件:runfiles/config-cmsdocmenttable.properties,内容如下:

mainclass=org.frameworkset.elasticsearch.imp.CMSDocumentImport

pidfile=CMSDocumentImport.pid

3.最后在 runfiles 目录下新建作业启动 sh 文件(这里只新建 linux/unix 指令,windows 的类似):runfiles/restart-cmsdocumenttable.sh

内容与默认的作业任务是 Dbdemo 内容一样,只是在 java 命令后面多加了一个参数,用来指定作业配置文件:--conf=config-cmsdocmenttable.properties

nohup java \$RT_JAVA_OPTS -jar ${project}-${bboss_version}.jar restart --conf=config-cmsdocmenttable.properties --shutdownLevel=9 > ${project}.log &

其他 stop shell 指令也类似建立即可

管理提取数据的 sql 语句

db2es 工具管理提取数据的 sql 语句有两种方法:代码中直接编写 sql 语句,配置文件中采用 sql 语句

1.代码中写 sql

    `//指定导入数据的 sql 语句,必填项,可以设置自己的提取逻辑,
    // 设置增量变量 log_id,增量变量名称#[log_id]可以多次出现在 sql 语句的不同位置中,例如:
    // select * from td_sm_log where log_id > #[log_id] and parent_id = #[log_id]
    // log_id 和数据库对应的字段一致,就不需要设置 setLastValueColumn 信息,
    // 但是需要设置 setLastValueType 告诉工具增量字段的类型

    importBuilder.setSql("select * from td_sm_log where log_id > #[log_id]");`

2.在配置文件中管理 sql

设置 sql 语句配置文件路径和对应在配置文件中 sql name importBuilder.setSqlFilepath("sql.xml") ​ .setSqlName("demoexportFull");

配置文件 sql.xml,编译到 classes 根目录即可:

<?xml version="1.0" encoding='UTF-8'?>

​ <![CDATA[ ​ 配置数据导入的 sql ]]> ​ <![CDATA[select from td_sm_log where log_id > #[log_id]]]><![CDATA[select from td_sm_log ]]>

在 sql 配置文件中可以配置多条 sql 语句

作业参数配置

在使用db-elasticsearch-tool时,为了避免调试过程中不断打包发布数据同步工具,可以将部分控制参数配置到启动配置文件 resources/application.properties 中,然后在代码中通过以下方法获取配置的参数:

#工具主程序
mainclass=org.frameworkset.elasticsearch.imp.Dbdemo

# 参数配置
# 在代码中获取方法:CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值 false
dropIndice=false

在代码中获取参数 dropIndice 方法:

boolean dropIndice = CommonLauncher.getBooleanAttribute("dropIndice",false);//同时指定了默认值 false

另外可以在 resources/application.properties 配置控制作业执行的一些参数,例如工作线程数,等待队列数,批处理 size 等等:

queueSize=50
workThreads=10
batchSize=20

在作业执行方法中获取并使用上述参数:

int batchSize = CommonLauncher.getIntProperty("batchSize",10);//同时指定了默认值
int queueSize = CommonLauncher.getIntProperty("queueSize",50);//同时指定了默认值
int workThreads = CommonLauncher.getIntProperty("workThreads",10);//同时指定了默认值
importBuilder.setBatchSize(batchSize);
importBuilder.setQueue(queueSize);//设置批量导入线程池等待队列长度
importBuilder.setThreadCount(workThreads);//设置批量导入线程池工作线程数量

技术交流群:166471282

微信公众号:bbossgroup

GitHub Logo

Apps
About Me
GitHub: Trinea
Facebook: Dev Tools