图像与文本音频的AI智能处理

6/6/2021 DeOldifyDragGANimage-mattingMVSS-NetHivisionIDPhotosFastTextRankPyppeteer

# 1. 图片文件处理

# 1.1 DeOldify旧图片进行着色

# 1.1.1 DeOldify简介

DeOldify 是由 Jason Antic 开发和更新的。这是目前最先进的黑白图像、视频的着色方法,所有的东西都是开源的。

基本原理:它使用了一种名为NoGAN的新型GAN训练方法,该方法是作者自己开发的,用来解决在使用由一个鉴别器和一个生成器组成的正常对抗性网络架构进行训练时出现的主要问题。典型地,GAN训练同时训练鉴别器和生成器,生成器一开始是完全随机的,随着时间的推移,它会欺骗鉴别器,鉴别器试图辨别出图像是生成的还是真实的。

DeOldify旧照片着色

# 1.1.2 官方提供的在线服务及API

如果不想折腾的话,可以使用官方提供的 DeOldify Image Colorization on DeepAI (opens new window),可以直接在这里上传图片对旧照片进行着色,同时该网站还提供了API,供程序中调用,下文可以不用看了。

# 1.1.3 DeOldify的预训练模型

预训练模型:DeOldify 是基于深度学习开发的,需要用到预训练权重,这里项目开发者已经把训练好的权重上传了,我们可以直接拿来使用,不需要我们再训练。

  • Artistic 权重,会使图片上色效果更大胆 一些,下载地址:

    https://data.deepai.org/deoldify/ColorizeArtistic_gen.pth
    
    1
  • Stable 权重,相对于 Artistic 上色效果更保守一些,下载地址:

    https://www.dropbox.com/s/usf7uifrctqw9rl/ColorizeStable_gen.pth
    
    1
  • Video 权重,此权重文件用来给视频上色,下载地址:

    https://data.deepai.org/deoldify/ColorizeVideo_gen.pth
    
    1

权重文件下载完毕后,在项目根目录下创建一个 models 文件夹,把下载好的权重文件放入 models 文件夹内即可。

# 1.1.4 使用Google Colab进行部署

由于运行深度学习的项目对机器性能要求较高,因此下文使用了官方提供的预训练模型,并白嫖 Google Colab 进行部署。DeOldify对旧照片、旧视频的着色的使用流程基本一致,只不过用到的预训练模型不同而已,以旧照片着色为例。

官方也提供了Google Colab,不过那个是英文版的,我没有尝试了,下面我用的是网上找的一份中文版的,将其保存到自己的Google Drive里

注:使用Google Colab需要翻墙,这个要保存到自己的云端硬盘里,我的你们是无法执行的。

DeOldify的Google-Colab

打开之后先去执行该代码块(悬浮即可显示执行按钮)

#点击左侧按钮一键配置环境
!git clone https://github.com/jantic/DeOldify.git DeOldify 
%cd /content/DeOldify
!pip install -r /content/DeOldify/requirements.txt
import fastai
from deoldify.visualize import *

torch.backends.cudnn.benchmark = True
!mkdir 'models'
!wget https://data.deepai.org/deoldify/ColorizeArtistic_gen.pth -O ./models/ColorizeArtistic_gen.pth
colorizer = get_image_colorizer(artistic=True)
1
2
3
4
5
6
7
8
9
10
11

说明:预训练模型的地址如果失效了就自己找个吧,替换掉即可。如果要使用 Stable 权重,需要把下面改成False

Artistic 权重  -- colorizer = get_image_colorizer(artistic=True)
Stable 权重    -- colorizer = get_image_colorizer(artistic=False)
1
2

踩过的坑:第一次执行的时候可能会出现依赖安装失败的问题,不要慌。点击 RESTART RUNTIME 按钮,等一会儿再重新执行代码块,第二次应该就可以安装成功了,成功的话左侧有个绿色箭头。

DeOldify

安装成功环境以后,再在下面的 source_url 里填入旧照片链接(本地图片的话可以先上传到图床),然后点击左侧的执行按钮,等待一会儿即可生成着色后的照片。

DeOldify旧照片着色实践

注:如果你的旧照片里本身就有颜色的话,生成效果可能会不太好。因为它会先把原有颜色替换成黑白的,再根据算法生成新的颜色,会导致与原图的颜色不一致。如果你想要保持一致的话,就需要借助PS的蒙版进行二次处理了。

# 1.2 image-matting一键抠图

# 1.2.1 image-matting简介

一个基于cv_unet_image-matting和cv_unet_universal-matting模型的一键AI抠图在线工具。

image-matting

# 1.2.2 image-matting部署

实验环境:Debian 11 x86_64 系统,8GB内存,160GB存储,2x Intel Xeon CPU,无GPU,带宽1 Gigabit

可以使用Docker进行一键搭建,无GPU也可以运行。

// 方式一:直接使用官方镜像
$ docker run --name image-matting -itd -p 8000:8000 ihmily/image-matting:0.0.3

// 方式二:从源码自行构建镜像
$ git clone https://github.com/ihmily/image-matting.git 
$ cd image-matting
$ docker build -t image-matting:0.0.3 .
$ docker run --name image-matting -itd -p 8000:8000 image-matting:0.0.3
1
2
3
4
5
6
7
8

注意:服务启动过程有一点儿慢,需要等一会儿才能用,可以使用 docker logs -f image-matting --tail 100 命令查看日志。

# 1.2.3 image-matting使用

[1] 网页调用方式

使用Chrome浏览器打开 http://ip:8000 地址,上传图片即可。

image-matting抠图效果

[2] API调用方式

本地图片路径:

import requests

server = "http://127.0.0.1:8000"
image_path = "image.png"
model_name = "universal"  # people,universal
files = {"image": (image_path, open(image_path, "rb"))}
data = {"model": model_name}
response = requests.post(server+'/matting', files=files, data=data)
print(response.text)
json_data = response.json()
image_url = json_data['result_image_url']
mask_url = json_data['mask_image_url']
print("image_url:", server + image_url)
print("mask_url:", server + mask_url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14

网络图片链接:

import requests

server = "http://127.0.0.1:8000"
image_url = "http://your-image-url/demo.png"
data = {"image_url": image_url, "model": "universal"}  # people,universal
response = requests.post(server+'/matting/url', json=data)
print(response.text)
json_data = response.json()
image_url = json_data['result_image_url']
mask_url = json_data['mask_image_url']
print("image_url:",server+image_url)
print("mask_url:",server+mask_url)
1
2
3
4
5
6
7
8
9
10
11
12

# 1.3 HivisionIDPhotos制作证件照

# 1.3.1 HivisionIDPhotos简介

HivisionIDPhoto 项目旨在通过智能算法简化证件照生成流程。该项目利用完善的模型工作流程,能够识别多种拍照场景,进行精准抠图并自动生成标准证件照,仅使用 CPU 即可快速完成抠图任务。

# 1.3.2 HivisionIDPhotos部署

实验环境:Debian 11 x86_64 系统,8GB内存,160GB存储,2x Intel Xeon CPU,无GPU,带宽1 Gigabit

$ docker pull linzeyi/hivision_idphotos:v1
$ docker run -itd --name hivision_idphotos -p 7860:7860  linzeyi/hivision_idphotos:v1
1
2

注:该工具支持部署API服务,以API的形式嵌入其它应用进行使用,详见官方文档,这里就不赘述了。

# 1.3.3 HivisionIDPhotos使用

使用Chrome浏览器打开 http://ip:7860 地址,上传照片设置参数即可一键生成证件照。

HivisionIDPhotos效果

使用体验:除了在页面使用之外,该工具还支持以API的方式来供其他应用去调用,它使用CPU即可快速推理,几秒钟即可出结果,生成的证件照效果还不错。

# 1.4 DragGAN拖拽修图

# 1.4.1 DragGAN简介

DragGAN支持通过鼠标拖拽的方式对图像进行编辑,任何人都可以通过精确控制像素去向,轻松修改图像中物体的姿态、表情、形状、布局等。

DragGAN

# 1.4.2 准备代码与模型

实验环境:Macbook Pro 2021,M1 pro芯片,16G内存,512G存储,macOS Ventura13.2.1系统,Python3.9环境

Step1:拉取代码并安装依赖

$ git clone [email protected]:XingangPan/DragGAN.git
$ pip3 install -r requirements
1
2

Step2:下载算法模型

$ cd scripts && chmod u+x download_model.sh && sh download_model.sh
1

该脚本的内容如下,也可自己手动下载模型放到 checkpoints 目录下:

mkdir checkpoints
cd checkpoints

wget https://storage.googleapis.com/self-distilled-stylegan/lions_512_pytorch.pkl
mv lions_512_pytorch.pkl stylegan2_lions_512_pytorch.pkl

wget https://storage.googleapis.com/self-distilled-stylegan/dogs_1024_pytorch.pkl
mv dogs_1024_pytorch.pkl stylegan2_dogs_1024_pytorch.pkl

wget https://storage.googleapis.com/self-distilled-stylegan/horses_256_pytorch.pkl
mv horses_256_pytorch.pkl stylegan2_horses_256_pytorch.pkl

wget https://storage.googleapis.com/self-distilled-stylegan/elephants_512_pytorch.pkl
mv elephants_512_pytorch.pkl stylegan2_elephants_512_pytorch.pkl

wget https://api.ngc.nvidia.com/v2/models/nvidia/research/stylegan2/versions/1/files/stylegan2-ffhq-512x512.pkl
wget https://api.ngc.nvidia.com/v2/models/nvidia/research/stylegan2/versions/1/files/stylegan2-afhqcat-512x512.pkl
wget http://d36zk2xti64re0.cloudfront.net/stylegan2/networks/stylegan2-car-config-f.pkl
wget http://d36zk2xti64re0.cloudfront.net/stylegan2/networks/stylegan2-cat-config-f.pkl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 1.4.3 运行DragGAN服务

执行以下命令运行DragGAN服务,拖拽即可调整物体姿态,具体使用详见官方演示。

$ export PYTORCH_ENABLE_MPS_FALLBACK=1
$ python3 visualizer_drag_gradio.py
1
2

DragGAN效果

# 1.5 MVSS-Net图像篡改检测

# 1.5.1 MVSS-Net简介

图像处理检测网络,用于检测图像的剪切、移动、拼接和修补等操作,在媒体取证中非常重要。作者提出了一种多视角特征学习方法,为了有效地从真实图像中学习,使用多尺度(像素/边缘/图像)监督进行训练。该网络及增强版本MVSS-Net++在同一数据集和跨数据集场景中进行了实验,显示出MVSS-Net++表现最佳,并且对JPEG压缩、高斯模糊和基于截图的图像重采样具有更好的鲁棒性。

MVSS-Net

# 1.5.2 准备MVSS-Net代码及模型

实验环境:Macbook Pro 2021,M1 pro芯片,16G内存,512G存储,macOS Sonoma14.4.1系统,Python3.9环境

$ git clone https://github.com/dong03/MVSS-Net.git
$ cd MVSS-Net
$ pip3 install -r requirements.txt   // 这个项目比较老,有些依赖版本可能不适配Python3.9环境,自行调整即可
1
2
3

下载模型:Google Drive (opens new window)百度网盘 (opens new window) (提取码:mvss),这里使用百度网盘的 mvssnet+_casia.pt 模型文件,放到 ckpt 目录。

MVSS-Net模型文件

# 1.5.3 使用MVSS-Net检测篡改的图像

这个算法直接使用CPU推理就挺快的,原先是要用GPU来跑,将 MVSS-Net/common/tools.py 文件的 img = img.cuda() 改成 img = img.to("cpu") 即可。

def inference_single(img, model, th=0):
    model.eval()
    with torch.no_grad():
        img = img.reshape((-1, img.shape[-3], img.shape[-2], img.shape[-1]))
        img = direct_val(img)
        # img = img.cuda()
        img = img.to("cpu")
        _, seg = run_model(model, img)
        seg = torch.sigmoid(seg).detach().cpu()
        if torch.isnan(seg).any() or torch.isinf(seg).any():
            max_score = 0.0
        else:
            max_score = torch.max(seg).numpy()
        seg = [np.array(transform_pil(seg[i])) for i in range(len(seg))]

        if len(seg) != 1:
            pdb.set_trace()
        else:
            fake_seg = seg[0]
        if th == 0:
            return fake_seg, max_score
        fake_seg = 255.0 * (fake_seg > 255 * th)
        fake_seg = fake_seg.astype(np.uint8)

    return fake_seg, max_score
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

还有就是该算法的原始代码不能适配任意大小的图片,修改一下 MVSS-Net/models/mvssnet.py 代码 ERB 和 MVSSNet 类的 forward 函数即可。

class ERB(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(ERB, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.bn = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)

    def forward(self, x, upsample_scale=None, relu=True):
        x = self.conv1(x)
        res = self.conv2(x)
        res = self.bn(res)
        res = self.relu(res)
        res = self.conv3(res)
        if upsample_scale is not None:
            res = F.interpolate(res, scale_factor=upsample_scale, mode='bilinear', align_corners=False)
        if relu:
            return self.relu(x + res)
        else:
            return x + res


class MVSSNet(ResNet50):
    def __init__(self, nclass, aux=False, sobel=False, constrain=False, n_input=3, **kwargs):
        super(MVSSNet, self).__init__(pretrained=True, n_input=n_input)
        self.num_class = nclass
        self.aux = aux

        self.__setattr__('exclusive', ['head'])

        self.upsample = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        self.upsample_4 = nn.Upsample(scale_factor=4, mode="bilinear", align_corners=True)
        self.sobel = sobel
        self.constrain = constrain

        self.erb_db_1 = ERB(256, self.num_class)
        self.erb_db_2 = ERB(512, self.num_class)
        self.erb_db_3 = ERB(1024, self.num_class)
        self.erb_db_4 = ERB(2048, self.num_class)

        self.erb_trans_1 = ERB(self.num_class, self.num_class)
        self.erb_trans_2 = ERB(self.num_class, self.num_class)
        self.erb_trans_3 = ERB(self.num_class, self.num_class)

        if self.sobel:
            print("----------use sobel-------------")
            self.sobel_x1, self.sobel_y1 = get_sobel(256, 1)
            self.sobel_x2, self.sobel_y2 = get_sobel(512, 1)
            self.sobel_x3, self.sobel_y3 = get_sobel(1024, 1)
            self.sobel_x4, self.sobel_y4 = get_sobel(2048, 1)

        if self.constrain:
            print("----------use constrain-------------")
            self.noise_extractor = ResNet50(n_input=3, pretrained=True)
            self.constrain_conv = BayarConv2d(in_channels=1, out_channels=3, padding=2)
            self.head = _DAHead(2048+2048, self.num_class, aux, **kwargs)
        else:
            self.head = _DAHead(2048, self.num_class, aux, **kwargs)

    def forward(self, x):
        size = x.size()[2:]  # 使用输入尺寸进行动态上采样
        input_ = x.clone()
        feature_map, _ = self.base_forward(input_)
        c1, c2, c3, c4 = feature_map

        if self.sobel:
            res1 = self.erb_db_1(run_sobel(self.sobel_x1, self.sobel_y1, c1))
            res1 = self.erb_trans_1(res1 + F.interpolate(self.erb_db_2(run_sobel(self.sobel_x2, self.sobel_y2, c2)), size=res1.size()[2:], mode='bilinear', align_corners=False))
            res1 = self.erb_trans_2(res1 + F.interpolate(self.erb_db_3(run_sobel(self.sobel_x3, self.sobel_y3, c3)), size=res1.size()[2:], mode='bilinear', align_corners=False))
            res1 = self.erb_trans_3(res1 + F.interpolate(self.erb_db_4(run_sobel(self.sobel_x4, self.sobel_y4, c4)), size=res1.size()[2:], mode='bilinear', align_corners=False), relu=False)
        else:
            res1 = self.erb_db_1(c1)
            res1 = self.erb_trans_1(res1 + F.interpolate(self.erb_db_2(c2), size=res1.size()[2:], mode='bilinear', align_corners=False))
            res1 = self.erb_trans_2(res1 + F.interpolate(self.erb_db_3(c3), size=res1.size()[2:], mode='bilinear', align_corners=False))
            res1 = self.erb_trans_3(res1 + F.interpolate(self.erb_db_4(c4), size=res1.size()[2:], mode='bilinear', align_corners=False), relu=False)

        if self.constrain:
            x = rgb2gray(x)
            x = self.constrain_conv(x)
            constrain_features, _ = self.noise_extractor.base_forward(x)
            constrain_feature = constrain_features[-1]
            c4 = torch.cat([c4, constrain_feature], dim=1)

        outputs = []

        x = self.head(c4)
        x0 = F.interpolate(x[0], size, mode='bilinear', align_corners=True)
        outputs.append(x0)

        if self.aux:
            x1 = F.interpolate(x[1], size, mode='bilinear', align_corners=True)
            x2 = F.interpolate(x[2], size, mode='bilinear', align_corners=True)
            outputs.append(x1)
            outputs.append(x2)

        return res1, x0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97

然后再准备测试程序及测试数据:

# -*- coding: utf-8 -*-

import torch
import cv2
from PIL import Image

from common.tools import inference_single
from models.mvssnet import get_mvss


def load_model(path):
    model = get_mvss(backbone='resnet50', pretrained_base=True, nclass=1, sobel=True, constrain=True, n_input=3)
    checkpoint = torch.load(path, map_location='cpu')
    state_dict = checkpoint['model_dict']
    model_keys = set(model.state_dict().keys())
    filtered_state_dict = {k: v for k, v in state_dict.items() if k in model_keys}
    model.load_state_dict(filtered_state_dict, strict=False)
    model.to("cpu")
    model.eval()
    return model


def concatenate_images(img1, img2):
    img1_pil = Image.fromarray(cv2.cvtColor(img1, cv2.COLOR_BGR2RGB))
    img2_pil = Image.fromarray(img2)
    new_height = max(img1_pil.height, img2_pil.height)
    new_img = Image.new('RGB', (img1_pil.width + img2_pil.width, new_height))
    new_img.paste(img1_pil, (0, 0))
    new_img.paste(img2_pil, (img1_pil.width, 0))
    return new_img


if __name__ == '__main__':
    paths = {
        'input': './test/input/input.png',
        'output': './test/output/output.png',
        'model': './ckpt/mvssnet+_casia.pt'
    }
    model = load_model(paths['model'])
    input_img = img = cv2.imread(paths['input'])

    with torch.no_grad():
        seg_result, _ = inference_single(img=input_img, model=model, th=0)
        seg_image = Image.fromarray(seg_result)

    final_image = concatenate_images(input_img, seg_result)
    final_image.save(paths['output'])
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

运行后的效果:只能辅助判定图片是否被篡改过,直接拍摄出来的正常图片也有可能会检测出部分地方存在异常。

MVSS-Net检测效果

# 2. 文本音频处理

我对其中的一些通用开源算法使用Flask进行了封装集成,项目地址为:https://github.com/Logistic98/yoyo-algorithm (opens new window)

# 2.1 百度飞桨Paddle

飞桨(PaddlePaddle)以百度多年的深度学习技术研究和业务应用为基础,集深度学习核心训练和推理框架、基础模型库、端到端开发套件、丰富的工具组件于一体,是中国首个自主研发、功能完备、开源开放的产业级深度学习平台。

使用Paddle系列的算法,需要统一安装 paddlepaddle 库,具体模块再安装对应模块的库即可。

统一说明:Paddle系列的库包和算法模型都需要关闭翻墙代理工具,算法模型会在代码初次执行时自动下载(存放在C:\Users\xxx\.paddlenlp目录下),所以初次执行耗时会长一些。

$ pip install paddlepaddle==2.2.0 -i https://mirror.baidu.com/pypi/simple
1

注:PaddleNLP 要求 paddlepaddle >= 2.2,如果根据 PaddleOCR 要求的 paddlepaddle >=2.0.1 而安装的是2.0.1版本,前者会报错:cannot import name '_convert_attention_mask' from 'paddle.nn.layer.transformer'

# 2.1.1 PaddleNLP

PaddleNLP:是一个开源的自然语言处理开发库,模型会在初次执行时自动下载。这是它的官方使用教程:PaddleNLP官方教程 (opens new window)

依赖库安装:

$ pip install paddlenlp==2.2.4
1

基本使用示例:

# -*- coding: utf-8 -*-

from paddlenlp import Taskflow

# 中文分词
paddle_nlp = Taskflow("word_segmentation")
result = paddle_nlp("第十四届全运会在西安举办")
print(result)
# >>> ['第十四届', '全运会', '在', '西安', '举办']

# 词性标注
paddle_nlp = Taskflow("pos_tagging")
result = paddle_nlp("第十四届全运会在西安举办")
print(result)
# >>> [('第十四届', 'm'), ('全运会', 'nz'), ('在', 'p'), ('西安', 'LOC'), ('举办', 'v')]

# 名词短语标注
paddle_nlp = Taskflow("knowledge_mining", model="nptag")
result = paddle_nlp("红曲霉菌")
print(result)
# >>> [{'text': '红曲霉菌', 'label': '微生物'}]

# 情感分析
paddle_nlp = Taskflow("sentiment_analysis")
result = paddle_nlp("这个产品用起来真的很流畅,我非常喜欢")
print(result)
# >>> [{'text': '这个产品用起来真的很流畅,我非常喜欢', 'label': 'positive', 'score': 0.9938690066337585}]

# 文本相似度
paddle_nlp = Taskflow("text_similarity")
result = paddle_nlp([["世界上什么东西最小", "世界上什么东西最小?"]])
print(result)
# >>> [{'text1': '世界上什么东西最小', 'text2': '世界上什么东西最小?', 'similarity': 0.992725}]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

注:返回结果说明见官方文档。除此之外,PaddleNLP还支持很多其他的自然语言处理,如生成式问答、智能问答等,具体见官方文档。

# 2.1.2 PaddleOCR

PaddleOCR:是一个开源的图片OCR识别算法,模型会在初次执行时自动下载。这是它的官方使用教程:PaddleOCR使用教程 (opens new window)

依赖库安装:

$ pip install "paddleocr>=2.0.1"
1

基本使用示例:

from paddleocr import PaddleOCR

# Paddleocr目前支持的多语言语种可以通过修改lang参数进行切换
# 例如`ch`, `en`, `fr`, `german`, `korean`, `japan`
ocr = PaddleOCR(use_angle_cls=True, lang="ch")  # need to run only once to download and load model into memory
img_path = './imgs/test.jpg'
result = ocr.ocr(img_path, cls=True)
for line in result:
    print(line)
1
2
3
4
5
6
7
8
9

注:如果需要结果可视化、版面分析,需要另外安装相应的库,具体见官方文档。

# 2.2 文本提取与审查

# 2.2.1 文本关键词及概要提取

FastTextRank (opens new window):从中文文本中提取摘要及关键词,并对算法时间复杂度进行了修改,计算图最大权节点的时间复杂度由o(n^2)降低到了o(n)。在有限的测试文本上,其运行速度相比于textrank4zh这个包快了8倍。算法原理见作者的知乎文章 (opens new window)

依赖库安装:Numpy>=1.14.5 gensim>=3.5.0 FastTextRank==1.1

基本使用示例:KeyWord.py(提取关键字示例)、Sentence.py(提取摘要示例)

# 2.2.2 文本内容审查

Sensitive-word (opens new window):收集的一些敏感词汇,细分了暴恐词库、反动词库、民生词库、色情词库、贪腐词库、其他词库等。

将词库放到./dict/目录下,一个分类一个txt文件,词库内容为一行一个敏感词,对输入文本使用jieba分词,撞词库判断是否敏感。

# 2.3 文本翻译

# 2.3.1 破解Google翻译

破解Google翻译的 py-googletrans (opens new window) 库,使用时需要联网(被墙,国内需要设置代理)

$ pip3 install googletrans
1

这个库的工作原理:

py-googletrans的工作原理

示例代码如下:

#-*- coding:utf-8 -*-

from googletrans import Translator
import os

os.environ["https_proxy"] = "http://127.0.0.1:7890"

translator = Translator()
result = translator.translate('hello world', dest='zh-cn').text
print(result)
1
2
3
4
5
6
7
8
9
10

注:单次请求的最大字符数为5000,超出的话可以拆分成多份,分开请求再对结果进行拼接。另外该破解方式随时可能会被阻止,如果想使用稳定的 API,建议使用 谷歌官方的翻译 API (opens new window)

# 2.3.2 LibreTranslate机器翻译

LibreTranslate是一个开源的、可以自行搭建的翻译服务,支持多种语言的互相翻译,包括中文。翻译的准确度远不如商业API,效果很一般,但它是永久免费的。服务器上部署LibreTranslate共计需要大约13GB的存储空间。

$ docker run -itd -p 5000:5000 --name libretranslate libretranslate/libretranslate
$ docker logs -f libretranslate --tail 100
1
2

注:创建容器后会自动下载语言包,这个过程会比较慢,等待它安装完毕后,浏览器访问http://ip:5000即可查看以下Web页面。

LibreTranslate

# 2.4 语音合成

# 2.4.1 gTTS语音合成

谷歌开源的文本转语音 API 交互的 Python 库,虽然免费但生成的语音机器音较重,使用时需要联网(被墙,国内需要设置代理)

项目地址:https://github.com/pndurette/gTTS (opens new window)

$ pip install gTTS
1

示例代码如下:

# -*- coding: utf-8 -*-

import os
from gtts import gTTS

os.environ["https_proxy"] = "http://127.0.0.1:1080"

# 谷歌文字转语音API测试
text = "测试gtts文本转语音"
audio = gTTS(text=text, lang="zh-cn")
audio.save("demo.mp3")
1
2
3
4
5
6
7
8
9
10
11

注:如果未设置代理或者代理有问题,会报“Python GTTS / Failed to connect. Probable cause: Unknown”错误。

语音文件播放:

playsound 声明它已经在WAV和MP3文件上进行了测试,但是它可能也适用于其他文件格式。

$ pip install playsound
1

示例代码如下:

from playsound import playsound
playsound('demo.mp3')
1
2

注意事项:调用时可能出现“指定的设备未打开,或不被 MCI 所识别”报错。原因是windows不支持utf-16编码,需修改playsound源码。

修改\Lib\site-packages\playsound.py文件的源码如下:

def winCommand(*command):
        bufLen = 600
        buf = c_buffer(bufLen)
        #command = ' '.join(command).encode('utf-16') # 1.修改前
        command = ' '.join(command) # 1.修改后
        errorCode = int(windll.winmm.mciSendStringW(command, buf, bufLen - 1, 0))  # use widestring version of the function
        if errorCode:
            errorBuffer = c_buffer(bufLen)
            windll.winmm.mciGetErrorStringW(errorCode, errorBuffer, bufLen - 1)  # use widestring version of the function
            exceptionMessage = ('\n    Error ' + str(errorCode) + ' for command:'
                                #'\n        ' + command.decode('utf-16') + # 2.修改前
                                '\n        ' + command + # 2.修改后
                                '\n    ' + errorBuffer.raw.decode('utf-16').rstrip('\0'))
            logger.error(exceptionMessage)
            raise PlaysoundException(exceptionMessage)
        return buf.value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2.4.2 OpenTTS语音合成

OpenTTS 是一个用 Python 编写的免费、开源的文本转语音服务。它支持多种语言,并带有易于使用的界面。

项目地址:https://github.com/synesthesiam/opentts (opens new window)

$ docker run --name opentts -itd -p 5500:5500 synesthesiam/opentts:all --no-espeak --cache /cache
1

opentts

# 3. 词云绘制

词云绘制的示例代码已在Github上开源,项目地址为:https://github.com/Logistic98/word-cloud (opens new window)

# 3.1 词云简介

# 3.1.1 词云是什么

词云,又称文字云,是文本数据的视觉表示,由词汇组成类似云的彩色图形,用于展示大量文本数据。 通常用于描述网站上的关键字元数据,或可视化自由格式文本,每个词的重要性以字体大小或颜色显示。在数据可视化方面,词云一直是一种视觉冲击力很强的方式。

词云示例

# 3.1.2 常见技术方案

对输入的一段文字进行语义分割,得到不同频度的词汇,然后以正比于词频的字体大小无规则的集中显示高频词,简洁直观高效。其中获取词汇我们可以使用 jieba 分词等 NLP 库。渲染部分通常有两种方式,一种是使用 Python 直接渲染成静态图片,另一种是以 RESTful API 的形式返回给前端,使用Echarts进行渲染,这样得到的就是动态词云,鼠标悬浮上去会有特效,会更好看一些。

# 3.2 Python生成静态词云

# 3.2.1 依赖环境安装

wordcloud包依赖于Pillow、numpy、matplotlib。

$ pip intsall Pillow numpy matplotlib wordcloud
$ pip install imageio 
$ pip install jieba 
$ pip install snownlp
1
2
3
4

# 3.2.2 NLP相关库的使用

1)jieba分词库

  • 精确模式(最常用,生成词云一般用这个):每个字只用一遍,不存在冗余词汇。jieba.lcut('动力学和电磁学')
  • 全模式:把每个字可能形成的词汇都提取出来,存在冗余。jieba.lcut('动力学和电磁学',cut_all=True)
  • 搜索引擎模式:将全模式分词的结果从短到长排列好。jieba.lcut_for_search('动力学和电磁学')

jieba-demo.py

# -*- coding: utf-8 -*-

import jieba

text = '动力学和电磁学'

print('{:-^50}'.format('精确模式:每个字只用一遍,不存在冗余词汇'))
textlist = jieba.lcut(text)
print('分词之后生成的列表为', textlist)

print('{:-^50}'.format('全模式:把每个字可能形成的词汇都提取出来,存在冗余'))
textlist = jieba.lcut(text, cut_all=True)
print('分词之后生成的列表为', textlist)

print('{:-^50}'.format('搜索引擎模式:将全模式分词的结果从短到长排列好'))
textlist = jieba.lcut_for_search(text)
print('分词之后生成的列表为', textlist)

>>> ---------------精确模式:每个字只用一遍,不存在冗余词汇---------------
>>> 分词之后生成的列表为 ['动力学', '和', '电磁学']
>>> ------------全模式:把每个字可能形成的词汇都提取出来,存在冗余-------------
>>> 分词之后生成的列表为 ['动力', '动力学', '力学', '和', '电磁', '电磁学', '磁学']
>>> -------------搜索引擎模式:将全模式分词的结果从短到长排列好--------------
>>> 分词之后生成的列表为 ['动力', '力学', '动力学', '和', '电磁', '磁学', '电磁学']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

2)snownlp文本分析库

snownlp的语料库是淘宝等电商网站的评论,所以对购物类的文本情感分析准确度很高。

snownlp-demo.py

# -*- coding: utf-8 -*-

import snownlp

text1 = '中华民族伟大复兴'
print('{:-^50}'.format('测试文本:'+text1))
s = snownlp.SnowNLP(text1)
print('情感分析', s.sentiments)
print('中文分词', s.words)
print('转成拼音', s.pinyin)
print('词频', s.tf)
print('提取三个关键词', s.keywords(3))

text2 = '快递慢到死,客服态度不好,退款!'
print('{:-^50}'.format('测试文本:'+text2))
s = snownlp.SnowNLP(text2)
print('情感分析', s.sentiments)
print('中文分词', s.words)
print('转成拼音', s.pinyin)
print('词频', s.tf)
print('提取三个关键词', s.keywords(3))

>>> ------------------测试文本:中华民族伟大复兴-------------------
>>> 情感分析 0.9935086411278989
>>> 中文分词 ['中华民族', '伟大', '复兴']
>>> 转成拼音 ['zhong', 'hua', 'min', 'zu', 'wei', 'da', 'fu', 'xing']
>>> 词频 [{'中': 1}, {'华': 1}, {'民': 1}, {'族': 1}, {'伟': 1}, {'大': 1}, {'复': 1}, {'兴': 1}]
>>> 提取三个关键词 ['复兴', '中华民族']
>>> --------------测试文本:快递慢到死,客服态度不好,退款!---------------
>>> 情感分析 0.00012171645785852281
>>> 中文分词 ['快递', '慢', '到', '死', ',', '客', '服', '态度', '不好', ',', '退款', '!']
>>> 转成拼音 ['kuai', 'di', 'man', 'dao', 'si', ',', 'ke', 'fu', 'tai', 'du', 'bu', 'hao', ',', 'tui', 'kuan', '!']
>>> 词频 [{'快': 1}, {'递': 1}, {'慢': 1}, {'到': 1}, {'死': 1}, {',': 1}, {'客': 1}, {'服': 1}, {'态': 1}, {'度': 1}, {'不': 1}, {'好': 1}, {',': 1}, {'退': 1}, {'款': 1}, {'!': 1}]
>>> 提取三个关键词 ['服', '不好', '态度']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# 3.3 基本词云示例

# 3.3.1 最简版词云

simple-word-cloud.py

# -*- coding: utf-8 -*-

import wordcloud

w = wordcloud.WordCloud()
w.generate('and that government of the people, by the people, for the people, shall not perish from the earth.')
w.to_file('./output/simple-word-cloud.png')
1
2
3
4
5
6
7

讲解说明:

  • wordcloud库为每一个词云生成一个WordCloud对象,wordcloud.WordCloud()代表一个词云对象,我们将它赋值给w
  • 我们可以在WordCloud()括号里填入各种参数,控制词云的字体、字号、字的颜色、背景颜色等。wordcloud库会非常智能地按空格进行分词及词频统计,出现次数多的词就大。

生成效果:

simple-word-cloud

# 3.3.2 美化词云样式

beautify-word-cloud.py

# -*- coding: utf-8 -*-

import wordcloud

# 构建词云对象w,设置词云图片宽、高、字体、背景颜色等参数
w = wordcloud.WordCloud(width=1000,
                        height=700,
                        background_color='white',
                        font_path='msyh.ttc')

w.generate('从明天起,做一个幸福的人。喂马、劈柴,周游世界。从明天起,关心粮食和蔬菜。我有一所房子,面朝大海,春暖花开')
w.to_file('./output/beautify-word-cloud.png')
1
2
3
4
5
6
7
8
9
10
11
12

参数说明:

  • width 词云图片宽度,默认400像素
  • height 词云图片高度,默认200像素
  • background_color 词云图片的背景颜色,默认为黑色background_color='white'
  • font_step 字号增大的步进间隔,默认1号
  • font_path 指定字体路径,默认None,对于中文可用font_path='msyh.ttc'
  • mini_font_size 最小字号,默认4号
  • max_font_size 最大字号,根据高度自动调节
  • max_words 最大词数,默认200
  • stop_words 不显示的单词 stop_words={"python","java"}
  • Scale:默认值1。值越大,图像密度越大越清晰
  • prefer_horizontal:默认值0.90,浮点数类型。表示在水平如果不合适,就旋转为垂直方向,水平放置的词数占0.9?
  • relative_scaling:默认值0.5,浮点型。设定按词频倒序排列,上一个词相对下一位词的大小倍数。有如下取值:“0”表示大小标准只参考频率排名,“1”如果词频是2倍,大小也是2倍
  • mask 指定词云形状图片,默认为矩形

生成效果:

beautify-word-cloud

# 3.3.3 按照指定剪影生成

china-word-cloud.py

# -*- coding: utf-8 -*-

# 导入词云制作库wordcloud和中文分词库jieba
import jieba
import wordcloud

# 导入imageio库中的imread函数,并用这个函数读取本地图片,作为词云形状图片
import imageio

mk = imageio.imread("./input/chinamap.png")

# 构建并配置词云对象w,注意要加scale参数,提高清晰度。将不想展示在词云中的词放在stopwords集合里。
w = wordcloud.WordCloud(width=1000,
                        height=700,
                        background_color='white',
                        font_path='msyh.ttc',
                        mask=mk,
                        scale=15,
                        stopwords={'和', '的', '是'})

# 对来自外部文件的文本进行中文分词,得到string
f = open('./input/新时代中国特色社会主义.txt', encoding='utf-8')
txt = f.read()
txtlist = jieba.lcut(txt)
string = " ".join(txtlist)

# 将string变量传入w的generate()方法,给词云输入文字
w.generate(string)

# 将词云图片导出到当前文件夹
w.to_file('./output/china-word-cloud.png')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

说明:可以把大量stopwords写入到txt文件中,用如下方式读取成set()

# -*- coding: utf-8 -*-

import codecs


def read_stopwords(path):
    stop_words = set()
    for word in codecs.open(path, 'r', 'utf-8', 'ignore'):
        stop_words.add(word.strip())
    return stop_words


if __name__ == '__main__':
    path = './stopwords.txt'
    stop_words = read_stopwords(path)
    print(stop_words)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

生成效果:

china-word-cloud

# 3.3.4 勾勒轮廓线

contour-word-cloud.py

# -*- coding: utf-8 -*-

# 导入词云制作库wordcloud
import wordcloud

# 导入imageio库中的imread函数,并用这个函数读取本地图片,作为词云形状图片
import imageio

mk = imageio.imread("./input/alice.png")

# 将外部文件包含的文本保存在string变量中
string = open('./input/hamlet.txt').read()

# 构建词云对象w,注意增加参数contour_width和contour_color设置轮廓宽度和颜色
w = wordcloud.WordCloud(background_color="white",
                        mask=mk,
                        contour_width=1,
                        contour_color='steelblue')

# # 将string变量传入w的generate()方法,给词云输入文字
w.generate(string)

# 将词云图片导出到当前文件夹
w.to_file('./output/contour-word-cloud.png')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

生成效果:

contour-word-cloud

# 3.3.5 按照模板着色

color-word-cloud.py

# -*- coding: utf-8 -*-

# 导入绘图库matplotlib和词云制作库wordcloud
import matplotlib.pyplot as plt
from wordcloud import WordCloud, ImageColorGenerator

# 导入imageio库中的imread函数,并用这个函数读取本地图片,作为词云形状图片
import imageio
mk = imageio.imread("./input/alice_color.png")

# 将外部文件包含的文本保存在text变量中
text = open('./input/alice.txt').read()

# 构建词云对象w
wc = WordCloud(background_color="white",
               mask=mk,)
# 将text字符串变量传入w的generate()方法,给词云输入文字
wc.generate(text)

# 调用wordcloud库中的ImageColorGenerator()函数,提取模板图片各部分的颜色
image_colors = ImageColorGenerator(mk)

# 显示原生词云图、按模板图片颜色的词云图和模板图片,按左、中、右显示
fig, axes = plt.subplots(1, 3)
# 最左边的图片显示原生词云图
axes[0].imshow(wc)
# 中间的图片显示按模板图片颜色生成的词云图,采用双线性插值的方法显示颜色
axes[1].imshow(wc.recolor(color_func=image_colors), interpolation="bilinear")
# 右边的图片显示模板图片
axes[2].imshow(mk, cmap=plt.cm.gray)
for ax in axes:
    ax.set_axis_off()
plt.show()

# 给词云对象按模板图片的颜色重新上色
wc_color = wc.recolor(color_func=image_colors)
# 将词云图片导出到当前文件夹
wc_color.to_file('./output/color-word-cloud.png')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

注意事项:

  • 定制剪影图片可以这么获得:先使用remove.bg工具去除背景,然后用画图工具打开,截图出来即可。

生成效果:

color-word-cloud

# 3.3.6 情感分析词云

emotion-word-cloud.py

# -*- coding: utf-8 -*-

# 导入词云制作库wordcloud和中文分词库jieba
import jieba
import wordcloud

# 导入imageio库中的imread函数,并用这个函数读取本地图片,作为词云形状图片
import imageio
mk = imageio.imread("./input/chinamap.png")

# 构建并配置两个词云对象w1和w2,分别存放积极词和消极词
w1 = wordcloud.WordCloud(width=1000,
                        height=700,
                        background_color='white',
                        font_path='msyh.ttc',
                        mask=mk,
                        scale=15)
w2 = wordcloud.WordCloud(width=1000,
                        height=700,
                        background_color='white',
                        font_path='msyh.ttc',
                        mask=mk,
                        scale=15)

# 对来自外部文件的文本进行中文分词,得到积极词汇和消极词汇的两个列表
f = open('./input/三国演义.txt',encoding='utf-8')
txt = f.read()
txtlist = jieba.lcut(txt)
positivelist = []
negativelist = []

# 下面对文本中的每个词进行情感分析,情感>0.96判为积极词,情感<0.06判为消极词
print('开始进行情感分析,请稍等...')
# 导入自然语言处理第三方库snownlp
import snownlp
for each in txtlist:
    each_word = snownlp.SnowNLP(each)
    feeling = each_word.sentiments
    if feeling > 0.96:
        positivelist.append(each)
    elif feeling < 0.06:
        negativelist.append(each)
    else:
        pass
# 将积极和消极的两个列表各自合并成积极字符串和消极字符串,字符串中的词用空格分隔
positive_string = " ".join(positivelist)
negative_string = " ".join(negativelist)

# 将string变量传入w的generate()方法,给词云输入文字
w1.generate(positive_string)
w2.generate(negative_string)

# 将积极、消极的两个词云图片导出到当前文件夹
w1.to_file('./output/emotion-word-cloud-positive.png')
w2.to_file('./output/emotion-word-cloud-negative.png')
print('词云生成完成')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

生成效果:

emotion-word-cloud

# 3.3.7 人物阵营分色词云

camp-word-cloud.py

# -*- coding: utf-8 -*-

# 导入wordcloud库,并定义两个函数
from wordcloud import (WordCloud, get_single_color_func)
# 导入jieba分词
import jieba
# 导入imageio库中的imread函数,并用这个函数读取本地图片,作为词云形状图片
import imageio


class SimpleGroupedColorFunc(object):
    """Create a color function object which assigns EXACT colors
       to certain words based on the color to words mapping

       Parameters
       ----------
       color_to_words : dict(str -> list(str))
         A dictionary that maps a color to the list of words.

       default_color : str
         Color that will be assigned to a word that's not a member
         of any value from color_to_words.
    """

    def __init__(self, color_to_words, default_color):
        self.word_to_color = {word: color
                              for (color, words) in color_to_words.items()
                              for word in words}

        self.default_color = default_color

    def __call__(self, word, **kwargs):
        return self.word_to_color.get(word, self.default_color)


class GroupedColorFunc(object):
    """Create a color function object which assigns DIFFERENT SHADES of
       specified colors to certain words based on the color to words mapping.

       Uses wordcloud.get_single_color_func

       Parameters
       ----------
       color_to_words : dict(str -> list(str))
         A dictionary that maps a color to the list of words.

       default_color : str
         Color that will be assigned to a word that's not a member
         of any value from color_to_words.
    """

    def __init__(self, color_to_words, default_color):
        self.color_func_to_words = [
            (get_single_color_func(color), set(words))
            for (color, words) in color_to_words.items()]

        self.default_color_func = get_single_color_func(default_color)

    def get_color_func(self, word):
        """Returns a single_color_func associated with the word"""
        try:
            color_func = next(
                color_func for (color_func, words) in self.color_func_to_words
                if word in words)
        except StopIteration:
            color_func = self.default_color_func

        return color_func

    def __call__(self, word, **kwargs):
        return self.get_color_func(word)(word, **kwargs)


mk = imageio.imread("./input/chinamap.png")
w = WordCloud(width=1000,
              height=700,
              background_color='white',
              font_path='msyh.ttc',
              mask=mk,
              scale=15,
              max_font_size=60,
              max_words=20000,
              font_step=1)


# 对来自外部文件的文本进行中文分词,得到string
f = open('./input/三国演义.txt', encoding='utf-8')
txt = f.read()
txtlist = jieba.lcut(txt)
string = " ".join(txtlist)

# 将string变量传入w的generate()方法,给词云输入文字
w.generate(string)

# 创建字典,按人物所在的不同阵营安排不同颜色,绿色是蜀国,橙色是魏国,紫色是东吴,粉色是诸侯群雄
color_to_words = {
    'green': ['刘备', '刘玄德', '孔明', '诸葛孔明', '玄德', '关公', '玄德曰', '孔明曰',
              '张飞', '赵云', '后主', '黄忠', '马超', '姜维', '魏延', '孟获',
              '关兴', '诸葛亮', '云长', '孟达', '庞统', '廖化', '马岱'],
    'red': ['曹操', '司马懿', '夏侯', '荀彧', '郭嘉', '邓艾', '许褚',
            '徐晃', '许诸', '曹仁', '司马昭', '庞德', '于禁', '夏侯渊', '曹真', '钟会'],
    'purple': ['孙权', '周瑜', '东吴', '孙策', '吕蒙', '陆逊', '鲁肃', '黄盖', '太史慈'],
    'pink': ['董卓', '袁术', '袁绍', '吕布', '刘璋', '刘表', '貂蝉']
}

# 其它词语的颜色
default_color = 'gray'

# 构建新的颜色规则
grouped_color_func = GroupedColorFunc(color_to_words, default_color)

# 按照新的颜色规则重新绘制词云颜色
w.recolor(color_func=grouped_color_func)

# 将词云图片导出到当前文件夹
w.to_file('./output/camp-word-cloud.png')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

生成效果:

camp-word-cloud

# 4. 基于Pyppeteer将网页转PDF

# 4.1 基本概念

# 4.1.1 pyppeteer简介

Headless chrome/chromium 自动化库(是 puppeteer (opens new window) 无头 Chrome Node.js API 的Python版非官方库),可用于网页截图导出pdf。

puppeteer 和 pyppeteer 的区别:pyppeteer 努力尽可能地复制 puppeteer API,但是,Javascript 和 Python 之间的根本差异使得这很难精确地做到,具体细节对比官方文档。

# 4.1.2 无头浏览器简介

无头浏览器指的是没有图形用户界面的浏览器,它可以通过命令行界面或使用网络通信来提供对网页的自动控制。对于测试网页特别有用,因为它们能够像浏览器一样呈现和理解超文本标记语言,包括页面布局、颜色、字体选择以及JavaScript和AJAX的执行等样式元素,这些元素在使用其他测试方法时通常是不可用的。

无头浏览器作用:Web应用程序中的测试自动化、拍摄网页截图、对JavaScript库运行自动化测试、收集网站数据、自动化网页交互。

# 4.2 使用Flask进行封装

以下示例代码已在GitHub上开源,地址:https://github.com/Logistic98/pyppeteer-url2pdf (opens new window)

# 4.2.1 封装代码

server.py

# -*- coding: utf-8 -*-

import json
import time
from uuid import uuid1
from flask import Flask, jsonify, request
from flask_cors import CORS
import os
import asyncio

from log import logger
from responseCode import ResponseCode, ResponseMessage
from utils import url_save_pdf, download_file

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 将任意公开访问的url转化为pdf提供下载
"""
@app.route(rule='/api/pyppeteer/urlSavePdf', methods=['POST'])
def urlToPdf():
    # 获取JSON格式的请求体,并解析
    request_data = request.get_data(as_text=True)
    request_body = json.loads(request_data)

    # 参数校验模块
    url = request_body.get("url")
    if not url:
        fail_response = dict(code=ResponseCode.RARAM_FAIL, msg=ResponseMessage.RARAM_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)
    pdf_name = request_body.get("pdf_name")
    if not pdf_name:
        pdf_name = '{}.pdf'.format(uuid1())
    '''
     resolution: 设置网页显示尺寸
         width: 网页显示宽度
         height: 网页显示高度
     '''
    resolution = request_body.get("resolution")
    if not resolution:
        resolution = {"width": 1920, "height": 1680}
    '''
     clip: 位置与图片尺寸信息
         x: 网页截图的起始x坐标
         y: 网页截图的起始y坐标
         width: 图片宽度
         height: 图片高度
     '''
    clip = request_body.get("clip")
    if not clip:
        clip = {"width": 1920, "height": 1680}

    # 创建pdf的存储目录
    now_str = time.strftime("%Y%m%d", time.localtime())
    pdf_root_path = './tmp/'
    pdf_base_path = pdf_root_path + now_str
    if not os.path.exists(pdf_base_path):
        os.makedirs(pdf_base_path)
    pdf_path = pdf_base_path + '/' + pdf_name

    # 将url保存成pdf文件
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(url_save_pdf(url, pdf_path, resolution, clip))
        logger.info("成功将【{}】网址保存成pdf文件【{}】!".format(url, pdf_path))
    except Exception as e:
        logger.error(e)
        fail_response = dict(code=ResponseCode.BUSINESS_FAIL, msg=ResponseMessage.BUSINESS_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)

    # 将pdf文件转成文件流提供下载
    return download_file(pdf_path)


if __name__ == '__main__':
    # 解决中文乱码问题
    app.config['JSON_AS_ASCII'] = False
    # 启动服务,指定主机和端口
    app.run(host='0.0.0.0', port=5006, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

utils.py

# -*- coding: utf-8 -*-

import os
import urllib.parse

from PIL import Image
from pyppeteer import launch
from reportlab.pdfgen.canvas import Canvas
from reportlab.lib.utils import ImageReader
import imageio.v2 as imageio
from flask import Response


# 指定url区域截屏保存成pdf
async def url_save_pdf(url, pdf_path, resolution, clip):
    start_parm = {
        # 下列三个参数用于解决 Flask 运行 Pyppeteer 报错 "signal only works in main thread"
        "handleSIGINT": False,
        "handleSIGTERM": False,
        "handleSIGHUP": False,
        "headless": True,    # 关闭无头浏览器
        "args": [
            '--no-sandbox',  # 关闭沙盒模式
        ],
    }
    browser = await launch(**start_parm)
    page = await browser.newPage()
    # 加载指定的网页url
    await page.goto(url)
    # 设置网页显示尺寸
    await page.setViewport(resolution)
    # 设置截屏区域
    if 'x' not in clip or 'y' not in clip:
        await page.pdf({'path': pdf_path, 'width': clip['width'], 'height': clip['height']})
        await browser.close()
    else:
        img_data = await page.screenshot({'clip': clip})
        img_data_array = imageio.imread(img_data, format="png")
        im = Image.fromarray(img_data_array)
        page_width, page_height = im.size
        c = Canvas(pdf_path, pagesize=(page_width, page_height))
        c.drawImage(ImageReader(im), 0, 0)
        c.save()


# 检验是否含有中文字符
def is_contains_chinese(strs):
    for _char in strs:
        if '\u4e00' <= _char <= '\u9fa5':
            return True
    return False


# 将文件转成文件流提供下载
def download_file(file_path):

    # 文件路径、文件名、后缀分割
    file_dir, file_full_name = os.path.split(file_path)
    file_name, file_ext = os.path.splitext(file_full_name)

    # 文件名如果包含中文则进行编码
    if is_contains_chinese(file_name):
        file_name = urllib.parse.quote(file_name)
    new_file_name = file_name + file_ext

    # 流式读取下载
    def send_file():
        with open(file_path, 'rb') as targetfile:
            while 1:
                data = targetfile.read(20 * 1024 * 1024)   # 每次读取20M
                if not data:
                    break
                yield data
    response = Response(send_file(), content_type='application/octet-stream')
    response.headers["Content-disposition"] = 'attachment; filename=%s' % new_file_name
    return response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

response.py

# -*- coding: utf-8 -*-

from responseCode import ResponseMessage, ResponseCode


class ResMsg(object):
    """
    封装响应文本
    """
    def __init__(self, data=None, code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS):
        self._data = data
        self._msg = msg
        self._code = code

    def update(self, code=None, data=None, msg=None):
        """
        更新默认响应文本
        :param code:响应状态码
        :param data: 响应数据
        :param msg: 响应消息
        :return:
        """
        if code is not None:
            self._code = code
        if data is not None:
            self._data = data
        if msg is not None:
            self._msg = msg

    def add_field(self, name=None, value=None):
        """
        在响应文本中加入新的字段,方便使用
        :param name: 变量名
        :param value: 变量值
        :return:
        """
        if name is not None and value is not None:
            self.__dict__[name] = value

    @property
    def data(self):
        """
        输出响应文本内容
        :return:
        """
        body = self.__dict__
        body["data"] = body.pop("_data")
        body["msg"] = body.pop("_msg")
        body["code"] = body.pop("_code")
        return body
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

responseCode.py

# -*- coding: utf-8 -*-


class ResponseCode(object):
    SUCCESS = 200
    RARAM_FAIL = 400
    BUSINESS_FAIL = 500


class ResponseMessage(object):
    SUCCESS = "请求成功"
    RARAM_FAIL = "参数校验失败"
    BUSINESS_FAIL = "业务处理失败"
1
2
3
4
5
6
7
8
9
10
11
12
13

log.py

# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./server.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 4.2.2 注意事项

1)初次执行时会自动下载Chromium。

2)Flask 运行 Pyppeteer 报错 “signal only works in main thread”

解决办法:将handleSIGINT、handleSIGTERM、handleSIGHUP设置为False。

    start_parm = {
        # 下列三个参数用于解决 Flask 运行 Pyppeteer 报错 "signal only works in main thread"
        "handleSIGINT": False,
        "handleSIGTERM": False,
        "handleSIGHUP": False,
        "headless": True,    # 关闭无头浏览器
        "args": [
            '--no-sandbox',  # 关闭沙盒模式
        ],
    }
1
2
3
4
5
6
7
8
9
10

# 4.3 使用Docker进行部署

# 4.3.1 安装Docker环境

$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh -   # 安装docker
$ sudo systemctl start docker  # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)
1
2
3
4

# 4.3.2 导出项目依赖

使用pipreqs导出依赖,使用pipreqs库导出本项目的依赖,生成requirements.txt文件。

$ pip install pipreqs
$ cd /root/test-project          // 切换到项目根目录
$ pipreqs ./ --encoding=utf8     // 需要带上编码的指定,否则会报GBK编码错误
1
2
3

注意这里还有个坑如下,这是因为本机开了翻墙代理导致的,把代理软件关了就好了。

requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))`
1

导出的依赖 requirements.txt:

Flask==2.1.2
Flask_Cors==3.0.10
imageio==2.19.3
Pillow==9.1.1
pyppeteer==1.0.2
reportlab==3.6.10
1
2
3
4
5
6

# 4.3.3 编写Dockerfile

FROM python:3.8.8-slim

# python:3.8-slim 是基于 Debian GNU/Linux 10 (buster) 制作的
# 设置 Debian 清华源 https://mirrors.tuna.tsinghua.edu.cn/help/debian/(可选)
# RUN mv /etc/apt/sources.list /etc/apt/sources.list_bak && \
#    echo '# 默认注释了源码镜像以提高 apt update 速度,如有需要可自行取消注释' >> /etc/apt/sources.list && \
#    echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian/ buster main contrib non-free' >> /etc/apt/sources.list && \
#    echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian/ buster main contrib non-free' >> /etc/apt/sources.list && \
#    echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main contrib non-free' >> /etc/apt/sources.list && \
#    echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-updates main contrib non-free' >> /etc/apt/sources.list && \
#    echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-backports main contrib non-free' >> /etc/apt/sources.list && \
#    echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian/ buster-backports main contrib non-free' >> /etc/apt/sources.list && \
#    echo 'deb https://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main contrib non-free' >> /etc/apt/sources.list && \
#    echo '# deb-src https://mirrors.tuna.tsinghua.edu.cn/debian-security buster/updates main contrib non-free' >> /etc/apt/sources.list
# 下载无头 Chrome 依赖,参考:https://github.com/puppeteer/puppeteer/blob/main/docs/troubleshooting.md#chrome-headless-doesnt-launch-on-unix=
RUN apt-get update && apt-get -y install apt-transport-https ca-certificates libnss3 xvfb gconf-service libasound2  \
    libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgbm1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0  \
    libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1  \
    libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6  \
    ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget && rm -rf /var/lib/apt/lists/*

# 安装常用调试命令(可选)
RUN apt-get install iputils-ping -y           # 安装ping
RUN apt-get install -y wget                   # 安装wget
RUN apt-get install curl -y                   # 安装curl
RUN apt-get install vim -y                    # 安装vim
RUN apt-get install lsof                      # 安装lsof

# 安装msyh.ttc字体解决中文乱码问题
# 来源:https://github.com/owent-utils/font/raw/master/%E5%BE%AE%E8%BD%AF%E9%9B%85%E9%BB%91/MSYH.TTC
RUN cp msyh.ttc /usr/share/fonts/

# 使用淘宝镜像加速下载 chromium(可选)
# ENV PYPPETEER_DOWNLOAD_HOST=https://npm.taobao.org/mirrors
# 设置 chromium 版本,发布日期为: 2021-02-26T08:47:06.448Z
ENV PYPPETEER_CHROMIUM_REVISION=856583

# 拷贝代码到容器内
RUN mkdir /code
ADD src /code/
WORKDIR /code

# 安装项目所需的 Python 依赖
RUN pip install -r requirements.txt

# 放行端口
EXPOSE 5006
# 启动项目
ENTRYPOINT ["nohup","python","server.py","&"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

注意事项

[1] 无头浏览器依赖问题

原始镜像里缺失很多无头浏览器的依赖环境,详见:puppeteer Troubleshooting 官方文档 (opens new window)

apt-get update && apt-get -y install apt-transport-https ca-certificates libnss3 xvfb gconf-service libasound2  \
    libatk1.0-0 libc6 libcairo2 libcups2 libdbus-1-3 libexpat1 libfontconfig1 libgbm1 libgcc1 libgconf-2-4 libgdk-pixbuf2.0-0  \
    libglib2.0-0 libgtk-3-0 libnspr4 libpango-1.0-0 libpangocairo-1.0-0 libstdc++6 libx11-6 libx11-xcb1 libxcb1  \
    libxcomposite1 libxcursor1 libxdamage1 libxext6 libxfixes3 libxi6 libxrandr2 libxrender1 libxss1 libxtst6  \
    ca-certificates fonts-liberation libappindicator1 libnss3 lsb-release xdg-utils wget && rm -rf /var/lib/apt/lists/*
1
2
3
4
5

[2] 中文字体无法解析问题

原始镜像内无中文字体,会出现中文字体无法解析的问题,下载 msyh.ttc (opens new window) 字体放到 /usr/share/fonts/ 目录里即可。

# 4.3.4 在服务器上部署项目

[1] 编写部署脚本

build.sh

docker build -t pyppeteer-image .
docker run -d -p 5006:5006 --name pyppeteer -e TZ="Asia/Shanghai" pyppeteer-image:latest
docker update pyppeteer --restart=always
1
2
3

[2] 项目部署结构

项目部署目录结构如下:

.
├── Dockerfile
├── README.md
├── build.sh
└── src
    ├── code.py
    ├── log.py
    ├── requirements.txt
    ├── msyh.ttc
    ├── response.py
    ├── server.py
    └── utils.py
1
2
3
4
5
6
7
8
9
10
11
12

[3] 部署项目服务

将部署包整个上传到服务器上,切换到部署包的根目录。

$ chmod u+x build.sh
$ ./build.sh
1
2

# 4.3.5 验证部署

[1] 接口文档

请求路径:/api/pyppeteer/urlSavePdf

请求方法:POST请求

请求参数:

 url:可公开访问的目标网址(必填,不能是那种需要登录权限的)
 pdf_name:下载的pdf文件名称(非必填,默认值是uuid命名的pdf文件)
 clip: 位置与图片尺寸信息(非必填,默认值{"width": 1920, "height": 1680})
     x: 网页截图的起始x坐标
     y: 网页截图的起始y坐标
     width: 图片宽度
     height: 图片高度
 注释:只传width、height的时候为整页导出,传x,y,width、height的时候为区域导出
 resolution: 设置网页显示尺寸 (非必填,默认值{"width": 1920, "height": 1680})
     width: 网页显示宽度
     height: 网页显示高度
1
2
3
4
5
6
7
8
9
10
11

请求示例:

{
    "url":"https://www.google.com",
    "pdf_name":"test.pdf",
    "resolution": {"width": 1920, "height": 1680},
    "clip": {"x": 0, "y": 0, "width": 1920, "height": 1680}
}
1
2
3
4
5
6

接口返回:以文件流的形式提供pdf文件下载

[2] 测试请求接口

$ curl -v -X POST http://127.0.0.1:5006/api/pyppeteer/urlSavePdf -H "Content-type: application/json" -d'{"url":"https://www.google.com","pdf_name":"test.pdf","resolution": {"width": 1920, "height": 1680},"clip": {"x": 0, "y": 0, "width": 1920, "height": 1680}}' >> test.pdf
1

# 5. 参考资料

[1] image-matting一键抠图 from Github (opens new window)

[2] MVSS-Net图像篡改检测 from Github (opens new window)

[3] DeOldify黑白旧照片着色神器:基于NoGAN的深度学习来实现旧照着色还原 from 佰阅部落 (opens new window)

[4] 人工智能DeOldify修复黑白图片和视频 from Bilibili (opens new window)

[5] 黑白老照片上色,手把手教你用Python怎么玩儿!from 简书 (opens new window)

[6] 一个基于深度学习的项目,用于着色和恢复旧图像和视频 from Github (opens new window)

[7] Drag Your GAN: Interactive Point-based Manipulation on the Generative Image Manifold from Github (opens new window)

[8] 开源(离线)中文文本转语音TTS(语音合成)工具整理 from CSDN (opens new window)

[9] Python调用playsound时报错:指定的设备未打开,或不被 MCI 所识别 from 程序员的秘密 (opens new window)

[10] 文本摘要/关键词TextRank算法的优化与思考 from 知乎 (opens new window)

[11] PaddleOCR快速开始 form Github (opens new window)

[12] paddleocr多平台使用 from jieli (opens new window)

[13] paddlepaddle/PaddleHub报错cannot import name '_convert_attention_mask' from 'paddle.nn.layer.transformer from Gitee Issues (opens new window)

[14] 什么是BERT from 知乎 (opens new window)

[15] Python词云可视化 from CNBLOG (opens new window)

[16] 词云可视化:四行Python代码轻松上手到精通 from Github (opens new window)

[17] 词云可视化:四行Python代码轻松上手到精通 from Bilibili (opens new window)

[18] 你不知道的词云 from python 123 (opens new window)

[19] Pyppeteer和Flask问题,服务器部署Requests_html问题,多线程调用pyppeteer或requests_html问题 from CSDN (opens new window)

[20] Puppeteer wait until page is completely loaded from stackoverflow (opens new window)

[21] Puppeteer--等待加载 from 博客园 (opens new window)

[22] Generate the pdf with empty content occasionally from GitHub Issues (opens new window)

[23] Python-在线网页导出为图片或pdf from 代码先锋网 (opens new window)

[24] Pyppeteer Browser closed unexpectedly in heroku from GitHub Issues (opens new window)

[25] puppeteer Troubleshooting 官方文档 from Github (opens new window)

[26] 使用docker部署chrome无头浏览器并解决中文乱码,为pyppeteer提供运行环境 from CSDN (opens new window)

[27] 免费且无限制的 Python 谷歌翻译 API from Github (opens new window)

Last Updated: 10/6/2024, 5:46:17 PM