常用服务的数据备份及同步专题

3/8/2022 数据备份数据同步文件传输LogstashRclone

# 1. 前言

# 1.1 基本概念

# 1.1.1 数据备份

避免数据丢失,一般通过快照、备份等技术构建数据的数据备份副本,故障时可以通过数据的历史副本恢复用户数据。

# 1.1.2 数据同步

数据同步功能旨在帮助用户实现两个数据源之间的数据实时同步。数据同步功能可应用于异地多活、数据异地灾备、本地数据灾备、数据异地多活、跨境数据同步、查询与报表分流、云BI及实时数据仓库等多种业务场景。

# 1.1.3 数据容灾

避免业务中断,一般是通过复制技术(应用层复制、主机I/O层复制、存储层复制)在异地构建业务的备用主机和数据,主站点故障时备用站点可以接管业务。

# 1.2 备份策略

为了防止数据丢失,对于最重要的数据,应该采取 3-2-1 备份策略。首先,除了原始数据,你应该有3份数据拷贝。其次,3份数据拷贝应该保存在两种媒介,比如不能放在同一个硬盘或者同一台计算机。最后,有一份数据拷贝必须放在异地,比如在云端或其他城市。

3-2-1备份策略

# 2. 数据备份

# 2.1 MySQL数据备份

# 2.1.1 MySQL数据库的备份与恢复

[1] 数据备份

可以使用mysqldump实现,docker-mysql的数据库备份语句如下:

// 备份单个数据库的结构与数据
$ docker exec mysql sh -c "exec mysqldump --databases 数据库名 -u root -p'root密码'" > /mydocker/mysql/backup/db.sql

// 备份单个数据库的结构
$ docker exec mysql sh -c "exec mysqldump --databases 数据库名 -d -u root -p'root密码'" > /mydocker/mysql/backup/db.sql

// 备份所有数据库的结构与数据
$ docker exec mysql sh -c "exec mysqldump --databases --all-databases -u root -p'root密码'" > /mydocker/mysql/backup/db.sql

// 备份所有数据库的结构
$ docker exec mysql sh -c "exec mysqldump --databases --all-databases -d -u root -p'root密码'" > /mydocker/mysql/backup/db.sql
1
2
3
4
5
6
7
8
9
10
11

说明:1)如果安的是原生mysql,直接执行mysqldump语句备份即可。2)如果需要同时导出多个数据库,“数据库名”处用空格分割多个数据库即可。3)不要数据只保留数据库结构的话加个-d参数即可。

[2] 数据恢复

docker-mysql的数据库恢复语句如下:

docker exec -i mysql sh -c "exec mysql -uroot -p'root密码'" < /mydocker/mysql/backup/db.sql
1

注:建议使用命令去恢复数据,直接用Navicat工具运行导出的sql来恢复数据有时会出问题(比如我就遇到过导出的库在Navicat执行报错,但用命令可以导入成功的情况)

# 2.1.2 MySQL数据库的定时备份

实际在生产环境上,我们不会每天都去手动备份。可以写一个脚本出来,完成这项操作,然后用 crontab 定时执行。

Step1:新建一个mysqlbackup.sh脚本,并使用chmod u+x命令赋予可执行权限,脚本内容如下:

#!/bin/bash
#数据库用户名
dbuser='root'
#数据库密码
dbpasswd='root密码'
#数据库名,如有多个库用空格分开
dbname='dbname1 dbname2 dbname3'
#备份时间
backuptime=`date +"%Y-%m-%d"`
#备份输出目录
path='/mydocker/mysql/backup'
#备份输出日志
log='mysqlbackup.log'

echo "################## ${backuptime} #############################" 
echo "开始备份" 
#日志记录头部
echo "" >> ${path}/${log}
echo "-------------------------------------------------" >> ${path}/${log}
echo "备份时间为${backuptime},备份数据库 ${dbname} 开始" >> ${path}/${log}
#正式备份数据库
for table in ${dbname}; do
docker exec mysql sh -c "exec mysqldump --databases ${table} -u ${dbuser} -p'${dbpasswd}'" > ${path}/${table}-${backuptime}.sql 2>> ${path}/${log};
#备份成功以下操作
if [ "$?" == 0 ];then
cd ${path}
#为节约硬盘空间,将数据库压缩
tar Jcvf ${table}-${backuptime}.tar.gz ${table}-${backuptime}.sql > /dev/null
#删除原始文件,只留压缩后文件
rm -f ${path}/${table}-${backuptime}.sql
#删除30天前备份
find ${path} -name "*.tar.gz" -type f -mtime +30 -exec rm -rf {} \; > /dev/null 2>&1
echo "数据库表 ${table} 备份成功!!" >> ${path}/${log}
else
#备份失败则进行以下操作
echo "数据库表 ${table} 备份失败!!" >> ${path}/${log}
fi
done
echo "完成备份"
echo "备份日志见 ${path}/${log} "
echo "################## ${backuptime} #############################"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

说明:

1)上面的信息部分换成自己的,备份输出目录的末尾不要加‘/’,多个数据库用空格分隔不要加逗号。

2)正式备份数据库的语句也换成自己的,注意"exec mysqldump --databases ${table} -u ${dbuser} -p'${dbpasswd}'"处外面是双引号,里面套单引号,反过来会因为${}导致转义问题。

3)为了节约硬盘空间,我这里对sql进行了压缩(如果不想压缩就不用安了,注释掉相关代码即可),请先安装xz命令。

$ apt-get install xz-utils
1

4)为了节约硬盘空间,我这里删除了30天前的备份(根据文件的最后改动时间来判定),具体保留几天自己定就行。

Step2:配置定时任务执行脚本

$ crontab -e
1

然后添加如下代码(含义是每天23:59分自动执行脚本)保存即可:

59 23 * * * 脚本路径/mysqlbackup.sh
1

# 2.1.3 生产环境使用命令行操作MySQL

需求情景:生产环境出于安全考虑,在安全组处没有放行数据库的端口,线上业务系统用服务器的内网地址直接连接数据库,无法使用Navicat等工具在外部进行连接,给排查线上问题造成了一些不便。

解决方案:出于安全考虑,我通常不会直接在生产环境进行排查修复,而是先去服务器上用命令把数据表导出来,然后在Navicat进行导入,排查问题,记录下处理问题的sql语句,然后再在生产环境执行处理方案的sql语句。这样既方便排查问题,又有数据备份,可以有效防止生产事故发生。

具体步骤:用mysqldump导出问题数据表sql——在自己服务器或本地创建数据库,用Navicat导入数据——在Navicat上排查问题,记录下处理问题(如脏数据)的sql语句——在生产环境以命令行模式进mysql使用处理方案的sql语句。

// 导出指定数据库的所有数据表(结构+数据),会要求输入数据库密码
$ mysqldump -u 用户名 数据库名 > /root/db-数据表名.sql    

// 导出具体的问题数据表(结构+数据),会要求输入数据库密码
$ mysqldump -u 用户名 数据库名 数据表名 > /root/db-数据表名.sql       

...Navicat导入数据,排查与解决问题过程,准备好处理方案的sql

// 命令行模式进入mysql,会要求输入数据库密码
$ mysql -P 端口号 -h ip地址 -u 用户名 -p            

// 选择数据库,并执行处理方案的sql语句(注意别忘了用use语句切换数据表、SQL语句的末尾要加“;”)
$ use 数据表名;
$ 执行具体的处理方案的sql语句;
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 2.2 Oracle数据备份

# 2.2.1 使用命令以dmp的形式导入导出

[1] 导入导出一个用户下的所有表

exp 用户名/密码@localhost:1521/orcl file=E:文件名.dmp log=用户名.log owner=(用户名)  --导出表(带数据)
exp 用户名/密码@localhost:1521/orcl file=E:文件名.dmp log=用户名.log owner=(用户名)  rows=n --导出表结构(不带数据) 
imp 用户名/密码@localhost:1521/orcl file=E:文件名.dmp log=用户名.log fromuser=(用户名) touser=(用户名)  --导入
1
2
3

[2] 导入导出单个表

exp 用户名/密码@localhost:1521/orcl file=E:文件名.dmp log=用户名.log tables=(用户名.表名) --导出表(带数据)
exp 用户名/密码@localhost:1521/orcl file=E:文件名.dmp log=用户名.log tables=(用户名.表名) rows=n--导出表结构(不带数据)
imp 用户名/密码@localhost:1521/orcl file=E:文件名.dmp log=用户名.log fromuser=(用户名) tables=(表名)
1
2
3

注:导入导出多个表的时候,用逗号分隔即可

[3] 导入导出可能会遇到的问题

[1] 账户被锁问题
sqlplus /nolog      # 不以任何用户登录(只是打开登录窗口)
connect as sysdba   # 然后需要输入待解锁的用户名和密码
alter user 用户名 account unlock;  # 账户解锁
alter user 用户名 identified by 新密码;  # 账户修改密码

[2] 对表进行授权
grant create session to 用户名;
grant create table to 用户名;

[3]对表空间无权限
alter user 用户名 quota unlimited on 表空间名

[4] “只有DBA才能导入由其他DBA导出的文件”的问题
grant dba to 用户名

[5] “XXX表空间不存在”的问题
需要创建表空间
create tablespace 表空间名
datafile 'Oracle安装路径/oradata/orcl/表空间名'    --app目录的子目录
size 1M autoextend on next 50M maxsize unlimited;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 2.2.2 使用临时表备份旧表数据

需求情景:[1] 备份数据,用于恢复 [2] 备份数据,修改表结构 [3] 转移数据

[1] 创建新表保存旧表数据

create table hr.runtime_summary_backup as select * from hr.runtime_summary
1

[2] 使用已存在的表保存旧表数据

insert into hr.runtime_summary_backup select * from hr.runtime_summary
1

[3] 两个数据库进行数据同步

Step1:创建dblink

create  public  database link 创建的dblink名
  connect to 另一个库的用户名 identified by 另一个库的密码 
  using '(DESCRIPTION =(ADDRESS_LIST =(ADDRESS =(PROTOCOL = TCP)(HOST = 另一个库的地址)(PORT = 1521)))(CONNECT_DATA =(SERVICE_NAME = orcl)))';
1
2
3

Step2:检查两个库的表是否一致(不一致可借助文本比较工具找出差异的表)

select table_name from all_all_tables where owner = '大写用户名' order by table_name
select table_name from all_all_tables@创建的dblink名 where owner = '大写用户名' order by table_name
1
2

Step3:删除原库数据(也就是删除一个用户下的所有表)

select 'truncate table 用户名.' || t.table_name
  from (select table_name from all_all_tables where owner = '大写用户名') t
执行跑出来的sql语句
1
2
3

Step4:从另一个库里迁移数据

select 'insert into 用户名.'|| t.table_name || ' select * from 用户名.' || t.table_name || '@创建的dblink名'
  from (select table_name from all_all_tables where owner = '大写用户名') t
执行跑出来的sql语句
1
2
3

# 2.2.3 在当前用户下备份所有表的数据

select 'create table bsms.' || table_name ||
       '_backup as select * from 用户名.' || table_name || ';' backup
  from (select table_name from all_all_tables where owner = '大写用户名')
执行跑出来的sql语句
1
2
3
4

# 2.3 Elasticsearch数据备份

# 2.3.1 ES的导入与导出

方案一:使用elasticdump插件进行导入导出

elasticdump (opens new window):Elasticsearch的导入导出工具

Step1:安装npm

$ sudo apt update
$ sudo apt install nodejs npm
1
2

Step2:安装elasticdump插件

$ npm install elasticdump -g
1

ES的导入导出(以导出为例):

$ elasticdump \
  --input=http://username:password@ip:port/index \
  --output=/data/index.json \
  --type=data
1
2
3
4

说明:开启了xpack安全认证的话就要像上面那样加上username:password@,没有的话就不需要加。导入跟导出命令基本一致,互换一下input和output的内容即可。

方案二:使用esm工具进行导入导出

esm (opens new window):一个 Elasticsearch 迁移工具

$ mkdir -p /root/esm
$ cd /root/esm
$ wget https://github.com/medcl/esm/releases/download/v0.6.1/migrator-linux-amd64
$ chmod u+x migrator-linux-amd64
$ ./migrator-linux-amd64  -s http://source_ip:9200 -m source_username:source_password -x "source_index" -d http://target_ip:9200 -n target_username:target_password -y "target_index"
1
2
3
4
5

注:可使用./migrator-linux-amd64 --help查看使用帮助

  -s, --source=源elasticsearch实例,即:http://localhost:9200
  -q, --query= 查询源elasticsearch实例,迁移前过滤数据,即:name:medcl
  -d, --dest= 目标elasticsearch实例,即:http://localhost:9201
  -m, --source_auth= 源elasticsearch实例的基本认证,即:user:pass
  -n, --dest_auth=目标elasticsearch实例的基本认证,即:user:pass
  -c, --count= 一次文档数:即滚动请求中的“大小”(默认值:5000)
      --buffer_count= 内存中缓冲文档的数量(默认值:10000)
  -w, --workers= 批量工作线程的并发数(默认值:1)
  -b, --bulk_size= 以 MB 为单位的批量大小(默认值:5)
  -t, --time= 滚动时间(默认:10m)
      --sliced_scroll_size= 切片滚动的大小,要使其正常工作,大小应> 1(默认值:1)
  -f, --force 在复制前删除目标索引
  -a, --all 复制以 . 开头的索引。和 _
      --copy_settings 从源复制索引设置
      --copy_mappings 从源复制索引映射
      --shards= 在新创建的索引上设置多个分片
  -x, --src_indexes= 要复制的索引名称,支持正则表达式和逗号分隔列表(默认值:_all)
  -y, --dest_index= 要保存的索引名,只允许一个索引名,如果不指定将使用原始索引名
  -u, --type_override=覆盖类型名称
      --green 在转储前等待两台主机集群状态变为绿色。否则黄色是可以的
  -v, --log= 设置日志级别,选项:trace,debug,info,warn,error (默认: INFO)
  -o, --output_file= 将源索引的文件输出到本地文件中
  -i, --input_file= 从本地转储文件索引
      --input_file_type= 输入文件的数据类型,选项:dump、json_line、json_array、log_line(默认:dump)
      --source_proxy= 设置代理为源http连接,即:http://127.0.0.1:8080
      --dest_proxy= 设置代理为目标http连接,即:http://127.0.0.1:8080
      --refresh 迁移完成后刷新
      --fields= 过滤源字段,逗号分隔,即:col1,col2,col3,...
      --rename= 重命名源字段,逗号分隔,即:_type:type, name:myname
  -l, --logstash_endpoint=目标logstash tcp端点,即:127.0.0.1:5055
      --secured_logstash_endpoint 目标 logstash tcp 端点由 TLS 保护
      --repeat_times= 将数据从源重复N次到目标输出,使用对齐参数regenerate_id来放大数据大小
  -r, --regenerate_id 为文档重新生成 id,这将覆盖数据源中存在的文档 id
      --compress 使用gzip压缩流量
  -p, --sleep= 在每个批量请求后睡眠 N 秒(默认值:-1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 2.3.2 ES的定期自动备份

实际在生产环境上,我们不会每天都去手动备份。可以写一个脚本出来,完成这项操作,然后用 crontab 定时执行。

Step1:新建一个esbackup.sh脚本,并使用chmod u+x命令赋予可执行权限,该脚本是基于elasticdump的,脚本内容如下:

#!/bin/bash
#ES路径
address='ip:port'
#ES用户名
user='username'
#ES密码
passwd='password'
#ES索引名
index='index'
#备份时间
backuptime=`date +"%Y-%m-%d"`
#备份输出目录
path='/mydocker/es/backup'
#备份输出日志
log='esbackup.log'

echo "################## ${backuptime} #############################" 
echo "开始备份" 
#日志记录头部
echo "" >> ${path}/${log}
echo "-------------------------------------------------" >> ${path}/${log}
echo "备份时间为${backuptime},备份索引 ${index} 开始" >> ${path}/${log}
#正式备份数据库
elasticdump \
  --input=http://${user}:${passwd}@${address}/${index} \
  --output=${path}/${index}-${backuptime}.json \
  --type=data 2>> ${path}/${log};
#备份成功以下操作
if [ "$?" == 0 ];then
cd ${path}
#为节约硬盘空间,将数据库压缩
tar Jcvf ${index}-${backuptime}.tar.gz ${index}-${backuptime}.json > /dev/null
#删除原始文件,只留压缩后文件
rm -f ${path}/${index}-${backuptime}.json
#删除30天前备份
find ${path} -name "*.tar.gz" -type f -mtime +30 -exec rm -rf {} \; > /dev/null 2>&1
echo "索引 ${index} 备份成功!!" >> ${path}/${log}
else
#备份失败则进行以下操作
echo "索引 ${index} 备份失败!!" >> ${path}/${log}
fi

echo "完成备份"
echo "备份日志见 ${path}/${log} "
echo "################## ${backuptime} #############################"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

说明:

1)上面的信息部分换成自己的,备份输出目录的末尾不要加‘/’,多个数据库用空格分隔不要加逗号。

2)为了节约硬盘空间,我这里对sql进行了压缩(如果不想压缩就不用安了,注释掉相关代码即可),请先安装xz命令。

$ apt-get install xz-utils
1

3)为了节约硬盘空间,我这里删除了30天前的备份(根据文件的最后改动时间来判定),具体保留几天自己定就行。

Step2:配置定时任务执行脚本

$ crontab -e
1

然后添加如下代码保存即可:

59 23 * * * 脚本路径/esbackup.sh
1

说明:含义是每天23:59分自动执行脚本。

# 3. 文件传输

# 3.1 使用Shell实现文件传输

sshpass是为了解决scp命令需要输入密码的问题,只需要在发送方安装,可在线安装也可离线安装。

$ apt-get install sshpass
1

sshpass.sh

#!/bin/sh

# 定义scp参数
source_dir=/root/source/
target_dir=/root/target/
host=目标服务器IP
port=目标服务器端口
username=目标服务器用户名
password=目标服务器密码

# 将本机某目录下的所有文件拷贝到另一台服务器的指定目录
time=$(date "+%Y-%m-%d %H:%M:%S")
echo "===$time task start===" >> scp.log
for filename in $(ls $source_dir)
do
    # 传输文件
    echo "sshpass -p $password scp -P $port $source_dir$filename $username@$host:$target_dir" >> scp.log
    sshpass -p $password scp -r -P $port $source_dir$filename $username@$host:$target_dir
    # 删除文件
    # echo "rm -f $source_dir$filename" >> scp.log
    # rm -f $source_dir$filename
done
echo "===$time task end===" >> scp.log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 3.2 使用Python实现文件传输

paramiko是用python写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。利用该模块,可以方便的进行ssh连接和sftp协议进行sftp文件传输以及远程命令执行。

.
├── config.ini
├── pull.py
└── push.py
1
2
3
4

config.ini

[pull]
# 远程服务器IP
host_ip = 111.111.111.111
# 远程服务器端口
host_port = 22
#远程服务器用户名
host_username = root
#远程服务器密码
host_password = your_password
#这个是需要拉取文件所在的远程目录,目录地址最后要加/
remote_path = /root/remote_path/
#这个是把拉取文件存放的本地目录,目录地址最后要加/
local_path = /Users/local_path/
# 拉取文件名包含关键词
pull_file_key = data
# 下拉的文件格式,即文件后缀名,扩展名需要写.
pull_file_suffix = .zip

[push]
# 远程服务器IP
host_ip = 111.111.111.111
# 远程服务器端口
host_port = 22
#远程服务器用户名
host_username = root
#远程服务器密码
host_password = your_password
#这个是需要拉取文件所在的远程目录,目录地址最后要加/
remote_path = /root/remote_path/
#这个是把拉取文件存放的本地目录,目录地址最后要加/
local_path = /Users/local_path/
# 拉取文件名包含关键词
pull_file_key = data
# 下拉的文件格式,即文件后缀名,扩展名需要写.
pull_file_suffix = .zip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

pull.py

# -*- coding: utf-8 -*-

from configparser import ConfigParser
import paramiko
import logging

'''
对于远程服务器的指定路径,可以根据筛选规则下拉文件
'''

# 生成日志文件
logging.basicConfig(filename='pull_file.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def read_config(config_path):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    section_list = cfg.sections()
    config_dict = {}
    for section in section_list:
        section_item = cfg.items(section)
        for item in section_item:
            config_dict[item[0]] = item[1]
    return config_dict


def pull(host_ip, host_port, host_username, host_password, remote_path, local_path, pull_file_key, pull_file_suffix):
    scp = paramiko.Transport((host_ip, host_port))
    scp.connect(username=host_username, password=host_password)
    sftp = paramiko.SFTPClient.from_transport(scp)
    key = pull_file_key
    suffix_name = pull_file_suffix
    suffix_len = len(suffix_name)
    try:
        remote_files = sftp.listdir(remote_path)
        # 遍历读取远程目录里的所有文件
        for file in remote_files:
            if file.find(key) != -1:
                # 判断文件后缀
                if file[-int(suffix_len):] == suffix_name:
                    local_file = local_path + file
                    remote_file = remote_path + file
                    try:
                        sftp.get(remote_file, local_file)
                        logger.info(file + 'file pulled successfully.')
                    except Exception as e:
                        logger.error(e)
    except IOError:
        logger.error("remote_path or local_path is not exist")
    scp.close()


if __name__ == '__main__':
    # 读取配置文件
    scp_config = read_config('./config.ini')
    host_ip = str(scp_config['host_ip'])
    host_port = int(scp_config['host_port'])
    host_username = str(scp_config['host_username'])
    host_password = str(scp_config['host_password'])
    remote_path = str(scp_config['remote_path'])
    local_path = str(scp_config['local_path'])
    pull_file_key = str(scp_config['pull_file_key'])
    pull_file_suffix = str(scp_config['pull_file_suffix'])
    # 调用方法拉取文件
    pull(host_ip, host_port, host_username, host_password, remote_path, local_path, pull_file_key, pull_file_suffix)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

push.py

# -*- coding: utf-8 -*-

import os
from configparser import ConfigParser
import paramiko
import logging

'''
对于远程服务器的指定路径,可以根据筛选规则上传文件
'''

# 生成日志文件
logging.basicConfig(filename='push_file.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def read_config(config_path):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    section_list = cfg.sections()
    config_dict = {}
    for section in section_list:
        section_item = cfg.items(section)
        for item in section_item:
            config_dict[item[0]] = item[1]
    return config_dict


def push(host_ip, host_port, host_username, host_password, remote_path, local_path, push_file_key, push_file_suffix):
    scp = paramiko.Transport((host_ip, host_port))
    scp.connect(username=host_username, password=host_password)
    sftp = paramiko.SFTPClient.from_transport(scp)
    files = os.listdir(local_path)
    key = push_file_key
    suffix_name = push_file_suffix
    suffix_len = len(suffix_name)
    try:
        for file in files:
            if file.find(key) != -1:
                # 判断文件后缀
                if file[-int(suffix_len):] == suffix_name:
                    local_file = local_path + file
                    remote_file = remote_path + file
                    try:
                        sftp.put(local_file, remote_file)
                        logger.info(file + 'file push successfully')
                    except Exception as e:
                        logger.error(e)
    except IOError:
        logger.error("remote_path or local_path is not exist")
    scp.close()


if __name__ == '__main__':
    # 读取配置文件
    scp_config = read_config('./config.ini')
    host_ip = str(scp_config['host_ip'])
    host_port = int(scp_config['host_port'])
    host_username = str(scp_config['host_username'])
    host_password = str(scp_config['host_password'])
    remote_path = str(scp_config['remote_path'])
    local_path = str(scp_config['local_path'])
    push_file_key = str(scp_config['push_file_key'])
    push_file_suffix = str(scp_config['push_file_suffix'])
    # 调用方法上传文件
    push(host_ip, host_port, host_username, host_password, remote_path, local_path, push_file_key, push_file_suffix)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

# 4. 数据同步

# 4.1 MySQL to Elasticsearch 数据同步

Logstash是具有实时流水线能力的开源的数据收集引擎。 Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。 它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。

我们可以使用Logstash将MySQL数据同步到ES。

# 4.1.1 安装Logstash

项目地址:https://github.com/elastic/logstash (opens new window)

下载地址:https://www.elastic.co/guide/en/logstash/7.16/installing-logstash.html#_apt (opens new window)

处理流程:

logstash处理流程

注意要有 java8 的环境,然后要安装跟ES一样的版本,以下是 Logstash 的安装命令:

$ mkdir -p /root/mysql_sync_es && cd /root/mysql_sync_es
$ wget https://artifacts.elastic.co/downloads/logstash/logstash-7.16.2-linux-x86_64.tar.gz
$ tar -zxvf logstash-7.16.2-linux-x86_64.tar.gz && rm -f logstash-7.16.2-linux-x86_64.tar.gz
1
2
3

# 4.1.2 配置Logstash

Step1: 下载一个 mysql-connector-java-5.1.34.jar (opens new window)

$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.34/mysql-connector-java-5.1.34.jar
1

Step2:创建同步配置文件

$ cd /root/mysql_sync_es
$ mkdir last_update_data data conf
$ cd conf
$ vim db_table_sync.conf
1
2
3
4

单表同步的配置示例如下:

input {
    jdbc {
       # 类型(建议用表名即可)
       type => "db_table"
       # 数据库连接地址
       jdbc_connection_string => "jdbc:mysql://ip:port/db?useUnicode=true&characterEncoding=utf8&useSSL=false"
       # 数据库用户名
       jdbc_user => "root"
       # 数据库密码
       jdbc_password => "password"
       # 判断数据库连接是否可用,默认false不开启
       jdbc_validate_connection => true
       # MySQL依赖包路径
       jdbc_driver_library => "/root/mysql_sync_es/mysql-connector-java-5.1.34.jar"
       # MySQL驱动
       jdbc_driver_class => "com.mysql.jdbc.Driver"
       # 开启分页查询(默认false不开启)
       jdbc_paging_enabled => true
       # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
       jdbc_page_size => "1000"
       # 编码转换
       codec => plain { charset => "UTF-8"}
       # 执行的sql语句
       statement => "SELECT id, uid, user_name, description, DATE_FORMAT( create_time, '%Y-%m-%d %H:%i:%S' ) create_time, DATE_FORMAT( insert_time, '%Y-%m-%d %H:%i:%S' ) insert_time, DATE_FORMAT( ifnull( update_time, insert_time ), '%Y-%m-%d %H:%i:%S' ) update_time FROM table WHERE update_time > :sql_last_value"     
       # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
       record_last_run => true
       # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
       use_column_value => true
       # 需要记录的字段,用于增量同步,需是数据库字段
       tracking_column => "update_time"
       # 需要记录的字段类型,值可以是:numeric,timestamp,默认值为“numeric”
       tracking_column_type => "timestamp"
       # record_last_run上次数据存放位置
       last_run_metadata_path => "/root/mysql_sync_es/last_update_data/db_table_last_update_time"
       # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
       lowercase_column_names => false
       # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
       clean_run => false
       # crontab定时任务,目前设置的是每小时同步一次,默认设置为每分钟同步一次
       schedule => "* */1 * * *"    
    }    
}

filter {
   if [type] == "db_table" { 
   mutate {
        # 重命名字段
        rename => {
            "user_name" => "userName"
            "create_time" => "createTime"
            "insert_time" => "insertTime"
            "update_time" => "updateTime"
        }
        # 添加额外字段
        add_field => {
            "field" => "Field"
       }
      }
    }
}

output {
  if [type] == "db_table" {  
    stdout {
        # JSON格式输出
        codec => json_lines
    } 
    elasticsearch {
        # ES索引名
        index => "db_table"
        # ES连接地址
        hosts => "ip:port"
        # 文档id
        document_id => "%{id}"
        # ES用户名
        user => "elastic"
        # ES密码
        password => "password"
    }
  } 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

注意事项:

1)多表配置和单表配置的区别在于input模块的jdbc模块有几个type,output模块就需对应有几个type。

2)document_id => "%{id}"是根据id去更新(如果不会更新旧数据,这个去掉即可),如果有重命名,%{}里应该是重命名之后的。

3)检查MySQL数据表的字段是否有ES保留词(比如type),如果有的话需要给它重命名字段,否则无法成功创建索引。

# 4.1.3 启动Logstash开始数据同步

方式一:指定具体的配置文件
$ nohup /root/mysql_sync_es/logstash-7.16.2/bin/logstash -f /root/mysql_sync_es/db_table_sync.conf --path.data=/root/mysql_sync_es/data/db_table_sync_data &

方式二:指定配置文件目录(目录里所有的配置文件都会执行)
$ nohup /root/mysql_sync_es/logstash-7.16.2/bin/logstash -f /root/mysql_sync_es/conf --path.data=/root/mysql_sync_es/data &
1
2
3
4
5

注意事项:

1)需要指定 path.data,否则会报如下错误

Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting
1

2)可以提前写好用于启动的 shell 脚本,这样就更方便启动了。

3)关闭 Logstash 可通过 kill 进程来实现

$ ps -ef | grep logstash
$ kill -9 [PID]
1
2

# 4.2 MinIO to MinIO 数据同步

# 4.2.1 Rclone工具简介

Rclone是一个命令行程序,用于管理云存储上的文件,它旨在成为"云存储的rsync"。 它是云供应商web存储界面的一个功能丰富的替代品。超过40种云存储产品支持Rclone,包括S3对象存储、业务和消费者文件存储服务以及标准传输协议。除了MinIO之外,也支持其他云存储的同步。

项目地址:https://github.com/rclone/rclone (opens new window)

官方文档:http://docs.minio.org.cn/docs/master/rclone-with-minio-server (opens new window)

# 4.2.2 使用Rclone实现MinIO数据同步

Step1:安装Rclone

$ curl https://rclone.org/install.sh | sudo bash
1

Step2:新建配置文件

/root/.config/rclone/rclone.conf 路径新建配置文件,内容如下:

[oldminio]
type = s3
provider = Minio
env_auth = false
access_key_id = user1
secret_access_key = password1
region = cn-east-1
endpoint = http://ip1:9000
location_constraint =
server_side_encryption =

[newminio]
type = s3
provider = Minio
env_auth = false
access_key_id = user2
secret_access_key = password2
region = cn-east-1
endpoint = http://ip2:9000
location_constraint =
server_side_encryption =
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

注:只需修改 access_key_id、secret_access_key、endpoint(填写ip:port的地址,而不是域名访问的地址)的值即可,可添加多个数据源。

Step3:进行数据同步

执行命令:

$ rclone --size-only --dry-run copy -P oldminio:mybucket newminio:mybucket    // 测试增量复制(只是预览效果,不会实际操作数据)
$ rclone --size-only --dry-run sync -P oldminio:mybucket newminio:mybucket    // 测试变化同步(只是预览效果,不会实际操作数据)
$ rclone --size-only copy -P oldminio:mybucket newminio:mybucket              // 增量复制
$ rclone --size-only sync -P oldminio:mybucket newminio:mybucket              // 变化同步
1
2
3
4

解释说明:

  • MinIO目前并不支持所有的S3特性。特别是它不支持MD5校验(ETag)或者是元数据。这就表示Rclone不能通过MD5SUMs进行校验或者保存最后修改时间。不过你可以用Rclone的--size-only flag。
  • 使用 sync 参数,同步有差异的数据(会新增也会删除);使用 copy参数,增量复制数据(只新增不删除)。
  • 使用--dry-run参数,是用来测试预览效果的,不会实际操作数据,在正式操作之前,可以加上这个测试一下是否符合预期。
  • -P 显示详细过程。是mybucket 桶名称,如果新minio没有则自动创建。

其他常用命令:

$ rclone lsd newminio:                                           // 列举存储桶
$ rclone mkdir newminio:mybucket2                                // 创建一个新的存储桶
$ rclone --size-only copy /path/to/files newminio:mybucket       // 拷贝文件到存储桶
$ rclone --size-only copy newminio:mybucket /tmp/bucket-copy     // 从存储桶中拷贝文件
$ rclone ls newminio:mybucket                                    // 列举存储桶中的所有文件
$ rclone --size-only sync /path/to/files myminio:mybucket        // 同步文件到存储桶
1
2
3
4
5
6

注意事项:

  • 注意存储桶的访问策略,如果为Public可以直接访问,如果为Private不可访问,如果为Custom请在有权限访问的服务器上进行操作。
  • Rclone是一个数据同步工具,支持增量更新,可用于MinIO数据同步。

# 4.3 以离线文件形式进行数据同步

针对不可联网的重要数据,我写了一套离线数据同步脚本,代码已在Github上开源,项目地址为:https://github.com/Logistic98/data-sync (opens new window)

# 4.3.1 项目简介

该项目是一套以离线文件形式进行数据同步的Python脚本(目前只支持MySQL、ES),支持基于时间的全量数据导出与增量数据导出。密钥采用RSA非对称加密,数据采用AES对称加密。数据文件加密后,再将其压缩成一个ZIP压缩包进行传输。适用于不可联网的重要数据进行离线数据同步。

# 4.3.2 项目结构

项目主要包含file_scp(远程文件传输)、rsa_key_tool(生成RSA公私钥)、source_export_data(导出数据)、target_import_data(导入数据)

.
├── file_scp                         
│   ├── config.ini.example
│   ├── pull.py
│   └── push.py
├── rsa_key_tool                     
│   ├── config.ini.example
│   └── rsa_encryption.py
├── source_export_data               
│   ├── clear_data_tool.py
│   ├── config.ini.example
│   ├── last_job_time.json
│   ├── build.sh
│   ├── Dockerfile
│   ├── export_es_data.py
│   ├── export_mysql_data.py
│   ├── gol.py
│   ├── log.py
│   ├── source_export_data.py
│   └── utils.py
└── target_import_data               
    ├── clear_data_tool.py
    ├── config.ini.example
    ├── last_job_time.json
    ├── build.sh
    ├── Dockerfile
    ├── gol.py
    ├── import_es_data.py
    ├── import_mysql_data.py
    ├── log.py
    ├── target_import_data.py
    └── utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 4.3.3 项目使用

Step1:项目里的4个config.ini.example是配置示例,将其重命名为 config.ini,将配置信息换成自己的实际配置(具体含义见配置文件里的注释)

Step2:安装项目依赖,见 requirements.txt 文件(执行pip install -r requirements.txt命令即可)

schedule==1.1.0
paramiko==2.11.0
elasticsearch==7.16.2
PyMySQL==1.0.2
pycryptodome==3.17
1
2
3
4
5

Step3:执行 rsa_key_tool/rsa_encryption.py 生成RSA公私钥

默认在 source_export_data 目录下生成 public_rsa_key.bin 公钥文件,用于加密;在 target_import_data 目录下生成 private_rsa_key.bin 私钥文件,用于解密。

Step4:导出及导入数据

执行 source_export_data/source_export_data.py导出数据,通过 file_scp/push.py将导出的 data_package 同步到 target_import_data 目录下,执行target_import_data/target_import_data.py导入数据。

Step5:Docker打包部署

source_export_data部署在导出端服务器,给build.sh脚本添加权限后即可一键构建,配置文件、日志、数据文件会挂载到容器外。target_import_data部署在导入端服务器,使用同上。构建镜像时配置了自启动,容器启动后即可自动启动主程序。

# 4.3.4 注意事项

[1] 注意Python环境的版本,支持Python3.8环境,不支持Python3.9环境(加密模块会出错),其余的版本没有测试。

[2] ES的依赖版本尽量与服务端相近,我这里使用的是7.16.2版本的elasticsearch依赖,在7.16.2和8.4.1版本的服务端ES上测试无问题。

[3] 支持基于时间的全量导出与增量导出,每次执行时会读取上次的同步时间,若这个值为空字符串,则跑全量数据(这里的全量是指在当前时间之前的所有数据,若时间标志位有在当前时间之后的,则不会导出)。使用的基于时间的增量字段,在配置文件的 time_field 项进行配置。需要注意的是,日期字段格式只支持%Y-%m-%d %H:%M:%S,若不是此格式,需要在代码里转换一下。

[4] 本项目采用 schedule 库配置定时任务,在 source_export_data/source_export_data.pytarget_import_data/target_import_data.py文件里配置即可,支持配置多个定时任务规则。代码里加了运行状态限制,若上一次没执行完,本次会跳过。

# 配置定时任务,可同时启动多个
logger.info("定时任务规则:{}".format("每隔30分钟运行一次job"))
schedule.every(30).minutes.do(source_export_data_main_job)
logger.info("定时任务规则:{}".format("每隔1小时运行一次job"))
schedule.every().hour.do(source_export_data_main_job)
logger.info("定时任务规则:{}".format("每天在23:59时间点运行job"))
schedule.every().day.at("23:59").do(source_export_data_main_job)
logger.info("定时任务规则:{}".format("每周一运行一次job"))
schedule.every().monday.do(source_export_data_main_job
1
2
3
4
5
6
7
8
9

# 5. 参考资料

[1] mysql 数据库 定时自动备份 from 简书 (opens new window)

[2] MySQL定时备份数据库 from 知乎 (opens new window)

[3] 如何在生产环境下实现每天自动备份mysql数据库 from 51CTO (opens new window)

[4] docker 中 MySQL 备份及恢复 from learnku (opens new window)

[5] 使用 Logstash 同步海量 MySQL 数据到 ES from 腾讯云 (opens new window)

[6] 使用logstash 运行配置文件,出现you must change the "path.data" setting from CSDN (opens new window)

[7] rclone 迁移minio数据 from CSDN (opens new window)

[8] Rclone结合MinlO Server from Minio官方文档 (opens new window)

[9] Rclone 进阶使用教程 - 常用命令参数详解 from p3terx zone (opens new window)

Last Updated: 10/6/2024, 5:46:17 PM