使用Flask封装集成深度学习算法

  1. 1. Flask接口封装
    1. 1.1 Flask简介
    2. 1.2 Flask基本示例
    3. 1.3 Flask常见问题
      1. 1.3.1 Flask跨域问题
      2. 1.3.2 Flask中文乱码问题
      3. 1.3.3 JSON解析问题
    4. 1.4 Flask全局配置
      1. 1.4.1 打印日志到控制台并写入文件
      2. 1.4.2 跨文件全局变量的定义与使用
    5. 1.5 数据库的集成与使用
      1. 1.5.1 使用Redis缓存数据
      2. 1.5.2 将数据保存到MySQL
      3. 1.5.3 查询Oracle的数据
    6. 1.6 Flask通用模板
      1. 1.6.1 常规POST请求
      2. 1.6.2 常规GET请求
      3. 1.6.3 以base64格式传输图片
  2. 2. Python工具函数
    1. 2.1 解析xlsx文件
    2. 2.2 解析txt文件
    3. 2.3 解析csv文件
    4. 2.4 读取JSON文件里的配置信息
    5. 2.5 生成xml文件
    6. 2.6 解析yaml格式文件
    7. 2.7 列表元素去重及统计出现次数
    8. 2.8 比较数组是否完全相等
    9. 2.9 实现replaceAll功能
    10. 2.10 根据md5进行文件去重
    11. 2.11 递归获取某目录下某后缀的文件路径
    12. 2.12 文件及目录的基本操作
    13. 2.13 使用cv2库画图
  3. 3. 深度学习模型及算法
    1. 3.1 文本关键词及概要提取
    2. 3.2 图片OCR文本识别
    3. 3.3 文本内容审查
    4. 3.4 目标识别检测
    5. 3.5 文本合成语音
    6. 3.6 视频关键帧抽取
  4. 4. 项目的打包部署
    1. 4.1 Docker环境搭建
    2. 4.2 导出项目依赖
    3. 4.3 使用Docker部署Flask项目
    4. 4.4 依赖类库的安装部署说明
  5. 5. 参考资料

1. Flask接口封装

1.1 Flask简介

Flask是一个使用Python编写的轻量级Web应用框架。Flask最显著的特点是它是一个“微”框架,轻便灵活,但同时又易于扩展。默认情况下,Flask 只相当于一个内核,不包含数据库抽象层、用户认证、表单验证、发送邮件等其它Web框架经常包含的功能。Flask依赖用各种灵活的扩展来给Web应用添加额外功能。

与Django的对比:Django是一个开源的Python Web应用框架,采用了MVT的框架模式,即模型M,视图V和模版T。Django是一个”大而全”的重量级Web框架,其自带大量的常用工具和组件,甚至还自带了管理后台Admin,适合快速开发功能完善的企业级网站。

Flask项目地址:https://github.com/pallets/flask

1.2 Flask基本示例

下例简单示范了基于Flask,搭建 web 服务,并通过 POST 请求传递 JSON 格式数据的过程。(注:这个示例存在一些问题,下面会逐步提到,直接使用的话请使用“Falsk通用模版”里的代码)

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask, request, jsonify

# 创建一个服务
app = Flask(__name__)

# 创建一个接口 指定路由和请求方法 定义处理请求的函数
@app.route(rule='/', methods=['POST'])
def everything():
# 获取 JSON 格式的请求体 并解析
request_body = request.get_json()
print('Request info: ', request_body)
# 生成响应信息
response_info = {'msg': '收到'}
print('Response info:', response_info)
# 将响应信息转换为 JSON 格式
response_body = jsonify(response_info)
# 最终对请求进行相应
return response_body

if __name__ == '__main__':
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False)

test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import requests

# 定义一个函数 测试一个服务接口
def test_everything():
# 构造服务接口地址
url = 'http://localhost:{0}/'.format(5000)
# 构造请求体 请求体将被转换为 JSON 格式
request_body = {'msg': '请求'}
# 向指定服务接口发送 POST 请求
r0 = requests.post(url=url, json=request_body)
# 解析 JSON 格式的响应体 并打印
print('Response info:', r0.json())

if __name__ == '__main__':
test_everything()

1.3 Flask常见问题

1.3.1 Flask跨域问题

Step1:引入flask-cors库

1
$ pip install flask-cors

Step2:配置CORS

flask-cors 有两种用法,一种为全局使用,一种对指定的路由使用。

其中CORS提供了一些参数,常用的我们可以配置 originsmethodsallow_headerssupports_credentials

[1] 全局使用

1
2
3
4
5
from flask import Flask, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app, supports_credentials=True)

[2] 局部使用

1
2
3
4
5
6
7
8
9
10
from flask import Flask, request
from flask_cors import cross_origin

app = Flask(__name__)

@app.route('/')
@cross_origin(supports_credentials=True)
def hello():
name = request.args.get("name", "World")
return f'Hello, {name}!'

1.3.2 Flask中文乱码问题

[1] 发送请求乱码

不管是dump还是dumps,中文乱码加入ensure_ascii=False即可。

1
json.dump(content, f, ensure_ascii=False)

[2] 接收返回值乱码

接收返回值乱码问题,给app配置app.config[‘JSON_AS_ASCII’] = False即可。

1
2
3
if __name__ == "__main__":
app.config['JSON_AS_ASCII'] = False
app.run(host='0.0.0.0', port='5000')

1.3.3 JSON解析问题

1
request_body = request.get_json()

这种方法获取请求体中的JSON,有时会因为空格出现问题,导致请求400。为了避免这种情况,接参之后,可以对其去除全部空格。

1
2
3
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)

1.4 Flask全局配置

1.4.1 打印日志到控制台并写入文件

可以写一个日志输出配置类,代码如下:

log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./logs.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

使用时直接调用即可。

1
logger.info("logger.info")

1.4.2 跨文件全局变量的定义与使用

global关键字可以定义一个变量为全局变量,但是这个仅限于在一个文件中调用全局变量,跨文件就会报错。 既然在一个文件里面可以生效的话,那么我们就专门为全局变量定义一个“全局变量管理模块”就好了。

gol.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# -*- coding: utf-8 -*-

def _init():
global _global_dict
_global_dict = {}

def set_value(key, value):
""" 定义一个全局变量 """
_global_dict[key] = value

def get_value(key, defValue=None):
""" 获得一个全局变量,不存在则返回默认值 """
try:
return _global_dict[key]
except KeyError:
return defValue

定义处

1
2
3
4
5
import gol

gol._init() # 先必须在主模块初始化(只需要一次即可)
gol.set_value('name', 'zhangsan')
gol.set_value('age', 23)

调用处

1
2
3
4
import gol

name = gol.get_value('name')
age = gol.get_value('age')

1.5 数据库的集成与使用

1.5.1 使用Redis缓存数据

Step1:引入redis库

1
$ pip install redis

Step2:使用Redis

往redis存值

1
2
3
4
5
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port='6379', password='123456')
r = redis.Redis(connection_pool=pool)
r.set('id', '666666')

从redis取值

1
2
3
4
5
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port='6379', password='123456')
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')

1.5.2 将数据保存到MySQL

引入pymysql库

1
$ pip install pymysql

将数据保存到MySQL示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding: utf-8 -*-

import pymysql

# MySQL数据库连接信息
HOST = "127.0.0.1"
USER = "test"
PASSWD = "123456"
PORT = 3306
DB = "testdb"
CHARSET = "utf8"

# 打开数据库连接
db = pymysql.connect(host=HOST, user=USER, passwd=PASSWD, port=PORT, db=DB, charset=CHARSET)
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# sql语句
sql = "INSERT INTO test_table\
(id, name, city)\
VALUES ('%s', '%s', '%s')" \
% ('001', 'zhangsan', 'beijing')

try:
# 打印实际执行的sql,调试使用
print(sql)
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except Exception:
# 出现错误时回滚
print(Exception)
db.rollback()
# 关闭数据库连接
db.close()

1.5.3 查询Oracle的数据

引入cx_Oracle库

1
$ pip install cx_Oracle

安装Oracle Instant Client

从Oracle里查询数据示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import cx_Oracle

# 指定Oracle Instant Client的目录
cx_Oracle.init_oracle_client(lib_dir="D:\\Development\\instantclient-basic-windows.x64-11.2.0.4.0")

# 执行查询sql
conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
curs = conn.cursor()
sql = "select a.id, a.title, a.text from test_table "
rr = curs.execute(sql)

# 将查询结果保存成字典
result_dir = {}
while(1):
rs = rr.fetchone()
if rs == None:
break
id = rs[0]
title = rs[1]
text = rs[2]

curs.close()
conn.close()

注意事项:

1、cx_Oracle.init_oracle_client()要写在Flask接口的外面,否则第二次接口请求时会报cx_Oracle已经初始化的错误。

2、Linux端部署的时候,会出现找不到libclntsh.so动态连接库的问题,报错如下:

1
cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded: "Error loading shared library libclntsh.so: No such file or directory". See https://oracle.github.io/odpi/doc/installation.html#linux for help

报错原因:instantclient-basic-linux.x64-11.2.0.4.0.zip包里根本没有libclntsh.so,有的是libclntsh.so.11.1,而单纯的给这个文件改个名是不行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
./instantclient_11_2:
|---BASIC_README
|---adrci
|---genezi
|---libclntsh.so.11.1
|---libnnz11.so
|---libocci.so.11.1
|---libociei.so
|---libocijdbc11.so
|---ojdbc5.jar
|---ojdbc6.jar
|---uidrvci
|---xstreeams.jar

解决办法:需要在Dockerfile里设置软链接解决(注意要用绝对路径)

1
2
ENV LD_LIBRARY_PATH=/home/instantclient_11_2
RUN ln -s /home/instantclient_11_2/libclntsh.so.11.1 /home/instantclient_11_2/libclntsh.so

1.6 Flask通用模板

为了方便日常功能开发,这里放一个自己平时用的通用模板,专注于业务逻辑的编写即可。

1.6.1 常规POST请求

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# -*- coding: utf-8 -*-

import json
from flask import Flask, request, jsonify
from flask_cors import CORS

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/moduleName/methodName', methods=['POST'])
def methodName():

# 获取JSON格式的请求体,并解析
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)

# 返回结果初始化
response_info = {}

# 若干参数校验模块
id = request_body.get("id")
if not id:
response_info["code"] = 500
response_info["msg"] = "请求参数里不存在id,请检查!"
response_info["data"] = None
return jsonify(response_info)

# 业务处理模块
result = "hello world!"

# 成功的结果返回
response_info["code"] = 200
response_info["msg"] = "请求成功"
data = {}
data["result"] = result
response_info["data"] = data
response_body = jsonify(response_info)

# 最终对请求进行响应
return response_body

if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务,指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False)

1.6.2 常规GET请求

如果是GET请求,修改两处即可

1
2
3
4
# 声明处
@app.route(rule='/moduleName/methodName', methods=['GET'])
# 接参处
id = request.args.get("id")

1.6.3 以base64格式传输图片

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import base64
import requests
import json

if __name__ == '__main__':
# 测试请求
url = 'http://127.0.0.1:5000/moduleName/methodName'
f = open('./data/test.jpg', 'rb')
# base64编码
base64_data = base64.b64encode(f.read())
f.close()
base64_data = base64_data.decode()
# 传输的数据格式
data = {'img': base64_data}
# post传递数据
r = requests.post(url, data=json.dumps(data))
print(r.text)

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# -*- coding: utf-8 -*-

import os
from uuid import uuid1
from flask import Flask, request, jsonify
from flask_cors import CORS
import base64
import json

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

# 解析base64生成图像文件
def base64_to_img(image_b64, img_path):
imgdata = base64.b64decode(image_b64)
file = open(img_path, 'wb')
file.write(imgdata)
file.close()

@app.route(rule='/moduleName/methodName', methods=['POST'])
def checkImgContent():

# 返回结果初始化
response_info = {}

# 从请求中解析出图像的base64字符串
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)
image_b64 = request_body.get("img")
if not image_b64:
response_info["code"] = 500
response_info["msg"] = "请求参数里不存在img,请检查!"
response_info["result"] = None
return jsonify(response_info)

# 将base64字符串解析成图片保存
uuid = uuid1()
img_path = './img/{}.jpg'.format(uuid)
base64_to_img(image_b64, img_path)

# 下面对保存的图片进行若干处理
result = 'hello world'

# 处理完成后删除生成的图片文件
os.remove(img_path)

# 成功的结果返回
response_info["code"] = 200
response_info["msg"] = "请求成功"
data = {}
data["result"] = result
response_info["data"] = data
response_body = jsonify(response_info)

# 最终对请求进行响应
return response_body

if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False)

2. Python工具函数

2.1 解析xlsx文件

引入xlrd库

1
$ pip install xlrd==1.2.0

注:新版 xlrd 报 Excel xlsx file; not supported错误(原因:xlrd更新到了2.0.1版本,只支持.xls文件,不支持.xlsx)

读写操作xlsx示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# -*- coding: utf-8 -*-

import openpyxl
import xlrd

# 将数据写入到xlsx文件里(新创建文件)
def write_xlsx(path, sheetname, value):
index = len(value)
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = sheetname
for i in range(0, index):
for j in range(0, len(value[i])):
sheet.cell(row=i+1, column=j+1, value=str(value[i][j]))
workbook.save(path)

# 将数据追加写入到xlsx文件里(已有文件追加写入)
def append_write_xlsx(path, sheetname, value):
workbook = openpyxl.load_workbook(path)
sheet = workbook[sheetname]
sheet.append(value)
workbook.save(path)

# 读取xlsx文件信息
def read_xlsx(path, sheetname):
wb = xlrd.open_workbook(path)
sh = wb.sheet_by_name(sheetname)
result = {}
for i in range(1, sh.nrows):
result[sh.row_values(i)[0]] = sh.row_values(i)[1]
return result

if __name__ == '__main__':

path = './test.xlsx'
sheetname = '测试'
head_value = [['id', 'name']]
body_value = ['001', 'zhangsan']

write_xlsx(path, sheetname, head_value)
append_write_xlsx(path, sheetname, body_value)
result = read_xlsx(path, sheetname)
print(result)

2.2 解析txt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# -*- coding: utf-8 -*-

# 按行追加写入txt文件(没有文件会新创建文件)
def write_content_to_txt(txt_path, content):
a = open(txt_path, 'a')
a.write(content + '\n')
a.close()

# 按行读取txt文件的内容,保存成列表
def read_txt_to_list(txt_path):
result = []
with open(txt_path, 'r') as f:
for line in f:
result.append(line.strip('\n'))
return result

if __name__ == '__main__':

txt_path = './test.txt'
write_content_to_txt(txt_path, 'zhangsan')
write_content_to_txt(txt_path, 'lisi')
result = read_txt_to_list(txt_path)
print(result)

2.3 解析csv文件

[1] 新建csv文件并写入数据

1
2
3
4
5
6
7
import csv
def create_csv():
csv_path = "./test.csv"
with open(csv_path,'w', newline='', encoding='GBK') as f:
csv_write = csv.writer(f)
csv_head = ["good","bad"]
csv_write.writerow(csv_head)

注:newline=''是为了解决csv的隔行空行问题。选择GBK编码,否则使用Excel打开会出现乱码问题。

[2] 操作csv文件实现对特定列排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def sort_csv(csv_path):
datas = [] # 用于存放排序过的数据
with open(csv_path, 'r', encoding='GBK') as f:
table = []
index = 0
for line in f:
index = index + 1
if index == 1:
continue
col = line.split(',')
col[1] = int(col[1].strip("\n"))
table.append(col)
table_sorted = sorted(table, key=itemgetter(1), reverse=True) # 精确的按照第2列排序
for row in table_sorted:
datas.append(row)
f.close()
with open(csv_path, "w", newline='', encoding='GBK') as csvfile:
writer = csv.writer(csvfile)
csv_head = ["关键词", "词频"]
writer.writerow(csv_head)
for data in datas:
writer.writerow(data)
csvfile.close()

2.4 读取JSON文件里的配置信息

配置文件config.json:

1
2
3
4
5
{
"DB_URL": "127.0.0.1:1521/orcl",
"DB_USER": "test",
"DB_PASSWORD": "123456"
}

工具函数:

1
2
3
4
5
6
# 读取json里的配置信息
def read_conf(conf_path):
with open(conf_path,"r",encoding="utf-8") as f:
confstr = f.read()
conf = json.loads(confstr) # 字符串转json
return conf

调用示例:

1
2
3
conf_path = './config/config.json'
conf = read_conf(conf_path)
conn = cx_Oracle.connect(conf['DB_USER'], conf['DB_PASSWORD'], conf['DB_URL'])

2.5 生成xml文件

generate_xml.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# -*- coding: utf-8 -*-

from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement
from xml.etree.ElementTree import ElementTree

# 美化xml:elemnt为传进来的Elment类,参数indent用于缩进,newline用于换行
def pretty_xml(element, indent, newline, level=0):
# 判断element是否有子元素
if element:
# 如果element的text没有内容
if element.text == None or element.text.isspace():
element.text = newline + indent * (level + 1)
else:
element.text = newline + indent * (level + 1) + element.text.strip() + newline + indent * (level + 1)
temp = list(element) # 将elemnt转成list
for subelement in temp:
# 如果不是list的最后一个元素,说明下一个行是同级别元素的起始,缩进应一致
if temp.index(subelement) < (len(temp) - 1):
subelement.tail = newline + indent * (level + 1)
else: # 如果是list的最后一个元素, 说明下一行是母元素的结束,缩进应该少一个
subelement.tail = newline + indent * level
# 对子元素进行递归操作
pretty_xml(subelement, indent, newline, level=level + 1)


if __name__ == '__main__':

# generate root node
root = Element('root')
# generate first child-node head
head = SubElement(root, 'head')
# child-node of head node
title = SubElement(head, 'title')
title.text = "Title"
# generate second child-node body
body = SubElement(root, 'body')
body.text = "Content"
tree = ElementTree(root)

root = tree.getroot() # 得到根元素,Element类
pretty_xml(root, '\t', '\n') # 执行美化方法

# write out xml data
tree.write('result.xml', encoding = 'utf-8')

生成效果:

1
2
3
4
5
6
<root>
<head>
<title>Title</title>
</head>
<body>Content</body>
</root>

2.6 解析yaml格式文件

将yaml文件转字典

1
2
3
4
5
import yaml

f = open('./config.yaml', 'r')
yaml_str = f.read()
config_dict = yaml.load(yaml_str, Loader=yaml.FullLoader)

将字典转成对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Dict(dict):
__setattr__ = dict.__setitem__
__getattr__ = dict.__getitem__

def dict2obj(dictObj):
if not isinstance(dictObj, dict):
return dictObj
d = Dict()
for k, v in dictObj.items():
d[k] = dict2obj(v)
return d

# 测试数据
params = {
"name": "login",
"params": {
"transactionId": "cc258bdb3dd4d6bba2",
"platformType": "第三方平台",
"uid": 9
}
}

# 转换字典成为对象,可以用"."方式访问对象属性
res = dict2obj(params)
print(res.name)
print(res.params.uid)

2.7 列表元素去重及统计出现次数

两个列表求差集并去重

1
2
3
4
# 两个列表求差集,在B中但不在A中(注:重复元素也会被去除)
def list_diff(listA, listB):
result = list(set(listB).difference(set(listA)))
return result

列表元素直接去重

1
2
3
4
5
6
# 列表元素直接去重
old_list = [2, 1, 3, 4, 1]
new_list = list(set(old_list))
print(new_list)

>>> [1,2,3,4]

统计列表中各个元素出现的次数

1
2
3
4
5
6
7
8
from collections import Counter

# 统计列表中各个元素出现的次数
test_list = [1, 2, 3, 1, 1, 2]
result = Counter(test_list)
print(result)

>>>{1: 3, 2: 2, 3: 1}

2.8 比较数组是否完全相等

1
2
3
4
5
6
7
import numpy as np

a = np.array([1,2,3])
b = np.array([1,2,3])
print((a==b).all())

>>> True

2.9 实现replaceAll功能

1
2
3
4
5
# 实现replaceAll的
def replaceAll(input, toReplace, replaceWith):
while (input.find(toReplace) > -1):
input = input.replace(toReplace, replaceWith)
return input

2.10 根据md5进行文件去重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import hashlib

# 获取md5值
def get_md5(file):
file = open(file,'rb')
md5 = hashlib.md5(file.read())
file.close()
md5_values = md5.hexdigest()
return md5_values

if __name__ == '__main__':
file_path = "./data"
os.chdir(file_path)
file_list = os.listdir(file_path)
md5_list =[]
for file in file_list:
md5 = get_md5(file)
if md5 not in md5_list:
md5_list.append(md5)
else:
os.remove(file)

2.11 递归获取某目录下某后缀的文件路径

程序分为两步,第一步,采用递归的方式获得文件夹下所有文件的路径列表;第二步,从文件路径列表中根据后缀利用.endswith(后缀)的方法筛选指定文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os

# 从指定path下递归获取所有文件
def getAllFile(path, fileList):
dirList = [] # 存放文件夹路径的列表
for ff in os.listdir(path):
wholepath = os.path.join(path, ff)
if os.path.isdir(wholepath):
dirList.append(wholepath) # 如果是文件添加到结果文件列表中
if os.path.isfile(wholepath):
fileList.append(wholepath) # 如果是文件夹,存到文件夹列表中
for dir in dirList:
getAllFile(dir, fileList) # 对于dirList列表中的文件夹,递归提取其中的文件,fileList一直在往下传,所有的文件路径都会被保存在这个列表中

# 从文件路径列表中筛选出指定后缀的文件
def getSufFilePath(fileList, suffix):
for ff in fileList[:]:
if not ff.endswith(suffix):
fileList.remove(ff)

if __name__ == '__main__':
flist = []
findpath = r'./testdir'
getAllFile(findpath, flist)
print('allfile:', len(flist)) # filepath下的文件总数
getSufFilePath(flist, '.txt')

print('Docfile:', len(flist)) # filepath下的指定类型文件总数(这里是.txt文件的数量)
for ff in flist:
print(ff)

2.12 文件及目录的基本操作

基本文件和目录操作

1
2
3
4
5
6
7
8
9
10
11
import os
import shutil

os.getcwd() # 获取当前路径
os.listdir() # 将当前目录的文件及目录保存成一个列表
os.path.exists(dir_path) # 检查文件目录是否存在
os.makedirs(dir_path) # 创建目录
os.chdir(dir_path) # 切换目录
os.remove(file_path) # 删除指定文件
os.removedirs(dir_path) # 删除空目录
shutil.rmtree(dir_path) # 递归删除目录(可为空,也可不为空)

复制某个文件并重命名

1
2
3
4
5
6
7
# 复制某个文件并重命名:sample-原始文件路径、new_path-新文件目录、file_name-新文件名称
def copy_rename_file(sample,new_path,file_name):
if not os.path.exists(new_path):
os.makedirs(new_path)
new_file = os.path.join(new_path, file_name)
shutil.copy(sample, new_file)
return new_file

2.13 使用cv2库画图

引入cv2库

1
$ pip install opencv-python

绘制矩形和直线示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-

import numpy as np
import cv2 as cv

img = np.zeros((320, 320, 3), np.uint8) # 生成一个空灰度图像

# 绘制矩形
ptLeftTop = (60, 60)
ptRightBottom = (260, 260)
point_color = (0, 255, 0)
thickness = 1
lineType = 4
cv.rectangle(img, ptLeftTop, ptRightBottom, point_color, thickness, lineType)

# 绘制直线
ptStart = (60, 60)
ptEnd = (260, 260)
point_color = (0, 0, 255)
thickness = 1
lineType = 4
cv.line(img, ptStart, ptEnd, point_color, thickness, lineType)

cv.namedWindow("CV Test")
cv.imshow('CV Test', img) # 显示绘图
cv.waitKey(5000) # 显示5000ms后消失,设置为0永不消失
cv.destroyAllWindows()

3. 深度学习模型及算法

3.1 文本关键词及概要提取

FastTextRank:从中文文本中提取摘要及关键词,并对算法时间复杂度进行了修改,计算图最大权节点的时间复杂度由o(n^2)降低到了o(n)。在有限的测试文本上,其运行速度相比于textrank4zh这个包快了8倍。算法原理见作者的知乎文章

依赖库安装:Numpy>=1.14.5 gensim>=3.5.0 FastTextRank==1.1

基本使用示例:KeyWord.py(提取关键字示例)、Sentence.py(提取摘要示例)

3.2 图片OCR文本识别

百度开源的图片OCR识别算法,这是它的官方使用教程:PaddleOCR使用教程,可以很方便的通过几行Python代码实现需求,模型会在初次执行时自动下载。

依赖库安装:paddlepaddle==2.2.0 paddleocr==2.0.1(注:paddleocr库在win环境下安装可能会出问题,需要Microsoft Visual C++ 14.0的环境,建议直接在linux环境下安装使用)

附:paddlepaddle的依赖环境安装

1
$ python -m pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple

基本使用示例:

1
2
3
4
5
6
7
8
9
from paddleocr import PaddleOCR

# Paddleocr目前支持的多语言语种可以通过修改lang参数进行切换
# 例如`ch`, `en`, `fr`, `german`, `korean`, `japan`
ocr = PaddleOCR(use_angle_cls=True, lang="ch") # need to run only once to download and load model into memory
img_path = './imgs/test.jpg'
result = ocr.ocr(img_path, cls=True)
for line in result:
print(line)

注:如果需要结果可视化、版面分析,需要另外安装相应的库,具体见官方教程。

3.3 文本内容审查

Sensitive-word:收集的一些敏感词汇,细分了暴恐词库、反动词库、民生词库、色情词库、贪腐词库、其他词库等。

将词库放到./dict/目录下,一个分类一个txt文件,文件名即为分类,词库内容为一行一个敏感词(无需分隔符)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# -*- coding: utf-8 -*-

import json
from flask import Flask, request, jsonify
from flask_cors import CORS
import jieba
import pandas as pd
import os
from flashtext import KeywordProcessor
global keyword_processor
keyword_processor = KeywordProcessor()

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 文本违规检测
"""
@app.route(rule='/textReview/checkTextContent', methods=['POST'])
def methodName():

# 获取JSON格式的请求体,并解析
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)

# 返回结果初始化
response_info = {}

# 若干参数校验模块
text = request_body.get("text")
if not text:
response_info["code"] = 500
response_info["msg"] = "请求参数里不存在text,请检查!"
response_info["images"] = None
return jsonify(response_info)

# 敏感词库路径
path = './dict/'

# 文件列表
files = []
for file in os.listdir(path):
if file.endswith(".txt"):
files.append(path + file)

# 遍历所有文件
for file in files:
filename = file.split("/")[2].split(".txt")[0]
datai = pd.read_csv(file, encoding='gbk')
for i in datai.values:
jieba.add_word(i.item())
keyword_processor.add_keyword(i.item(), filename)

def checkout(input):
seg = jieba.cut(input, cut_all=True)
input = " ".join(seg)
result = keyword_processor.extract_keywords(input)
result = list(set(result)) # 列表元素去重
return result

result = checkout(text)

if len(result) == 0:
result.append("正常")

# 成功的结果返回
response_info["code"] = 200
response_info["msg"] = "请求成功"
data = {}
data["result"] = result
response_info["data"] = data
response_body = jsonify(response_info)

# 最终对请求进行响应
return response_body


if __name__ == '__main__':
# 解决中文乱码问题
app.config['JSON_AS_ASCII'] = False
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False)

3.4 目标识别检测

yolov5:是一种单阶段目标检测算法,该算法在Yolov4的基础上添加了一些新的改进思路,使其速度与精度都得到了极大的性能提升。

这是一篇使用教程:教你利用yolov5训练自己的目标检测模型,详细介绍了如何使用yolov5训练自己的目标检测模型,数据集和预训练权重的准备部分也留了该作者相应的博客链接。

3.5 文本合成语音

谷歌开源的文本转语音 API 交互的 Python 库,虽然免费但生成的语音机器音较重,使用时需要联网(被墙,国内需要设置代理)

项目地址:https://github.com/pndurette/gTTS

1
2
3
4
5
6
7
8
9
10
# -*- coding: utf-8 -*-

from gtts import gTTS
import os
os.environ["https_proxy"] = "http://127.0.0.1:1080"

# 谷歌文字转语音API测试
with open("./demo.txt",'r',encoding="utf8") as f:
audio = gTTS(text=f.read(),lang="zh-cn")
audio.save("./demo.mp3")

注:如果未设置代理或者代理有问题,会报“Python GTTS / Failed to connect. Probable cause: Unknown”错误。

3.6 视频关键帧抽取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# -*- coding: utf-8 -*-

import cv2
import operator
import numpy as np
import os
from scipy.signal import argrelextrema


def smooth(x, window_len=13, window='hanning'):
"""使用具有所需大小的窗口使数据平滑。

This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the begining and end part of the output signal.
该方法是基于一个标度窗口与信号的卷积。
通过在两端引入信号的反射副本(具有窗口大小)来准备信号,
使得在输出信号的开始和结束部分中将瞬态部分最小化。
input:
x: the input signal输入信号
window_len: the dimension of the smoothing window平滑窗口的尺寸
window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
平坦的窗口将产生移动平均平滑
output:
the smoothed signal平滑信号

example:
import numpy as np
t = np.linspace(-2,2,0.1)
x = np.sin(t)+np.random.randn(len(t))*0.1
y = smooth(x)

see also:

numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
scipy.signal.lfilter

TODO: 如果使用数组而不是字符串,则window参数可能是窗口本身
"""
print(len(x), window_len)
s = np.r_[2 * x[0] - x[window_len:1:-1],
x, 2 * x[-1] - x[-1:-window_len:-1]]

if window == 'flat': # moving average平移
w = np.ones(window_len, 'd')
else:
w = getattr(np, window)(window_len)
y = np.convolve(w / w.sum(), s, mode='same')
return y[window_len - 1:-window_len + 1]


class Frame:
"""class to hold information about each frame
用于保存有关每个帧的信息
"""

def __init__(self, id, diff):
self.id = id
self.diff = diff

def __lt__(self, other):
if self.id == other.id:
return self.id < other.id
return self.id < other.id

def __gt__(self, other):
return other.__lt__(self)

def __eq__(self, other):
return self.id == other.id and self.id == other.id

def __ne__(self, other):
return not self.__eq__(other)


def rel_change(a, b):
x = (b - a) / max(a, b)
print(x)
return x


def getEffectiveFrame(videopath, dir):
# 如果文件目录不存在则创建目录
if not os.path.exists(dir):
os.makedirs(dir)
(filepath, tempfilename) = os.path.split(videopath) # 分离路径和文件名
(filename, extension) = os.path.splitext(tempfilename) # 区分文件的名字和后缀
# Setting fixed threshold criteria设置固定阈值标准
USE_THRESH = False
# fixed threshold value固定阈值
THRESH = 0.8
# Setting fixed threshold criteria设置固定阈值标准
USE_TOP_ORDER = False
# Setting local maxima criteria设置局部最大值标准
USE_LOCAL_MAXIMA = True
# Number of top sorted frames排名最高的帧数
NUM_TOP_FRAMES = 50
# smoothing window size平滑窗口大小
len_window = int(50)

print("target video :" + videopath)
print("frame save directory: " + dir)
# load video and compute diff between frames加载视频并计算帧之间的差异
cap = cv2.VideoCapture(str(videopath))
prev_frame = None
frame_diffs = []
frames = []
success, frame = cap.read()
i = 0
while (success):
luv = cv2.cvtColor(frame, cv2.COLOR_BGR2LUV)
curr_frame = luv
if curr_frame is not None and prev_frame is not None:
# logic here
diff = cv2.absdiff(curr_frame, prev_frame) # 获取差分图
diff_sum = np.sum(diff)
diff_sum_mean = diff_sum / (diff.shape[0] * diff.shape[1]) # 平均帧
frame_diffs.append(diff_sum_mean)
frame = Frame(i, diff_sum_mean)
frames.append(frame)
prev_frame = curr_frame
i = i + 1
success, frame = cap.read()
cap.release()

# compute keyframe
keyframe_id_set = set()
if USE_TOP_ORDER:
# sort the list in descending order以降序对列表进行排序
frames.sort(key=operator.attrgetter("diff"), reverse=True) # 排序operator.attrgetter
for keyframe in frames[:NUM_TOP_FRAMES]:
keyframe_id_set.add(keyframe.id)
if USE_THRESH:
print("Using Threshold") # 使用阈值
for i in range(1, len(frames)):
if (rel_change(np.float(frames[i - 1].diff), np.float(frames[i].diff)) >= THRESH):
keyframe_id_set.add(frames[i].id)
if USE_LOCAL_MAXIMA:
print("Using Local Maxima") # 使用局部极大值
diff_array = np.array(frame_diffs)
sm_diff_array = smooth(diff_array, len_window) # 平滑
frame_indexes = np.asarray(argrelextrema(sm_diff_array, np.greater))[0] # 找极值
for i in frame_indexes:
keyframe_id_set.add(frames[i - 1].id) # 记录极值帧数

# save all keyframes as image将所有关键帧另存为图像
cap = cv2.VideoCapture(str(videopath))
success, frame = cap.read()
idx = 0
num = 0
while (success):
if idx in keyframe_id_set:
num = num + 1
name = filename + '_' + str(num) + ".jpg"
cv2.imwrite(dir + name, frame)
keyframe_id_set.remove(idx)
idx = idx + 1
success, frame = cap.read()
cap.release()


if __name__ == "__main__":
videopath = './data/demo.mp4' # Video path of the source file源文件的视频路径
dir = './data/keyframe/' # Directory to store the processed frames存储已处理帧的目录
getEffectiveFrame(videopath, dir)

4. 项目的打包部署

一般使用Docker来部署Flask项目,它的基本概念及使用就不再赘述了,不会的话见我的另一篇博客:VPS基本部署环境的搭建与配置

4.1 Docker环境搭建

1
2
3
4
$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)

4.2 导出项目依赖

方法一:使用pip freeze命令导出所有依赖,再进行筛选。

1
$ pip freeze > requirements.txt

注:建议对项目单独建一个conda虚拟环境,再导出依赖,这样导出的依赖就这一个项目的,就不用手动删除无用的了。

方法二:使用pipreqs库导出本项目的依赖,生成的也是requirements.txt文件。

1
2
3
$ pip install pipreqs
$ cd /root/test-project // 切换到项目根目录
$ pipreqs ./ --encoding=utf8 // 需要带上编码的指定,否则会报GBK编码错误

注意这里还有个坑:requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))

这是因为本机开了翻墙代理导致的,把代理软件关了就好了。

4.3 使用Docker部署Flask项目

编写Dockerfile,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 基于python3.7镜像创建新镜像
FROM python:3.7
# 创建容器内部目录
RUN mkdir /code
# 将项目复制到内部目录
ADD test-project /code/
# 切换到工作目录
WORKDIR /code
# 安装项目依赖
RUN pip install -r requirements.txt
# 放行端口
EXPOSE 5000
# 启动项目
ENTRYPOINT ["nohup","python","server.py","&"]

Step2:将项目和Dockerfile上传到服务器并制作镜像运行容器,示例如下:

1
2
3
$ cd /root/deploy                                                       // 切换到存放项目和Dockerfile的目录
$ docker build -t test-flask-image . // 使用Dockerfile构建镜像
$ docker run -d -p 5000:5000 --name test-flask test-flask-image:latest // 通过镜像运行容器

我们可以打包导出镜像,方便迁移到其他服务器上部署。

1
$ docker save test-image > test-image.v1.dockerimage  

4.4 依赖类库的安装部署说明

[1] pyhanlp:中文分词 词性标注 命名实体识别 依存句法分析 新词发现 关键词短语提取 自动摘要 文本分类聚类 拼音简繁 自然语言处理

pyhanlp依赖的运行需要JVM环境,因此部署时应在 Dockerfile 里添加 jdk 并完成配置。安装包去官网下载Linux版的,放到项目里即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
# 切换到工作目录
WORKDIR /usr
# 创建目录
RUN mkdir /usr/local/java
# 将jdk复制到内部目录并自动解压
ADD jdk-8u202-linux-x64.tar.gz /usr/local/java
# 设置软链接
RUN ln -s /usr/local/java/jdk1.8.0_202 /usr/local/java/jdk
# 设置环境变量
ENV JAVA_HOME /usr/local/java/jdk
ENV JRE_HOME ${JAVA_HOME}/jre
ENV CLASSPATH .:${JAVA_HOME}/lib:${JRE_HOME}/lib
ENV PATH ${JAVA_HOME}/bin:$PATH

[2] PyTorch:一种开源机器学习框架。

直接使用 pip 安装有时会出问题,建议去找对应版本的 whl 包进行安装。

1
$ pip install torch==1.10.0+cpu torchvision==0.11.1+cpu torchaudio==0.10.0+cpu -f https://download.pytorch.org/whl/cpu/torch_stable.html

另注:PyTorch官网,这个库的国内pip源不好使,如果要直接安的话需要挂代理下载。

[3] tensorflow:一个端到端开源机器学习平台。

直接使用 pip 安装有时会出问题,可以去 tensorflow官网 找对应版本的 whl 包进行安装。

1
2
$ wget https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow_cpu-2.6.0-cp37-cp37m-manylinux2010_x86_64.whl
$ pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow_cpu-2.6.0-cp37-cp37m-manylinux2010_x86_64.whl

5. 参考资料

[1] Python 使用 flask 库传递 JSON 数据 from CSDN

[2] Python | Flask 解决跨域问题 from segmentfault

[3] flask小坑–request.json 无法获取请求body的json数据 from CSDN

[4] python清除字符串中间空格的方法 from CSDN

[5] Python 3 实现定义跨模块的全局变量和使用 from 博客园

[6] python 将爬取的数据存入mysql from 代码先锋网

[7] python操作mysql数据库(增,删,改,查)from CSDN

[8] 新版xlrd报 Excel xlsx file;not supported from CSDN

[9] Python md5去重图片文件 from 代码交流

[10] Python递归获取指定文件夹下所有指定后缀的文件路径 from CSDN

[11] Python3生成XML文件 from 知乎

[12] python 统计list中各个元素出现的次数 from CSDN

[13] python 读写csv文件(创建,追加,覆盖)from CSDN

[14] 用Python编写的CSV文件每行之间都有空行 from QAStack

[15] python操作csv文件实现对特定列排序 from 简书

[16] docker镜像alpine中安装oracle客户端 from 简书

[17] 解决flask接口返回的内容中文乱码的问题 from CSDN

[18] json dump 中文乱码 from CSDN

[19] 文本摘要/关键词TextRank算法的优化与思考 from 知乎

[20] PaddleOCR快速开始 form Github

[21] 教你利用yolov5训练自己的目标检测模型 from CSDN

[22] python导出项目依赖包与导入项目依赖包 from CSDN

[23] requests.exceptions.SSLError: HTTPSConnectionPool 解决方案 from CSDN

[24] python 使用代理的几种方式 from CSDN

[25] python3 | pip install dlib报错 from CSDN

[26] python字典转为对象,用”.”方式访问对象属性 from 博客园

[27] 视频抽帧那点事 from cnblogs