# 1. 前言
# 1.1 Redis简介
Redis (opens new window) 是完全开源免费的,遵守 BSD 协议,是一个灵活的高性能 key-value 数据结构存储,可以用来作为数据库、缓存和消息队列。
Redis 比其他 key-value 缓存产品有以下三个特点:
- Redis 支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载到内存使用。
- Redis 不仅支持简单的 key-value 类型的数据,同时还提供 list,set,zset,hash 等数据结构的存储。
- Redis 支持主从复制,即 master-slave 模式的数据备份。
Redis项目地址:https://github.com/redis/redis (opens new window)
Redis官方文档:http://redis.io/documentation (opens new window)
# 1.2 Redis的数据类型
Redis 支持 5 种数据类型:string(字符串),hash(哈希),list(列表),set(集合),zset(有序集合)。
- string 是 redis 最基本的数据类型,一个 key 对应一个 value。string 是二进制安全的,也就是说 redis 的 string 可以包含任何数据,比如 jpg 图片或者序列化的对象,string 类型的值最大能存储 512 MB。
- hash 是一个键值对(key - value)集合。hash 特别适合用于存储对象,并且可以像数据库一样只对某一项属性值进行存储、读取、修改等操作。
- list 是简单的字符串列表,按照插入顺序排序。我们可以往列表的左边或者右边添加元素。list 就是一个简单的字符串集合,和 Java 中的 list 相差不大,区别就是这里的 list 存放的是字符串。list 内的元素是可重复的,可以做消息队列或最新消息排行等功能。
- set 是字符串类型的无序集合。集合是通过哈希表实现的,因此添加、删除、查找的复杂度都是 O(1)。redis 的 set 是一个 key 对应着多个字符串类型的 value,也是一个字符串类型的集合,和 redis 的 list 不同的是 set 中的字符串集合元素不能重复,但是 list 可以,也就是具有唯一性。
- zset 和 set 一样都是字符串类型元素的集合,并且集合内的元素不能重复。不同的是 zset 每个元素都会关联一个 double 类型的分数。redis 通过分数来为集合中的成员进行从小到大的排序。zset 的元素是唯一的,但是分数(score)却可以重复,可用作排行榜等场景。
# 1.3 Redis单线程及并发安全
# 1.3.1 Redis单线程还很快的原因
Redis采用了单线程的模型,保证了每个操作的原子性,也减少了线程的上下文切换和竞争。 另外,数据结构也帮了不少忙,Redis全程使用hash结构,读取速度快,还有一些特殊的数据结构,对数据存储进行了优化,如压缩表,对短数据进行压缩存储,再如,跳表,使用有序的数据结构加快读取的速度。
# 1.3.2 Redis单线程能够支持高并发的原因
Redis是基于内存的,内存的读写速度非常快;
Redis是单线程的,省去了很多上下文切换线程的时间;
Redis使用多路复用技术,可以处理并发的连接。
# 1.3.3 Redis的并发线程安全
Redis 是并发安全的,它采用单线程模型,通过事件驱动机制来处理并发请求。在 Redis 中,所有的请求都会被放入一个队列中,然后由单个线程来处理这些请求,因此 Redis 可以保证所有的操作都是原子性的,不会出现并发冲突的问题。
此外,Redis 还提供了多种并发控制机制,例如乐观锁和悲观锁等,可以保证在并发情况下数据的一致性和安全性。例如,在使用 Redis 时,可以使用 SETNX 命令来实现分布式锁,保证同一时间只有一个客户端可以对某个数据进行修改。
需要注意的是,虽然 Redis 是并发安全的,但是在实际使用中,还是需要考虑并发情况下的数据一致性和安全性问题,例如使用乐观锁或悲观锁来控制并发访问,或者使用 Redis 事务来保证多个操作的原子性。
# 1.4 Redis持久化机制及内存淘汰策略
# 1.4.1 Redis持久化机制
Redis的持久化的机制有AOF和RDB两种,默认是RDB,将redis.conf中的appendonly配置改为yes即可改成AOF,若为no则为RDB。
[1] AOF(增量):基于数据日志操作实现的持久化,AOF的三种同步方式如下:
- appendfsync always 每次有数据修改发生时都会写入AOF文件,能够数据不丢失,但是效率非常低。
- appendfsync everysec 每秒钟同步一次,该策略为AOF的缺省(默认)策略(缺点:1秒内数据可能丢失)。
- appendfsync no 从不同步,高效但是数据不会被持久化。
建议最好使用everysec既能够保证数据的同步,效率还可以。
[2] RDB(默认,全量):采用定时持久化机制,但是服务器因为某种原因宕机可能会数据丢失。
# 1.4.2 Redis内存淘汰策略
在Redis4.0以下的策略配置中可以看到以下六种淘汰,Redis默认配置的淘汰策略是 noeviction。
- noeviction: 如果内存使用达到了maxmemory,client还要继续写入数据,那么就直接报错给客户端。
- allkeys-lru: 就是我们常说的LRU算法,移除掉最近最少使用的那些keys对应的数据。
- volatile-lru: 也是采取LRU算法,但是仅仅针对那些设置了指定存活时间(TTL)的key才会清理掉。
- allkeys-random: 随机选择一些key来删除掉。
- volatile-random: 随机选择一些设置了TTL的key来删除掉,但仅限于在过期集合的键。
- volatile-ttl:回收在过期集合的键,并且优先回收存活时间(TTL)较短的键,使得新添加的数据有空间存放。
Redis4.0以后新增LFU两种淘汰策略:
- volatile-lfu:在设置了过期时间的key中使用LFU算法淘汰key。
- allkeys-lfu:在所有的key中使用LFU算法淘汰数据。
# 1.5 Redis适用场景
Redis有以下常见的适用场景:
- 缓存:缓存现在几乎是所有中大型网站都在用的必杀技,合理的利用缓存不仅能够提升网站访问速度,还能大大降低数据库的压力。Redis提供了键过期功能,也提供了灵活的键淘汰策略,所以,现在Redis用在缓存的场合非常多。
- 排行榜:很多网站都有排行榜应用的,如京东的月度销量榜单、商品按时间的上新排行榜等。Redis提供的有序集合数据类构能实现各种复杂的排行榜应用。
- 计数器:为了保证数据实时效,每次浏览都得给+1,并发量高时如果每次都请求数据库操作无疑是种挑战和压力。Redis提供的incr命令来实现计数器功能,内存操作,性能非常好,非常适用于这些计数场景。
- 分布式会话:集群模式下,在应用不多的情况下一般使用容器自带的session复制功能就能满足,当应用增多相对复杂的系统中,一般都会搭建以Redis等内存数据库为中心的session服务,session不再由容器管理,而是由session服务及内存数据库管理。
- 分布式锁:在很多互联网公司中都使用了分布式技术,分布式技术带来的技术挑战是对同一个资源的并发访问,如全局ID、减库存、秒杀等场景,并发量不大的场景可以使用数据库的悲观锁、乐观锁来实现,但在并发量高的场合中,利用数据库锁来控制资源的并发访问是不太理想的,大大影响了数据库的性能。可以利用Redis的setnx功能来编写分布式的锁,如果设置返回1说明获取锁成功,否则获取锁失败,实际应用中要考虑的细节要更多。
- 社交网络:点赞、踩、关注/被关注、共同好友等是社交网站的基本功能,社交网站的访问量通常来说比较大,而且传统的关系数据库类型不适合存储这种类型的数据,Redis提供的哈希、集合等数据结构能很方便的的实现这些功能。
- 最新列表:Redis列表结构,LPUSH可以在列表头部插入一个内容ID作为关键字,LTRIM可用来限制列表的数量,这样列表永远为N个ID,无需查询最新的列表,直接根据ID去到对应的内容页即可。
- 消息系统:消息队列是大型网站必用的中间件,如ActiveMQ、RabbitMQ、Kafka等流行的消息队列中间件,主要用于业务解耦、流量削峰及异步处理实时性低的业务。Redis提供了发布/订阅及阻塞队列功能,能实现一个简单的消息队列系统,但是这个不能和专业的消息中间件相比。
# 2. 数据不一致问题及缓存更新方案
# 2.1 数据不一致问题
- 除了少部分配置信息类缓存,大部分缓存应用一般是作为前端请求和持久化存储的中间层,承担前端的海量请求。缓存层和数据库存储层是独立的系统,我们在数据更新的时候,最理想的情况当然是缓存和数据库同时更新成功。但是由于缓存和数据库是分开的,无法做到原子性的同时进行数据修改,可能出现缓存更新失败,或者数据库更新失败的情况,这时候会出现数据不一致,影响前端业务。
- 以电商中的商品服务为例,针对 C 端用户的大部分请求都是通过缓存来承载的,假设某次更新操作将商品详情 A 的价格从 1000 元更新为 1200 元,数据库更新成功,但是缓存更新失败。这时候就会出现 C 端用户在查看商品详情时,看到的还是 1000 元,实际下单时可能是别的价格,最终会影响用户的购买决策,影响平台的购物体验。可以看到,在使用缓存时,如果不能很好地控制缓存和数据库的一致性,可能会出现非常多的业务问题。
# 2.2 更新缓存的方式
缓存更新方案是通过对更新缓存和更新数据库这两个操作的设计,来实现数据的最终一致性,避免出现业务问题。前端请求的读操作先从缓存中查询数据,如果没有命中数据,则查询数据库,从数据库查询成功后,返回结果,同时更新缓存,方便下次操作。在数据不发生变更的情况下,这种方式没有问题,如果数据发生了更新操作,就必须要考虑如何操作缓存,保证一致性。
[1] 先更新数据库,再更新缓存
- 先来看第一种方式,在写操作中,先更新数据库,更新成功后,再更新缓存。这种方式最容易想到,但是问题也很明显,数据库更新成功以后,更新缓存可能会失败,就会出现上面例子中的问题,数据库是新的,但缓存中数据是旧的,出现不一致的情况。
[2] 先删缓存,再更新数据库
- 这种方案是在数据更新时,首先删除缓存,再更新数据库,这样可以在一定程度上避免数据不一致的情况。现在考虑一个并发场景,假如某次的更新操作,更新了商品详情 A 的价格,线程 A 进行更新时删除了缓存数据,但还没更新数据库,线程 B 此时发起一次查询,发现缓存为空,于是查询数据库并更新缓存,然后线程 A 更新数据库为新的价格。在这种并发操作下,缓存的数据仍然是旧的,出现业务不一致。
[3] 先更新数据库,再删缓存(最佳)
- 这个是经典的缓存 + 数据库读写的模式,有些资料称它为 Cache Aside 方案。具体操作是这样的:读的时候,先读缓存,缓存没有的话,那么就读数据库,然后取出数据后放入缓存,同时返回响应,更新的时候,先更新数据库,数据库更新成功之后再删除缓存。
注意:为什么删除而不是更新缓存?
- 系统设计中有一个思想叫 Lazy Loading,适用于那些加载代价大的操作,删除缓存而不是更新缓存,就是懒加载思想的一个应用。
- 在 Cache Aside 方案中,也存在删除缓存失败的可能,因为缓存删除操作比较轻量级,可以通过多次重试等来解决。
# 3. 搭建Redis服务
以下我将采用Docker的方式进行搭建,VPS系统用的是Debian 11 x86_64。VPS的购买及配置、Docker的概念及使用...这些基本的就不再赘述了,如果不会的话见我的其他博客:VPS基本部署环境的搭建与配置 (opens new window)、Docker容器化及项目环境管理 (opens new window)。
# 3.1 准备Docker环境
$ apt-get update -y && apt-get install curl -y # 安装curl
$ curl https://get.docker.com | sh - # 安装docker
$ sudo systemctl start docker # 启动docker服务
$ docker version # 查看docker版本(客户端要与服务端一致)
2
3
4
# 3.2 搭建Redis服务
方案一:不使用配置文件启动
$ docker pull redis:3.2.8
$ docker run --name redis -p 6379:6379 -d redis:3.2.8 --requirepass "mypassword" --appendonly yes
$ docker update redis --restart=always
2
3
注:--requirepass用来设置密码,--appendonly yes用来设置AOF持久化。
方案二:使用redis.conf配置文件启动
redis容器里没有redis.conf文件,可以从 https://redis.io/docs/management/config/ (opens new window) 地址下载对应版本的配置文件,挂载进去。
$ docker pull redis:3.2.8
$ cd /root/redis
$ wget https://raw.githubusercontent.com/redis/redis/3.2/redis.conf
$ chmod 777 redis.conf
$ vim /root/redis/redis.conf
修改以下配置项
# bind 127.0.0.1 # 这行要注释掉,解除本地连接限制
protected-mode no # 默认yes,如果设置为yes,则只允许在本机的回环连接,其他机器无法连接。
daemonize no # 默认no 为不守护进程模式,docker部署不需要改为yes,docker run -d本身就是后台启动,不然会冲突
requirepass mypassword # 设置密码
appendonly yes # 持久化
$ docker run --name redis \
-p 6379:6379 \
-v /root/redis/redis.conf:/etc/redis/redis.conf \
-v /root/redis/data:/data \
-d redis:3.2.8 redis-server
$ docker update redis --restart=always
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 3.3 Redis数据库的可视化连接
建议使用 AnotherRedisDesktopManager (opens new window) 开源工具进行可视化连接和管理。
# 4. 在Springboot整合Redis的封装
# 4.1 依赖及配置文件
pom.xml的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
2
3
4
5
6
7
8
application.properties配置文件:
## redis config
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0
spring.redis.password=redisPassword
spring.redis.lettuce.pool.max-active=-1
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-idle=200
spring.redis.lettuce.pool.min-idle=20
2
3
4
5
6
7
8
9
# 4.2 整合Redis的代码封装
代码封装的目录结构如下:
├── annotation
│ ├── RedisEntity.java
│ └── RedisKey.java
├── config
│ └── RedisConfig.java
└── service
├── RedisService.java
└── SampleRedisService.java
2
3
4
5
6
7
8
RedisEntity.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisEntity {
//用于生成key的key模板,例:TBoxData_%s_%s,每个占位符(%s)用带有@RedisKey注解的字段的值填充
String keyTemplate() default "";
long expire() default 0;//过期时间,单位:秒
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RedisKey.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisKey {
int order() default 9999;//key序号,用于区分字段值在key中的位置,越小越靠前
}
2
3
4
5
6
7
8
9
10
RedisConfig.java
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 配置连接工厂
redisTemplate.setConnectionFactory(connectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
//指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
//hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
//value序列化方式采用jackson
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//hash的value序列化方式采用jackson
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
RedisService.java
import com.yoyo.admin.redis_common.annotation.RedisEntity;
import com.yoyo.admin.redis_common.annotation.RedisKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Service
public class RedisService {
private RedisTemplate<String, Object> redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
//获取实体类型Key模板
public String getEntityTemplate(Class<?> typeClass) {
RedisEntity redisEntity = typeClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
return null;
}
if (!redisEntity.keyTemplate().isEmpty()) {
return redisEntity.keyTemplate();
} else {
return typeClass.getName();
}
}
//从实体定义中获取key
private <T> String getEntityKey(T t) throws IllegalAccessException {
Class<?> typeClass = t.getClass();
RedisEntity redisEntity = typeClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
return null;
}
if (!redisEntity.keyTemplate().isEmpty()) {
String template = redisEntity.keyTemplate();
LinkedList<Integer> orders = new LinkedList<>();
LinkedList<String> keyParameters = new LinkedList<>();
Field[] fields = typeClass.getDeclaredFields();
if (fields.length > 0) {
for (Field field : fields) {
field.setAccessible(true);
RedisKey redisKey = field.getAnnotation(RedisKey.class);
if (redisKey != null) {
int order = redisKey.order();
boolean inserted = false;
for (int i = 0; i <= orders.size() - 1; i++) {
if (order < orders.get(i)) {
orders.add(i, order);
keyParameters.add(i, field.get(t) != null ? field.get(t).toString() : "");
inserted = true;
break;
}
}
if (!inserted) {
orders.addLast(order);
keyParameters.addLast(field.get(t) != null ? field.get(t).toString() : "");
}
}
}
}
if (keyParameters.size() > 0) {
return String.format(template, keyParameters.toArray());
} else {
return template;
}
} else {
return typeClass.getName();
}
}
//从实体定义中获取过期时间
private <T> Long getEntityExpire(T t) {
Class<?> typeClass = t.getClass();
RedisEntity redisEntity = typeClass.getAnnotation(RedisEntity.class);
if (redisEntity == null || redisEntity.expire() <= 0) {
return null;
}
return redisEntity.expire();
}
//向redis写入值,使用实体定义中的key
public String set(Object t) throws IllegalAccessException, RuntimeException {
String key = getEntityKey(t);
if (key == null) {
throw new RuntimeException("Redis Set : Type has no @RedisEntity annotation");
}
Long expire = getEntityExpire(t);
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
operations.set(key, t);
if (expire != null) {
expire(key, expire);
}
return key;
}
//向redis写入值,使用实体定义中的key
public String set(String key,Object t) throws IllegalAccessException, RuntimeException {
if (key == null) {
throw new RuntimeException("Redis Set : Type has no @RedisEntity annotation");
}
Long expire = getEntityExpire(t);
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
operations.set(key, t);
if (expire != null) {
expire(key, expire);
}
return key;
}
//从redis中按照实体定义使用指定参数生成key以得到value
public <T> T get(Class<T> tClass, Object... keyParams) throws RuntimeException {
RedisEntity redisEntity = tClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
throw new RuntimeException("Redis Get : Type has no @RedisEntity annotation");
}
String template = redisEntity.keyTemplate();
if (keyParams != null && keyParams.length > 0) {
String key = String.format(template, keyParams);
return getForType(tClass, key);
} else {
return getForType(tClass, template);
}
}
//从redis中按照实体定义使用指定参数生成key以得到value
public <T> T get(Class<T> tClass, String key) throws RuntimeException {
return getForType(tClass, key);
}
//从redis中使用指定key以得到特定类型的value
public <T> T getForType(Class<T> type, String key) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
Object object = operations.get(key);
if (object != null && type.equals(object.getClass())) {
return type.cast(object);
}
return null;
}
//从redis中使用指定key集合以得到特定类型的value集合
public <T> List<T> listForType(Class<T> type, List<String> keys) {
List<T> list = new ArrayList<>();
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
List<Object> objects = operations.multiGet(keys);
if (objects == null || objects.size() == 0) {
return list;
}
for (int i = 0; i <= objects.size() - 1; i++) {
Object object = objects.get(i);
if (object != null && type.equals(object.getClass())) {
list.add(type.cast(object));
}
}
return list;
}
//使用实体设置的模板获取实体所有的key
public <T> Set<String> keys(Class<T> tClass, Object... keyParams) throws RuntimeException {
RedisEntity redisEntity = tClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
throw new RuntimeException("Redis Keys(Class<T>,Object...) : Type has no @RedisEntity annotation");
}
String template = redisEntity.keyTemplate();
String pattern = String.format(template, keyParams);
return keys(pattern);
}
//使用实体设置的模板获取实体所有的key
public <T> Set<String> keys(Class<T> tClass) throws RuntimeException {
RedisEntity redisEntity = tClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
throw new RuntimeException("Redis Keys(Class<T>) : Type has no @RedisEntity annotation");
}
String template = redisEntity.keyTemplate();
String pattern = template.substring(0, template.indexOf("%"));
return keys(pattern + "*");
}
//获取所有匹配的key
public Set<String> keys(String pattern) {
return redisTemplate.keys(pattern);
}
//使用实体设备的模板删除实体
public <T> void delete(Class<T> tClass, Object... keyParams) throws RuntimeException {
RedisEntity redisEntity = tClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
throw new RuntimeException("Redis Delete : Type has no @RedisEntity annotation");
}
String template = redisEntity.keyTemplate();
String key = String.format(template, keyParams);
delete(key);
}
//删除key
public void delete(String key) {
redisTemplate.delete(key);
}
//批量删除key
public void delete(Collection<String> keys) {
redisTemplate.delete(keys);
}
//为指定Key设置过期时间,单位秒
public void expire(String key, long expire) {
redisTemplate.expire(key, expire, TimeUnit.SECONDS);
}
//判断key是否存在
public <T> Boolean hasKey(Class<T> tClass, Object... keyParams) {
RedisEntity redisEntity = tClass.getAnnotation(RedisEntity.class);
if (redisEntity == null) {
return false;
}
String template = redisEntity.keyTemplate();
return hasKey(String.format(template, keyParams));
}
//判断key是否存在
public Boolean hasKey(String key) {
return redisTemplate.hasKey(key);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
SampleRedisService.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
@Service
public class SampleRedisService {
private RedisTemplate<String, Object> redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void setString(String key, String str) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
operations.set(key, str);
}
public String getString(String key) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
Object object = operations.get(key);
if (object instanceof String) {
return (String) object;
}
return null;
}
public void setMap(String key, Map<?, ?> map) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
operations.set(key, map);
}
public Map<?, ?> getMap(String key) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
Object object = operations.get(key);
if (object instanceof Map) {
return (Map<?, ?>) object;
}
return null;
}
public void setList(String key, List<?> list) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
operations.set(key, list);
}
public List<?> getList(String key) {
ValueOperations<String, Object> operations = redisTemplate.opsForValue();
Object object = operations.get(key);
if (object instanceof List) {
return (List<?>) object;
}
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 5. Python操作Redis缓存数据
# 5.1 普通方式存取数据
Step1:引入redis库
$ pip install redis
Step2:使用Redis
往redis存值
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0)
r = redis.Redis(connection_pool=pool)
r.set('id', '666666')
2
3
4
5
从redis取值
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0)
r = redis.Redis(connection_pool=pool)
get_value = r.get('id')
2
3
4
5
注意事项:如果存入字典,最新版的会出现如下报错
redis.exceptions.DataError: Invalid input of type: 'dict'. Convert to a bytes, string, int or float first.
可以降级版本解决该问题
$ pip install redis==2.10.6
# 5.2 hash方式存取数据
单个方式的存取值:hset设置单个值、hgetall获取整个hash、hget根据key获取单独的value
# -*- coding: utf-8 -*-
import redis
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, password="123456", db=1, decode_responses=True)
r = redis.Redis(connection_pool=pool)
test_list = [1, 2, 3, "123", [1, 2, 3, "123"]]
dict_list = {"test_list": [1, 2, 3, "123", [1, 2, 3, "123"]], "test_list2": [1, 2, 3]}
for dl in dict_list:
r.hset("dl_hash", dl, dict_list[dl])
# 设置过期时间,单位秒
r.expire("dl_hash", 6000)
# 获取整个hash
print(r.hgetall("dl_hash"))
# 根据key获取单独的value
print(r.hget("dl_hash", "test_list2"))
>>> 结果输出
{b'test_list': b"[1, 2, 3, '123', [1, 2, 3, '123']]", b'test_list2': b'[1, 2, 3]'}
b'[1, 2, 3]'
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
注:设置 decode_responses=True 是为了解决Redis与Python交互取出来的是bytes类型的问题。
批量方式的存取值:hmset 批量设置多个值,hmget 批量获取多个值
r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
r.hmget('xx', 'k1', 'k2')
2
注意:redis库不能操作 redis 集群,实际使用中会遇到如下错误,可以使用 redis-py-cluster 库去实现 redis 集群的操作。
redis.exceptions.ResponseError: MOVED 12285 192.168.222.66:6384
# 5.3 遍历所有key及其内容
遍历所有key
# -*- encoding: UTF-8 -*-
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True)
r = redis.StrictRedis(connection_pool=pool)
keys = r.keys()
print(keys)
2
3
4
5
6
7
8
9
批量遍历所有key的内容
# -*- coding: utf-8 -*-
import redis
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, password='123456', db=0, decode_responses=True)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline()
pipe_size = 100000
len = 0
key_list = []
result_list = []
keys = r.keys()
for key in keys:
result_item = {}
key_list.append(key)
pipe.get(key)
if len < pipe_size:
len += 1
else:
for (k, v) in zip(key_list, pipe.execute()):
result_item[k] = v
result_list.append(result_item)
len = 0
key_list = []
for (k, v) in zip(key_list, pipe.execute()):
result_item = {}
result_item[k] = v
result_list.append(result_item)
print(result_list)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 6. CPU Cache原理
代码都是由 CPU 跑起来的,我们代码写的好与坏就决定了 CPU 的执行效率,特别是在编写计算密集型的程序,更要注重 CPU 的执行效率,否则将会大大影响系统性能。CPU 内部嵌入了 CPU Cache(高速缓存),它的存储容量很小,但是离 CPU 核心很近,所以缓存的读写速度是极快的,那么如果 CPU 运算时,直接从 CPU Cache 读取数据,而不是从内存的话,运算速度就会很快。
# 6.1 CPU Cache 有多快
根据摩尔定律,CPU 的访问速度每 18 个月就会翻倍,相当于每年增长 60% 左右,内存的速度当然也会不断增长,但是增长的速度远小于 CPU,平均每年只增长 7% 左右。于是,CPU 与内存的访问性能的差距不断拉大。到现在,一次内存访问所需时间是 200~300 多个时钟周期,这意味着 CPU 和内存的访问速度已经相差 200~300 多倍了。
为了弥补 CPU 与内存两者之间的性能差异,就在 CPU 内部引入了 CPU Cache,也称高速缓存。CPU Cache 通常分为大小不等的三级缓存,分别是 L1 Cache、L2 Cache 和 L3 Cache。由于 CPU Cache 所使用的材料是 SRAM,价格比内存使用的 DRAM 高出很多,在当今每生产 1 MB 大小的 CPU Cache 需要 7 美金的成本,而内存只需要 0.015 美金的成本,成本方面相差了 466 倍,所以 CPU Cache 不像内存那样动辄以 GB 计算,它的大小是以 KB 或 MB 来计算的。
在 Linux 系统中,我们可以使用如下命令来查看各级 CPU Cache 的大小,比如我这手上这台服务器,离 CPU 核心最近的 L1 Cache 是 32KB,其次是 L2 Cache 是 1MB,最大的 L3 Cache 则是 38.5MB。
// 查看 L1 Cache 数据缓存的大小
$ cat /sys/devices/system/cpu/cpu0/cache/index0/size
32K
// 查看 L1 Cache 指令缓存的大小
$ cat /sys/devices/system/cpu/cpu0/cache/index1/size
32K
// 查看 L2 Cache 的大小
$ cat /sys/devices/system/cpu/cpu0/cache/index2/size
1024K
// 查看 L3 Cache 的大小
$ cat /sys/devices/system/cpu/cpu0/cache/index3/size
39424K
2
3
4
5
6
7
8
9
10
11
12
13
14
15
其中,L1 Cache 通常会分为「数据缓存」和「指令缓存」,这意味着数据和指令在 L1 Cache 这一层是分开缓存的,上面的 index0
也就是数据缓存,而 index1
则是指令缓存,它两的大小通常是一样的。另外,L3 Cache 比 L1 Cache 和 L2 Cache 大很多,这是因为 L1 Cache 和 L2 Cache 都是每个 CPU 核心独有的,而 L3 Cache 是多个 CPU 核心共享的。
程序执行时,会先将内存中的数据加载到共享的 L3 Cache 中,再加载到每个核心独有的 L2 Cache,最后进入到最快的 L1 Cache,之后才会被 CPU 读取。它们之间的层级关系,如下图:
越靠近 CPU 核心的缓存其访问速度越快,CPU 访问 L1 Cache 只需要2-4个时钟周期,访问 L2 Cache 大约10-20个时钟周期,访问 L3 Cache 大约20-60个时钟周期,而访问内存速度大概在200-300个 时钟周期之间,如下图所示:
# 6.2 CPU Cache 的数据结构和读取过程
CPU Cache 是由多个 Cache Line 组成的,Cache Line 是 CPU 从内存读取数据的基本单位,而 Cache Line 是由各种标志(Tag)+ 数据块(Data Block)组成。
CPU Cache 的数据是从内存中读取过来的,它是以一小块一小块读取数据的,而不是按照单个数组元素来读取数据的,在 CPU Cache 中的,这样一小块一小块的数据,称为 Cache Line(缓存块)。
在 Linux 系统,可以用下面这种方式来查看 CPU 的 Cache Line,我的这台服务器的 L1 Cache Line 大小是 64 字节,也就意味着 L1 Cache 一次载入数据的大小是 64 字节。
$ cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
64
2
比如,有一个 int array[100] 的数组,当载入 array[0] 时,由于这个数组元素的大小只占 4 字节,不足 64 字节,CPU 就会顺序加载数组元素到 array[15],意味着 array[0]~array[15] 数组元素都会被缓存在 CPU Cache 中了,因此当下次访问这些数组元素时,会直接从 CPU Cache 读取,而不用再从内存中读取,大大提高了 CPU 读取数据的性能。
事实上,CPU 读取数据的时候,无论数据是否存放到 Cache 中,CPU 都是先访问 Cache,只有当 Cache 中找不到数据时,才会去访问内存,并把内存中的数据读入到 Cache 中,CPU 再从 CPU Cache 读取数据。
这样的访问机制,跟我们使用「内存作为硬盘的缓存」的逻辑是一样的,如果内存有缓存的数据,则直接返回,否则要访问龟速一般的硬盘。
那 CPU 怎么知道要访问的内存数据,是否在 Cache 里?如果在的话,如何找到 Cache 对应的数据呢?我们从最简单、基础的直接映射 Cache(Direct Mapped Cache) 说起,来看看整个 CPU Cache 的数据结构和访问逻辑。
前面,我们提到 CPU 访问内存数据时,是一小块一小块数据读取的,具体这一小块数据的大小,取决于 coherency_line_size
的值,一般 64 字节。在内存中,这一块的数据我们称为内存块(Block),读取的时候我们要拿到数据所在内存块的地址。
对于直接映射 Cache 采用的策略,就是把内存块的地址始终「映射」在一个 CPU Cache Line(缓存块) 的地址,至于映射关系实现方式,则是使用「取模运算」,取模运算的结果就是内存块地址对应的 CPU Cache Line(缓存块) 的地址。
举个例子,内存共被划分为 32 个内存块,CPU Cache 共有 8 个 CPU Cache Line,假设 CPU 想要访问第 15 号内存块,如果 15 号内存块中的数据已经缓存在 CPU Cache Line 中的话,则是一定映射在 7 号 CPU Cache Line 中,因为 15 % 8
的值是 7。使用取模方式映射的话,就会出现多个内存块对应同一个 CPU Cache Line,除了 15 号内存块是映射在 7 号 CPU Cache Line 中,还有 7 号、23 号、31 号内存块都是映射到 7 号 CPU Cache Line 中。
因此,为了区别不同的内存块,在对应的 CPU Cache Line 中我们还会存储一个组标记(Tag)。这个组标记会记录当前 CPU Cache Line 中存储的数据对应的内存块,我们可以用这个组标记来区分不同的内存块。
除了组标记信息外,CPU Cache Line 还有两个信息:
- 一个是,从内存加载过来的实际存放数据(Data)。
- 另一个是,有效位(Valid bit),它是用来标记对应的 CPU Cache Line 中的数据是否是有效的,如果有效位是 0,无论 CPU Cache Line 中是否有数据,CPU 都会直接访问内存,重新加载数据。
CPU 在从 CPU Cache 读取数据的时候,并不是读取 CPU Cache Line 中的整个数据块,而是读取 CPU 所需要的一个数据片段,这样的数据统称为一个字(Word)。那怎么在对应的 CPU Cache Line 中数据块中找到所需的字呢?答案是,需要一个偏移量(Offset)。
因此,一个内存的访问地址,包括组标记、CPU Cache Line 索引、偏移量这三种信息,于是 CPU 就能通过这些信息,在 CPU Cache 中找到缓存的数据。而对于 CPU Cache 里的数据结构,则是由索引 + 有效位 + 组标记 + 数据块组成。
如果内存中的数据已经在 CPU Cache 中了,那 CPU 访问一个内存地址的时候,会经历这 4 个步骤:
- Step1:根据内存地址中索引信息,计算在 CPU Cache 中的索引,也就是找出对应的 CPU Cache Line 的地址;
- Step2:找到对应 CPU Cache Line 后,判断 CPU Cache Line 中的有效位,确认 CPU Cache Line 中数据是否是有效的,如果是无效的,CPU 就会直接访问内存,并重新加载数据,如果数据有效,则往下执行;
- Step3:对比内存地址中组标记和 CPU Cache Line 中的组标记,确认 CPU Cache Line 中的数据是我们要访问的内存数据,如果不是的话,CPU 就会直接访问内存,并重新加载数据,如果是的话,则往下执行;
- Step4:根据内存地址中偏移量信息,从 CPU Cache Line 的数据块中,读取对应的字。
除了直接映射 Cache 之外,还有其他通过内存地址找到 CPU Cache 中的数据的策略,比如全相连 Cache(Fully Associative Cache)、组相连 Cache(Set Associative Cache)等,这几种策略的数据结构都比较相似。
# 6.3 如何写出让 CPU 跑得更快的代码
CPU 访问内存的速度,比访问 CPU Cache 的速度慢了 100 多倍,所以如果 CPU 所要操作的数据在 CPU Cache 中的话,这样将会带来很大的性能提升。访问的数据在 CPU Cache 中的话,意味着缓存命中,缓存命中率越高的话,代码的性能就会越好,CPU 也就跑的越快。
于是,「如何写出让 CPU 跑得更快的代码?」这个问题,可以改成「如何写出 CPU 缓存命中率高的代码?」。L1 Cache 通常分为「数据缓存」和「指令缓存」,这是因为 CPU 会分别处理数据和指令,比如 1+1=2 这个运算,+ 就是指令,会被放在「指令缓存」中,而输入数字 1 则会被放在「数据缓存」里。因此,我们要分开来看「数据缓存」和「指令缓存」的缓存命中率。
# 6.3.1 如何提升数据缓存的命中率
假设要遍历二维数组,有以下两种形式,虽然代码执行结果是一样,但经过测试,形式一 array[i][j]
执行时间比形式二 array[j][i]
快十几倍。
public class Test {
public static void main(String[] args) {
int n = 10000;
int[][] array = new int[n][n];
// 形式一
long start1 = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
array[i][j]=0;
}
}
System.out.println("形式一耗时:" + (System.currentTimeMillis() - start1) + " ms");
// 形式二
long start2 = System.currentTimeMillis();
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
array[j][i]=0;
}
}
System.out.println("形式二耗时:" + (System.currentTimeMillis() - start2 + " ms"));
}
}
// 输出结果
形式一耗时:66 ms
形式二耗时:1072 ms
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
之所以有这么大的差距,是因为二维数组 array 所占用的内存是连续的,比如长度n的值是2的话,那么内存中的数组元素的布局顺序是这样的:
// 内存中的数组元素的布局顺序
array[0][0], array[0][1], array[1][0], array[1][1]
2
形式一用 array[i][j]
访问数组元素的顺序,正是和内存中数组元素存放的顺序一致。当 CPU 访问 array[0][0]
时,由于该数据不在 Cache 中,于是会「顺序」把跟随其后的 3 个元素从内存中加载到 CPU Cache,这样当 CPU 访问后面的 3 个数组元素时,就能在 CPU Cache 中成功地找到数据,这意味着缓存命中率很高,缓存命中的数据不需要访问内存,这便大大提高了代码的性能。
而如果用形式二的 array[j][i]
来访问,则访问的顺序就是:
// 形式二使用 array[j][i] 的访问顺序:
array[0][0], array[1][0], array[0][1], array[1][1]
2
可以看到,访问的方式跳跃式的,而不是顺序的,那么如果 N 的数值很大,那么操作 array[j][i]
时,是没办法把 array[j+1][i]
也读入到 CPU Cache 中的,既然 array[j+1][i]
没有读取到 CPU Cache,那么就需要从内存读取该数据元素了。很明显,这种不连续性、跳跃式访问数据元素的方式,可能不能充分利用到了 CPU Cache 的特性,从而代码的性能不高。
那访问 array[0][0]
元素时,CPU 具体会一次从内存中加载多少元素到 CPU Cache 呢?这跟 CPU Cache Line 有关,它表示 CPU Cache 一次性能加载数据的大小,可以在 Linux 里通过 coherency_line_size
配置查看它的大小,通常是 64 个字节。
# 6.3.2 如何提升指令缓存的命中率
有一个元素为 0 到 1 之间随机数字组成的一维数组:
- 第一个操作,循环遍历数组,把小于 0.5 的数组元素置为 0;
- 第二个操作,将数组排序;
可以发现:排序后再遍历要比直接遍历快。
import java.util.Arrays;
public class Test {
// 操作一:遍历数组
public static void iterateArray(double[] array, int n){
for (int i = 0; i < n; i++) {
if(array[i] < 0.5){
array[i] = 0;
}
}
}
// 操作二:排序
public static double[] sortArray(double[] array){
Arrays.sort(array);
return array;
}
public static void main(String[] args) {
int n = 100000000;
double[] array = new double[n];
for (int i = 0; i < n; i++) {
array[i] = Math.random();
}
// 为了统计耗时的准确性,方式一与方式二单独运行,执行一个的时候注释另一个
// 方式一:直接遍历
long start1 = System.currentTimeMillis();
iterateArray(array, n);
System.out.println("直接遍历,耗时:" + (System.currentTimeMillis() - start1) + " ms");
// 方式二:排序后再遍历(只统计遍历时间)
double[] sortArray = sortArray(array);
long start2 = System.currentTimeMillis();
iterateArray(sortArray, n);
System.out.println("排序后再遍历,耗时:" + (System.currentTimeMillis() - start2) + " ms");
}
}
// 输出结果
直接遍历,耗时:459 ms
排序后再遍历,耗时:298 ms
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
解释这个问题,需要了解一下 CPU 的分支预测器。对于 if 条件语句,意味着此时至少可以选择跳转到两段不同的指令执行,也就是 if 还是 else 中的指令。那么,如果分支预测可以预测到接下来要执行 if 里的指令,还是 else 指令的话,就可以「提前」把这些指令放在指令缓存中,这样 CPU 可以直接从 Cache 读取到指令,于是执行速度就会很快。
当数组中的元素是随机的,分支预测就无法有效工作,而当数组元素都是是顺序的,分支预测器会动态地根据历史命中数据对未来进行预测,这样命中率就会很高。因此,先排序再遍历速度会更快,这是因为排序之后,数字是从小到大的,那么前几次循环命中 if < 0.5
的次数会比较多,于是分支预测就会缓存 if 里的 array[i] = 0
指令到 Cache 中,后续 CPU 执行该指令就只需要从 Cache 读取就好了。
# 6.3.3 如何提升多核 CPU 的缓存命中率
在单核 CPU,虽然只能执行一个线程,但是操作系统给每个线程分配了一个时间片,时间片用完了,就调度下一个线程,于是各个线程就按时间片交替地占用 CPU,从宏观上看起来各个线程同时在执行。
而现代 CPU 都是多核心的,线程可能在不同 CPU 核心来回切换执行,这对 CPU Cache 不是有利的,虽然 L3 Cache 是多核心之间共享的,但是 L1 和 L2 Cache 都是每个核心独有的,如果一个线程在不同核心来回切换,各个核心的缓存命中率就会受到影响,相反如果线程都在同一个核心上执行,那么其数据的 L1 和 L2 Cache 的缓存命中率可以得到有效提高,缓存命中率高就意味着 CPU 可以减少访问内存的频率。
当有多个同时执行「计算密集型」的线程,为了防止因为切换到不同的核心,而导致缓存命中率下降的问题,我们可以把线程绑定在某一个 CPU 核心上,这样性能可以得到非常可观的提升。
在 Linux 上提供了 sched_setaffinity
方法,来实现将线程绑定到某个 CPU 核心这一功能。
#define _GNU_SOURCE
#include <sched.h>
int sched_setaffinity(pid_t pid, size_t cpusetsize, cpu_set_t *mask);
2
3
# 6.4 CPU Cache内容总结
由于随着计算机技术的发展,CPU 与 内存的访问速度相差越来越多,如今差距已经高达好几百倍了,所以 CPU 内部嵌入了 CPU Cache 组件,作为内存与 CPU 之间的缓存层,CPU Cache 由于离 CPU 核心很近,所以访问速度也是非常快的,但由于所需材料成本比较高,它不像内存动辄几个 GB 大小,而是仅有几十 KB 到 MB 大小。
当 CPU 访问数据的时候,先是访问 CPU Cache,如果缓存命中的话,则直接返回数据,就不用每次都从内存读取数据了。因此,缓存命中率越高,代码的性能越好。但需要注意的是,当 CPU 访问数据时,如果 CPU Cache 没有缓存该数据,则会从内存读取数据,但是并不是只读一个数据,而是一次性读取一块一块的数据存放到 CPU Cache 中,之后才会被 CPU 读取。
内存地址映射到 CPU Cache 地址里的策略有很多种,其中比较简单是直接映射 Cache,它巧妙的把内存地址拆分成「索引 + 组标记 + 偏移量」的方式,使得我们可以将很大的内存地址,映射到很小的 CPU Cache 地址里。
要想写出让 CPU 跑得更快的代码,就需要写出缓存命中率高的代码,CPU L1 Cache 分为数据缓存和指令缓存,因而需要分别提高它们的缓存命中率:
- 对于数据缓存,我们在遍历数据的时候,应该按照内存布局的顺序操作,这是因为 CPU Cache 是根据 CPU Cache Line 批量操作数据的,所以顺序地操作连续内存数据时,性能能得到有效的提升;
- 对于指令缓存,有规律的条件分支语句能够让 CPU 的分支预测器发挥作用,进一步提高执行的效率;
另外,对于多核 CPU 系统,线程可能在不同 CPU 核心来回切换,这样各个核心的缓存命中率就会受到影响,于是要想提高线程的缓存命中率,可以考虑把线程绑定 CPU 到某一个 CPU 核心。
# 7. 常见的缓存调度策略
现有查询方式通常查询数据库或者查询文件,由于数据查询速度或者文件IO处理速度远低于CPU执行速度,因此查询速度很慢,一般是秒级的,很多实时场景的要求。一种改进的方案是基于缓存的查询。这种方法通过在第一次查询之后,将数据缓存在内存中,使得之后该数据的查询可以直接在内存中进行,从而有效提高了之后查询的效率,但是这种方式的首次查询无法使用缓存提高查询效率。
常用缓存调度算法有RAND(随机算法)、FIFO(先进先出算法)、LFU(最少访问算法)、LRU(最近最少使用算法)和OPT(最优替换算法)等。其中RAND 和 FIFO 只是简单调度,命中率低。LFU 和 LRU 是利用了计算的局部性(最近使用过的数据还会被使用),相比于 RAND 和 FIFO 提高了命中率,但没有考虑周期性系统的处理规律,还有进一步提升空间。而 OPT 是需要将程序运行一遍,然后获得调度依据,仅仅适合代码缓存和小规模程序,而真实世界主要是数据缓存和大规模系统。
# 7.1 双向链表
# 7.1.1 双向链表简介
双向链表是一种常见的数据结构,它和单向链表类似,也是由一系列节点组成,每个节点包含一个数据元素和两个指针,分别指向前驱节点和后继节点。不同的是,双向链表中每个节点都有一个前驱节点和一个后继节点,这样可以双向遍历链表。
双向链表允许元素的插入和删除操作更加高效,因为在单向链表中,如果要删除一个节点,需要先找到它的前驱节点,而在双向链表中,可以直接通过当前节点的前驱和后继指针来删除它。同样,插入操作也可以通过修改前驱和后继指针来完成。双向链表的缺点是占用的空间比单向链表多,因为每个节点需要多一个指针。此外,由于需要维护前驱和后继指针,双向链表的实现相对复杂一些。
# 7.1.2 双向链表封装
由于下面的FIFO、LRU、LRU算法的实现都需要用到双向链表,因此这里先定义一个双向链表公共类,把常见的操作都封装好了。
DoubleLinkedList.py
# -*- coding: utf-8 -*-
class Node:
def __init__(self, key, value):
"""
初始化方法
:param key:
:param value:
"""
self.key = key
self.value = value
self.prev = None
self.next = None
def __str__(self):
val = '{%s: %s}' % (self.key, self.value)
return val
def __repr__(self):
val = '{%s: %s}' % (self.key, self.value)
return val
class DoubleLinkedList:
def __init__(self, capacity=0xffffffff):
"""
双向链表
:param capacity: 链表容量 初始化为int的最大值2^32-1
:return:
"""
self.capacity = capacity
self.size = 0
self.head = None
self.tail = None
def __add_head(self, node):
"""
向链表头部添加节点
头部节点不存在 新添加节点为头部和尾部节点
头部节点已存在 新添加的节点为新的头部节点
:param node: 要添加的节点
:return: 已添加的节点
"""
# 头部节点为空
if not self.head:
self.head = node
self.tail = node
self.head.next = None
self.tail.prev = None
# 头部节点不为空
else:
node.next = self.head
self.head.prev = node
self.head = node
self.head.prev = None
self.size += 1
return node
def __add_tail(self, node):
"""
向链表尾部添加节点
尾部节点不存在 新添加的节点为头部和尾部节点
尾部节点已存在 新添加的节点为新的尾部节点
:param node: 添加的节点
:return: 已添加的节点
"""
# 尾部节点为空
if not self.tail:
self.tail = node
self.head = node
self.head.next = None
self.tail.prev = None
# 尾部节点不为空
else:
node.prev = self.tail
self.tail.next = node
self.tail = node
self.tail.next = None
self.size += 1
return node
def __remove_head(self):
"""
删除头部节点
头部节点不存在 返回None
头部节点已存在 判断链表节点数量 删除头部节点
:return: 头部节点
"""
# 头部节点不存在
if not self.head:
return None
# 链表至少存在两个节点
head = self.head
if head.next:
head.next.prev = None
self.head = head.next
# 只存在头部节点
else:
self.head = self.tail = None
self.size -= 1
return head
def __remove_tail(self):
"""
删除尾部节点
尾部节点不存在 返回None
尾部节点已存在 判断链表节点数量 删除尾部节点
:return: 尾部节点
"""
# 尾部节点不存在
if not self.tail:
return None
# 链表至少存在两个节点
tail = self.tail
if tail.prev:
tail.prev.next = None
self.tail = tail.prev
# 只存在尾部节点
else:
self.head = self.tail = None
self.size -= 1
return tail
def __remove(self, node):
"""
删除任意节点
被删除的节点不存在 默认删除尾部节点
删除头部节点
删除尾部节点
删除其他节点
:param node: 被删除的节点
:return: 被删除的节点
"""
# 被删除的节点不存在
if not node:
node = self.tail
# 删除的是头部节点
if node == self.head:
self.__remove_head()
# 删除的是尾部节点
elif node == self.tail:
self.__remove_tail()
# 删除的既不是头部也不是尾部节点
else:
node.next.prev = node.prev
node.prev.next = node.next
self.size -= 1
return node
def pop(self):
"""
弹出头部节点
:return: 头部节点
"""
return self.__remove_head()
def append(self, node):
"""
添加尾部节点
:param node: 待追加的节点
:return: 尾部节点
"""
return self.__add_tail(node)
def append_front(self, node):
"""
添加头部节点
:param node: 待添加的节点
:return: 已添加的节点
"""
return self.__add_head(node)
def remove(self, node=None):
"""
删除任意节点
:param node: 待删除的节点
:return: 已删除的节点
"""
return self.__remove(node)
def print(self):
"""
打印当前链表
:return:
"""
node = self.head
line = ''
while node:
line += '%s' % node
node = node.next
if node:
line += '=>'
print(line)
if __name__ == '__main__':
double_linked_list = DoubleLinkedList(10)
nodes = []
# 构建十个节点的双向列表
for i in range(10):
node_item = Node(i, i)
nodes.append(node_item)
double_linked_list.append(nodes[0])
double_linked_list.print()
double_linked_list.append(nodes[1])
double_linked_list.print()
double_linked_list.pop()
double_linked_list.print()
double_linked_list.append_front(nodes[2])
double_linked_list.print()
double_linked_list.append(nodes[3])
double_linked_list.print()
double_linked_list.remove(nodes[3])
double_linked_list.print()
double_linked_list.remove()
double_linked_list.print()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
测试结果:
{0: 0}
{0: 0}=>{1: 1}
{1: 1}
{2: 2}=>{1: 1}
{2: 2}=>{1: 1}=>{3: 3}
{2: 2}=>{1: 1}
{2: 2}
2
3
4
5
6
7
# 7.2 FIFO先进先出算法
# 7.2.1 算法简介
FIFO(First In First Out)是一种先进先出的数据缓存器,他与普通存储器的区别是没有外部读写地址线,这样使用起来非常简单,但缺点就是只能顺序写入数据,顺序的读出数据, 其数据地址由内部读写指针自动加1完成,不能像普通存储器那样可以由地址线决定读取或写入某个指定的地址。
# 7.2.2 算法实现
FIFO.py
# -*- coding: utf-8 -*-
from DoubleLinkedList import Node, DoubleLinkedList
class FIFOCache(object):
def __init__(self, capacity=0xffffffff):
"""
FIFO缓存置换算法
:param capacity:
"""
self.capacity = capacity
self.map = {}
self.size = 0
self.list = DoubleLinkedList(capacity)
def get(self, key):
"""
获取元素
不存在 返回None
已存在 则返回缓存值
:param key:
:return:
"""
# 当前缓存中不存在
if key not in self.map:
return None
# 当前缓存中存在
node = self.map.get(key)
return node.value
def put(self, key, value):
"""
添加元素
已存在 更新值并添加至链表尾部
不存在 判断缓存容量大小后添加
:param key:
:param value:
:return: 已添加的节点
"""
# 当前缓存中已存在
if key in self.map:
node = self.map.get(key)
self.list.remove(node)
node.value = value
self.list.append(node)
else:
# 缓存容量达到上限 删除头结点
if self.size >= self.capacity:
old_node = self.list.pop()
del self.map[old_node.key]
self.size -= 1
node = Node(key, value)
self.map[key] = node
self.list.append(node)
self.size += 1
return node
def print(self):
"""
打印当前链表
:return:
"""
self.list.print()
# print(self.map)
if __name__ == '__main__':
fifo_cache = FIFOCache(2)
fifo_cache.put(1, 1)
fifo_cache.print()
fifo_cache.put(2, 2)
fifo_cache.print()
print(fifo_cache.get(2))
fifo_cache.put(3, 3)
fifo_cache.print()
print(fifo_cache.get(1))
fifo_cache.put(2, 4)
fifo_cache.print()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
测试结果:
{1: 1}
{1: 1}=>{2: 2}
2
{2: 2}=>{3: 3}
None
{3: 3}=>{2: 4}
2
3
4
5
6
# 7.3 LRU最近最少使用算法
# 7.3.1 算法简介
LRU(Least Recently Used)即最近最少使用。 操作系统中一种内存管理的页面置换算法,主要用于找出内存中较久时间没有使用的内存块,将其移出内存从而为新数据提供空间。此算法的前提是认为:当一个数据被访问的越频繁,则这个数据在未来被访问的概率就越高。
LRU算法其实就是按照使用顺序将元素倒序排列,在有限的存储空间中,若空间满时,删除最后的元素(即最久没有使用的元素)。
# 7.3.2 算法实现
LRU.py
# -*- coding: utf-8 -*-
from DoubleLinkedList import Node, DoubleLinkedList
class LRUCache(object):
def __init__(self, capacity=0xffffffff):
"""
LRU缓存置换算法 最近最少使用
:param capacity:
"""
self.capacity = capacity
self.size = 0
self.map = {}
self.list = DoubleLinkedList(capacity)
def get(self, key):
"""
获取元素
获取元素不存在 返回None
获取元素已存在 将节点从当前位置删除并添加至链表头部
:param key:
:return:
"""
# 元素不存在
if key not in self.map:
return None
node = self.map.get(key)
self.list.remove(node)
self.list.append_front(node)
return node.value
def put(self, key, value):
"""
添加元素
被添加的元素已存在 更新元素值并已到链表头部
被添加的元素不存在
链表容量达到上限 删除尾部元素
链表容量未达上限 添加至链表头部
:param key:
:param value:
:return:
"""
if key in self.map:
node = self.map.get(key)
node.value = value
self.list.remove(node)
self.list.append_front(node)
else:
if self.size >= self.capacity:
old_node = self.list.remove()
del self.map[old_node.key]
self.size -= 1
node = Node(key, value)
self.map[key] = node
self.list.append_front(node)
self.size += 1
return node
def print(self):
"""
打印当前链表
:return:
"""
self.list.print()
if __name__ == '__main__':
lru_cache = LRUCache(3)
lru_cache.put(1, 1)
lru_cache.print()
lru_cache.put(2, 2)
lru_cache.print()
print(lru_cache.get(1))
lru_cache.print()
lru_cache.put(3, 3)
lru_cache.print()
lru_cache.put(1, 100)
lru_cache.print()
lru_cache.put(4, 4)
lru_cache.print()
print(lru_cache.get(1))
lru_cache.print()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
测试结果:
{1: 1}
{2: 2}=>{1: 1}
1
{1: 1}=>{2: 2}
{3: 3}=>{1: 1}=>{2: 2}
{1: 100}=>{3: 3}=>{2: 2}
{4: 4}=>{1: 100}=>{3: 3}
100
{1: 100}=>{4: 4}=>{3: 3}
2
3
4
5
6
7
8
9
# 7.4 LFU最少访问算法
# 7.4.1 算法简介
LFU(Least Frequently Used)算法根据数据的历史访问频率来淘汰数据,其核心思想是“如果数据过去被访问多次,那么将来被访问的频率也更高”。 LFU的每个数据块都有一个引用计数,所有数据块按照引用计数排序,具有相同引用计数的数据块则按照时间排序。
LFU将数据和数据的访问频次保存在一个容量有限的容器中,当访问一个数据时:
- 该数据在容器中,则将该数据的访问频次加1。
- 该数据不在容器中,则将该数据加入到容器中,且访问频次为1。
当数据量达到容器的限制后,会剔除掉访问频次最低的数据。下图是一个简易的LFU算法示意图。
# 7.4.2 算法实现
LFU.py
# -*- coding: utf-8 -*-
from DoubleLinkedList import Node, DoubleLinkedList
class LFUNode(Node):
def __init__(self, key, value):
"""
LFU节点 增加频率属性
:param key:
:param value:
"""
self.freq = 0
super(LFUNode, self).__init__(key, value)
class LFUCache(object):
def __init__(self, capacity=0xffffffff):
"""
LFU缓存置换算法 最少访问
:param capacity:
"""
self.capacity = capacity
self.size = 0
self.map = {}
self.freq_map = {}
def __update_freq(self, node):
"""
更新节点频率
:param node:
:return:
"""
freq = node.freq
# 当前节点所在频率存在 在当前频率链表中移除当前节点
if freq in self.freq_map:
node = self.freq_map[freq].remove(node)
# 当前频率链表为空时删除该频率链表
if self.freq_map[freq].size == 0:
del self.freq_map[freq]
# 将节点按照新频率写入频率链表
freq += 1
node.freq = freq
if freq not in self.freq_map:
self.freq_map[freq] = DoubleLinkedList()
self.freq_map[freq].append(node)
return node
def get(self, key):
"""
获取元素
:return:
"""
# 节点不存在
if key not in self.map:
return None
# 节点存在 更新使用频率
old_node = self.map.get(key)
new_node = self.__update_freq(old_node)
self.map[key] = new_node
return new_node.value
def put(self, key, value):
"""
设置元素
:param key:
:param value:
:return:
"""
# 节点已存在 更新频率
if key in self.map:
old_node = self.map.get(key)
old_node.value = value
new_node = self.__update_freq(old_node)
self.map[key] = new_node
else:
# 节点容量达到上限 移除最小频率链表头部的节点
if self.size >= self.capacity:
min_freq = min(self.freq_map)
node = self.freq_map[min_freq].pop()
del self.map[node.key]
self.size -= 1
# 构建新的节点 更新频率
new_node = LFUNode(key, value)
new_node = self.__update_freq(new_node)
self.map[key] = new_node
self.size += 1
return new_node
def print(self):
"""
打印当前链表
:return:
"""
for freq, link in self.freq_map.items():
print("frequencies: %d" % freq)
link.print()
if __name__ == '__main__':
lfu_cache = LFUCache(4)
lfu_cache.put(1, 1)
lfu_cache.print()
lfu_cache.put(2, 2)
lfu_cache.print()
print(lfu_cache.get(1))
lfu_cache.print()
lfu_cache.put(3, 3)
lfu_cache.print()
lfu_cache.put(4, 4)
lfu_cache.print()
lfu_cache.put(5, 5)
lfu_cache.print()
print(lfu_cache.get(2))
lfu_cache.put(4, 400)
lfu_cache.print()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
测试结果:
frequencies: 1
{1: 1}
frequencies: 1
{1: 1}=>{2: 2}
1
frequencies: 1
{2: 2}
frequencies: 2
{1: 1}
frequencies: 1
{2: 2}=>{3: 3}
frequencies: 2
{1: 1}
frequencies: 1
{2: 2}=>{3: 3}=>{4: 4}
frequencies: 2
{1: 1}
frequencies: 1
{3: 3}=>{4: 4}=>{5: 5}
frequencies: 2
{1: 1}
None
frequencies: 1
{3: 3}=>{5: 5}
frequencies: 2
{1: 1}=>{4: 400}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 7.5 LFU相比于LRU的优劣
# 7.5.1 区别及优劣
区别:LFU是基于访问频次的模式,而LRU是基于访问时间的模式。
优势:在数据访问符合正态分布时,相比于LRU算法,LFU算法的缓存命中率会高一些。
劣势:
- LFU的复杂度要比LRU更高一些。
- 需要维护数据的访问频次,每次访问都需要更新。
- 早期的数据相比于后期的数据更容易被缓存下来,导致后期的数据很难被缓存。
- 新加入缓存的数据很容易被剔除,像是缓存的末端发生“抖动”。
# 7.5.2 LFU算法优化
从上面的优劣分析中我们可以发现,优化LFU算法可以从下面几点入手:
- 更加紧凑的数据结构,避免维护访问频次的高消耗。
- 避免早期的热点数据一直占据缓存,即LFU算法也需有一些访问时间模式的特性。
- 消除缓存末端的抖动。
# 8. 参考资料
[1] Redis教程 - Redis知识体系详解 from Java全栈知识体系 (opens new window)
[2] Redis基本原理 from 知乎 (opens new window)
[3] Redis原理和机制详解 from 知乎 (opens new window)
[4] Redis架构原理及应用实践 from segmentfault (opens new window)
[5] Redis 的 8 大应用场景 from segmentfault (opens new window)
[6] Linux下使用docker部署Redis from CSDN (opens new window)
[7] 重修Redis之五:缓存清理机制 from CSDN (opens new window)
[8] 一文看懂Redis的持久化原理 from 稀土掘金 (opens new window)
[9] Redis持久化的几种方式—RDB深入解析 from 开源基础软件社区 (opens new window)
[10] Redis进阶 - 持久化:RDB和AOF机制详解 from 全栈知识体系 (opens new window)
[11] Python获取Redis所有Key以及内容 from CSDN (opens new window)
[12] 经典问题:先更新数据库,还是先更新缓存?from 技术文章摘抄 (opens new window)
[13] Helios多级缓存实践 from 51CTO博客 (opens new window)
[14] MultiQueue多优先级队列缓存替换算法分析 from CSDN (opens new window)
[15] FIFO调度算法和LRU算法——缓存调度算法 from CSDN (opens new window)
[16] 一种基于机器学习的缓存预测调度方法、系统及介质 from Google Patents (opens new window)
[17] 如何写出让 CPU 跑得更快的代码 from 小林coding (opens new window)
[18] 基础知识一、Python实现双向链表 from CSDN (opens new window)
[19] 基础知识二、Python实现FIFO算法 form CSDN (opens new window)
[20] 基础知识三、Python实现LRU算法 from CSDN (opens new window)
[21] 基础知识四、Python实现LFU算法 from CSDN (opens new window)
[22] LRU Cache的原理和python的实现 from 简书 (opens new window)
[23] 数据结构与算法(Python)from Gitbook (opens new window)
[24] Redis底层数据结构-双向链表 from 稀土掘金 (opens new window)
[25] 深入了解Redis底层数据结构 from 稀土掘金 (opens new window)
[26] Redis进阶 - 数据结构:底层数据结构详解 from 全栈知识体系 (opens new window)
[27] 图解Redis底层数据结构实现原理 from CSDN (opens new window)
[28] 一文带你了解什么是 LRU 算法?from 51CTO (opens new window)