Flask接口封装及Python工具函数

  1. 1. Flask接口封装
    1. 1.1 Flask简介
    2. 1.2 Flask基本示例
    3. 1.3 Flask常见问题
      1. 1.3.1 Flask跨域问题
      2. 1.3.2 JSON解析问题
    4. 1.4 Flask常用操作
      1. 1.4.1 打印日志到控制台并写入文件
      2. 1.4.2 跨文件全局变量的定义与使用
      3. 1.4.3 使用Redis缓存数据
      4. 1.4.4 将数据保存到MySQL
      5. 1.4.5 查询Oracle的数据
    5. 1.5 Flask通用模板
  2. 2. Python工具函数
    1. 2.1 解析xlsx文件
    2. 2.2 解析txt文件
    3. 2.3 实现replaceAll功能
    4. 2.4 使用cv2库画图
    5. 2.5 根据md5进行文件去重
    6. 2.6 递归获取某目录下某后缀的文件路径
    7. 2.7 生成xml文件
    8. 2.8 复制某个文件并重命名
    9. 2.9 两个列表求差集并去重
    10. 2.10 统计列表中各个元素出现的次数
    11. 2.11 列表元素去重
    12. 2.12 新建csv文件并写入数据
    13. 2.13 操作csv文件实现对特定列排序
    14. 2.14 读取JSON文件里的配置信息
  3. 3. 参考资料

1. Flask接口封装

1.1 Flask简介

Flask是一个使用Python编写的轻量级Web应用框架。Flask最显著的特点是它是一个“微”框架,轻便灵活,但同时又易于扩展。默认情况下,Flask 只相当于一个内核,不包含数据库抽象层、用户认证、表单验证、发送邮件等其它Web框架经常包含的功能。Flask依赖用各种灵活的扩展来给Web应用添加额外功能。

与Django的对比:Django是一个开源的Python Web应用框架,采用了MVT的框架模式,即模型M,视图V和模版T。Django是一个”大而全”的重量级Web框架,其自带大量的常用工具和组件,甚至还自带了管理后台Admin,适合快速开发功能完善的企业级网站。

Flask项目地址:https://github.com/pallets/flask

1.2 Flask基本示例

下例简单示范了基于Flask,搭建 web 服务,并通过 POST 请求传递 JSON 格式数据的过程。

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask, request, jsonify

# 创建一个服务
app = Flask(__name__)

# 创建一个接口 指定路由和请求方法 定义处理请求的函数
@app.route(rule='/', methods=['POST'])
def everything():
# 获取 JSON 格式的请求体 并解析
request_body = request.get_json()
print('Request info: ', request_body)
# 生成响应信息
response_info = {'msg': '收到'}
print('Response info:', response_info)
# 将响应信息转换为 JSON 格式
response_body = jsonify(response_info)
# 最终对请求进行相应
return response_body

if __name__ == '__main__':
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False)

test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import requests

# 定义一个函数 测试一个服务接口
def test_everything():
# 构造服务接口地址
url = 'http://localhost:{0}/'.format(5000)
# 构造请求体 请求体将被转换为 JSON 格式
request_body = {'msg': '请求'}
# 向指定服务接口发送 POST 请求
r0 = requests.post(url=url, json=request_body)
# 解析 JSON 格式的响应体 并打印
print('Response info:', r0.json())

if __name__ == '__main__':
test_everything()

1.3 Flask常见问题

1.3.1 Flask跨域问题

Step1:引入flask-cors库

1
$ pip install flask-cors

Step2:配置CORS

flask-cors 有两种用法,一种为全局使用,一种对指定的路由使用。

其中CORS提供了一些参数,常用的我们可以配置 originsmethodsallow_headerssupports_credentials

[1] 全局使用

1
2
3
4
5
from flask import Flask, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app, supports_credentials=True)

[2] 局部使用

1
2
3
4
5
6
7
8
9
10
from flask import Flask, request
from flask_cors import cross_origin

app = Flask(__name__)

@app.route('/')
@cross_origin(supports_credentials=True)
def hello():
name = request.args.get("name", "World")
return f'Hello, {name}!'

1.3.2 JSON解析问题

1
request_body = request.get_json()

这种方法获取请求体中的JSON,有时会因为空格出现问题,导致请求400。为了避免这种情况,接参之后,可以对其去除全部空格。

1
2
3
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)

1.4 Flask常用操作

1.4.1 打印日志到控制台并写入文件

可以写一个日志输出配置类,代码如下:

log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./logs.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

使用时直接调用即可。

1
logger.info("logger.info")

1.4.2 跨文件全局变量的定义与使用

global关键字可以定义一个变量为全局变量,但是这个仅限于在一个文件中调用全局变量,跨文件就会报错。 既然在一个文件里面可以生效的话,那么我们就专门为全局变量定义一个“全局变量管理模块”就好了。

gol.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# -*- coding: utf-8 -*-

def _init():
global _global_dict
_global_dict = {}

def set_value(key, value):
""" 定义一个全局变量 """
_global_dict[key] = value

def get_value(key, defValue=None):
""" 获得一个全局变量,不存在则返回默认值 """
try:
return _global_dict[key]
except KeyError:
return defValue

定义处

1
2
3
4
5
import gol

gol._init() # 先必须在主模块初始化(只需要一次即可)
gol.set_value('name', 'zhangsan')
gol.set_value('age', 23)

调用处

1
2
3
4
import gol

name = gol.get_value('name')
age = gol.get_value('age')

1.4.3 使用Redis缓存数据

Step1:引入redis库

1
$ pip install redis

Step2:使用Redis

往redis存值

1
2
3
4
5
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port='6379', password='123456')
r = redis.Redis(connection_pool=pool)
r.set('id', '666666')

从redis取值

1
2
3
4
5
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port='6379', password='123456')
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')

1.4.4 将数据保存到MySQL

引入pymysql库

1
$ pip install pymysql

将数据保存到MySQL示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding: utf-8 -*-

import pymysql

# MySQL数据库连接信息
HOST = "127.0.0.1"
USER = "test"
PASSWD = "123456"
PORT = 3306
DB = "testdb"
CHARSET = "utf8"

# 打开数据库连接
db = pymysql.connect(host=HOST, user=USER, passwd=PASSWD, port=PORT, db=DB, charset=CHARSET)
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# sql语句
sql = "INSERT INTO test_table\
(id, name, city)\
VALUES ('%s', '%s', '%s')" \
% ('001', 'zhangsan', 'beijing')

try:
# 打印实际执行的sql,调试使用
print(sql)
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except Exception:
# 出现错误时回滚
print(Exception)
db.rollback()
# 关闭数据库连接
db.close()

1.4.5 查询Oracle的数据

引入cx_Oracle库

1
$ pip install cx_Oracle

安装Oracle Instant Client

从Oracle里查询数据示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import cx_Oracle

# 指定Oracle Instant Client的目录
cx_Oracle.init_oracle_client(lib_dir="D:\\Development\\instantclient-basic-windows.x64-11.2.0.4.0")

# 执行查询sql
conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
curs = conn.cursor()
sql = "select a.id, a.title, a.text from test_table "
rr = curs.execute(sql)

# 将查询结果保存成字典
result_dir = {}
while(1):
rs = rr.fetchone()
if rs == None:
break
id = rs[0]
title = rs[1]
text = rs[2]

curs.close()
conn.close()

注意事项:

1、cx_Oracle.init_oracle_client()要写在Flask接口的外面,否则第二次接口请求时会报cx_Oracle已经初始化的错误。

2、Linux端部署的时候,会出现找不到libclntsh.so动态连接库的问题,报错如下:

1
cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded: "Error loading shared library libclntsh.so: No such file or directory". See https://oracle.github.io/odpi/doc/installation.html#linux for help

报错原因:instantclient-basic-linux.x64-11.2.0.4.0.zip包里根本没有libclntsh.so,有的是libclntsh.so.11.1,而单纯的给这个文件改个名是不行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
./instantclient_11_2:
|---BASIC_README
|---adrci
|---genezi
|---libclntsh.so.11.1
|---libnnz11.so
|---libocci.so.11.1
|---libociei.so
|---libocijdbc11.so
|---ojdbc5.jar
|---ojdbc6.jar
|---uidrvci
|---xstreeams.jar

解决办法:需要在Dockerfile里设置软链接解决(注意要用绝对路径)

1
2
ENV LD_LIBRARY_PATH=/home/instantclient_11_2
RUN ln -s /home/instantclient_11_2/libclntsh.so.11.1 /home/instantclient_11_2/libclntsh.so

1.5 Flask通用模板

为了方便日常功能开发,这里放一个自己平时用的通用模板,专注于业务逻辑的编写即可。

log.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./logs.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# -*- coding: utf-8 -*-

import json
from log import logger
from flask import Flask, request, jsonify
from flask_cors import CORS

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/moduleName/methodName', methods=['POST'])
def methodName():

# 获取JSON格式的请求体,并解析
request_data = request.get_data(as_text=True)
request_data = ''.join(request_data.split())
request_body = json.loads(request_data)

# 返回结果初始化
response_info = {}

# 若干参数校验模块
id = request_body.get("id")
if not id:
response_info["code"] = 500
response_info["msg"] = "请求参数里不存在id,请检查!"
response_info["data"] = None
return jsonify(response_info)

# 业务处理模块
result = "hello world!"

# 打印日志信息
logger.info("logger.info")
logger.warn("logger.warn")
logger.error("logger.error")

# 成功的结果返回
response_info["code"] = 200
response_info["msg"] = "请求成功"
data = {}
data["result"] = result
response_info["data"] = data
response_body = jsonify(response_info)

# 最终对请求进行响应
return response_body

if __name__ == '__main__':
# 启动服务 指定主机和端口
app.run(host='0.0.0.0', port=5000, debug=False)

注:如果是GET请求,修改两处即可

1
2
3
4
# 声明处
@app.route(rule='/moduleName/methodName', methods=['GET'])
# 接参处
id = request.args.get("id")

2. Python工具函数

2.1 解析xlsx文件

引入xlrd库

1
$ pip install xlrd==1.2.0

注:新版 xlrd 报 Excel xlsx file; not supported错误(原因:xlrd更新到了2.0.1版本,只支持.xls文件,不支持.xlsx)

读写操作xlsx示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# -*- coding: utf-8 -*-

import openpyxl
import xlrd

# 将数据写入到xlsx文件里(新创建文件)
def write_xlsx(path, sheetname, value):
index = len(value)
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.title = sheetname
for i in range(0, index):
for j in range(0, len(value[i])):
sheet.cell(row=i+1, column=j+1, value=str(value[i][j]))
workbook.save(path)

# 将数据追加写入到xlsx文件里(已有文件追加写入)
def append_write_xlsx(path, sheetname, value):
workbook = openpyxl.load_workbook(path)
sheet = workbook[sheetname]
sheet.append(value)
workbook.save(path)

# 读取xlsx文件信息
def read_xlsx(path, sheetname):
wb = xlrd.open_workbook(path)
sh = wb.sheet_by_name(sheetname)
result = {}
for i in range(1, sh.nrows):
result[sh.row_values(i)[0]] = sh.row_values(i)[1]
return result

if __name__ == '__main__':

path = './test.xlsx'
sheetname = '测试'
head_value = [['id', 'name']]
body_value = ['001', 'zhangsan']

write_xlsx(path, sheetname, head_value)
append_write_xlsx(path, sheetname, body_value)
result = read_xlsx(path, sheetname)
print(result)

2.2 解析txt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# -*- coding: utf-8 -*-

# 按行追加写入txt文件(没有文件会新创建文件)
def write_content_to_txt(txt_path, content):
a = open(txt_path, 'a')
a.write(content + '\n')
a.close()

# 按行读取txt文件的内容,保存成列表
def read_txt_to_list(txt_path):
result = []
with open(txt_path, 'r') as f:
for line in f:
result.append(line.strip('\n'))
return result

if __name__ == '__main__':

txt_path = './test.txt'
write_content_to_txt(txt_path, 'zhangsan')
write_content_to_txt(txt_path, 'lisi')
result = read_txt_to_list(txt_path)
print(result)

2.3 实现replaceAll功能

1
2
3
4
5
# 实现replaceAll的
def replaceAll(input, toReplace, replaceWith):
while (input.find(toReplace) > -1):
input = input.replace(toReplace, replaceWith)
return input

2.4 使用cv2库画图

引入cv2库

1
$ pip install opencv-python

绘制矩形和直线示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding: utf-8 -*-

import numpy as np
import cv2 as cv

img = np.zeros((320, 320, 3), np.uint8) # 生成一个空灰度图像

# 绘制矩形
ptLeftTop = (60, 60)
ptRightBottom = (260, 260)
point_color = (0, 255, 0)
thickness = 1
lineType = 4
cv.rectangle(img, ptLeftTop, ptRightBottom, point_color, thickness, lineType)

# 绘制直线
ptStart = (60, 60)
ptEnd = (260, 260)
point_color = (0, 0, 255)
thickness = 1
lineType = 4
cv.line(img, ptStart, ptEnd, point_color, thickness, lineType)

cv.namedWindow("CV Test")
cv.imshow('CV Test', img)
cv.waitKey(5000) # 显示5000ms后消失,设置为0永不消失
cv.destroyAllWindows()

2.5 根据md5进行文件去重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import os
import hashlib

def get_md5(file):
file = open(file,'rb')
md5 = hashlib.md5(file.read())
file.close()
md5_values = md5.hexdigest()
return md5_values

if __name__ == '__main__':
file_path = "./data"
os.chdir(file_path)
file_list = os.listdir(file_path)
md5_list =[]
for file in file_list:
md5 = get_md5(file)
if md5 not in md5_list:
md5_list.append(md5)
else:
os.remove(file)

2.6 递归获取某目录下某后缀的文件路径

程序分为两步,第一步,采用递归的方式获得文件夹下所有文件的路径列表;第二步,从文件路径列表中根据后缀利用.endswith(后缀)的方法筛选指定文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os

# 从指定path下递归获取所有文件
def getAllFile(path, fileList):
dirList = [] # 存放文件夹路径的列表
for ff in os.listdir(path):
wholepath = os.path.join(path, ff)
if os.path.isdir(wholepath):
dirList.append(wholepath) # 如果是文件添加到结果文件列表中
if os.path.isfile(wholepath):
fileList.append(wholepath) # 如果是文件夹,存到文件夹列表中
for dir in dirList:
getAllFile(dir, fileList) # 对于dirList列表中的文件夹,递归提取其中的文件,fileList一直在往下传,所有的文件路径都会被保存在这个列表中

# 从文件路径列表中筛选出指定后缀的文件
def getSufFilePath(fileList, suffix):
for ff in fileList[:]:
if not ff.endswith(suffix):
fileList.remove(ff)

if __name__ == '__main__':
flist = []
findpath = r'./testdir'
getAllFile(findpath, flist)
print('allfile:', len(flist)) # filepath下的文件总数
getSufFilePath(flist, '.txt')

print('Docfile:', len(flist)) # filepath下的指定类型文件总数(这里是.txt文件的数量)
for ff in flist:
print(ff)

2.7 生成xml文件

generate_xml.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# -*- coding: utf-8 -*-

from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement
from xml.etree.ElementTree import ElementTree

# 美化xml:elemnt为传进来的Elment类,参数indent用于缩进,newline用于换行
def pretty_xml(element, indent, newline, level=0):
# 判断element是否有子元素
if element:
# 如果element的text没有内容
if element.text == None or element.text.isspace():
element.text = newline + indent * (level + 1)
else:
element.text = newline + indent * (level + 1) + element.text.strip() + newline + indent * (level + 1)
temp = list(element) # 将elemnt转成list
for subelement in temp:
# 如果不是list的最后一个元素,说明下一个行是同级别元素的起始,缩进应一致
if temp.index(subelement) < (len(temp) - 1):
subelement.tail = newline + indent * (level + 1)
else: # 如果是list的最后一个元素, 说明下一行是母元素的结束,缩进应该少一个
subelement.tail = newline + indent * level
# 对子元素进行递归操作
pretty_xml(subelement, indent, newline, level=level + 1)


if __name__ == '__main__':

# generate root node
root = Element('root')
# generate first child-node head
head = SubElement(root, 'head')
# child-node of head node
title = SubElement(head, 'title')
title.text = "Title"
# generate second child-node body
body = SubElement(root, 'body')
body.text = "Content"
tree = ElementTree(root)

root = tree.getroot() # 得到根元素,Element类
pretty_xml(root, '\t', '\n') # 执行美化方法

# write out xml data
tree.write('result.xml', encoding = 'utf-8')

生成效果:

1
2
3
4
5
6
<root>
<head>
<title>Title</title>
</head>
<body>Content</body>
</root>

2.8 复制某个文件并重命名

1
2
3
4
5
6
7
# 复制某个文件并重命名:sample-原始文件路径、new_path-新文件目录、file_name-新文件名称
def copy_rename_file(sample,new_path,file_name):
if not os.path.exists(new_path):
os.makedirs(new_path)
new_file = os.path.join(new_path, file_name)
shutil.copy(sample, new_file)
return new_file

2.9 两个列表求差集并去重

1
2
3
4
# 两个列表求差集,在B中但不在A中(注:重复元素也会被去除)
def list_diff(listA, listB):
result = list(set(listB).difference(set(listA)))
return result

2.10 统计列表中各个元素出现的次数

1
2
3
4
5
6
from collections import Counter
test_list = [1, 2, 3, 1, 1, 2]
result = Counter(test_list)
print(result)

>>>{1: 3, 2: 2, 3: 1}

2.11 列表元素去重

1
2
3
old_list = [2, 1, 3, 4, 1]
new_list = list(set(old_list))
print(new_list)

2.12 新建csv文件并写入数据

1
2
3
4
5
6
7
import csv
def create_csv():
csv_path = "./test.csv"
with open(csv_path,'w', newline='', encoding='GBK') as f:
csv_write = csv.writer(f)
csv_head = ["good","bad"]
csv_write.writerow(csv_head)

注:newline=''是为了解决csv的隔行空行问题。选择GBK编码,否则使用Excel打开会出现乱码问题。

2.13 操作csv文件实现对特定列排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def sort_csv(csv_path):
datas = [] # 用于存放排序过的数据
with open(csv_path, 'r', encoding='GBK') as f:
table = []
index = 0
for line in f:
index = index + 1
if index == 1:
continue
col = line.split(',')
col[1] = int(col[1].strip("\n"))
table.append(col)
table_sorted = sorted(table, key=itemgetter(1), reverse=True) # 精确的按照第2列排序
for row in table_sorted:
datas.append(row)
f.close()
with open(csv_path, "w", newline='', encoding='GBK') as csvfile:
writer = csv.writer(csvfile)
csv_head = ["关键词", "词频"]
writer.writerow(csv_head)
for data in datas:
writer.writerow(data)
csvfile.close()

2.14 读取JSON文件里的配置信息

配置文件config.json:

1
2
3
4
5
{
"DB_URL": "127.0.0.1:1521/orcl",
"DB_USER": "test",
"DB_PASSWORD": "123456"
}

工具函数:

1
2
3
4
5
6
# 读取json里的配置信息
def read_conf(conf_path):
with open(conf_path,"r",encoding="utf-8") as f:
confstr = f.read()
conf = json.loads(confstr)
return conf

调用示例:

1
2
3
conf_path = './config/config.json'
conf = read_conf(conf_path)
conn = cx_Oracle.connect(conf['DB_USER'], conf['DB_PASSWORD'], conf['DB_URL'])

3. 参考资料

[1] Python 使用 flask 库传递 JSON 数据 from CSDN

[2] Python | Flask 解决跨域问题 from segmentfault

[3] flask小坑–request.json 无法获取请求body的json数据 from CSDN

[4] python清除字符串中间空格的方法 from CSDN

[5] Python 3 实现定义跨模块的全局变量和使用 from 博客园

[6] python 将爬取的数据存入mysql from 代码先锋网

[7] python操作mysql数据库(增,删,改,查)from CSDN

[8] 新版xlrd报 Excel xlsx file;not supported from CSDN

[9] Python md5去重图片文件 from 代码交流

[10] Python递归获取指定文件夹下所有指定后缀的文件路径 from CSDN

[11] Python3生成XML文件 from 知乎

[12] python 统计list中各个元素出现的次数 from CSDN

[13] python 读写csv文件(创建,追加,覆盖)from CSDN

[14] 用Python编写的CSV文件每行之间都有空行 from QAStack

[15] python操作csv文件实现对特定列排序 from 简书

[16] docker镜像alpine中安装oracle客户端 from 简书