使用Flask封装集成深度学习算法

  1. 1. Flask接口封装
    1. 1.1 Flask简介
    2. 1.2 Flask通用模板
      1. 1.2.1 常规POST请求
      2. 1.2.2 常规GET请求
      3. 1.2.3 以base64格式传输图片
      4. 1.2.4 以文件的形式传输
    3. 1.3 Flask常见问题
      1. 1.3.1 Flask跨域问题
      2. 1.3.2 Flask中文乱码问题
      3. 1.3.3 JSON解析问题
      4. 1.3.4 Flask并发调用问题
      5. 1.3.5 base64编码出现b的问题
      6. 1.3.6 字典中文转码问题
      7. 1.3.7 请求时出现ProxyError问题
    4. 1.4 Flask全局配置
      1. 1.4.1 打印日志到控制台并写入文件
      2. 1.4.2 Flask全局统一封装返回值格式
      3. 1.4.3 使用pre-request校验Flask入参
      4. 1.4.4 Flask-Doc生成接口文档
      5. 1.4.5 跨文件全局变量的定义与使用
  2. 2. 深度学习模型及算法
    1. 2.1 百度飞桨Paddle
      1. 2.1.1 PaddleOCR
      2. 2.1.2 PaddleNLP
    2. 2.2 目标识别检测
    3. 2.3 文本合成语音
    4. 2.4 破解Google翻译
    5. 2.5 对旧图片旧视频进行着色
      1. 2.5.1 DeOldify简介
      2. 2.5.2 Google Colab简介
      3. 2.5.3 官方提供的在线服务及API
      4. 2.5.4 DeOldify的预训练模型
      5. 2.5.5 使用Google Colab进行部署
  3. 3. 传统类型算法及处理工具
    1. 3.1 文本关键词及概要提取
    2. 3.2 文本内容审查
    3. 3.3 视频关键帧抽取
    4. 3.4 图片文件处理
      1. 3.4.1 压缩图片大小
      2. 3.4.2 图片添加盲水印
      3. 3.4.3 获取图片缩略图
    5. 3.5 将网页保存成pdf文件
      1. 3.5.1 基本概念简介
      2. 3.5.2 使用Flask进行封装
      3. 3.5.3 使用Docker进行部署
      4. 3.5.4 编写Dockerfile
      5. 3.5.5 在服务器上部署项目
      6. 3.5.6 验证部署
  4. 4. Python工具函数
    1. 4.1 解析Excel和CSV文件
      1. 4.1.1 Excel与CSV转字典列表
      2. 4.1.2 字典列表转Excel与CSV
      3. 4.1.3 其他操作Excel和CSV的示例
    2. 4.2 读写配置文件
      1. 4.2.1 读取JSON文件里的配置信息
      2. 4.2.2 读写INI文件里的配置信息
      3. 4.2.3 读写txt文件
      4. 4.2.4 生成xml文件
      5. 4.2.5 解析yaml格式文件
    3. 4.3 列表数组字符串的处理
      1. 4.3.1 列表元素去重及统计出现次数
      2. 4.3.2 逗号分隔的字符串与列表互转
      3. 4.3.3 比较数组是否完全相等
      4. 4.3.4 实现replaceAll功能
      5. 4.3.5 将List拆分成若干个指定长度的小List
      6. 4.3.6 按指定长度分段切割字符串或列表
      7. 4.3.7 字符串四舍五入保留两位小数
      8. 4.3.8 将两个相同长度的List转字典
      9. 4.3.9 检查字符串里的中文字符
      10. 4.3.10 浏览器URL编码以及反编码
      11. 4.3.11 去除列表的最后一个元素
      12. 4.3.12 查找字符串里所有子串位置
    4. 4.4 系统与文件目录的基本操作
      1. 4.4.1 基本文件和目录操作
      2. 4.4.2 复制某个文件并重命名
      3. 4.4.3 获取文件大小及创建、修改时间
      4. 4.4.4 检查路径是否有中文
      5. 4.4.5 读取指定目录的所有文件夹保存成列表
      6. 4.4.6 遍历目录,获取目录下的所有文件路径
      7. 4.4.7 从文件路径列表筛选出指定后缀的文件
      8. 4.4.8 递归获取某目录下某后缀的文件路径
      9. 4.4.9 根据md5进行文件去重
      10. 4.4.10 将文件转成文件流提供下载
      11. 4.4.11 将网络图片转存成base64
      12. 4.4.12 起始时间及执行时间统计
      13. 4.4.13 设置超时操作
      14. 4.4.14 检查文件编码
      15. 4.4.15 文件路径、文件名、后缀分割
      16. 4.4.16 筛选出扩展名符合条件的文件路径列表
    5. 4.5 加密解密算法
      1. 4.5.1 RSA加密解密
      2. 4.5.2 AES加密解密
    6. 4.6 根据IP或域名获取地理位置信息
      1. 4.6.1 获取本机IP地址
      2. 4.6.2 获取地理位置信息
    7. 4.7 使用cv2库画图
  5. 5. 数据库及中间件的集成与使用
    1. 5.1 使用Redis缓存数据
    2. 5.2 将数据保存到MySQL
    3. 5.3 查询Oracle的数据
    4. 5.4 ElasticSearch的导入导出
    5. 5.5 minio的文件上传
  6. 6. Python常用的进阶知识及示例
    1. 6.1 使用vthread实现多线程
      1. 6.1.1 vthread简介
      2. 6.1.2 vthread基本使用
    2. 6.2 使用Python协程
      1. 6.2.1 Python协程简介
      2. 6.2.2 进程、线程、协程对比
      3. 6.2.2 使用asyncio实现协程
    3. 6.3 使用Python装饰器
      1. 6.3.1 Python装饰器简介
      2. 6.3.2 与Java注解异同点对比
      3. 6.3.3 使用装饰器实现权限校验
    4. 6.4 程序内存占用分析
      1. 6.4.1 Memray简介
      2. 6.4.2 Memray基本使用
  7. 7. 项目的打包部署
    1. 7.1 Docker环境搭建
    2. 7.2 导出项目依赖
    3. 7.3 使用Docker部署Flask项目
    4. 7.4 依赖类库的安装部署说明
    5. 7.5 常见报错问题的解决
  8. 8. 参考资料

1. Flask接口封装

1.1 Flask简介

Flask是一个使用Python编写的轻量级Web应用框架。Flask最显著的特点是它是一个“微”框架,轻便灵活,但同时又易于扩展。默认情况下,Flask 只相当于一个内核,不包含数据库抽象层、用户认证、表单验证、发送邮件等其它Web框架经常包含的功能。Flask依赖用各种灵活的扩展来给Web应用添加额外功能。

与Django的对比:Django是一个开源的Python Web应用框架,采用了MVT的框架模式,即模型M,视图V和模版T。Django是一个”大而全”的重量级Web框架,其自带大量的常用工具和组件,甚至还自带了管理后台Admin,适合快速开发功能完善的企业级网站。

Flask项目地址:https://github.com/pallets/flask

1.2 Flask通用模板

为了方便日常功能开发,这里放一个自己平时用的 Flask 通用模板,专注于业务逻辑的编写即可。

完整示例代码已在Github上开源:https://github.com/Logistic98/flask-demo

1.2.1 常规POST请求

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-

from flask import Flask, jsonify
from flask_cors import CORS
from pre_request import pre, Rule

from log import logger
from code import ResponseCode, ResponseMessage

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/api/moduleName/methodName', methods=['POST'])
def methodName():

# 参数校验
rule = {
"text": Rule(type=str, required=True, gte=3, lte=255),
"type": Rule(type=int, required=True, gte=1, lte=1)
}
try:
params = pre.parse(rule=rule)
except Exception as e:
logger.error(e)
fail_response = dict(code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None)
logger.error(fail_response)
return jsonify(fail_response)

# 获取参数
text = params.get("text")

# 业务处理模块
result = text + ",hello world!"
logger.info("测试日志记录")

# 成功的结果返回
success_response = dict(code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result)
logger.info(success_response)
return jsonify(success_response)


if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务,指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./server.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

code.py

1
2
3
4
5
6
7
8
9
10
11
12
13
# -*- coding: utf-8 -*-


class ResponseCode(object):
SUCCESS = 200
RARAM_FAIL = 400
BUSINESS_FAIL = 500


class ResponseMessage(object):
SUCCESS = "请求成功"
RARAM_FAIL = "参数校验失败"
BUSINESS_FAIL = "业务处理失败"

response.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-

from code import ResponseMessage, ResponseCode


class ResMsg(object):
"""
封装响应文本
"""
def __init__(self, data=None, code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS):
self._data = data
self._msg = msg
self._code = code

def update(self, code=None, data=None, msg=None):
"""
更新默认响应文本
:param code:响应状态码
:param data: 响应数据
:param msg: 响应消息
:return:
"""
if code is not None:
self._code = code
if data is not None:
self._data = data
if msg is not None:
self._msg = msg

def add_field(self, name=None, value=None):
"""
在响应文本中加入新的字段,方便使用
:param name: 变量名
:param value: 变量值
:return:
"""
if name is not None and value is not None:
self.__dict__[name] = value

@property
def data(self):
"""
输出响应文本内容
:return:
"""
body = self.__dict__
body["data"] = body.pop("_data")
body["msg"] = body.pop("_msg")
body["code"] = body.pop("_code")
return body

1.2.2 常规GET请求

如果是GET请求,修改两处即可

1
2
3
4
# 声明处
@app.route(rule='/moduleName/methodName', methods=['GET'])
# 接参处
id = request.args.get("id")

1.2.3 以base64格式传输图片

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import base64
import requests
import json

if __name__ == '__main__':
# 测试请求
url = 'http://127.0.0.1:5000/moduleName/methodName'
f = open('./data/test.jpg', 'rb')
# base64编码
base64_data = base64.b64encode(f.read())
f.close()
base64_data = base64_data.decode()
# 传输的数据格式
data = {'img': base64_data}
# post传递数据
r = requests.post(url, data=json.dumps(data))
print(r.text.encode().decode('unicode_escape'))

注:使用.encode().decode('unicode_escape')是为了解决中文乱码问题

log.py、code.py与response.py同上,server.py如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# -*- coding: utf-8 -*-

import os
from uuid import uuid1
from flask import Flask, jsonify
from flask_cors import CORS
from pre_request import pre, Rule

from code import ResponseCode, ResponseMessage
from log import logger
from utils import base64_to_img

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/api/moduleName/methodName', methods=['POST'])
def methodName():

# 参数校验
rule = {
"img": Rule(type=str, required=True),
"ext": Rule(type=str, required=False)
}
try:
params = pre.parse(rule=rule)
except Exception as e:
logger.error(e)
fail_response = dict(code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None)
logger.error(fail_response)
return jsonify(fail_response)

# 获取参数
image_b64 = params.get("img")
ext = params.get("ext")

# 将base64字符串解析成图片保存
if not os.path.exists('./img'):
os.makedirs('./img')
uuid = uuid1()
if ext is not None:
img_path = './img/{}.{}'.format(uuid, ext)
else:
img_path = './img/{}.jpg'.format(uuid)
try:
base64_to_img(image_b64, img_path)
except Exception as e:
logger.error(e)
fail_response = dict(code=ResponseCode.BUSINESS_FAIL, msg=ResponseMessage.BUSINESS_FAIL, data=None)
logger.error(fail_response)
return jsonify(fail_response)

# 下面对保存的图片进行若干处理
result = image_b64
logger.info("测试日志记录")

# 处理完成后删除生成的图片文件
os.remove(img_path)

# 成功的结果返回
success_response = dict(code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result)
logger.info(success_response)
return jsonify(success_response)


if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

1.2.4 以文件的形式传输

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-

import time
import requests

if __name__ == '__main__':
url = 'http://127.0.0.1:5000/moduleName/methodName'
img_path = './data/test.jpg'
files = {'image': open(img_path, "rb")}
r = requests.post(url, files=files)
end_time = time.time()
print(r.text.encode().decode('unicode_escape'))

1.3 Flask常见问题

1.3.1 Flask跨域问题

Step1:引入flask-cors库

1
$ pip install flask-cors

Step2:配置CORS

flask-cors 有两种用法,一种为全局使用,一种对指定的路由使用。

其中CORS提供了一些参数,常用的我们可以配置 originsmethodsallow_headerssupports_credentials

[1] 全局使用

1
2
3
4
5
from flask import Flask, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app, supports_credentials=True)

[2] 局部使用

1
2
3
4
5
6
7
8
9
10
from flask import Flask, request
from flask_cors import cross_origin

app = Flask(__name__)

@app.route('/')
@cross_origin(supports_credentials=True)
def hello():
name = request.args.get("name", "World")
return f'Hello, {name}!'

1.3.2 Flask中文乱码问题

[1] 发送请求乱码

不管是dump还是dumps,中文乱码加入ensure_ascii=False即可。

1
json.dump(content, f, ensure_ascii=False)

[2] 接收返回值乱码

接收返回值乱码问题,给app配置app.config[‘JSON_AS_ASCII’] = False即可。

1
2
3
if __name__ == "__main__":
app.config['JSON_AS_ASCII'] = False
app.run(host='0.0.0.0', port='5000')

1.3.3 JSON解析问题

1
request_body = request.get_json()

这种方法获取请求体中的JSON,有时会因为空格出现问题,导致请求400。为了避免这种情况,接参之后,可以对其去除全部空格。

1
2
3
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)

注:如果入参里要保留空格,则不能通过此方式来处理。

1.3.4 Flask并发调用问题

服务端:通过设置app.run()的参数,来达到多线程的效果。多进程或多线程只能选择一个,不能同时开启。

1
2
3
4
# 1.threaded : 多线程支持,默认为False,即不开启多线程;
app.run(threaded=True)
# 2.processes:进程数量,默认为1.
app.run(processes=True)

客户端:通过grequests进行并发请求。

requests是Python发送接口请求非常好用的一个三方库,由K神编写,简单,方便上手快。但是requests发送请求是串行的,即阻塞的。发送完一条请求才能发送另一条请求。为了提升测试效率,一般我们需要并行发送请求。这里可以使用多线程,或者协程,gevent或者aiohttp,然而使用起来,都相对麻烦。

grequests是K神基于gevent+requests编写的一个并发发送请求的库,使用起来非常简单。

项目地址:https://github.com/spyoungtech/grequests

依赖安装:

1
$ pip install grequests

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding: utf-8 -*-

import time
import grequests

start = time.time()
req_list = [grequests.post('http://httpbin.org/post', data={'a':1, 'b':2}) for i in range(10)]
res_list = grequests.map(req_list)
result_list = []
for res in res_list:
result_list.append(res.text)
print(result_list)
print(len(result_list))
print(time.time()-start)

1.3.5 base64编码出现b的问题

去除的方法:[1] decode为utf-8编码、[2] str转化为utf-8编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# -*- coding: utf-8 -*-

import base64

# 生成测试数据
before_base64 = 'abc'.encode()
after_base64 = base64.b64encode(before_base64)
print(after_base64)

# 方式一:decode为utf-8编码
method_one_base64 = after_base64.decode('utf-8')
print(method_one_base64)

# 方式二:str转化为utf-8编码
method_two_base64 = str(after_base64, 'utf-8')
print(method_two_base64)

>>> b'YWJj'
>>> YWJj
>>> YWJj

1.3.6 字典中文转码问题

尝试过.encode().decode('unicode_escape')# -*- coding: utf-8 -*-str()等方式仍然不行,最后将字典改成json格式,使用.get("key")方式取值解决了问题。

1
2
3
4
5
6
7
8
9
10
11
12
# -*- coding: utf-8 -*-

import json

d = {'s': '测试', 'd': u'\u4ea4\u6362\u673a'}
d1 = json.dumps(d1)
print(d1)
d2 = json.loads(j)
print(d2)

>>> {"s": "\u6d4b\u8bd5", "d": "\u4ea4\u6362\u673a"}
>>> {'s': '测试', 'd': '交换机'}

1.3.7 请求时出现ProxyError问题

使用request请求时有时会遇到requests.exceptions.ProxyError报错,请求时禁用系统代理即可解决此问题。

1
2
proxies = { "http": None, "https": None}
requests.get("url", proxies=proxies)

1.4 Flask全局配置

1.4.1 打印日志到控制台并写入文件

方式一:只将日志写入文件,控制台不打印的话,在文件开头加上这个即可

1
2
3
4
5
6
import logging

# 日志记录
logging.basicConfig(filename='server.log', level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

方式二:打印日志到控制台并写入文件,可以写一个日志输出配置类 log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./server.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

使用时直接调用即可。

1
logger.info("logger.info")

1.4.2 Flask全局统一封装返回值格式

当前主流的 Web 应用开发通常采用前后端分离模式,前端和后端各自独立开发,然后通过数据接口沟通前后端,完成项目。定义一个统一的数据下发格式,有利于提高项目开发效率,减少各端开发沟通成本。对Flask全局统一封装返回值格式可以减少大量重复代码。

code.py

1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-

class ResponseCode(object):
SUCCESS = 200
FAIL = 500

class ResponseMessage(object):
SUCCESS = "请求成功"
FAIL = "请求失败"

response.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-

from code import ResponseMessage, ResponseCode


class ResMsg(object):
"""
封装响应文本
"""
def __init__(self, data=None, code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS):
self._data = data
self._msg = msg
self._code = code

def update(self, code=None, data=None, msg=None):
"""
更新默认响应文本
:param code:响应状态码
:param data: 响应数据
:param msg: 响应消息
:return:
"""
if code is not None:
self._code = code
if data is not None:
self._data = data
if msg is not None:
self._msg = msg

def add_field(self, name=None, value=None):
"""
在响应文本中加入新的字段,方便使用
:param name: 变量名
:param value: 变量值
:return:
"""
if name is not None and value is not None:
self.__dict__[name] = value

@property
def data(self):
"""
输出响应文本内容
:return:
"""
body = self.__dict__
body["data"] = body.pop("_data")
body["msg"] = body.pop("_msg")
body["code"] = body.pop("_code")
return body

test_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# -*- coding: utf-8 -*-

from flask import Flask, jsonify
from flask_cors import CORS

from code import ResponseCode, ResponseMessage

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)


@app.route("/test",methods=["GET"])
def test():
test_dict = dict(name="zhang",age=18)
data = dict(code = ResponseCode.SUCCESS, msg = ResponseMessage.SUCCESS, data = test_dict)
return jsonify(data)


if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务,指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

1.4.3 使用pre-request校验Flask入参

项目介绍:用于验证请求参数的 python 框架,专为 Flask 设计

项目地址:https://github.com/Eastwu5788/pre-request

官方文档:https://pre-request.readthedocs.io/en/master/index.html

依赖安装:pip install pre-request

使用示例:先定义一个 rule 字典,然后使用 params = pre.parse(rule=rule) 校验参数,之后取值使用 params.get(“xxx”) 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# -*- coding: utf-8 -*-

from flask import Flask
from flask_cors import CORS
from pre_request import pre, Rule

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

rule = {
"userName": Rule(type=str, required=True, gte=3, lte=20, dest="user_name"),
"gender": Rule(type=int, required=True, enum=[1, 2]),
"age": Rule(type=int, required=True, gte=18, lte=60),
"country": Rule(type=str, required=False, gte=2, default="中国")
}

@app.route("/user/info", methods=["POST"])
def user_info_handler():
params = pre.parse(rule=rule)
userName = params.get("userName")
gender = params.get("gender")
age = params.get("age")
country = params.get("country")
return "success"


if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)

1.4.4 Flask-Doc生成接口文档

基本介绍:Flask-Doc 可以根据代码注释生成文档页面,支持Markdown、离线文档下载、在线调试。

项目地址:https://github.com/kwkwc/flask-docs

官方文档:https://github.com/kwkwc/flask-docs/blob/master/README.zh-CN.md

依赖安装:pip install Flask-Docs

使用示例:examples目录里有官方示例,参照里面的sample_app.py编写即可,以下是一些配置项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 使用 CDN
# app.config["API_DOC_CDN"] = True

# 禁用文档页面
# app.config["API_DOC_ENABLE"] = False

# SHA256 加密的授权密码,例如这里是 admin
# echo -n admin | shasum -a 256
# app.config["API_DOC_PASSWORD_SHA2"] = "8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918"

# 允许显示的方法
# app.config["API_DOC_METHODS_LIST"] = ["GET", "POST", "PUT", "DELETE", "PATCH"]

# 自定义 url_prefix
# app.config["API_DOC_URL_PREFIX"] = "/docs/api"

# 需要排除的 RESTful Api 类名
# app.config["API_DOC_RESTFUL_EXCLUDE"] = ["Todo"]

# 需要显示的 Api 蓝图名称
# app.config["API_DOC_MEMBER"] = ["api", "platform"]

# 需要排除的子成员 Api 函数名称
# app.config["API_DOC_MEMBER_SUB_EXCLUDE"] = ["delete_data"]

# 自动生成请求参数 markdown
# app.config["API_DOC_AUTO_GENERATING_ARGS_MD"] = True

# 禁止以 markdown 处理所有文档
# app.config["API_DOC_ALL_MD"] = False

文档效果:

Flask-Doc

1.4.5 跨文件全局变量的定义与使用

global关键字可以定义一个变量为全局变量,但是这个仅限于在一个文件中调用全局变量,跨文件就会报错。 既然在一个文件里面可以生效的话,那么我们就专门为全局变量定义一个“全局变量管理模块”就好了。

gol.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# -*- coding: utf-8 -*-

def _init():
global _global_dict
_global_dict = {}

def set_value(key, value):
""" 定义一个全局变量 """
_global_dict[key] = value

def get_value(key, defValue=None):
""" 获得一个全局变量,不存在则返回默认值 """
try:
return _global_dict[key]
except KeyError:
return defValue

定义处

1
2
3
4
5
import gol

gol._init() # 先必须在主模块初始化(只需要一次即可)
gol.set_value('name', 'zhangsan')
gol.set_value('age', 23)

调用处

1
2
3
4
import gol

name = gol.get_value('name')
age = gol.get_value('age')

2. 深度学习模型及算法

我对一些通用的开源算法使用Flask进行了封装集成,项目地址:yoyo-algorithm

2.1 百度飞桨Paddle

飞桨(PaddlePaddle)以百度多年的深度学习技术研究和业务应用为基础,集深度学习核心训练和推理框架、基础模型库、端到端开发套件、丰富的工具组件于一体,是中国首个自主研发、功能完备、开源开放的产业级深度学习平台。

使用Paddle系列的算法,需要统一安装 paddlepaddle 库,具体模块再安装对应模块的库即可。

统一说明:Paddle系列的库包和算法模型都需要关闭翻墙代理工具,算法模型会在代码初次执行时自动下载(存放在C:\Users\xxx\.paddlenlp目录下),所以初次执行耗时会长一些。

1
$ pip install paddlepaddle==2.2.0 -i https://mirror.baidu.com/pypi/simple

注:PaddleNLP 要求 paddlepaddle >= 2.2,如果根据 PaddleOCR 要求的 paddlepaddle >=2.0.1 而安装的是2.0.1版本,前者会报错:cannot import name '_convert_attention_mask' from 'paddle.nn.layer.transformer'

2.1.1 PaddleOCR

PaddleOCR:是一个开源的图片OCR识别算法,模型会在初次执行时自动下载。这是它的官方使用教程:PaddleOCR使用教程

依赖库安装:

1
$ pip install "paddleocr>=2.0.1"

基本使用示例:

1
2
3
4
5
6
7
8
9
from paddleocr import PaddleOCR

# Paddleocr目前支持的多语言语种可以通过修改lang参数进行切换
# 例如`ch`, `en`, `fr`, `german`, `korean`, `japan`
ocr = PaddleOCR(use_angle_cls=True, lang="ch") # need to run only once to download and load model into memory
img_path = './imgs/test.jpg'
result = ocr.ocr(img_path, cls=True)
for line in result:
print(line)

注:如果需要结果可视化、版面分析,需要另外安装相应的库,具体见官方文档。

2.1.2 PaddleNLP

PaddleNLP:是一个开源的自然语言处理开发库,模型会在初次执行时自动下载。这是它的官方使用教程:PaddleNLP官方教程

依赖库安装:

1
$ pip install paddlenlp==2.2.4

基本使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# -*- coding: utf-8 -*-

from paddlenlp import Taskflow

# 中文分词
paddle_nlp = Taskflow("word_segmentation")
result = paddle_nlp("第十四届全运会在西安举办")
print(result)
# >>> ['第十四届', '全运会', '在', '西安', '举办']

# 词性标注
paddle_nlp = Taskflow("pos_tagging")
result = paddle_nlp("第十四届全运会在西安举办")
print(result)
# >>> [('第十四届', 'm'), ('全运会', 'nz'), ('在', 'p'), ('西安', 'LOC'), ('举办', 'v')]

# 名词短语标注
paddle_nlp = Taskflow("knowledge_mining", model="nptag")
result = paddle_nlp("红曲霉菌")
print(result)
# >>> [{'text': '红曲霉菌', 'label': '微生物'}]

# 情感分析
paddle_nlp = Taskflow("sentiment_analysis")
result = paddle_nlp("这个产品用起来真的很流畅,我非常喜欢")
print(result)
# >>> [{'text': '这个产品用起来真的很流畅,我非常喜欢', 'label': 'positive', 'score': 0.9938690066337585}]

# 文本相似度
paddle_nlp = Taskflow("text_similarity")
result = paddle_nlp([["世界上什么东西最小", "世界上什么东西最小?"]])
print(result)
# >>> [{'text1': '世界上什么东西最小', 'text2': '世界上什么东西最小?', 'similarity': 0.992725}]

注:返回结果说明见官方文档。除此之外,PaddleNLP还支持很多其他的自然语言处理,如生成式问答、智能问答等,具体见官方文档。

2.2 目标识别检测

yolov5:是一种单阶段目标检测算法,该算法在Yolov4的基础上添加了一些新的改进思路,使其速度与精度都得到了极大的性能提升。

这是一篇使用教程:教你利用yolov5训练自己的目标检测模型,详细介绍了如何使用yolov5训练自己的目标检测模型,数据集和预训练权重的准备部分也留了该作者相应的博客链接。

2.3 文本合成语音

谷歌开源的文本转语音 API 交互的 Python 库,虽然免费但生成的语音机器音较重,使用时需要联网(被墙,国内需要设置代理)

项目地址:https://github.com/pndurette/gTTS

1
$ pip install gTTS

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
# -*- coding: utf-8 -*-

import os
from gtts import gTTS

os.environ["https_proxy"] = "http://127.0.0.1:1080"

# 谷歌文字转语音API测试
text = "测试gtts文本转语音"
audio = gTTS(text=text, lang="zh-cn")
audio.save("demo.mp3")

注:如果未设置代理或者代理有问题,会报“Python GTTS / Failed to connect. Probable cause: Unknown”错误。

语音文件播放:

playsound 声明它已经在WAV和MP3文件上进行了测试,但是它可能也适用于其他文件格式。

1
$ pip install playsound

示例代码如下:

1
2
from playsound import playsound
playsound('demo.mp3')

注意事项:调用时可能出现“指定的设备未打开,或不被 MCI 所识别”报错。原因是windows不支持utf-16编码,需修改playsound源码。

修改\Lib\site-packages\playsound.py文件的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def winCommand(*command):
bufLen = 600
buf = c_buffer(bufLen)
#command = ' '.join(command).encode('utf-16') # 1.修改前
command = ' '.join(command) # 1.修改后
errorCode = int(windll.winmm.mciSendStringW(command, buf, bufLen - 1, 0)) # use widestring version of the function
if errorCode:
errorBuffer = c_buffer(bufLen)
windll.winmm.mciGetErrorStringW(errorCode, errorBuffer, bufLen - 1) # use widestring version of the function
exceptionMessage = ('\n Error ' + str(errorCode) + ' for command:'
#'\n ' + command.decode('utf-16') + # 2.修改前
'\n ' + command + # 2.修改后
'\n ' + errorBuffer.raw.decode('utf-16').rstrip('\0'))
logger.error(exceptionMessage)
raise PlaysoundException(exceptionMessage)
return buf.value

2.4 破解Google翻译

破解Google翻译的 py-googletrans 库,使用时需要联网(被墙,国内需要设置代理)

1
$ pip install googletrans

这个库的工作原理(摘自官方说明):

1
2
- 您可能想知道为什么这个库可以正常工作,而其他方法(例如 goslate)却不起作用,因为 Google 最近使用票证机制更新了其翻译服务,以防止大量爬虫程序。
- 我最终找到了一种方法,通过对 Google 用来生成此类令牌的混淆和缩小代码进行逆向工程来生成票证,并在 Python 之上实现。 但是,这可能随时被阻止。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
#-*- coding:utf-8 -*-

from googletrans import Translator
import os

os.environ["https_proxy"] = "http://127.0.0.1:1080"

translator = Translator()
result = translator.translate('hello world', dest='zh-cn').text
print(result)

注:单次请求的最大字符数为5000,超出的话可以拆分成多份,分开请求再对结果进行拼接。另外该破解方式随时可能会被阻止,如果想使用稳定的 API,建议使用 谷歌官方的翻译 API

2.5 对旧图片旧视频进行着色

2.5.1 DeOldify简介

DeOldify 是由 Jason Antic 开发和更新的。这是目前最先进的黑白图像、视频的着色方法,所有的东西都是开源的。

基本原理:它使用了一种名为NoGAN的新型GAN训练方法,该方法是作者自己开发的,用来解决在使用由一个鉴别器和一个生成器组成的正常对抗性网络架构进行训练时出现的主要问题。典型地,GAN训练同时训练鉴别器和生成器,生成器一开始是完全随机的,随着时间的推移,它会欺骗鉴别器,鉴别器试图辨别出图像是生成的还是真实的。

项目地址:https://github.com/jantic/DeOldify

效果演示:

DeOldify旧照片着色

2.5.2 Google Colab简介

Colaboratory 是一个 Google 研究项目,旨在帮助传播机器学习培训和研究成果。它是一个 Jupyter 笔记本环境,不需要进行任何设置就可以使用,并且完全在云端运行。

Colaboratory 笔记本存储在 Google 云端硬盘中,并且可以共享,就如同您使用 Google 文档或表格一样,Colaboratory 可免费使用。

利用Colaboratory ,可以方便的使用Keras,TensorFlow,PyTorch,OpenCV等框架进行深度学习应用的开发。

与其它云服务相比,最重要的特点是Colab提供GPU并完全免费,详细介绍及使用方法见 faq page

2.5.3 官方提供的在线服务及API

如果不想折腾的话,可以使用官方提供的 DeOldify Image Colorization on DeepAI,可以直接在这里上传图片对旧照片进行着色,同时该网站还提供了API,供程序中调用,下文可以不用看了。

2.5.4 DeOldify的预训练模型

预训练模型:DeOldify 是基于深度学习开发的,需要用到预训练权重,这里项目开发者已经把训练好的权重上传了,我们可以直接拿来使用,不需要我们再训练。

  • Artistic 权重,会使图片上色效果更大胆 一些,下载地址:

    1
    https://data.deepai.org/deoldify/ColorizeArtistic_gen.pth
  • Stable 权重,相对于 Artistic 上色效果更保守一些,下载地址:

    1
    https://www.dropbox.com/s/usf7uifrctqw9rl/ColorizeStable_gen.pth
  • Video 权重,此权重文件用来给视频上色,下载地址:

    1
    https://data.deepai.org/deoldify/ColorizeVideo_gen.pth

权重文件下载完毕后,在项目根目录下创建一个 models 文件夹,把下载好的权重文件放入 models 文件夹内即可。

2.5.5 使用Google Colab进行部署

由于运行深度学习的项目对机器性能要求较高,因此下文使用了官方提供的预训练模型,并白嫖 Google Colab 进行部署。DeOldify对旧照片、旧视频的着色的使用流程基本一致,只不过用到的预训练模型不同而已,以旧照片着色为例。

官方也提供了Google Colab,不过那个是英文版的,我没有尝试了,下面我用的是网上找的一份中文版的,将其保存到自己的Google Drive里,地址:https://drive.google.com/drive/folders/1G6nTfabx10P3nSzL5lN-SEnoM2Y0jeRh?usp=sharing

注:使用Google Colab需要翻墙,这个要保存到自己的云端硬盘里,我的你们是无法执行的。

DeOldify的Google-Colab

打开之后先去执行该代码块(悬浮即可显示执行按钮)

1
2
3
4
5
6
7
8
9
10
11
#点击左侧按钮一键配置环境
!git clone https://github.com/jantic/DeOldify.git DeOldify
%cd /content/DeOldify
!pip install -r /content/DeOldify/requirements.txt
import fastai
from deoldify.visualize import *

torch.backends.cudnn.benchmark = True
!mkdir 'models'
!wget https://data.deepai.org/deoldify/ColorizeArtistic_gen.pth -O ./models/ColorizeArtistic_gen.pth
colorizer = get_image_colorizer(artistic=True)

说明:预训练模型的地址如果失效了就自己找个吧,替换掉即可。如果要使用 Stable 权重,需要把下面改成False

1
2
Artistic 权重  -- colorizer = get_image_colorizer(artistic=True)
Stable 权重 -- colorizer = get_image_colorizer(artistic=False)

踩过的坑:第一次执行的时候可能会出现依赖安装失败的问题,不要慌。点击 RESTART RUNTIME 按钮,等一会儿再重新执行代码块,第二次应该就可以安装成功了,成功的话左侧有个绿色箭头。

DeOldify

安装成功环境以后,再在下面的 source_url 里填入旧照片链接(本地图片的话可以先上传到图床),然后点击左侧的执行按钮,等待一会儿即可生成着色后的照片。

DeOldify旧照片着色实践

注:如果你的旧照片里本身就有颜色的话,生成效果可能会不太好。因为它会先把原有颜色替换成黑白的,再根据算法生成新的颜色,会导致与原图的颜色不一致。如果你想要保持一致的话,就需要借助PS的蒙版进行二次处理了。

3. 传统类型算法及处理工具

3.1 文本关键词及概要提取

FastTextRank:从中文文本中提取摘要及关键词,并对算法时间复杂度进行了修改,计算图最大权节点的时间复杂度由o(n^2)降低到了o(n)。在有限的测试文本上,其运行速度相比于textrank4zh这个包快了8倍。算法原理见作者的知乎文章

依赖库安装:Numpy>=1.14.5 gensim>=3.5.0 FastTextRank==1.1

基本使用示例:KeyWord.py(提取关键字示例)、Sentence.py(提取摘要示例)

3.2 文本内容审查

Sensitive-word:收集的一些敏感词汇,细分了暴恐词库、反动词库、民生词库、色情词库、贪腐词库、其他词库等。

将词库放到./dict/目录下,一个分类一个txt文件,词库内容为一行一个敏感词,对输入文本使用jieba分词,撞词库判断是否敏感。

3.3 视频关键帧抽取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8 -*-

import cv2
import operator
import numpy as np
import os
from scipy.signal import argrelextrema

def smooth(x, window_len=13, window='hanning'):
"""使用具有所需大小的窗口使数据平滑。
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the begining and end part of the output signal.
该方法是基于一个标度窗口与信号的卷积。
通过在两端引入信号的反射副本(具有窗口大小)来准备信号,
使得在输出信号的开始和结束部分中将瞬态部分最小化。
input:
x: the input signal输入信号
window_len: the dimension of the smoothing window平滑窗口的尺寸
window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
平坦的窗口将产生移动平均平滑
output:
the smoothed signal平滑信号

example:
import numpy as np
t = np.linspace(-2,2,0.1)
x = np.sin(t)+np.random.randn(len(t))*0.1
y = smooth(x)

see also:
numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
scipy.signal.lfilter

TODO: 如果使用数组而不是字符串,则window参数可能是窗口本身
"""
print(len(x), window_len)
s = np.r_[2 * x[0] - x[window_len:1:-1],
x, 2 * x[-1] - x[-1:-window_len:-1]]

if window == 'flat': # moving average平移
w = np.ones(window_len, 'd')
else:
w = getattr(np, window)(window_len)
y = np.convolve(w / w.sum(), s, mode='same')
return y[window_len - 1:-window_len + 1]


class Frame:
"""class to hold information about each frame
用于保存有关每个帧的信息
"""

def __init__(self, id, diff):
self.id = id
self.diff = diff

def __lt__(self, other):
if self.id == other.id:
return self.id < other.id
return self.id < other.id

def __gt__(self, other):
return other.__lt__(self)

def __eq__(self, other):
return self.id == other.id and self.id == other.id

def __ne__(self, other):
return not self.__eq__(other)


def rel_change(a, b):
x = (b - a) / max(a, b)
print(x)
return x


def getEffectiveFrame(videopath, dir):
# 如果文件目录不存在则创建目录
if not os.path.exists(dir):
os.makedirs(dir)
(filepath, tempfilename) = os.path.split(videopath) # 分离路径和文件名
(filename, extension) = os.path.splitext(tempfilename) # 区分文件的名字和后缀
# Setting fixed threshold criteria设置固定阈值标准
USE_THRESH = False
# fixed threshold value固定阈值
THRESH = 0.8
# Setting fixed threshold criteria设置固定阈值标准
USE_TOP_ORDER = False
# Setting local maxima criteria设置局部最大值标准
USE_LOCAL_MAXIMA = True
# Number of top sorted frames排名最高的帧数
NUM_TOP_FRAMES = 50
# smoothing window size平滑窗口大小
len_window = int(50)
print("target video :" + videopath)
print("frame save directory: " + dir)
# load video and compute diff between frames加载视频并计算帧之间的差异
cap = cv2.VideoCapture(str(videopath))
prev_frame = None
frame_diffs = []
frames = []
success, frame = cap.read()
i = 0
while (success):
luv = cv2.cvtColor(frame, cv2.COLOR_BGR2LUV)
curr_frame = luv
if curr_frame is not None and prev_frame is not None:
# logic here
diff = cv2.absdiff(curr_frame, prev_frame) # 获取差分图
diff_sum = np.sum(diff)
diff_sum_mean = diff_sum / (diff.shape[0] * diff.shape[1]) # 平均帧
frame_diffs.append(diff_sum_mean)
frame = Frame(i, diff_sum_mean)
frames.append(frame)
prev_frame = curr_frame
i = i + 1
success, frame = cap.read()
cap.release()

# compute keyframe
keyframe_id_set = set()
if USE_TOP_ORDER:
# sort the list in descending order以降序对列表进行排序
frames.sort(key=operator.attrgetter("diff"), reverse=True) # 排序operator.attrgetter
for keyframe in frames[:NUM_TOP_FRAMES]:
keyframe_id_set.add(keyframe.id)
if USE_THRESH:
print("Using Threshold") # 使用阈值
for i in range(1, len(frames)):
if (rel_change(np.float(frames[i - 1].diff), np.float(frames[i].diff)) >= THRESH):
keyframe_id_set.add(frames[i].id)
if USE_LOCAL_MAXIMA:
print("Using Local Maxima") # 使用局部极大值
diff_array = np.array(frame_diffs)
sm_diff_array = smooth(diff_array, len_window) # 平滑
frame_indexes = np.asarray(argrelextrema(sm_diff_array, np.greater))[0] # 找极值
for i in frame_indexes:
keyframe_id_set.add(frames[i - 1].id) # 记录极值帧数

# save all keyframes as image将所有关键帧另存为图像
cap = cv2.VideoCapture(str(videopath))
success, frame = cap.read()
idx = 0
num = 0
while (success):
if idx in keyframe_id_set:
num = num + 1
name = filename + '_' + str(num) + ".jpg"
cv2.imwrite(dir + name, frame)
keyframe_id_set.remove(idx)
idx = idx + 1
success, frame = cap.read()
cap.release()


if __name__ == "__main__":
videopath = './data/demo.mp4' # Video path of the source file源文件的视频路径
dir = './data/keyframe/' # Directory to store the processed frames存储已处理帧的目录
getEffectiveFrame(videopath, dir)

3.4 图片文件处理

3.4.1 压缩图片大小

以下是python+opncv实现图片压缩的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# -*- coding: utf-8 -*-

import os
import cv2

"""
# Features:使用 opencv 实现图片压缩,compress_config为图片压缩配置,说明如下:
# [cv2.IMWRITE_PNG_COMPRESSION, 9] 无损压缩(取值范围:0~9,数值越小,压缩比越低)
# [cv2.IMWRITE_JPEG_QUALITY, 80] 有损压缩(取值范围:0~100,数值越小,压缩比越高,图片质量损失越严重)
"""
class Compress_img:

def __init__(self, img_path, compress_config):
self.img_path = img_path
self.img_name = img_path.split('/')[-1]
self.compress_config = compress_config

def compress_img_CV(self, show=False):
old_fsize = os.path.getsize(self.img_path)
# 读取并压缩图片
img_resize = cv2.imread(self.img_path)
cv2.imwrite(self.img_path, img_resize, self.compress_config)
new_fsize = os.path.getsize(self.img_path)
# 计算压缩率
compress_rate = str(round(new_fsize / old_fsize * 100, 2)) + "%"
print("%s 图片已压缩," % (self.img_path), "压缩率为:", compress_rate)
# 查看压缩后的图片
if show:
cv2.imshow(self.img_name, img_resize)
cv2.waitKey(0)

if __name__ == '__main__':

img_path = './test.jpg'
compress_para = [cv2.IMWRITE_PNG_COMPRESSION, 9]
compress = Compress_img(img_path, compress_para)
compress.compress_img_CV()

注:OpenCV无法读取中文路径文件,请使用全英文路径。

3.4.2 图片添加盲水印

如果你想保护自己的原创图片,那最好的方式就是为图片添加盲水印,盲水印就是图片有水印但人眼看不出来,需要通过程序才能提取水印,相当于隐形“盖章”,可以用在数据泄露溯源、版权保护等场景。下面使用阿里巴巴安全团队出品的 blind_watermark 库对图片添加盲水印。

[1] 添加文本水印

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from blind_watermark import WaterMark

# 设置密码,默认是 1
bwm1 = WaterMark(password_img=1, password_wm=1)
# 读取原始图片
bwm1.read_img('input/001.jpg')
# 定义水印文本
wm = '@eula.club'
# 合并文本并输出新的图片
bwm1.read_wm(wm, mode='str')
bwm1.embed('output/001.jpg')
# 输出结果
len_wm = len(bwm1.wm_bit)
print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))

[2] 提取文本水印

1
2
3
bwm1 = WaterMark(password_img=1, password_wm=1)
wm_extract = bwm1.extract('output/001.jpg', wm_shape=len_wm, mode='str')
print(wm_extract)

注:该库还支持添加和提取图片形式的盲水印,而能添加多大的盲水印图片取决于原始图片,不可超过其大小,不便于批量处理,在此就不放示例了。

3.4.3 获取图片缩略图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# -*- coding: utf-8 -*-

from PIL import Image


def get_thumbnail_pic(input_img_path, output_img_path):
im = Image.open(input_img_path)
im.thumbnail((80, 80))
print(im.format, im.size, im.mode)
im.save(output_img_path, 'JPEG')


if __name__=='__main__':
input_img_path = './input/001.jpg'
output_img_path = './output/001.jpeg'
get_thumbnail_pic(input_img_path, output_img_path)

3.5 将网页保存成pdf文件

3.5.1 基本概念简介

[1] pyppeteer简介

Headless chrome/chromium 自动化库(是 puppeteer 无头 Chrome Node.js API 的Python版非官方库),可用于网页截图导出pdf。

项目地址:https://github.com/pyppeteer/pyppeteer

官方文档:https://pyppeteer.github.io/pyppeteer/

puppeteer 和 pyppeteer 的区别:pyppeteer 努力尽可能地复制 puppeteer API,但是,Javascript 和 Python 之间的根本差异使得这很难精确地做到,具体细节对比官方文档。

[2] 无头浏览器简介

无头浏览器指的是没有图形用户界面的浏览器,它可以通过命令行界面或使用网络通信来提供对网页的自动控制。对于测试网页特别有用,因为它们能够像浏览器一样呈现和理解超文本标记语言,包括页面布局、颜色、字体选择以及JavaScript和AJAX的执行等样式元素,这些元素在使用其他测试方法时通常是不可用的。

无头浏览器通常用来:Web应用程序中的测试自动化、拍摄网页截图、对JavaScript库运行自动化测试、收集网站数据、自动化网页交互。

3.5.2 使用Flask进行封装

本文示例代码已在GitHub上开源,地址:https://github.com/Logistic98/pyppeteer-url2pdf

[1] 封装代码

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# -*- coding: utf-8 -*-

import json
import time
from uuid import uuid1
from flask import Flask, jsonify, request
from flask_cors import CORS
import os
import asyncio

from log import logger
from responseCode import ResponseCode, ResponseMessage
from utils import url_save_pdf, download_file

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 将任意公开访问的url转化为pdf提供下载
"""
@app.route(rule='/api/pyppeteer/urlSavePdf', methods=['POST'])
def urlToPdf():
# 获取JSON格式的请求体,并解析
request_data = request.get_data(as_text=True)
request_body = json.loads(request_data)

# 参数校验模块
url = request_body.get("url")
if not url:
fail_response = dict(code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None)
logger.error(fail_response)
return jsonify(fail_response)
pdf_name = request_body.get("pdf_name")
if not pdf_name:
pdf_name = '{}.pdf'.format(uuid1())
'''
resolution: 设置网页显示尺寸
width: 网页显示宽度
height: 网页显示高度
'''
resolution = request_body.get("resolution")
if not resolution:
resolution = {"width": 1920, "height": 1680}
'''
clip: 位置与图片尺寸信息
x: 网页截图的起始x坐标
y: 网页截图的起始y坐标
width: 图片宽度
height: 图片高度
'''
clip = request_body.get("clip")
if not clip:
clip = {"width": 1920, "height": 1680}

# 创建pdf的存储目录
now_str = time.strftime("%Y%m%d", time.localtime())
pdf_root_path = './tmp/'
pdf_base_path = pdf_root_path + now_str
if not os.path.exists(pdf_base_path):
os.makedirs(pdf_base_path)
pdf_path = pdf_base_path + '/' + pdf_name

# 将url保存成pdf文件
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(url_save_pdf(url, pdf_path, resolution, clip))
logger.info("成功将【{}】网址保存成pdf文件【{}】!".format(url, pdf_path))
except Exception as e:
logger.error(e)
fail_response = dict(code=ResponseCode.BUSINESS_FAIL, msg=ResponseMessage.BUSINESS_FAIL, data=None)
logger.error(fail_response)
return jsonify(fail_response)

# 将pdf文件转成文件流提供下载
return download_file(pdf_path)


if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务,指定主机和端口
app.run(host='0.0.0.0', port=5006, debug=False, threaded=True)

utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- coding: utf-8 -*-

import os
import urllib.parse

from PIL import Image
from pyppeteer import launch
from reportlab.pdfgen.canvas import Canvas
from reportlab.lib.utils import ImageReader
import imageio.v2 as imageio
from flask import Response


# 指定url区域截屏保存成pdf
async def url_save_pdf(url, pdf_path, resolution, clip):
start_parm = {
# 下列三个参数用于解决 Flask 运行 Pyppeteer 报错 "signal only works in main thread"
"handleSIGINT": False,
"handleSIGTERM": False,
"handleSIGHUP": False,
"headless": True, # 关闭无头浏览器
"args": [
'--no-sandbox', # 关闭沙盒模式
],
}
browser = await launch(**start_parm)
page = await browser.newPage()
# 加载指定的网页url
await page.goto(url)
# 设置网页显示尺寸
await page.setViewport(resolution)
# 设置截屏区域
if 'x' not in clip or 'y' not in clip:
await page.pdf({'path': pdf_path, 'width': clip['width'], 'height': clip['height']})
await browser.close()
else:
img_data = await page.screenshot({'clip': clip})
img_data_array = imageio.imread(img_data, format="png")
im = Image.fromarray(img_data_array)
page_width, page_height = im.size
c = Canvas(pdf_path, pagesize=(page_width, page_height))
c.drawImage(ImageReader(im), 0, 0)
c.save()


# 检验是否含有中文字符
def is_contains_chinese(strs):
for _char in strs:
if '\u4e00' <= _char <= '\u9fa5':
return True
return False


# 将文件转成文件流提供下载
def download_file(file_path):

# 文件路径、文件名、后缀分割
file_dir, file_full_name = os.path.split(file_path)
file_name, file_ext = os.path.splitext(file_full_name)

# 文件名如果包含中文则进行编码
if is_contains_chinese(file_name):
file_name = urllib.parse.quote(file_name)
new_file_name = file_name + file_ext

# 流式读取下载
def send_file():
with open(file_path, 'rb') as targetfile:
while 1:
data = targetfile.read(20 * 1024 * 1024) # 每次读取20M
if not data:
break
yield data
response = Response(send_file(), content_type='application/octet-stream')
response.headers["Content-disposition"] = 'attachment; filename=%s' % new_file_name
return response

response.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# -*- coding: utf-8 -*-

from responseCode import ResponseMessage, ResponseCode


class ResMsg(object):
"""
封装响应文本
"""
def __init__(self, data=None, code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS):
self._data = data
self._msg = msg
self._code = code

def update(self, code=None, data=None, msg=None):
"""
更新默认响应文本
:param code:响应状态码
:param data: 响应数据
:param msg: 响应消息
:return:
"""
if code is not None:
self._code = code
if data is not None:
self._data = data
if msg is not None:
self._msg = msg

def add_field(self, name=None, value=None):
"""
在响应文本中加入新的字段,方便使用
:param name: 变量名
:param value: 变量值
:return:
"""
if name is not None and value is not None:
self.__dict__[name] = value

@property
def data(self):
"""
输出响应文本内容
:return:
"""
body = self.__dict__
body["data"] = body.pop("_data")
body["msg"] = body.pop("_msg")
body["code"] = body.pop("_code")
return body

responseCode.py

1
2
3
4
5
6
7
8
9
10
11
12
13
# -*- coding: utf-8 -*-


class ResponseCode(object):
SUCCESS = 200
RARAM_FAIL = 400
BUSINESS_FAIL = 500


class ResponseMessage(object):
SUCCESS = "请求成功"
RARAM_FAIL = "参数校验失败"
BUSINESS_FAIL = "业务处理失败"

log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./server.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

[2] 注意事项

1)初次执行时会自动下载Chromium。

2)Flask 运行 Pyppeteer 报错 “signal only works in main thread”

解决办法:将handleSIGINT、handleSIGTERM、handleSIGHUP设置为False。

1
2
3
4
5
6
7
8
9
10
start_parm = {
# 下列三个参数用于解决 Flask 运行 Pyppeteer 报错 "signal only works in main thread"
"handleSIGINT": False,
"handleSIGTERM": False,
"handleSIGHUP": False,
"headless": True, # 关闭无头浏览器
"args": [
'--no-sandbox', # 关闭沙盒模式
],
}

3.5.3 使用Docker进行部署

[1] 安装Docker环境

Debian11系统:

1
2
3
4
$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)

[2] 导出项目依赖

使用pipreqs导出依赖,使用pipreqs库导出本项目的依赖,生成requirements.txt文件。

1
2
3
$ pip install pipreqs
$ cd /root/test-project // 切换到项目根目录
$ pipreqs ./ --encoding=utf8 // 需要带上编码的指定,否则会报GBK编码错误

注意这里还有个坑如下,这是因为本机开了翻墙代理导致的,把代理软件关了就好了。

1
requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))`

导出的依赖 requirements.txt:

1
2
3
4
5
6
Flask==2.1.2
Flask_Cors==3.0.10
imageio==2.19.3
Pillow==9.1.1
pyppeteer==1.0.2
reportlab==3.6.10

3.5.4 编写Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
FROM python:3.8.8-slim

# python:3.8-slim 是基于 Debian GNU/Linux 10 (buster) 制作的
# 设置 Debian 清华源 https://mirrors.tuna.tsinghua.edu.cn/help/debian/(可选)
# RUN mv /etc/apt/sources.list /etc/apt/sources.list_bak && \
# echo '# 默认注释了源码镜像以提高 apt update 速度,如有需要可自行取消注释' >> /etc/apt/sources.list && \
# echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian/ buster main contrib non-free' >> /etc/apt/sources.list && \
# echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian/ buster main contrib non-free' >> /etc/apt/sources.list && \
# echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main contrib non-free' >> /etc/apt/sources.list && \
# echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main contrib non-free' >> /etc/apt/sources.list && \
# echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-backports main contrib non-free' >> /etc/apt/sources.list && \
# echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-backports main contrib non-free' >> /etc/apt/sources.list && \
# echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main contrib non-free' >> /etc/apt/sources.list && \
# echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main contrib non-free' >> /etc/apt/sources.list
# 下载无头 Chrome 依赖,参考:https://github.com/puppeteer/puppeteer/blob/main/docs/troubleshooting.md#chrome-headless-doesnt-launch-on-unix=
RUN apt-get update && apt-get -y install apt-transport-https ca-certificates libnss3 xvfb gconf-service libasound2 \
libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgbm1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 \
libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 \
libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 \
ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget && rm -rf /var/lib/apt/lists/*

# 安装常用调试命令(可选)
RUN apt-get install iputils-ping -y # 安装ping
RUN apt-get install -y wget # 安装wget
RUN apt-get install curl -y # 安装curl
RUN apt-get install vim -y # 安装vim
RUN apt-get install lsof # 安装lsof

# 安装msyh.ttc字体解决中文乱码问题
# 来源:https://github.com/owent-utils/font/raw/master/%E5%BE%AE%E8%BD%AF%E9%9B%85%E9%BB%91/MSYH.TTC
RUN cp msyh.ttc /usr/share/fonts/

# 使用淘宝镜像加速下载 chromium(可选)
# ENV PYPPETEER_DOWNLOAD_HOST=https://npm.taobao.org/mirrors
# 设置 chromium 版本,发布日期为: 2021-02-26T08:47:06.448Z
ENV PYPPETEER_CHROMIUM_REVISION=856583

# 拷贝代码到容器内
RUN mkdir /code
ADD src /code/
WORKDIR /code

# 安装项目所需的 Python 依赖
RUN pip install -r requirements.txt

# 放行端口
EXPOSE 5006
# 启动项目
ENTRYPOINT ["nohup","python","server.py","&"]

注意事项

[1] 无头浏览器依赖问题

原始镜像里缺失很多无头浏览器的依赖环境,详见:puppeteer Troubleshooting 官方文档

1
2
3
4
5
apt-get update && apt-get -y install apt-transport-https ca-certificates libnss3 xvfb gconf-service libasound2  \
libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgbm1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 \
libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 \
libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 \
ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget && rm -rf /var/lib/apt/lists/*

[2] 中文字体无法解析问题

原始镜像内无中文字体,会出现中文字体无法解析的问题,下载 msyh.ttc 字体放到 /usr/share/fonts/ 目录里即可。

3.5.5 在服务器上部署项目

[1] 编写部署脚本

build.sh

1
2
3
docker build -t pyppeteer-image .
docker run -d -p 5006:5006 --name pyppeteer pyppeteer-image:latest
docker update pyppeteer --restart=always

[2] 项目部署结构

项目部署目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
.
├── Dockerfile
├── README.md
├── build.sh
└── src
├── code.py
├── log.py
├── requirements.txt
├── msyh.ttc
├── response.py
├── server.py
└── utils.py

[3] 部署项目服务

将部署包整个上传到服务器上,切换到部署包的根目录。

1
2
$ chmod u+x build.sh
$ ./build.sh

3.5.6 验证部署

[1] 接口文档

请求路径:/api/pyppeteer/urlSavePdf

请求方法:POST请求

请求参数:

1
2
3
4
5
6
7
8
9
10
11
url:可公开访问的目标网址(必填,不能是那种需要登录权限的)
pdf_name:下载的pdf文件名称(非必填,默认值是uuid命名的pdf文件)
clip: 位置与图片尺寸信息(非必填,默认值{"width": 1920, "height": 1680}
x: 网页截图的起始x坐标
y: 网页截图的起始y坐标
width: 图片宽度
height: 图片高度
注释:只传width、height的时候为整页导出,传x,y,width、height的时候为区域导出
resolution: 设置网页显示尺寸 (非必填,默认值{"width": 1920, "height": 1680}
width: 网页显示宽度
height: 网页显示高度

请求示例:

1
2
3
4
5
6
{
"url":"https://www.google.com",
"pdf_name":"test.pdf",
"resolution": {"width": 1920, "height": 1680},
"clip": {"x": 0, "y": 0, "width": 1920, "height": 1680}
}

接口返回:以文件流的形式提供pdf文件下载

[2] 测试请求接口

1
$ curl -v -X POST http://127.0.0.1:5006/api/pyppeteer/urlSavePdf -H "Content-type: application/json" -d'{"url":"https://www.google.com","pdf_name":"test.pdf","resolution": {"width": 1920, "height": 1680},"clip": {"x": 0, "y": 0, "width": 1920, "height": 1680}}' >> test.pdf

4. Python工具函数

4.1 解析Excel和CSV文件

需要安装的依赖库

1
2
3
$ pip install xlrd==1.2.0
$ pip install xlwt
$ pip install pandas

注意事项:

[1] 新版 xlrd 报 Excel xlsx file; not supported错误(原因:xlrd更新到了2.0.1版本,只支持.xls文件,不支持.xlsx)

[2] Python3.9使用xlrd时报错:AttributeError: ‘ElementTree’ object has no attribute ‘getiterator’

找到xlrd依赖源码里的 xlsx.py 文件,将两个地方的 getiterator() 修改成 iter()。

4.1.1 Excel与CSV转字典列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# Excel转字典列表
def excel_to_dict(path):

# 判断是否为文件路径
if os.path.exists(path):
workbook = xlrd.open_workbook(path)
else:
workbook = xlrd.open_workbook(filename=path.name, file_contents=path.read())

# 根据sheet索引或者名称获取sheet内容
data_sheet = workbook.sheets()[0]
# 获取sheet名称,行数,列数据
sheet_nrows = data_sheet.nrows
sheet_ncols = data_sheet.ncols

# excel转dict
get_data = []
for i in range(1, sheet_nrows):
# 定义一个空字典
sheet_data = {}
for j in range(sheet_ncols):
# 获取单元格数据
c_cell = data_sheet.cell_value(i, j)
# 循环每一个有效的单元格,将字段与值对应存储到字典中
sheet_data[data_sheet.row_values(0)[j]] = c_cell
# 再将字典追加到列表中
get_data.append(sheet_data)
# 返回从excel中获取到的数据:以列表存字典的形式返回
return get_data


# CSV转字典列表
def csv_to_dict(path):
get_data = []
with open(path, 'r',encoding="GBK") as f:
reader = csv.reader(f)
fieldnames = next(reader)
csv_reader = csv.DictReader(f, fieldnames=fieldnames)
for row in csv_reader:
d = {}
for k, v in row.items():
d[k] = v
get_data.append(d)
return get_data

4.1.2 字典列表转Excel与CSV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 将字典列表导出xls
def export_xls(path, dic_data):
data_list = []
# 循环得到每一个data
for data in dic_data:
# 循环得到data字典里的所有键值对的值
for value in data.values():
# 将得到的值放入空列表中
data_list.append(value)
# 创建一个新的列表生成式并赋给一个变量new_list.
# 这个列表生成式主要是将数据每3个为一个新的元素存入新的列表中,即列表套列表
new_list = [data_list[i:i + 3] for i in range(0, len(data_list), 3)]
# 生成一个xlwt.Workbook对象
xls = xlwt.Workbook()
# 调用对象的add_sheet方法
sheet = xls.add_sheet('Sheet1', cell_overwrite_ok=True)
# 创建我们需要的第一行的标头数据
heads = ['id', 'message', 'result']
ls = 0
# 将标头循环写入表中
for head in heads:
sheet.write(0, ls, head)
ls += 1
i = 1
# 将数据分两次循环写入表中 外围循环行
for list in new_list:
j = 0
# 内围循环列
for data in list:
sheet.write(i, j, data)
j += 1
i += 1
# 最后将文件save保存
xls.save(path)


# 将字典列表导出xlsx
def export_xlsx(path, dic_data):
# 将字典列表转换为DataFrame
pf = pd.DataFrame(list(dic_data))
# 指定字段顺序
order = ['id', 'message', 'result']
pf = pf[order]
# 指定生成的Excel表格名称
file_path=pd.ExcelWriter(path)
# 替换空单元格
pf.fillna(' ', inplace=True)
# 输出
pf.to_excel(file_path, encoding='utf-8', index=False)
# 保存表格
file_path.save()


# 将字典列表导出csv
def export_csv(path, dic_data):
with open(path, 'w', newline='') as f:
fieldnames = ['id', 'message', 'result']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for item in dic_data:
writer.writerow(item)

4.1.3 其他操作Excel和CSV的示例

[1] 读写操作xlsx示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# -*- coding: utf-8 -*-

import openpyxl
import xlrd

# 将数据写入到xlsx文件里(新创建文件)
def write_xlsx(path, sheetname, value):
index = len(value)
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = sheetname
for i in range(0, index):
for j in range(0, len(value[i])):
sheet.cell(row=i+1, column=j+1, value=str(value[i][j]))
workbook.save(path)

# 将数据追加写入到xlsx文件里(已有文件追加写入)
def append_write_xlsx(path, sheetname, value):
workbook = openpyxl.load_workbook(path)
sheet = workbook[sheetname]
sheet.append(value)
workbook.save(path)

# 读取xlsx文件信息
def read_xlsx(path, sheetname):
wb = xlrd.open_workbook(path)
sh = wb.sheet_by_name(sheetname)
result = {}
for i in range(1, sh.nrows):
result[sh.row_values(i)[0]] = sh.row_values(i)[1]
return result

if __name__ == '__main__':

path = './test.xlsx'
sheetname = '测试'
head_value = [['id', 'name']]
body_value = ['001', 'zhangsan']

write_xlsx(path, sheetname, head_value)
append_write_xlsx(path, sheetname, body_value)
result = read_xlsx(path, sheetname)
print(result)

[2] 新建csv文件并写入数据

1
2
3
4
5
6
7
import csv
def create_csv():
csv_path = "./test.csv"
with open(csv_path,'w', newline='', encoding='GBK') as f:
csv_write = csv.writer(f)
csv_head = ["good","bad"]
csv_write.writerow(csv_head)

注:newline=''是为了解决csv的隔行空行问题。选择GBK编码,否则使用Excel打开会出现乱码问题。

[3] 操作csv文件实现对特定列排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def sort_csv(csv_path):
datas = [] # 用于存放排序过的数据
with open(csv_path, 'r', encoding='GBK') as f:
table = []
index = 0
for line in f:
index = index + 1
if index == 1:
continue
col = line.split(',')
col[1] = int(col[1].strip("\n"))
table.append(col)
table_sorted = sorted(table, key=itemgetter(1), reverse=True) # 精确的按照第2列排序
for row in table_sorted:
datas.append(row)
f.close()
with open(csv_path, "w", newline='', encoding='GBK') as csvfile:
writer = csv.writer(csvfile)
csv_head = ["关键词", "词频"]
writer.writerow(csv_head)
for data in datas:
writer.writerow(data)
csvfile.close()

4.2 读写配置文件

4.2.1 读取JSON文件里的配置信息

配置文件config.json:

1
2
3
4
5
{
"DB_URL": "127.0.0.1:1521/orcl",
"DB_USER": "test",
"DB_PASSWORD": "123456"
}

工具函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
import json

# 将字典写入json
def dict_to_json_file(dict, path):
with open(path, "w", encoding="utf-8") as f:
json.dump(dict, f)

# 读取json为字典
def read_json_to_dict(path):
with open(path, "r", encoding="utf-8") as f:
confstr = f.read()
conf = json.loads(confstr)
return conf

调用示例:

1
2
3
conf_path = './config/config.json'
conf = read_json_to_dict(conf_path)
conn = cx_Oracle.connect(conf['DB_USER'], conf['DB_PASSWORD'], conf['DB_URL'])

注意事项:JSON文件不要同时进行读写,写入时可能会出现无法解析导致读取失败的情况。

4.2.2 读写INI文件里的配置信息

配置文件config.ini:

1
2
3
4
5
6
[SOURCE_ES]
host = 111.111.111.111
port = 9200
user = elastic
password = elastic
timeout = 60

读取Section内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from configparser import ConfigParser

def read_config():
cfg = ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
host = cfg.get('TARGET_ES', 'host')
port = cfg.get('TARGET_ES', 'port')
user = cfg.get('TARGET_ES', 'user')
password = cfg.get('TARGET_ES', 'password')
timeout = cfg.get('TARGET_ES', 'timeout')
es_dict = {}
es_dict['host'] = host
es_dict['port'] = port
es_dict['user'] = user
es_dict['password'] = password
es_dict['timeout'] = timeout
return es_dict

修改Section内容:

1
2
3
4
cfg = ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
cfg.set("SOURCE_ES", "timeout", "3600")
cfg.write(open('./config.ini', "r+", encoding='utf-8'))

新增Section内容:

1
2
3
4
5
6
cfg = ConfigParser()
cfg.add_section("TARGET_ES")
cfg.set("TARGET_ES", "host", "222.222.222.222")
cfg.set("TARGET_ES", "port", "9201")
# cfg.write(open('./config.ini', "w")) # 删除原文件重新写入
cfg.write(open('./config.ini', "a")) # 追加模式写入

删除Section内容:

1
2
3
4
5
cfg = ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
# cfg.remove_option('TARGET_ES', "host") # 删除Section下的某项
cfg.remove_section('TARGET_ES') # 删除整个Section模块
cfg.write(open('./config.ini', "w"))

4.2.3 读写txt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# -*- coding: utf-8 -*-

# 按行追加写入txt文件(没有文件会新创建文件)
def write_content_to_txt(txt_path, content):
a = open(txt_path, 'a')
a.write(content + '\n')
a.close()

# 按行读取txt文件的内容,保存成列表
def read_txt_to_list(txt_path):
result = []
with open(txt_path, 'r') as f:
for line in f:
result.append(line.strip('\n'))
return result

if __name__ == '__main__':
txt_path = './test.txt'
write_content_to_txt(txt_path, 'zhangsan')
write_content_to_txt(txt_path, 'lisi')
result = read_txt_to_list(txt_path)
print(result)

4.2.4 生成xml文件

generate_xml.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# -*- coding: utf-8 -*-

from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement
from xml.etree.ElementTree import ElementTree

# 美化xml:elemnt为传进来的Elment类,参数indent用于缩进,newline用于换行
def pretty_xml(element, indent, newline, level=0):
# 判断element是否有子元素
if element:
# 如果element的text没有内容
if element.text == None or element.text.isspace():
element.text = newline + indent * (level + 1)
else:
element.text = newline + indent * (level + 1) + element.text.strip() + newline + indent * (level + 1)
temp = list(element) # 将elemnt转成list
for subelement in temp:
# 如果不是list的最后一个元素,说明下一个行是同级别元素的起始,缩进应一致
if temp.index(subelement) < (len(temp) - 1):
subelement.tail = newline + indent * (level + 1)
else: # 如果是list的最后一个元素, 说明下一行是母元素的结束,缩进应该少一个
subelement.tail = newline + indent * level
# 对子元素进行递归操作
pretty_xml(subelement, indent, newline, level=level + 1)


if __name__ == '__main__':

# generate root node
root = Element('root')
# generate first child-node head
head = SubElement(root, 'head')
# child-node of head node
title = SubElement(head, 'title')
title.text = "Title"
# generate second child-node body
body = SubElement(root, 'body')
body.text = "Content"
tree = ElementTree(root)

root = tree.getroot() # 得到根元素,Element类
pretty_xml(root, '\t', '\n') # 执行美化方法

# write out xml data
tree.write('result.xml', encoding = 'utf-8')

生成效果:

1
2
3
4
5
6
<root>
<head>
<title>Title</title>
</head>
<body>Content</body>
</root>

4.2.5 解析yaml格式文件

将yaml文件转字典

1
2
3
4
5
import yaml

f = open('./config.yaml', 'r')
yaml_str = f.read()
config_dict = yaml.load(yaml_str, Loader=yaml.FullLoader)

将字典转成对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Dict(dict):
__setattr__ = dict.__setitem__
__getattr__ = dict.__getitem__

def dict2obj(dictObj):
if not isinstance(dictObj, dict):
return dictObj
d = Dict()
for k, v in dictObj.items():
d[k] = dict2obj(v)
return d

# 测试数据
params = {
"name": "login",
"params": {
"transactionId": "cc258bdb3dd4d6bba2",
"platformType": "第三方平台",
"uid": 9
}
}

# 转换字典成为对象,可以用"."方式访问对象属性
res = dict2obj(params)
print(res.name)
print(res.params.uid)

4.3 列表数组字符串的处理

4.3.1 列表元素去重及统计出现次数

两个列表求差集并去重

1
2
3
4
# 两个列表求差集,在B中但不在A中(注:重复元素也会被去除)
def list_diff(listA, listB):
result = list(set(listB).difference(set(listA)))
return result

列表元素直接去重

1
2
3
4
5
6
# 列表元素直接去重
old_list = [2, 1, 3, 4, 1]
new_list = list(set(old_list))
print(new_list)

>>> [1,2,3,4]

统计列表中各个元素出现的次数

1
2
3
4
5
6
7
8
from collections import Counter

# 统计列表中各个元素出现的次数
test_list = [1, 2, 3, 1, 1, 2]
result = Counter(test_list)
print(result)

>>>{1: 3, 2: 2, 3: 1}

4.3.2 逗号分隔的字符串与列表互转

逗号分隔字符串转列表

1
2
3
>>> mStr = '192.168.1.1,192.168.1.2,192.168.1.3'
>>> mStr.split(",")
['192.168.1.1', '192.168.1.2', '192.168.1.3']

列表转逗号分隔字符串

1
result = ",".join(str(i) for i in result_list)

4.3.3 比较数组是否完全相等

1
2
3
4
5
6
7
import numpy as np

a = np.array([1,2,3])
b = np.array([1,2,3])
print((a==b).all())

>>> True

4.3.4 实现replaceAll功能

1
2
3
4
5
# 实现replaceAll的功能
def replaceAll(input, toReplace, replaceWith):
while (input.find(toReplace) > -1):
input = input.replace(toReplace, replaceWith)
return input

处理空白字符:

1
2
# 处理空白字符
text = replaceAll(replaceAll(replaceAll(replaceAll(text, '\r', ' '), '\n', ' '), '\u3000', ' '), '\x01', ' ')

4.3.5 将List拆分成若干个指定长度的小List

1
2
3
4
5
6
7
8
9
10
#  将List拆分成若干个指定长度的小List
def list_of_groups(list, length):
return [list[i:i + length] for i in range(0, len(list), length)]

list = [i for i in range(15)]
length = 2
result = list_of_groups(list, length)
print(result)

>>> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14]]

4.3.6 按指定长度分段切割字符串或列表

1
2
3
# 按指定长度分段切割字符串或列表
def cut(obj, sec):
return [obj[i:i+sec] for i in range(0,len(obj),sec)]

4.3.7 字符串四舍五入保留两位小数

1
2
3
4
5
from decimal import Decimal

# 字符串四舍五入保留两位小数
def str_get_two_decimal(str):
return Decimal(str).quantize(Decimal('0.00'))

4.3.8 将两个相同长度的List转字典

1
2
3
4
5
6
7
8
keys = ['a', 'b', 'c']
values = [1, 2, 3]
dictionary = dict(zip(keys, values))
print(dictionary)

>>> {'a': 1, 'c': 3, 'b': 2}

https://www.eula.club/%E4%BD%BF%E7%94%A8pyppeteer%E5%B0%86%E7%BD%91%E9%A1%B5%E4%BF%9D%E5%AD%98%E6%88%90pdf%E6%96%87%E4%BB%B6.html

4.3.9 检查字符串里的中文字符

1
2
3
4
5
6
7
8
9
10
11
12
13
# 检验是否全是中文字符
def is_all_chinese(strs):
for _char in strs:
if not '\u4e00' <= _char <= '\u9fa5':
return False
return True

# 检验是否含有中文字符
def is_contains_chinese(strs):
for _char in strs:
if '\u4e00' <= _char <= '\u9fa5':
return True
return False

4.3.10 浏览器URL编码以及反编码

1
2
3
4
5
6
7
8
9
import urllib.parse

test_str = '测试 文本'
# 首先模仿浏览器生产的编码格式,不管是中文或者空格都会转码
str_encode = urllib.parse.quote(test_str)
print(str_encode)
# 使用 unquote 进行反编码,这个步骤基本都是服务端接受浏览器传递的数据时候处理
str_decode = urllib.parse.unquote(str_encode)
print(str_decode)

4.3.11 去除列表的最后一个元素

pop方法和del方法如果对空列表进行操作,会报错中断执行,切片方法不会因此报错,继续保持空列表向下运行

1)pop方法

1
2
3
4
5
list = [1,2,3,4]
list.pop()
print(list)

>>> [1, 2, 3]

2)del方法

1
2
3
4
5
list = [1,2,3,4]
del(list[-1])
print(list)

>>> [1, 2, 3]

3)切片

1
2
3
4
5
list = [1,2,3,4]
list = list[0:-1]
print(list)

>>> [1, 2, 3]

4.3.12 查找字符串里所有子串位置

1
2
3
4
5
6
7
8
9
# 查找所有子串位置
def find_all(sub_str, str):
index_list = []
if str is not None and str != "" and sub_str is not None and sub_str != "":
index = str.find(sub_str)
while index != -1:
index_list.append(index)
index = str.find(sub_str, index + 1)
return index_list

4.4 系统与文件目录的基本操作

4.4.1 基本文件和目录操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
import shutil

os.getcwd() # 获取当前路径
os.listdir() # 将当前目录的文件及目录保存成一个列表
os.path.exists(dir_path) # 检查文件或目录是否存在
os.makedirs(dir_path) # 创建目录
os.chdir(dir_path) # 切换目录
os.remove(file_path) # 删除指定文件
os.removedirs(dir_path) # 删除空目录
shutil.rmtree(dir_path) # 递归删除目录(可为空,也可不为空)
os.path.abspath(dir_path) # 相对路径对应的绝对路径
os.chdir(dir_path) # 更改路径
os.path.isdir(path) # 判断路径是否是目录
os.path.isfile(path) # 判断路径是否是文件

4.4.2 复制某个文件并重命名

1
2
3
4
5
6
7
# 复制某个文件并重命名:sample-原始文件路径、new_path-新文件目录、file_name-新文件名称
def copy_rename_file(sample,new_path,file_name):
if not os.path.exists(new_path):
os.makedirs(new_path)
new_file = os.path.join(new_path, file_name)
shutil.copy(sample, new_file)
return new_file

4.4.3 获取文件大小及创建、修改时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-

import time
import os

# 把时间戳转化为时间
def TimeStampToTime(timestamp):
timeStruct = time.localtime(timestamp)
return time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)

# 获取文件的大小,结果保留两位小数,单位为KB
def get_FileSize(filePath):
fsize = os.path.getsize(filePath)
fsizeFormat = round(fsize / float(1024), 2)
return fsizeFormat

# 获取文件的创建时间
def get_FileCreateTime(filePath):
t = os.path.getctime(filePath)
return TimeStampToTime(t)

# 获取文件的修改时间
def get_FileModifyTime(filePath):
t = os.path.getmtime(filePath)
return TimeStampToTime(t)

4.4.4 检查路径是否有中文

1
2
3
4
5
# 检查路径是否有中文
zhmodel = re.compile(u'[\u4e00-\u9fa5]')
match = zhmodel.search(path)
if match:
print("The path cannot contain Chinese!")

4.4.5 读取指定目录的所有文件夹保存成列表

1
2
3
4
# 读取指定目录下的所有文件夹保存成列表
def read_dir_to_list(file_dir_path):
file_dir_list = os.listdir(file_dir_path)
return file_dir_list

4.4.6 遍历目录,获取目录下的所有文件路径

1
2
3
4
5
6
7
8
9
10
11
import os

# 级联遍历目录,获取目录下的所有文件路径
def find_filepaths(dir):
result = []
for root, dirs, files in os.walk(dir):
for name in files:
filepath = os.path.join(root, name)
if os.path.exists(filepath):
result.append(filepath)
return result

4.4.7 从文件路径列表筛选出指定后缀的文件

1
2
3
4
5
6
import fnmatch

# 从文件路径列表中筛选出指定后缀的文件(suffix需要带上通配符,如*.py)
def getSufFilePath(fileList, suffix):
result = fnmatch.filter(fileList, suffix)
return result

注:也可使用glob库来实现

1
2
3
import glob

img_path_list = glob.glob('./input/*.jpg')

另注:从文件路径中筛选出多种指定后缀的文件名

1
2
3
4
5
# 从文件路径中筛选出多种指定后缀的文件名(suffixList取值示例:['jpg','jpeg', 'bmp', 'png', 'gif'])
def getSufListFilePath(dirPath, suffixList):
result = [fn for fn in os.listdir(dirPath)
if any(fn.endswith(ext) for ext in suffixList)]
return result

4.4.8 递归获取某目录下某后缀的文件路径

程序分为两步,第一步,采用递归的方式获得文件夹下所有文件的路径列表;第二步,从文件路径列表中根据后缀利用.endswith(后缀)的方法筛选指定文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os

# 从指定path下递归获取所有文件
def getAllFile(path, fileList):
dirList = [] # 存放文件夹路径的列表
for ff in os.listdir(path):
wholepath = os.path.join(path, ff)
if os.path.isdir(wholepath):
dirList.append(wholepath) # 如果是文件添加到结果文件列表中
if os.path.isfile(wholepath):
fileList.append(wholepath) # 如果是文件夹,存到文件夹列表中
for dir in dirList:
getAllFile(dir, fileList) # 对于dirList列表中的文件夹,递归提取其中的文件,fileList一直在往下传,所有的文件路径都会被保存在这个列表中

# 从文件路径列表中筛选出指定后缀的文件
def getSufFilePath(fileList, suffix):
for ff in fileList[:]:
if not ff.endswith(suffix):
fileList.remove(ff)

if __name__ == '__main__':
flist = []
findpath = r'./testdir'
getAllFile(findpath, flist)
print('allfile:', len(flist)) # filepath下的文件总数
getSufFilePath(flist, '.txt')

print('Docfile:', len(flist)) # filepath下的指定类型文件总数(这里是.txt文件的数量)
for ff in flist:
print(ff)

4.4.9 根据md5进行文件去重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import hashlib

# 获取md5值
def get_md5(file):
file = open(file,'rb')
md5 = hashlib.md5(file.read())
file.close()
md5_values = md5.hexdigest()
return md5_values

if __name__ == '__main__':
file_path = "./data"
os.chdir(file_path)
file_list = os.listdir(file_path)
md5_list =[]
for file in file_list:
md5 = get_md5(file)
if md5 not in md5_list:
md5_list.append(md5)
else:
os.remove(file)

4.4.10 将文件转成文件流提供下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 检验是否含有中文字符
def is_contains_chinese(strs):
for _char in strs:
if '\u4e00' <= _char <= '\u9fa5':
return True
return False


# 将文件转成文件流提供下载
def download_file(file_path):

# 文件路径、文件名、后缀分割
file_dir, file_full_name = os.path.split(file_path)
file_name, file_ext = os.path.splitext(file_full_name)

# 文件名如果包含中文则进行编码
if is_contains_chinese(file_name):
file_name = urllib.parse.quote(file_name)
new_file_name = file_name + file_ext

# 流式读取下载
def send_file():
with open(file_path, 'rb') as targetfile:
while 1:
data = targetfile.read(20 * 1024 * 1024) # 每次读取20M
if not data:
break
yield data
response = Response(send_file(), content_type='application/octet-stream')
response.headers["Content-disposition"] = 'attachment; filename=%s' % new_file_name
return response

4.4.11 将网络图片转存成base64

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import base64
import requests as req
from io import BytesIO

# 将网络图片转存成base64
def urltobase64(url):
# 发送请求并将图片保存在内存
try:
response = req.get(url)
http_code = response.status_code
if http_code == 200:
# 得到图片的base64编码
ls_f = base64.b64encode(BytesIO(response.content).read())
imgdata = str(ls_f, 'utf-8')
else:
imgdata = ""
except Exception as e:
print(e)
imgdata = ""
return imgdata

4.4.12 起始时间及执行时间统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
from decimal import Decimal

start_time = time.time()
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

time.sleep(2.22222)

end_time = time.time()
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'

print(start_time_str)
print(end_time_str)
print(time_consuming_str)

4.4.13 设置超时操作

1
2
3
4
5
6
7
8
9
10
import time
import eventlet

eventlet.monkey_patch()
# 设置超时时间为2秒
with eventlet.Timeout(2, False):
time.sleep(3)
print('没有跳过这条输出')

print('End')

4.4.14 检查文件编码

1
2
3
4
5
6
7
8
import chardet

# 检查文件编码
def check_charset(file_path):
with open(file_path, "rb") as f:
data = f.read(4)
charset = chardet.detect(data)['encoding']
return charset

4.4.15 文件路径、文件名、后缀分割

1
2
3
4
5
6
7
8
9
import os

file_path = "/root/tmp/test.pdf"
file_dir, file_full_name = os.path.split(file_path)
print(file_dir) # file_name
print(file_full_name) # test.pdf
file_name, file_ext = os.path.splitext(file_full_name)
print(file_name) # test
print(file_ext) # .pdf

4.4.16 筛选出扩展名符合条件的文件路径列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# -*- coding: utf-8 -*-

import os

# 级联遍历⽬录,获取⽬录下的所有⽂件路径
def find_filepaths(dir):
result = []
for root, dirs, files in os.walk(dir):
for name in files:
filepath = os.path.join(root, name)
if os.path.exists(filepath):
result.append(filepath)
return result


# 筛选出扩展名符合条件的文件路径列表(扩展名不带通配符,extList例如['.jpg', '.png'])
def checkDirOrFilePath(path, extList):

file_path_list = []
if os.path.isdir(path): # 判断路径是否是目录
file_path_list = find_filepaths(path)
elif os.path.isfile(path): # 判断路径是否是文件
file_path_list.append(path)
elif path.find(",") != -1: # 判断路径是否是逗号分隔的多选文件
file_path_list = path.split(",")
elif path.find(";") != -1: # 判断路径是否是分号分隔的多选文件
file_path_list = path.split(";")

result_list = []
for file_path in file_path_list:
file_dir, file_full_name = os.path.split(file_path)
file_name, file_ext = os.path.splitext(file_full_name)
if file_ext in extList:
result_list.append(file_path)

return result_list

4.5 加密解密算法

依赖安装:

1
$ pip install pycryptodome

存在的坑:在使用的时候导入模块是有问题的,这时只要修改一下文件夹名称就可以解决这个问题,找到这个路径安装位置\Lib\site-packages,下面有一个文件夹叫做crypto,将小写c改成大写C就可以了。

4.5.1 RSA加密解密

RSA加密算法是一种非对称加密算法,所谓非对称,就是指该算法加密和解密使用不同的密钥,即使用加密密钥进行加密、解密密钥进行解密。在RAS算法中,加密密钥PK是公开信息,而解密密钥SK是需要保密的。加密算法E和解密算法D也都是公开的。虽然解密密钥SK是由公开密钥PK决定的,由于无法计算出大数n的欧拉函数phi(N),所以不能根据PK计算出SK。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# -*- coding: utf-8 -*-

from Crypto.Cipher import PKCS1_OAEP, AES
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes


def create_rsa_keys(code):
"""
生成RSA私钥和公钥
:param code: 密码
:return:
"""
# 生成 2048 位的 RSA 密钥
key = RSA.generate(2048)
encrypted_key = key.exportKey(passphrase=code, pkcs=8, protection="scryptAndAES128-CBC")
# 生成私钥
with open('private_rsa_key.bin', 'wb') as f:
f.write(encrypted_key)
# 生成公钥
with open('rsa_public.pem', 'wb') as f:
f.write(key.publickey().exportKey())


def file_encryption(file_name, public_key):
"""
文件加密
:param file_name: 文件路径名
:param public_key: 公钥
:return:
"""
# 二进制只读打开文件,读取文件数据
with open(file_name, 'rb') as f:
data = f.read()
file_name_new = file_name + '.rsa'
with open(file_name_new, 'wb') as out_file:
# 收件人秘钥 - 公钥
recipient_key = RSA.import_key(open(public_key).read())
# 一个 16 字节的会话密钥
session_key = get_random_bytes(16)
# Encrypt the session key with the public RSA key
cipher_rsa = PKCS1_OAEP.new(recipient_key)
out_file.write(cipher_rsa.encrypt(session_key))
# Encrypt the data with the AES session key
cipher_aes = AES.new(session_key, AES.MODE_EAX)
cipher_text, tag = cipher_aes.encrypt_and_digest(data)
out_file.write(cipher_aes.nonce)
out_file.write(tag)
out_file.write(cipher_text)
return file_name_new


def file_decryption(file_name, code, private_key):
"""
文件解密
:param file_name: 文件路径名
:param code: 密码
:param private_key: 私钥
:return:
"""
with open(file_name, 'rb') as f_in:
# 导入私钥
private_key = RSA.import_key(open(private_key).read(), passphrase=code)
# 会话密钥, 随机数, 消息认证码, 机密的数据
enc_session_key, nonce, tag, cipher_text = [f_in.read(x) for x in (private_key.size_in_bytes(), 16, 16, -1)]
cipher_rsa = PKCS1_OAEP.new(private_key)
session_key = cipher_rsa.decrypt(enc_session_key)
cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
# 解密
data = cipher_aes.decrypt_and_verify(cipher_text, tag)
# 文件重命名
out_file_name = file_name.replace('.rsa', '')
with open(out_file_name, 'wb') as f_out:
f_out.write(data)
return out_file_name


if __name__ == '__main__':
create_rsa_keys("test_rsa_key")
file_encryption("test.txt", "rsa_public.pem")
file_decryption("test.txt.rsa", "test_rsa_key", "private_rsa_key.bin")

4.5.2 AES加密解密

AES加密为最常见的对称加密算法(微信小程序的加密传输就是用的这个加密算法)。对称加密算法也就是加密和解密用相同的密钥,具体的加密流程如下图:

AES加密传输流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# -*- coding: utf-8 -*-

import base64
from Crypto.Cipher import AES

'''
采用AES对称加密算法
'''

# str不是16的倍数那就补足为16的倍数
def add_to_16(value):
while len(value) % 16 != 0:
value += '\0'
return str.encode(value)

# 加密方法
def encrypt_file(key, input_file_path, encoding, output_file_path):
# 一次性读取文本内容
with open(input_file_path, 'r', encoding=encoding) as f:
# print(text) 测试打印读取的数据
# 待加密文本
mystr = f.read()
text = base64.b64encode(mystr.encode('utf-8')).decode('ascii')
# 初始化加密器
aes = AES.new(add_to_16(key), AES.MODE_ECB)
# 先进行aes加密
encrypt_aes = aes.encrypt(add_to_16(text))
# 用base64转成字符串形式
encrypted_text = str(base64.encodebytes(encrypt_aes), encoding='utf-8') # 执行加密并转码返回bytes
# print(encrypted_text) 测试打印加密数据
# 写入加密数据到文件
with open(output_file_path, "w") as bankdata:
bankdata.write(encrypted_text)

# 解密方法
def decrypt_file(key, file_path, encoding):
# 密文
with open(file_path, 'r', encoding=encoding) as f:
# print(text) 测试打印读取的加密数据
# 待解密文本
text = f.read()
# 初始化加密器
aes = AES.new(add_to_16(key), AES.MODE_ECB)
# 优先逆向解密base64成bytes
base64_decrypted = base64.decodebytes(text.encode(encoding='utf-8'))
# bytes解密
decrypted_text = str(aes.decrypt(base64_decrypted),encoding='utf-8')
decrypted_text = base64.b64decode(decrypted_text.encode('utf-8')).decode('utf-8')
print(decrypted_text)

4.6 根据IP或域名获取地理位置信息

4.6.1 获取本机IP地址

1
2
3
4
5
6
7
8
9
10
11
12
def get_host_ip():
"""
查询本机ip地址
:return: ip
"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip

4.6.2 获取地理位置信息

可以借助GeoIP2-python和GeoLite.mmdb两个开源项目来获取。

GeoIP2-python:https://github.com/maxmind/GeoIP2-python(GeoIP2 web 服务客户端和数据库阅读器的 Python 代码)

GeoLite.mmdb:https://github.com/P3TERX/GeoLite.mmdb(MaxMind 的 GeoIP2 GeoLite2 国家、城市和 ASN 数据库)

依赖安装:

1
2
$ pip install geoip2
$ 把GeoLite2-City.mmdb下载下来,放到项目目录里

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# -*- coding: utf-8 -*-

import socket
import geoip2.database

reader = geoip2.database.Reader('GeoLite2-City.mmdb')


# 通过域名获取IP(输入为IP的话保持不变)
def get_ip_by_domain(domain):
address = socket.getaddrinfo(domain, None)
return address[0][4][0]


# 查询IP地址对应的地理信息
def ip_get_location(ip):
# 载入指定IP相关数据
response = reader.city(ip)
# 读取国家代码
country_iso_code = str(response.country.iso_code)
# 读取国家名称
country_name = str(response.country.name)
# 读取国家名称(中文显示)
country_name_cn = str(response.country.names['zh-CN'])
# 读取州(国外)/省(国内)名称
country_specific_name = str(response.subdivisions.most_specific.name)
# 读取州(国外)/省(国内)代码
country_specific_iso_code = str(response.subdivisions.most_specific.iso_code)
# 读取城市名称
city_name = str(response.city.name)
# 获取纬度
location_latitude = str(response.location.latitude)
# 获取经度
location_longitude = str(response.location.longitude)
# 返回结果
result_dic = {}
result_dic['ip'] = ip
result_dic['country_iso_code'] = country_iso_code
result_dic['country_name'] = country_name
result_dic['country_name_cn'] = country_name_cn
result_dic['country_specific_name'] = country_specific_name
result_dic['country_specific_iso_code'] = country_specific_iso_code
result_dic['city_name'] = city_name
result_dic['location_latitude'] = location_latitude
result_dic['location_longitude'] = location_longitude
return result_dic

4.7 使用cv2库画图

引入cv2库

1
$ pip install opencv-python

绘制矩形和直线示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-

import numpy as np
import cv2 as cv

img = np.zeros((320, 320, 3), np.uint8) # 生成一个空灰度图像

# 绘制矩形
ptLeftTop = (60, 60)
ptRightBottom = (260, 260)
point_color = (0, 255, 0)
thickness = 1
lineType = 4
cv.rectangle(img, ptLeftTop, ptRightBottom, point_color, thickness, lineType)

# 绘制直线
ptStart = (60, 60)
ptEnd = (260, 260)
point_color = (0, 0, 255)
thickness = 1
lineType = 4
cv.line(img, ptStart, ptEnd, point_color, thickness, lineType)

cv.namedWindow("CV Test")
cv.imshow('CV Test', img) # 显示绘图
cv.waitKey(5000) # 显示5000ms后消失,设置为0永不消失
cv.destroyAllWindows()

5. 数据库及中间件的集成与使用

5.1 使用Redis缓存数据

Step1:引入redis库

1
$ pip install redis

Step2:使用Redis

往redis存值

1
2
3
4
5
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port='6379', password='123456')
r = redis.Redis(connection_pool=pool)
r.set('id', '666666')

从redis取值

1
2
3
4
5
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port='6379', password='123456')
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')

5.2 将数据保存到MySQL

引入pymysql库

1
$ pip install pymysql

将数据保存到MySQL示例:

config.ini

1
2
3
4
5
6
7
[Mysql]
host = 127.0.0.1
user = root
password = 123456
port = 3306
db = testdb
table = test_table

log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./save_mysql.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

save_mysql.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# -*- coding: utf-8 -*-

import time
from configparser import ConfigParser
import pymysql
from log import logger


# 读取配置
def read_config(config_path):
cfg = ConfigParser()
cfg.read(config_path, encoding='utf-8')
host = cfg.get('Mysql', 'host')
user = cfg.get('Mysql', 'user')
password = cfg.get('Mysql', 'password')
port = cfg.get('Mysql', 'port')
db = cfg.get('Mysql', 'db')
table = cfg.get('Mysql', 'table')
mysql_dict = {}
mysql_dict['host'] = host
mysql_dict['user'] = user
mysql_dict['password'] = password
mysql_dict['port'] = port
mysql_dict['db'] = db
mysql_dict['table'] = table
return mysql_dict


# 保存到mysql数据库
def save_data(mysql_dict, data_list):
if data_list == []:
return None
mysql = pymysql.connect(host=str(mysql_dict['host']), user=str(mysql_dict['user']),
password=str(mysql_dict['password']), port=int(mysql_dict['port']),
db=str(mysql_dict['db']), charset='utf-8')
for i in data_list:
cursor = mysql.cursor()
qmarks = ', '.join(['%s'] * len(i))
columns = ', '.join(i.keys())
try:
qry = "Insert Into " + str(mysql_dict['table']) + " (%s) Values (%s);" % (columns, qmarks)
values_list = []
for j in i.values():
values_list.append(j)
cursor.execute(qry, values_list)
mysql.commit()
except Exception as e:
logger.error(e)
cursor.close()
mysql.close()
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
insert_mysql_info = str(mysql_dict['table']) + "表插入了" + str(len(data_list)) + "条数据,时间是" + str(now_time)
logger(insert_mysql_info)


if __name__ == '__main__':
config_path = './config.ini'
mysql_dict = read_config(config_path)
data_list = [{'USERNAME': 'zhangsan', 'MESSAGE': 'test1'},{'USERNAME': 'lisi', 'MESSAGE': 'test2'}]
save_data(mysql_dict, data_list)

注意事项:

1)插入时报错 Incorrect string value: ‘\xF0\x9F\x98\xAD“,…‘ for column ‘commentContent‘ at row 1

原因:数据库编码问题导致的,原因在于插入数据中存在emoji表情,而这些表情是按照四个字节一个单位进行编码的,而我们通常使用的utf-8编码在mysql数据库中默认是按照3个字节一个单位进行编码的,导致将数据存入mysql的时候出现错误。

解决:修改数据库与数据表编码,然后再改一下连接数据库的字符集编码。

1
2
3
1.修改mysql数据库的编码为uft8mb4  
2.修改数据表的编码为utf8mb4
3.将代码连接mysql处改为charset='utf8mb4'

2)将longblob字段的数据写入到文件

1
2
3
4
5
6
if not os.path.exists('./img'):
os.makedirs('./img')
uuid = uuid1()
img_path = './img/{}.jpg'.format(uuid)
f = open(img_path, 'wb')
f.write(result['image_file'])

5.3 查询Oracle的数据

引入cx_Oracle库

1
$ pip install cx_Oracle

安装Oracle Instant Client

从Oracle里查询数据示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import cx_Oracle

# 指定Oracle Instant Client的目录
cx_Oracle.init_oracle_client(lib_dir="D:\\Development\\instantclient-basic-windows.x64-11.2.0.4.0")

# 执行查询sql
conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
curs = conn.cursor()
sql = "select a.id, a.title, a.text from test_table"
rr = curs.execute(sql)

# 将查询结果保存成字典
result_dir = {}
while(1):
rs = rr.fetchone()
if rs == None:
break
id = rs[0]
title = rs[1]
text = rs[2]

curs.close()
conn.close()

注意事项:

1、cx_Oracle.init_oracle_client()要写在Flask接口的外面,否则第二次接口请求时会报cx_Oracle已经初始化的错误。

2、Linux端部署的时候,会出现找不到libclntsh.so动态连接库的问题,报错如下:

1
cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded: "Error loading shared library libclntsh.so: No such file or directory". See https://oracle.github.io/odpi/doc/installation.html#linux for help

报错原因:instantclient-basic-linux.x64-11.2.0.4.0.zip包里根本没有libclntsh.so,有的是libclntsh.so.11.1,而单纯的给这个文件改个名是不行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
./instantclient_11_2:
|---BASIC_README
|---adrci
|---genezi
|---libclntsh.so.11.1
|---libnnz11.so
|---libocci.so.11.1
|---libociei.so
|---libocijdbc11.so
|---ojdbc5.jar
|---ojdbc6.jar
|---uidrvci
|---xstreeams.jar

解决办法:需要在Dockerfile里设置软链接解决(注意要用绝对路径)

1
2
ENV LD_LIBRARY_PATH=/home/instantclient_11_2
RUN ln -s /home/instantclient_11_2/libclntsh.so.11.1 /home/instantclient_11_2/libclntsh.so

5.4 ElasticSearch的导入导出

代码已在Github上开源,项目地址为:https://github.com/Logistic98/es-data-transfer

Step1:安装依赖并编写配置文件

1
$ pip install elasticsearch==7.16.2   // 注意要和ES的版本保持一致

config.ini(把ES连接信息换成自己的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[TARGET_ES]
host = 192.168.1.1
port = 9200
user = elastic
password = elastic
timeout = 60

[SOURCE_ES]
host = 192.168.1.2
port = 9200
user = elastic
password = elastic
timeout = 60
index_list = test_index1, test_index2

注:多个索引之间用英文逗号分隔(逗号后面有没有空格都无所谓,读取配置时会进行处理)

Step2:编写ES导入导出脚本

export_es_data.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch
from datetime import timedelta
import datetime
import os
import json
import logging
from configparser import ConfigParser

# 生成日志文件
logging.basicConfig(filename='logging_es.log', level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def read_config():
cfg = ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
host = cfg.get('SOURCE_ES', 'host')
port = cfg.get('SOURCE_ES', 'port')
user = cfg.get('SOURCE_ES', 'user')
password = cfg.get('SOURCE_ES', 'password')
timeout = cfg.get('SOURCE_ES', 'timeout')
index_list = cfg.get('SOURCE_ES', 'index_list')
es_dict = {}
es_dict['host'] = host
es_dict['port'] = port
es_dict['user'] = user
es_dict['password'] = password
es_dict['timeout'] = timeout
es_dict['index_list'] = index_list
return es_dict


def write_list_to_json(list, json_file_name, json_file_save_path):
"""
将list写入到json文件
:param list:
:param json_file_name: 写入的json文件名字
:param json_file_save_path: json文件存储路径
:return:
"""
if not os.path.exists(json_file_save_path):
os.makedirs(json_file_save_path)
os.chdir(json_file_save_path)
with open(json_file_name, 'w', encoding='utf-8') as f:
json.dump(list, f, ensure_ascii=False)


def es_json(es_dict, start_time, end_time):
str_separate = "==============================================================="
try:
BASE_DIR = os.getcwd()
Es = Elasticsearch(
hosts=[str(es_dict['host']) + ":" + str(es_dict['port'])],
http_auth=(str(es_dict['user']), str(es_dict['password'])),
timeout=int(es_dict['timeout'])
)

except Exception as e:
logging.error(e)

index_list = ''.join(es_dict['index_list'].split()).split(",")
for i in index_list:
print(f"保存索引{i}的数据\r")
print_info1 = "保存索引" + i + "的数据"
logging.info(print_info1)
query = {
"query": {
"range": {
"@timestamp": {
# 大于上一次读取结束时间,小于等于本次读取开始时间
"gt": start_time,
"lte": end_time
}
}
},
"size": 10000
}
try:
data = Es.search(index=i, body=query)
source_list = []
for hit in data['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
print(f"保存的时间为{start_time}{end_time}\r")
print_info2 = "保存的时间为" + start_time + "到" + end_time + ""
logging.info(print_info2)
file_path = BASE_DIR + "/json_file"
file_name = str(i) + ".json"
if len(source_list) != 0:
write_list_to_json(source_list, file_name, file_path)
else:
print('无更新')
logging.info(str(i) + '无更新')
print(str_separate)
logging.info(str_separate)
except Exception as e:
print(e)
logging.info("es数据库到json文件的读写error" % e)
logging.info(str_separate)


if __name__ == '__main__':
start_date_time = datetime.datetime.now() + timedelta(days=-1)
end_date_time = datetime.datetime.now()
start_time = start_date_time.strftime("%Y-%m-%dT%H:00:00.000Z")
end_time = end_date_time.strftime("%Y-%m-%dT%H:00:00.000Z")
# 读取配置信息
es_dict = read_config()
# 获取当前的目录地址
BASE_DIR = os.getcwd()
# 读取es数据库中的数据,写成json文件
es_json(es_dict, start_time, end_time)

import_es_data.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# -*- coding: utf-8 -*-

import os
import logging
import time

from elasticsearch import Elasticsearch, helpers
from configparser import ConfigParser


# 生成日志文件
logging.basicConfig(filename='logging_es.log', level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# 查找解密后的json文件
def json_es(BASE_DIR):
json_path = BASE_DIR + '/json_file/'
filelist = []
for file in os.listdir(json_path):
if '.json' == file[-5:]:
filelist.append(json_path + file)
for i in filelist:
head, sep, tail = i.partition('json_file/')
indexname = tail
head, sep, tail = indexname.partition('.json')
index_name = head
read_json(i, index_name)
os.remove(i)

# 读json文件
def read_json(file_path, index_name):
with open(file_path, 'r', encoding='utf-8') as file:
json_str = file.read()
# json_str中会存在一个null字符串表示空值,但是python里面没有null这个关键字,需要将null定义为变量名,赋值python里面的None
null = None
# 将字符串形式的列表数据转成列表数据
json_list = eval(json_str)
batch_data(json_list, index_name)

# 将构造好的列表写入ES数据库
def batch_data(json_list, index_name):
""" 批量写入数据 """
# 按照步长分批插入数据库,缓解插入数据库时的压力
length = len(json_list)
# 步长为1000,缓解批量写入的压力
step = 1000
for i in range(0, length, step):
# 要写入的数据长度大于步长,那么久分批写入
if i + step < length:
actions = []
for j in range(i, i + step):
# 先把导入时添加的"_id"的值取出来
new_id = json_list[j]['_id']
del json_list[j]["_id"] # 要删除导入时添加的"_id"
action = {
"_index": str(index_name),
"_id": str(new_id),
"_source": json_list[j]
}
actions.append(action)
helpers.bulk(Es, actions, request_timeout=120)
# 要写入的数据小于步长,那么久一次性写入
else:
actions = []
for j in range(i, length):
# 先把导入时添加的"_id"的值取出来
new_id = json_list[j]['_id']
del json_list[j]["_id"] # 要删除导入时添加的"_id"
action = {
"_index": str(index_name),
"_id": str(new_id),
"_source": json_list[j]
}
actions.append(action)
helpers.bulk(Es, actions, request_timeout=120)
now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
insert_es_info = str(index_name) + "索引插入了" + str(length) + "条数据,时间是" + str(now_time)
logging.info(insert_es_info)

def read_config():
cfg = ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
host = cfg.get('TARGET_ES', 'host')
port = cfg.get('TARGET_ES', 'port')
user = cfg.get('TARGET_ES', 'user')
password = cfg.get('TARGET_ES', 'password')
timeout = cfg.get('TARGET_ES', 'timeout')
es_dict = {}
es_dict['host'] = host
es_dict['port'] = port
es_dict['user'] = user
es_dict['password'] = password
es_dict['timeout'] = timeout
return es_dict


if __name__ == '__main__':
# 获取当前的目录地址
BASE_DIR = os.getcwd()
# 读取配置文件
es_dict = read_config()
# 构造连接
Es = Elasticsearch(
hosts=[str(es_dict['host']) + ":" + str(es_dict['port'])],
http_auth=(str(es_dict['user']), str(es_dict['password'])),
timeout=int(es_dict['timeout'])
)
# 将解密的json文件写入ES数据库
json_es(BASE_DIR)

Step3:执行脚本导入导出

执行 export_es_data.py 会读取 [SOURCE_ES] 里的 ES 配置,对指定索引进行导出,注意单次仅能导出10000条数据

执行 import_es_data.py 会读取 [TARGET_ES] 里的 ES 配置,json_file文件夹内的json文件进行导入,导入成功后会删除这些json文件。

5.5 minio的文件上传

Step1:安装依赖并编写配置文件

1
$ pip install minio

config.ini

1
2
3
4
[minio]
minio_url = xxx.xxx.xxx.xxx:9000
access_key = minioadmin
secret_key = minioadmin

注:minio_url不要带上http://的前缀,否则会报如下错误

1
ValueError: path in endpoint is not allowed. Exception ignored in: <function Minio.__del__ at 0x0C0B9A98>

Step2:minio上传文件的代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-

import logging
import os
from minio import Minio
from minio.error import S3Error
from configparser import ConfigParser

logging.basicConfig(filename='logging_mysql.log', level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# 读取配置文件
def read_config():
cfg = ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
minio_url = cfg.get('minio', 'minio_url')
access_key = cfg.get('minio', 'access_key')
secret_key = cfg.get('minio', 'secret_key')
config_dict = {}
config_dict['minio_url'] = minio_url
config_dict['access_key'] = access_key
config_dict['secret_key'] = secret_key
return config_dict


# 初始化minio客户端
def get_minio_client(config_dict):
# 使用endpoint、access key和secret key来初始化minioClient对象。
minio_client = Minio(config_dict['minio_url'],
access_key=config_dict['access_key'],
secret_key=config_dict['secret_key'],
secure=False)
return minio_client


# 创建一个存储桶(判断桶是否已经存在,不存在则创建,存在忽略)
def minio_make_bucket_ifnotexist(minio_client, bucket_name):
bucket_name = bucket_name.replace('_', "-")
try:
if not minio_client.bucket_exists(bucket_name):
logging.info("该存储桶不存在:" + bucket_name)
minio_client.make_bucket(bucket_name)
logging.info("存储桶创建:" + bucket_name)
except S3Error as e:
if "InvalidAccessKeyId" in str(e):
logging.error("minio 的 access_key 可能有误")
elif "SignatureDoesNotMatch" in str(e):
logging.error("minio 的 secret_key 可能有误")
else:
logging.error("minio 的 endpoint、access_key、secret_key 可能有误")
raise e


# 删除存储桶
def remove_bucket(minio_client, bucket_name):
try:
minio_client.remove_bucket(bucket_name)
logging.info("删除存储桶成功:" + bucket_name)
except S3Error as e:
logging.error(e)


# 文件上传
def minio_upload_file(minio_client, bucket_name, object_name, file_path):
logging.info(file_path)
result = minio_client.fput_object(bucket_name, object_name, file_path)
return result


# 级联遍历目录,获取目录下的所有文件路径
def find_filepaths(dir):
result = []
for root, dirs, files in os.walk(dir):
for name in files:
filepath = os.path.join(root, name)
if os.path.exists(filepath):
result.append(filepath)
return result


# 获取object_name(上传到minio的路径)
def get_object_name(file_path):
file_dir, file_full_name = os.path.split(file_path)
return file_full_name


if __name__ == '__main__':
# 读取配置文件
config_dict = read_config()
# 初始化minio客户端
minio_client = get_minio_client(config_dict)
# 创建一个存储桶(判断桶是否已经存在,不存在则创建,存在忽略)
minio_make_bucket_ifnotexist(minio_client, 'test')
# 删除存储桶
remove_bucket(minio_client, 'test')
# 文件上传
minio_make_bucket_ifnotexist(minio_client, 'test')
img_list = find_filepaths('./img')
for img_path in img_list:
object_name = get_object_name(img_path)
minio_upload_file(minio_client, 'test', object_name, img_path)

6. Python常用的进阶知识及示例

6.1 使用vthread实现多线程

6.1.1 vthread简介

项目描述:python 多线程库,在不改变源代码的情况下,一行代码即可实现线程池操作。

项目地址:https://github.com/cilame/vthread

依赖安装:pip install vthread

6.1.2 vthread基本使用

[1] 基本线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding: utf-8 -*-

import time
import vthread

@vthread.pool(3) # 只用加这一行就能实现3条线程池的包装
def foolfunc(num):
time.sleep(1)
print(str(num))


if __name__ == '__main__':
for i in range(5):
foolfunc(i)

输出结果:

1
2
3
4
5
[  Thread-2_v ] 1
[ Thread-1_v ] 0
[ Thread-3_v ] 2
[ Thread-2_v ] 3
[ Thread-1_v ] 4

[2] 生产消费过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-

import time, random, queue
from vthread import pool

ls1 = queue.Queue()
ls2 = queue.Queue()
producer = 'pr'
consumer1 = 'co1'
consumer2 = 'co2'

# 一个生产者
@pool(3, gqueue=producer)
def creater(num):
time.sleep(random.random())
num1, num2 = num, num * num + 100
print("数据进入队列: num:{}".format(num))
ls1.put(num1)
ls2.put(num2)

# 两个消费者
@pool(1, gqueue=consumer1)
def coster1():
while not pool.check_stop(gqueue=producer):
time.sleep(random.random())
pp = [ls1.get() for _ in range(ls1.qsize())]
print('当前消费的列表 list: {}'.format(pp))

@pool(1, gqueue=consumer2)
def coster2():
while not pool.check_stop(gqueue=producer):
time.sleep(random.random())
pp = [ls2.get() for _ in range(ls2.qsize())]
print('当前消费的列表 list: {}'.format(pp))


if __name__ == '__main__':

for i in range(10):
creater(i)
coster1()
coster2()

# 当需要简单等待全部任务结束再执行某些任务时,这样处理即可,这个等于下面注释中的内容。
pool.waitall()
# pool.wait(gqueue=producer)
# pool.wait(gqueue=consumer1)
# pool.wait(gqueue=consumer2)
print('当生产和消费的任务池数据都结束后,这里才会打印')
print('current queue 1 size:{}'.format(ls1.qsize()))
print('current queue 2 size:{}'.format(ls2.qsize()))
print('end')

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[ Thread-5_co2] 当前消费的列表 list: []
[ Thread-5_co2] 当前消费的列表 list: []
[ Thread-4_co1] 当前消费的列表 list: []
[ Thread-3_pr ] 数据进入队列: num:2
[ Thread-4_co1] 当前消费的列表 list: [2]
[ Thread-2_pr ] 数据进入队列: num:1
[ Thread-4_co1] 当前消费的列表 list: [1]
[ Thread-1_pr ] 数据进入队列: num:0
[ Thread-5_co2] 当前消费的列表 list: [104, 101, 100]
[ Thread-5_co2] 当前消费的列表 list: []
[ Thread-3_pr ] 数据进入队列: num:3
[ Thread-2_pr ] 数据进入队列: num:4
[ Thread-2_pr ] 数据进入队列: num:7
[ Thread-4_co1] 当前消费的列表 list: [0, 3, 4, 7]
[ Thread-1_pr ] 数据进入队列: num:5
[ Thread-5_co2] 当前消费的列表 list: [109, 116, 149, 125]
[ Thread-5_co2] 当前消费的列表 list: []
[ Thread-5_co2] 当前消费的列表 list: []
[ Thread-2_pr ] 数据进入队列: num:8
[ Thread-3_pr ] 数据进入队列: num:6
[ Thread-4_co1] 当前消费的列表 list: [5, 8, 6]
[ Thread-1_pr ] 数据进入队列: num:9
[ Thread-5_co2] 当前消费的列表 list: [164, 136, 181]
[ Thread-4_co1] 当前消费的列表 list: [9]
[ MainThread ] 当生产和消费的任务池数据都结束后,这里才会打印
[ MainThread ] current queue 1 size:0
[ MainThread ] current queue 2 size:0
[ MainThread ] end

6.2 使用Python协程

6.2.1 Python协程简介

协程,又称微线程,是运行在单线程中的“并发”,协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。

在Python3.4之前,官方没有对协程的支持,但存在一些第三方库的实现,比如gevent和tornado,3.4之后有了asyncio,官方才真正实现了协程这一特性。

6.2.2 进程、线程、协程对比

进程是资源分配的单位,线程是操作系统调度的单位。进程切换需要的资源很最大,效率很低;线程切换需要的资源一般,效率一般;协程切换任务资源很小,效率高。

[1] 进程:一个程序运行起来后,代码及用到的资源称之为进程,它是操作系统分配资源的基本单元。

[2] 线程:一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

[3] 协程:协程是Python中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元。

6.2.2 使用asyncio实现协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -*- coding: utf-8 -*-

import asyncio


async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "返回值"


async def main():
print("main开始")
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task1 = asyncio.create_task(func())
# 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
task2 = asyncio.create_task(func())
print("main结束")
# 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
# 此处的await是等待相对应的协程全都执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)


loop = asyncio.get_event_loop() # 创建一个事件循环
loop.run_until_complete(main()) # 将协程对象加入到事件循环

输出结果:

1
2
3
4
5
6
7
main开始
main结束
1
1
2
2
返回值 返回值

6.3 使用Python装饰器

6.3.1 Python装饰器简介

Python 的装饰器是一种非常便捷的修改函数的方式,不影响原函数的定义而对函数进行一些额外的封装,有点类似 AOP,增加一些小功能却不侵入原有代码,非常简洁强大。

6.3.2 与Java注解异同点对比

[1] 对代码块的影响

  • java注解:不会对所修饰的代码产生直接的影响。
  • python装饰器:可以对所修饰的代码产生直接的影响。

[2] 共通处

  • java中注解+反射 可以实现 python装饰器同样的功能,包括面向切面编程、参数校验等。

[3] 从用途看

  • 从用途看注解像是注释文档一样,用于生成javadoc文档(以参数形式标注)、检查等。
  • 装饰器像是为函数提供更多的功能,并装在不同的函数身上。

[4] 从原理看

  • java注解:所有注解本质是继承自接口的接口。
  • python装饰器:被装饰函数的返回值作为参数传给闭包函数执行(这个闭包函数名前面加个@,就是装饰器)。

6.3.3 使用装饰器实现权限校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# -*- coding: utf-8 -*-


# 设置一个类表示个人,有姓名和权限两个属性
class Person():
def __init__(self, name, permission):
self.name = name
self.permission = permission


# 权限校验装饰器
def checkPermission(num):
def setPermission(func):
def inner(person):
# 使用 & 进行按位与运算(只有参与&运算的两个位都为1时,结果才为1,否则为0),当校验通过时,可以执行该操作
if num & person.permission == num:
func(person)
else:
print(person.name, "无权限")
return inner
return setPermission


@checkPermission(1) # 001
def read(person):
print(person.name, "读取代码")


@checkPermission(2) # 010
def write(person):
print(person.name, "写入代码")


@checkPermission(4) # 100
def run(person):
print(person.name, "执行代码")


if __name__ == '__main__':
p1 = Person("张三", 1) # 只有读权限(1=2^0,001)
p2 = Person("李四", 3) # 读写权限(3=2^1+2^0,011)
p3 = Person("王五", 6) # 写与执行权限(6=2^2+2^1,110)
read(p1), write(p1), run(p1)
print("===================")
read(p2), write(p2), run(p2)
print("===================")
read(p3), write(p3), run(p3)

输出结果:

1
2
3
4
5
6
7
8
9
10
11
张三 读取代码
张三 无权限
张三 无权限
===================
李四 读取代码
李四 写入代码
李四 无权限
===================
王五 无权限
王五 写入代码
王五 执行代码

6.4 程序内存占用分析

6.4.1 Memray简介

需求情景:深度学习算法编写或者调用不当时可能会出现内存叠加、内存溢出等问题,可以使用Memray工具对程序内存占用进行分析。

项目描述:Memray 是 Python 的内存分析器。它可以跟踪 Python 代码、本地扩展模块和 Python 解释器本身中的内存分配。仅可用于Linux平台。

项目地址:https://github.com/bloomberg/memray

6.4.2 Memray基本使用

具体使用:安装依赖——用memray运行程序——转换二进制文件

1
2
3
4
$ pip install memray                     // 安装memray依赖(仅支持Linux)
$ python -m memray run my_script.py // 运行单个文件
$ python -m memray run -m my_module // 运行整个模块
$ memray flamegraph my_script.2369.bin // 将二进制文件转换成火焰图html文件

注意必须是Linux平台,其他平台不支持使用,它生成的是一个二进制文件(如my_script.2369.bin),可通过命令将其转换成直观的火焰图html文件。

Memray火焰图

7. 项目的打包部署

一般使用Docker来部署Flask项目,它的基本概念及使用就不再赘述了,不会的话见我的另一篇博客:VPS基本部署环境的搭建与配置

7.1 Docker环境搭建

1
2
3
4
$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)

7.2 导出项目依赖

方法一:使用pip freeze命令导出所有依赖,再进行筛选。

1
$ pip freeze > requirements.txt

注:建议对项目单独建一个conda虚拟环境,再导出依赖,这样导出的依赖就这一个项目的,就不用手动删除无用的了。

方法二:使用pipreqs库导出本项目的依赖,生成的也是requirements.txt文件。

1
2
3
$ pip install pipreqs
$ cd /root/test-project // 切换到项目根目录
$ pipreqs ./ --encoding=utf8 // 需要带上编码的指定,否则会报GBK编码错误

注意这里还有个坑如下,这是因为本机开了翻墙代理导致的,把代理软件关了就好了。

1
requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))`

7.3 使用Docker部署Flask项目

编写Dockerfile,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 基于python3.7镜像创建新镜像
FROM python:3.7
# 创建容器内部目录
RUN mkdir /code
# 将项目复制到内部目录
ADD test-project /code/
# 切换到工作目录
WORKDIR /code
# 安装项目依赖
RUN pip install -r requirements.txt
# 安装vim命令
RUN apt-get update && apt-get install vim -y
# 放行端口
EXPOSE 5000
# 启动项目
ENTRYPOINT ["nohup","python","server.py","&"]

Step2:将项目和Dockerfile上传到服务器并制作镜像运行容器,示例如下:

1
2
3
$ cd /root/deploy                                                       // 切换到存放项目和Dockerfile的目录
$ docker build -t test-flask-image . // 使用Dockerfile构建镜像
$ docker run -d -p 5000:5000 --name test-flask test-flask-image:latest // 通过镜像运行容器

我们可以打包导出镜像,方便迁移到其他服务器上部署。

1
$ docker save test-image > test-image.v1.dockerimage  

7.4 依赖类库的安装部署说明

[1] pyhanlp:中文分词 词性标注 命名实体识别 依存句法分析 新词发现 关键词短语提取 自动摘要 文本分类聚类 拼音简繁 自然语言处理

pyhanlp依赖的运行需要JVM环境,因此部署时应在 Dockerfile 里添加 jdk 并完成配置。安装包去官网下载Linux版的,放到项目里即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 切换到工作目录
WORKDIR /usr
# 创建目录
RUN mkdir /usr/local/java
# 将jdk复制到内部目录并自动解压
ADD jdk-8u202-linux-x64.tar.gz /usr/local/java
# 设置软链接
RUN ln -s /usr/local/java/jdk1.8.0_202 /usr/local/java/jdk
# 设置环境变量
ENV JAVA_HOME /usr/local/java/jdk
ENV JRE_HOME ${JAVA_HOME}/jre
ENV CLASSPATH .:${JAVA_HOME}/lib:${JRE_HOME}/lib
ENV PATH ${JAVA_HOME}/bin:$PATH

附:jdk-8u202-linux-x64.tar.gz 的官网下载地址

[2] PyTorch:一种开源机器学习框架。

如果使用GPU(NVIDIA显卡)部署的话,需要先安装CUDA驱动,然后通过如下命令查看CUDA版本:

1
$ nvcc --version

PyTorch官网 勾选上自己服务器的环境,下面会生成对应的安装命令。

1
2
3
4
5
6
7
8
CPU版
$ pip install torch==1.10.0+cpu torchvision==0.11.1+cpu torchaudio==0.10.0+cpu -f https://download.pytorch.org/whl/cpu/torch_stable.html

GPU版(CUDA 10.2)
$ pip install torch torchvision torchaudio

GPU版(CUDA 11.3)
$ pip install torch==1.10.2+cu113 torchvision==0.11.3+cu113 torchaudio==0.10.2+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html

验证PyTorch是否连上GPU

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding: utf-8 -*-

import torch
import torch.nn as nn

print(torch.cuda.is_available())

model = nn.LSTM(input_size=10, hidden_size=4, num_layers=1, batch_first=True)
model = model.cuda()
print(next(model.parameters()).device)

data = torch.ones([2, 3])
data = data.cuda()
print(data.device)

[3] tensorflow:一个端到端开源机器学习平台。

直接使用 pip 安装有时会出问题,可以去 tensorflow官网 找对应版本的 whl 包进行安装。

1
2
$ pip install https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow_gpu-2.6.0-cp37-cp37m-manylinux2010_x86_64.whl (支持GPU)
$ pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow_cpu-2.6.0-cp37-cp37m-manylinux2010_x86_64.whl (仅支持CPU)

验证 Tensorflow是否连上GPU

1
2
3
4
5
6
7
8
# -*- coding: utf-8 -*-

import tensorflow as tf

print(tf.test.is_gpu_available())

gpu_device_name = tf.test.gpu_device_name()
print(gpu_device_name)

7.5 常见报错问题的解决

[1] ImportError: libGL.so.1: cannot open shared object file: No such file or directory

1
2
$ apt update
$ apt install libgl1-mesa-glx -y

[2] ImportError: numpy.core.multiarray failed to import

1
$ pip install --upgrade numpy

8. 参考资料

[1] Python 使用 flask 库传递 JSON 数据 from CSDN

[2] Python | Flask 解决跨域问题 from segmentfault

[3] flask小坑–request.json 无法获取请求body的json数据 from CSDN

[4] python清除字符串中间空格的方法 from CSDN

[5] Python 3 实现定义跨模块的全局变量和使用 from 博客园

[6] python 将爬取的数据存入mysql from 代码先锋网

[7] python操作mysql数据库(增,删,改,查)from CSDN

[8] 新版xlrd报 Excel xlsx file;not supported from CSDN

[9] Python md5去重图片文件 from 代码交流

[10] Python递归获取指定文件夹下所有指定后缀的文件路径 from CSDN

[11] Python3生成XML文件 from 知乎

[12] python 统计list中各个元素出现的次数 from CSDN

[13] python 读写csv文件(创建,追加,覆盖)from CSDN

[14] 用Python编写的CSV文件每行之间都有空行 from QAStack

[15] python操作csv文件实现对特定列排序 from 简书

[16] docker镜像alpine中安装oracle客户端 from 简书

[17] 解决flask接口返回的内容中文乱码的问题 from CSDN

[18] json dump 中文乱码 from CSDN

[19] 文本摘要/关键词TextRank算法的优化与思考 from 知乎

[20] PaddleOCR快速开始 form Github

[21] 教你利用yolov5训练自己的目标检测模型 from CSDN

[22] python导出项目依赖包与导入项目依赖包 from CSDN

[23] requests.exceptions.SSLError: HTTPSConnectionPool 解决方案 from CSDN

[24] python 使用代理的几种方式 from CSDN

[25] python3 | pip install dlib报错 from CSDN

[26] python字典转为对象,用”.”方式访问对象属性 from 博客园

[27] 视频抽帧那点事 from cnblogs

[28] 保护版权,用 Python 为图片添加盲水印 from 程序员客栈

[29] python多种方法压缩图片,opencv、PIL、tinypng、pngquant压缩图片 from CSDN

[30] Flask 使用流下载文件 from CSDN

[31] 解决:ImportError: numpy.core.multiarray failed to import from CSDN

[32] [Docker] 错误之ImportError: libGL.so.1: cannot open shared object file: No such file or directory from CSDN

[33] paddleocr多平台使用 from jieli

[34] paddlepaddle/PaddleHub报错cannot import name ‘_convert_attention_mask’ from ‘paddle.nn.layer.transformer from Gitee Issues

[35] googletrans出现httpcore._exceptions.ConnectError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。from CSDN

[36] InvalideEndpointException .net core 3 from Github

[37] MinIO 基于 python 把目录下的文件上传到 minio from 夏来风

[38] Python连接MINIO Api, 实现上传下载等功能 from 一只技术小白

[39] Python使用grequests并发发送请求 from 北京临渊

[40] Python 文本加密解密 中文TXT数据 from 简书

[41] Python调用playsound时报错:指定的设备未打开,或不被 MCI 所识别 from 程序员的秘密

[42] python 网络图片转base64 from CSDN

[43] python之base64编码出现b的问题 from 知乎

[44] Flask后端实践 连载三 接口标准化及全球化 from CSDN

[45] 使用 Flask-Docs 自动生成 Api 文档 from segmentfault

[46] 令人不悦的Error–requests.exceptions.ProxyError from CSDN

[47] python制作图片缩略图 from 51CTO

[48] Python 读写 ini 配置文件 from IS-RPA

[49] Python3.9使用pandas读取Excel报错 AttributeError: ‘ElementTree‘ object has no attribute ‘getiterator‘ from CSDN

[50] 使用 pre-request 优化 Flask 入参校验 from 稀土掘金

[51] python高级使用,包括多线程、协同编程、知识协同、网络、装饰器等 from Github

[52] 浅谈java中注解和python中装饰器的区别 from CSDN

[53] python 装饰器学习 校验权限 from CSDN

[54] DeOldify黑白旧照片着色神器:基于NoGAN的深度学习来实现旧照着色还原 from 佰阅部落

[55] 人工智能DeOldify修复黑白图片和视频 from Bilibili

[56] 黑白老照片上色,手把手教你用Python怎么玩儿!from 简书

[57] Google Colab免费GPU使用教程 from Rogn

[58] 一个基于深度学习的项目,用于着色和恢复旧图像和视频 from Github

[59] Colaboratory常见问题解答 from Google官方文档

[60] Python-在线网页导出为图片或pdf from 代码先锋网

[61] Pyppeteer Browser closed unexpectedly in heroku from GitHub Issues

[62] puppeteer Troubleshooting 官方文档 from Github

[63] 使用docker部署chrome无头浏览器并解决中文乱码,为pyppeteer提供运行环境 from CSDN

[64] python asynio错误 There is no current event loop in thread ‘Thread-1’ from CSDN

[65] Pyppeteer和Flask问题,服务器部署Requests_html问题,多线程调用pyppeteer或requests_html问题 from CSDN

[66] Puppeteer wait until page is completely loaded from stackoverflow

[67] Puppeteer–等待加载 from 博客园

[68] Generate the pdf with empty content occasionally from GitHub Issues