检索增强大语言模型生成RAG

12/17/2023 检索增强大模型生成RAGBaichuan2-13B-Chat

# 1. 检索增强生成

# 1.1 RAG基本介绍

# 1.1.1 RAG是什么

开源的基座模型参数量不够大,本身拥有的能力有限。要完成复杂的知识密集型的任务,可以基于语言模型构建一个系统,通过访问外部知识源来做到。这样可以使生成的答案更可靠,有助于缓解“幻觉”问题。

RAG 会接受输入并检索出一组相关/支撑的文档,并给出文档的来源。这些文档作为上下文和输入的原始提示词组合,送给文本生成器得到最终的输出。这样 RAG 更加适应事实会随时间变化的情况,这非常有用,因为 LLM 的参数化知识是静态的,RAG 让语言模型不用重新训练就能够获取最新的信息,基于检索生成产生可靠的输出。

RAG基本介绍

# 1.1.2 RAG发展历程

“RAG”概念由Lewis在2020年引入,其发展迅速,标志着研究旅程中的不同阶段。最初,这项研究旨在通过在预训练阶段为它们注入额外知识来增强语言模型。ChatGPT的推出引发了对利用大型模型进行深度上下文理解的高度兴趣,加速了RAG在推断阶段的发展。随着研究人员更深入地探索大型语言模型(LLMs)的能力,焦点转向提升他们的可控性和推理技巧以跟上日益增长的需求。GPT-4 的出现标志着一个重要里程碑,它革新了 RAG ,采取一种将其与微调技术相结合的新方法,并继续优化预训练策略。

RAG发展时间轴

# 1.1.3 RAG生态及挑战

RAG的应用已不再局限于问答系统,其影响力正在扩展到更多领域。现在,诸如推荐系统、信息提取和报告生成等各种任务开始从RAG技术的应用中受益。与此同时,RAG技术栈正在经历一次繁荣。除了众所周知的工具如Langchain和LlamaIndex外,市场上也出现了更多针对性强的RAG工具,例如:为满足更专注场景需求而定制化的;为进一步降低入门门槛而简化使用的;以及功能专业化、逐渐面向生产环境目标发展的。

RAG当前面临的挑战:

  • 上下文长度:当检索到的内容过多并超出窗口限制时该怎么办?如果LLMs的上下文窗口不再受限,应如何改进RAG?
  • 鲁棒性:如何处理检索到的错误内容?如何筛选和验证检索到的内容?如何增强模型对毒化和噪声的抵抗力?
  • 与微调协同工作:如何同时利用RAG和FT的效果,它们应该如何协调、组织,是串行、交替还是端对端?
  • 规模定律:RAG模型是否满足规模定律?会有什么情况下可能让RAG经历逆向规模定律现象呢?
  • 生产环境应用:如何减少超大规模语料库的检索延迟? 如何确保被 LLMS 检索出来的内容不会泄露?

# 1.2 RAG技术实现

# 1.2.1 RAG技术范式

在RAG的技术发展中,我们从技术范式的角度总结了其演变过程,主要分为以下几个阶段:

  • 初级RAG:初级RAG主要包括三个基本步骤:1)索引——将文档语料库切分成更短的片段,并通过编码器建立向量索引。2)检索——根据问题和片段之间的相似性检索相关文档片段。3)生成——依赖于检索到的上下文来生成对问题的回答。
  • 高级RAG:初级RAG在检索、生成和增强方面面临多重挑战。随后提出了高级RAG范式,涉及到预检索和后检索阶段额外处理。在检索之前,可以使用查询重写、路由以及扩展等方法来调整问题与文档片段之间语义差异。在检索之后,重新排列已获取到的文档语料库可以避免"迷失在中间"现象,或者可以过滤并压缩上下文以缩短窗口长度。
  • 模块化RAG:随着RAG技术进一步发展和演变,模块化RAG的概念诞生了。结构上,它更自由灵活,引入更具体功能模块如查询搜索引擎以及多答案融合。技术层面上,它将信息查找与微调、强化学习等技术集成起来。在流程方面,RAG模块设计并协同工作形成各种不同类型RAG。

然而,模块化 RAG 并非突然出现,这三种范式存在继承与发展关系。高级RAG是模块化RAG的特殊情况,而初级RAG是高级RAG的特殊情况。

RAG技术范式

# 1.2.2 RAG基本流程

基本流程概述:用户输入问题——>问题重构(补全指代信息,保证多轮对话的能力)——>从检索库检索答案——用LLM总结答案

RAG 由两部分组成:

  • 第一部分负责在知识库中,根据 query 检索出匹配的文档。
  • 第二部分将 query 和文档拼接起来作为 QA 的 prompt,送入 seq2seq 模型,生成回复。

RAG原理

# 1.2.3 选择RAG还是微调

除了RAG之外,LLMs的主要优化策略还包括提示工程和微调(FT)。每种都有其独特的特点。根据它们对外部知识的依赖性以及对模型调整的需求,每种都有适合的应用场景。

RAG与FT的比较

RAG就像是给模型提供了一本定制信息检索的教科书,非常适合特定的查询。另一方面,FT就像一个学生随着时间内化知识,更适合模仿特定的结构、风格或格式。通过增强基础模型的知识、调整输出和教授复杂指令,FT可以提高模型的性能和效率。然而,它并不擅长整合新知识或快速迭代新用例。RAG和FT并不互斥,它们相辅相成,并且同时使用可能会产生最好的结果。

RAG与FT的关系

# 1.2.4 如何评价RAG的效果

对RAG的评估方法多种多样,主要包括三个质量分数:上下文相关性、答案准确性和答案相关性。此外,评估还涉及四项关键能力:抗噪声能力、拒绝能力、信息整合以及反事实鲁棒性。这些评价维度将传统的定量指标与针对RAG特点的专门评估标准相结合,尽管这些标准尚未得到标准化。

在评价框架方面,有RGB和RECALL等基准测试,以及像RAGAS、ARES和TruLens等自动化评价工具,它们帮助全面衡量RAG模型的表现。

如何评价RAG的效果

# 2. 部署大模型服务

# 2.1 租用GPU服务器

实验环境:租用的AutoDL的GPU服务器,NVIDIA RTX 4090 / 24GB,Ubuntu20.04,Python 3.8, CUDA 11.3

由于这家的服务器都是境内的,拉取Github代码和HuggingFace模型都会受到墙的干扰,建议配置一下代理。

$ source /etc/network_turbo
1

另外,该服务器上的服务不可被外界直接调用,如有需要,可使用自定义服务。

  • 打开自定义服务,在本地终端输入上述命令(6006换成服务器实际部署服务的端口),之后就可以在本地通过127.0.0.1:6006去访问服务器部署的服务了。

本地访问AutoDL部署的服务

注:现在这个“自定义服务”需要实名认证,6006端口只是个示例,可以换成任意端口,也可以开多个。

# 2.2 大模型基座选型

选用当下效果比较好的Baichuan13B大模型,以下将会提供普通服务和流式服务两种调用方式。

显存要求如下表所示,由于租用的3090显卡只有24GB显存,因此只能跑8bits量化模型。如果你的显卡资源够,可以跑全精度,代码改成model = model.cuda()

Precision Baichuan2-7B Baichuan2-13B
bf16 / fp16 15.3 27.5
8bits 8.0 16.1
4bits 5.1 8.6

# 2.3 准备部署代码

# 2.3.1 普通服务的代码

baichuan_api_server.py

# -*- coding: utf-8 -*-

from flask import Flask, request
from flask_cors import cross_origin
import json
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation.utils import GenerationConfig
import datetime

model_path = '/Path/Baichuan2-13B-Chat'
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16,
                                             trust_remote_code=True)
torch.cuda.set_device(0)  # 指定显卡
# model = model.cuda()
model = model.quantize(8).cuda()
model.generation_config = GenerationConfig.from_pretrained(
    model_path
)
tokenizer = AutoTokenizer.from_pretrained(
    model_path,
    use_fast=False,
    trust_remote_code=True
)
model.eval()

app = Flask(__name__)


@app.route('/', methods=['POST'])
@cross_origin()
def batch_chat():
    global model, tokenizer

    data = json.loads(request.get_data())
    now = datetime.datetime.now()
    time_format = now.strftime("%Y-%m-%d %H:%M:%S")
    try:
        messages = data.get("messages")
        response = model.chat(tokenizer, messages)
        answer = {"response": response, "history": [], "status": 200, "time": time_format}
        return answer
    except Exception as e:
        return {"response": f"大模型预测出错:{repr(e)}", "history": [('', '')], "status": 444, "time": time_format}


if __name__ == '__main__':
    with torch.no_grad():
        app.run(host='0.0.0.0', port=1707)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

后台启动服务:

$ nohup python3 baichuan_api_server.py > baichuan_api_server.log 2>&1 &           
1

# 2.3.2 流式服务的代码

baichuan_stream_api_server.py

# -*- coding: utf-8 -*-

import argparse
from flask import Flask, request, Response
from flask_cors import cross_origin
import json
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.generation.utils import GenerationConfig

model_path = '/Path/Baichuan2-13B-Chat'
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, device_map="auto",
                                             trust_remote_code=True)
torch.cuda.set_device(0)  # 指定显卡
# model = model.cuda()
model = model.quantize(8).cuda()
model.generation_config = GenerationConfig.from_pretrained(
    model_path
)
tokenizer = AutoTokenizer.from_pretrained(
    model_path,
    use_fast=False,
    trust_remote_code=True
)
model.eval()

app = Flask(__name__)


def solve(messages):
    position = 0
    for response in model.chat(tokenizer, messages, stream=True):
        chunk = response[position:]
        yield chunk
        position = len(response)


@app.route('/', methods=['POST'])
@cross_origin()
def batch_chat():
    global model, tokenizer

    data = json.loads(request.get_data())
    messages = data.get("messages")
    return Response(solve(messages), content_type='text/plain; charset=utf-8')


parser = argparse.ArgumentParser(description='')
parser.add_argument('--port', default=1708, type=int, help='服务端口')
args = parser.parse_args()

if __name__ == '__main__':
    with torch.no_grad():
        app.run(host='0.0.0.0', port=args.port)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

后台启动服务:

$ nohup python3 baichuan_stream_api_server.py > baichuan_stream_api_server.log 2>&1 & 
1

# 2.4 下载模型并安装依赖

# 2.4.1 下载模型文件

模型地址:https://huggingface.co/baichuan-inc/Baichuan2-13B-Chat/tree/main (opens new window)

Baichuan2-13B-Chat模型

注:如果没有梯子,也可以用国内镜像站去下载模型,https://aifasthub.com/models (opens new window)

可以使用 HuggingFace Hub 下载模型文件,首先,我们需要安装huggingface_hub依赖。

$ pip3 install huggingface_hub
1

之后执行该脚本即可。

# -*- coding: utf-8 -*-

import os
from huggingface_hub import snapshot_download

# 模型仓库的标识
repo_id = "baichuan-inc/Baichuan2-13B-Chat"

# 下载模型到指定目录
local_dir = "./{}".format(repo_id)

# 检查目录是否存在,如果不存在则创建
if not os.path.exists(local_dir):
    os.makedirs(local_dir)

snapshot_download(repo_id=repo_id, local_dir=local_dir)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2.4.2 安装依赖环境

torch环境使用服务器镜像自带的(没有的话 pip3 install torch 安装一下)。依赖安装的坑比较多,主要是CUDA环境不匹配的问题。

$ pip3 install flask 
$ pip3 install flask_cors
$ pip3 install accelerate 
$ pip3 install sentencepiece
$ pip3 install scipy
$ pip3 install transformers==4.33.2  
$ pip3 install xformers

$ git clone https://github.com/TimDettmers/bitsandbytes.git
$ cd bitsandbytes
$ vim Makefile
# CC_ADA_HOPPER := -gencode arch=compute_89,code=sm_89
# CC_ADA_HOPPER += -gencode arch=compute_90,code=sm_90
$ CUDA_VERSION=121 make cuda12x
$ python3 setup.py install
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

踩过的坑:

[1] transformers 安装问题

一开始直接使用 pip3 install transformers 去安装,但出现了 AttributeError: 'BaichuanTokenizer' object has no attribute 'sp_model' 的问题。检查模型文件下载全了,查阅资料得知 pip3 install transformers==4.33.2 版本可解决此问题。

安装完之后,执行时又卡在 Xformers is not installed correctly. If you want to use memory_efficient_attention to accelerate training use the following command to install Xformers,之后又执行 pip3 install xformers 安装该依赖,解决了该问题。

[2] bitsandbytes安装问题

bitsandbytes是用于大模型量化的库,项目地址:https://github.com/TimDettmers/bitsandbytes (opens new window)

一开始直接使用 pip3 install bitsandbytes 去安装,但出现了与CUDA不兼容的问题。(该问题不一定会出现,优先使用pip3去安装,不行的话再考虑编译安装)

bitsandbytes与CUDA版本不兼容问题

然后我又使用了编译安装的方式,又出现 Unsupported gpu architecture 'compute_89' 的问题。

bitsandbytes编译安装的版本问题

使用 nvcc --list-gpu-arch 命令查询,发现只支持到 compute_86。因此修改 Makefile,将compute_89和compute_90的都注释掉,然后重新进行编译即可。

# 2.5 使用大模型服务

# 2.5.1 使用普通服务

baichuan_api_test.py

# -*- coding: utf-8 -*-


import requests
import json


class Baichuan:
    def __init__(self, url):
        self.url = url

    def __call__(self, messages: list) -> str:
        data = {"messages": messages}
        response = requests.post(self.url, json=data)
        response = json.loads(response.content)
        return response["response"]


if __name__ == '__main__':
    llm = Baichuan("http://127.0.0.1:1707/")
    messages = [{
        "role": "user",
        "content": "解释一下量子计算"
    }]
    response = llm(messages)
    print(response)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 2.5.2 使用流式服务

baichuan_stream_api_test.py

# -*- coding: utf-8 -*-

import requests
import json


class Baichuan:
    def __init__(self, url):
        self.url = url

    def stream_request(self, messages: list):
        data = json.dumps({"messages": messages})
        try:
            with requests.post(self.url, data=data, headers={'Content-Type': 'application/json'}, stream=True) as response:
                response.raise_for_status()
                for line in response.iter_lines():
                    if line:
                        decoded_chunk = line.decode('utf-8')
                        yield decoded_chunk
        except requests.RequestException as e:
            print(f"请求错误: {e}")


if __name__ == '__main__':
    llm = Baichuan("http://127.0.0.1:1708")
    messages = [{
        "role": "user",
        "content": "解释一下量子计算"
    }]
    for response in llm.stream_request(messages):
        print(response)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

注:使用流式服务,可以让结果随着推理过程,一点儿一点儿的往外输出,用户体验更好,但使用流式服务会比普通服务更耗资源。

# 2.5.3 运行出的效果

以下是 Baichuan2-13B-Chat 模型在 8bits 量化的运行效果。

Baichuan2-13B-Chat-8bits量化的运行效果

# 2.6 对服务进行压力测试

实验环境:单卡 NVIDIA A40 / 48GB,跑的全精度 Baichuan2-13B-Chat 模型,使用如下脚本对普通服务进行压测。

# -*- coding: utf-8 -*-

from typing import Union, Any, List, Tuple
import requests
import json
import threading
import time


class Baichuan:
    def __init__(self, url):
        self.url = url

    def send_request(self, messages: List[dict]) -> Tuple[bool, Union[str, Any], float]:
        start_time = time.time()
        try:
            data = {"messages": messages}
            response = requests.post(self.url, json=data)
            response = json.loads(response.content)
            success = True
        except Exception as e:
            response = str(e)
            success = False
        end_time = time.time()
        return success, response, end_time - start_time


def worker(url, messages, index, stats):
    bc = Baichuan(url)
    success, response, duration = bc.send_request(messages)
    with stats['lock']:
        if success:
            stats['success_count'] += 1
        else:
            stats['failure_count'] += 1

        stats['total_duration'] += duration
        if duration < stats['min_duration']:
            stats['min_duration'] = duration
        if duration > stats['max_duration']:
            stats['max_duration'] = duration

    print(f"Thread {index}: {'Success' if success else 'Failure'}, Response: {response}, Duration: {duration}s")


if __name__ == '__main__':
    url = "http://127.0.0.1:1707/"
    messages = [{
        "role": "user",
        "content": "解释一下量子计算"
    }]

    num_threads = 10  # 测试并发线程数
    num_rounds = 3  # 测试轮数

    stats = {
        'success_count': 0,
        'failure_count': 0,
        'total_duration': 0.0,
        'min_duration': float('inf'),
        'max_duration': float('-inf'),
        'lock': threading.Lock()
    }

    for round in range(num_rounds):
        print(f"开始测试轮数: {round + 1}")
        threads = []

        for i in range(num_threads):
            thread = threading.Thread(target=worker, args=(url, messages, i, stats))
            threads.append(thread)
            thread.start()

        for thread in threads:
            thread.join()

    # 输出总体统计结果
    avg_duration = stats['total_duration'] / (num_threads * num_rounds) if num_threads > 0 else 0
    print(f"总成功次数: {stats['success_count']}, 总失败次数: {stats['failure_count']}")
    print(f"整体最短耗时: {stats['min_duration']:.2f}s, 整体最长耗时: {stats['max_duration']:.2f}s, 整体平均耗时: {avg_duration:.2f}s")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80

测试效果:1线程平均耗时16.32s,3线程平均耗时49.21s,5线程平均耗时82.62s,10线程平均耗时172.23s。它是可以扛住少量并发的,但开多线程并不能提高处理效率。如果出现少量并发都出现大模型预测出错问题,那说明是显卡不行(比如3090、A6000),跟代码无关,可以考虑Nginx负载均衡,详见我的另外一篇博客:Docker容器化及项目环境管理 (opens new window)

Baichuan大模型服务的压力测试

# 3. 检索增强大模型生成实例

# 3.1 实例场景概述

需求场景:有一批内部的政府政策文档,数据不可外传,只能使用自托管大模型来实现,需要基于这些文档进行垂直领域的问答。

数据预处理:提供的文档主要是Word、PDF等格式,无法直接使用,需要将数据预处理再入检索库,这里是处理成txt格式了。

检索增强大模型生成-数据预处理后的格式

数据预处理要求:数据量少的话,可以人工去做,做的方法是每个文档拆开,拆开后每个数据是:“文档标题+文档中的某一段”,目的是保证每条数据都有较完整的语义,并且长度不会太长(1000个token以内最好,当然肯定是越短越好,会更加准确)。

以下示例项目我已经在Github上进行了开源(数据不便于公开,所以用于构建检索库的数据文件我删掉了),项目地址为:https://github.com/Logistic98/rag-llm (opens new window)

# 3.2 构建ES检索库

# 3.2.1 搭建ES环境并安装ik分词器

Step1:搭建Docker环境

$ apt-get update -y && apt-get install curl -y  # 安装curl
$ curl https://get.docker.com | sh -   # 安装docker
$ sudo systemctl start docker  # 启动docker服务(改成restart即为重启服务)
$ docker version # 查看docker版本(客户端要与服务端一致)
1
2
3
4

Step2:使用Docker搭建ElasticSearch

$ docker pull elasticsearch:7.16.2
$ docker run -d --name es \
-p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms1g -Xmx1g" \
elasticsearch:7.16.2
$ docker update es --restart=always
1
2
3
4
5
6

Step3:进入容器给ElasticSearch配置密码

$ docker exec -it es /bin/bash 
$ cd config
$ chmod o+w elasticsearch.yml
$ vi elasticsearch.yml
1
2
3
4

其中,在 elasticsearch.yml 文件的末尾添加以下配置,代表开启xpack安全认证)

xpack.security.enabled: true    
1

然后把权限修改回来,重启容器,设置账号密码,浏览器访问http://IP:9200地址即可(用 elastic账号 和自己设置的密码登录即可)

$ chmod o-w elasticsearch.yml
$ exit
$ docker restart es
$ docker exec -it es /bin/bash 
$ ./bin/elasticsearch-setup-passwords interactive   // 然后设置一大堆账号密码
1
2
3
4
5

Step4:安装ik分词器插件

$ docker exec -it es /bin/bash
$ apt-get install -y wget   
$ wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip
$ unzip -o -d /usr/share/elasticsearch/elasticsearch-analysis-ik-7.16.2 /usr/share/elasticsearch/elasticsearch-analysis-ik-7.16.2.zip
$ rm –f elasticsearch-analysis-ik-7.16.2.zip
$ mv /usr/share/elasticsearch/elasticsearch-analysis-ik-7.16.2 /usr/share/elasticsearch/plugins/ik
$ cd /usr/share/elasticsearch/bin
$ elasticsearch-plugin list
$ exit
$ docker restart es
1
2
3
4
5
6
7
8
9
10

# 3.2.2 构建ES索引并写入数据

安装 elasticsearch 依赖

$ pip3 install elasticsearch
1

es_index.py

# -*- coding: utf-8 -*-

import os
from elasticsearch import Elasticsearch
from elasticsearch import helpers

index_name = "audit_qa"
es = Elasticsearch(
    hosts=["http://127.0.0.1:9200"],
    basic_auth=("elastic", "your_password"),
    request_timeout=60
)
CREATE_BODY = {
    "settings": {
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "content": {
                "type": "text",
                "analyzer": "ik_max_word"
            }
        }
    }
}

es.indices.create(index=index_name, body=CREATE_BODY)
directory_path = "./preprocess_data"
contents = []

# 遍历目录下的文件
for filename in os.listdir(directory_path):
    # 确保文件是以txt为扩展名的文本文件
    if filename.endswith(".txt"):
        file_path = os.path.join(directory_path, filename)
        # 读取文件内容并添加到列表中
        with open(file_path, 'r', encoding='utf-8') as file:
            file_content = file.read()
            contents.append(file_content)

action = (
    {
        "_index": index_name,
        "_type": "_doc",
        "_id": i,
        "_source": {
            "content": contents[i]
        }
    } for i in range(0, len(contents))
)
helpers.bulk(es, action)

print("export es finish")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

执行该文件,将预处理的数据导入ES索引库。

将预处理的数据导入ES索引库

# 3.2.3 构建ES文档检索服务

es_search.py

# -*- coding: utf-8 -*-

import json
from flask import Flask, request
from flask_cors import cross_origin
from elasticsearch import Elasticsearch

index_name = "policy_qa"
es = Elasticsearch(
    hosts=["http://127.0.0.1:9200"],
    basic_auth=("elastic", "your_password"),
    request_timeout=60
)

app = Flask(__name__)


@app.route('/', methods=['POST'])
@cross_origin()
def retrieval():
    data = json.loads(request.get_data())
    question = data.get("question")
    top_k = data.get("top_k")
    query_body = {
        "query": {
            "match": {
                "content": question
            }
        },
        "size": top_k
    }
    res = es.search(index=index_name, body=query_body)
    docs = []
    for hit in res['hits']['hits']:
        docs.append(hit["_source"]["content"])
    return {"docs": docs}


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=1709)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

启动ES检索服务,下面会用到。

# 3.3 基于ES检索增强生成回答

solve.py

# -*- coding: utf-8 -*-

import os
import requests
import json

# Global Parameters
RETRIEVAL_TOP_K = 2
LLM_HISTORY_LEN = 30


class Baichuan:
    def __init__(self, url):
        self.url = url

    def __call__(self, messages: list) -> str:
        data = {"messages": messages}
        response = requests.post(self.url, json=data)
        response = json.loads(response.content)
        return response["response"]


def init_cfg(url_llm):
    global llm
    llm = Baichuan(url=url_llm)


def get_docs(question: str, url: str, top_k=RETRIEVAL_TOP_K):
    data = {"question": question, "top_k": top_k}
    docs = requests.post(url, json=data)
    docs = json.loads(docs.content)
    return docs["docs"]


def get_knowledge_based_answer(query, history_obj, url_retrieval):
    global llm, RETRIEVAL_TOP_K

    if len(history_obj.history) > LLM_HISTORY_LEN:
        history_obj.history = history_obj.history[-LLM_HISTORY_LEN:]

    # Rewrite question
    if len(history_obj.history):
        rewrite_question_input = history_obj.history.copy()
        rewrite_question_input.append(
            {
                "role": "user",
                "content": f"""请基于对话历史,对后续问题进行补全重构,如果后续问题与历史相关,你必须结合语境将代词替换为相应的指代内容,让它的提问更加明确;否则直接返回原始的后续问题。
                注意:请不要对后续问题做任何回答和解释。
                
                后续问题:{query}
                
                修改后的后续问题:"""
            }
        )
        new_query = llm(rewrite_question_input)
    else:
        new_query = query

    # 获取相关文档
    docs = get_docs(new_query, url_retrieval, RETRIEVAL_TOP_K)
    doc_string = ""
    for i, doc in enumerate(docs):
        doc_string = doc_string + doc + "\n"
    history_obj.history.append(
        {
            "role": "user",
            "content": f"请基于参考,回答问题,并给出参考依据:\n问题:\n{query}\n参考:\n{doc_string}\n答案:"
        }
    )

    # 调用大模型获取回复
    response = llm(history_obj.history)

    # 修改history,将之前的参考资料从history删除,避免history太长
    history_obj.history[-1] = {"role": "user", "content": query}
    history_obj.history.append({"role": "assistant", "content": response})

    # 检查history.json是否存在,如果不存在则创建
    if not os.path.exists("./history.json"):
        with open("./history.json", "w", encoding="utf-8") as file:
            json.dump([], file, ensure_ascii=False, indent=2)

    # 读取现有数据,追加新数据,并写回文件
    with open("./history.json", "r", encoding="utf-8") as file:
        data = json.load(file)
    data.append({"query": query, "new_query": new_query, "docs": docs, "response": response,
                 "retrieval": "ES"})
    with open("./history.json", "w", encoding="utf-8") as file:
        json.dump(data, file, ensure_ascii=False, indent=2)

    return {"response": response, "docs": docs}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

server.py

# -*- coding: utf-8 -*-

from flask import Flask, request
from flask_cors import cross_origin
import time
from solve import *

app = Flask(__name__)


class History:
    def __init__(self):
        self.history = []


session_histories = {}


@app.route("/get_bot_response", methods=["POST"])
@cross_origin()
def get_bot_response():
    global session_histories
    data = json.loads(request.get_data())
    userText = data["content"]  # 用户输入
    session_id = data["id"]  # 用户id,用于保存对话历史

    # 获取对话历史,如果有的话
    if session_id in session_histories:
        history_obj = session_histories[session_id]["history"]
        session_histories[session_id]["last_access_time"] = time.time()
    else:
        history_obj = History()
        session_histories[session_id] = {
            "history": history_obj,
            "last_access_time": time.time(),
        }

    # 如果用户超过一个小时没有交互,则删除该用户的对话历史
    max_idle_time = 60 * 60
    for session_id, session_data in session_histories.copy().items():
        idle_time = time.time() - session_data["last_access_time"]
        if idle_time > max_idle_time:
            del session_histories[session_id]

    if userText == "清空对话历史":
        history_obj.history = []
        return str("已清空")

    response = get_knowledge_based_answer(
        query=userText, history_obj=history_obj, url_retrieval="http://127.0.0.1:1709/"
    )
    return response


if __name__ == "__main__":
    init_cfg("http://127.0.0.1:1707/")
    app.run(host="0.0.0.0", port=1710)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

rag_test.py

# -*- coding: utf-8 -*-

import json
import requests
import random
from tqdm import trange


if __name__ == '__main__':
    url = "http://127.0.0.1:1710/get_bot_response"
    question = ["什么是政府专项债务?", "专项债收入可以用于经常性支出吗?", "政府专项债务应当通过什么偿还?"]
    for i in trange(len(question)):
        data = {"id": random.randint(0, 9999999), "content": question[i]}
        res = requests.post(url, json=data)
        res = json.loads(res.content)
        print("\nQuestion: " + question[i])
        print("\nAnswer: " + res["response"])
        print("\n-------------------------------------------------")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

先执行 server.py 启动 ES 检索增强大模型生成服务,再执行 rag_test.py 进行测试。输出里会有个 history.json 文件,记录中间过程及结果。

基于ES检索增强生成回答的效果

# 4. 参考资料

[1] 检索增强生成 (RAG) from Prompt Engineering Guide (opens new window)

[2] 用检索增强生成让大模型更强大,这里有个手把手的Python实现 from 机器之心 (opens new window)

[3] 检索、提示:检索增强的(Retrieval Augmented)自然语言处理 from 知乎 (opens new window)

[4] Retrieval Augmented Generation: Streamlining the creation of intelligent natural language processing models from Meta (opens new window)

[5] nvcc fatal : Unsupported gpu architecture 'compute_86' from CSDN (opens new window)

[6] ‘BaichuanTokenizer' object has no attribute 'sp_model' 问题 from CSDN (opens new window)

[7] 一文详解检索增强语言模型新范式REPLUG from CSDN (opens new window)

[8] 大模型+检索增强(RAG、Atlas 和 REPLUG)from CSDN (opens new window)

[9] Huggingface Transformers+Accelerate多卡推理实践(指定GPU和最大显存)from 知乎 (opens new window)

[10] Handling big models for inference from HuggingFace官方文档 (opens new window)

[11] 检索增强生成RAG的技术趋势调查仓库RAG-Survey from Github (opens new window)

[12] 一文读懂RAG的来源、发展和前沿 from 微信公众号 (opens new window)

[13] 多模态RAG综述 from 知乎 (opens new window)

Last Updated: 4/14/2024, 1:36:38 PM