本文最后编辑于 前,其中的内容可能需要更新。
1. Flask接口封装 1.1 Flask简介 Flask是一个使用Python编写的轻量级Web应用框架。Flask最显著的特点是它是一个“微”框架,轻便灵活,但同时又易于扩展。默认情况下,Flask 只相当于一个内核,不包含数据库抽象层、用户认证、表单验证、发送邮件等其它Web框架经常包含的功能。Flask依赖用各种灵活的扩展来给Web应用添加额外功能。
与Django的对比:Django是一个开源的Python Web应用框架,采用了MVT的框架模式,即模型M,视图V和模版T。Django是一个”大而全”的重量级Web框架,其自带大量的常用工具和组件,甚至还自带了管理后台Admin,适合快速开发功能完善的企业级网站。
Flask项目地址:https://github.com/pallets/flask
1.2 Flask通用模板 为了方便日常功能开发,这里放一个自己平时用的 Flask 通用模板,专注于业务逻辑的编写即可。
完整示例代码已在Github上开源:https://github.com/Logistic98/flask-demo
1.2.1 常规POST请求 server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from flask import Flask, jsonifyfrom flask_cors import CORSfrom pre_request import pre, Rulefrom log import loggerfrom code import ResponseCode, ResponseMessageapp = Flask(__name__) CORS(app, supports_credentials=True ) """ # 方法功能说明 """ @app.route(rule='/api/moduleName/methodName' , methods=['POST' ] ) def methodName (): rule = { "text" : Rule(type =str , required=True , gte=3 , lte=255 ), "type" : Rule(type =int , required=True , gte=1 , lte=1 ) } try : params = pre.parse(rule=rule) except Exception as e: logger.error(e) fail_response = dict (code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None ) logger.error(fail_response) return jsonify(fail_response) text = params.get("text" ) result = text + ",hello world!" logger.info("测试日志记录" ) success_response = dict (code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result) logger.info(success_response) return jsonify(success_response) if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(host='0.0.0.0' , port=5000 , debug=False , threaded=True )
log.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import logginglogger = logging.getLogger(__name__) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(formatter) logger.addHandler(console) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("./server.log" ) handler.setLevel(logging.INFO) handler.setFormatter(formatter) logger.addHandler(handler)
code.py
1 2 3 4 5 6 7 8 9 10 11 12 13 class ResponseCode (object ): SUCCESS = 200 RARAM_FAIL = 400 BUSINESS_FAIL = 500 class ResponseMessage (object ): SUCCESS = "请求成功" RARAM_FAIL = "参数校验失败" BUSINESS_FAIL = "业务处理失败"
response.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from code import ResponseMessage, ResponseCodeclass ResMsg (object ): """ 封装响应文本 """ def __init__ (self, data=None , code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS ): self._data = data self._msg = msg self._code = code def update (self, code=None , data=None , msg=None ): """ 更新默认响应文本 :param code:响应状态码 :param data: 响应数据 :param msg: 响应消息 :return: """ if code is not None : self._code = code if data is not None : self._data = data if msg is not None : self._msg = msg def add_field (self, name=None , value=None ): """ 在响应文本中加入新的字段,方便使用 :param name: 变量名 :param value: 变量值 :return: """ if name is not None and value is not None : self.__dict__[name] = value @property def data (self ): """ 输出响应文本内容 :return: """ body = self.__dict__ body["data" ] = body.pop("_data" ) body["msg" ] = body.pop("_msg" ) body["code" ] = body.pop("_code" ) return body
1.2.2 常规GET请求 如果是GET请求,修改两处即可
1 2 3 4 @app.route(rule='/moduleName/methodName' , methods=['GET' ] ) id = request.args.get("id" )
1.2.3 以base64格式传输图片 client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import base64import requestsimport jsonif __name__ == '__main__' : url = 'http://127.0.0.1:5000/moduleName/methodName' f = open ('./data/test.jpg' , 'rb' ) base64_data = base64.b64encode(f.read()) f.close() base64_data = base64_data.decode() data = {'img' : base64_data} r = requests.post(url, data=json.dumps(data)) print (r.text.encode().decode('unicode_escape' ))
注:使用.encode().decode('unicode_escape')
是为了解决中文乱码问题
log.py、code.py与response.py同上,server.py如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 import osfrom uuid import uuid1from flask import Flask, jsonifyfrom flask_cors import CORSfrom pre_request import pre, Rulefrom code import ResponseCode, ResponseMessagefrom log import loggerfrom utils import base64_to_imgapp = Flask(__name__) CORS(app, supports_credentials=True ) """ # 方法功能说明 """ @app.route(rule='/api/moduleName/methodName' , methods=['POST' ] ) def methodName (): rule = { "img" : Rule(type =str , required=True ), "ext" : Rule(type =str , required=False ) } try : params = pre.parse(rule=rule) except Exception as e: logger.error(e) fail_response = dict (code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None ) logger.error(fail_response) return jsonify(fail_response) image_b64 = params.get("img" ) ext = params.get("ext" ) if not os.path.exists('./img' ): os.makedirs('./img' ) uuid = uuid1() if ext is not None : img_path = './img/{}.{}' .format (uuid, ext) else : img_path = './img/{}.jpg' .format (uuid) try : base64_to_img(image_b64, img_path) except Exception as e: logger.error(e) fail_response = dict (code=ResponseCode.BUSINESS_FAIL, msg=ResponseMessage.BUSINESS_FAIL, data=None ) logger.error(fail_response) return jsonify(fail_response) result = image_b64 logger.info("测试日志记录" ) os.remove(img_path) success_response = dict (code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result) logger.info(success_response) return jsonify(success_response) if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(host='0.0.0.0' , port=5000 , debug=False , threaded=True )
1.2.4 以文件的形式传输 1 2 3 4 5 6 7 8 9 10 11 12 import timeimport requestsif __name__ == '__main__' : url = 'http://127.0.0.1:5000/moduleName/methodName' img_path = './data/test.jpg' files = {'image' : open (img_path, "rb" )} r = requests.post(url, files=files) end_time = time.time() print (r.text.encode().decode('unicode_escape' ))
1.3 Flask常见问题 1.3.1 Flask跨域问题 Step1:引入flask-cors库
1 $ pip install flask-cors
Step2:配置CORS
flask-cors 有两种用法,一种为全局使用,一种对指定的路由使用。
其中CORS提供了一些参数,常用的我们可以配置 origins
、methods
、allow_headers
、supports_credentials
。
[1] 全局使用
1 2 3 4 5 from flask import Flask, requestfrom flask_cors import CORSapp = Flask(__name__) CORS(app, supports_credentials=True )
[2] 局部使用
1 2 3 4 5 6 7 8 9 10 from flask import Flask, requestfrom flask_cors import cross_originapp = Flask(__name__) @app.route('/' ) @cross_origin(supports_credentials=True ) def hello (): name = request.args.get("name" , "World" ) return f'Hello, {name} !'
1.3.2 Flask中文乱码问题 [1] 发送请求乱码
不管是dump还是dumps,中文乱码加入ensure_ascii=False
即可。
1 json.dump(content, f, ensure_ascii=False)
[2] 接收返回值乱码
接收返回值乱码问题,给app配置app.config[‘JSON_AS_ASCII’] = False
即可。
1 2 3 if __name__ == "__main__" : app.config['JSON_AS_ASCII' ] = False app.run(host='0.0.0.0' , port='5000' )
1.3.3 JSON解析问题 1 request_body = request.get_json()
这种方法获取请求体中的JSON,有时会因为空格出现问题,导致请求400。为了避免这种情况,接参之后,可以对其去除全部空格。
1 2 3 request_data = request.get_data(as_text=True ) request_data = '' .join(request_data.split()) request_body = json.loads(request_data)
注:如果入参里要保留空格,则不能通过此方式来处理。
1.3.4 Flask并发调用问题 服务端:通过设置app.run()的参数,来达到多线程的效果。多进程或多线程只能选择一个,不能同时开启。
1 2 3 4 app.run(threaded=True ) app.run(processes=True )
客户端:通过grequests进行并发请求。
requests是Python发送接口请求非常好用的一个三方库,由K神编写,简单,方便上手快。但是requests发送请求是串行的,即阻塞的。发送完一条请求才能发送另一条请求。为了提升测试效率,一般我们需要并行发送请求。这里可以使用多线程,或者协程,gevent或者aiohttp,然而使用起来,都相对麻烦。
grequests是K神基于gevent+requests编写的一个并发发送请求的库,使用起来非常简单。
项目地址:https://github.com/spyoungtech/grequests
依赖安装:
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import timeimport grequestsstart = time.time() req_list = [grequests.post('http://httpbin.org/post' , data={'a' :1 , 'b' :2 }) for i in range (10 )] res_list = grequests.map (req_list) result_list = [] for res in res_list: result_list.append(res.text) print (result_list)print (len (result_list))print (time.time()-start)
1.3.5 base64编码出现b的问题 去除的方法:[1] decode为utf-8编码、[2] str转化为utf-8编码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import base64before_base64 = 'abc' .encode() after_base64 = base64.b64encode(before_base64) print (after_base64)method_one_base64 = after_base64.decode('utf-8' ) print (method_one_base64)method_two_base64 = str (after_base64, 'utf-8' ) print (method_two_base64)>>> b'YWJj' >>> YWJj>>> YWJj
1.3.6 字典中文转码问题 尝试过.encode().decode('unicode_escape')
、# -*- coding: utf-8 -*-
、str()
等方式仍然不行,最后将字典改成json格式,使用.get("key")
方式取值解决了问题。
1 2 3 4 5 6 7 8 9 10 11 12 import jsond = {'s' : '测试' , 'd' : u'\u4ea4\u6362\u673a' } d1 = json.dumps(d1) print (d1)d2 = json.loads(j) print (d2)>>> {"s" : "\u6d4b\u8bd5" , "d" : "\u4ea4\u6362\u673a" }>>> {'s' : '测试' , 'd' : '交换机' }
1.3.7 请求时出现ProxyError问题 使用request请求时有时会遇到requests.exceptions.ProxyError
报错,请求时禁用系统代理即可解决此问题。
1 2 proxies = { "http": None, "https": None} requests.get("url", proxies=proxies)
1.4 Flask全局配置 1.4.1 打印日志到控制台并写入文件 方式一:只将日志写入文件,控制台不打印的话,在文件开头加上这个即可
1 2 3 4 5 6 import logginglogging.basicConfig(filename='server.log' , level=logging.INFO, format ='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__)
方式二:打印日志到控制台并写入文件,可以写一个日志输出配置类 log.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import logginglogger = logging.getLogger(__name__) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(formatter) logger.addHandler(console) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("./server.log" ) handler.setLevel(logging.INFO) handler.setFormatter(formatter) logger.addHandler(handler)
使用时直接调用即可。
1 logger.info("logger.info")
1.4.2 Flask全局统一封装返回值格式 当前主流的 Web 应用开发通常采用前后端分离模式,前端和后端各自独立开发,然后通过数据接口沟通前后端,完成项目。定义一个统一的数据下发格式,有利于提高项目开发效率,减少各端开发沟通成本。对Flask全局统一封装返回值格式可以减少大量重复代码。
code.py
1 2 3 4 5 6 7 8 9 class ResponseCode (object ): SUCCESS = 200 FAIL = 500 class ResponseMessage (object ): SUCCESS = "请求成功" FAIL = "请求失败"
response.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from code import ResponseMessage, ResponseCodeclass ResMsg (object ): """ 封装响应文本 """ def __init__ (self, data=None , code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS ): self._data = data self._msg = msg self._code = code def update (self, code=None , data=None , msg=None ): """ 更新默认响应文本 :param code:响应状态码 :param data: 响应数据 :param msg: 响应消息 :return: """ if code is not None : self._code = code if data is not None : self._data = data if msg is not None : self._msg = msg def add_field (self, name=None , value=None ): """ 在响应文本中加入新的字段,方便使用 :param name: 变量名 :param value: 变量值 :return: """ if name is not None and value is not None : self.__dict__[name] = value @property def data (self ): """ 输出响应文本内容 :return: """ body = self.__dict__ body["data" ] = body.pop("_data" ) body["msg" ] = body.pop("_msg" ) body["code" ] = body.pop("_code" ) return body
test_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from flask import Flask, jsonifyfrom flask_cors import CORSfrom code import ResponseCode, ResponseMessageapp = Flask(__name__) CORS(app, supports_credentials=True ) @app.route("/test" ,methods=["GET" ] ) def test (): test_dict = dict (name="zhang" ,age=18 ) data = dict (code = ResponseCode.SUCCESS, msg = ResponseMessage.SUCCESS, data = test_dict) return jsonify(data) if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(host='0.0.0.0' , port=5000 , debug=False , threaded=True )
1.4.3 使用pre-request校验Flask入参 项目介绍:用于验证请求参数的 python 框架,专为 Flask 设计
项目地址:https://github.com/Eastwu5788/pre-request
官方文档:https://pre-request.readthedocs.io/en/master/index.html
依赖安装:pip install pre-request
使用示例:先定义一个 rule 字典,然后使用 params = pre.parse(rule=rule) 校验参数,之后取值使用 params.get(“xxx”) 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from flask import Flaskfrom flask_cors import CORSfrom pre_request import pre, Ruleapp = Flask(__name__) CORS(app, supports_credentials=True ) rule = { "userName" : Rule(type =str , required=True , gte=3 , lte=20 , dest="user_name" ), "gender" : Rule(type =int , required=True , enum=[1 , 2 ]), "age" : Rule(type =int , required=True , gte=18 , lte=60 ), "country" : Rule(type =str , required=False , gte=2 , default="中国" ) } @app.route("/user/info" , methods=["POST" ] ) def user_info_handler (): params = pre.parse(rule=rule) userName = params.get("userName" ) gender = params.get("gender" ) age = params.get("age" ) country = params.get("country" ) return "success" if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(host='0.0.0.0' , port=5000 , debug=False , threaded=True )
1.4.4 Flask-Doc生成接口文档 基本介绍:Flask-Doc 可以根据代码注释生成文档页面,支持Markdown、离线文档下载、在线调试。
项目地址:https://github.com/kwkwc/flask-docs
官方文档:https://github.com/kwkwc/flask-docs/blob/master/README.zh-CN.md
依赖安装:pip install Flask-Docs
使用示例:examples目录里有官方示例,参照里面的sample_app.py编写即可,以下是一些配置项。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
文档效果:
1.4.5 跨文件全局变量的定义与使用 global关键字可以定义一个变量为全局变量,但是这个仅限于在一个文件中调用全局变量,跨文件就会报错。 既然在一个文件里面可以生效的话,那么我们就专门为全局变量定义一个“全局变量管理模块”就好了。
gol.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def _init (): global _global_dict _global_dict = {} def set_value (key, value ): """ 定义一个全局变量 """ _global_dict[key] = value def get_value (key, defValue=None ): """ 获得一个全局变量,不存在则返回默认值 """ try : return _global_dict[key] except KeyError: return defValue
定义处
1 2 3 4 5 import golgol._init() gol.set_value('name' , 'zhangsan' ) gol.set_value('age' , 23 )
调用处
1 2 3 4 import golname = gol.get_value('name' ) age = gol.get_value('age' )
2. 深度学习模型及算法 我对一些通用的开源算法使用Flask进行了封装集成,项目地址:yoyo-algorithm
2.1 百度飞桨Paddle 飞桨(PaddlePaddle)以百度多年的深度学习技术研究和业务应用为基础,集深度学习核心训练和推理框架、基础模型库、端到端开发套件、丰富的工具组件于一体,是中国首个自主研发、功能完备、开源开放的产业级深度学习平台。
使用Paddle系列的算法,需要统一安装 paddlepaddle 库,具体模块再安装对应模块的库即可。
统一说明:Paddle系列的库包和算法模型都需要关闭翻墙代理工具,算法模型会在代码初次执行时自动下载(存放在C:\Users\xxx\.paddlenlp
目录下),所以初次执行耗时会长一些。
1 $ pip install paddlepaddle==2.2.0 -i https://mirror.baidu.com/pypi/simple
注:PaddleNLP 要求 paddlepaddle >= 2.2,如果根据 PaddleOCR 要求的 paddlepaddle >=2.0.1 而安装的是2.0.1版本,前者会报错:cannot import name '_convert_attention_mask' from 'paddle.nn.layer.transformer'
2.1.1 PaddleOCR PaddleOCR:是一个开源的图片OCR识别算法,模型会在初次执行时自动下载。这是它的官方使用教程:PaddleOCR使用教程
依赖库安装:
1 $ pip install "paddleocr>=2.0.1"
基本使用示例:
1 2 3 4 5 6 7 8 9 from paddleocr import PaddleOCRocr = PaddleOCR(use_angle_cls=True , lang="ch" ) img_path = './imgs/test.jpg' result = ocr.ocr(img_path, cls=True ) for line in result: print (line)
注:如果需要结果可视化、版面分析,需要另外安装相应的库,具体见官方文档。
2.1.2 PaddleNLP PaddleNLP:是一个开源的自然语言处理开发库,模型会在初次执行时自动下载。这是它的官方使用教程:PaddleNLP官方教程
依赖库安装:
1 $ pip install paddlenlp==2.2.4
基本使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 from paddlenlp import Taskflowpaddle_nlp = Taskflow("word_segmentation" ) result = paddle_nlp("第十四届全运会在西安举办" ) print (result)paddle_nlp = Taskflow("pos_tagging" ) result = paddle_nlp("第十四届全运会在西安举办" ) print (result)paddle_nlp = Taskflow("knowledge_mining" , model="nptag" ) result = paddle_nlp("红曲霉菌" ) print (result)paddle_nlp = Taskflow("sentiment_analysis" ) result = paddle_nlp("这个产品用起来真的很流畅,我非常喜欢" ) print (result)paddle_nlp = Taskflow("text_similarity" ) result = paddle_nlp([["世界上什么东西最小" , "世界上什么东西最小?" ]]) print (result)
注:返回结果说明见官方文档。除此之外,PaddleNLP还支持很多其他的自然语言处理,如生成式问答、智能问答等,具体见官方文档。
2.2 目标识别检测 yolov5 :是一种单阶段目标检测算法,该算法在Yolov4的基础上添加了一些新的改进思路,使其速度与精度都得到了极大的性能提升。
这是一篇使用教程:教你利用yolov5训练自己的目标检测模型 ,详细介绍了如何使用yolov5训练自己的目标检测模型,数据集和预训练权重的准备部分也留了该作者相应的博客链接。
2.3 文本合成语音 谷歌开源的文本转语音 API 交互的 Python 库,虽然免费但生成的语音机器音较重,使用时需要联网(被墙,国内需要设置代理)
项目地址:https://github.com/pndurette/gTTS
示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 import osfrom gtts import gTTSos.environ["https_proxy" ] = "http://127.0.0.1:1080" text = "测试gtts文本转语音" audio = gTTS(text=text, lang="zh-cn" ) audio.save("demo.mp3" )
注:如果未设置代理或者代理有问题,会报“Python GTTS / Failed to connect. Probable cause: Unknown”错误。
语音文件播放:
playsound 声明它已经在WAV和MP3文件上进行了测试,但是它可能也适用于其他文件格式。
示例代码如下:
1 2 from playsound import playsoundplaysound('demo.mp3' )
注意事项:调用时可能出现“指定的设备未打开,或不被 MCI 所识别”报错。原因是windows不支持utf-16编码,需修改playsound源码。
修改\Lib\site-packages\playsound.py
文件的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def winCommand (*command ): bufLen = 600 buf = c_buffer(bufLen) command = ' ' .join(command) errorCode = int (windll.winmm.mciSendStringW(command, buf, bufLen - 1 , 0 )) if errorCode: errorBuffer = c_buffer(bufLen) windll.winmm.mciGetErrorStringW(errorCode, errorBuffer, bufLen - 1 ) exceptionMessage = ('\n Error ' + str (errorCode) + ' for command:' '\n ' + command + '\n ' + errorBuffer.raw.decode('utf-16' ).rstrip('\0' )) logger.error(exceptionMessage) raise PlaysoundException(exceptionMessage) return buf.value
2.4 破解Google翻译 破解Google翻译的 py-googletrans 库,使用时需要联网(被墙,国内需要设置代理)
1 $ pip install googletrans
这个库的工作原理(摘自官方说明):
1 2 - 您可能想知道为什么这个库可以正常工作,而其他方法(例如 goslate)却不起作用,因为 Google 最近使用票证机制更新了其翻译服务,以防止大量爬虫程序。 - 我最终找到了一种方法,通过对 Google 用来生成此类令牌的混淆和缩小代码进行逆向工程来生成票证,并在 Python 之上实现。 但是,这可能随时被阻止。
示例代码如下:
1 2 3 4 5 6 7 8 9 10 from googletrans import Translatorimport osos.environ["https_proxy" ] = "http://127.0.0.1:1080" translator = Translator() result = translator.translate('hello world' , dest='zh-cn' ).text print (result)
注:单次请求的最大字符数为5000,超出的话可以拆分成多份,分开请求再对结果进行拼接。另外该破解方式随时可能会被阻止,如果想使用稳定的 API,建议使用 谷歌官方的翻译 API 。
2.5 对旧图片旧视频进行着色 2.5.1 DeOldify简介 DeOldify 是由 Jason Antic 开发和更新的。这是目前最先进的黑白图像、视频的着色方法,所有的东西都是开源的。
基本原理:它使用了一种名为NoGAN的新型GAN训练方法,该方法是作者自己开发的,用来解决在使用由一个鉴别器和一个生成器组成的正常对抗性网络架构进行训练时出现的主要问题。典型地,GAN训练同时训练鉴别器和生成器,生成器一开始是完全随机的,随着时间的推移,它会欺骗鉴别器,鉴别器试图辨别出图像是生成的还是真实的。
项目地址:https://github.com/jantic/DeOldify
效果演示:
2.5.2 Google Colab简介 Colaboratory 是一个 Google 研究项目,旨在帮助传播机器学习培训和研究成果。它是一个 Jupyter 笔记本环境,不需要进行任何设置就可以使用,并且完全在云端运行。
Colaboratory 笔记本存储在 Google 云端硬盘中,并且可以共享,就如同您使用 Google 文档或表格一样,Colaboratory 可免费使用。
利用Colaboratory ,可以方便的使用Keras,TensorFlow,PyTorch,OpenCV等框架进行深度学习应用的开发。
与其它云服务相比,最重要的特点是Colab提供GPU并完全免费,详细介绍及使用方法见 faq page 。
2.5.3 官方提供的在线服务及API 如果不想折腾的话,可以使用官方提供的 DeOldify Image Colorization on DeepAI ,可以直接在这里上传图片对旧照片进行着色,同时该网站还提供了API,供程序中调用,下文可以不用看了。
2.5.4 DeOldify的预训练模型 预训练模型:DeOldify 是基于深度学习开发的,需要用到预训练权重,这里项目开发者已经把训练好的权重上传了,我们可以直接拿来使用,不需要我们再训练。
Artistic 权重,会使图片上色效果更大胆 一些,下载地址:
1 https://data.deepai.org/deoldify/ColorizeArtistic_gen.pth
Stable 权重,相对于 Artistic 上色效果更保守一些,下载地址:
1 https://www.dropbox.com/s/usf7uifrctqw9rl/ColorizeStable_gen.pth
Video 权重,此权重文件用来给视频上色,下载地址:
1 https://data.deepai.org/deoldify/ColorizeVideo_gen.pth
权重文件下载完毕后,在项目根目录下创建一个 models 文件夹,把下载好的权重文件放入 models
文件夹内即可。
2.5.5 使用Google Colab进行部署 由于运行深度学习的项目对机器性能要求较高,因此下文使用了官方提供的预训练模型,并白嫖 Google Colab 进行部署。DeOldify对旧照片、旧视频的着色的使用流程基本一致,只不过用到的预训练模型不同而已,以旧照片着色为例。
官方也提供了Google Colab,不过那个是英文版的,我没有尝试了,下面我用的是网上找的一份中文版的,将其保存到自己的Google Drive里,地址:https://drive.google.com/drive/folders/1G6nTfabx10P3nSzL5lN-SEnoM2Y0jeRh?usp=sharing
注:使用Google Colab需要翻墙,这个要保存到自己的云端硬盘里,我的你们是无法执行的。
打开之后先去执行该代码块(悬浮即可显示执行按钮)
1 2 3 4 5 6 7 8 9 10 11 #点击左侧按钮一键配置环境 !git clone https://github.com/jantic/DeOldify.git DeOldify %cd /content/DeOldify !pip install -r /content/DeOldify/requirements.txt import fastai from deoldify.visualize import * torch.backends.cudnn.benchmark = True !mkdir 'models' !wget https://data.deepai.org/deoldify/ColorizeArtistic_gen.pth -O ./models/ColorizeArtistic_gen.pth colorizer = get_image_colorizer(artistic=True)
说明:预训练模型的地址如果失效了就自己找个吧,替换掉即可。如果要使用 Stable 权重,需要把下面改成False
1 2 Artistic 权重 -- colorizer = get_image_colorizer(artistic=True) Stable 权重 -- colorizer = get_image_colorizer(artistic=False)
踩过的坑:第一次执行的时候可能会出现依赖安装失败的问题,不要慌。点击 RESTART RUNTIME 按钮,等一会儿再重新执行代码块,第二次应该就可以安装成功了,成功的话左侧有个绿色箭头。
安装成功环境以后,再在下面的 source_url 里填入旧照片链接(本地图片的话可以先上传到图床),然后点击左侧的执行按钮,等待一会儿即可生成着色后的照片。
注:如果你的旧照片里本身就有颜色的话,生成效果可能会不太好。因为它会先把原有颜色替换成黑白的,再根据算法生成新的颜色,会导致与原图的颜色不一致。如果你想要保持一致的话,就需要借助PS的蒙版进行二次处理了。
3. 传统类型算法及处理工具 3.1 文本关键词及概要提取 FastTextRank :从中文文本中提取摘要及关键词,并对算法时间复杂度进行了修改,计算图最大权节点的时间复杂度由o(n^2)降低到了o(n)。在有限的测试文本上,其运行速度相比于textrank4zh这个包快了8倍。算法原理见作者的知乎文章
依赖库安装:Numpy>=1.14.5 gensim>=3.5.0 FastTextRank==1.1
基本使用示例:KeyWord.py(提取关键字示例)、Sentence.py(提取摘要示例)
3.2 文本内容审查 Sensitive-word :收集的一些敏感词汇,细分了暴恐词库、反动词库、民生词库、色情词库、贪腐词库、其他词库等。
将词库放到./dict/
目录下,一个分类一个txt文件,词库内容为一行一个敏感词,对输入文本使用jieba分词,撞词库判断是否敏感。
3.3 视频关键帧抽取 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 import cv2import operatorimport numpy as npimport osfrom scipy.signal import argrelextremadef smooth (x, window_len=13 , window='hanning' ): """使用具有所需大小的窗口使数据平滑。 This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal. 该方法是基于一个标度窗口与信号的卷积。 通过在两端引入信号的反射副本(具有窗口大小)来准备信号, 使得在输出信号的开始和结束部分中将瞬态部分最小化。 input: x: the input signal输入信号 window_len: the dimension of the smoothing window平滑窗口的尺寸 window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. 平坦的窗口将产生移动平均平滑 output: the smoothed signal平滑信号 example: import numpy as np t = np.linspace(-2,2,0.1) x = np.sin(t)+np.random.randn(len(t))*0.1 y = smooth(x) see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve scipy.signal.lfilter TODO: 如果使用数组而不是字符串,则window参数可能是窗口本身 """ print (len (x), window_len) s = np.r_[2 * x[0 ] - x[window_len:1 :-1 ], x, 2 * x[-1 ] - x[-1 :-window_len:-1 ]] if window == 'flat' : w = np.ones(window_len, 'd' ) else : w = getattr (np, window)(window_len) y = np.convolve(w / w.sum (), s, mode='same' ) return y[window_len - 1 :-window_len + 1 ] class Frame : """class to hold information about each frame 用于保存有关每个帧的信息 """ def __init__ (self, id , diff ): self.id = id self.diff = diff def __lt__ (self, other ): if self.id == other.id : return self.id < other.id return self.id < other.id def __gt__ (self, other ): return other.__lt__(self) def __eq__ (self, other ): return self.id == other.id and self.id == other.id def __ne__ (self, other ): return not self.__eq__(other) def rel_change (a, b ): x = (b - a) / max (a, b) print (x) return x def getEffectiveFrame (videopath, dir ): if not os.path.exists(dir ): os.makedirs(dir ) (filepath, tempfilename) = os.path.split(videopath) (filename, extension) = os.path.splitext(tempfilename) USE_THRESH = False THRESH = 0.8 USE_TOP_ORDER = False USE_LOCAL_MAXIMA = True NUM_TOP_FRAMES = 50 len_window = int (50 ) print ("target video :" + videopath) print ("frame save directory: " + dir ) cap = cv2.VideoCapture(str (videopath)) prev_frame = None frame_diffs = [] frames = [] success, frame = cap.read() i = 0 while (success): luv = cv2.cvtColor(frame, cv2.COLOR_BGR2LUV) curr_frame = luv if curr_frame is not None and prev_frame is not None : diff = cv2.absdiff(curr_frame, prev_frame) diff_sum = np.sum (diff) diff_sum_mean = diff_sum / (diff.shape[0 ] * diff.shape[1 ]) frame_diffs.append(diff_sum_mean) frame = Frame(i, diff_sum_mean) frames.append(frame) prev_frame = curr_frame i = i + 1 success, frame = cap.read() cap.release() keyframe_id_set = set () if USE_TOP_ORDER: frames.sort(key=operator.attrgetter("diff" ), reverse=True ) for keyframe in frames[:NUM_TOP_FRAMES]: keyframe_id_set.add(keyframe.id ) if USE_THRESH: print ("Using Threshold" ) 
for i in range (1 , len (frames)): if (rel_change(np.float (frames[i - 1 ].diff), np.float (frames[i].diff)) >= THRESH): keyframe_id_set.add(frames[i].id ) if USE_LOCAL_MAXIMA: print ("Using Local Maxima" ) diff_array = np.array(frame_diffs) sm_diff_array = smooth(diff_array, len_window) frame_indexes = np.asarray(argrelextrema(sm_diff_array, np.greater))[0 ] for i in frame_indexes: keyframe_id_set.add(frames[i - 1 ].id ) cap = cv2.VideoCapture(str (videopath)) success, frame = cap.read() idx = 0 num = 0 while (success): if idx in keyframe_id_set: num = num + 1 name = filename + '_' + str (num) + ".jpg" cv2.imwrite(dir + name, frame) keyframe_id_set.remove(idx) idx = idx + 1 success, frame = cap.read() cap.release() if __name__ == "__main__" : videopath = './data/demo.mp4' dir = './data/keyframe/' getEffectiveFrame(videopath, dir )
3.4 图片文件处理 3.4.1 压缩图片大小 以下是python+opncv实现图片压缩的示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import osimport cv2""" # Features:使用 opencv 实现图片压缩,compress_config为图片压缩配置,说明如下: # [cv2.IMWRITE_PNG_COMPRESSION, 9] 无损压缩(取值范围:0~9,数值越小,压缩比越低) # [cv2.IMWRITE_JPEG_QUALITY, 80] 有损压缩(取值范围:0~100,数值越小,压缩比越高,图片质量损失越严重) """ class Compress_img : def __init__ (self, img_path, compress_config ): self.img_path = img_path self.img_name = img_path.split('/' )[-1 ] self.compress_config = compress_config def compress_img_CV (self, show=False ): old_fsize = os.path.getsize(self.img_path) img_resize = cv2.imread(self.img_path) cv2.imwrite(self.img_path, img_resize, self.compress_config) new_fsize = os.path.getsize(self.img_path) compress_rate = str (round (new_fsize / old_fsize * 100 , 2 )) + "%" print ("%s 图片已压缩," % (self.img_path), "压缩率为:" , compress_rate) if show: cv2.imshow(self.img_name, img_resize) cv2.waitKey(0 ) if __name__ == '__main__' : img_path = './test.jpg' compress_para = [cv2.IMWRITE_PNG_COMPRESSION, 9 ] compress = Compress_img(img_path, compress_para) compress.compress_img_CV()
注:OpenCV无法读取中文路径文件,请使用全英文路径。
3.4.2 图片添加盲水印 如果你想保护自己的原创图片,那最好的方式就是为图片添加盲水印,盲水印就是图片有水印但人眼看不出来,需要通过程序才能提取水印,相当于隐形“盖章”,可以用在数据泄露溯源、版权保护等场景。下面使用阿里巴巴安全团队出品的 blind_watermark 库对图片添加盲水印。
[1] 添加文本水印
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from blind_watermark import WaterMarkbwm1 = WaterMark(password_img=1 , password_wm=1 ) bwm1.read_img('input/001.jpg' ) wm = '@eula.club' bwm1.read_wm(wm, mode='str' ) bwm1.embed('output/001.jpg' ) len_wm = len (bwm1.wm_bit) print ('Put down the length of wm_bit {len_wm}' .format (len_wm=len_wm))
[2] 提取文本水印
1 2 3 bwm1 = WaterMark(password_img=1 , password_wm=1 ) wm_extract = bwm1.extract('output/001.jpg' , wm_shape=len_wm, mode='str' ) print (wm_extract)
注:该库还支持添加和提取图片形式的盲水印,而能添加多大的盲水印图片取决于原始图片,不可超过其大小,不便于批量处理,在此就不放示例了。
3.4.3 获取图片缩略图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from PIL import Imagedef get_thumbnail_pic (input_img_path, output_img_path ): im = Image.open (input_img_path) im.thumbnail((80 , 80 )) print (im.format , im.size, im.mode) im.save(output_img_path, 'JPEG' ) if __name__=='__main__' : input_img_path = './input/001.jpg' output_img_path = './output/001.jpeg' get_thumbnail_pic(input_img_path, output_img_path)
3.5 将网页保存成pdf文件 3.5.1 基本概念简介 [1] pyppeteer简介
Headless chrome/chromium 自动化库(是 puppeteer 无头 Chrome Node.js API 的Python版非官方库),可用于网页截图导出pdf。
项目地址:https://github.com/pyppeteer/pyppeteer
官方文档:https://pyppeteer.github.io/pyppeteer/
puppeteer 和 pyppeteer 的区别:pyppeteer 努力尽可能地复制 puppeteer API,但是,Javascript 和 Python 之间的根本差异使得这很难精确地做到,具体细节对比官方文档。
[2] 无头浏览器简介
无头浏览器指的是没有图形用户界面的浏览器,它可以通过命令行界面或使用网络通信来提供对网页的自动控制。对于测试网页特别有用,因为它们能够像浏览器一样呈现和理解超文本标记语言,包括页面布局、颜色、字体选择以及JavaScript和AJAX的执行等样式元素,这些元素在使用其他测试方法时通常是不可用的。
无头浏览器通常用来:Web应用程序中的测试自动化、拍摄网页截图、对JavaScript库运行自动化测试、收集网站数据、自动化网页交互。
3.5.2 使用Flask进行封装 本文示例代码已在GitHub上开源,地址:https://github.com/Logistic98/pyppeteer-url2pdf
[1] 封装代码
server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 import jsonimport timefrom uuid import uuid1from flask import Flask, jsonify, requestfrom flask_cors import CORSimport osimport asynciofrom log import loggerfrom responseCode import ResponseCode, ResponseMessagefrom utils import url_save_pdf, download_fileapp = Flask(__name__) CORS(app, supports_credentials=True ) """ # 将任意公开访问的url转化为pdf提供下载 """ @app.route(rule='/api/pyppeteer/urlSavePdf' , methods=['POST' ] ) def urlToPdf (): request_data = request.get_data(as_text=True ) request_body = json.loads(request_data) url = request_body.get("url" ) if not url: fail_response = dict (code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None ) logger.error(fail_response) return jsonify(fail_response) pdf_name = request_body.get("pdf_name" ) if not pdf_name: pdf_name = '{}.pdf' .format (uuid1()) ''' resolution: 设置网页显示尺寸 width: 网页显示宽度 height: 网页显示高度 ''' resolution = request_body.get("resolution" ) if not resolution: resolution = {"width" : 1920 , "height" : 1680 } ''' clip: 位置与图片尺寸信息 x: 网页截图的起始x坐标 y: 网页截图的起始y坐标 width: 图片宽度 height: 图片高度 ''' clip = request_body.get("clip" ) if not clip: clip = {"width" : 1920 , "height" : 1680 } now_str = time.strftime("%Y%m%d" , time.localtime()) pdf_root_path = './tmp/' pdf_base_path = pdf_root_path + now_str if not os.path.exists(pdf_base_path): os.makedirs(pdf_base_path) pdf_path = pdf_base_path + '/' + pdf_name try : loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(url_save_pdf(url, pdf_path, resolution, clip)) logger.info("成功将【{}】网址保存成pdf文件【{}】!" .format (url, pdf_path)) except Exception as e: logger.error(e) fail_response = dict (code=ResponseCode.BUSINESS_FAIL, msg=ResponseMessage.BUSINESS_FAIL, data=None ) logger.error(fail_response) return jsonify(fail_response) return download_file(pdf_path) if __name__ == '__main__' : app.config['JSON_AS_ASCII' ] = False app.run(host='0.0.0.0' , port=5006 , debug=False , threaded=True )
utils.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import osimport urllib.parsefrom PIL import Imagefrom pyppeteer import launchfrom reportlab.pdfgen.canvas import Canvasfrom reportlab.lib.utils import ImageReaderimport imageio.v2 as imageiofrom flask import Responseasync def url_save_pdf (url, pdf_path, resolution, clip ): start_parm = { "handleSIGINT" : False , "handleSIGTERM" : False , "handleSIGHUP" : False , "headless" : True , "args" : [ '--no-sandbox' , ], } browser = await launch(**start_parm) page = await browser.newPage() await page.goto(url) await page.setViewport(resolution) if 'x' not in clip or 'y' not in clip: await page.pdf({'path' : pdf_path, 'width' : clip['width' ], 'height' : clip['height' ]}) await browser.close() else : img_data = await page.screenshot({'clip' : clip}) img_data_array = imageio.imread(img_data, format ="png" ) im = Image.fromarray(img_data_array) page_width, page_height = im.size c = Canvas(pdf_path, pagesize=(page_width, page_height)) c.drawImage(ImageReader(im), 0 , 0 ) c.save() def is_contains_chinese (strs ): for _char in strs: if '\u4e00' <= _char <= '\u9fa5' : return True return False def download_file (file_path ): file_dir, file_full_name = os.path.split(file_path) file_name, file_ext = os.path.splitext(file_full_name) if is_contains_chinese(file_name): file_name = urllib.parse.quote(file_name) new_file_name = file_name + file_ext def send_file (): with open (file_path, 'rb' ) as targetfile: while 1 : data = targetfile.read(20 * 1024 * 1024 ) if not data: break yield data response = Response(send_file(), content_type='application/octet-stream' ) response.headers["Content-disposition" ] = 'attachment; filename=%s' % new_file_name return response
response.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from responseCode import ResponseMessage, ResponseCodeclass ResMsg (object ): """ 封装响应文本 """ def __init__ (self, data=None , code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS ): self._data = data self._msg = msg self._code = code def update (self, code=None , data=None , msg=None ): """ 更新默认响应文本 :param code:响应状态码 :param data: 响应数据 :param msg: 响应消息 :return: """ if code is not None : self._code = code if data is not None : self._data = data if msg is not None : self._msg = msg def add_field (self, name=None , value=None ): """ 在响应文本中加入新的字段,方便使用 :param name: 变量名 :param value: 变量值 :return: """ if name is not None and value is not None : self.__dict__[name] = value @property def data (self ): """ 输出响应文本内容 :return: """ body = self.__dict__ body["data" ] = body.pop("_data" ) body["msg" ] = body.pop("_msg" ) body["code" ] = body.pop("_code" ) return body
responseCode.py
1 2 3 4 5 6 7 8 9 10 11 12 13 class ResponseCode (object ): SUCCESS = 200 RARAM_FAIL = 400 BUSINESS_FAIL = 500 class ResponseMessage (object ): SUCCESS = "请求成功" RARAM_FAIL = "参数校验失败" BUSINESS_FAIL = "业务处理失败"
log.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import logginglogger = logging.getLogger(__name__) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(formatter) logger.addHandler(console) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("./server.log" ) handler.setLevel(logging.INFO) handler.setFormatter(formatter) logger.addHandler(handler)
[2] 注意事项
1)初次执行时会自动下载Chromium。
2)Flask 运行 Pyppeteer 报错 “signal only works in main thread”
解决办法:将handleSIGINT、handleSIGTERM、handleSIGHUP设置为False。
1 2 3 4 5 6 7 8 9 10 start_parm = { "handleSIGINT" : False , "handleSIGTERM" : False , "handleSIGHUP" : False , "headless" : True , "args" : [ '--no-sandbox' , ], }
3.5.3 使用Docker进行部署 [1] 安装Docker环境
Debian11系统:
1 2 3 4 $ apt-get update -y && apt-get install curl -y # 安装curl $ curl https://get.docker.com | sh - # 安装docker $ sudo systemctl start docker # 启动docker服务 $ docker version # 查看docker版本(客户端要与服务端一致)
[2] 导出项目依赖
使用pipreqs导出依赖,使用pipreqs库导出本项目的依赖,生成requirements.txt文件。
1 2 3 $ pip install pipreqs $ cd /root/test-project // 切换到项目根目录 $ pipreqs ./ --encoding=utf8 // 需要带上编码的指定,否则会报GBK编码错误
注意这里还有个坑如下,这是因为本机开了翻墙代理导致的,把代理软件关了就好了。
1 requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))`
导出的依赖 requirements.txt:
1 2 3 4 5 6 Flask==2.1.2 Flask_Cors==3.0.10 imageio==2.19.3 Pillow==9.1.1 pyppeteer==1.0.2 reportlab==3.6.10
3.5.4 编写Dockerfile 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 FROM python:3.8.8-slim # python:3.8-slim 是基于 Debian GNU/Linux 10 (buster) 制作的 # 设置 Debian 清华源 https://mirrors.tuna.tsinghua.edu.cn/help/debian/(可选) # RUN mv /etc/apt/sources.list /etc/apt/sources.list_bak && \ # 下载无头 Chrome 依赖,参考:https://github.com/puppeteer/puppeteer/blob/main/docs/troubleshooting.md RUN apt-get update && apt-get -y install apt-transport-https ca-certificates libnss3 xvfb gconf-service libasound2 \ libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgbm1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 \ libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 \ libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 \ ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget && rm -rf /var/lib/apt/lists/* # 安装常用调试命令(可选) RUN apt-get install iputils-ping -y # 安装ping RUN apt-get install -y wget # 安装wget RUN apt-get install curl -y # 安装curl RUN apt-get install vim -y # 安装vim RUN apt-get install lsof # 安装lsof # 安装msyh.ttc字体解决中文乱码问题 # 来源:https://github.com/owent-utils/font/raw/master/%E5%BE%AE%E8%BD%AF%E9%9B%85%E9%BB%91/MSYH.TTC RUN cp msyh.ttc /usr/share/fonts/ # 使用淘宝镜像加速下载 chromium(可选) # ENV PYPPETEER_DOWNLOAD_HOST=https://npm.taobao.org/mirrors # 设置 chromium 版本,发布日期为: 2021-02-26T08:47:06.448Z ENV PYPPETEER_CHROMIUM_REVISION=856583 # 拷贝代码到容器内 RUN mkdir /code ADD src /code/ WORKDIR /code # 安装项目所需的 Python 依赖 RUN pip install -r requirements.txt # 放行端口 EXPOSE 5006 # 启动项目 ENTRYPOINT ["nohup","python","server.py","&"]
注意事项
[1] 无头浏览器依赖问题
原始镜像里缺失很多无头浏览器的依赖环境,详见:puppeteer Troubleshooting 官方文档
1 2 3 4 5 apt-get update && apt-get -y install apt-transport-https ca-certificates libnss3 xvfb gconf-service libasound2 \ libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgbm1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0 \ libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1 \ libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6 \ ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget && rm -rf /var/lib/apt/lists/*
[2] 中文字体无法解析问题
原始镜像内无中文字体,会出现中文字体无法解析的问题,下载 msyh.ttc 字体放到 /usr/share/fonts/ 目录里即可。
3.5.5 在服务器上部署项目 [1] 编写部署脚本
build.sh
1 2 3 docker build -t pyppeteer-image . docker run -d -p 5006:5006 --name pyppeteer pyppeteer-image:latest docker update pyppeteer --restart=always
[2] 项目部署结构
项目部署目录结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 . ├── Dockerfile ├── README.md ├── build.sh └── src ├── code.py ├── log.py ├── requirements.txt ├── msyh.ttc ├── response.py ├── server.py └── utils.py
[3] 部署项目服务
将部署包整个上传到服务器上,切换到部署包的根目录。
1 2 $ chmod u+x build.sh $ ./build.sh
3.5.6 验证部署 [1] 接口文档
请求路径:/api/pyppeteer/urlSavePdf
请求方法:POST请求
请求参数:
1 2 3 4 5 6 7 8 9 10 11 url:可公开访问的目标网址(必填,不能是那种需要登录权限的) pdf_name:下载的pdf文件名称(非必填,默认值是uuid命名的pdf文件) clip: 位置与图片尺寸信息(非必填,默认值{ "width" : 1920 , "height" : 1680 } ) x: 网页截图的起始x坐标 y: 网页截图的起始y坐标 width: 图片宽度 height: 图片高度 注释:只传width、height的时候为整页导出,传x,y,width、height的时候为区域导出 resolution: 设置网页显示尺寸 (非必填,默认值{ "width" : 1920 , "height" : 1680 } ) width: 网页显示宽度 height: 网页显示高度
请求示例:
1 2 3 4 5 6 { "url" : "https://www.google.com" , "pdf_name" : "test.pdf" , "resolution" : { "width" : 1920 , "height" : 1680 } , "clip" : { "x" : 0 , "y" : 0 , "width" : 1920 , "height" : 1680 } }
接口返回:以文件流的形式提供pdf文件下载
[2] 测试请求接口
1 $ curl -v -X POST http://127.0.0.1:5006/api/pyppeteer/urlSavePdf -H "Content-type: application/json" -d'{"url":"https://www.google.com","pdf_name":"test.pdf","resolution": {"width": 1920, "height": 1680},"clip": {"x": 0, "y": 0, "width": 1920, "height": 1680}}' >> test.pdf
4. Python工具函数 4.1 解析Excel和CSV文件 需要安装的依赖库
1 2 3 $ pip install xlrd==1.2.0 $ pip install xlwt $ pip install pandas
注意事项:
[1] 新版 xlrd 报 Excel xlsx file; not supported错误(原因:xlrd更新到了2.0.1版本,只支持.xls文件,不支持.xlsx)
[2] Python3.9使用xlrd时报错:AttributeError: ‘ElementTree’ object has no attribute ‘getiterator’
找到xlrd依赖源码里的 xlsx.py 文件,将两个地方的 getiterator() 修改成 iter()。
4.1.1 Excel与CSV转字典列表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 def excel_to_dict (path ): if os.path.exists(path): workbook = xlrd.open_workbook(path) else : workbook = xlrd.open_workbook(filename=path.name, file_contents=path.read()) data_sheet = workbook.sheets()[0 ] sheet_nrows = data_sheet.nrows sheet_ncols = data_sheet.ncols get_data = [] for i in range (1 , sheet_nrows): sheet_data = {} for j in range (sheet_ncols): c_cell = data_sheet.cell_value(i, j) sheet_data[data_sheet.row_values(0 )[j]] = c_cell get_data.append(sheet_data) return get_data def csv_to_dict (path ): get_data = [] with open (path, 'r' ,encoding="GBK" ) as f: reader = csv.reader(f) fieldnames = next (reader) csv_reader = csv.DictReader(f, fieldnames=fieldnames) for row in csv_reader: d = {} for k, v in row.items(): d[k] = v get_data.append(d) return get_data
4.1.2 字典列表转Excel与CSV 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 def export_xls (path, dic_data ): data_list = [] for data in dic_data: for value in data.values(): data_list.append(value) new_list = [data_list[i:i + 3 ] for i in range (0 , len (data_list), 3 )] xls = xlwt.Workbook() sheet = xls.add_sheet('Sheet1' , cell_overwrite_ok=True ) heads = ['id' , 'message' , 'result' ] ls = 0 for head in heads: sheet.write(0 , ls, head) ls += 1 i = 1 for list in new_list: j = 0 for data in list : sheet.write(i, j, data) j += 1 i += 1 xls.save(path) def export_xlsx (path, dic_data ): pf = pd.DataFrame(list (dic_data)) order = ['id' , 'message' , 'result' ] pf = pf[order] file_path=pd.ExcelWriter(path) pf.fillna(' ' , inplace=True ) pf.to_excel(file_path, encoding='utf-8' , index=False ) file_path.save() def export_csv (path, dic_data ): with open (path, 'w' , newline='' ) as f: fieldnames = ['id' , 'message' , 'result' ] writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for item in dic_data: writer.writerow(item)
4.1.3 其他操作Excel和CSV的示例 [1] 读写操作xlsx示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import openpyxlimport xlrddef write_xlsx (path, sheetname, value ): index = len (value) workbook = openpyxl.Workbook() sheet = workbook.active sheet.title = sheetname for i in range (0 , index): for j in range (0 , len (value[i])): sheet.cell(row=i+1 , column=j+1 , value=str (value[i][j])) workbook.save(path) def append_write_xlsx (path, sheetname, value ): workbook = openpyxl.load_workbook(path) sheet = workbook[sheetname] sheet.append(value) workbook.save(path) def read_xlsx (path, sheetname ): wb = xlrd.open_workbook(path) sh = wb.sheet_by_name(sheetname) result = {} for i in range (1 , sh.nrows): result[sh.row_values(i)[0 ]] = sh.row_values(i)[1 ] return result if __name__ == '__main__' : path = './test.xlsx' sheetname = '测试' head_value = [['id' , 'name' ]] body_value = ['001' , 'zhangsan' ] write_xlsx(path, sheetname, head_value) append_write_xlsx(path, sheetname, body_value) result = read_xlsx(path, sheetname) print (result)
[2] 新建csv文件并写入数据
1 2 3 4 5 6 7 import csvdef create_csv (): csv_path = "./test.csv" with open (csv_path,'w' , newline='' , encoding='GBK' ) as f: csv_write = csv.writer(f) csv_head = ["good" ,"bad" ] csv_write.writerow(csv_head)
注:newline=''
是为了解决csv的隔行空行问题。选择GBK编码,否则使用Excel打开会出现乱码问题。
[3] 操作csv文件实现对特定列排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def sort_csv (csv_path ): datas = [] with open (csv_path, 'r' , encoding='GBK' ) as f: table = [] index = 0 for line in f: index = index + 1 if index == 1 : continue col = line.split(',' ) col[1 ] = int (col[1 ].strip("\n" )) table.append(col) table_sorted = sorted (table, key=itemgetter(1 ), reverse=True ) for row in table_sorted: datas.append(row) f.close() with open (csv_path, "w" , newline='' , encoding='GBK' ) as csvfile: writer = csv.writer(csvfile) csv_head = ["关键词" , "词频" ] writer.writerow(csv_head) for data in datas: writer.writerow(data) csvfile.close()
4.2 读写配置文件 4.2.1 读取JSON文件里的配置信息 配置文件config.json:
1 2 3 4 5 { "DB_URL" : "127.0.0.1:1521/orcl" , "DB_USER" : "test" , "DB_PASSWORD" : "123456" }
工具函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 import jsondef dict_to_json_file (dict , path ): with open (path, "w" , encoding="utf-8" ) as f: json.dump(dict , f) def read_json_to_dict (path ): with open (path, "r" , encoding="utf-8" ) as f: confstr = f.read() conf = json.loads(confstr) return conf
调用示例:
1 2 3 conf_path = './config/config.json' conf = read_json_to_dict(conf_path) conn = cx_Oracle.connect(conf['DB_USER' ], conf['DB_PASSWORD' ], conf['DB_URL' ])
注意事项:JSON文件不要同时进行读写,写入时可能会出现无法解析导致读取失败的情况。
4.2.2 读写INI文件里的配置信息 配置文件config.ini:
1 2 3 4 5 6 [SOURCE_ES] host = 111.111 .111.111 port = 9200 user = elasticpassword = elastictimeout = 60
读取Section内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from configparser import ConfigParserdef read_config (): cfg = ConfigParser() cfg.read('./config.ini' , encoding='utf-8' ) host = cfg.get('TARGET_ES' , 'host' ) port = cfg.get('TARGET_ES' , 'port' ) user = cfg.get('TARGET_ES' , 'user' ) password = cfg.get('TARGET_ES' , 'password' ) timeout = cfg.get('TARGET_ES' , 'timeout' ) es_dict = {} es_dict['host' ] = host es_dict['port' ] = port es_dict['user' ] = user es_dict['password' ] = password es_dict['timeout' ] = timeout return es_dict
修改Section内容:
1 2 3 4 cfg = ConfigParser() cfg.read('./config.ini' , encoding='utf-8' ) cfg.set ("SOURCE_ES" , "timeout" , "3600" ) cfg.write(open ('./config.ini' , "r+" , encoding='utf-8' ))
新增Section内容:
1 2 3 4 5 6 cfg = ConfigParser() cfg.add_section("TARGET_ES" ) cfg.set ("TARGET_ES" , "host" , "222.222.222.222" ) cfg.set ("TARGET_ES" , "port" , "9201" ) cfg.write(open ('./config.ini' , "a" ))
删除Section内容:
1 2 3 4 5 cfg = ConfigParser() cfg.read('./config.ini' , encoding='utf-8' ) cfg.remove_section('TARGET_ES' ) cfg.write(open ('./config.ini' , "w" ))
4.2.3 读写txt文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def write_content_to_txt (txt_path, content ): a = open (txt_path, 'a' ) a.write(content + '\n' ) a.close() def read_txt_to_list (txt_path ): result = [] with open (txt_path, 'r' ) as f: for line in f: result.append(line.strip('\n' )) return result if __name__ == '__main__' : txt_path = './test.txt' write_content_to_txt(txt_path, 'zhangsan' ) write_content_to_txt(txt_path, 'lisi' ) result = read_txt_to_list(txt_path) print (result)
4.2.4 生成xml文件 generate_xml.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 from xml.etree.ElementTree import Elementfrom xml.etree.ElementTree import SubElementfrom xml.etree.ElementTree import ElementTreedef pretty_xml (element, indent, newline, level=0 ): if element: if element.text == None or element.text.isspace(): element.text = newline + indent * (level + 1 ) else : element.text = newline + indent * (level + 1 ) + element.text.strip() + newline + indent * (level + 1 ) temp = list (element) for subelement in temp: if temp.index(subelement) < (len (temp) - 1 ): subelement.tail = newline + indent * (level + 1 ) else : subelement.tail = newline + indent * level pretty_xml(subelement, indent, newline, level=level + 1 ) if __name__ == '__main__' : root = Element('root' ) head = SubElement(root, 'head' ) title = SubElement(head, 'title' ) title.text = "Title" body = SubElement(root, 'body' ) body.text = "Content" tree = ElementTree(root) root = tree.getroot() pretty_xml(root, '\t' , '\n' ) tree.write('result.xml' , encoding = 'utf-8' )
生成效果:
1 2 3 4 5 6 <root > <head > <title > Title</title > </head > <body > Content</body > </root >
4.2.5 解析yaml格式文件 将yaml文件转字典
1 2 3 4 5 import yamlf = open ('./config.yaml' , 'r' ) yaml_str = f.read() config_dict = yaml.load(yaml_str, Loader=yaml.FullLoader)
将字典转成对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class Dict (dict ): __setattr__ = dict .__setitem__ __getattr__ = dict .__getitem__ def dict2obj (dictObj ): if not isinstance (dictObj, dict ): return dictObj d = Dict () for k, v in dictObj.items(): d[k] = dict2obj(v) return d params = { "name" : "login" , "params" : { "transactionId" : "cc258bdb3dd4d6bba2" , "platformType" : "第三方平台" , "uid" : 9 } } res = dict2obj(params) print (res.name)print (res.params.uid)
4.3 列表数组字符串的处理 4.3.1 列表元素去重及统计出现次数 两个列表求差集并去重
1 2 3 4 def list_diff (listA, listB ): result = list (set (listB).difference(set (listA))) return result
列表元素直接去重
1 2 3 4 5 6 old_list = [2 , 1 , 3 , 4 , 1 ] new_list = list (set (old_list)) print (new_list)>>> [1 ,2 ,3 ,4 ]
统计列表中各个元素出现的次数
1 2 3 4 5 6 7 8 from collections import Countertest_list = [1 , 2 , 3 , 1 , 1 , 2 ] result = Counter(test_list) print (result)>>>{1 : 3 , 2 : 2 , 3 : 1 }
4.3.2 逗号分隔的字符串与列表互转 逗号分隔字符串转列表
1 2 3 >>> mStr = '192.168.1.1,192.168.1.2,192.168.1.3' >>> mStr.split("," )['192.168.1.1' , '192.168.1.2' , '192.168.1.3' ]
列表转逗号分隔字符串
1 result = "," .join(str (i) for i in result_list)
4.3.3 比较数组是否完全相等 1 2 3 4 5 6 7 import numpy as npa = np.array([1 ,2 ,3 ]) b = np.array([1 ,2 ,3 ]) print ((a==b).all ())>>> True
4.3.4 实现replaceAll功能 1 2 3 4 5 def replaceAll (input , toReplace, replaceWith ): while (input .find(toReplace) > -1 ): input = input .replace(toReplace, replaceWith) return input
处理空白字符:
1 2 text = replaceAll(replaceAll(replaceAll(replaceAll(text, '\r' , ' ' ), '\n' , ' ' ), '\u3000' , ' ' ), '\x01' , ' ' )
4.3.5 将List拆分成若干个指定长度的小List 1 2 3 4 5 6 7 8 9 10 def list_of_groups (list , length ): return [list [i:i + length] for i in range (0 , len (list ), length)] list = [i for i in range (15 )]length = 2 result = list_of_groups(list , length) print (result)>>> [[0 , 1 ], [2 , 3 ], [4 , 5 ], [6 , 7 ], [8 , 9 ], [10 , 11 ], [12 , 13 ], [14 ]]
4.3.6 按指定长度分段切割字符串或列表 1 2 3 def cut (obj, sec ): return [obj[i:i+sec] for i in range (0 ,len (obj),sec)]
4.3.7 字符串四舍五入保留两位小数 1 2 3 4 5 from decimal import Decimaldef str_get_two_decimal (str ): return Decimal(str ).quantize(Decimal('0.00' ))
4.3.8 将两个相同长度的List转字典 1 2 3 4 5 6 7 8 keys = ['a' , 'b' , 'c' ] values = [1 , 2 , 3 ] dictionary = dict (zip (keys, values)) print (dictionary)>>> {'a' : 1 , 'c' : 3 , 'b' : 2 }https://www.eula.club/%E4%BD%BF%E7%94 %A8pyppeteer%E5%B0%86 %E7%BD%91 %E9%A1%B5%E4%BF%9D%E5%AD%98 %E6%88 %90pdf%E6%96 %87 %E4%BB%B6.html
4.3.9 检查字符串里的中文字符 1 2 3 4 5 6 7 8 9 10 11 12 13 def is_all_chinese (strs ): for _char in strs: if not '\u4e00' <= _char <= '\u9fa5' : return False return True def is_contains_chinese (strs ): for _char in strs: if '\u4e00' <= _char <= '\u9fa5' : return True return False
4.3.10 浏览器URL编码以及反编码 1 2 3 4 5 6 7 8 9 import urllib.parsetest_str = '测试 文本' str_encode = urllib.parse.quote(test_str) print (str_encode)str_decode = urllib.parse.unquote(str_encode) print (str_decode)
4.3.11 去除列表的最后一个元素 pop方法和del方法如果对空列表进行操作,会报错中断执行,切片方法不会因此报错,继续保持空列表向下运行
1)pop方法
1 2 3 4 5 list = [1 ,2 ,3 ,4 ]list .pop()print (list )>>> [1 , 2 , 3 ]
2)del方法
1 2 3 4 5 list = [1 ,2 ,3 ,4 ]del (list [-1 ])print (list )>>> [1 , 2 , 3 ]
3)切片
1 2 3 4 5 list = [1 ,2 ,3 ,4 ]list = list [0 :-1 ]print (list )>>> [1 , 2 , 3 ]
4.3.12 查找字符串里所有子串位置 1 2 3 4 5 6 7 8 9 def find_all (sub_str, str ): index_list = [] if str is not None and str != "" and sub_str is not None and sub_str != "" : index = str .find(sub_str) while index != -1 : index_list.append(index) index = str .find(sub_str, index + 1 ) return index_list
4.4 系统与文件目录的基本操作 4.4.1 基本文件和目录操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import osimport shutilos.getcwd() os.listdir() os.path.exists(dir_path) os.makedirs(dir_path) os.chdir(dir_path) os.remove(file_path) os.removedirs(dir_path) shutil.rmtree(dir_path) os.path.abspath(dir_path) os.chdir(dir_path) os.path.isdir(path) os.path.isfile(path)
4.4.2 复制某个文件并重命名 1 2 3 4 5 6 7 def copy_rename_file (sample,new_path,file_name ): if not os.path.exists(new_path): os.makedirs(new_path) new_file = os.path.join(new_path, file_name) shutil.copy(sample, new_file) return new_file
4.4.3 获取文件大小及创建、修改时间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import timeimport osdef TimeStampToTime (timestamp ): timeStruct = time.localtime(timestamp) return time.strftime('%Y-%m-%d %H:%M:%S' , timeStruct) def get_FileSize (filePath ): fsize = os.path.getsize(filePath) fsizeFormat = round (fsize / float (1024 ), 2 ) return fsizeFormat def get_FileCreateTime (filePath ): t = os.path.getctime(filePath) return TimeStampToTime(t) def get_FileModifyTime (filePath ): t = os.path.getmtime(filePath) return TimeStampToTime(t)
4.4.4 检查路径是否有中文 1 2 3 4 5 zhmodel = re.compile (u'[\u4e00-\u9fa5]' ) match = zhmodel.search(path) if match: print ("The path cannot contain Chinese!" )
4.4.5 读取指定目录的所有文件夹保存成列表 1 2 3 4 def read_dir_to_list (file_dir_path ): file_dir_list = os.listdir(file_dir_path) return file_dir_list
4.4.6 遍历目录,获取目录下的所有文件路径 1 2 3 4 5 6 7 8 9 10 11 import osdef find_filepaths (dir ): result = [] for root, dirs, files in os.walk(dir ): for name in files: filepath = os.path.join(root, name) if os.path.exists(filepath): result.append(filepath) return result
4.4.7 从文件路径列表筛选出指定后缀的文件 1 2 3 4 5 6 import fnmatchdef getSufFilePath (fileList, suffix ): result = fnmatch.filter (fileList, suffix) return result
注:也可使用glob库来实现
1 2 3 import globimg_path_list = glob.glob('./input/*.jpg' )
另注:从文件路径中筛选出多种指定后缀的文件名
1 2 3 4 5 def getSufListFilePath (dirPath, suffixList ): result = [fn for fn in os.listdir(dirPath) if any (fn.endswith(ext) for ext in suffixList)] return result
4.4.8 递归获取某目录下某后缀的文件路径 程序分为两步,第一步,采用递归的方式获得文件夹下所有文件的路径列表;第二步,从文件路径列表中根据后缀利用.endswith(后缀)
的方法筛选指定文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import osdef getAllFile (path, fileList ): dirList = [] for ff in os.listdir(path): wholepath = os.path.join(path, ff) if os.path.isdir(wholepath): dirList.append(wholepath) if os.path.isfile(wholepath): fileList.append(wholepath) for dir in dirList: getAllFile(dir , fileList) def getSufFilePath (fileList, suffix ): for ff in fileList[:]: if not ff.endswith(suffix): fileList.remove(ff) if __name__ == '__main__' : flist = [] findpath = r'./testdir' getAllFile(findpath, flist) print ('allfile:' , len (flist)) getSufFilePath(flist, '.txt' ) print ('Docfile:' , len (flist)) for ff in flist: print (ff)
4.4.9 根据md5进行文件去重 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import osimport hashlibdef get_md5 (file ): file = open (file,'rb' ) md5 = hashlib.md5(file.read()) file.close() md5_values = md5.hexdigest() return md5_values if __name__ == '__main__' : file_path = "./data" os.chdir(file_path) file_list = os.listdir(file_path) md5_list =[] for file in file_list: md5 = get_md5(file) if md5 not in md5_list: md5_list.append(md5) else : os.remove(file)
4.4.10 将文件转成文件流提供下载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def is_contains_chinese (strs ): for _char in strs: if '\u4e00' <= _char <= '\u9fa5' : return True return False def download_file (file_path ): file_dir, file_full_name = os.path.split(file_path) file_name, file_ext = os.path.splitext(file_full_name) if is_contains_chinese(file_name): file_name = urllib.parse.quote(file_name) new_file_name = file_name + file_ext def send_file (): with open (file_path, 'rb' ) as targetfile: while 1 : data = targetfile.read(20 * 1024 * 1024 ) if not data: break yield data response = Response(send_file(), content_type='application/octet-stream' ) response.headers["Content-disposition" ] = 'attachment; filename=%s' % new_file_name return response
4.4.11 将网络图片转存成base64 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import base64import requests as reqfrom io import BytesIOdef urltobase64 (url ): try : response = req.get(url) http_code = response.status_code if http_code == 200 : ls_f = base64.b64encode(BytesIO(response.content).read()) imgdata = str (ls_f, 'utf-8' ) else : imgdata = "" except Exception as e: print (e) imgdata = "" return imgdata
4.4.12 起始时间及执行时间统计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timefrom decimal import Decimalstart_time = time.time() start_time_str = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) time.sleep(2.22222 ) end_time = time.time() end_time_str = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) time_consuming_str = str (Decimal(str ((end_time - start_time) * 1000 )).quantize(Decimal('0.00' ))) + 'ms' print (start_time_str)print (end_time_str)print (time_consuming_str)
4.4.13 设置超时操作 1 2 3 4 5 6 7 8 9 10 import timeimport eventleteventlet.monkey_patch() with eventlet.Timeout(2 , False ): time.sleep(3 ) print ('没有跳过这条输出' ) print ('End' )
4.4.14 检查文件编码 1 2 3 4 5 6 7 8 import chardetdef check_charset (file_path ): with open (file_path, "rb" ) as f: data = f.read(4 ) charset = chardet.detect(data)['encoding' ] return charset
4.4.15 文件路径、文件名、后缀分割 1 2 3 4 5 6 7 8 9 import osfile_path = "/root/tmp/test.pdf" file_dir, file_full_name = os.path.split(file_path) print (file_dir) print (file_full_name) file_name, file_ext = os.path.splitext(file_full_name) print (file_name) print (file_ext)
4.4.16 筛选出扩展名符合条件的文件路径列表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import osdef find_filepaths (dir ): result = [] for root, dirs, files in os.walk(dir ): for name in files: filepath = os.path.join(root, name) if os.path.exists(filepath): result.append(filepath) return result def checkDirOrFilePath (path, extList ): file_path_list = [] if os.path.isdir(path): file_path_list = find_filepaths(path) elif os.path.isfile(path): file_path_list.append(path) elif path.find("," ) != -1 : file_path_list = path.split("," ) elif path.find(";" ) != -1 : file_path_list = path.split(";" ) result_list = [] for file_path in file_path_list: file_dir, file_full_name = os.path.split(file_path) file_name, file_ext = os.path.splitext(file_full_name) if file_ext in extList: result_list.append(file_path) return result_list
4.5 加密解密算法 依赖安装:
```
$ pip install pycryptodome
```
存在的坑:安装后在导入模块时可能会报错,这时只要修改一下包文件夹的名称就可以解决:找到安装位置下的 \Lib\site-packages 目录,里面有一个叫做 crypto 的文件夹,将小写 c 改成大写 C 即可。
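下面是一段自动完成重命名的示意脚本(假设通过 site.getsitepackages() 可以定位到对应的 site-packages 目录,实际路径以自己的环境为准):

```python
# 示意:将 site-packages 下的 crypto 目录重命名为 Crypto(仅当目标不存在时执行)
import os
import site

for sp in site.getsitepackages():
    src = os.path.join(sp, 'crypto')
    dst = os.path.join(sp, 'Crypto')
    if os.path.isdir(src) and not os.path.exists(dst):
        os.rename(src, dst)
        print('已重命名:', src, '->', dst)
```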
4.5.1 RSA加密解密 RSA加密算法是一种非对称加密算法,所谓非对称,就是指该算法加密和解密使用不同的密钥,即使用加密密钥进行加密、解密密钥进行解密。在RSA算法中,加密密钥PK是公开信息,而解密密钥SK是需要保密的,加密算法E和解密算法D也都是公开的。虽然解密密钥SK是由公开密钥PK决定的,但由于无法对大数N做因数分解、计算出其欧拉函数phi(N),所以不能根据PK计算出SK。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 from Crypto.Cipher import PKCS1_OAEP, AESfrom Crypto.PublicKey import RSAfrom Crypto.Random import get_random_bytesdef create_rsa_keys (code ): """ 生成RSA私钥和公钥 :param code: 密码 :return: """ key = RSA.generate(2048 ) encrypted_key = key.exportKey(passphrase=code, pkcs=8 , protection="scryptAndAES128-CBC" ) with open ('private_rsa_key.bin' , 'wb' ) as f: f.write(encrypted_key) with open ('rsa_public.pem' , 'wb' ) as f: f.write(key.publickey().exportKey()) def file_encryption (file_name, public_key ): """ 文件加密 :param file_name: 文件路径名 :param public_key: 公钥 :return: """ with open (file_name, 'rb' ) as f: data = f.read() file_name_new = file_name + '.rsa' with open (file_name_new, 'wb' ) as out_file: recipient_key = RSA.import_key(open (public_key).read()) session_key = get_random_bytes(16 ) cipher_rsa = PKCS1_OAEP.new(recipient_key) out_file.write(cipher_rsa.encrypt(session_key)) cipher_aes = AES.new(session_key, AES.MODE_EAX) cipher_text, tag = cipher_aes.encrypt_and_digest(data) out_file.write(cipher_aes.nonce) out_file.write(tag) out_file.write(cipher_text) return file_name_new def file_decryption (file_name, code, private_key ): """ 文件解密 :param file_name: 文件路径名 :param code: 密码 :param private_key: 私钥 :return: """ with open (file_name, 'rb' ) as f_in: private_key = RSA.import_key(open (private_key).read(), passphrase=code) enc_session_key, nonce, tag, cipher_text = [f_in.read(x) for x in (private_key.size_in_bytes(), 16 , 16 , -1 )] cipher_rsa = PKCS1_OAEP.new(private_key) session_key = cipher_rsa.decrypt(enc_session_key) cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce) data = cipher_aes.decrypt_and_verify(cipher_text, tag) out_file_name = file_name.replace('.rsa' , '' ) with open (out_file_name, 'wb' ) as f_out: f_out.write(data) return out_file_name if __name__ == '__main__' : create_rsa_keys("test_rsa_key" ) file_encryption("test.txt" , "rsa_public.pem" ) file_decryption("test.txt.rsa" , "test_rsa_key" , "private_rsa_key.bin" )
4.5.2 AES加密解密 AES加密为最常见的对称加密算法(微信小程序的加密传输用的就是这个加密算法),对称加密算法也就是加密和解密使用相同的密钥。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import base64from Crypto.Cipher import AES''' 采用AES对称加密算法 ''' def add_to_16 (value ): while len (value) % 16 != 0 : value += '\0' return str .encode(value) def encrypt_file (key, input_file_path, encoding, output_file_path ): with open (input_file_path, 'r' , encoding=encoding) as f: mystr = f.read() text = base64.b64encode(mystr.encode('utf-8' )).decode('ascii' ) aes = AES.new(add_to_16(key), AES.MODE_ECB) encrypt_aes = aes.encrypt(add_to_16(text)) encrypted_text = str (base64.encodebytes(encrypt_aes), encoding='utf-8' ) with open (output_file_path, "w" ) as bankdata: bankdata.write(encrypted_text) def decrypt_file (key, file_path, encoding ): with open (file_path, 'r' , encoding=encoding) as f: text = f.read() aes = AES.new(add_to_16(key), AES.MODE_ECB) base64_decrypted = base64.decodebytes(text.encode(encoding='utf-8' )) decrypted_text = str (aes.decrypt(base64_decrypted),encoding='utf-8' ) decrypted_text = base64.b64decode(decrypted_text.encode('utf-8' )).decode('utf-8' ) print (decrypted_text)
4.6 根据IP或域名获取地理位置信息 4.6.1 获取本机IP地址

```python
import socket


def get_host_ip():
    """
    查询本机ip地址
    :return: ip
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # 通过UDP"连接"一个外部地址来获取本机对外IP,并不会真正发包
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip
```
4.6.2 获取地理位置信息 可以借助GeoIP2-python和GeoLite.mmdb两个开源项目来获取。
GeoIP2-python:https://github.com/maxmind/GeoIP2-python (GeoIP2 web 服务客户端和数据库阅读器的 Python 代码)
GeoLite.mmdb:https://github.com/P3TERX/GeoLite.mmdb (MaxMind 的 GeoIP2 GeoLite2 国家、城市和 ASN 数据库)
依赖安装:
```
$ pip install geoip2
```

然后把 GeoLite2-City.mmdb 下载下来,放到项目目录里。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import socketimport geoip2.databasereader = geoip2.database.Reader('GeoLite2-City.mmdb' ) def get_ip_by_domain (domain ): address = socket.getaddrinfo(domain, None ) return address[0 ][4 ][0 ] def ip_get_location (ip ): response = reader.city(ip) country_iso_code = str (response.country.iso_code) country_name = str (response.country.name) country_name_cn = str (response.country.names['zh-CN' ]) country_specific_name = str (response.subdivisions.most_specific.name) country_specific_iso_code = str (response.subdivisions.most_specific.iso_code) city_name = str (response.city.name) location_latitude = str (response.location.latitude) location_longitude = str (response.location.longitude) result_dic = {} result_dic['ip' ] = ip result_dic['country_iso_code' ] = country_iso_code result_dic['country_name' ] = country_name result_dic['country_name_cn' ] = country_name_cn result_dic['country_specific_name' ] = country_specific_name result_dic['country_specific_iso_code' ] = country_specific_iso_code result_dic['city_name' ] = city_name result_dic['location_latitude' ] = location_latitude result_dic['location_longitude' ] = location_longitude return result_dic
4.7 使用cv2库画图 引入cv2库
1 $ pip install opencv-python
绘制矩形和直线示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import numpy as npimport cv2 as cvimg = np.zeros((320 , 320 , 3 ), np.uint8) ptLeftTop = (60 , 60 ) ptRightBottom = (260 , 260 ) point_color = (0 , 255 , 0 ) thickness = 1 lineType = 4 cv.rectangle(img, ptLeftTop, ptRightBottom, point_color, thickness, lineType) ptStart = (60 , 60 ) ptEnd = (260 , 260 ) point_color = (0 , 0 , 255 ) thickness = 1 lineType = 4 cv.line(img, ptStart, ptEnd, point_color, thickness, lineType) cv.namedWindow("CV Test" ) cv.imshow('CV Test' , img) cv.waitKey(5000 ) cv.destroyAllWindows()
5. 数据库及中间件的集成与使用 5.1 使用Redis缓存数据 Step1:引入redis库
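```
$ pip install redis
```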
Step2:使用Redis
往redis存值
```python
import redis

# 通过连接池复用连接(地址、端口、密码按实际环境修改)
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456')
r = redis.Redis(connection_pool=pool)
r.set('id', '666666')
```
从redis取值
```python
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456')
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')
```
5.2 将数据保存到MySQL 引入pymysql库
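```
$ pip install pymysql
```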
将数据保存到MySQL示例:
config.ini
```ini
[Mysql]
host = 127.0.0.1
user = root
password = 123456
port = 3306
db = testdb
table = test_table
```
log.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import logginglogger = logging.getLogger(__name__) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) console = logging.StreamHandler() console.setLevel(logging.INFO) console.setFormatter(formatter) logger.addHandler(console) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("./save_mysql.log" ) handler.setLevel(logging.INFO) handler.setFormatter(formatter) logger.addHandler(handler)
save_mysql.py
```python
import time
from configparser import ConfigParser

import pymysql

from log import logger


def read_config(config_path):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    mysql_dict = {}
    mysql_dict['host'] = cfg.get('Mysql', 'host')
    mysql_dict['user'] = cfg.get('Mysql', 'user')
    mysql_dict['password'] = cfg.get('Mysql', 'password')
    mysql_dict['port'] = cfg.get('Mysql', 'port')
    mysql_dict['db'] = cfg.get('Mysql', 'db')
    mysql_dict['table'] = cfg.get('Mysql', 'table')
    return mysql_dict


def save_data(mysql_dict, data_list):
    if not data_list:
        return None
    # charset 应写作 utf8/utf8mb4(写成 utf-8 会报错);
    # 存储 emoji 等4字节字符需要 utf8mb4,见下面的注意事项
    mysql = pymysql.connect(host=str(mysql_dict['host']), user=str(mysql_dict['user']),
                            password=str(mysql_dict['password']), port=int(mysql_dict['port']),
                            db=str(mysql_dict['db']), charset='utf8mb4')
    cursor = mysql.cursor()
    for i in data_list:
        qmarks = ', '.join(['%s'] * len(i))
        columns = ', '.join(i.keys())
        try:
            # 列名由字典的键拼出,值通过参数化传入,避免手动拼接值
            qry = "Insert Into %s (%s) Values (%s);" % (str(mysql_dict['table']), columns, qmarks)
            cursor.execute(qry, list(i.values()))
            mysql.commit()
        except Exception as e:
            logger.error(e)
    cursor.close()
    mysql.close()
    now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    insert_mysql_info = str(mysql_dict['table']) + "表插入了" + str(len(data_list)) + "条数据,时间是" + str(now_time)
    logger.info(insert_mysql_info)


if __name__ == '__main__':
    config_path = './config.ini'
    mysql_dict = read_config(config_path)
    data_list = [{'USERNAME': 'zhangsan', 'MESSAGE': 'test1'}, {'USERNAME': 'lisi', 'MESSAGE': 'test2'}]
    save_data(mysql_dict, data_list)
```
注意事项:
1)插入时报错 Incorrect string value: ‘\xF0\x9F\x98\xAD“,…‘ for column ‘commentContent‘ at row 1
原因:插入的数据中存在emoji表情,这类字符按4个字节一个单位进行编码,而MySQL中常用的utf8字符集每个字符最多只占3个字节,导致将数据存入mysql的时候出现错误。
解决:修改数据库与数据表编码,然后再改一下连接数据库的字符集编码。
```
1.修改mysql数据库的编码为utf8mb4
2.修改数据表的编码为utf8mb4
3.将代码连接mysql处改为charset='utf8mb4'
```
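前两步的修改既可以在MySQL客户端里执行,也可以像下面这样用 pymysql 执行(示意代码,库名、表名、排序规则均为示例,按实际情况替换):

```python
# 示意:将库和表的编码改为 utf8mb4(testdb、test_table 仅为示例)
import pymysql

conn = pymysql.connect(host='127.0.0.1', user='root', password='123456', port=3306, charset='utf8mb4')
with conn.cursor() as cursor:
    cursor.execute("ALTER DATABASE testdb CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci")
    cursor.execute("ALTER TABLE testdb.test_table CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
conn.commit()
conn.close()
```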
2)将longblob字段的数据写入到文件
```python
import os
from uuid import uuid1

if not os.path.exists('./img'):
    os.makedirs('./img')
uuid = uuid1()
img_path = './img/{}.jpg'.format(uuid)
# result['image_file'] 为从 longblob 字段查询出来的二进制数据
with open(img_path, 'wb') as f:
    f.write(result['image_file'])
```
5.3 查询Oracle的数据 引入cx_Oracle库
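```
$ pip install cx_Oracle
```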
安装Oracle Instant Client
从Oracle里查询数据示例
```python
import cx_Oracle

# init_oracle_client 指定 Oracle Instant Client 的解压路径
cx_Oracle.init_oracle_client(lib_dir="D:\\Development\\instantclient-basic-windows.x64-11.2.0.4.0")
conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
curs = conn.cursor()
sql = "select a.id, a.title, a.text from test_table a"
rr = curs.execute(sql)
while True:
    rs = rr.fetchone()
    if rs is None:
        break
    id = rs[0]
    title = rs[1]
    text = rs[2]
curs.close()
conn.close()
```
注意事项:
1、cx_Oracle.init_oracle_client()要写在Flask接口的外面,否则第二次接口请求时会报cx_Oracle已经初始化的错误。
2、Linux端部署的时候,会出现找不到libclntsh.so动态连接库的问题,报错如下:
1 cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded: "Error loading shared library libclntsh.so: No such file or directory". See https://oracle.github.io/odpi/doc/installation.html#linux for help
报错原因:instantclient-basic-linux.x64-11.2.0.4.0.zip 包里根本没有 libclntsh.so,有的是 libclntsh.so.11.1,而单纯地给这个文件改个名是不行的。
```
./instantclient_11_2:
|---BASIC_README
|---adrci
|---genezi
|---libclntsh.so.11.1
|---libnnz11.so
|---libocci.so.11.1
|---libociei.so
|---libocijdbc11.so
|---ojdbc5.jar
|---ojdbc6.jar
|---uidrvci
|---xstreams.jar
```
解决办法:在Dockerfile里设置软链接来解决(注意要用绝对路径)
```dockerfile
ENV LD_LIBRARY_PATH=/home/instantclient_11_2
RUN ln -s /home/instantclient_11_2/libclntsh.so.11.1 /home/instantclient_11_2/libclntsh.so
```
5.4 ElasticSearch的导入导出 代码已在Github上开源,项目地址为:https://github.com/Logistic98/es-data-transfer
Step1:安装依赖并编写配置文件
1 $ pip install elasticsearch==7.16.2 // 注意要和ES的版本保持一致
config.ini(把ES连接信息换成自己的)
```ini
[TARGET_ES]
host = 192.168.1.1
port = 9200
user = elastic
password = elastic
timeout = 60

[SOURCE_ES]
host = 192.168.1.2
port = 9200
user = elastic
password = elastic
timeout = 60
index_list = test_index1, test_index2
```
注:多个索引之间用英文逗号分隔(逗号后面有没有空格都无所谓,读取配置时会进行处理)
Step2:编写ES导入导出脚本
export_es_data.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 from elasticsearch import Elasticsearchfrom datetime import timedeltaimport datetimeimport osimport jsonimport loggingfrom configparser import ConfigParserlogging.basicConfig(filename='logging_es.log' , level=logging.INFO, format ='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def read_config (): cfg = ConfigParser() cfg.read('./config.ini' , encoding='utf-8' ) host = cfg.get('SOURCE_ES' , 'host' ) port = cfg.get('SOURCE_ES' , 'port' ) user = cfg.get('SOURCE_ES' , 'user' ) password = cfg.get('SOURCE_ES' , 'password' ) timeout = cfg.get('SOURCE_ES' , 'timeout' ) index_list = cfg.get('SOURCE_ES' , 'index_list' ) es_dict = {} es_dict['host' ] = host es_dict['port' ] = port es_dict['user' ] = user es_dict['password' ] = password es_dict['timeout' ] = timeout es_dict['index_list' ] = index_list return es_dict def write_list_to_json (list , json_file_name, json_file_save_path ): """ 将list写入到json文件 :param list: :param json_file_name: 写入的json文件名字 :param json_file_save_path: json文件存储路径 :return: """ if not os.path.exists(json_file_save_path): os.makedirs(json_file_save_path) os.chdir(json_file_save_path) with open (json_file_name, 'w' , encoding='utf-8' ) as f: json.dump(list , f, ensure_ascii=False ) def es_json (es_dict, start_time, end_time ): str_separate = "===============================================================" try : BASE_DIR = os.getcwd() Es = Elasticsearch( hosts=[str (es_dict['host' ]) + ":" + str (es_dict['port' ])], http_auth=(str (es_dict['user' ]), str (es_dict['password' ])), timeout=int (es_dict['timeout' ]) ) except Exception as e: logging.error(e) index_list = '' .join(es_dict['index_list' ].split()).split("," ) for i in index_list: print (f"保存索引{i} 的数据\r" ) print_info1 = "保存索引" + i + "的数据" logging.info(print_info1) query = { "query" : { "range" : { "@timestamp" : { "gt" : start_time, "lte" : end_time } } }, "size" : 10000 } try : data = Es.search(index=i, body=query) source_list = [] for hit in data['hits' ]['hits' ]: source_data = hit['_source' ] source_data['_id' ] = hit['_id' ] source_list.append(source_data) print (f"保存的时间为{start_time} 到{end_time} \r" ) print_info2 = "保存的时间为" + start_time + "到" + end_time + "" logging.info(print_info2) file_path = BASE_DIR + "/json_file" file_name = str (i) + ".json" if len (source_list) != 0 : write_list_to_json(source_list, file_name, file_path) else : print ('无更新' ) logging.info(str (i) + '无更新' ) print (str_separate) logging.info(str_separate) except Exception as e: print (e) logging.info("es数据库到json文件的读写error" % e) logging.info(str_separate) if __name__ == '__main__' : start_date_time = datetime.datetime.now() + timedelta(days=-1 ) end_date_time = datetime.datetime.now() start_time = start_date_time.strftime("%Y-%m-%dT%H:00:00.000Z" ) end_time = end_date_time.strftime("%Y-%m-%dT%H:00:00.000Z" ) es_dict = read_config() BASE_DIR = os.getcwd() es_json(es_dict, start_time, end_time)
import_es_data.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 import osimport loggingimport timefrom elasticsearch import Elasticsearch, helpersfrom configparser import ConfigParserlogging.basicConfig(filename='logging_es.log' , level=logging.INFO, format ='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def json_es (BASE_DIR ): json_path = BASE_DIR + '/json_file/' filelist = [] for file in os.listdir(json_path): if '.json' == file[-5 :]: filelist.append(json_path + file) for i in filelist: head, sep, tail = i.partition('json_file/' ) indexname = tail head, sep, tail = indexname.partition('.json' ) index_name = head read_json(i, index_name) os.remove(i) def read_json (file_path, index_name ): with open (file_path, 'r' , encoding='utf-8' ) as file: json_str = file.read() null = None json_list = eval (json_str) batch_data(json_list, index_name) def batch_data (json_list, index_name ): """ 批量写入数据 """ length = len (json_list) step = 1000 for i in range (0 , length, step): if i + step < length: actions = [] for j in range (i, i + step): new_id = json_list[j]['_id' ] del json_list[j]["_id" ] action = { "_index" : str (index_name), "_id" : str (new_id), "_source" : json_list[j] } actions.append(action) helpers.bulk(Es, actions, request_timeout=120 ) else : actions = [] for j in range (i, length): new_id = json_list[j]['_id' ] del json_list[j]["_id" ] action = { "_index" : str (index_name), "_id" : str (new_id), "_source" : json_list[j] } actions.append(action) helpers.bulk(Es, actions, request_timeout=120 ) now_time = time.strftime("%Y-%m-%d %H:%M:%S" , time.localtime()) insert_es_info = str (index_name) + "索引插入了" + str (length) + "条数据,时间是" + str (now_time) logging.info(insert_es_info) def read_config (): cfg = ConfigParser() cfg.read('./config.ini' , encoding='utf-8' ) host = cfg.get('TARGET_ES' , 'host' ) port = cfg.get('TARGET_ES' , 'port' ) user = cfg.get('TARGET_ES' , 'user' ) password = cfg.get('TARGET_ES' , 'password' ) timeout = cfg.get('TARGET_ES' , 'timeout' ) es_dict = {} es_dict['host' ] = host es_dict['port' ] = port es_dict['user' ] = user es_dict['password' ] = password es_dict['timeout' ] = timeout return es_dict if __name__ == '__main__' : BASE_DIR = os.getcwd() es_dict = read_config() Es = Elasticsearch( hosts=[str (es_dict['host' ]) + ":" + str (es_dict['port' ])], http_auth=(str (es_dict['user' ]), str (es_dict['password' ])), timeout=int (es_dict['timeout' ]) ) json_es(BASE_DIR)
Step3:执行脚本导入导出
执行 export_es_data.py 会读取 [SOURCE_ES] 里的 ES 配置,对指定索引进行导出,注意单次仅能导出10000条数据
执行 import_es_data.py 会读取 [TARGET_ES] 里的 ES 配置,将 json_file 文件夹内的 json 文件进行导入,导入成功后会删除这些 json 文件。
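注:10000条的限制来自 ES 默认的 max_result_window。如果需要一次导出更多数据,可以考虑改用基于 scroll 的 helpers.scan 来遍历,下面是一个示意(连接信息与索引名沿用上文配置中的示例):

```python
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(hosts=["192.168.1.2:9200"], http_auth=("elastic", "elastic"), timeout=60)

# helpers.scan 基于 scroll API,可以遍历超过 10000 条的结果集
for hit in helpers.scan(es, index="test_index1", query={"query": {"match_all": {}}}):
    print(hit["_id"], hit["_source"])
```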
5.5 minio的文件上传 Step1:安装依赖并编写配置文件
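```
$ pip install minio
```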
config.ini
```ini
[minio]
minio_url = xxx.xxx.xxx.xxx:9000
access_key = minioadmin
secret_key = minioadmin
```
注:minio_url 不要带上 http:// 的前缀,否则会报如下错误
1 ValueError: path in endpoint is not allowed. Exception ignored in: <function Minio.__del__ at 0x0C0B9A98>
Step2:minio上传文件的代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 import loggingimport osfrom minio import Miniofrom minio.error import S3Errorfrom configparser import ConfigParserlogging.basicConfig(filename='logging_mysql.log' , level=logging.INFO, format ='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def read_config (): cfg = ConfigParser() cfg.read('./config.ini' , encoding='utf-8' ) minio_url = cfg.get('minio' , 'minio_url' ) access_key = cfg.get('minio' , 'access_key' ) secret_key = cfg.get('minio' , 'secret_key' ) config_dict = {} config_dict['minio_url' ] = minio_url config_dict['access_key' ] = access_key config_dict['secret_key' ] = secret_key return config_dict def get_minio_client (config_dict ): minio_client = Minio(config_dict['minio_url' ], access_key=config_dict['access_key' ], secret_key=config_dict['secret_key' ], secure=False ) return minio_client def minio_make_bucket_ifnotexist (minio_client, bucket_name ): bucket_name = bucket_name.replace('_' , "-" ) try : if not minio_client.bucket_exists(bucket_name): logging.info("该存储桶不存在:" + bucket_name) minio_client.make_bucket(bucket_name) logging.info("存储桶创建:" + bucket_name) except S3Error as e: if "InvalidAccessKeyId" in str (e): logging.error("minio 的 access_key 可能有误" ) elif "SignatureDoesNotMatch" in str (e): logging.error("minio 的 secret_key 可能有误" ) else : logging.error("minio 的 endpoint、access_key、secret_key 可能有误" ) raise e def remove_bucket (minio_client, bucket_name ): try : minio_client.remove_bucket(bucket_name) logging.info("删除存储桶成功:" + bucket_name) except S3Error as e: logging.error(e) def minio_upload_file (minio_client, bucket_name, object_name, file_path ): logging.info(file_path) result = minio_client.fput_object(bucket_name, object_name, file_path) return result def find_filepaths (dir ): result = [] for root, dirs, files in os.walk(dir ): for name in files: filepath = os.path.join(root, name) if os.path.exists(filepath): result.append(filepath) return result def get_object_name (file_path ): file_dir, file_full_name = os.path.split(file_path) return file_full_name if __name__ == '__main__' : config_dict = read_config() minio_client = get_minio_client(config_dict) minio_make_bucket_ifnotexist(minio_client, 'test' ) remove_bucket(minio_client, 'test' ) minio_make_bucket_ifnotexist(minio_client, 'test' ) img_list = find_filepaths('./img' ) for img_path in img_list: object_name = get_object_name(img_path) minio_upload_file(minio_client, 'test' , object_name, img_path)
6. Python常用的进阶知识及示例 6.1 使用vthread实现多线程 6.1.1 vthread简介 项目描述:python 多线程库,在不改变源代码的情况下,一行代码即可实现线程池操作。
项目地址:https://github.com/cilame/vthread
依赖安装:pip install vthread
6.1.2 vthread基本使用 [1] 基本线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import timeimport vthread@vthread.pool(3 ) def foolfunc (num ): time.sleep(1 ) print (str (num)) if __name__ == '__main__' : for i in range (5 ): foolfunc(i)
输出结果:
1 2 3 4 5 [ Thread-2_v ] 1 [ Thread-1_v ] 0 [ Thread-3_v ] 2 [ Thread-2_v ] 3 [ Thread-1_v ] 4
[2] 生产消费过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import time, random, queuefrom vthread import poolls1 = queue.Queue() ls2 = queue.Queue() producer = 'pr' consumer1 = 'co1' consumer2 = 'co2' @pool(3 , gqueue=producer ) def creater (num ): time.sleep(random.random()) num1, num2 = num, num * num + 100 print ("数据进入队列: num:{}" .format (num)) ls1.put(num1) ls2.put(num2) @pool(1 , gqueue=consumer1 ) def coster1 (): while not pool.check_stop(gqueue=producer): time.sleep(random.random()) pp = [ls1.get() for _ in range (ls1.qsize())] print ('当前消费的列表 list: {}' .format (pp)) @pool(1 , gqueue=consumer2 ) def coster2 (): while not pool.check_stop(gqueue=producer): time.sleep(random.random()) pp = [ls2.get() for _ in range (ls2.qsize())] print ('当前消费的列表 list: {}' .format (pp)) if __name__ == '__main__' : for i in range (10 ): creater(i) coster1() coster2() pool.waitall() print ('当生产和消费的任务池数据都结束后,这里才会打印' ) print ('current queue 1 size:{}' .format (ls1.qsize())) print ('current queue 2 size:{}' .format (ls2.qsize())) print ('end' )
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 [ Thread-5_co2] 当前消费的列表 list: [] [ Thread-5_co2] 当前消费的列表 list: [] [ Thread-4_co1] 当前消费的列表 list: [] [ Thread-3_pr ] 数据进入队列: num:2 [ Thread-4_co1] 当前消费的列表 list: [2] [ Thread-2_pr ] 数据进入队列: num:1 [ Thread-4_co1] 当前消费的列表 list: [1] [ Thread-1_pr ] 数据进入队列: num:0 [ Thread-5_co2] 当前消费的列表 list: [104, 101, 100] [ Thread-5_co2] 当前消费的列表 list: [] [ Thread-3_pr ] 数据进入队列: num:3 [ Thread-2_pr ] 数据进入队列: num:4 [ Thread-2_pr ] 数据进入队列: num:7 [ Thread-4_co1] 当前消费的列表 list: [0, 3, 4, 7] [ Thread-1_pr ] 数据进入队列: num:5 [ Thread-5_co2] 当前消费的列表 list: [109, 116, 149, 125] [ Thread-5_co2] 当前消费的列表 list: [] [ Thread-5_co2] 当前消费的列表 list: [] [ Thread-2_pr ] 数据进入队列: num:8 [ Thread-3_pr ] 数据进入队列: num:6 [ Thread-4_co1] 当前消费的列表 list: [5, 8, 6] [ Thread-1_pr ] 数据进入队列: num:9 [ Thread-5_co2] 当前消费的列表 list: [164, 136, 181] [ Thread-4_co1] 当前消费的列表 list: [9] [ MainThread ] 当生产和消费的任务池数据都结束后,这里才会打印 [ MainThread ] current queue 1 size:0 [ MainThread ] current queue 2 size:0 [ MainThread ] end
6.2 使用Python协程 6.2.1 Python协程简介 协程,又称微线程,是运行在单线程中的“并发”,协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。
在Python3.4之前,官方没有对协程的支持,但存在一些第三方库的实现,比如gevent和tornado,3.4之后有了asyncio,官方才真正实现了协程这一特性。
6.2.2 进程、线程、协程对比 进程是资源分配的单位,线程是操作系统调度的单位。进程切换需要的资源最大,效率最低;线程切换需要的资源一般,效率一般;协程切换需要的资源很小,效率很高。
[1] 进程:一个程序运行起来后,代码及用到的资源称之为进程,它是操作系统分配资源的基本单元。
[2] 线程:一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
[3] 协程:协程是Python中另外一种实现多任务的方式,只不过是比线程占用更小的执行单元。
6.2.3 使用asyncio实现协程

```python
import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main开始")
    # 创建task后,协程即被加入事件循环等待调度
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())
    print("main结束")
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```
输出结果:
1 2 3 4 5 6 7 main开始 main结束 1 1 2 2 返回值 返回值
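注:在 Python 3.7 及以上版本中,也可以不手动获取事件循环,直接用 asyncio.run 运行入口协程(示意):

```python
asyncio.run(main())
```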
6.3 使用Python装饰器 6.3.1 Python装饰器简介 Python 的装饰器是一种非常便捷的修改函数的方式,不影响原函数的定义而对函数进行一些额外的封装,有点类似 AOP,增加一些小功能却不侵入原有代码,非常简洁强大。
6.3.2 与Java注解异同点对比 [1] 对代码块的影响
java注解:不会对所修饰的代码产生直接的影响。
python装饰器:可以对所修饰的代码产生直接的影响。
[2] 共通处
java中注解+反射 可以实现 python装饰器同样的功能,包括面向切面编程、参数校验等。
[3] 从用途看
从用途看注解像是注释文档一样,用于生成javadoc文档(以参数形式标注)、检查等。
装饰器像是为函数提供更多的功能,并装在不同的函数身上。
[4] 从原理看
java注解:所有注解本质是继承自接口的接口。
python装饰器:被装饰的函数本身作为参数传给装饰器函数,装饰器返回一个闭包来替换原函数(在函数定义上方加一行 @装饰器函数名 即可应用)。
6.3.3 使用装饰器实现权限校验 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class Person (): def __init__ (self, name, permission ): self.name = name self.permission = permission def checkPermission (num ): def setPermission (func ): def inner (person ): if num & person.permission == num: func(person) else : print (person.name, "无权限" ) return inner return setPermission @checkPermission(1 ) def read (person ): print (person.name, "读取代码" ) @checkPermission(2 ) def write (person ): print (person.name, "写入代码" ) @checkPermission(4 ) def run (person ): print (person.name, "执行代码" ) if __name__ == '__main__' : p1 = Person("张三" , 1 ) p2 = Person("李四" , 3 ) p3 = Person("王五" , 6 ) read(p1), write(p1), run(p1) print ("===================" ) read(p2), write(p2), run(p2) print ("===================" ) read(p3), write(p3), run(p3)
输出结果:
1 2 3 4 5 6 7 8 9 10 11 张三 读取代码 张三 无权限 张三 无权限 =================== 李四 读取代码 李四 写入代码 李四 无权限 =================== 王五 无权限 王五 写入代码 王五 执行代码
6.4 程序内存占用分析 6.4.1 Memray简介 需求情景:深度学习算法编写或者调用不当时可能会出现内存叠加、内存溢出等问题,可以使用Memray工具对程序内存占用进行分析。
项目描述:Memray 是 Python 的内存分析器。它可以跟踪 Python 代码、本地扩展模块和 Python 解释器本身中的内存分配。仅可用于Linux平台。
项目地址:https://github.com/bloomberg/memray
6.4.2 Memray基本使用 具体使用:安装依赖——用memray运行程序——转换二进制文件
1 2 3 4 $ pip install memray // 安装memray依赖(仅支持Linux) $ python -m memray run my_script.py // 运行单个文件 $ python -m memray run -m my_module // 运行整个模块 $ memray flamegraph my_script.2369.bin // 将二进制文件转换成火焰图html文件
注意必须是Linux平台,其他平台不支持使用,它生成的是一个二进制文件(如my_script.2369.bin),可通过命令将其转换成直观的火焰图html文件。
7. 项目的打包部署 一般使用Docker来部署Flask项目,它的基本概念及使用就不再赘述了,不会的话见我的另一篇博客:VPS基本部署环境的搭建与配置 。
7.1 Docker环境搭建 1 2 3 4 $ apt-get update -y && apt-get install curl -y # 安装curl $ curl https://get.docker.com | sh - # 安装docker $ sudo systemctl start docker # 启动docker服务 $ docker version # 查看docker版本(客户端要与服务端一致)
7.2 导出项目依赖 方法一:使用 pip freeze 命令导出所有依赖,再进行筛选。
1 $ pip freeze > requirements.txt
注:建议对项目单独建一个conda虚拟环境,再导出依赖,这样导出的依赖就只包含这一个项目用到的,不用再手动删除无用的了。
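例如(环境名与Python版本为假设的示例):

```
$ conda create -n test-project python=3.7
$ conda activate test-project
$ pip freeze > requirements.txt
```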
方法二:使用pipreqs库导出本项目的依赖,生成的也是requirements.txt文件。
1 2 3 $ pip install pipreqs $ cd /root/test-project // 切换到项目根目录 $ pipreqs ./ --encoding=utf8 // 需要带上编码的指定,否则会报GBK编码错误
注意这里还有个坑如下,这是因为本机开了翻墙代理导致的,把代理软件关了就好了。
1 requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))`
7.3 使用Docker部署Flask项目 Step1:编写Dockerfile,示例如下:
```dockerfile
FROM python:3.7
RUN mkdir /code
ADD test-project /code/
WORKDIR /code
RUN pip install -r requirements.txt
RUN apt-get update && apt-get install vim -y
EXPOSE 5000
# 容器内直接前台运行即可,exec 形式下 nohup 和 & 只会被当作普通参数,不应使用
ENTRYPOINT ["python", "server.py"]
```
Step2:将项目和Dockerfile上传到服务器并制作镜像运行容器,示例如下:
1 2 3 $ cd /root/deploy // 切换到存放项目和Dockerfile的目录 $ docker build -t test-flask-image . // 使用Dockerfile构建镜像 $ docker run -d -p 5000:5000 --name test-flask test-flask-image:latest // 通过镜像运行容器
我们可以打包导出镜像,方便迁移到其他服务器上部署。
1 $ docker save test-image > test-image.v1.dockerimage
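迁移到目标服务器后,可以用 docker load 导入镜像再运行容器(示意):

```
$ docker load < test-image.v1.dockerimage
```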
7.4 依赖类库的安装部署说明 [1] pyhanlp:中文分词 词性标注 命名实体识别 依存句法分析 新词发现 关键词短语提取 自动摘要 文本分类聚类 拼音简繁 自然语言处理
pyhanlp的运行依赖JVM环境,因此部署时应在 Dockerfile 里添加 jdk 并完成配置。JDK安装包去官网下载Linux版的,放到项目里即可。
```dockerfile
WORKDIR /usr
RUN mkdir /usr/local/java
ADD jdk-8u202-linux-x64.tar.gz /usr/local/java
RUN ln -s /usr/local/java/jdk1.8.0_202 /usr/local/java/jdk
ENV JAVA_HOME /usr/local/java/jdk
ENV JRE_HOME ${JAVA_HOME}/jre
ENV CLASSPATH .:${JAVA_HOME}/lib:${JRE_HOME}/lib
ENV PATH ${JAVA_HOME}/bin:$PATH
```
附:jdk-8u202-linux-x64.tar.gz 的官网下载地址
[2] PyTorch:一种开源机器学习框架。
如果使用GPU(NVIDIA显卡)部署的话,需要先安装CUDA驱动,然后通过如下命令查看CUDA版本:
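```
$ nvidia-smi       # 输出右上角显示的是驱动支持的最高CUDA版本
$ nvcc --version   # 查看实际安装的CUDA工具包版本(若已安装)
```

注:以上是常用的两种查看方式,二者显示的版本含义略有不同,按实际安装的工具选用。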
去 PyTorch官网 勾选上自己服务器的环境,下面会生成对应的安装命令。
1 2 3 4 5 6 7 8 CPU版 $ pip install torch==1.10.0+cpu torchvision==0.11.1+cpu torchaudio==0.10.0+cpu -f https://download.pytorch.org/whl/cpu/torch_stable.html GPU版(CUDA 10.2) $ pip install torch torchvision torchaudio GPU版(CUDA 11.3) $ pip install torch==1.10.2+cu113 torchvision==0.11.3+cu113 torchaudio==0.10.2+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html
验证PyTorch是否连上GPU
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import torchimport torch.nn as nnprint (torch.cuda.is_available())model = nn.LSTM(input_size=10 , hidden_size=4 , num_layers=1 , batch_first=True ) model = model.cuda() print (next (model.parameters()).device) data = torch.ones([2 , 3 ]) data = data.cuda() print (data.device)
[3] tensorflow:一个端到端开源机器学习平台。
直接使用 pip 安装有时会出问题,可以去 tensorflow官网 找对应版本的 whl 包进行安装。
1 2 $ pip install https://storage.googleapis.com/tensorflow/linux/gpu/tensorflow_gpu-2.6.0-cp37-cp37m-manylinux2010_x86_64.whl (支持GPU) $ pip install https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow_cpu-2.6.0-cp37-cp37m-manylinux2010_x86_64.whl (仅支持CPU)
验证 Tensorflow是否连上GPU
1 2 3 4 5 6 7 8 import tensorflow as tfprint (tf.test.is_gpu_available())gpu_device_name = tf.test.gpu_device_name() print (gpu_device_name)
7.5 常见报错问题的解决 [1] ImportError: libGL.so.1: cannot open shared object file: No such file or directory
1 2 $ apt update $ apt install libgl1-mesa-glx -y
[2] ImportError: numpy.core.multiarray failed to import
1 $ pip install --upgrade numpy