# 1. 使用SQLALchemy ORM框架
# 1.1 SQLALchemy简介
SQLAlchemy是Python的一款优秀的ORM框架,它可以作用于任何第三方Web框架。它简化了应用程序开发人员在原生SQL上的操作,使开发人员将主要精力都放在程序逻辑上,从而提高开发效率。它提供了一整套著名的企业级持久性模式,设计用于高效和高性能的数据库访问。SQLAlchemy相较于Django ORM来说更加贴近原生SQL语句,因此学习难度较低。
- 项目地址:https://github.com/sqlalchemy/sqlalchemy (opens new window)
- 官方文档:https://docs.sqlalchemy.org/ (opens new window)
其主要组件如下:
组成部分 | 描述 |
---|---|
Engine | 框架引擎 |
Connection Pooling | 数据库链接池 |
Dialect | 数据库DB API种类 |
Schema/Types | 架构&类型 |
SQL Expression Language | SQL表达式语言 |
SQLALchemy必须依赖其他操纵数据的模块,Dialect用于和数据API进行交互,从而实现对数据库的操作,如:
[1] pymysql:mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
[2] cx_Oracle:oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
2
更多详见:https://docs.sqlalchemy.org/en/20/dialects/index.html (opens new window)
此处以pymysql为例,因此安装依赖如下:
$ pip install sqlalchemy
$ pip install pymysql
2
# 1.2 表操作及链接库
SQLAlchemy不允许直接修改已有的表结构,如果需要修改表结构,则要删除旧表,再创建新表:
models.py
# -*- coding: utf-8 -*-
import datetime

from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base

# Declarative base class that all ORM models inherit from.
Base = declarative_base()

# Create the engine (connection factory + pool).
# NOTE(review): the host part of the URL was garbled by e-mail obfuscation
# ("[email protected]") in the original; restored to the conventional local
# address used elsewhere in this document -- adjust for your environment.
engine = create_engine(
    # "mysql+pymysql://root@127.0.0.1:3306/db?charset=utf8",  # without password
    "mysql+pymysql://root:123456@127.0.0.1:3306/db?charset=utf8",  # with password
    max_overflow=0,   # max connections created beyond pool_size
    pool_size=5,      # connection pool size
    pool_timeout=30,  # seconds to wait for a free connection before raising
    pool_recycle=-1   # seconds after which a pooled connection is recycled (-1 = never)
)


class Users(Base):
    """ORM model for the `users` table."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    age = Column(Integer, nullable=False)
    phone = Column(String(11))
    addr = Column(String(64), nullable=True)
    # default is the callable itself, evaluated at INSERT time.
    create_time = Column(DateTime, default=datetime.datetime.now)

    def __str__(self):
        return "id:%s name:%s age:%s phone:%s addr:%s create_time:%s" % (
            self.id, self.name, self.age, self.phone, self.addr, self.create_time)


def create_tb():
    """Create all tables registered on Base."""
    Base.metadata.create_all(engine)


def drop_tb():
    """Drop all tables registered on Base."""
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_tb()
    create_tb()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
表创建好之后,开始链接库。
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
2
3
4
5
6
7
8
9
10
# 1.3 单表增删改查
# 1.3.1 单表DML操作
[1] 新增记录
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# Import the engine and the model classes.
from models import *
# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session gives each thread its own session (threading.local isolation).
session = scoped_session(Session)
# Insert one record.
user_obj = Users(name="user001", phone="15125352333", age=23, addr="China")
session.add(user_obj)
# Commit the transaction.
session.commit()
# Close the connection (session.remove() also works).
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[2] 修改记录
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# Import the engine and the model classes.
from models import *
# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session gives each thread its own session (threading.local isolation).
session = scoped_session(Session)
# Update the name.
session.query(Users).filter_by(id=1).update({"name": "USER001"})
# Arithmetic update with +; per the original note, the default
# synchronize_session="fetch" only permits + on int columns.
session.query(Users).filter_by(id=1).update({"age": Users.age + 1}, synchronize_session="fetch")
# String concatenation with +; for a string column synchronize_session=False is required.
session.query(Users).filter_by(id=1).update({"addr": Users.addr + "BeiJing"}, synchronize_session=False)
# Commit the transaction.
session.commit()
# Close the connection.
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[3] 删除记录
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# Import the engine and the model classes.
from models import *
# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session gives each thread its own session (threading.local isolation).
session = scoped_session(Session)
# Delete the matching row(s).
session.query(Users).filter_by(id=1).delete()
# Commit the transaction.
session.commit()
# Close the connection (missing in the original example; added for
# consistency with the other DML examples).
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[4] 批量增加
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 批量增加
session.add_all([
Users(name="user001", age=21, phone="13269867233", addr="ShangHai"),
Users(name="user002", age=18, phone="13269867234", addr="GuangZhou"),
Users(name="user003", age=24, phone="13269867235", addr="ChongQing"),
Users(name="user003", age=29, phone="13269867236", addr="ChongQing"),
Users(name="user003", age=25, phone="13269867237", addr="ChongQing"),
])
# 提交
session.commit()
# 关闭链接
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 1.3.2 单表DQL操作
使用批量增加的数据进行查询操作。
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# Import the engine and the model classes.
from models import *
# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session gives each thread its own session (threading.local isolation).
session = scoped_session(Session)


def print_result(result):
    """Print a list of rows one per line, or a single object directly."""
    if isinstance(result, list):
        for item in result:
            print(item)
    else:
        print(result)


# Basic queries
# -- all rows --
result_00 = session.query(Users).all()
print("-----result_00:")
print_result(result_00)
# -- filtering --
result_01 = session.query(Users).filter(Users.name == "user001").all()  # Python-expression style
print("-----result_01:")
print_result(result_01)
result_02 = session.query(Users).filter_by(name="user002").all()  # keyword (ORM) style
print("-----result_02:")
print_result(result_02)
result_03 = session.query(Users).filter_by(name="user003").first()  # first match only
print("-----result_03:")
print_result(result_03)
# Select specific columns only.
result_04 = session.query(Users.name, Users.age).first()
print("-----result_04:")
print_result(result_04)
# AND (comma-separated conditions, or and_)
result_05 = session.query(Users).filter(Users.id > 1, Users.age < 23).all()
print("-----result_05:")
print_result(result_05)
from sqlalchemy import and_
result_06 = session.query(Users).filter(and_(Users.id > 1, Users.age < 23)).all()
print("-----result_06:")
print_result(result_06)
# OR
from sqlalchemy import or_
result_07 = session.query(Users).filter(or_(Users.id > 3, Users.age < 23)).all()
print("-----result_07:")
print_result(result_07)
# Combining and_ inside or_
result_08 = session.query(Users).filter(or_(
    Users.id > 1,
    and_(Users.id > 2, Users.age < 24)
)).all()
print("-----result_08:")
print_result(result_08)
# Range
result_09 = session.query(Users).filter(Users.age.between(18, 24)).all()
print("-----result_09:")
print_result(result_09)
# Membership
result_10 = session.query(Users).filter(Users.age.in_([18, 21, 24])).all()
print("-----result_10:")
print_result(result_10)
# Negation with ~
result_11 = session.query(Users).filter(~Users.age.in_([18, 21, 24])).all()
print("-----result_11:")
print_result(result_11)
# Wildcards
result_12 = session.query(Users).filter(Users.name.like("us%")).all()
print("-----result_12:")
print_result(result_12)
# Pagination via slicing
result_13 = session.query(Users).all()[0:1]
print("-----result_13:")
print_result(result_13)
# Ordering
result_14 = session.query(Users).order_by(Users.id.desc()).all()  # descending
print("-----result_14:")
print_result(result_14)
result_15 = session.query(Users).order_by(Users.id.asc()).all()  # ascending
print("-----result_15:")
print_result(result_15)
# Grouping
result_16 = session.query(Users).group_by(Users.id).all()
print("-----result_16:")
print_result(result_16)
# Aggregate functions.
# BUG FIX: the original wrote having(func.max(Users.age > 12)), which
# aggregates a boolean expression; the intended HAVING clause compares
# the aggregate itself: max(age) > 12.
from sqlalchemy.sql import func
result_17 = session.query(
    func.max(Users.age),
    func.sum(Users.age),
    func.min(Users.age),
).group_by(Users.name).having(func.max(Users.age) > 12).all()
print("-----result_17:")
print_result(result_17)
# Commit the transaction.
session.commit()
# Close the connection.
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# 1.4 多表关联操作
# 1.4.1 一对多关系
首先是建立一对多的关系,使用relationship
做逻辑一对多,不会在物理表中创建关系,但是可以通过该字段进行增删改查:
# -*- coding: utf-8 -*-
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base

# Declarative base class.
Base = declarative_base()

# NOTE(review): the host part of the URL was garbled by e-mail obfuscation
# ("[email protected]") in the original; restored to the conventional local address.
engine = create_engine(
    # "mysql+pymysql://root@127.0.0.1:3306/db?charset=utf8",  # without password
    "mysql+pymysql://root:123456@127.0.0.1:3306/db?charset=utf8",  # with password
    max_overflow=0,   # max connections created beyond pool_size
    pool_size=5,      # connection pool size
    pool_timeout=30,  # seconds to wait for a free connection before raising
    pool_recycle=-1   # connection recycle interval (-1 = never)
)


class Classes(Base):
    """The "one" side: a class that many students belong to."""
    __tablename__ = "classes"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)


class Students(Base):
    """The "many" side: each student references one class."""
    __tablename__ = "students"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    # Real FK column (created in the physical table) with cascading
    # delete/update; other options: DELETE, RESTRICT.
    fk_class = Column(Integer, ForeignKey("classes.id", ondelete="CASCADE", onupdate="CASCADE"))
    # Logical relationship field (no physical column); backref enables
    # reverse lookup as classes_obj.students.
    re_class = relationship("Classes", backref="students")


def create_tb():
    """Create all tables registered on Base."""
    Base.metadata.create_all(engine)


def drop_tb():
    """Drop all tables registered on Base."""
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_tb()
    create_tb()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
通过逻辑字段进行增加:
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
session.add_all(
[
Students(name="学生01", re_class=Classes(name="一年级一班")), # 自动填入fk_class
Students(name="学生02", re_class=Classes(name="一年级二班")),
]
)
# 提交
session.commit()
# 关闭链接
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 1.4.2 多对多关系
多对多也使用relationship
做逻辑多对多,不会在物理表中创建关系,但是可以通过该字段进行增删改查。
使用relationship
时,传入指定手动生成的第三张表,代表这是多对多关系。
# -*- coding: utf-8 -*-
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship

# Declarative base class.
Base = declarative_base()

# NOTE(review): the host part of the URL was garbled by e-mail obfuscation
# ("[email protected]") in the original; restored to the conventional local address.
engine = create_engine(
    # "mysql+pymysql://root@127.0.0.1:3306/db?charset=utf8",  # without password
    "mysql+pymysql://root:123456@127.0.0.1:3306/db?charset=utf8",  # with password
    max_overflow=0,   # max connections created beyond pool_size
    pool_size=5,      # connection pool size
    pool_timeout=30,  # seconds to wait for a free connection before raising
    pool_recycle=-1   # connection recycle interval (-1 = never)
)


class Classes(Base):
    __tablename__ = "classes"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)


class Students(Base):
    __tablename__ = "students"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    # Real FK column with cascading delete/update (options: CASCADE, DELETE, RESTRICT).
    fk_class = Column(Integer, ForeignKey("classes.id", ondelete="CASCADE", onupdate="CASCADE"))
    # Logical relationship field; backref enables reverse lookup (classes.students).
    re_class = relationship("Classes", backref="students")


class Teachers(Base):
    __tablename__ = "teachers"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    # Logical many-to-many field: secondary names the association table's
    # __tablename__; reverse lookup is classes.teachers.
    re_class = relationship("Classes", secondary="teachers_m2m_classes", backref="teachers")


class TeachersM2mClasses(Base):
    """Hand-written association table between teachers and classes."""
    __tablename__ = "teachers_m2m_classes"
    id = Column(Integer, primary_key=True)
    teacher_id = Column(Integer, ForeignKey("teachers.id"))
    class_id = Column(Integer, ForeignKey("classes.id"))
    # Each (teacher, class) pair may appear only once.
    __table_args__ = (
        UniqueConstraint("teacher_id", "class_id"),
    )


def create_tb():
    """Create all tables registered on Base."""
    Base.metadata.create_all(engine)


def drop_tb():
    """Drop all tables registered on Base."""
    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    drop_tb()
    create_tb()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
使用逻辑字段添加或自己操纵第三张表:
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
session.add_all(
[
Teachers(name="老师01", re_class=[
session.query(Classes).filter_by(id=1).first()
]),
Teachers(name="老师02", re_class=[
session.query(Classes).filter_by(id=1).first()
]),
Teachers(name="老师03", re_class=[
session.query(Classes).filter_by(id=2).first()
]),
]
)
# 提交
session.commit()
# 关闭链接
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 1.4.3 组合查询
组合查询将多张表用笛卡尔积的效果显现出来:
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# Import the engine and the model classes.
from models import *
# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session gives each thread its own session (threading.local isolation).
session = scoped_session(Session)
# Must end with filter() (even for "all rows"): all() returns a plain list,
# and a list has no union_all method.
# filter() returns a <class 'sqlalchemy.orm.query.Query'> object.
# Select individual columns in query(); without columns it returns whole objects.
s = session.query(Students.name).filter()
t = session.query(Teachers.name).filter()
c = session.query(Classes.name).filter()
ret = s.union_all(t).union_all(c).all() # materialize as a list
print(ret)
# Commit the transaction.
session.commit()
# Close the connection.
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 1.4.4 连表查询
使用join
进行连表查询:
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# Import the engine and the model classes.
from models import *

# Session factory bound to the engine.
Session = sessionmaker(bind=engine)
# scoped_session gives each thread its own session (threading.local isolation).
session = scoped_session(Session)

# Manual join condition expressed in the WHERE clause.
result = session.query(Students.name, Classes.name).filter(Students.id == Classes.id).all()
for row in result:
    print(row)
print("-----")

# join(): the Students.fk_class == Classes.id condition is inferred from the FK.
result = session.query(Students.name, Classes.name).join(Classes).all()
# Equivalent: session.query(Students.name, Classes.name).join(Classes, Students.fk_class == Classes.id).all()
for row in result:
    print(row)
print("-----")

# LEFT OUTER JOIN: students without a class are still returned.
result = session.query(Students.name, Classes.name).join(Classes, isouter=True).all()
for row in result:
    print(row)
print("-----")

# Swap the joined side to list classes that have no students.
# NOTE(review): joining Students when Students already leads the query looks
# suspicious -- verify this example against a live database.
result = session.query(Students.name, Classes.name).join(Students, isouter=True).all()
for row in result:
    print(row)
print("-----")

# Three-table join: the ON conditions must be given explicitly.
result = session.query(Teachers.name, Classes.name, TeachersM2mClasses.id) \
    .join(Teachers, TeachersM2mClasses.teacher_id == Teachers.id) \
    .join(Classes, TeachersM2mClasses.class_id == Classes.id) \
    .filter()
print(result)  # a Query ending in filter() prints its raw SQL
for row in result:
    print(row)

# Commit the transaction.
session.commit()
# Close the connection.
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 1.4.5 正反查询
上面是使用join
进行的连表查询,也可以使用逻辑字段relationship
查询。
# -*- coding: utf-8 -*-
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 正向:查看第一个老师都在哪些班级(通过逻辑字段的名字)
result = session.query(Teachers).first()
# result.re_class是一个列表,存了有关该老师所在的班级 <class 'sqlalchemy.orm.collections.InstrumentedList'>
for class_obj in result.re_class: # 查看其所有的班级
print(class_obj.name)
# 反向:查看第一个班级下都有哪些老师,都有哪些学生(通过逻辑字段中的backref参数进行反向查询)
result = session.query(Classes).first()
# 看老师
for teacher_obj in result.teachers:
print(teacher_obj.name)
# 看学生
for student_obj in result.students:
print(student_obj.name)
# 提交
session.commit()
# 关闭链接
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 1.5 原生SQL语句
# 1.5.1 查看原生SQL语句
如果一条查询语句是以filter
结尾,则返回结果对象的__str__
方法中都是SQL
语句:
result = session.query(Teachers).filter()
print(result)
# SELECT teachers.id AS teachers_id, teachers.name AS teachers_name
# FROM teachers
2
3
4
5
如果是all结尾,返回的就是一个列表;first结尾返回的则是单个模型对象(没有匹配结果时为None):
result = session.query(Teachers).all()
print(result)
# [<models.Teachers object at 0x00000178EB0B5550>, <models.Teachers object at 0x00000178EB0B5518>, <models.Teachers object at 0x00000178EB0B5048>]
2
3
4
# 1.5.2 执行原生SQL语句
# -*- coding: utf-8 -*-
from sqlalchemy import text
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
cursor = session.execute(text("select * from students where id <= (:num)"), params={"num": 2})
print(cursor.fetchall())
# 提交
session.commit()
# 关闭链接
session.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 2. 数据库及中间件的集成与使用
# 2.1 使用Redis缓存数据
[1] 普通方式存取数据
Step1:引入redis库
$ pip install redis
Step2:使用Redis
往redis存值
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0)
r = redis.Redis(connection_pool=pool)
r.set('id', '666666')
2
3
4
5
从redis取值
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0)
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')
2
3
4
5
注意事项:如果存入字典,最新版的会出现如下报错
redis.exceptions.DataError: Invalid input of type: 'dict'. Convert to a bytes, string, int or float first.
可以降级版本解决该问题
$ pip install redis==2.10.6
[2] hash方式存取数据
单个方式的存取值:hset设置单个值、hgetall获取整个hash、hget根据key获取单独的value
# -*- coding: utf-8 -*-
import redis
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, password="123456", db=1, decode_responses=True)
r = redis.Redis(connection_pool=pool)
test_list = [1, 2, 3, "123", [1, 2, 3, "123"]]
dict_list = {"test_list": [1, 2, 3, "123", [1, 2, 3, "123"]], "test_list2": [1, 2, 3]}
for dl in dict_list:
r.hset("dl_hash", dl, dict_list[dl])
# 设置过期时间,单位秒
r.expire("dl_hash", 6000)
# 获取整个hash
print(r.hgetall("dl_hash"))
# 根据key获取单独的value
print(r.hget("dl_hash", "test_list2"))
>>> 结果输出
{b'test_list': b"[1, 2, 3, '123', [1, 2, 3, '123']]", b'test_list2': b'[1, 2, 3]'}
b'[1, 2, 3]'
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
注:设置 decode_responses=True 是为了解决Redis与Python交互取出来的是bytes类型的问题。
批量方式的存取值:hmset 批量设置多个值,hmget 批量获取多个值
r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
r.hmget('xx', 'k1', 'k2')
2
注意:redis库不能操作 redis 集群,实际使用中会遇到如下错误,可以使用 redis-py-cluster 库去实现 redis 集群的操作。
redis.exceptions.ResponseError: MOVED 12285 192.168.222.66:6384
[3] 遍历所有key及其内容
遍历所有key
# -*- encoding: UTF-8 -*-
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True)
r = redis.StrictRedis(connection_pool=pool)
keys = r.keys()
print(keys)
2
3
4
5
6
7
8
9
批量遍历所有key的内容
# -*- coding: utf-8 -*-
import redis

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)

# Pipeline batches GET commands so each round-trip fetches many values.
pipe = r.pipeline()
pipe_size = 100000
count = 0          # BUG FIX: the original named this `len`, shadowing the builtin
key_list = []
result_list = []

keys = r.keys()
for key in keys:
    key_list.append(key)
    pipe.get(key)
    if count < pipe_size:
        count += 1
    else:
        # Flush one full batch of queued GETs and pair values with their keys.
        for (k, v) in zip(key_list, pipe.execute()):
            result_item = {}
            result_item[k] = v
            result_list.append(result_item)
        count = 0
        key_list = []
# Flush whatever remains queued after the loop.
for (k, v) in zip(key_list, pipe.execute()):
    result_item = {}
    result_item[k] = v
    result_list.append(result_item)
print(result_list)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 2.2 写入及查询MySQL
引入pymysql库
$ pip install pymysql
将数据保存到MySQL示例:
config.ini
[MYSQL]
host = 127.0.0.1
user = root
password = 123456
port = 3306
db = testdb
table = test_table
2
3
4
5
6
7
log.py
# -*- coding: utf-8 -*-
import logging
logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./save_mysql.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
save_mysql.py
# -*- coding: utf-8 -*-
from configparser import ConfigParser
import pymysql
# 读取配置文件
def read_config(config_path):
    """Read an INI file and flatten every section into one {option: value} dict.

    Options from later sections overwrite identically-named ones from
    earlier sections.
    """
    parser = ConfigParser()
    parser.read(config_path, encoding='utf-8')
    merged = {}
    for section_name in parser.sections():
        for option, value in parser.items(section_name):
            merged[option] = value
    return merged
# 保存到mysql数据库
def save_data(mysql_dict, data_list):
    """Insert each dict in data_list as a row into the configured MySQL table.

    :param mysql_dict: flat config dict with host/user/password/port/db/table keys
    :param data_list: list of {column: value} dicts; empty input is a no-op
    :return: None
    """
    import logging
    if not data_list:
        return None
    mysql = pymysql.connect(host=str(mysql_dict['host']), user=str(mysql_dict['user']),
                            password=str(mysql_dict['password']), port=int(mysql_dict['port']),
                            db=str(mysql_dict['db']), charset='utf8')
    # One cursor reused for all rows; the original opened a cursor per row
    # and only closed the last one.
    cursor = mysql.cursor()
    try:
        for row in data_list:
            qmarks = ', '.join(['%s'] * len(row))
            columns = ', '.join(row.keys())
            try:
                # Table name comes from trusted config; values go through
                # parameter binding, which prevents SQL injection.
                qry = "insert into " + str(mysql_dict['table']) + " (%s) Values (%s);" % (columns, qmarks)
                cursor.execute(qry, list(row.values()))
                mysql.commit()
            except Exception as e:
                # BUG FIX: the original referenced an undefined `logger`
                # (NameError on the error path); use logging instead.
                logging.getLogger(__name__).error(e)
    finally:
        cursor.close()
        mysql.close()
if __name__ == '__main__':
config_path = './config.ini'
mysql_dict = read_config(config_path)
data_list = [{'USERNAME': 'zhangsan', 'MESSAGE': 'test1'},{'USERNAME': 'lisi', 'MESSAGE': 'test2'}]
save_data(mysql_dict, data_list)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
注意事项:
1)插入时报错 Incorrect string value: ‘\xF0\x9F\x98\xAD“,...‘ for column ‘commentContent‘ at row 1
原因:数据库编码问题导致的,原因在于插入数据中存在emoji表情,而这些表情是按照四个字节一个单位进行编码的,而我们通常使用的utf-8编码在mysql数据库中默认是按照3个字节一个单位进行编码的,导致将数据存入mysql的时候出现错误。
解决:修改数据库与数据表编码,然后再改一下连接数据库的字符集编码。
1.修改mysql数据库的编码为utf8mb4
2.修改数据表的编码为utf8mb4
3.将代码连接mysql处改为charset='utf8mb4'
2
3
2)mysql连接处报错AttributeError: 'NoneType' object has no attribute 'encoding'
应该是charset='utf8' (而不是charset='utf-8')
3)如果是mysql查询语句,参照如下示例编写即可。
try:
sql = "select * from table"
cursor = mysql.cursor()
cursor.execute(sql)
result = cursor.fetchall()
print(result)
except Exception as e:
logger.error(e)
2
3
4
5
6
7
8
4)将longblob类型字段的数据写入到文件
if not os.path.exists('./img'):
os.makedirs('./img')
uuid = uuid1()
img_path = './img/{}.jpg'.format(uuid)
f = open(img_path, 'wb')
f.write(result['image_file'])
2
3
4
5
6
5)datetime类型字段的处理
MySQL的datetime类型字段直接查询出来是datetime.datetime()形式,再插入时就会出现问题。这里提供2种解决方案:一是sql层面转换,二是重写json函数进行转换。
# -*- coding: utf-8 -*-
import datetime
import json
class DateEncoder(json.JSONEncoder):
    """JSON encoder that renders datetime objects as 'YYYY-MM-DD HH:MM:SS'."""

    def default(self, obj):
        # Only datetimes get special treatment; everything else falls back to
        # the stock encoder (which raises TypeError for unknown types).
        if not isinstance(obj, datetime.datetime):
            return json.JSONEncoder.default(self, obj)
        return obj.strftime("%Y-%m-%d %H:%M:%S")
dic = {'name': 'jack', 'create_time': datetime.datetime(2019, 3, 19, 10, 6, 6)}
print(json.dumps(dic, cls=DateEncoder))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
6)将mysql的查询结果以字典的形式返回
desc = cursor.description # 获取字段的描述
result = [dict(zip([col[0] for col in desc], row)) for row in cursor.fetchall()] # 以字典的形式返回数据
2
7)mysql批量插入
构建好sql,使用 cursor.executemany 方法批量插入数据库中
# Read a txt file and bulk-import it into MySQL (insert if absent, update if present).
def read_txt_batch_import_mysql(mysql_connect, txt_path, table_name):
    """Load a Python-literal list of row-dicts from `txt_path` and upsert
    them into `table_name` via REPLACE INTO + executemany.

    :param mysql_connect: an open pymysql connection
    :param txt_path: text file whose content is a Python list-of-dicts literal
    :param table_name: target table name (trusted; interpolated into the SQL)
    """
    import logging
    with open(txt_path, 'r', encoding='utf-8') as file:
        txt_str = file.read()
    # SECURITY: eval() executes arbitrary code from the file -- only ever use
    # this with fully trusted files. ast.literal_eval would be safer but
    # cannot parse the datetime literals that DateEncoder later normalizes.
    txt_list = eval(txt_str)
    if len(txt_list) == 0:
        return None
    # Build column list and placeholders from the first row's keys.
    columns = ', '.join(txt_list[0].keys())
    qmarks = ', '.join(['%s'] * len(txt_list[0].keys()))
    # Build the parameter rows, converting datetimes to strings via DateEncoder.
    values_list = []
    for row in txt_list:
        row = json.loads(json.dumps(row, cls=DateEncoder))
        values_list.append(list(row.values()))
    cursor = mysql_connect.cursor()
    try:
        # REPLACE INTO implements "update if exists, insert otherwise".
        qry = "replace into " + table_name + " (%s) values (%s);" % (columns, qmarks)
        cursor.executemany(qry, values_list)
        mysql_connect.commit()
    except Exception as e:
        # BUG FIX: the original referenced an undefined `logger`; use logging.
        logging.getLogger(__name__).error(e)
    finally:
        # finally guarantees the cursor is closed even on error.
        cursor.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 2.3 查询Oracle的数据
引入cx_Oracle库
$ pip install cx_Oracle
安装Oracle Instant Client
- 去 Oracle官网 (opens new window)下载对应版本的客户端,并解压。
从Oracle里查询数据示例
import cx_Oracle

# Point cx_Oracle at the Oracle Instant Client directory.
cx_Oracle.init_oracle_client(lib_dir="D:\\Development\\instantclient-basic-windows.x64-11.2.0.4.0")

# Connect and run the query.
conn = cx_Oracle.connect("testuser", "123456", "127.0.0.1:1521/orcl")
curs = conn.cursor()
# BUG FIX: the original selected a.id/a.title/a.text but never declared the
# alias `a` in the FROM clause.
sql = "select a.id, a.title, a.text from test_table a"
rr = curs.execute(sql)

# Collect the result rows into a dict keyed by id.
result_dir = {}
while (1):
    rs = rr.fetchone()
    if rs == None:
        break
    row_id = rs[0]   # renamed from `id` to avoid shadowing the builtin
    title = rs[1]
    text = rs[2]
    # BUG FIX: the original loop unpacked the row but never stored anything
    # into result_dir.
    result_dir[row_id] = {"title": title, "text": text}
curs.close()
conn.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
注意事项:
1、cx_Oracle.init_oracle_client()要写在Flask接口的外面,否则第二次接口请求时会报cx_Oracle已经初始化的错误。
2、Linux端部署的时候,会出现找不到libclntsh.so动态连接库的问题,报错如下:
cx_Oracle.DatabaseError: DPI-1047: 64-bit Oracle Client library cannot be loaded: "Error loading shared library libclntsh.so: No such file or directory". See https://oracle.github.io/odpi/doc/installation.html#linux for help
报错原因:instantclient-basic-linux.x64-11.2.0.4.0.zip
包里根本没有libclntsh.so
,有的是libclntsh.so.11.1
,而单纯的给这个文件改个名是不行的。
./instantclient_11_2:
|---BASIC_README
|---adrci
|---genezi
|---libclntsh.so.11.1
|---libnnz11.so
|---libocci.so.11.1
|---libociei.so
|---libocijdbc11.so
|---ojdbc5.jar
|---ojdbc6.jar
|---uidrvci
|---xstreams.jar
2
3
4
5
6
7
8
9
10
11
12
13
解决办法:需要在Dockerfile里设置软链接解决(注意要用绝对路径)
ENV LD_LIBRARY_PATH=/home/instantclient_11_2
RUN ln -s /home/instantclient_11_2/libclntsh.so.11.1 /home/instantclient_11_2/libclntsh.so
2
# 2.4 操作SQLite的封装
Step1:创建SQLite数据库及测试数据表
方式一:打开终端,输入以下两条命令创建SQLite数据库文件(先随便创建一个测试表即可,没有表的话不会保存出test.db文件),用Navicat打开连接即可,这时测试表可以删了,然后创建自己的业务数据表。
$ sqlite3 test.db
$ create table test(name text);
2
方式二:使用Python直接创建
这里创建了一个stu表,下面用这个表来测试封装。
# -*- coding: utf-8 -*-
import sqlite3
from sqlite3 import OperationalError

# Open (and create if missing) the database file, then create the stu table.
conn = sqlite3.connect('./test.db')
cur = conn.cursor()
try:
    sql = """CREATE TABLE stu (
id integer primary key autoincrement,
name varchar(15) not null,
age integer
);"""
    cur.execute(sql)
    print("create table success")
except OperationalError as o:
    # Typically "table stu already exists" on repeat runs.
    print(str(o))
except Exception as e:
    print(e)
finally:
    # Always release the cursor and connection.
    cur.close()
    conn.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Step2:使用封装好的方法操作SQLite数据表
# -*- coding: utf-8 -*-
import sqlite3
class DBTool(object):
    """Thin wrapper around a SQLite connection offering simple CRUD helpers."""

    def __init__(self):
        """Open the ./test.db database and create a cursor."""
        self.conn = sqlite3.connect('./test.db')
        self.c = self.conn.cursor()

    def executeUpdate(self, sql, ob):
        """Run an INSERT/UPDATE statement for every parameter tuple in `ob`.

        :param sql: parameterized SQL statement
        :param ob: sequence of parameter tuples
        :return: True if at least one row changed, else False
        """
        # Snapshot the cumulative change counter first. BUG FIX: total_changes
        # counts changes over the connection's whole lifetime, so the original
        # `total_changes > 0` test wrongly returned True forever after the
        # first successful call; comparing the delta fixes that. It also fixes
        # the UnboundLocalError the original raised in `finally` when the
        # statement failed before `i` was assigned.
        before = self.conn.total_changes
        try:
            self.c.executemany(sql, ob)
        except Exception as e:
            print('错误类型: ', e)
            return False
        finally:
            self.conn.commit()
        return self.conn.total_changes - before > 0

    def executeDelete(self, sql, ob):
        """Run a DELETE statement.

        :param sql: parameterized SQL statement
        :param ob: parameter sequence
        :return: True if at least one row was removed, else False
        """
        before = self.conn.total_changes  # same cumulative-counter fix as above
        try:
            self.c.execute(sql, ob)
        except Exception:
            return False
        finally:
            self.conn.commit()
        return self.conn.total_changes - before > 0

    def executeQuery(self, sql, ob):
        """Run a SELECT and return the cursor (iterable of result rows)."""
        return self.c.execute(sql, ob)

    def close(self):
        """Close the cursor and the underlying connection."""
        self.c.close()
        self.conn.close()
if __name__ == '__main__':
db = DBTool()
print("插入Student信息")
name = input('输入姓名:')
age = input('输入年龄:')
ob = [(name, age)]
sql = 'insert into stu (name, age) values (?,?)'
T = db.executeUpdate(sql, ob)
if T:
print('插入成功!')
else:
print('插入失败!')
print("通过ID修改Student姓名信息")
sql2 = 'UPDATE stu set name = ? where ID=?'
id = input('输入需要修改的ID:')
name = input('输入修改的Name:')
ob = [(name, id)]
T = db.executeUpdate(sql2, ob)
if T:
print('修改成功!')
else:
print('修改失败!')
print("通过姓名查询Student信息")
sql = 'select * from stu where name=?'
name = input('输入需要查询的学员姓名:')
ob = [(name)]
s = db.executeQuery(sql, ob)
st = []
for st in s:
print('ID:', st[0], ' Name:', st[1], ' Age:', st[2])
if any(st):
pass
else:
print("输入有误,该学员不存在")
print("通过ID删除Student信息")
num = input('输入需要删除的学员ID:')
sql2 = "DELETE from stu where ID=?"
ob = [(num)]
T = db.executeDelete(sql2, ob)
if T:
print('删除成功!')
else:
print('删除失败!')
# 关闭数据库连接
db.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# 2.5 ElasticSearch的操作
# 2.5.1 ElasticSearch导入导出
Step1:安装依赖并编写配置文件
$ pip install elasticsearch==7.16.2 // 注意要和服务端的ES版本尽量相近(实测7.16.2、8.4.1是没问题的)
config.ini(把ES连接信息换成自己的)
[SOURCE_ES]
# IP地址
host = 111.111.111.111
# 端口号
port = 9200
# 用户名
user = your_es_user
# 密码
password = your_es_password
# 连接超时时间(ms)
timeout = 60
# 滚动查询的超时时间,这里设置为10分钟
scroll = 10m
# 单次查询的条数
size = 1000
# 索引列表(多个之间用","分隔)
index_list = index_1,index_2
[TARGET_ES]
# IP地址
host = 111.111.111.111
# 端口号
port = 9200
# 用户名
user = your_es_user
# 密码
password = your_es_password
# 连接超时时间(ms)
timeout = 60
# 单次批量插入数据量
step = 1000
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
注:多个索引之间用英文逗号分隔(逗号后面有没有空格都无所谓,读取配置时会进行处理)
log.py
# -*- coding: utf-8 -*-
import logging
logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./es_data.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Step2:编写ES导入导出脚本并执行
export_es_data.py
# -*- coding: utf-8 -*-
import json
import os
import sys
import time
from decimal import Decimal
from elasticsearch import Elasticsearch
from configparser import ConfigParser
from log import logger
# 将ES查询出来list写入到json文件
def write_list_to_json(data, json_file_path):
    """Dump `data` (any JSON-serializable object) to a UTF-8 JSON file.

    The original parameter was named `list`, shadowing the builtin; renamed
    to `data` (every call site in this document passes it positionally).

    :param data: object to serialize
    :param json_file_path: destination file path
    """
    with open(json_file_path, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps non-ASCII (e.g. Chinese) text readable.
        json.dump(data, f, ensure_ascii=False)
# 将符合条件的ES数据查出保存为json
def es_export_json(es_connect, es_size, es_scroll, index_list, original_data_path, now_time):
for i in index_list:
logger.info("正在保存{}索引的数据".format(i))
query = {
"range": {
"update_time": {
# 小于等于本次读取开始时间
"lte": now_time
}
}
}
logger.info("全量导出,时间范围为{}之前".format(now_time))
try:
source_list = []
# 滚动查询符合条件的所有es数据
page = es_connect.search(index=i, query=query, size=es_size, scroll=es_scroll)
for hit in page['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
# 游标用于输出es查询出的所有结果
sid = page['_scroll_id']
# es查询出的结果总量
scroll_size = page['hits']['total']['value']
while (scroll_size > 0):
page = es_connect.scroll(scroll_id=sid, scroll=es_scroll)
sid = page['_scroll_id']
scroll_size = len(page['hits']['hits'])
for hit in page['hits']['hits']:
source_data = hit['_source']
source_data['_id'] = hit['_id']
source_list.append(source_data)
json_file_path = original_data_path + "/" + str(i) + ".json"
if len(source_list) != 0:
write_list_to_json(source_list, json_file_path)
logger.info('{}索引的数据已保存至{}路径,导出的数据总量为{}'.format(str(i), json_file_path, str(len(source_list))))
else:
logger.info('{}索引无更新'.format(str(i)))
except Exception as e:
logger.error("ES索引数据导出至JSON文件的过程出错:{}".format(e))
# Export entry point: build the client from config and run the dump
def export_es_data_main(source_export_dict, original_data_path, now_time):
    """Create an ES client from the flattened config dict and export data."""
    client = Elasticsearch(
        hosts=[str(source_export_dict['es_host']) + ":" + str(source_export_dict['es_port'])],
        http_auth=(str(source_export_dict['es_user']), str(source_export_dict['es_password'])),
        request_timeout=int(source_export_dict['es_timeout'])
    )
    # "a, b ,c" -> ["a", "b", "c"]: strip all whitespace, then split on commas
    raw_indexes = source_export_dict['es_index_list']
    index_list = ''.join(raw_indexes.split()).split(",")
    batch_size = int(source_export_dict['es_size'])
    scroll_keep_alive = str(source_export_dict['es_scroll'])
    es_export_json(client, batch_size, scroll_keep_alive, index_list, original_data_path, now_time)
# Read the configuration file
def read_config(config_path):
    """Flatten every section of an INI file into one dict.

    Keys appearing in several sections are overwritten by the later
    section, matching the original accumulation order.
    """
    parser = ConfigParser()
    parser.read(config_path, encoding='utf-8')
    return {key: value
            for section in parser.sections()
            for key, value in parser.items(section)}
if __name__ == '__main__':
    # Resolve the base path and load configuration
    base_path = os.getcwd()
    logger.info("基础路径:{}".format(base_path))
    config_path = base_path + '/config.ini'
    logger.info("配置文件路径:{}".format(config_path))
    source_export_dict = {}
    try:
        source_export_dict = read_config(config_path)
    except Exception:
        # BUG FIX: the bare `except:` also swallowed SystemExit/KeyboardInterrupt,
        # and `sys.exit()` exited with status 0 on failure. Narrow the catch,
        # log the traceback, and exit non-zero so callers can detect the error.
        logger.exception("读取配置文件出错,程序已终止执行")
        sys.exit(1)
    # Export the ES data and time the run
    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    logger.info("----------开始导出源数据----------")
    export_es_data_main(source_export_dict, base_path, start_time_str)
    # Elapsed wall time, rounded to 2 decimal places in milliseconds
    elapsed_ms = Decimal(str((time.time() - start_time) * 1000)).quantize(Decimal('0.00'))
    logger.info("----------导出源数据已完成,共耗时:{}----------".format(str(elapsed_ms) + 'ms'))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import_es_data.py
# -*- coding: utf-8 -*-
import glob
import os
import sys
import time
from decimal import Decimal
from elasticsearch import Elasticsearch, helpers
from configparser import ConfigParser
from log import logger
# Read a JSON dump file and bulk-write it into ES (upsert by document id)
def read_json_batch_import_es(es_connect, json_path, index_name, es_timeout, es_step):
    """Bulk-import the documents of *json_path* into index *index_name*.

    Documents are written in chunks of *es_step* to limit bulk-request
    size. Each document carries the ``_id`` saved by the exporter; it is
    popped out and reused as the ES document id so existing documents are
    updated rather than duplicated.

    BUG FIX: the original parsed the file with ``eval()`` (arbitrary code
    execution on a data file, plus the fragile ``null = None`` hack).
    The files are written by ``json.dump``, so ``json.load`` parses them
    correctly — JSON ``null`` becomes ``None`` natively. The two
    duplicated batching branches are also merged via list slicing.
    """
    import json  # local import: the module header does not import json

    with open(json_path, 'r', encoding='utf-8') as file:
        json_list = json.load(file)
    length = len(json_list)
    # Write in strides of es_step to ease the pressure of a single huge bulk
    for start in range(0, length, es_step):
        actions = []
        for doc in json_list[start:start + es_step]:
            # Pull out the "_id" the exporter added; the remaining fields
            # form the document body
            doc_id = doc.pop('_id')
            actions.append({
                "_index": str(index_name),
                "_id": str(doc_id),
                "_source": doc
            })
        helpers.bulk(es_connect, actions, request_timeout=es_timeout)
    logger.info("{}索引插入了{}条数据".format(str(index_name), str(length)))
# Import entry point: push every JSON dump file into ES
def import_es_data_main(target_import_dict, original_data_path):
    """Import each ``*.json`` file under *original_data_path* into the
    ES index named after the file, deleting the file afterwards."""
    timeout = int(target_import_dict['es_timeout'])
    step = int(target_import_dict['es_step'])
    client = Elasticsearch(
        hosts=[str(target_import_dict['es_host']) + ":" + str(target_import_dict['es_port'])],
        http_auth=(str(target_import_dict['es_user']), str(target_import_dict['es_password'])),
        request_timeout=timeout
    )
    for json_path in glob.glob(original_data_path + '/*.json'):
        # File stem becomes the target index name: /path/users.json -> users
        index_name = os.path.splitext(os.path.basename(json_path))[0]
        read_json_batch_import_es(client, json_path, index_name, timeout, step)
        os.remove(json_path)  # drop the dump once it has been imported
# Read the configuration file
def read_config(config_path):
    """Flatten every section of an INI file into one dict.

    Keys appearing in several sections are overwritten by the later
    section, matching the original accumulation order.
    """
    parser = ConfigParser()
    parser.read(config_path, encoding='utf-8')
    return {key: value
            for section in parser.sections()
            for key, value in parser.items(section)}
if __name__ == '__main__':
    # Resolve the base path and load configuration
    base_path = os.getcwd()
    logger.info("基础路径:{}".format(base_path))
    config_path = base_path + '/config.ini'
    logger.info("配置文件路径:{}".format(config_path))
    target_import_dict = {}
    try:
        target_import_dict = read_config(config_path)
    except Exception:
        # BUG FIX: the bare `except:` also swallowed SystemExit/KeyboardInterrupt,
        # and `sys.exit()` exited with status 0 on failure. Narrow the catch,
        # log the traceback, and exit non-zero so callers can detect the error.
        logger.exception("读取配置文件出错,程序已终止执行")
        sys.exit(1)
    # Import the ES data and time the run
    start_time = time.time()
    logger.info("----------开始导入源数据----------")
    import_es_data_main(target_import_dict, base_path)
    # Elapsed wall time, rounded to 2 decimal places in milliseconds
    elapsed_ms = Decimal(str((time.time() - start_time) * 1000)).quantize(Decimal('0.00'))
    logger.info("----------导入源数据已完成,共耗时:{}----------".format(str(elapsed_ms) + 'ms'))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
注意:json_str中可能会存在null字符串表示空值,但是python里面没有null这个关键字,需要将null定义为变量名,赋值python里面的None。
null = None # 不要删掉这行看似无用的代码,否则导入时遇到空值会报错 name 'null' is not defined
# 2.5.2 ElasticSearch滚动查询
ES默认查询最多只返回10000条结果,想要查询出符合条件的所有数据可以使用滚动查询(scroll)。
# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch
# Example: scroll through every matching document of an index, bypassing
# the default 10,000-result window of a plain search.
es_connect = Elasticsearch(
    hosts="ip:port",
    http_auth=("your_user", "your_password"),
    request_timeout=60
)
source_list = []
# 'scroll' tells Elasticsearch how long to keep the search context alive;
# here the cursor expires after 10 minutes ('10m')
query = {
    "match_all": {}
}
page = es_connect.search(index="your_index", query=query, size=1000, scroll='10m')
for hit in page['hits']['hits']:
    source_data = hit['_source']
    # keep the document id next to its fields
    source_data['_id'] = hit['_id']
    source_list.append(source_data)
# scroll cursor id used to fetch all remaining results
sid = page['_scroll_id']
# total number of matching documents (loop entry condition only)
scroll_size = page['hits']['total']['value']
while (scroll_size > 0):
    page = es_connect.scroll(scroll_id=sid, scroll='10m')
    sid = page['_scroll_id']
    # stop once a page comes back empty
    scroll_size = len(page['hits']['hits'])
    for hit in page['hits']['hits']:
        source_data = hit['_source']
        source_data['_id'] = hit['_id']
        source_list.append(source_data)
print(len(source_list))
print(source_list)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 2.6 MinIO的文件上传
Step1:安装依赖并编写配置文件
$ pip install minio
config.ini
[minio]
minio_url = xxx.xxx.xxx.xxx:9000
access_key = minioadmin
secret_key = minioadmin
2
3
4
注:minio_url不要带上http://前缀,否则会报如下错误
ValueError: path in endpoint is not allowed. Exception ignored in: <function Minio.__del__ at 0x0C0B9A98>
Step2:minio上传文件的代码示例
# -*- coding: utf-8 -*-
import logging
import os
from minio import Minio
from minio.error import S3Error
from configparser import ConfigParser
logging.basicConfig(filename='logging_minio.log', level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Read the configuration file
def read_config(config_path):
    """Flatten every section of an INI file into one dict.

    Keys appearing in several sections are overwritten by the later
    section, matching the original accumulation order.
    """
    parser = ConfigParser()
    parser.read(config_path, encoding='utf-8')
    return {key: value
            for section in parser.sections()
            for key, value in parser.items(section)}
# Initialise the MinIO client
def get_minio_client(config_dict):
    """Build a MinIO client from the endpoint / access key / secret key
    found in the flattened config dict (plain HTTP, secure=False)."""
    return Minio(
        config_dict['minio_url'],
        access_key=config_dict['access_key'],
        secret_key=config_dict['secret_key'],
        secure=False)
# Create a bucket (no-op when it already exists)
def minio_make_bucket_ifnotexist(minio_client, bucket_name):
    """Ensure *bucket_name* exists, creating it when missing.

    Credential-related S3 errors are logged with a hint and re-raised.

    CONSISTENCY FIX: log through the module's named ``logger`` (set up as
    ``logging.getLogger(__name__)`` at the top of this file) instead of
    the root ``logging`` module, as the rest of the file intends.
    """
    # MinIO bucket names may not contain underscores
    bucket_name = bucket_name.replace('_', "-")
    try:
        if not minio_client.bucket_exists(bucket_name):
            logger.info("该存储桶不存在:" + bucket_name)
            minio_client.make_bucket(bucket_name)
            logger.info("存储桶创建:" + bucket_name)
    except S3Error as e:
        # Map the S3 error code to the most likely misconfigured credential
        if "InvalidAccessKeyId" in str(e):
            logger.error("minio 的 access_key 可能有误")
        elif "SignatureDoesNotMatch" in str(e):
            logger.error("minio 的 secret_key 可能有误")
        else:
            logger.error("minio 的 endpoint、access_key、secret_key 可能有误")
        raise e
# Delete a bucket
def remove_bucket(minio_client, bucket_name):
    """Delete *bucket_name*; S3 failures are logged, not raised.

    CONSISTENCY FIX: use the module's named ``logger`` instead of the
    root ``logging`` module, matching the logger set up at file top.
    """
    try:
        minio_client.remove_bucket(bucket_name)
        logger.info("删除存储桶成功:" + bucket_name)
    except S3Error as e:
        logger.error(e)
# Upload a single file
def minio_upload_file(minio_client, bucket_name, object_name, file_path):
    """Upload *file_path* into *bucket_name* under key *object_name*.

    Returns the result object from ``fput_object``.

    CONSISTENCY FIX: use the module's named ``logger`` instead of the
    root ``logging`` module, matching the logger set up at file top.
    """
    logger.info(file_path)
    result = minio_client.fput_object(bucket_name, object_name, file_path)
    return result
# Recursively walk a directory and collect every file path under it
def find_filepaths(directory):
    """Return all file paths beneath *directory* (recursive).

    The parameter was renamed from ``dir`` so it no longer shadows the
    builtin.
    """
    return [
        path
        for root, _dirs, files in os.walk(directory)
        for path in (os.path.join(root, name) for name in files)
        # os.walk can report entries that vanish mid-walk (or broken
        # symlinks); keep only paths that still exist, as the original did
        if os.path.exists(path)
    ]
# Derive the object_name (the key used when uploading to MinIO)
def get_object_name(file_path):
    """Return the bare file name of *file_path* to use as the object key."""
    return os.path.basename(file_path)
if __name__ == '__main__':
    # Read the configuration file.
    # BUG FIX: read_config(config_path) requires the config path; the
    # original called read_config() with no arguments, which raises a
    # TypeError before anything runs.
    config_dict = read_config(os.path.join(os.getcwd(), 'config.ini'))
    # Initialise the MinIO client
    minio_client = get_minio_client(config_dict)
    # Create a bucket (skipped when it already exists)
    minio_make_bucket_ifnotexist(minio_client, 'test')
    # Delete the bucket
    remove_bucket(minio_client, 'test')
    # Upload files: recreate the bucket, then push every file under ./img
    minio_make_bucket_ifnotexist(minio_client, 'test')
    img_list = find_filepaths('./img')
    for img_path in img_list:
        object_name = get_object_name(img_path)
        minio_upload_file(minio_client, 'test', object_name, img_path)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# 2.7 使用Kafka实现生产消费过程
[1] kafka-python库简介
项目简介:Apache Kafka 分布式流处理系统的 Python 客户端。kafka-python 的设计功能与官方 java 客户端非常相似,带有一些 pythonic 接口。
项目地址:https://github.com/dpkp/kafka-python (opens new window)
官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/modules.html (opens new window)
$ pip install kafka-python
[2] 生产者与消费者示例
生产者示例程序:
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
class KProducer:
    """Thin wrapper around KafkaProducer offering synchronous,
    asynchronous, and callback-driven send helpers."""

    def __init__(self, bootstrap_servers, topic):
        """
        Kafka producer.
        :param bootstrap_servers: broker address(es)
        :param topic: topic to publish to
        """
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda m: json.dumps(m).encode('ascii'), )  # JSON-encode every payload
        self.topic = topic

    def sync_producer(self, data_li: list):
        """
        Send records synchronously, blocking on each broker acknowledgement.
        :param data_li: payloads to send
        :return:
        """
        for data in data_li:
            future = self.producer.send(self.topic, data)
            record_metadata = future.get(timeout=10)  # block until confirmed
            partition = record_metadata.partition  # partition the record landed in
            offset = record_metadata.offset  # offset within that partition
            print('save success, partition: {}, offset: {}'.format(partition, offset))

    def asyn_producer(self, data_li: list):
        """
        Send records asynchronously.
        :param data_li: payloads to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data)
        self.producer.flush()  # flush the whole batch

    def asyn_producer_callback(self, data_li: list):
        """
        Send records asynchronously with per-record success/error callbacks.
        :param data_li: payloads to send
        :return:
        """
        for data in data_li:
            self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error)
        self.producer.flush()  # flush the whole batch

    def send_success(self, *args, **kwargs):
        """Callback invoked when an async send succeeds."""
        print('save success')
        return

    def send_error(self, *args, **kwargs):
        """Callback invoked when an async send fails."""
        print('save error')
        return

    def close_producer(self):
        """Close the underlying producer, ignoring close-time errors.

        BUG FIX: the original used a bare ``except: pass``, which also
        swallowed SystemExit/KeyboardInterrupt; catch only Exception.
        """
        try:
            self.producer.close()
        except Exception:
            pass
if __name__ == '__main__':
    payloads = [{"test": 1}, {"test": 2}]
    kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9092')
    # Synchronous send:
    # kp.sync_producer(payloads)
    # Asynchronous send:
    # kp.asyn_producer(payloads)
    # Asynchronous send with callbacks:
    kp.asyn_producer_callback(payloads)
    kp.close_producer()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
消费者示例程序:
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
if __name__ == '__main__':
    # Build a consumer bound to topic + bootstrap servers; this form only
    # receives messages produced after it connects.
    bootstrap_server_list = [
        '127.0.0.1:9092'
    ]
    consumer = KafkaConsumer(
        bootstrap_servers=','.join(bootstrap_server_list),  # kafka cluster address
        enable_auto_commit=True,  # periodically commit consumed offsets while iterating
        auto_commit_interval_ms=5000,  # auto-commit interval in milliseconds
    )
    # Subscribe to the message topic(s); several may be listed
    consumer.subscribe(["topic"])
    # The consumer is an iterator that blocks waiting for the next message
    for msg in consumer:
        print(msg)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[3] 请求SASL账号密码验证的kafka
# -*- coding: utf-8 -*-
import time
import json
from datetime import datetime
from kafka import KafkaProducer
def producer_event(server_info):
    """Connect to a SASL/PLAIN-secured Kafka broker and push one test
    message per second for two hours (7200 iterations).

    :param server_info: broker address, e.g. "IP:9092"
    """
    producer = KafkaProducer(bootstrap_servers=server_info,
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username='admin',
                             sasl_plain_password='your_password')
    topic = "test_kafka_topic"
    print("kafka连接成功")
    # PERF: the payload never changes, so serialize and encode it once
    # outside the loop instead of on every one of the 7200 iterations.
    data_json = json.dumps({"name": "hello world"})
    payload = data_json.encode()
    for _ in range(7200):
        # .get() blocks until the broker acknowledges the record
        producer.send(topic, payload).get(timeout=30)
        print("数据推送成功,当前时间为:{},数据为:{}".format(datetime.now(), data_json))
        time.sleep(1)
    producer.close()

server = "IP:9092"
producer_event(server)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 3. 参考资料
[1] python 将爬取的数据存入mysql from 代码先锋网 (opens new window)
[2] python操作mysql数据库(增,删,改,查)from CSDN (opens new window)
[3] Python SQLALchemy框架 from 摘星小筑 (opens new window)
[4] python 使用elasticsearch 实现翻页的三种方式 from 脚本之家 (opens new window)
[5] python3提取mysql数据并转化成字典数组 from 博客园 (opens new window)
[6] Python对MySQL的DateTime处理 from CSDN (opens new window)
[7] 使用python批量插入数据到mysql的三种方法 from CSDN (opens new window)
[8] 工作笔记——python如何向redis中存储hash from CSDN (opens new window)
[9] python操作redis,一次插入字典多个值 from CSDN (opens new window)
[10] redis 与Python交互取出来的是bytes类型 from CSDN (opens new window)
[11] redis 实战系列二:用python操作redis集群 from CSDN (opens new window)
[12] Python操作Kafka的通俗总结(kafka-python)from 知乎 (opens new window)
[13] Python 操作 Kafka --- kafka-python from CSDN (opens new window)
[14] Python连接Redis数据库插入数据出现错误:DataError: Invalid input of type: 'NoneType' from CSDN (opens new window)
[15] python-SQLite3的简单使用(创建数据库、数据表、增删改查)from CSDN (opens new window)
[16] Python+sqlite3 封装通用增删改查 from pythontechworld (opens new window)
[17] docker镜像alpine中安装oracle客户端 from 简书 (opens new window)
[18] Python连接MINIO Api, 实现上传下载等功能 from 一只技术小白 (opens new window)
[19] InvalideEndpointException .net core 3 from Github (opens new window)
[20] MinIO 基于 python 把目录下的文件上传到 minio from 夏来风 (opens new window)