Integrating SQLAlchemy with Databases and Middleware


# 1. Using the SQLAlchemy ORM Framework

# 1.1 Introduction to SQLAlchemy

SQLAlchemy is an excellent Python ORM framework that works with any third-party web framework. It spares developers from hand-writing raw SQL so they can focus on program logic, improving development efficiency, and it provides a full set of well-known enterprise-level persistence patterns designed for efficient, high-performance database access. Compared with the Django ORM, SQLAlchemy stays closer to raw SQL, which makes it easier to learn.

Its main components are:

| Component | Description |
| --- | --- |
| Engine | Framework engine |
| Connection Pooling | Database connection pool |
| Dialect | Database DB API type |
| Schema/Types | Schema & types |
| SQL Expression Language | SQL expression language |

SQLAlchemy depends on other modules to actually manipulate data: the Dialect talks to the database's DB API to carry out the operations, e.g.:

[1] pymysql:mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
[2] cx_Oracle:oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

See https://docs.sqlalchemy.org/en/20/dialects/index.html for the full list.

This article uses pymysql, so install the dependencies as follows:

$ pip install sqlalchemy
$ pip install pymysql

# 1.2 Table Operations and Connecting to the Database

SQLAlchemy does not alter existing table structures: create_all() only creates tables that are missing, so changing a schema means dropping the old table and creating a new one (or using a dedicated migration tool):

models.py

# -*- coding: utf-8 -*-

import datetime

from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base

# Declarative base class
Base = declarative_base()

# Create the engine
engine = create_engine(
    # "mysql+pymysql://root@127.0.0.1:3306/db?charset=utf8",  # without a password
    "mysql+pymysql://root:123456@127.0.0.1:3306/db?charset=utf8",  # with a password
    max_overflow=0,  # max connections created beyond pool_size
    pool_size=5,  # connection pool size
    pool_timeout=30,  # max seconds to wait for a free connection before raising an error
    pool_recycle=-1  # recycle (reset) pooled connections after this many seconds; -1 disables recycling
)


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    age = Column(Integer, nullable=False)
    phone = Column(String(11))
    addr = Column(String(64), nullable=True)
    create_time = Column(DateTime, default=datetime.datetime.now)
    
    def __str__(self):
        return "id:%s name:%s age:%s phone:%s addr:%s create_time:%s" % (self.id, self.name, self.age, self.phone, self.addr, self.create_time)


def create_tb():
    """
    Create the tables
    :return:
    """
    Base.metadata.create_all(engine)


def drop_tb():
    """
    Drop the tables
    :return:
    """
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_tb()
    create_tb()

With the tables created, connect to the database.

from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)
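
To see the thread isolation at work, here is a quick sketch using the setup above (calling a scoped_session like session() returns the current thread's underlying Session):

import threading


def worker():
    # Each thread gets its own Session object from the scoped_session registry
    print(threading.current_thread().name, id(session()))


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()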

# 1.3 Single-Table CRUD

# 1.3.1 Single-Table DML Operations

[1] Insert a record

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# Insert a record
user_obj = Users(name="user001", phone="15125352333", age=23, addr="China")
session.add(user_obj)

# Commit
session.commit()

# Close the connection (session.remove() also works for a scoped session)
session.close()

[2] Update records

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# Update the name
session.query(Users).filter_by(id=1).update({"name": "USER001"})
# Update the age with a SQL expression; synchronize_session="fetch" re-selects the affected rows so in-session objects stay consistent
session.query(Users).filter_by(id=1).update({"age": Users.age + 1}, synchronize_session="fetch")
# Update the address by string concatenation; synchronize_session=False skips in-session synchronization (the default "evaluate" strategy cannot evaluate this expression in Python)
session.query(Users).filter_by(id=1).update({"addr": Users.addr + "BeiJing"}, synchronize_session=False)

# Commit
session.commit()

# Close the connection
session.close()

[3] Delete records

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# Delete records
session.query(Users).filter_by(id=1).delete()

# Commit
session.commit()

[4] Bulk insert

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# Bulk insert
session.add_all([
    Users(name="user001", age=21, phone="13269867233", addr="ShangHai"),
    Users(name="user002", age=18, phone="13269867234", addr="GuangZhou"),
    Users(name="user003", age=24, phone="13269867235", addr="ChongQing"),
    Users(name="user003", age=29, phone="13269867236", addr="ChongQing"),
    Users(name="user003", age=25, phone="13269867237", addr="ChongQing"),
])

# Commit
session.commit()

# Close the connection
session.close()

# 1.3.2 Single-Table DQL Operations

The queries below run against the data inserted in bulk above.

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)


# Print results
def print_result(result):
    if isinstance(result, list):
        for item in result:
            print(item)
    else:
        print(result)


# Basic queries
# -- fetch everything --
result_00 = session.query(Users).all()
print("-----result_00:")
print_result(result_00)

# -- filtering --
result_01 = session.query(Users).filter(Users.name == "user001").all()  # filter with a Python expression
print("-----result_01:")
print_result(result_01)
result_02 = session.query(Users).filter_by(name="user002").all()  # filter ORM-style with keyword arguments
print("-----result_02:")
print_result(result_02)
result_03 = session.query(Users).filter_by(name="user003").first()  # ORM-style filter, take the first match
print("-----result_03:")
print_result(result_03)

# Select specific columns only
result_04 = session.query(Users.name, Users.age).first()
print("-----result_04:")
print_result(result_04)

# AND (separate conditions with commas, or use and_)
result_05 = session.query(Users).filter(Users.id > 1, Users.age < 23).all()
print("-----result_05:")
print_result(result_05)
from sqlalchemy import and_
result_06 = session.query(Users).filter(and_(Users.id > 1, Users.age < 23)).all()
print("-----result_06:")
print_result(result_06)

# OR
from sqlalchemy import or_
result_07 = session.query(Users).filter(or_(Users.id > 3, Users.age < 23)).all()
print("-----result_07:")
print_result(result_07)

# Combining and_ and or_
result_08 = session.query(Users).filter(or_(
    Users.id > 1,
    and_(Users.id > 2, Users.age < 24)
)).all()
print("-----result_08:")
print_result(result_08)

# Range (BETWEEN)
result_09 = session.query(Users).filter(Users.age.between(18, 24)).all()
print("-----result_09:")
print_result(result_09)

# Membership (IN)
result_10 = session.query(Users).filter(Users.age.in_([18, 21, 24])).all()
print("-----result_10:")
print_result(result_10)

# Negation with ~ (NOT IN)
result_11 = session.query(Users).filter(~Users.age.in_([18, 21, 24])).all()
print("-----result_11:")
print_result(result_11)

# Wildcards (LIKE)
result_12 = session.query(Users).filter(Users.name.like("us%")).all()
print("-----result_12:")
print_result(result_12)

# Pagination (slicing)
result_13 = session.query(Users).all()[0:1]
print("-----result_13:")
print_result(result_13)

# Ordering
result_14 = session.query(Users).order_by(Users.id.desc()).all()  # descending
print("-----result_14:")
print_result(result_14)
result_15 = session.query(Users).order_by(Users.id.asc()).all()  # ascending
print("-----result_15:")
print_result(result_15)

# Grouping
result_16 = session.query(Users).group_by(Users.id).all()
print("-----result_16:")
print_result(result_16)


# Aggregate functions
from sqlalchemy.sql import func
result_17 = session.query(
    func.max(Users.age),
    func.sum(Users.age),
    func.min(Users.age),
).group_by(Users.name).having(func.max(Users.age) > 12).all()
print("-----result_17:")
print_result(result_17)

# Commit
session.commit()

# Close the connection
session.close()

# 1.4 Multi-Table Relationships

# 1.4.1 One-to-Many

First set up the one-to-many relationship. relationship() defines a logical one-to-many link: nothing is created in the physical table, but related rows can be read and written through the attribute:

# -*- coding: utf-8 -*-

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base

# Declarative base class
Base = declarative_base()

# Create the engine
engine = create_engine(
    # "mysql+pymysql://root@127.0.0.1:3306/db?charset=utf8",  # without a password
    "mysql+pymysql://root:123456@127.0.0.1:3306/db?charset=utf8",  # with a password
    max_overflow=0,  # max connections created beyond pool_size
    pool_size=5,  # connection pool size
    pool_timeout=30,  # max seconds to wait for a free connection before raising an error
    pool_recycle=-1  # recycle (reset) pooled connections after this many seconds; -1 disables recycling
)


class Classes(Base):
    __tablename__ = "classes"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)


class Students(Base):
    __tablename__ = "students"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    # Real constraint column: enforced in the physical table, keeps bad data out
    # Optional referential actions include CASCADE, SET NULL, RESTRICT
    fk_class = Column(Integer, ForeignKey("classes.id", ondelete="CASCADE", onupdate="CASCADE"))
    # Logical relationship attribute: no column is created in the physical table, but related rows can be read and written through it
    # backref: the name used for reverse lookups
    re_class = relationship("Classes", backref="students")


def create_tb():
    """
    Create the tables
    :return:
    """
    Base.metadata.create_all(engine)


def drop_tb():
    """
    Drop the tables
    :return:
    """
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_tb()
    create_tb()

Insert through the logical relationship attribute:

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)


session.add_all(
    [
        Students(name="学生01", re_class=Classes(name="一年级一班")),  # fk_class is filled in automatically
        Students(name="学生02", re_class=Classes(name="一年级二班")),
    ]
)

# Commit
session.commit()

# Close the connection
session.close()

# 1.4.2 Many-to-Many

Many-to-many also uses relationship() as a logical link: nothing is created in the physical tables, but related rows can be read and written through the attribute.

Passing the manually defined association table via secondary tells relationship() that this is a many-to-many relationship.

# -*- coding: utf-8 -*-

from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship

# Declarative base class
Base = declarative_base()

# Create the engine
engine = create_engine(
    # "mysql+pymysql://root@127.0.0.1:3306/db?charset=utf8",  # without a password
    "mysql+pymysql://root:123456@127.0.0.1:3306/db?charset=utf8",  # with a password
    max_overflow=0,  # max connections created beyond pool_size
    pool_size=5,  # connection pool size
    pool_timeout=30,  # max seconds to wait for a free connection before raising an error
    pool_recycle=-1  # recycle (reset) pooled connections after this many seconds; -1 disables recycling
)


class Classes(Base):
    __tablename__ = "classes"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)


class Students(Base):
    __tablename__ = "students"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    # Optional referential actions include CASCADE, SET NULL, RESTRICT
    fk_class = Column(Integer, ForeignKey("classes.id", ondelete="CASCADE", onupdate="CASCADE"))
    # Logical relationship attribute: no column is created in the physical table, but related rows can be read and written through it
    # backref: the name used for reverse lookups
    re_class = relationship("Classes", backref="students")


class Teachers(Base):
    __tablename__ = "teachers"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    # Logical M2M attribute: secondary names the association table's __tablename__; the reverse lookup is teachers
    re_class = relationship("Classes", secondary="teachers_m2m_classes", backref="teachers")


class TeachersM2mClasses(Base):
    __tablename__ = "teachers_m2m_classes"

    id = Column(Integer, primary_key=True)
    teacher_id = Column(Integer, ForeignKey("teachers.id"))
    class_id = Column(Integer, ForeignKey("classes.id"))

    __table_args__ = (
        UniqueConstraint("teacher_id", "class_id"),  
    )


def create_tb():
    """
    Create the tables
    :return:
    """
    Base.metadata.create_all(engine)


def drop_tb():
    """
    Drop the tables
    :return:
    """
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_tb()
    create_tb()

Insert through the logical attribute, or write the association table directly (see the sketch after this example):

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

session.add_all(
    [
        Teachers(name="老师01", re_class=[
            session.query(Classes).filter_by(id=1).first()
        ]),
        Teachers(name="老师02", re_class=[
            session.query(Classes).filter_by(id=1).first()
        ]),
        Teachers(name="老师03", re_class=[
            session.query(Classes).filter_by(id=2).first()
        ]),
    ]
)

# Commit
session.commit()

# Close the connection
session.close()
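
Writing the association table directly also works. A small sketch using the models above (the ids are assumed to exist):

# Link teacher 1 to class 2 by inserting into the association table yourself
session.add(TeachersM2mClasses(teacher_id=1, class_id=2))
session.commit()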

# 1.4.3 Combined (UNION) Queries

A combined query stacks the result sets of several tables into one with UNION ALL:

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# End each query with filter(), even when fetching everything: all() returns a plain list, and a list has no union_all
# filter() returns a Query object: <class 'sqlalchemy.orm.query.Query'>
# Also select a single column in query(); without explicit columns whole objects come back

s = session.query(Students.name).filter()
t = session.query(Teachers.name).filter()
c = session.query(Classes.name).filter()
ret = s.union_all(t).union_all(c).all()  # materialize as a list
print(ret)

# Commit
session.commit()

# Close the connection
session.close()

# 1.4.4 Join Queries

Use join() for join queries:

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# Join manually through a filter condition
result = session.query(Students.name, Classes.name).filter(Students.fk_class == Classes.id).all()
for i in result:
    print(i)
print("-----")

# Join query: same as above; SQLAlchemy infers the condition Students.fk_class == Classes.id from the foreign key
result = session.query(Students.name, Classes.name).join(Classes).all()
# Equivalent to: result = session.query(Students.name, Classes.name).join(Classes, Students.fk_class == Classes.id).all()
for i in result:
    print(i)
print("-----")

# Left (outer) join: also include students that have no class
result = session.query(Students.name, Classes.name).join(Classes, isouter=True).all()
for i in result:
    print(i)
print("-----")

# To see which classes have no students, swap the sides of the join
result = session.query(Classes.name, Students.name).join(Students, isouter=True).all()
for i in result:
    print(i)
print("-----")

# Three-table join: the conditions must be given explicitly
result = session.query(Teachers.name, Classes.name, TeachersM2mClasses.id) \
    .select_from(TeachersM2mClasses) \
    .join(Teachers, TeachersM2mClasses.teacher_id == Teachers.id) \
    .join(Classes, TeachersM2mClasses.class_id == Classes.id) \
    .filter()  # ends with filter(), so printing the query shows the raw SQL
print(result)
for i in result:
    print(i)

# Commit
session.commit()

# Close the connection
session.close()

# 1.4.5 Forward and Reverse Lookups

The joins above are written explicitly; relationships can also be traversed through the relationship attributes.

# -*- coding: utf-8 -*-

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

# Forward: which classes does the first teacher teach? (through the relationship attribute)
result = session.query(Teachers).first()
# result.re_class is a list of that teacher's classes: <class 'sqlalchemy.orm.collections.InstrumentedList'>

for class_obj in result.re_class:  # list all of the teacher's classes
    print(class_obj.name)

# Reverse: which teachers and students belong to the first class? (through the backref names on the relationships)
result = session.query(Classes).first()

# Teachers
for teacher_obj in result.teachers:
    print(teacher_obj.name)

# Students
for student_obj in result.students:
    print(student_obj.name)

# Commit
session.commit()

# Close the connection
session.close()

# 1.5 Raw SQL

# 1.5.1 Viewing the Generated SQL

If a query ends with filter(), the returned object's __str__ is the SQL statement:

result = session.query(Teachers).filter()
print(result)

# SELECT teachers.id AS teachers_id, teachers.name AS teachers_name 
# FROM teachers

If it ends with all(), a list of objects comes back instead; first() returns a single object (or None):

result = session.query(Teachers).all()
print(result)

# [<models.Teachers object at 0x00000178EB0B5550>, <models.Teachers object at 0x00000178EB0B5518>, <models.Teachers object at 0x00000178EB0B5048>]
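
The __str__ form shows placeholders rather than values. For debugging, the bound parameter values can also be inlined through the compiler's literal_binds flag. A sketch, assuming the Users model and session from section 1.2 (output details depend on the dialect):

from sqlalchemy.dialects import mysql

query = session.query(Users).filter(Users.age > 18)
# Compile the underlying SELECT with the parameter values inlined (debugging only)
print(query.statement.compile(dialect=mysql.dialect(),
                              compile_kwargs={"literal_binds": True}))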

# 1.5.2 Executing Raw SQL

# -*- coding: utf-8 -*-

from sqlalchemy import text
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

# Import the engine, model classes, etc.
from models import *

# sessionmaker binds the engine so sessions know which database to talk to
Session = sessionmaker(bind=engine)
# scoped_session acts as a session registry: each thread gets its own session object (isolated internally via threading.local)
session = scoped_session(Session)

cursor = session.execute(text("select * from students where id <= (:num)"), params={"num": 2})
print(cursor.fetchall())

# Commit
session.commit()

# Close the connection
session.close()

# 2. Integrating Databases and Middleware

# 2.1 Caching Data in Redis

[1] Plain get/set

Step1: install the redis library

$ pip install redis

Step2: use Redis

Store a value in Redis

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0)
r = redis.Redis(connection_pool=pool)
r.set('id', '666666') 

Read a value back from Redis

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0)
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')

Note: with recent versions of redis-py, storing a dict raises the following error

redis.exceptions.DataError: Invalid input of type: 'dict'. Convert to a bytes, string, int or float first.

Downgrading the library works around the problem

$ pip install redis==2.10.6
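
Downgrading is a blunt fix; serializing the dict yourself avoids pinning an old library version. A minimal sketch (the key name is arbitrary):

import json

import redis

r = redis.Redis(host='127.0.0.1', port=6379, password='123456', db=0)
data = {'id': '666666', 'name': 'jack'}
# Store the dict as a JSON string, and decode it again on the way out
r.set('user:1', json.dumps(data))
restored = json.loads(r.get('user:1'))
print(restored)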

[2] Storing data as hashes

Single-field operations: hset sets one field, hgetall fetches the whole hash, hget fetches one field's value by key

# -*- coding: utf-8 -*-

import redis

pool = redis.ConnectionPool(host="127.0.0.1", port=6379, password="123456", db=1, decode_responses=True)
r = redis.Redis(connection_pool=pool)

dict_list = {"test_list": [1, 2, 3, "123", [1, 2, 3, "123"]], "test_list2": [1, 2, 3]}
for dl in dict_list:
    # Newer redis-py versions only accept bytes/str/int/float values, so stringify the lists
    r.hset("dl_hash", dl, str(dict_list[dl]))
# Set an expiry in seconds
r.expire("dl_hash", 6000)

# Fetch the whole hash
print(r.hgetall("dl_hash"))
# Fetch a single field's value by key
print(r.hget("dl_hash", "test_list2"))

>>> Output
{'test_list': "[1, 2, 3, '123', [1, 2, 3, '123']]", 'test_list2': '[1, 2, 3]'}
[1, 2, 3]

Note: decode_responses=True is set so that values read back from Redis are str rather than bytes.


Batch operations: hmset sets several fields at once and hmget reads several fields back (in redis-py 3.x+ hmset is deprecated in favor of hset with a mapping)

r.hmset('xx', {'k1': 'v1', 'k2': 'v2'})  # deprecated in redis-py 3.x+
r.hset('xx', mapping={'k1': 'v1', 'k2': 'v2'})  # preferred form
r.hmget('xx', 'k1', 'k2')

Note: the plain redis library cannot talk to a Redis Cluster; in practice you will hit errors like the one below. Use the redis-py-cluster library to work with a cluster.

redis.exceptions.ResponseError: MOVED 12285 192.168.222.66:6384
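
A minimal redis-py-cluster sketch (host and port are placeholders; newer redis-py versions also ship a built-in redis.cluster.RedisCluster):

from rediscluster import RedisCluster

# Any reachable node works as a startup node; the client discovers the rest of the cluster
startup_nodes = [{"host": "192.168.222.66", "port": "6379"}]
rc = RedisCluster(startup_nodes=startup_nodes, password="123456", decode_responses=True)
rc.set("id", "666666")
print(rc.get("id"))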

[3] Iterating over all keys and their values

List all keys

# -*- encoding: UTF-8 -*-

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True)
r = redis.StrictRedis(connection_pool=pool)

keys = r.keys()
print(keys)

Fetch every key's value in batches through a pipeline

# -*- coding: utf-8 -*-

import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)

pipe = r.pipeline()
pipe_size = 100000

count = 0
key_list = []
result_list = []
keys = r.keys()
for key in keys:
    key_list.append(key)
    pipe.get(key)
    if count < pipe_size:
        count += 1
    else:
        # Flush the pipeline once pipe_size commands have been queued
        for (k, v) in zip(key_list, pipe.execute()):
            result_list.append({k: v})
        count = 0
        key_list = []

# Flush whatever is still queued in the pipeline
for (k, v) in zip(key_list, pipe.execute()):
    result_list.append({k: v})

print(result_list)

# 2.2 Writing to and Querying MySQL

Install the pymysql library

$ pip install pymysql

Example: saving data to MySQL:

config.ini

[MYSQL]
host = 127.0.0.1
user = root
password = 123456
port = 3306
db = testdb
table = test_table

log.py

# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Log to the console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# Log to a file
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./save_mysql.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

save_mysql.py

# -*- coding: utf-8 -*-

from configparser import ConfigParser
import pymysql

from log import logger


# Read the config file
def read_config(config_path):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    section_list = cfg.sections()
    config_dict = {}
    for section in section_list:
        section_item = cfg.items(section)
        for item in section_item:
            config_dict[item[0]] = item[1]
    return config_dict


# Save records to MySQL
def save_data(mysql_dict, data_list):
    if not data_list:
        return None
    mysql = pymysql.connect(host=str(mysql_dict['host']), user=str(mysql_dict['user']),
                            password=str(mysql_dict['password']), port=int(mysql_dict['port']),
                            db=str(mysql_dict['db']), charset='utf8')
    for i in data_list:
        cursor = mysql.cursor()
        qmarks = ', '.join(['%s'] * len(i))
        columns = ', '.join(i.keys())
        try:
            qry = "insert into " + str(mysql_dict['table']) + " (%s) Values (%s);" % (columns, qmarks)
            values_list = []
            for j in i.values():
                values_list.append(j)
            cursor.execute(qry, values_list)
            mysql.commit()
        except Exception as e:
            logger.error(e)
        cursor.close()
    mysql.close()

if __name__ == '__main__':
    config_path = './config.ini'
    mysql_dict = read_config(config_path)
    data_list = [{'USERNAME': 'zhangsan', 'MESSAGE': 'test1'},{'USERNAME': 'lisi', 'MESSAGE': 'test2'}]
    save_data(mysql_dict, data_list)

Notes:

1) Insert fails with: Incorrect string value: '\xF0\x9F\x98\xAD",...' for column 'commentContent' at row 1

Cause: a database-encoding problem. The inserted data contains emoji, which UTF-8 encodes as four bytes per character, while MySQL's default utf8 charset stores at most three bytes per character, so writing the data to MySQL fails.

Fix: change the database and table encodings, then change the connection charset as well (see the sketch after this list).

1. Change the MySQL database encoding to utf8mb4
2. Change the table encoding to utf8mb4
3. Use charset='utf8mb4' where the code connects to MySQL
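
A sketch of steps 1 and 2 with pymysql (database and table names are the ones from config.ini above; adjust the collation to taste); step 3 is simply the charset='utf8mb4' argument in the connect call itself:

import pymysql

conn = pymysql.connect(host='127.0.0.1', user='root', password='123456',
                       db='testdb', charset='utf8mb4')
cur = conn.cursor()
# 1. Change the database encoding to utf8mb4
cur.execute("ALTER DATABASE testdb CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci")
# 2. Change the table encoding to utf8mb4
cur.execute("ALTER TABLE test_table CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
conn.commit()
cur.close()
conn.close()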

2) Connection fails with: AttributeError: 'NoneType' object has no attribute 'encoding'

Use charset='utf8' (not charset='utf-8')

3) For SELECT statements, follow this pattern:

try:
    sql = "select * from table"
    cursor = mysql.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    print(result)
except Exception as e:
    logger.error(e)

4) Writing a longblob column out to a file

import os
from uuid import uuid1

if not os.path.exists('./img'):
    os.makedirs('./img')
img_path = './img/{}.jpg'.format(uuid1())
# result here is a dict-style row; 'image_file' is the longblob column
with open(img_path, 'wb') as f:
    f.write(result['image_file'])

5) Handling datetime columns

A MySQL datetime column comes back as a datetime.datetime(), which breaks serialization when the value is written back out. Two options: convert at the SQL level (see the sketch after the code below), or override the JSON encoder:

# -*- coding: utf-8 -*-

import datetime
import json


class DateEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.strftime("%Y-%m-%d %H:%M:%S")
        else:
            return json.JSONEncoder.default(self, obj)


dic = {'name': 'jack', 'create_time': datetime.datetime(2019, 3, 19, 10, 6, 6)}

print(json.dumps(dic, cls=DateEncoder))
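
The SQL-level alternative is to let MySQL format the datetime so the row is serializable as-is; a sketch with placeholder table/column names (note that in DATE_FORMAT, %i is minutes):

# Assumes a pymysql cursor; no parameters are passed, so the % signs go through untouched
sql = "select name, DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%s') as create_time from test_table"
cursor.execute(sql)
result = cursor.fetchall()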

6) Returning MySQL query results as dicts

desc = cursor.description  # column metadata for the previous query
result = [dict(zip([col[0] for col in desc], row)) for row in cursor.fetchall()]   # build one dict per row
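
Alternatively, pymysql can return dict rows directly if a DictCursor is passed when connecting; a minimal sketch:

import pymysql
import pymysql.cursors

conn = pymysql.connect(host='127.0.0.1', user='root', password='123456',
                       db='testdb', charset='utf8mb4',
                       cursorclass=pymysql.cursors.DictCursor)
with conn.cursor() as cursor:
    cursor.execute("select * from test_table")
    rows = cursor.fetchall()  # a list of dicts keyed by column name
conn.close()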

7) Bulk inserts into MySQL

Build the SQL once, then insert in bulk with cursor.executemany

# Read a txt file and bulk-write it into MySQL (insert if absent, update if present)
# (assumes json, DateEncoder and logger from the earlier snippets are in scope)
def read_txt_batch_import_mysql(mysql_connect, txt_path, table_name):
    with open(txt_path, 'r', encoding='utf-8') as file:
        txt_str = file.read()

        # Turn the stringified list back into a Python list
        txt_list = eval(txt_str)

        # Build the column list
        if len(txt_list) == 0:
            return None
        else:
            columns = ', '.join(txt_list[0].keys())
            qmarks = ', '.join(['%s'] * len(txt_list[0].keys()))

        # Build the row data
        values_list = []
        for i in txt_list:
            i = json.loads(json.dumps(i, cls=DateEncoder))
            values_item_list = []
            for j in i.values():
                values_item_list.append(j)
            values_list.append(values_item_list)

        # Bulk insert
        cursor = mysql_connect.cursor()
        try:
            # replace instead of insert implements "update if present, insert if absent"
            qry = "replace into " + table_name + " (%s) values (%s);" % (columns, qmarks)
            cursor.executemany(qry, values_list)
            mysql_connect.commit()
        except Exception as e:
            logger.error(e)
        cursor.close()

# 2.3 Querying Data from Oracle

Install the cx_Oracle library

$ pip install cx_Oracle

Install the Oracle Instant Client

Example: querying data from Oracle

import cx_Oracle

# Point cx_Oracle at the Oracle Instant Client directory
cx_Oracle.init_oracle_client(lib_dir="D:\\Development\\instantclient-basic-windows.x64-11.2.0.4.0")

# Run the query
conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
curs = conn.cursor()
sql = "select a.id, a.title, a.text from test_table a"
rr = curs.execute(sql)

# Collect the query results into a dict keyed by id
result_dict = {}
while True:
    rs = rr.fetchone()
    if rs is None:
        break
    id, title, text = rs
    result_dict[id] = {"title": title, "text": text}

curs.close()
conn.close()

Notes:

1. Call cx_Oracle.init_oracle_client() outside your Flask request handlers, otherwise the second request fails with an error saying cx_Oracle is already initialized.
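
A sketch of that layout (the route and the paths are placeholders):

import cx_Oracle
from flask import Flask, jsonify

# Initialize once at import time, not inside a request handler
cx_Oracle.init_oracle_client(lib_dir="/home/instantclient_11_2")

app = Flask(__name__)


@app.route("/titles")
def titles():
    conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
    curs = conn.cursor()
    curs.execute("select a.id, a.title from test_table a")
    rows = curs.fetchall()
    curs.close()
    conn.close()
    return jsonify(rows)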

2. When deploying on Linux, the libclntsh.so shared library may not be found, with an error like:

cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded: "Error loading shared library libclntsh.so: No such file or directory". See https://oracle.github.io/odpi/doc/installation.html#linux for help

Cause: the instantclient-basic-linux.x64-11.2.0.4.0.zip package contains no libclntsh.so at all, only libclntsh.so.11.1, and simply renaming that file does not work.

./instantclient_11_2:
|---BASIC_README
|---adrci
|---genezi
|---libclntsh.so.11.1
|---libnnz11.so
|---libocci.so.11.1
|---libociei.so
|---libocijdbc11.so
|---ojdbc5.jar
|---ojdbc6.jar
|---uidrvci
|---xstreams.jar

Fix: create a symlink in the Dockerfile (note: use absolute paths)

ENV LD_LIBRARY_PATH=/home/instantclient_11_2
RUN ln -s /home/instantclient_11_2/libclntsh.so.11.1 /home/instantclient_11_2/libclntsh.so

# 2.4 A Small SQLite Wrapper

Step1: create the SQLite database and a test table

Option 1: in a terminal, run the two commands below to create the database file (create a throwaway test table first; without at least one table no test.db file is saved), then open it with Navicat. The test table can then be dropped and your own business tables created.

$ sqlite3 test.db
sqlite> create table test(name text);

Option 2: create it directly from Python

This creates a stu table, which the wrapper below is tested against.

# -*- coding: utf-8 -*-

import sqlite3
from sqlite3 import OperationalError

conn = sqlite3.connect('./test.db')
cur = conn.cursor()
try:
    sql = """CREATE TABLE stu (
                id integer primary key autoincrement,
                name varchar(15) not null,
                age integer
            );"""
    cur.execute(sql)
    print("create table success")
except OperationalError as o:
    print(str(o))
except Exception as e:
    print(e)
finally:
    cur.close()
    conn.close()

Step2: operate on the table through the wrapper

# -*- coding: utf-8 -*-

import sqlite3


class DBTool(object):
    def __init__(self):
        """
        Initialize: open the database connection
        """
        self.conn = sqlite3.connect('./test.db')
        self.c = self.conn.cursor()

    def executeUpdate(self, sql, ob):
        """
        Insert or update rows
        :param sql: SQL statement to run
        :param ob: parameter data
        :return: whether any rows were changed
        """
        try:
            self.c.executemany(sql, ob)
            i = self.c.rowcount  # rows affected by this statement (total_changes is cumulative and misleading here)
        except Exception as e:
            print('Error: ', e)
            return False
        finally:
            self.conn.commit()
        if i > 0:
            return True
        else:
            return False

    def executeDelete(self, sql, ob):
        """
        Delete rows
        :param sql: SQL statement to run
        :param ob: parameter data
        :return: whether any rows were changed
        """
        try:
            self.c.execute(sql, ob)
            i = self.c.rowcount  # rows affected by this statement
        except Exception as e:
            return False
        finally:
            self.conn.commit()
        if i > 0:
            return True
        else:
            return False

    def executeQuery(self, sql, ob):
        """
        Query rows
        :param sql: SQL statement to run
        :param ob: parameter data
        :return: a cursor over the result rows
        """
        test = self.c.execute(sql, ob)
        return test

    def close(self):
        """
        Close the cursor and the connection
        """
        self.c.close()
        self.conn.close()


if __name__ == '__main__':
    db = DBTool()

    print("Insert a student record")
    name = input('Enter name: ')
    age = input('Enter age: ')
    ob = [(name, age)]
    sql = 'insert into stu (name, age) values (?,?)'
    T = db.executeUpdate(sql, ob)
    if T:
        print('Insert succeeded!')
    else:
        print('Insert failed!')

    print("Update a student's name by ID")
    sql2 = 'UPDATE stu set name = ? where ID=?'
    id = input('Enter the ID to update: ')
    name = input('Enter the new name: ')
    ob = [(name, id)]
    T = db.executeUpdate(sql2, ob)
    if T:
        print('Update succeeded!')
    else:
        print('Update failed!')

    print("Query student records by name")
    sql = 'select * from stu where name=?'
    name = input('Enter the student name to look up: ')
    ob = (name,)
    rows = db.executeQuery(sql, ob).fetchall()
    if rows:
        for st in rows:
            print('ID:', st[0], '  Name:', st[1], '  Age:', st[2])
    else:
        print("No such student")

    print("Delete a student record by ID")
    num = input('Enter the ID to delete: ')
    sql2 = "DELETE from stu where ID=?"
    ob = (num,)
    T = db.executeDelete(sql2, ob)
    if T:
        print('Delete succeeded!')
    else:
        print('Delete failed!')

    # Close the database connection
    db.close()

# 2.5 Working with ElasticSearch

# 2.5.1 ElasticSearch Export and Import

Step1: install the dependency and write the config file

$ pip install elasticsearch==7.16.2   # keep the client version close to the ES server's (7.16.2 and 8.4.1 were both tested fine)

config.ini (substitute your own ES connection details)

[SOURCE_ES]
# IP address
host = 111.111.111.111
# Port
port = 9200
# Username
user = your_es_user
# Password
password = your_es_password
# Connection timeout (s)
timeout = 60
# Scroll context lifetime, 10 minutes here
scroll = 10m
# Page size per query
size = 1000
# Index list (comma-separated)
index_list = index_1,index_2


[TARGET_ES]
# IP address
host = 111.111.111.111
# Port
port = 9200
# Username
user = your_es_user
# Password
password = your_es_password
# Connection timeout (s)
timeout = 60
# Batch size per bulk insert
step = 1000

Note: separate multiple indices with commas (whitespace around the commas does not matter; it is stripped when the value is parsed)

log.py

# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Log to the console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# Log to a file
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./es_data.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)

Step2: write the ES export and import scripts and run them

export_es_data.py

# -*- coding: utf-8 -*-

import json
import os
import sys
import time
from decimal import Decimal

from elasticsearch import Elasticsearch
from configparser import ConfigParser

from log import logger


# Write a list queried out of ES to a json file
def write_list_to_json(data_list, json_file_path):
    with open(json_file_path, 'w', encoding='utf-8') as f:
        json.dump(data_list, f, ensure_ascii=False)


# Query matching ES data and save it as json
def es_export_json(es_connect, es_size, es_scroll, index_list, original_data_path, now_time):
    for i in index_list:
        logger.info("Saving data from index {}".format(i))
        query = {
            "range": {
                "update_time": {
                    # Less than or equal to the start time of this run
                    "lte": now_time
                }
            }
        }
        logger.info("Full export, everything before {}".format(now_time))
        try:
            source_list = []
            # Scroll through all matching ES data
            page = es_connect.search(index=i, query=query, size=es_size, scroll=es_scroll)
            for hit in page['hits']['hits']:
                source_data = hit['_source']
                source_data['_id'] = hit['_id']
                source_list.append(source_data)
            # The scroll id cursor walks the full result set
            sid = page['_scroll_id']
            # Total number of matching documents
            scroll_size = page['hits']['total']['value']
            while scroll_size > 0:
                page = es_connect.scroll(scroll_id=sid, scroll=es_scroll)
                sid = page['_scroll_id']
                scroll_size = len(page['hits']['hits'])
                for hit in page['hits']['hits']:
                    source_data = hit['_source']
                    source_data['_id'] = hit['_id']
                    source_list.append(source_data)
            json_file_path = original_data_path + "/" + str(i) + ".json"
            if len(source_list) != 0:
                write_list_to_json(source_list, json_file_path)
                logger.info('Index {} saved to {}, {} documents exported'.format(str(i), json_file_path, str(len(source_list))))
            else:
                logger.info('Index {} has no updates'.format(str(i)))
        except Exception as e:
            logger.error("Exporting ES index data to JSON failed: {}".format(e))


# Query matching ES data and save it as json -- entry point
def export_es_data_main(source_export_dict, original_data_path, now_time):
    es_connect = Elasticsearch(
        hosts=[str(source_export_dict['host']) + ":" + str(source_export_dict['port'])],
        http_auth=(str(source_export_dict['user']), str(source_export_dict['password'])),
        request_timeout=int(source_export_dict['timeout'])
    )
    index_list = ''.join(source_export_dict['index_list'].split()).split(",")
    es_size = int(source_export_dict['size'])
    es_scroll = str(source_export_dict['scroll'])
    es_export_json(es_connect, es_size, es_scroll, index_list, original_data_path, now_time)


# Read one section of the config file (SOURCE_ES and TARGET_ES share key names,
# so flattening every section into one dict would let them overwrite each other)
def read_config(config_path, section):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    config_dict = {}
    for key, value in cfg.items(section):
        config_dict[key] = value
    return config_dict


if __name__ == '__main__':

    # Resolve the base path and read the config
    base_path = os.getcwd()
    logger.info("Base path: {}".format(base_path))
    config_path = base_path + '/config.ini'
    logger.info("Config file path: {}".format(config_path))
    source_export_dict = {}
    try:
        source_export_dict = read_config(config_path, 'SOURCE_ES')
    except Exception:
        logger.error("Failed to read the config file, aborting")
        sys.exit()

    # Export the ES data and time it
    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    logger.info("----------Export started----------")
    export_es_data_main(source_export_dict, base_path, start_time_str)
    end_time = time.time()
    time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
    logger.info("----------Export finished, took {}----------".format(time_consuming_str))

import_es_data.py

# -*- coding: utf-8 -*-

import glob
import os
import sys
import time
from decimal import Decimal

from elasticsearch import Elasticsearch, helpers
from configparser import ConfigParser

from log import logger


# Read a json file and bulk-write it into ES (insert if absent, update if present)
def read_json_batch_import_es(es_connect, json_path, index_name, es_timeout, es_step):
    with open(json_path, 'r', encoding='utf-8') as file:
        json_str = file.read()
        # json_str may contain the literal null for empty values; Python has no
        # null keyword, so bind the name null to None before eval'ing
        null = None   # do not delete this seemingly unused line, or importing data with empty values fails with: name 'null' is not defined
        # Turn the stringified list back into a Python list
        json_list = eval(json_str)
        # Write in steps of es_step to ease the bulk-insert load
        length = len(json_list)
        for i in range(0, length, es_step):
            # More data than one step remains: write this full slice
            if i + es_step < length:
                actions = []
                for j in range(i, i + es_step):
                    # Pull out the "_id" that was added during export
                    new_id = json_list[j]['_id']
                    del json_list[j]["_id"]  # drop the exported "_id" from the source body
                    action = {
                        "_index": str(index_name),
                        "_id": str(new_id),
                        "_source": json_list[j]
                    }
                    actions.append(action)
                helpers.bulk(es_connect, actions, request_timeout=es_timeout)
            # Less data than one step remains: write the rest in one go
            else:
                actions = []
                for j in range(i, length):
                    # Pull out the "_id" that was added during export
                    new_id = json_list[j]['_id']
                    del json_list[j]["_id"]  # drop the exported "_id" from the source body
                    action = {
                        "_index": str(index_name),
                        "_id": str(new_id),
                        "_source": json_list[j]
                    }
                    actions.append(action)
                helpers.bulk(es_connect, actions, request_timeout=es_timeout)
        logger.info("Inserted {} documents into index {}".format(str(length), str(index_name)))


# Import the json data files into ES -- entry point
def import_es_data_main(target_import_dict, original_data_path):
    es_timeout = int(target_import_dict['timeout'])
    es_step = int(target_import_dict['step'])
    es_connect = Elasticsearch(
        hosts=[str(target_import_dict['host']) + ":" + str(target_import_dict['port'])],
        http_auth=(str(target_import_dict['user']), str(target_import_dict['password'])),
        request_timeout=es_timeout
    )
    json_path_list = glob.glob(original_data_path + '/*.json')
    for json_path in json_path_list:
        file_dir, file_full_name = os.path.split(json_path)
        index_name, file_ext = os.path.splitext(file_full_name)
        read_json_batch_import_es(es_connect, json_path, index_name, es_timeout, es_step)
        os.remove(json_path)  # delete the json data file once it has been imported


# Read one section of the config file (see export_es_data.py)
def read_config(config_path, section):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    config_dict = {}
    for key, value in cfg.items(section):
        config_dict[key] = value
    return config_dict


if __name__ == '__main__':

    # Resolve the base path and read the config
    base_path = os.getcwd()
    logger.info("Base path: {}".format(base_path))
    config_path = base_path + '/config.ini'
    logger.info("Config file path: {}".format(config_path))
    target_import_dict = {}
    try:
        target_import_dict = read_config(config_path, 'TARGET_ES')
    except Exception:
        logger.error("Failed to read the config file, aborting")
        sys.exit()

    # Import the ES data and time it
    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    logger.info("----------Import started----------")
    import_es_data_main(target_import_dict, base_path)
    end_time = time.time()
    time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
    logger.info("----------Import finished, took {}----------".format(time_consuming_str))

Note: the json string may contain the literal null for empty values. Python has no null keyword, so bind the name null to None before eval'ing the string.

null = None   # do not delete this seemingly unused line, or importing data with empty values fails with: name 'null' is not defined

# 2.5.2 ElasticSearch Scroll Queries

ES caps ordinary queries at 10,000 hits by default; a scroll query fetches every matching document.

# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch

es_connect = Elasticsearch(
    hosts="ip:port",
    http_auth=("your_user", "your_password"),
    request_timeout=60
)

source_list = []

# scroll='10m' below tells Elasticsearch how long to keep the search context alive (10 minutes here)
query = {
    "match_all": {}
}
page = es_connect.search(index="your_index", query=query, size=1000, scroll='10m')
for hit in page['hits']['hits']:
    source_data = hit['_source']
    source_data['_id'] = hit['_id']
    source_list.append(source_data)

# The scroll id cursor walks the full result set
sid = page['_scroll_id']
# Total number of matching documents
scroll_size = page['hits']['total']['value']

while (scroll_size > 0):
    page = es_connect.scroll(scroll_id=sid, scroll='10m')
    sid = page['_scroll_id']
    scroll_size = len(page['hits']['hits'])
    for hit in page['hits']['hits']:
        source_data = hit['_source']
        source_data['_id'] = hit['_id']
        source_list.append(source_data)

print(len(source_list))
print(source_list)

# 2.6 Uploading Files to MinIO

Step1: install the dependency and write the config file

$ pip install minio

config.ini

[minio]
minio_url = xxx.xxx.xxx.xxx:9000
access_key = minioadmin
secret_key = minioadmin

Note: do not put an http:// prefix on minio_url, otherwise the following error is raised

ValueError: path in endpoint is not allowed. Exception ignored in: <function Minio.__del__ at 0x0C0B9A98>

Step2: example code for uploading files to MinIO

# -*- coding: utf-8 -*-

import logging
import os
from minio import Minio
from minio.error import S3Error
from configparser import ConfigParser

logging.basicConfig(filename='logging_minio.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Read the config file
def read_config(config_path):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    section_list = cfg.sections()
    config_dict = {}
    for section in section_list:
        section_item = cfg.items(section)
        for item in section_item:
            config_dict[item[0]] = item[1]
    return config_dict


# Initialize the MinIO client
def get_minio_client(config_dict):
    # Initialize the Minio object with an endpoint, access key and secret key
    minio_client = Minio(config_dict['minio_url'],
                         access_key=config_dict['access_key'],
                         secret_key=config_dict['secret_key'],
                         secure=False)
    return minio_client


# Create a bucket (create it only if it does not exist yet, otherwise do nothing)
def minio_make_bucket_ifnotexist(minio_client, bucket_name):
    bucket_name = bucket_name.replace('_', "-")
    try:
        if not minio_client.bucket_exists(bucket_name):
            logging.info("Bucket does not exist: " + bucket_name)
            minio_client.make_bucket(bucket_name)
            logging.info("Bucket created: " + bucket_name)
    except S3Error as e:
        if "InvalidAccessKeyId" in str(e):
            logging.error("The MinIO access_key may be wrong")
        elif "SignatureDoesNotMatch" in str(e):
            logging.error("The MinIO secret_key may be wrong")
        else:
            logging.error("The MinIO endpoint, access_key or secret_key may be wrong")
        raise e


# Delete a bucket
def remove_bucket(minio_client, bucket_name):
    try:
        minio_client.remove_bucket(bucket_name)
        logging.info("Bucket deleted: " + bucket_name)
    except S3Error as e:
        logging.error(e)


# Upload a file
def minio_upload_file(minio_client, bucket_name, object_name, file_path):
    logging.info(file_path)
    result = minio_client.fput_object(bucket_name, object_name, file_path)
    return result


# Walk a directory tree and collect every file path under it
def find_filepaths(dir):
    result = []
    for root, dirs, files in os.walk(dir):
        for name in files:
            filepath = os.path.join(root, name)
            if os.path.exists(filepath):
                result.append(filepath)
    return result


# Build the object_name (the path the file gets inside MinIO)
def get_object_name(file_path):
    file_dir, file_full_name = os.path.split(file_path)
    return file_full_name


if __name__ == '__main__':
    # Read the config file
    config_dict = read_config('./config.ini')
    # Initialize the MinIO client
    minio_client = get_minio_client(config_dict)
    # Create a bucket (create it only if it does not exist yet, otherwise do nothing)
    minio_make_bucket_ifnotexist(minio_client, 'test')
    # Delete the bucket
    remove_bucket(minio_client, 'test')
    # Upload files
    minio_make_bucket_ifnotexist(minio_client, 'test')
    img_list = find_filepaths('./img')
    for img_path in img_list:
        object_name = get_object_name(img_path)
        minio_upload_file(minio_client, 'test', object_name, img_path)

# 2.7 Producing and Consuming with Kafka

[1] About the kafka-python library

Overview: a Python client for the Apache Kafka distributed stream processing system. kafka-python is designed to function much like the official Java client, with a few pythonic interfaces on top.

Project: https://github.com/dpkp/kafka-python

Docs: https://kafka-python.readthedocs.io/en/master/apidoc/modules.html

$ pip install kafka-python

[2] Producer and consumer examples

Producer example:

# -*- coding: utf-8 -*-

import json
from kafka import KafkaProducer


class KProducer:
    def __init__(self, bootstrap_servers, topic):
        """
        Kafka producer
        :param bootstrap_servers: broker addresses
        :param topic: topic
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode('ascii'))  # serialize message bodies as JSON
        self.topic = topic

    def sync_producer(self, data_li: list):
        """
        Send data synchronously
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            record_metadata = future.get(timeout=10)  # block until the broker acknowledges
            partition = record_metadata.partition  # partition the record landed in
            offset = record_metadata.offset  # offset within that partition
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    def asyn_producer(self, data_li: list):
        """
        Send data asynchronously
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        self.producer.flush()  # flush the batch

    def asyn_producer_callback(self, data_li: list):
        """
        Send data asynchronously with delivery callbacks
        :param data_li: data to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
        self.producer.flush()  # flush the batch

    def send_success(self, *args, **kwargs):
        """Success callback for async sends"""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """Error callback for async sends"""
        print('save error')
        return

    def close_producer(self):
        try:
            self.producer.close()
        except Exception:
            pass


if __name__ == '__main__':
    send_data_li = [{"test": 1}, {"test": 2}]
    kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9092')

    # Synchronous send
    # kp.sync_producer(send_data_li)

    # Asynchronous send
    # kp.asyn_producer(send_data_li)

    # Asynchronous send with callbacks
    kp.asyn_producer_callback(send_data_li)

    kp.close_producer()

Consumer example:

# -*- coding: utf-8 -*-

from kafka import KafkaConsumer


if __name__ == '__main__':

    # Create a consumer with a topic and bootstrap_servers; set up this way it only receives newly produced messages

    bootstrap_server_list = [
        '127.0.0.1:9092'
    ]

    consumer = KafkaConsumer(
        # Kafka cluster addresses
        bootstrap_servers=','.join(bootstrap_server_list),
        enable_auto_commit=True,  # periodically auto-commit all consumed offsets (committed while iterating)
        auto_commit_interval_ms=5000,  # auto-commit interval in milliseconds
    )

    consumer.subscribe(["topic"])  # topics to consume; several can be given

    for msg in consumer:  # iterator, waits for the next message
        print(msg)  # print the message
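
The consumer above only sees messages produced after it starts. To replay a topic from the beginning, give the consumer a group id and an offset-reset policy; a sketch (the group name is arbitrary):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic',
    bootstrap_servers='127.0.0.1:9092',
    group_id='my_group',            # offsets are tracked per consumer group
    auto_offset_reset='earliest',   # start from the oldest message when no committed offset exists
)

for msg in consumer:
    print(msg.topic, msg.partition, msg.offset, msg.value)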

[3] Connecting to a Kafka that requires SASL username/password authentication

# -*- coding: utf-8 -*-

import time
import json
from datetime import datetime
from kafka import KafkaProducer


def producer_event(server_info):
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='admin',
                             sasl_plain_password='your_password')
    topic = "test_kafka_topic"
    print("Connected to Kafka")
    for i in range(7200):
        data = {
            "name": "hello world"
        }
        data_json = json.dumps(data)
        producer.send(topic, data_json.encode()).get(timeout=30)
        print("Message sent at {}, payload: {}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()


server = "IP:9092"
producer_event(server)
