使用Kettle及DataX实现ETL数据处理

10/28/2022 ETLDataXKettle表结构差异比对

# 1. ETL基本介绍

# 1.1 基本概念

ETL是英文Extract-Transform-Load的缩写,中文名为抽取(extract)、转换(transform)和加载(load)。ETL描述的是一个端到端流程,在此流程中,公司获取所有数据(由世界上任何地方任何数量的团队管理的结构化和非结构化数据),然后对其进行处理,将其转换为对业务目的有用的数据。

# 1.2 基本流程

ETL 通过三个不同的步骤将数据从一个或多个源移动到另一个目的地,这可以是数据库、数据仓库、数据存储或数据湖。

ETL基本流程

# 1.2.1 数据抽取

数据抽取指的是从不同的网络、不同的操作平台、不同的数据库和数据格式、不同的应用中抽取数据的过程。目标源可能包括ERP、CRM和其他企业系统,以及来自第三方源的数据。

不同的系统倾向于使用不同的数据格式,在这个过程中,首先需要结合业务需求确定抽取的字段,并且数据库字段也应与这些需求字段形成一一映射关系。这样通过数据抽取所得到的数据都具有统一、规整的字段内容,为后续的数据转换和加载提供基础,具体步骤如下:

  • 确定数据源:需要确定从哪些源系统进行数据抽取。
  • 定义数据接口:对每个源文件及系统的每个字段进行详细说明。
  • 确定数据抽取的方法:是主动抽取还是由源系统推送?是增量抽取还是全量抽取?是按照每日抽取还是按照每月抽取?

# 1.2.2 数据转换

数据转换实际上还包含了数据清洗的工作,需要根据业务规则对异常数据进行清洗,主要将不完整数据、错误数据、重复数据进行处理,保证后续分析结果的准确性。数据转换就是处理抽取上来的数据中存在的不一致的过程。主要涉及以下几个方面:

  • 空值处理:可捕获字段空值,进行加载或替换为其他含义数据。
  • 数据标准:统一元数据、统一标准字段、统一字段类型定义。
  • 数据拆分:依据业务需求做数据拆分,如身份证号,拆分区划、出生日期、性别等。
  • 数据验证:时间规则、业务规则、自定义规则。
  • 数据替换:实现无效数据、缺失数据的替换。
  • 数据关联:关联其他数据,保障数据完整性。

# 1.2.3 数据加载

数据加载的主要任务是将经过清洗后的干净的数据集按照物理数据模型定义的表结构装入目标数据仓库的数据表中,如果是全量方式则采用 Load 方式,如果是增量则根据业务规则 Merge 进数据库,并允许人工干预,以及提供强大的错误报告、系统日志、数据备份与恢复功能。整个操作过程往往要跨网络、跨操作平台。

在实际的工作中,数据加载需要结合使用的数据库系统,确定最优的数据加载方案,节约CPU、硬盘IO和网络传输资源。

# 1.3 应用场景

ETL 是将所有相关数据放在同一个地方以便分析,ETL 通常应用于以下几个场景:

ETL使用场景

# 1.3.1 ETL业务背景

随着企业的发展,各业务线、产品线、部门都会承建各种信息化系统方便开展自己的业务。随着信息化建设的不断深入,由于业务系统之间各自为政、相互独立造成的数据孤岛”现象尤为普遍,业务不集成、流程不互通、数据不共享。这给企业进行数据的分析利用、报表开发、分析挖掘等带来了巨大困难。

在此情况下,为了实现企业全局数据的系统化运作管理(信息孤岛、数据统计、数据分析、数据挖掘) ,为DSS(决策支持系统)、BI(商务智能)、经营分析系统等深度开发应用奠定基础,挖掘数据价值 ,企业会开始着手建立数据仓库,数据中台。将相互分离的业务系统的数据源整合在一起,建立一个统一的数据采集、处理、存储、分发、共享中心,从而使公司的成员能够从不同业务部门查看综合数据,而这个过程中使用的数据处理方法之一就是ETL。

# 1.3.2 ETL的重要性

ETL是数据中心建设、BI分析项目中不可或缺的环节。各个业务系统中分布的、异构的数据源,经过ETL过程的数据抽取、转换,最终存储到目标数据库或者数据仓库,为上层BI数据分析,或其他业务功能做数据支撑。

ETL是BI项目最重要的一个环节,通常情况下ETL会花掉整个项目的1/3的时间,ETL设计的好坏直接关接到BI项目的成败。ETL也是一个长期的过程,只有不断的发现问题并解决问题,才能使ETL运行效率更高,为项目后期开发提供准确的数据。

# 1.4 ETL与ELT架构选型

提取-转换-加载和提取-加载-转换是两个不同的数据集成过程,他们以不同的顺序对不同的数据管理功能使用相同的步骤,具体如何选型应根据业务需求而定。

# 1.4.1 ETL架构

ETL架构按其字面含义理解就是按照E—T—L这个顺序流程进行处理的架构:先抽取、然后转换、完成后加载到目标数据库中。在ETL架构中,数据的流向是从源数据流到ETL工具,ETL工具是一个单独的数据处理引擎,一般会在单独的硬件服务器上,实现所有数据转化的工作,然后将数据加载到目标数据仓库中。如果要增加整个ETL过程的效率,则只能增强ETL工具服务器的配置,优化系统处理流程(一般可调的东西非常少)。

ETL架构

ETL架构的优势:

  • ETL可以分担数据库系统的负载(采用单独的硬件服务器)。
  • ETL相对于ELT架构可以实现更为复杂的数据转化逻辑。
  • ETL采用单独的硬件服务器。
  • ETL与底层的数据库数据存储无关。

# 1.4.2 ELT架构

ELT架构则把“L”这一步工作提前到“T”之前来完成:先抽取、然后加载到目标数据库中、在目标数据库中完成转换操作。在ELT架构中,ELT只负责提供图形化的界面来设计业务规则,数据的整个加工过程都在目标和源的数据库之间流动,ELT协调相关的数据库系统来执行相关的应用,数据加工过程既可以在源数据库端执行,也可以在目标数据仓库端执行(主要取决于系统的架构设计和数据属性)。当ETL过程需要提高效率,则可以通过对相关数据库进行调优,或者改变执行加工的服务器就可以达到。

ELT架构

ELT架构的优势:

  • ELT充分利用数据库引擎来实现系统的可扩展性(尤其是当数据加工过程在晚上时,可以充分利用数据库引擎的资源)。
  • ELT可以保持所有的数据始终在数据库当中,避免数据的加载和导出,从而保证效率,提高系统的可监控性。
  • ELT可以根据数据的分布情况进行并行处理优化,并可以利用数据库的固有功能优化磁盘I/O。
  • 通过对相关数据库进行性能调优,ELT过程获得3-4倍的效率提升比较容易。

# 2. ETL技术选型

本节介绍了 Kettle、DataX、BitSail、Logstash、Sqoop、Flume、Maxwell、NiFi、Canal 等9个著名的ETL工具,下文选用 Kettle 和 DataX 作为主要介绍的 ETL 工具。由于 Kettle 的性能和可靠性比较差,且不是实时的,不那么适合大数据的情况,综合考量之后,选用 DataX 作为实际使用的 ETL工具。

Kettle和DataX的对比如下表所示:

比较维度\产品 Kettle DataX
适用场景 面向数据仓库建模传统ETL工具 面向数据仓库建模传统ETL工具
支持数据源 多数关系型数据库 少数关系型数据库和大数据非关系型数据库
开发语言 Java Java、Python
可视化Web界面 不佳,data-integration (opens new window) 较好,datax-web (opens new window)
底层架构 主从结构非高可用,扩展性差,架构容错性低,不适用大数据场景 支持单机部署和第三方调度的集群部署两种方式
CDC机制 基于时间戳、触发器等 离线批处理
抽取策略 支持增量,全量抽取 支持全量抽取。不支持增量抽取要通过shell脚本自己实现
对数据库的影响 对数据库表结构有要求,存在一定侵入性 通过sql select 采集数据,对数据源没有侵入性
自动断点续传 不支持 不支持
数据清洗 围绕数据仓库的数据需求进行建模计算,清洗功能相对复杂,需要手动编程 需要根据自身清洗规则编写清洗脚本,进行调用
数据转换 手动配置schema mapping 通过编写json脚本进行schema mapping映射
数据实时性 非实时 实时
应用难度
是否需要开发
易用性
稳定性
抽取速度
售后服务 开源软件,社区活跃度高 阿里开源代码,社区活跃度低

注:抽取速度方面,小数据量的情况下二者差别不大,大数据量时DataX比Kettle快,DataX对于数据库压力比较小。

# 2.1 Kettle工具

# 2.1.1 基本介绍

Kettle是一款国外开源的ETL工具,纯Java编写,可以在Window、Linux、Unix上运行,绿色无需安装,数据抽取高效稳定。中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出,它允许你管理来自不同数据库的数据,通过提供一个图形化的用户环境来描述你想做什么,而不是你想怎么做。

项目地址:https://github.com/pentaho/pentaho-kettle (opens new window)

# 2.1.2 基本组成

Kettle中有两种脚本文件,transformation(.ktr)和job(.kjb),transformation完成针对数据的基础转换,job则完成整个工作流的控制。Kettle目前包含五个产品:Spoon、Pan、Chef、Kithcen、Encr。

  • Spoon:是一个本地GUI工具,允许你通过图形化界面来设计ETL转换过程(Transformation)和任务。
  • Pan:转换(trasform)执行器,允许你批量运行由Spoon设计的ETL转换。Pan是一个后台执行的程序,没有图形界面。
  • Chef:通过转换,任务,脚本自动化更新数据仓库的复杂工作。
  • Kithcen:作业(job)执行器,允许你批量使用由Chef设计的任务。Kithcen是一个后台执行的程序,没有图形界面。
  • Encr:用来加密连接数据库密码与集群时使用的密码。

Kettle概念模型

# 2.1.3 工具特点

Kettle 采用拖拽组件、连线、配置的方式来构建数据管道,通过超过200个不同的组件,用户可以在不编写一句代码就能轻松完成对数据源读取,对数据进行关联、过滤、格式转换、计算、统计、建模、挖掘、输出到不同的数据目标,极大程度地降低开发技术门槛和有效减低开发和维护成本。

# 2.2 DataX工具

# 2.2.1 基本介绍

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

DataX工具

设计理念:为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

当前使用现状:DataX在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。

项目地址:https://github.com/alibaba/DataX (opens new window)

介绍文档:https://github.com/alibaba/DataX/blob/master/introduction.md (opens new window)

部署文档:https://github.com/alibaba/DataX/blob/master/userGuid.md (opens new window)

# 2.2.2 数据源支持

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下表所示:

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL (opens new window) (opens new window)
Oracle (opens new window) (opens new window)
OceanBase (opens new window) (opens new window)
SQLServer (opens new window) (opens new window)
PostgreSQL (opens new window) (opens new window)
DRDS (opens new window) (opens new window)
Kingbase (opens new window) (opens new window)
通用RDBMS(支持所有关系型数据库) (opens new window) (opens new window)
阿里云数仓数据存储 ODPS (opens new window) (opens new window)
ADS (opens new window)
OSS (opens new window) (opens new window)
OCS (opens new window)
Hologres (opens new window)
AnalyticDB For PostgreSQL
阿里云中间件 datahub 读 、写
SLS 读 、写
阿里云图数据库 GDB (opens new window) (opens new window)
NoSQL数据存储 OTS (opens new window) (opens new window)
Hbase0.94 (opens new window) (opens new window)
Hbase1.1 (opens new window) (opens new window)
Phoenix4.x (opens new window) (opens new window)
Phoenix5.x (opens new window) (opens new window)
MongoDB (opens new window) (opens new window)
Cassandra (opens new window) (opens new window)
数仓数据存储 StarRocks 读 、 (opens new window)
ApacheDoris (opens new window)
ClickHouse
Hive (opens new window) (opens new window)
kudu (opens new window)
无结构化数据存储 TxtFile (opens new window) (opens new window)
FTP (opens new window) (opens new window)
HDFS (opens new window) (opens new window)
Elasticsearch (opens new window)
时间序列数据库 OpenTSDB (opens new window)
TSDB (opens new window) (opens new window)
TDengine (opens new window) (opens new window)

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

# 2.2.3 框架设计

DataX本身作为离线数据同步框架,采用Framework + Plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX框架设计

# 2.2.4 核心架构

核心模块介绍

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

# 2.2.5 工具特点

DataX工具有以下优势特点。

1)可靠的数据质量监控;2)丰富的数据转换功能;3)精准的速度控制;4)强劲的同步性能;5)健壮的容错机制;6)极简的使用体验。

# 2.2.6 可视化面板

DataX Web是在DataX之上开发的分布式数据同步工具,提供简单易用的操作界面,降低用户使用DataX的学习成本,缩短任务配置时间,避免配置过程中出错。

用户可通过页面选择数据源即可创建数据同步任务,支持RDBMS、Hive、HBase、ClickHouse、MongoDB等数据源,RDBMS数据源可批量创建数据同步任务,支持实时查看数据同步进度及日志并提供终止同步功能,集成并二次开发xxl-job可根据时间、自增主键增量同步数据。任务执行器支持集群部署,支持执行器多节点路由策略选择,支持超时控制、失败重试、失败告警、任务依赖,执行器CPU、内存、负载的监控等。

项目地址:https://github.com/WeiYe-Jing/datax-web (opens new window)(后端)、https://github.com/WeiYe-Jing/datax-web-ui (opens new window)(前端)

用户手册:https://github.com/WeiYe-Jing/datax-web/blob/master/userGuid.md (opens new window)

datax-web架构图

DataX Web 特性:

  • 1、通过Web构建DataX Json;
  • 2、DataX Json保存在数据库中,方便任务的迁移,管理;
  • 3、Web实时查看抽取日志,类似Jenkins的日志控制台输出功能;
  • 4、DataX运行记录展示,可页面操作停止DataX作业;
  • 5、支持DataX定时任务,支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;
  • 6、调度采用中心式设计,支持集群部署;
  • 7、任务分布式执行,任务"执行器"支持集群部署;
  • 8、执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行;
  • 9、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
  • 10、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;
  • 11、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;
  • 12、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
  • 13、任务失败告警;默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;
  • 14、用户管理:支持在线管理系统用户,存在管理员、普通用户两种角色;
  • 15、任务依赖:支持配置子任务依赖,当父任务执行结束且执行成功后将会主动触发一次子任务的执行, 多个子任务用逗号分隔;
  • 16、运行报表:支持实时查看运行数据,以及调度报表,如调度日期分布图,调度成功分布图等;
  • 17、指定增量字段,配置定时任务自动获取每次的数据区间,任务失败重试,保证数据安全;
  • 18、页面可配置DataX启动JVM参数;
  • 19、数据源配置成功后添加手动测试功能;
  • 20、可以对常用任务进行配置模板,在构建完JSON之后可选择关联模板创建任务;
  • 21、jdbc添加hive数据源支持,可在构建JSON页面选择数据源生成column信息并简化配置;
  • 22、优先通过环境变量获取DataX文件目录,集群部署时不用指定JSON及日志目录;
  • 23、通过动态参数配置指定hive分区,也可以配合增量实现增量数据动态插入分区;
  • 24、任务类型由原来DataX任务扩展到Shell任务、Python任务、PowerShell任务;
  • 25、添加HBase数据源支持,JSON构建可通过HBase数据源获取hbaseConfig,column;
  • 26、添加MongoDB数据源支持,用户仅需要选择collectionName即可完成json构建;
  • 27、添加执行器CPU、内存、负载的监控页面;
  • 28、添加24类插件DataX JSON配置样例
  • 29、公共字段(创建时间,创建人,修改时间,修改者)插入或更新时自动填充
  • 30、对swagger接口进行token验证
  • 31、任务增加超时时间,对超时任务kill datax进程,可配合重试策略避免网络问题导致的datax卡死。
  • 32、添加项目管理模块,可对任务分类管理;
  • 33、对RDBMS数据源增加批量任务创建功能,选择数据源,表即可根据模板批量生成DataX同步任务;
  • 34、JSON构建增加ClickHouse数据源支持;
  • 35、执行器CPU.内存.负载的监控页面图形化;
  • 36、RDBMS数据源增量抽取增加主键自增方式并优化页面参数配置;
  • 37、更换MongoDB数据源连接方式,重构HBase数据源JSON构建模块;
  • 38、脚本类型任务增加停止功能;
  • 39、rdbms json构建增加postSql,并支持构建多个preSql,postSql;
  • 40、数据源信息加密算法修改及代码优化;
  • 41、日志页面增加DataX执行结果统计数据

# 2.3 BitSail工具

# 2.3.1 基本介绍

BitSail是字节跳动开源的基于分布式架构的高性能数据集成引擎。支持多个异构数据源之间的数据同步,提供批量、流式、增量场景下的全局数据集成解决方案。目前,它服务于字节跳动的几乎所有业务线,如抖音、今日头条等,每天同步数百万亿数据。

项目地址:https://github.com/bytedance/bitsail (opens new window)

# 2.3.2 技术架构

数据处理流水线如下:首先通过Input Sources拉取源数据,然后通过中间框架层进行处理,最后通过Output Sinks将数据写入目标。

Source[Input Sources] -> Framework[Data Transmission] -> Sink[Output Sinks]
1

在框架层,提供了丰富的功能并对所有同步场景生效,例如脏数据收集、自动并行计算、任务监控等。在数据同步场景下,涵盖批量、流式、增量数据同步。

bitsail架构

# 2.3.3 工具特点

BitSail工具有以下优势特点。

1)低启动成本和高灵活性;2)流批一体化和数据湖仓一体化架构,一个框架覆盖几乎所有数据同步场景;3)高性能、海量数据处理能力;4)DDL 自动同步;5)类型系统,不同数据源类型之间的转换;6)引擎独立读写接口,开发成本低;7)实时监控任务状态。

# 2.3.4 应用场景

BitSail工具有以下应用场景。

1)异构数据源中的海量数据同步;2)流和批集成数据处理能力;3)数据湖与仓库一体化数据处理能力;4)高性能、高可靠的数据同步;5)分布式、云原生架构数据集成引擎

# 2.4 其他ETL工具

# 2.4.1 Logstash工具

项目简介:Logstash 是一个应用程序日志、事件的传输、处理、管理和搜索的平台。Logstash是具有实时流水线能力的开源的数据收集引擎。 Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。 它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。Logstash 现在是 ElasticSearch 家族成员之一。

项目地址:https://github.com/elastic/logstash (opens new window)

Logstash简介

# 2.4.2 Sqoop工具

项目简介: Apache Sqoop(SQL 到 Hadoop)允许在数据库和 HDFS 之间轻松导入和导出数据集。

项目地址:https://github.com/apache/sqoop (opens new window)

# 2.4.3 Flume工具

项目简介:Apache Flume 是一种分布式、可靠且可用的服务,用于高效收集、聚合和移动大量日志数据。它具有基于流数据流的简单灵活的架构。它具有可调整的可靠性机制以及许多故障转移和恢复机制,具有健壮性和容错性。系统集中管理,实现智能动态管理。它使用允许在线分析应用程序的简单可扩展数据模型。

项目地址:https://github.com/apache/flume (opens new window)

# 2.4.4 Maxwell工具

项目简介:Maxwell 是一个读取 MySQL 二进制日志并将行更新作为 JSON 写入 Kafka、Kinesis 或其他流平台的应用程序。Maxwell 的操作开销很低,只需要 MySQL 和一个可以写入的地方。它的常见用例包括 ETL、缓存构建/过期、指标收集、搜索索引和服务间通信。

项目地址:https://github.com/zendesk/maxwell (opens new window)

# 2.4.5 NiFi工具

项目简介:Apache NiFi 是为数据流而设计的。它支持高度可配置的数据路由、转换和系统中介逻辑的有向图。

项目地址:https://github.com/apache/nifi (opens new window)

# 2.4.6 Canal工具

项目简介:Canal 是阿里巴巴开源的 MySQL 增量订阅&消费组件。主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

项目地址:https://github.com/alibaba/canal (opens new window)

Canal简介

# 3. Kettle Spoon的本地安装与基本使用

# 3.1 Kettle Spoon本地环境

本文使用M1pro芯片的Mac作为本地开发机。Intel 芯片是x86架构的,M1pro芯片是ARM架构的,Kettle Spoon 并没有原生支持M系列芯片,但是可以使用Rosetta 转译运行,或者使用Parallels Desktop虚拟机用Win系统运行,但性能均会打一定的折扣。Windows系统直接使用 Spoon.bat 脚本启动即可,这里就不赘述了。

# 3.1.1 安装基础依赖环境

[1] 安装 JDK8 环境

官网:https://www.oracle.com/java/technologies/downloads/#java8 (opens new window)

Step1:去官网下载 jdk-8u333-macosx-x64.dmg(以实际版本为准即可),然后安装。

Step2:配置环境变量

$ cd ~/
$ open .bash_profile

添加如下一行配置:
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_333.jdk/Contents/Home
1
2
3
4
5

[2] 安装 Rosetta 环境

打开终端,输入以下命令安装 Rosetta 环境。

$ /usr/sbin/softwareupdate --install-rosetta --agree-to-license
1

# 3.1.2 安装并启动 Kettle Spoon

[1] 下载 Kettle Spoon 工具

打开 Kettle Spoon 官网 (opens new window),下载最新版本即可,下载成功后得到一个 1.16GB 的 pdi-ce-9.3.0.0-428.zip 压缩包,解压后得到 data-integration 目录。

[2] 启动 Kettle Spoon 工具

打开终端,输入以下命令启动 Kettle Spoon。

$ cd data-integration
$ env /usr/bin/arch -x86_64 /bin/zsh --login
$ ./spoon.sh
1
2
3

注:如果未输入第2句,用Rosetta转译运行,会出现如下报错

I’m sorry, this Mac platform [arm64] is not yet supported! Please try starting using ‘Data Integration 32-bit’ or ‘Data Integration 64-bit’ as appropriate.
1

启动加载页面如下图所示,加载的时间会比较长,请耐心等待。

Kettle-Spoon启动加载界面

启动成功之后,可以看到如下界面。

启动Kettle-Spoon

注:如上图所示,MacOS 深色模式的 Kettle Spoon 打开之后有点儿显示异常的问题,很多东西都看不清,非常影响使用。建议换成浅色系统模式进行使用,显示正常且美观。如果非的要用深色模式,需要对外观进行调整,才能看得清界面。

Step1:先把系统外观设置成浅色(不然你啥都看不见,设置项都没法点)

Step2:打开Preferences...——观感——勾选“使用操作系统外观”,背景颜色、工作区背景颜色、标签颜色根据喜好修改。

修改Kettle-Spoon界面外观

Step3:重启 Kettle Spoon,再把系统外观设置回自己喜欢的深色,这时候再打开虽然不够美观,但最起码功能区能看清字了。

# 3.2 Kettle Spoon基本介绍

# 3.2.1 基本概念

在使用 Kettle Spoon 之前,建议先了解一下基本概念。

  • Transformation:定义对数据操作的容器,数据操作就是数据从输入到输出的一个过程,可以理解为比Job粒度更小一级的容器,我们将任务分解成Job,然后需要将Job分解成一个或多个Transformation,每个Transformation只完成一部分工作。
  • Step:是Transformation内部的最小单元,每一个Step完成一个特定的功能。
  • Job:负责将Transformation组织在一起进而完成某一工作,通常我们需要把一个大的任务分解成几个逻辑上隔离的Job,当这几个Job都完成了,也就说明这项任务完成了。
  • Job Entry:Job Entry是Job内部的执行单元,每一个Job Entry用于实现特定的功能,如:验证表是否存在,发送邮件等。可以通过Job来执行另一个Job或者Transformation,也就是说Transformation和Job都可以作为Job Entry。
  • Hop:用于在Transformation中连接Step,或者在Job中连接Job Entry,是一个数据流的图形化表示。

# 3.2.2 连接数据库

以连接MySQL数据库为例,数据库版本为5.7。

Step1:下载驱动包:mysql-connector-java-5.1.34.jar (opens new window),然后把他放到./data-integration/lib目录。

Step2:打开 Kettle Spoon 工具,新建一个转换,然后选择DB连接,连接类型选择“MySQL”,连接方式选择“Native (JDBC)”,输入连接信息,测试成功后点击确认。

Kettle-Spoon连接MySQL

# 3.2.3 工作界面

新建一个作业,选择核心对象模块,可以看到 Kettle Spoon 已经分门别类的提供了各类可视化组件,我们只需要拖拽上去,然后连接起来配置一下即可。

Kettle-Spoon工作界面的核心对象

以下是一个配置好的作业任务示例,通过SQL组件和转换组件组成的工作流,具体配置内容可以点开查看和修改。

Kettle-Spoon测试作业任务

# 4. 使用Docker部署DataX服务

以下我将采用Docker的方式搭建DataX3.0服务(版本是datax_v202210,版本说明 (opens new window)),服务器系统用的是Debian 11 x86_64。

  • 另注:后来在实际使用过程中,感觉还是直接在服务器上部署更方便,坑也更少一些。

这里只搭建DataX服务,不搭建DataX-Web可视化界面,后者有比较多的bug且不再维护了,个人感觉不太好用,如需可视化且可在其上进行二次开发。

另外也不建议使用 linshellfeng/datax_web:3.0.1 镜像,虽然里面内置了DataX和DataX-Web服务,但里面有不少问题。

# 4.1 搭建Docker环境

$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh -   # 安装docker
$ sudo systemctl start docker  # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)
1
2
3
4

# 4.2 使用DataX工具包构建镜像并部署

官方提供了DataX工具包,使用方法可参考:https://github.com/alibaba/DataX/blob/master/userGuid.md (opens new window)

Step1:下载 datax.tar.gz (opens new window) 工具包并解压,修改 core.json 文件的配置(将channel/speed/byte的-1改为2000000即可)


{
    "entry": {
        "jvm": "-Xms1G -Xmx1G",
        "environment": {}
    },
    "common": {
        "column": {
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "timeFormat": "HH:mm:ss",
            "dateFormat": "yyyy-MM-dd",
            "extraFormats":["yyyyMMdd"],
            "timeZone": "GMT+8",
            "encoding": "utf-8"
        }
    },
    "core": {
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "timeout": 10000,
            "reportDataxLog": false,
            "reportPerfLog": false
        },
        "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": 2000000,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },
        "container": {
            "job": {
                "reportInterval": 10000
            },
            "taskGroup": {
                "channel": 5
            },
            "trace": {
                "enable": "false"
            }

        },
        "statistics": {
            "collector": {
                "plugin": {
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
                    "maxDirtyNumber": 10
                }
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

注:如未修改该配置,实际使用时会出现“在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数”的报错。

Step2:编写Dockerfile及部署脚本

Dockerfile

# 基于python3.9创建新镜像
FROM python:3.9

# 安装常用命令(非必要)
RUN apt-get update
RUN apt-get install -y cron && service cron start   # 安装crontab并启动
RUN apt-get install -y nano                         # 安装nano-给crontab用
RUN apt-get install -y wget                         # 安装wget
RUN apt-get install -y vim                          # 安装vim
RUN apt-get install -y psmisc                       # 安装ps

# 创建容器内部目录并作为工作目录
RUN mkdir /datax
ADD . /datax
WORKDIR /datax

# 安装java8环境
RUN mkdir /usr/local/java
# 方式一:下载jdk并解压到指定目录(适用于网速快的情况,需要提前安装wget)
RUN wget https://mirrors.huaweicloud.com/java/jdk/8u202-b08/jdk-8u202-linux-x64.tar.gz
RUN tar zxvf jdk-8u202-linux-x64.tar.gz -C /usr/local/java && rm -f jdk-8u202-linux-x64.tar.gz
# 方式二:将本地jdk复制到内部目录并自动解压(适用于网速慢的情况,提前下载好)
# ADD jdk-8u202-linux-x64.tar.gz /usr/local/java
# RUN rm -f jdk-8u202-linux-x64.tar.gz
RUN ln -s /usr/local/java/jdk1.8.0_202 /usr/local/java/jdk
ENV JAVA_HOME /usr/local/java/jdk
ENV JRE_HOME ${JAVA_HOME}/jre
ENV CLASSPATH .:${JAVA_HOME}/lib:${JRE_HOME}/lib
ENV PATH ${JAVA_HOME}/bin:$PATH
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

注:这里安装nano是给crontab用的,crontab -e 创建的时候,默认使用以下编辑器,由于docker里的系统是阉割版的,这里就会报错。

docker里crontab编辑器报错

另注:该docker里的Crontab定时任务要写成crontab配置 cd /dir_path && ./script.sh的形式,如果写成了crontab配置 /dir_path/script.sh形式会不执行任务。有关Crontab定时任务的编写以及存在的坑,请参考我另一篇博客:VPS基本部署环境的搭建与配置 (opens new window) 的2.2.5节。

build.sh

#!/bin/bash

base_path=$(cd `dirname $0`; pwd)
docker build -t 'datax_image' .
docker run -itd --name datax -h datax -v ${base_path}/conf:/datax/conf -v ${base_path}/job:/datax/job -v ${base_path}/log:/datax/log -v ${base_path}/plugin:/datax/plugin -v ${base_path}/script:/datax/script -e TZ="Asia/Shanghai" datax_image
$ docker update datax --restart=always
1
2
3
4
5
6

项目结构如下:

.
├── Dockerfile
├── build.sh
├── bin
│   ├── datax.py
│   ├── dxprof.py
│   └── perftrace.py
├── conf
│   ├── core.json
│   └── logback.xml
├── job
│   └── job.json
├── lib
│   ├── commons-beanutils-1.9.2.jar
│   ├── commons-cli-1.2.jar
│   ├── commons-codec-1.11.jar
│   ├── commons-collections-3.2.1.jar
│   ├── commons-configuration-1.10.jar
│   ├── commons-io-2.4.jar
│   ├── commons-lang-2.6.jar
│   ├── commons-lang3-3.3.2.jar
│   ├── commons-logging-1.1.1.jar
│   ├── commons-math3-3.1.1.jar
│   ├── datax-common-0.0.1-SNAPSHOT.jar
│   ├── datax-core-0.0.1-SNAPSHOT.jar
│   ├── datax-transformer-0.0.1-SNAPSHOT.jar
│   ├── fastjson-1.1.46.sec10.jar
│   ├── fluent-hc-4.5.jar
│   ├── groovy-all-2.1.9.jar
│   ├── hamcrest-core-1.3.jar
│   ├── httpclient-4.5.13.jar
│   ├── httpcore-4.4.13.jar
│   ├── janino-2.5.16.jar
│   ├── logback-classic-1.0.13.jar
│   ├── logback-core-1.0.13.jar
│   └── slf4j-api-1.7.10.jar
├── log
├── plugin
│   ├── reader
│   │   ├── cassandrareader
│   │   ├── datahubreader
│   │   ├── drdsreader
│   │   ├── ftpreader
│   │   ├── gdbreader
│   │   ├── hbase094xreader
│   │   ├── hbase11xreader
│   │   ├── hbase11xsqlreader
│   │   ├── hbase20xsqlreader
│   │   ├── hdfsreader
│   │   ├── kingbaseesreader
│   │   ├── loghubreader
│   │   ├── mongodbreader
│   │   ├── mysqlreader
│   │   ├── oceanbasev10reader
│   │   ├── odpsreader
│   │   ├── opentsdbreader
│   │   ├── oraclereader
│   │   ├── ossreader
│   │   ├── otsreader
│   │   ├── otsstreamreader
│   │   ├── postgresqlreader
│   │   ├── rdbmsreader
│   │   ├── sqlserverreader
│   │   ├── starrocksreader
│   │   ├── streamreader
│   │   ├── tdenginereader
│   │   ├── tsdbreader
│   │   └── txtfilereader
│   └── writer
│       ├── adbpgwriter
│       ├── adswriter
│       ├── cassandrawriter
│       ├── clickhousewriter
│       ├── datahubwriter
│       ├── doriswriter
│       ├── drdswriter
│       ├── elasticsearchwriter
│       ├── ftpwriter
│       ├── gdbwriter
│       ├── hbase094xwriter
│       ├── hbase11xsqlwriter
│       ├── hbase11xwriter
│       ├── hbase20xsqlwriter
│       ├── hdfswriter
│       ├── hologresjdbcwriter
│       ├── kingbaseeswriter
│       ├── kuduwriter
│       ├── loghubwriter
│       ├── mongodbwriter
│       ├── mysqlwriter
│       ├── oceanbasev10writer
│       ├── ocswriter
│       ├── odpswriter
│       ├── oraclewriter
│       ├── oscarwriter
│       ├── osswriter
│       ├── otswriter
│       ├── postgresqlwriter
│       ├── rdbmswriter
│       ├── sqlserverwriter
│       ├── starrockswriter
│       ├── streamwriter
│       ├── tdenginewriter
│       ├── tsdbwriter
│       └── txtfilewriter
├── script
└── tmp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107

Step3:上传到服务器进行部署

将项目部署到服务器上,执行 build.sh 脚本,将DataX项目部署起来,然后使用默认 job.json 进行测试。

$ chmod u+x build.sh && ./build.sh
$ docker exec -it datax /bin/bash
$ python /datax/bin/datax.py /datax/job/es_job.json
1
2
3

# 4.3 DataX插件编写及打包

DataX插件编写及打包详见:DataX插件开发宝典 (opens new window)

// 下载项目代码(我这里使用的是datax_v202210的)
$ wget https://github.com/alibaba/DataX/archive/refs/tags/datax_v202210.zip
// 打包插件(注意不要忘了加-DskipTests assembly:assembly)
// 为了打包快速,可以将dataX父pom文件内的模块都注释掉,只留下需要打包的模块和公用模块
$ mvn clean package -DskipTests assembly:assembly
1
2
3
4
5

打包完将 elasticsearchwriter-0.0.1-SNAPSHOT.jar 文件上传到容器内的datax插件目录即可。

# 4.4 DataX与SpringCloud搭配实现集群部署

DataX只是单机工具,如果希望集群部署可以配合SpringCloud工程实现:

  • 分布式SpringCloud集群配合euraka/zookeeper注册中心+负载均衡实现一个稳定的集群环境。
  • 每个SpringCloud接口后台直接调用datax的python指令启动本地datax并接收反馈收集日志。
  • 集群日志、监控、配置变更(datax使用的json文件)等均可基于SpringCloud生态圈工具快速搭建。

# 5. DataX的基本使用示例

# 5.1 准备测试数据表

source数据库的测试数据表结构及数据示例如下(测试数据由链家爬虫采集获取):

CREATE TABLE `spider_second_hand` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `area` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '区域',
  `city` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '城市',
  `descs` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '描述',
  `direction` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '朝向',
  `district` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '区县',
  `floor` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '楼层',
  `furnish` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '装修',
  `insert_time` datetime DEFAULT NULL COMMENT '插入时间',
  `is_deleted` int(11) DEFAULT NULL COMMENT '是否删除(0未删除 1已删除)',
  `layout` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '布局',
  `price` double DEFAULT NULL COMMENT '价格(万/套)',
  `size` double DEFAULT NULL COMMENT '大小(平米)',
  `source` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '数据来源',
  `house_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '房屋类型',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16383 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='二手房信息表';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

DataX测试数据表数据示例

target数据库的表结构如下:

CREATE TABLE `spider_second_hand` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `area` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '区域',
  `city` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '城市',
  `descs` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '描述',
  `direction` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '朝向',
  `district` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '区县',
  `floorType` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '层高类型',
  `floorNum` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '楼层高度',
  `furnish` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '装修',
  `insertTime` datetime DEFAULT NULL COMMENT '插入时间',
  `isDeleted` int(11) DEFAULT NULL COMMENT '是否删除(0未删除 1已删除)',
  `layout` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '布局',
  `price` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '价格(万/套)',
  `size` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '大小(平米)',
  `source` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '数据来源',
  `houseType` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '房屋类型',
  `updateTime` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=16383 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='二手房信息表';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

注:与source表的字段变更如下,用于测试。

  • 字段名称映射:原先下划线_命名的字段改为驼峰命名。
  • 字段内容拆分:floor字段拆分成floorType和floorNum两个字段。
  • 字段类型变更:price和size字段类型由double变更为varchar。

# 5.2 全量数据同步

官方的插件文档写的很详细,务必仔细阅读,尤其是参数说明。

# 5.2.1 MySQL至MySQL全量数据同步

demo_mysql2es.json:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "your_password",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://ip:3306/source_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"
                ],
                "querySql": [
                  "select id,area,city,descs,direction,district,furnish,insert_time,is_deleted,layout,price,size,source,house_type,update_time,SUBSTRING_INDEX(floor, '(', 1) floor_type, replace(SUBSTRING_INDEX(floor, '(', -1),')','') floor_num from spider_second_hand where 1=1"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "root",
            "password": "your_password",
            "writeMode": "update",
            "column": [
              "`id`",
              "`area`",
              "`city`",
              "`descs`",
              "`direction`",
              "`district`",
              "`furnish`",
              "`insertTime`",
              "`isDeleted`",
              "`layout`",
              "`price`",
              "`size`",
              "`source`",
              "`houseType`",
              "`updateTime`",
              "`floorType`",
              "`floorNum`"
            ],
            "connection": [
              {
                "table": [
                  "spider_second_hand"
                ],
                "jdbcUrl": "jdbc:mysql://ip:3306/target_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"
              }
            ]
          }
        }
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

注意事项:

  • 1)jdbcUrl连接信息的?之后必须加上useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8,否则会出现如下错误:

    Caused by: java.lang.Exception: DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确。
    
    1
  • 2)where 1=1是为了给下面增量更新Shell脚本使用,会替换这里的1=1为实际增量条件,请务必存在1=1(中间不要有空格)且整个配置文件仅此处有。

  • 3)writeMode为写入模式参数,取值为 insert/replace/update,默认为insert,如果需要更新数据,把这里设置成update,update也可以插入数据。

执行任务:

$ python /datax/bin/datax.py /datax/job/demo_mysql2mysql.json
1

执行成功的日志如下:

DataX执行MySQL全量数据同步

# 5.2.2 MySQL至ElasticSearch全量数据同步

demo_mysql2es.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "your_password",
            "where":"1=1",
            "column": [
              "`id`",
              "`area`",
              "`city`",
              "`descs`",
              "`direction`",
              "`district`",
              "`furnish`",
              "`insert_time`",
              "`is_deleted`",
              "`layout`",
              "`price`",
              "`size`",
              "`source`",
              "`house_type`",
              "`update_time`"
            ],
            "splitPk": "",
            "connection": [
              {
                "table": [
                  "spider_second_hand"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://ip:3306/target_db?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://ip:9200",
            "accessId": "elastic",
            "accessKey": "your_password",
            "index": "spider_second_hand",
            "type": "default",
            "cleanup": false,
            "discovery": false,
            "dynamic": false,
            "batchSize": 1000,
            "splitter": ",",
            "column": [
              {
                "name": "pk",
                "type": "id"
              },
              {
                "name": "area",
                "type": "text"
              },
              {
                "name": "city",
                "type": "text"
              },
              {
                "name": "descs",
                "type": "text"
              },
              {
                "name": "direction",
                "type": "text"
              },
              {
                "name": "district",
                "type": "text"
              },
              {
                "name": "furnish",
                "type": "text"
              },
              {
                "name": "insertTime",
                "type": "date"
              },
              {
                "name": "isDeleted",
                "type": "long"
              },
              {
                "name": "layout",
                "type": "text"
              },
              {
                "name": "price",
                "type": "double"
              },
              {
                "name": "size",
                "type": "double"
              },
              {
                "name": "source",
                "type": "text"
              },
              {
                "name": "houseType",
                "type": "text"
              },
              {
                "name": "updateTime",
                "type": "date"
              }
            ]
          }
        }
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126

注意事项:

  • 1)jdbcUrl连接信息的?之后必须加上useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8,否则会出现如下错误:

    Caused by: java.lang.Exception: DataX无法连接对应的数据库,可能原因是:1) 配置的ip/port/database/jdbc错误,无法连接。2) 配置的username/password错误,鉴权失败。请和DBA确认该数据库的连接信息是否正确。
    
    1
  • 2)where 1=1是为了给下面增量更新Shell脚本使用,会替换这里的1=1为实际增量条件,请务必存在1=1(中间不要有空格)且整个配置文件仅此处有。

  • 3)虽然 datax_v202210 的文档上只写了支持 7.x,但实际测试发现8.x(如8.4.1)也是支持的。

  • 4)cleanup设置为false,每次同步时不清空数据,如果设置为true会导致每次同步清空数据,增量同步会受影响。

  • 5)使用数据库id作为ES中记录的_id可通过以下方式实现(不需要再指定name为id的字段了,不然会报错)。强烈建议这里使用数据库id,不要用默认的随机id,不然数据更新会出问题。

    {"name": "pk", "type": "id"},
    
    1

    注:如果source表有多个(可能存在id重复问题,会影响到数据更新),这里提供两种解决方案。一种是将多个source先同步到mysql,用mysql的唯一自增主键作为_id;另一种是将不同source的id区分开作为_id(比如source_table的表名与id进行拼接)。

  • 6)如果是向已有的ES索引中同步数据,结构已经有了,需要将dynamic配置为true,不使用DataX中的索引配置。

  • 7)DataX目前不支持dense_vector向量类型的数据同步,会报如下错误:

                      "error": {
                        "type": "mapper_parsing_exception",
                        "reason": "failed to parse",
                        "caused_by": {
                            "type": "parsing_exception",
                            "reason": "Failed to parse object: expecting token of type [VALUE_NUMBER] but found [END_OBJECT]",
                            "line": 1,
                            "col": 838
                         }
                      }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    DataX仅支持如下类型:

    ID,PARENT,ROUTING,VERSION,STRING,TEXT,KEYWORD,LONG,INTEGER,SHORT,BYTE,DOUBLE,FLOAT,DATE,BOOLEAN,BINARY,INTEGER_RANGE,
    FLOAT_RANGE,LONG_RANGE,DOUBLE_RANGE,DATE_RANGE,GEO_POINT,GEO_SHAPE,IP,IP_RANGE,COMPLETION,TOKEN_COUNT,OBJECT,NESTED;
    
    1
    2

执行任务:

$ python /datax/bin/datax.py /datax/job/demo_mysql2es.json
1

执行成功的日志如下:

DataX执行ES全量数据同步

# 5.3 增量数据同步

对上述的 MySQL --> MySQL、MySQL --> ElasticSearch 进行改进,使其支持基于时间戳的增量数据同步。

# 5.3.1 增量数据同步的实现思路

DataX 支持多种数据库的读写,JSON 格式配置文件很容易编写,同步性能很好,但是缺乏对增量更新的内置支持。其实增量更新非常简单,只要从目标数据库读取一个最大值的记录,然后根据这个最大值对源数据库要同步的表进行过滤,然后再进行同步即可。

由于 DataX 支持多种数据库的读写,一种相对简单并且可靠的思路就是:

  • Step1:利用 DataX 的 DataReader 去目标数据库读取一个最大值;
  • Step2:将这个最大值用 TextFileWriter 写入到一个 txt 文件;
  • Step3:用 Shell 脚本来读取 txt 文件, 并动态修改全部同步的配置文件;
  • Step4:执行修改后的配置文件, 进行增量同步。

注意:使用此方案进行数据同步前,务必保证系统时间正确(可使用date命令查看)。若服务器的系统时间不正确,会使上次同步时间记录出错,最终会导致下次数据同步的条件筛选出错。

# 5.3.2 增量数据同步的Shell脚本

编写一个自动判断增量同步或全量同步的shell脚本(第一次为全量,后续为增量)

/datax/script/auto_sync_data.sh

#!/bin/bash

# 错误检查,有错误的时候退出
set -e

# dataX脚本里需要的参数
systemTime=`date +%Y-%m-%d,%H:%M:%S`

# 获取当前时间,这个是同步后要写到txt文件的变量
currentTime=`date +"%Y-%m-%d %H:%M:%S"`
echo ${currentTime}

# datax根路径定义[需要修改]
basePath=/datax
txtPath=${basePath}/job/txt
if [ ! -d "${txtPath}" ];then
    mkdir ${txtPath}
fi
tempPath=${basePath}/job/temp
if [ ! -d "${tempPath}" ];then
    mkdir ${tempPath}
fi

# python命令版本[需要修改]
pythonCmd=python  # python或python3

# json配置文件数组[需要修改]
jsonArray[${#jsonArray[*]}]=demo_mysql2mysql.json
jsonArray[${#jsonArray[*]}]=demo_mysql2es.json
# ...

# 遍历json文件
for i in ${jsonArray[@]}
do
    # 去除.json,拼接获取txt文件名
    echo ${i}
    txtName=${i%.*}.txt
    echo ${txtName}

    # 找到txt文本文件,并将内容读取到一个变量中
    if [ ! -f "${txtPath}" ];then
        touch ${txtPath}/${txtName}
    fi
    MAX_TIME=`cat ${txtPath}/${txtName}`
    echo ${MAX_TIME}

    # 如果最大时间不为""的话,修改全部同步的配置,进行增量更新;如果最大时间为null ,进行全量更新;
    if [ "$MAX_TIME" != "" ]; then
        # 设置增量更新过滤条件
        WHERE="update_time > str_to_date('$MAX_TIME', '%Y-%m-%d %H:%i:%s')"
        # 创建改写后的json临时配置文件(把1=1替换成上面的where条件)
        sed "s/1=1/$WHERE/g" ${basePath}/job/${i} > ${tempPath}/${i/.json/_temp.json}
        # 增量更新
        ${pythonCmd} ${basePath}/bin/datax.py -p "-DsystemTime='$systemTime'" -j "-Xms1g -Xmx1g" ${tempPath}/${i/.json/_temp.json}
    else
        # 全量更新
        ${pythonCmd} ${basePath}/bin/datax.py -p "-DsystemTime='$systemTime'" -j "-Xms1g -Xmx1g"  ${basePath}/job/${i}
    fi

    if [ $? -ne 0 ]; then
      # 执行失败
      echo "datax task execution failed"
    else
      # 执行成功,将最大日期写入txt文件,覆盖写入
      echo  ${currentTime} > ${txtPath}/${txtName}
    fi

# 删除临时文件
rm -rf ${tempPath}/${i/.json/_temp.json}

done
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

先把5.2节同步过去的全量数据清空,执行增量同步脚本。

$ chmod u+x /datax/script/auto_sync_data.sh && /datax/script/auto_sync_data.sh
1

执行结果如下:

脚本第一次执行时全量同步数据

然后去source表造一条新数据,重新执行增量同步脚本。

执行结果如下:

脚本第二次执行时增量同步数据

另注:使用这个增量数据同步脚本在服务器用Crontab定时任务部署时,有个坑是找不到java、python命令(自己执行脚本时可以找到,但用Crontab定时执行却报错),解决办法是找到java、python解释器路径,将其加到脚本中。

$ which python
/usr/bin/python
$ which java
/usr/local/java/jdk1.8.0_351/bin/java
1
2
3
4

在 auto_sync_data.sh 脚本加上解释器路径

# 配置环境变量,解决crontab执行时找不到python和java环境的问题
PATH+=:/usr/local/bin
PATH+=:/usr/local/java/jdk1.8.0_351/bin
1
2
3

定时任务可以这么配置

0 */1 * * * cd /root/datax/script && ./auto_sync_data.sh > /root/datax/log/auto_sync_data.log 2>&1 &
1

# 6. 常见情形处理方案

# 6.1 数据库表结构差异比对

项目简介:MySQL 数据库表结构对比工具

参考项目:https://github.com/zeonto/diff_schema (opens new window)(原项目在Python3环境下跑不了,下面对其进行了修改)

环境依赖:Python3.9(别的版本可能也行,但我没试),依赖安装pip3 install mysql-connector

使用方式:支持“使用SQL文件对比”、“连接数据库对比”两种使用方式。

  • 使用SQL文件对比:python3 mysql_diff_schema.py -d file -s source.sql -t target.sql -o diff.sql
  • 连接数据库对比:python3 mysql_diff_schema.py -d db -s root:[email protected]:3306~dbname1 -t root:[email protected]:3306~dbname2 -o diff.sql

使用效果:会生成一个 diff.sql 文件,里面会生成同步表结构差异的修改语句,执行它即可同步表结构差异。

注意事项:

  • 表设置信息修改只检查 ENGINE, CHARSET, COMMENT的改变来做对比。
  • 使用 SQL 文件对比请确保源文件和目标文件都是使用相同一个工具导出。

mysql_diff_schema.py

# -*- coding: utf-8 -*-

import re
import mysql.connector
from optparse import OptionParser


def config_option():
    usage = "%prog [options] arg \n"
    usage += " Demo1: %prog -d file -s source_schema.sql -t target_schema.sql -o diff_schema.sql\n Demo2: %prog -d db -s root:[email protected]:3306~dbname1 -t root:[email protected]:3306~dbname2 -o diff.sql"
    parser = OptionParser(usage)
    parser.add_option("-d", "--data", dest="data", help="data sources can be a file or db")
    parser.add_option("-s", "--source", dest="source_schema", help="source database schema can be a sql file or a database")
    parser.add_option("-t", "--target", dest="target_schema", help="target database schema can be a sql file or a database")
    parser.add_option("-o", "--output", dest="diff_alters", help="output diff alters to sql file")
    (options, args) = parser.parse_args()

    if not options.data or not options.target_schema or not options.source_schema or not options.diff_alters:
        parser.error("必须输入参数:-d -t -s -o");
    if options.data and options.data not in ['file', 'db']:
        parser.error('Incorrect data source type.')

    global opt_main
    opt_main = {}
    opt_main["data_source"] = options.data
    opt_main["target_schema"] = options.target_schema
    opt_main["source_schema"] = options.source_schema
    opt_main["diff_alters"] = options.diff_alters


class SchemaObjects(object):
    def __init__(self, target_schema, source_schema):
        self.target_schema = target_schema
        self.source_schema = source_schema
        self.run()

    def run(self):
        self.objects_alters = ''
        self.return_objects = {}
        self.return_objects['tables'] = {}
        self.return_objects['servers'] = {}
        self.return_objects['events'] = {}
        self.return_objects['routines'] = {}
        self.return_objects['triggers'] = {}

        if opt_main['data_source'] == 'db':
            self.source_tables = self._get_database_tables(self.source_schema)
            self.target_tables = self._get_database_tables(self.target_schema)
        else:
            self.source_tables = self._get_sql_tables(self.source_schema)
            self.target_tables = self._get_sql_tables(self.target_schema)

        self.diff_tables = self._get_diff_tables(self.target_tables, self.source_tables)
        for table in self.diff_tables:
            self.return_objects['tables'][table] = {}
            self.return_objects['tables'][table]['target_table'] = self._get_table_definitions(
                self.diff_tables[table]['target_table'])
            self.return_objects['tables'][table]['source_table'] = self._get_table_definitions(
                self.diff_tables[table]['source_table'])

    def _record_alters(self, alter):
        self.objects_alters += alter
        self.objects_alters += "\n"
        print(alter)

    def get_objects_alters(self):
        return self.objects_alters

    def get_schema_objects(self):
        return self.return_objects

    def _get_servers(self, schema_name):
        pass

    def _get_events(self, schema_name):
        pass

    def _get_routines(self, schema_name):
        pass

    def _get_triggers(self, schema_name):
        pass

    def _get_sql_tables(self, schema_name):
        """
        @brief      Gets the sql tables.
        
        @param      self         The object
        @param      schema_name  The schema name
        
        @return     The sql tables.
        """
        try:
            schema_file = open(schema_name, 'r')
        except IOError:
            print('Cannot open file', schema_name)
        else:
            schema_file.readline()
            schema_string = ''
            for line in schema_file:
                schema_string = schema_string + line
            schema_file.close()
            return_tables = {}
            tables = re.findall(r"CREATE\s*TABLE[^;]*;", schema_string)
            for table in tables:
                table_name = re.match(r"(CREATE\s*TABLE\s*\`)(.*)(\`\s*\()", table)
                if table_name:
                    return_tables[table_name.group(2)] = table

            return return_tables

    def _get_database_tables(self, conn_config):
        """
        @brief      Gets the database tables.
        
        @param      self         The object
        @param      conn_config  The connection configuration
        
        @return     The database tables.
        """
        config = re.match(r"([^:]*):(.*)@(.*)~([^~]*)", conn_config)  # format -> root:[email protected]:3306~db1
        if not config:
            raise Exception('parameter errors: %s' % (conn_config))
        db_info = {}
        if config.group(1):
            db_info['user'] = config.group(1)
        else:
            raise Exception('parameter errors: %s' % (conn_config))
        if config.group(2):
            db_info['pass'] = config.group(2)
        else:
            db_info['pass'] = ''
        host_info = config.group(3)
        host_re = re.match(r"([^:]*)(.*)", host_info)
        host = host_re.group(1)
        port = host_re.group(2).strip(':')
        db_info['host'] = host
        if port:
            db_info['port'] = port
        else:
            db_info['port'] = 3306
        if config.group(4):
            db_info['db'] = config.group(4)
        else:
            raise Exception('parameter errors: %s' % (conn_config))

        conn = mysql.connector.connect(
            host=db_info['host'],
            user=db_info['user'],
            password=db_info['pass'],
            database=db_info['db'],
            port=db_info['port'],
            charset='utf8'
        )
        cmd = conn.cursor()
        cmd.execute("show tables")
        tables = cmd.fetchall()
        table_schema = {}
        for table_name in tables:
            cmd.execute("show create table `%s`;" % (table_name))
            show_create = cmd.fetchone()
            table_schema[show_create[0]] = show_create[1]

        return table_schema

    def _get_diff_tables(self, target_tables, source_tables):
        return_tables = {}
        if target_tables and source_tables:
            for table in target_tables:
                if table in source_tables:
                    if target_tables[table] == source_tables[table]:
                        pass
                    else:
                        return_tables[table] = {}
                        return_tables[table]['target_table'] = target_tables[table]
                        return_tables[table]['source_table'] = source_tables[table]
                else:
                    self._record_alters("-- %s" % (table))
                    self._record_alters("DROP TABLE `%s`;" % (table))
                    self._record_alters(" ")

            for table in source_tables:
                if table in target_tables:
                    pass
                else:
                    self._record_alters("-- %s" % (table))
                    self._record_alters("%s;" % (source_tables[table].strip(';')))
                    self._record_alters(" ")

        return return_tables

    def _get_table_definitions(self, schema_table):
        return_definitions = {}
        return_definitions['column'] = {}
        return_definitions['primary'] = {}
        return_definitions['unique'] = {}
        return_definitions['key'] = {}
        return_definitions['foreign'] = {}
        return_definitions['fulltext'] = {}
        return_definitions['option'] = {}
        return_definitions['column_position'] = {}

        table_definitions = schema_table.split('\n')

        for definition in table_definitions:
            column_name = re.match(r"(\s*\`)([^`]*)(\`.*)", definition)
            if column_name:
                tmp = column_name.group().split(",")
                _column_content = ",".join(tmp[:-1])
                return_definitions['column'][column_name.group(2)] = _column_content.strip()
                return_definitions['column_position'][column_name.group(2)] = table_definitions.index(definition)

            primary_name = re.match(r"(\s*PRIMARY KEY\s*)", definition)
            if primary_name:
                return_definitions['primary']['primary'] = re.match(r"(\s*)(PRIMARY KEY \(.*\))(,?)", definition).group(2)

            unique_name = re.match(r"(\s*UNIQUE KEY \`)([^`]*)(\`.*)", definition)
            if unique_name:
                return_definitions['unique'][unique_name.group(2)] = re.match(r"(\s*)(UNIQUE KEY.*\))(,?)", definition).group(2)

            key_name = re.match(r"(\s*KEY \`)([^`]*)(\`.*)", definition)
            if key_name:
                return_definitions['key'][key_name.group(2)] = re.match(r"(\s*)(KEY.*\))(,?)", definition).group(2)

            foreign_name = re.match(r"(\s*CONSTRAINT \`)([^`]*)(\`.*)", definition)
            if foreign_name:
                return_definitions['foreign'][foreign_name.group(2)] = re.match(r"(\s*)(CONSTRAINT[^,]*)(,?)", definition).group(2)

            fulltext_name = re.match(r"(\s*FULLTEXT KEY \`)([^`]*)(\`.*)", definition)
            if fulltext_name:
                return_definitions['fulltext'][fulltext_name.group(2)] = re.match(r"(\s*)(FULLTEXT KEY.*\))(,?)", definition).group(2)

            option_name = re.match(r"(\)\s*ENGINE=.*)", definition)
            if option_name:
                pattern = re.compile(r' AUTO_INCREMENT=\d+| ROW_FORMAT=\w+', re.I)
                engine_content = re.sub(pattern, '', re.match(r"(\)\s*)(ENGINE[^\n]*)(;?)", definition).group(2))
                return_definitions['option']['option'] = engine_content

        return return_definitions


class SchemaAlters(object):
    def __init__(self, schema_objects):
        self.diff_objects = schema_objects
        self.run()

    def run(self):
        self.definitions_alters = ''
        self.return_alters = {}
        self.return_alters['tables'] = {}
        self.return_alters['servers'] = {}
        self.return_alters['events'] = {}
        self.return_alters['routines'] = {}
        self.return_alters['triggers'] = {}
        self._alter_tables(self.diff_objects['tables'])

    def _record_alters(self, alter):
        self.definitions_alters += alter
        self.definitions_alters += "\n"
        print(alter)

    def get_definitions_alters(self):
        return self.definitions_alters

    def _alter_tables(self, schema_tables):
        for table in schema_tables:
            target_table = schema_tables[table]['target_table']
            source_table = schema_tables[table]['source_table']

            _alter = ''
            _alter += self._column(table, target_table['column'], source_table['column'])
            _alter += self._primary(table, target_table['primary'], source_table['primary'])
            _alter += self._unique(table, target_table['unique'], source_table['unique'])
            _alter += self._key(table, target_table['key'], source_table['key'])
            _alter += self._foreign(table, target_table['foreign'], source_table['foreign'])
            _alter += self._fulltext(table, target_table['fulltext'], source_table['fulltext'])
            _alter += self._option(table, target_table['option'], source_table['option'])
            if _alter:
                self._record_alters("-- %s" % (table))
                self._record_alters("%s \n" % (_alter.strip('\n')))

    def _get_option_diff(self, source_option, target_option):
        """
        @brief      获取表设置差异
        
        @param      self           The object
        @param      source_option  The source option
        @param      target_option  The target option
        
        @return     The option difference.
        """
        check_option = ['ENGINE', 'CHARSET', 'COMMENT']  # 指定检查表设置项
        sources = source_option.strip(';').split(' ')
        targets = target_option.strip(';').split(' ')

        pattern = re.compile(r"COMMENT=(.*)")
        find_comment = pattern.findall(source_option)
        if find_comment:
            _comment = find_comment[0].strip(';')
        else:
            _comment = '\'\''

        _sources = {}
        _targets = {}
        for target_item in targets:
            if target_item:
                _item = target_item.split('=', 1)
                if len(_item) == 2 and _item[0] in check_option:
                    _targets[_item[0]] = _item[1]

        for source_item in sources:
            if source_item:
                _item = source_item.split('=', 1)
                if len(_item) == 2 and _item[0] in check_option:
                    _sources[_item[0]] = _item[1]

        option_diff = ''
        for option_member in check_option:
            if option_member in _sources.keys() and option_member in _targets.keys() and _sources[option_member] == \
                    _targets[option_member]:
                pass
            else:
                if option_member == 'COMMENT':
                    option_diff += option_member + '=' + _comment
                else:
                    if option_member not in _sources.keys() and option_member in _targets.keys():
                        option_diff += option_member + '=\'\''
                    else:
                        option_diff += option_member + '=' + _sources[option_member] + ' '

        return option_diff.strip()

    def _get_column_position_num(self, column_position, column):
        """
        @brief      获取字段所在位置(数字)
        
        @param      self             The object
        @param      column_position  The column position
        @param      column           The column
        
        @return     The column position number.
        """
        if (column_position[column]):
            return column_position[column]
        return 0

    def _get_next_column(self, position_dict, column):
        """
        @brief      获取下一个字段
        
        @param      self           The object
        @param      position_dict  The position dictionary
        @param      column         The column
        
        @return     The next column.
        """
        if position_dict[column] == len(position_dict):
            # 最后一个字段没有下一个字段,返回空
            return ''
        next_position = position_dict[column] + 1
        next_column = list(position_dict.keys())[list(position_dict.values()).index(next_position)]
        return next_column

    def _get_before_column(self, position_dict, column):
        """
        @brief      获取上一个字段
        
        @param      self           The object
        @param      position_dict  The position dictionary
        @param      column         The column
        
        @return     The before column.
        """
        if position_dict[column] == 1:
            # 第1个字段前面没有字段
            return ''
        before_position = position_dict[column] - 1
        before_column = list(position_dict.keys())[list(position_dict.values()).index(before_position)]
        return before_column

    def _get_target_next_column(self, source_position_dict, target_position_dict, column):
        """
        @brief      获取目标结构中的下一个字段
        
        @param      self                  The object
        @param      source_position_dict  The source position dictionary
        @param      target_position_dict  The target position dictionary
        @param      column                The column
        
        @return     The target next column.
        """
        source_position = source_position_dict[column]
        if source_position < len(source_position_dict):
            for x in range(source_position + 1, len(source_position_dict) + 1):
                next_column = list(source_position_dict.keys())[list(source_position_dict.values()).index(x)]
                if next_column in target_position_dict:
                    return next_column
        return ''

    def _get_source_before_column(self, source_position_dict, column):
        """
        @brief      从源结构中获取前面一个字段
        
        @param      self                  The object
        @param      source_position_dict  The source position dictionary
        @param      target_position_dict  The target position dictionary
        @param      column                The column
        
        @return     The source before column.
        """
        source_position = source_position_dict[column]
        # 从第1个参数开始,到第2个参数之前结束
        # 按位置从后往前查,字段前面一个字段存在目标结构中,则返回该字段
        for x in range(source_position - 1, 0, -1):
            before_column = list(source_position_dict.keys())[list(source_position_dict.values()).index(x)]
            return before_column
        return ''

    def _get_target_before_column(self, source_position_dict, target_position_dict, column):
        """
        @brief      从目标结构获取前面一个字段
        
        @param      self                  The object
        @param      source_position_dict  The source position dictionary
        @param      target_position_dict  The target position dictionary
        @param      column                The column
        
        @return     The target before column.
        """
        source_position = source_position_dict[column]
        # 从第1个参数开始,到第2个参数之前结束
        # 按位置从后往前查,字段前面一个字段存在目标结构中,则返回该字段
        for x in range(source_position - 1, 0, -1):
            before_column = list(source_position_dict.keys())[list(source_position_dict.values()).index(x)]
            if before_column in target_position_dict:
                return before_column
        return ''

    def _get_column_position_sql(self, source_position_dict, target_position_dict, column):
        """
        @brief      获取字段所在位置关系
        
        @param      self                  The object
        @param      source_position_dict  The source position dictionary {'status': 5, 'id': 1}
        @param      target_position_dict  The target position dictionary
        @param      column                The column
        
        @return     The column position sql.
        """
        if (column in source_position_dict and column in target_position_dict):
            if (source_position_dict[column] == target_position_dict[column]):
                return ''
            else:
                current_postion = source_position_dict[column]
                if current_postion == 1:
                    # 假如是第一个字段
                    return ' FIRST'
                before_column = self._get_target_before_column(source_position_dict, target_position_dict, column)
                if before_column:
                    return " AFTER `%s`" % (before_column)
        return ''

    def _column(self, table, target_column, source_column):
        source_position_dict = self.diff_objects['tables'][table]['source_table']['column_position']
        target_position_dict = self.diff_objects['tables'][table]['target_table']['column_position']

        _alter = "ALTER TABLE `%s`\n" % (table)
        _sql = ''
        for definition in target_column:
            if definition in source_column:
                source_position = self._get_column_position_num(source_position_dict, definition)
                target_position = self._get_column_position_num(target_position_dict, definition)
                if source_position == target_position and target_column[definition] == source_column[definition]:
                    # 字段内容、字段位置一致,没变化跳过
                    pass
                else:
                    # 字段内容没变化,字段位置改变
                    source_before_column = self._get_before_column(source_position_dict, definition)
                    target_before_column = self._get_before_column(target_position_dict, definition)
                    if target_column[definition] == source_column[
                        definition] and source_before_column == target_before_column:
                        # 字段内容没变化,上一个字段也相同,跳过
                        pass
                    else:
                        source_next_column = self._get_next_column(source_position_dict, definition)
                        target_next_column = self._get_next_column(target_position_dict, definition)
                        if target_column[definition] == source_column[
                            definition] and source_next_column == target_next_column:
                            # 字段内容没变化,下一个字段也相同,跳过
                            pass
                        else:
                            column_position = self._get_column_position_sql(source_position_dict, target_position_dict,
                                                                            definition)
                            _sql += "\tMODIFY COLUMN %s,\n" % (source_column[definition] + column_position)
            else:
                _sql += ("\tDROP COLUMN `%s`,\n" % (definition))

        for definition in source_column:
            if definition in target_column:
                pass
            else:
                target_before_column = self._get_target_before_column(source_position_dict, target_position_dict,
                                                                      definition)
                _sql += ("\tADD COLUMN %s AFTER %s,\n" % (source_column[definition], target_before_column))

        if _sql:
            return _alter + _sql.strip('\n').strip(',') + ';\n'
        else:
            return ''

    def _primary(self, table, target_primary, source_primary):
        _alter = "ALTER TABLE `%s`\n" % (table)
        _sql = ''
        if 'primary' in target_primary:
            if 'primary' in source_primary:
                if target_primary['primary'] == source_primary['primary']:
                    pass
                else:
                    _sql += ("\tDROP PRIMARY KEY,\n")
                    _sql += ("\tADD %s,\n" % (source_primary['primary']))
            else:
                _sql += ("\tDROP PRIMARY KEY,\n")

        if 'primary' in source_primary:
            if 'primary' in target_primary:
                pass
            else:
                _sql += ("\tADD %s,\n" % (source_primary['primary']))

        if _sql:
            return _alter + _sql.strip('\n').strip(',') + ';\n'
        else:
            return ''

    def _unique(self, table, target_unique, source_unique):
        _alter = "ALTER TABLE `%s`\n" % (table)
        _sql = ''
        for definition in target_unique:
            if definition in source_unique:
                if target_unique[definition] == source_unique[definition]:
                    pass
                else:
                    _sql += "\tDROP INDEX %s,\n" % (definition)
                    _sql += "\tADD %s,\n" % (source_unique[definition])
            else:
                _sql += "\tDROP INDEX %s,\n" % (definition)

        for definition in source_unique:
            if definition in target_unique:
                pass
            else:
                _sql += "\tADD %s,\n" % (source_unique[definition])

        if _sql:
            return _alter + _sql.strip('\n').strip(',') + ';\n'
        else:
            return ''

    def _key(self, table, target_key, source_key):
        _alter = "ALTER TABLE `%s`\n" % (table)
        _sql = ''
        for definition in target_key:
            if definition in source_key:
                if target_key[definition] == source_key[definition]:
                    pass
                else:
                    _sql += "\tDROP KEY %s,\n" % (definition)
                    _sql += "\tADD %s,\n" % (source_key[definition])
            else:
                _sql += "\tDROP KEY %s,\n" % (definition)

        for definition in source_key:
            if definition in target_key:
                pass
            else:
                _sql += "\tADD %s,\n" % (source_key[definition])

        if _sql:
            return _alter + _sql.strip('\n').strip(',') + ';\n'
        else:
            return ''

    def _foreign(self, table, target_foreign, source_foreign):
        _alter = "ALTER TABLE `%s`\n" % (table)
        _sql = ''
        for definition in target_foreign:
            if definition in source_foreign:
                if target_foreign[definition] == source_foreign[definition]:
                    pass
                else:
                    _sql += "\tDROP FOREIGN KEY %s,\n" % (definition)
                    _sql += "\tADD %s,\n" % (source_foreign[definition])
            else:
                _sql += "\tDROP FOREIGN KEY %s,\n" % (definition)

        for definition in source_foreign:
            if definition in target_foreign:
                pass
            else:
                _sql += "\tADD %s,\n" % (source_foreign[definition])

        if _sql:
            return _alter + _sql.strip('\n').strip(',') + ';\n'
        else:
            return ''

    def _fulltext(self, table, target_fulltext, source_fulltext):
        _alter = "ALTER TABLE `%s`\n" % (table)
        _sql = ''
        for definition in target_fulltext:
            if definition in source_fulltext:
                if target_fulltext[definition] == source_fulltext[definition]:
                    pass
                else:
                    _sql += "\tDROP FULLTEXT KEY %s,\n" % (definition)
                    _sql += "\tADD %s,\n" % (source_fulltext[definition])
            else:
                _sql += "\tDROP FULLTEXT KEY %s,\n" % (definition)

        for definition in source_fulltext:
            if definition in target_fulltext:
                pass
            else:
                _sql += "\tADD %s,\n" % (source_fulltext[definition])

        if _sql:
            return _alter + _sql.strip('\n').strip(',') + ';\n'
        else:
            return ''

    def _option(self, table, target_option, source_option):
        if 'option' in target_option:
            if 'option' in source_option:
                if target_option['option'] == source_option['option']:
                    pass
                else:
                    option_content = self._get_option_diff(source_option['option'], target_option['option'])
                    return "ALTER TABLE `%s` %s;" % (table, option_content)
        return ''


def main():
    config_option()

    current_objects = SchemaObjects(opt_main["target_schema"], opt_main["source_schema"])
    schema_objects = current_objects.get_schema_objects()
    objects_alters = current_objects.get_objects_alters()

    current_alters = SchemaAlters(schema_objects)
    definitions_alters = current_alters.get_definitions_alters()

    diff_alters = open(opt_main["diff_alters"], 'w')
    diff_alters.write('-- set default character\nset names utf8;\n\n')
    diff_alters.write(objects_alters)
    diff_alters.write(definitions_alters)

    diff_alters.close()


if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661

# 6.2 对查询内容格式进行处理

一些逻辑处理可以放在 reader 模块的 sql 语句上。查询语句及映射字段也可以用 sql 去生成。

[1] 将一列数据拆分成多列

// 可以通过多种方式(比如:根据指定字符截取字符串),将一列数据拆分成多列。

// 截取第2个“.”之前的所有字符  www.demo
SELECT SUBSTRING_INDEX('www.demo.com', '.', 2) from dual;
// 截取倒数第2个“.”之后的所有字符  demo.com
SELECT SUBSTRING_INDEX('www.demo.com', '.', -2) from dual;
1
2
3
4
5
6

[2] 将逗号分隔的字段从一行拆分成多行

// 创建测试数据
CREATE TABLE `company` (
  `id` int(20) DEFAULT NULL,
  `name` varchar(100) DEFAULT NULL,
  `shareholder` varchar(100) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `company` VALUES ('1', '阿里巴巴', '马云');
INSERT INTO `company` VALUES ('2', '淘宝', '马云,孙正义');

// 查询语句
SELECT
	a.name,
	substring_index(
		substring_index(
			a.shareholder,
			',',
			b.help_topic_id + 1
		),
		',' ,- 1
	) AS shareholder
FROM
	company a
JOIN mysql.help_topic b ON b.help_topic_id < (
	length(a.shareholder) - length(
		REPLACE (a.shareholder, ',', '')
	) + 1
)

// 返回结果
name	    shareholder
阿里巴巴	 马云
淘宝	    马云
淘宝	    孙正义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

[3] 将多行数据合并成一行

// 创建测试数据
CREATE TABLE GRADE (
  ID int NOT NULL AUTO_INCREMENT,
  USER_NAME varchar(20) NOT NULL,
  COURSE varchar(20) NOT NULL,
	COURSE_CODE varchar(20) NOT NULL,
  SCORE float DEFAULT '0',
  PRIMARY KEY (ID)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

INSERT INTO GRADE(USER_NAME, COURSE, COURSE_CODE, SCORE) VALUES
("张三", "语文", "Chinese", 98),
("张三", "数学", "math", 86),
("张三", "英语", "English", 92),
("李四", "语文", "Chinese", 76),
("李四", "数学", "math", 89),
("李四", "英语", "English", 32),
("王五", "语文", "Chinese", 78),
("王五", "数学", "math", 91),
("王五", "英语", "English", 56);

// 查询语句
SELECT USER_NAME,GROUP_CONCAT(COURSE SEPARATOR '、') COURSE FROM GRADE GROUP BY USER_NAME;

// 返回结果
USER_NAME   COURSE
张三         语文、数学、英语
李四         语文、数学、英语
王五         语文、数学、英语
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 6.3 与时间相关的筛选与转换

[1] 查询指定时间范围内的数据

// 今天
select * from 表名 where to_days(时间字段名) = to_days(now());

// 昨天
SELECT * FROM 表名 WHERE TO_DAYS(NOW()) - TO_DAYS(时间字段名) <= 1

// 近7天
SELECT * FROM 表名 where DATE_SUB(CURDATE(), INTERVAL 7 DAY) <= date(时间字段名)

// 近30天
SELECT * FROM 表名 where DATE_SUB(CURDATE(), INTERVAL 30 DAY) <= date(时间字段名)

// 本月
SELECT * FROM 表名 WHERE DATE_FORMAT(时间字段名, '%Y%m') = DATE_FORMAT CURDATE() , '%Y%m')

// 上一月
SELECT * FROM 表名 WHERE PERIOD_DIFF(date_format(now( ) , '%Y%m') , date_format(时间字段名, '%Y%m' )) =1

// 查询本季度数据
select * from 表名 where QUARTER(时间字段名)=QUARTER(now());

// 查询上季度数据
select * from 表名 where QUARTER(时间字段名)=QUARTER(DATE_SUB(now(),interval 1 QUARTER));

// 查询本年数据
select * from 表名 where YEAR(时间字段名)=YEAR(NOW());

// 查询上年数据
select * from 表名 where year(时间字段名)=year(date_sub(now(),interval 1 year));

// 查询当前这周的数据
SELECT * FROM 表名 WHERE YEARWEEK(date_format(时间字段名,'%Y-%m-%d')) = YEARWEEK(now());

// 查询上周的数据
SELECT * FROM 表名 WHERE YEARWEEK(date_format(时间字段名,'%Y-%m-%d')) = YEARWEEK(now())-1;

// 查询上个月的数据
select * from 表名 where DATE_FORMAT(时间字段名,'%Y%m') = DATE_FORMAT(CURDATE(),'%Y%m') ; 

//查询当前月份的数据 
select * from 表名 where date_format(时间字段名,'%Y-%m')=date_format(now(),'%Y-%m')

// 查询距离当前现在6个月的数据
select * from 表名 where 时间字段名 between date_sub(now(),interval 6 month) and now();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

[2] 日期与时间戳相互转换

// 日期转时间戳
select UNIX_TIMESTAMP('2022-03-04 04:37:44') from dual;

// 时间戳转日期
select FROM_UNIXTIME(1646339864) from dual;
1
2
3
4
5

[3] 对表中的某个时间字段加去或减少一段时间

为日期增加一个时间间隔:date_add();为日期减少一个时间间隔:date_sub(),时间间隔选项如下:

1 microsecond 1毫秒、1 second 1秒、1 minute 1分钟、1 hour 1小时
1 day 1天、1 week 1周、1 month 1月、1 quarter 1季、1 year 1年
1
2

完整示例:对table_name表的time字段加3天

update table_name a set a.time = date_add(a.time, interval 3 day)
1

[4] 将日期转成指定格式的字符串

DATE_FORMAT(insert_time,'%Y-%m-%d %H:%i:%s')
1

# 6.4 局部配置生成及便捷操作

[1] MySQL查询语句及映射字段也可以用 sql 去生成,根据实际处理过程修改一下即可。

// 生成逗号分隔的查询字段
SELECT GROUP_CONCAT(COLUMN_NAME SEPARATOR ",") FROM information_schema.COLUMNS 
WHERE TABLE_SCHEMA = '数据库名' AND TABLE_NAME = '表名'

// 生成映射column的字段
SELECT concat(concat("\"`",GROUP_CONCAT(COLUMN_NAME SEPARATOR "`\",\n\"`")),"`\"") FROM information_schema.COLUMNS 
WHERE TABLE_SCHEMA = '数据库名' AND TABLE_NAME = '表名'
1
2
3
4
5
6
7

[2] ElasticSearch 的 mapping 与 DataX 所需的 column 格式不同,可通过如下代码转换处理。

# -*- coding: utf-8 -*-

import json
from elasticsearch import Elasticsearch


# 获取单个ES索引的详细信息
def get_index_info(es_obj, index_name):
    return es_obj.indices.get_mapping(index=index_name)


if __name__ == '__main__':

    # 配置ES连接信息
    es_obj = Elasticsearch(
        hosts=["ip:port"],
        http_auth=('user', 'password'),
        timeout=60
    )
    index_name = 'your_index'

    # 获取单个索引的信息
    index_info = get_index_info(es_obj, index_name)
    index_mappings_info = index_info[index_name]['mappings']['properties']

    # 遍历字典,过滤key中包含@的(@timestamp、@version),按照datax的column重新生成格式
    result = []
    default_dict_item = {"name": "pk", "type": "id"}
    result.append(default_dict_item)
    for key, value in index_mappings_info.items():
        dict_item = {}
        if "@" not in key:
            dict_item['name'] = key
            dict_item['type'] = value['type']
            result.append(dict_item)

    # 格式化输出json
    print(json.dumps(result, sort_keys=True, indent=2))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

[3] ElasticSearch 索引数据清空但保留索引结构,可以使用 curl 命令发送请求去实现。

curl -u username:password -XPOST 'http://ip:port/index_name/_delete_by_query?refresh&slices=5&pretty' -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'
1

注:需要改动的地方有username、password、ip、port、index_name

# 6.5 字符集编码问题

[1] MySQL执行包含union的SQL语句报错:Illegal mix of collations for operation UNION

原因:union连接的表排序规则不一致

解决:保证表和字段的编码规则、排序规则一致即可

[2] MySQL批量修改表和表内字段的字符集和排序规则

批量修改表

SELECT
    CONCAT( 'ALTER TABLE ', TABLE_NAME, ' CONVERT TO CHARACTER SET utf8 COLLATE utf8_unicode_ci;' ) 
FROM
    information_schema.TABLES 
WHERE
    TABLE_SCHEMA = '数据库名';
1
2
3
4
5
6

批量修改字段

SELECT
    CONCAT(
        'ALTER TABLE `',
        TABLE_NAME,
        '` MODIFY `',
        COLUMN_NAME,
        '` ',
        DATA_TYPE,
        '(',
        CHARACTER_MAXIMUM_LENGTH,
        ') CHARACTER SET utf8 COLLATE utf8_unicode_ci',
        ( CASE WHEN IS_NULLABLE = 'NO' THEN ' NOT NULL' ELSE '' END ),
        ';' 
) 
FROM
    information_schema.COLUMNS 
WHERE
    TABLE_SCHEMA = '数据库名' 
    AND (
    DATA_TYPE = 'varchar' 
    OR DATA_TYPE = 'char')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

说明:将上述代码的数据库名、字符集、字符排序规则换成自己需要的,执行上述sql生成用于具体修改的sql语句。

注意:该批量修改方法会导致已添加的注释丢失,如果介意,请勿使用。

[3] 未使用utf8mb4字符集编码导致表情包数据写入出错问题

问题描述:同步数据时,如果字段里存在表情包,字符集编码不正确的话会导致同步出错。

前提条件:需要 >= MySQL 5.5.3版本

select version();
1

解决办法:可通过以下方式解决

--修改表默认的字符集和所有字符列的字符集
ALTER TABLE table_name CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
1
2

# 6.6 使用curl操作ES

// 查询所有索引
$ curl -u 用户名:密码 http://ip:port/_cat/indices

// 删除索引(包含结构)
$ curl -u 用户名:密码 -XDELETE http://ip:port/索引名

// 清空索引(不包含结构)
$ curl -u 用户名:密码 -XPOST 'http://ip:port/索引名/_delete_by_query?refresh&slices=5&pretty' -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}}'

// 创建索引
$ curl -u 用户名:密码 -XPUT 'http://ip:port/索引名' -H 'Content-Type: application/json' -d'
{
    "settings" : {
      "index" : {
        "number_of_shards" : "5",
        "number_of_replicas" : "1"
      }
    },
    "mappings" : {
        "properties" : {
          "post_date": {
               "type": "date"
          },
          "tags": {
               "type": "keyword"
          },
          "title" : {
               "type" : "text"
          }
        }
    }
}'

// 修改索引
$ curl -u 用户名:密码 -XPUT 'http://ip:port/索引名/_mapping' -H 'Content-Type: application/json' -d'
{
  "properties" : {
    "post_date": {
         "type": "date"
    },
    "tags_modify": {
         "type": "keyword"
    },
    "title" : {
         "type" : "text"
    },
    "content": {
         "type": "text"
    }
  }
}'

// 查看单个索引信息(可以查看到单个索引的数据量)
curl -u 用户名:密码 -XGET 'http://ip:port/_cat/indices/index_1?v'

health status index      uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   index_1    aado9-iGRFGN9twQb040ds   5   1   28800345            0        3gb          1.5gb

// 按照文档数量排序索引(可以查看到所有索引的数据量)
curl -u 用户名:密码 -XGET 'http://ip:port/_cat/indices?v&s=docs.count:desc'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

注意事项:创建索引时,有的教程在“mappings”里嵌套了“_doc”,会报如下错误,这是因为版本 7.x 不再支持映射类型,将其删除即可。

{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"The mapping definition cannot be nested under a type [_doc] unless include_type_name is set to true."}],"type":"illegal_argument_exception","reason":"The mapping definition cannot be nested under a type [_doc] unless include_type_name is set to true."},"status":400}%
1

# 7. 使用模板生成配置文件

以下使用模板生成配置文件的脚本,我已在Github上开源了,项目地址:https://github.com/Logistic98/datax-script (opens new window)

# 7.1 需求背景

在大数据领域,尤其是数据仓库领域有许多配置文件,例如ETL作业的配置。大多数的平台都可以通过Web页面或者Restful接口的方式进行ETL作业的配置。

  • 痛点:当需要配置大量的ETL作业时,通过Web页面配置效率低,且容易出错。

# 7.2 实现方式

虽然ETL作业的参数很多,但是一般情况下,需要关心往往只有几个,其它使用默认即可。因此可以使用模板,只需要修改模板中我们关心的参数即可。

  • 制作模板文件:准备一个ETL作业的配置文件,把需要修改的参数改为 $变量名,如$database
  • 通过string.Template() 读取模板文件的内容。
  • 通过safe_substitute() 给模板文件中的变量赋值。

# 7.3 MySQL2MySQL模板

# 7.3.1 制作模板

msyql2mysql_template.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 10,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "$reader_username",
            "password": "$reader_password",
            "connection": [
              {
                "jdbcUrl": [
                  "$reader_jdbcUrl"
                ],
                "querySql": [
                  "$reader_querySql"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "username": "$writer_username",
            "password": "$writer_password",
            "writeMode": "update",
            "column": $writer_column,
            "connection": [
              {
                "table": [
                  "$writer_table"
                ],
                "jdbcUrl": "$writer_jdbcUrl"
              }
            ]
          }
        }
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

# 7.3.2 使用模板生成job配置

可以使用如下Python脚本生成MySQL2MySQL的初始Job配置(只需要修改“1. 填写原始配置信息”里的job配置路径和基本连接信息),生成之后根据实际需求进行调整即可。

$ pip3 install pymysql
1

generate_msyql2mysql_job.py

# -*- coding: utf-8 -*-

import json
import os
import string
import pymysql


# 查询mysql的所有字段列
def query_mysql_column(mysql_connect, mysql_db, mysql_table):
    cursor = mysql_connect.cursor()
    sql = "SELECT GROUP_CONCAT(COLUMN_NAME SEPARATOR ',') FROM information_schema.COLUMNS WHERE " \
          "TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'".format(mysql_db, mysql_table)
    cursor.execute(sql)
    result = cursor.fetchall()[0][0]
    cursor.close()
    return result


# 按行追加写入文件(没有文件会新创建文件)
def write_content_to_file(file_path, content):
    a = open(file_path, 'a')
    a.write(content + '\n')
    a.close()
    

if __name__ == '__main__':
    
    ## 1. 填写原始配置信息
    # 1.1 job配置信息
    job_base_path = "/root/datax/job"
    job_name = "generate_msyql2mysql_job_test.json"
    # 1.2 mysql reader配置信息
    reader_mysql_host = '127.0.0.1'
    reader_mysql_user = 'root'
    reader_mysql_password = 'reader_mysql_password'
    reader_mysql_port = 3306
    reader_mysql_db = 'reader_mysql_db'
    reader_mysql_table = 'reader_mysql_table'
    # 1.3 mysql writer配置信息
    writer_mysql_host = '127.0.0.1'
    writer_mysql_user = 'root'
    writer_mysql_password = 'writer_mysql_password'
    writer_mysql_port = 3306
    writer_mysql_db = 'writer_mysql_db'
    writer_mysql_table = 'writer_mysql_table'
    
    ## 2. 将原始配置信息构造二级配置信息
    job_file_path = job_base_path + "/" + job_name
    reader_jdbcUrl = "jdbc:mysql://{}:{}/{}?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"\
        .format(reader_mysql_host, reader_mysql_port, reader_mysql_db)
    writer_jdbcUrl = "jdbc:mysql://{}:{}/{}?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"\
        .format(writer_mysql_host, writer_mysql_port, writer_mysql_db)

    ## 3. 查询字段信息并处理
    # 3.1 查询reader中的MySQL字段信息并拼接
    # 获取逗号分割的字段字符串
    reader_mysql_connect = pymysql.connect(host=reader_mysql_host, user=reader_mysql_user,
                                           password=reader_mysql_password, port=reader_mysql_port,
                                           db=reader_mysql_db, charset='utf8')
    reader_mysql_column = query_mysql_column(reader_mysql_connect, reader_mysql_db, reader_mysql_table)
    reader_mysql_connect.close()
    # 拼接querySql
    reader_querySql = "select {} from {} where 1=1".format(reader_mysql_column, reader_mysql_table)
    # 3.2 查询writer中的MySQL字段列表
    # 获取逗号分割的字段字符串
    writer_mysql_connect = pymysql.connect(host=writer_mysql_host, user=writer_mysql_user,
                                           password=writer_mysql_password, port=writer_mysql_port,
                                           db=writer_mysql_db, charset='utf8')
    writer_mysql_column = query_mysql_column(writer_mysql_connect, writer_mysql_db, writer_mysql_table)
    writer_mysql_connect.close()
    writer_mysql_column_list = writer_mysql_column.split(",")
    result = []
    for writer_mysql_column_item in writer_mysql_column_list:
        result.append("`{}`".format(writer_mysql_column_item))
    # 格式化输出json
    writer_column = json.dumps(result, sort_keys=True, indent=2)

    ## 4. 使用模板生成job配置
    # 替换模板中的变量
    file = open('msyql2mysql_template.json', encoding='utf-8')
    content = file.read()
    template_setting = string.Template(content)
    job_config = template_setting.safe_substitute(reader_username=reader_mysql_user, reader_password=reader_mysql_password,
                                                  reader_jdbcUrl=reader_jdbcUrl, reader_querySql=reader_querySql,
                                                  writer_username=writer_mysql_user, writer_password=writer_mysql_password,
                                                  writer_jdbcUrl=writer_jdbcUrl, writer_table=writer_mysql_table,
                                                  writer_column=writer_column)
    # 输出结果到文件
    print(job_config)
    if os.path.exists(job_file_path):
        os.remove(job_file_path)
    write_content_to_file(job_file_path, job_config)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

# 7.4 MySQL2ElasticSearch模板

# 7.4.1 制作模板

msyql2es_template.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "$reader_username",
            "password": "$reader_password",
            "connection": [
              {
                "jdbcUrl": [
                  "$reader_jdbcUrl"
                ],
                "querySql": [
                  "$reader_querySql"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "$writer_endpoint",
            "accessId": "$writer_accessId",
            "accessKey": "$writer_accessKey",
            "index": "$writer_index",
            "type": "default",
            "cleanup": false,
            "discovery": false,
            "dynamic": true,
            "batchSize": 1000,
            "splitter": ",",
            "column": $writer_column
          }
        }
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 7.4.2 使用模板生成job配置

可以使用如下Python脚本生成MySQL2ElasticSearch的初始Job配置(只需要修改“1. 填写原始配置信息”里的job配置路径和基本连接信息),生成之后根据实际需求进行调整即可。

$ pip3 install pymysql
$ pip3 install elasticsearch==7.16.2
1
2

generate_msyql2es_job.py

# -*- coding: utf-8 -*-

import json
import os
import string
import pymysql
from elasticsearch import Elasticsearch


# 获取单个ES索引的详细信息
def get_index_info(es_connect, writer_es_index):
    return es_connect.indices.get_mapping(index=writer_es_index)


# 查询mysql的所有字段列
def query_mysql_column(mysql_connect, reader_mysql_db, reader_mysql_table):
    cursor = mysql_connect.cursor()
    sql = "SELECT GROUP_CONCAT(COLUMN_NAME SEPARATOR ',') FROM information_schema.COLUMNS WHERE " \
          "TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'".format(reader_mysql_db, reader_mysql_table)
    cursor.execute(sql)
    result = cursor.fetchall()[0][0]
    cursor.close()
    return result


# 按行追加写入文件(没有文件会新创建文件)
def write_content_to_file(file_path, content):
    a = open(file_path, 'a')
    a.write(content + '\n')
    a.close()


if __name__ == '__main__':

    ## 1. 填写原始配置信息
    # 1.1 job配置信息
    job_base_path = "/root/datax/job"
    job_name = "generate_msyql2es_job_test.json"
    # 1.2 mysql reader配置信息
    reader_mysql_host = '127.0.0.1'
    reader_mysql_user = 'root'
    reader_mysql_password = 'mysql_password'
    reader_mysql_port = 3306
    reader_mysql_db = 'mysql_db'
    reader_mysql_table = 'mysql_table'
    # 1.3 es writer配置信息
    writer_es_endpoint = "http://ip:port"
    writer_es_accessId = "elastic"
    writer_es_accessKey = "es_password"
    writer_es_index = "es_index"

    ## 2. 将原始配置信息构造二级配置信息
    job_file_path = job_base_path + "/" + job_name
    reader_jdbcUrl = "jdbc:mysql://{}:{}/{}?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8"\
        .format(reader_mysql_host, reader_mysql_port, reader_mysql_db)

    ## 3. 查询字段信息并处理
    # 3.1 查询MySQL字段信息并拼接
    # 获取逗号分割的字段字符串
    mysql_connect = pymysql.connect(host=reader_mysql_host, user=reader_mysql_user,
                                    password=reader_mysql_password, port=reader_mysql_port,
                                    db=reader_mysql_db, charset='utf8')
    reader_mysql_column = query_mysql_column(mysql_connect, reader_mysql_db, reader_mysql_table)
    mysql_connect.close()

    # 拼接querySql
    reader_querySql = "select {} from {} where 1=1".format(reader_mysql_column, reader_mysql_table)
    # 3.2 查询ES字段信息并处理
    # 获取单个索引的信息
    es_connect = Elasticsearch(
        hosts=[writer_es_endpoint],
        http_auth=(writer_es_accessId, writer_es_accessKey),
        timeout=60
    )
    index_info = get_index_info(es_connect, writer_es_index)
    index_mappings_info = index_info[writer_es_index]['mappings']['properties']
    # 遍历字典,过滤key中包含@的(@timestamp、@version),按照datax的column重新生成格式
    result = []
    default_dict_item = {"name": "pk", "type": "id"}
    result.append(default_dict_item)
    for key, value in index_mappings_info.items():
        dict_item = {}
        if "@" not in key:
            dict_item['name'] = key
            dict_item['type'] = value['type']
            result.append(dict_item)
    # 格式化输出json
    writer_column = json.dumps(result, sort_keys=True, indent=2)

    ## 4. 使用模板生成job配置
    # 替换模板中的变量
    file = open('msyql2es_template.json', encoding='utf-8')
    content = file.read()
    template_setting = string.Template(content)
    job_config = template_setting.safe_substitute(reader_username=reader_mysql_user, reader_password=reader_mysql_password,
                                                  reader_jdbcUrl=reader_jdbcUrl, reader_querySql=reader_querySql,
                                                  writer_endpoint=writer_es_endpoint, writer_accessId=writer_es_accessId,
                                                  writer_accessKey=writer_es_accessKey, writer_index=writer_es_index,
                                                  writer_column=writer_column)
    # 输出结果到文件
    print(job_config)
    if os.path.exists(job_file_path):
        os.remove(job_file_path)
    write_content_to_file(job_file_path, job_config)

    ## 5. 附加:生成清空该ES索引的脚本
    clear_es_index_script_path = './clear_index_data.sh'
    content_template = string.Template("curl -u ${writer_es_accessId}:${writer_es_accessKey} -XPOST '${writer_es_endpoint}/${writer_es_index}/_delete_by_query?refresh&slices=5&pretty' -H 'Content-Type: application/json' -d'{\"query\": {\"match_all\": {}}}'")
    script_content = content_template.safe_substitute(writer_es_accessId=writer_es_accessId, writer_es_accessKey=writer_es_accessKey, writer_es_endpoint=writer_es_endpoint, writer_es_index=writer_es_index)
    write_content_to_file(clear_es_index_script_path, script_content)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110

另注:该脚本还会在当前路径自动生成一个clear_index_data.sh脚本,它可以利用curl命令清空当前索引的数据(保留索引结构),需要重新同步数据时会用得到。

# 8. 数据同步质量保障措施

# 8.1 数据同步常见问题及补救

以下梳理DataX数据同步的常见问题,会导致数据同步中断(允许出现多少错误数据可以在DataX任务里配置,默认是0),开发时应注意这些方面。

  • MySQL-->MySQL:业务库字符串长度限制不够,导致数据插入不进去。
  • MySQL-->MySQL:日期格式问题,尤其是多个数据源日期格式不统一时需要进行统一转换,不然可能导致产生脏数据。
  • MySQL-->ES:DataX目前不支持dense_vector向量类型的数据同步,同步是会报错。
  • MySQL-->ES:注意一些字段的转换与判空,字段转换失败会插入不进去。
  • MySQL-->ES:涉及多个数据源的数据同步,可能存在id重复问题,ES相同的_id会走更新机制,因此不要直接用MySQL的id作为ES的_id,可以拼接一下数据源的名称。
  • MySQL-->MySQL-->ES:涉及执行顺序问题,可能会导致ES数据缺失,详见8.2节。
  • MySQL-->MySQL/ES:采集数据里有格式型脏数据,被DataX自动检测到。
  • MySQL-->MySQL/ES:数据库连接信息填写不正确或者数据库有白名单。
  • MySQL-->MySQL/ES:服务器的系统时间不正确,会使上次同步时间记录出错,最终会导致下次数据同步的条件筛选出错。

结构性数据同步出错,报错日志里很容易看出;个别内容出错,在报错日志里不那么容易找,尤其是ES的,可以把报错部分进行JSON格式化再查看。

修复数据同步问题后,建议把相应数据清了、把相应模块记录最后更新时间的文件删了再重新同步。

如果只是数据少了,把记录最后更新时间的文件删了,重新跑一遍全量就行,ES里的数据不用清,会走更新机制。

# 8.2 按照顺序执行job

需求情景:数据同步需要将多个MySQL从采集表汇聚到1个MySQL业务表,然后再将这个业务表同步到ES。

风险问题:MySQL->MySQL的数据同步还没跑完,就开始了MySQL->ES的数据同步,使用上述Shell脚本,会导致最终存到ES的数据出现缺失。

解决方案:添加标志位判定或按顺序执行

方案一:添加标志位判定前置任务是否执行完毕或者按顺序执行,需要改脚本。

方案二:在一个脚本中按顺序添加job。

jsonArray[${#jsonArray[*]}]=test_table_mysql2mysql.json    
jsonArray[${#jsonArray[*]}]=test_table_mysql2es.json          
1
2

方案三:在一个总shell里统一调度,使用同一个时间参数。

统一调度shell:

#!/bin/bash

# 按照顺序执行(先mysql同步,再es同步,以防止mysql未同步完成就同步es)
currentTime=`date +"%Y-%m-%d %H:%M:%S"`
echo "本次任务执行时间:${currentTime}"
echo "开始执行 /root/datax/script/auto_mysql2mysql_sync_data.sh 任务"
/root/datax/script/auto_mysql2mysql_sync_data.sh $currentTime > /root/datax/log/auto_mysql2mysql_sync_data.log 2>&1 &
echo "开始执行 /root/datax/script/auto_mysql2es_sync_data.sh 任务"
/root/datax/script/auto_mysql2es_sync_data.sh $currentTime > /root/datax/log/auto_mysql2es_sync_data.log 2>&1 &
1
2
3
4
5
6
7
8
9

每个具体执行的shell作如下改动,currentTime从统一调度shell里接参,"$1 $2"作为一个整体,接时间参数,这个的引号必不可少。

# 获取当前时间,这个是同步后要写到txt文件的变量
#currentTime=`date +"%Y-%m-%d %H:%M:%S"`
currentTime="$1 $2"
echo ${currentTime}
1
2
3
4

注:这里面的具体脚本是在后台运行的,想要提前终止任务,可以用下面的命令:

$ ps aux | grep /root/datax/script/auto_mysql2mysql_sync_data.sh | grep -v grep | awk '{print $2}' | xargs kill -9
$ ps aux | grep /root/datax/script/auto_mysql2es_sync_data.sh | grep -v grep | awk '{print $2}' | xargs kill -9
1
2

注:grep -v grep 排除掉grep自身的进程,awk '{print $2}'打印出第二行的内容,xargs kill -9杀死进程

# 8.3 统计job的最后更新时间

由于之前每个增量数据同步任务的job都是由单独的txt来记录的,查看不够方便。这里写了个Python脚本,定时统计每个任务的最后更新时间,写到一个csv文件里。如果存在任务长时间未更新,就有可能是出现问题了,请及时排查。

$ pip3 install schedule
1

statistical_job_result.py

# -*- coding: utf-8 -*-

import os
import csv
import time
import schedule


# 读取指定目录下的所有文件夹保存成列表
def read_dir_to_list(file_dir_path):
    file_dir_list = os.listdir(file_dir_path)
    return file_dir_list


# 按行读取txt文件的内容,保存成列表
def read_txt_to_list(txt_path):
    result = []
    with open(txt_path, 'r') as f:
        for line in f:
            result.append(line.strip('\n'))
    return result


# 创建csv并写入字典
def create_csv(fieldnames, path, dict):
    with open(path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerow(dict)


# 将字典追加写入csv
def append_csv(fieldnames, path, dict):
    with open(path, 'a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writerow(dict)


# 统计任务最后执行时间写入CSV文件
def statistical_job_result(job_txt_path, job_result_path):

    statistical_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    job_txt_file_list = read_dir_to_list(job_txt_path)
    fieldnames = []
    fieldnames.append('statistical_time')
    csv_data = {}
    csv_data['statistical_time'] = statistical_time
    for job_txt_file in job_txt_file_list:
        file_name, file_ext = os.path.splitext(job_txt_file)      # 截取文件名作为csv的列名
        job_last_update = read_txt_to_list(job_txt_path + "/" + job_txt_file)[0]       # 读取job的最后更新时间
        csv_data[file_name] = job_last_update
        fieldnames.append(file_name)

    if not os.path.exists(job_result_path):
        create_csv(fieldnames, job_result_path, csv_data)
    else:
        append_csv(fieldnames, job_result_path, csv_data)


if __name__ == '__main__':

    job_txt_path = '../job/txt'
    job_result_path = './job_result.csv'

    # 配置定时任务,可同时启动多个
    # schedule.every(30).minutes.do(statistical_job_result(job_txt_path, job_result_path))         # 每隔30分钟运行一次job
    schedule.every().hour.do(statistical_job_result(job_txt_path, job_result_path))                # 每隔1小时运行一次job
    # schedule.every().day.at("23:59").do(statistical_job_result(job_txt_path, job_result_path))   # 每天在23:59时间点运行job
    # schedule.every().monday.do(statistical_job_result(job_txt_path, job_result_path))            # 每周一运行一次job

    statistical_job_result(job_txt_path, job_result_path)  # 启动时立即执行一次,随后进入定时任务
    while True:
        schedule.run_pending()
        time.sleep(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

注:如果不想用Python的schedule库实现定时任务,而是用系统的Crontab来实现,注释掉相关代码即可。需要注意的是,配置crontab的时候,python3命令要写上完整路径,不然会找不到python3命令。

0 */1 * * * cd /datax/script/ && /usr/local/bin/python3 statistical_job_result.py
1

# 9. 参考资料

[1] What is ETL (extract transform load) from informatica (opens new window)

[2] 大数据ETL简介 from 知乎 (opens new window)

[3] 关于数据分析,你需要知道的ETL基础知识 from 知乎 (opens new window)

[4] 从ETL到ELT:让分析更快、更稳、更智能 from 巨杉数据库 (opens new window)

[5] ETL 是什么?from 盖若 (opens new window)

[6] 弃用ETL,为什么说ELT才更适合AI应用场景 from 51CTO博客 (opens new window)

[7] ETL常用的三种工具介绍及对比Datastage,Informatica和Kettle from 知乎 (opens new window)

[8] 什么是 ETL?from Google Cloud (opens new window)

[9] 基于两种架构的ETL实现及ETL工具选型策略 from 博客园 (opens new window)

[10] Kettle简介 from 树懒学堂 (opens new window)

[11] Kettle简介 from 知乎 (opens new window)

[12] 关于M1 Mac 安装部署PDI(kettle)的方法步骤及问题解决 from CSDN (opens new window)

[13] kettle spoon 连接mysql数据库 from CSDN (opens new window)

[14] Kettle执行python脚本 from CSDN (opens new window)

[15] Logstash-介绍 from 知乎 (opens new window)

[16] 数据采集-常用工具介绍 from 智能后端和架构 (opens new window)

[17] Datax和Kettle使用场景的对比 from CSDN (opens new window)

[18] Datax与Sqoop的对比 from CSDN (opens new window)

[19] 如何远程访问Docker容器中的图形界面,如:kettle from CSDN (opens new window)

[20] 可视化ETL工具Kettle概念、安装及实战案例 from CSDN (opens new window)

[21] 基于kettle的可视化数据集成平台 from CSDN (opens new window)

[22] 基于kettle实现的web版数据集成平台,致力于提供web可拖拽的数据集成平台 from Github (opens new window)

[23] DataX 实践(一)构建 DataX 的 Docker 容器镜像并测试运行 from Rabbir (opens new window)

[24] DataX集成可视化页面,选择数据源即可一键生成数据同步任务 from Github (opens new window)

[25] docker 安装 datax和datax-web from 博客园 (opens new window)

[26] DataX介绍以及优缺点分析 from 知乎 (opens new window)

[27] 字节跳动单点恢复功能及 Regional CheckPoint 优化实践 from segmentfault (opens new window)

[28] Flink在米哈游的落地实践 from 知乎 (opens new window)

[29] 使用Datax将数据从Mysql导到Elasticsearch7.x的填坑过程和使用记录 from 稀土掘金 (opens new window)

[30] Datax部署与使用 from 知乎 (opens new window)

[31] DataX Web数据增量同步配置说明 from 稀土掘金 (opens new window)

[32] DataX报错解决办法 - 在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数 from CSDN (opens new window)

[33] 数据同步工具—DataX—Web部署使用 from 51CTO博客 (opens new window)

[34] datax从mysql同步数据到elasticsearch(使用es的动态模板)from CSDN (opens new window)

[35] DataX 增量同步 - Mysql 同步数据到 Elasticsearch from CSDN (opens new window)

[36] Canal——原理架构及应用场景 from 博客园 (opens new window)

[37] Apache NiFi vs 其他技术 from 知乎 (opens new window)

[38] datax 3.0连接不上数据库 from CSDN (opens new window)

[39] DataX学习指南(三)-- 程序打包 from JackpotHan (opens new window)

[40] 使用Datax将数据从Mysql导到Elasticsearch7.x的填坑过程和使用记录 from 稀土掘金 (opens new window)

[41] mysql、es的同步生产实践 from 知乎 (opens new window)

[42] 得物技术分享MySQL多表关联同步到ES的实践 from 知乎 (opens new window)

[43] DataX 源码解析与插件开发 from 开源博客 (opens new window)

[44] DataX(4): DataX.py解读 from CSDN (opens new window)

[45] DataX(5):改造升级-自动识别py环境,执行DataX任务 from CSDN (opens new window)

[46] DataX(6):启动步骤解析 from CSDN (opens new window)

[47] DataX(22):任务分配规则 from CSDN (opens new window)

[48] DataX(23):DataX调优 from CSDN (opens new window)

[49] DataX(24):远程调试DataX from CSDN (opens new window)

[50] DataX(25):插件加载原理 from CSDN (opens new window)

[51] DataX(26):各个数据库与DataX字段映射 from CSDN (opens new window)

[52] DataX(27):不太常见配置项querySql、preSql、postSql、splitPk from CSDN (opens new window)

[53] DataX学习笔记-Reader插件开发 from CSDN (opens new window)

[54] DataX 数据全量,增量同步方案 from 简书 (opens new window)

[55] 使用 DataX 增量同步数据 from 张志敏的技术专栏 (opens new window)

[56] DataX教程(06)- DataX调优 from 51CTO博客 (opens new window)

[57] datax优化设置 from 博客园 (opens new window)

[58] ETL异构数据源Datax限速设置06 from CSDN (opens new window)

[59] Datax限速bug from CSDN (opens new window)

[60] 记一次DataX-MysqlReader性能优化 from CSDN (opens new window)

[61] datax能部署集群吗 from Github issues (opens new window)

[62] 踩坑实录-datax数据推送字符集错误 from CSDN (opens new window)

[63] Shell脚本:获取python:未找到命令错误 from Py社区 (opens new window)

[64] Python编程实践 - 使用模板生成配置文件 from CSDN (opens new window)

[65] 大数据开发治理平台 DataWorks from 阿里云 (opens new window)

[66] MySQL 数据库表结构对比工具 from Github (opens new window)

Last Updated: 2/14/2024, 1:47:08 PM