redis-黑马点评学习
基础篇
初识Redis
NoSQL
NoSql可以翻译做Not Only Sql(不仅仅是SQL),或者是No Sql(非Sql的)数据库。是相对于传统关系型数据库而言,有很大差异的一种特殊的数据库,因此也称之为非关系型数据库。
结构化与非结构化
结构化:传统关系型数据库是结构化数据,每一张表都有严格的约束信息:字段名.字段数据类型.字段约束等等信息,插入的数据必须遵守这些约束。(缺点:表不能随意修改)
非结构化:对数据库格式没有严格约束,往往形式松散,自由。
- 键值型:Redis
- 文档型(字段约束非常松散):MongoDB
- 列类型:HBase
- 图格式:Neo4j
关联与非关联
传统数据库的表与表之间往往存在关联,例如外键。
而非关系型数据库不存在关联关系,要维护关系要么靠代码中的业务逻辑,要么靠数据之间的耦合:
1 | { |
此处要维护“张三”的订单与商品“荣耀”和“小米11”的关系,不得不冗余的将这两个商品保存在张三的订单文档中,不够优雅。还是建议用业务来维护关联关系。
查询方式
传统关系型数据库会基于Sql语句做查询,语法有统一标准;
而不同的非关系数据库查询语法差异极大,五花八门各种各样。
事务
传统关系型数据库能满足事务ACID的原则。
而非关系型数据库往往不支持事务,或者不能严格保证ACID的特性,特性:BASE,只能实现基本的一致性。
总结
SQL | NoSQL | |
---|---|---|
数据结构 | 结构化 | 非结构化 |
数据关联 | 关联的 | 非关联的 |
查询方式 | SQL查询 | 非SQl |
事务特性 | ACID | BASE |
存储方式 | 磁盘 | 内存 |
扩展性 | 垂直 | 水平 |
使用场景 | 1)数据结构固定 2)相关业务对数据安全性、一致性要求较高 |
1)数据结构不固定 2)对一致性、安全性要求不高 3)对性能要求 |
认识Redis
特征:
- 键值(key-value)型,value支持多种不同数据结构,功能丰富
- 单线程,每个命令具备原子性
- 低延迟,速度快(基于内存、IO多路复用、良好的编码)
- 支持数据持久化
- 支持主从集群、分片集群
- 支持多语言客户端(java、Python、c等)
安装Redis
Redis安装说明
大多数企业都是基于Linux服务器来部署项目,而且Redis官方也没有提供Windows版本的安装包。因此课程中我们会基于Linux系统来安装Redis.
此处选择的Linux版本为CentOS 7
Redis的官方网站地址:https://redis.io/
单机安装Redis
步骤1.安装Redis依赖
Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖:
1 | yum install -y gcc tcl |
步骤2.上传安装包并解压
然后将Redis安装包上传到虚拟机的任意目录:
例如,放到了/usr/local/src 目录:
解压缩:
1 | tar -xzf redis-6.2.6.tar.gz |
解压后:
进入redis目录:
1 | cd redis-6.2.6 |
运行编译命令:
1 | make && make install |
如果没有出错,应该就安装成功了。
默认的安装路径是在 /usr/local/bin
目录下:
该目录以及默认配置到环境变量,因此可以在任意目录下运行这些命令。其中:
- redis-cli:是redis提供的命令行客户端
- redis-server:是redis的服务端启动脚本
- redis-sentinel:是redis的哨兵启动脚本
步骤3.启动
redis的启动方式有很多种,例如:
- 默认启动
- 指定配置启动
- 开机自启
默认启动
安装完成后,在任意目录输入redis-server命令即可启动Redis:
1 | redis-server |
如图:
这种启动属于前台启动
,会阻塞整个会话窗口,窗口关闭或者按下CTRL + C
则Redis停止。不推荐使用。
指定配置启动
如果要让Redis以后台
方式启动,则必须修改Redis配置文件,就在我们之前解压的redis安装包下(/usr/local/src/redis-6.2.6
),名字叫redis.conf:
我们先将这个配置文件备份一份:
1 | cp redis.conf redis.conf.bck |
然后修改redis.conf文件中的一些配置:
1 | # 监听的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0 |
Redis的其它常见配置:
1 | # 监听的端口 |
启动Redis:
1 | # 进入redis安装目录 |
停止服务:
1 | # 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务, |
开机自启
我们也可以通过配置来实现开机自启。
首先,新建一个系统服务文件:
1 | vi /etc/systemd/system/redis.service |
内容如下:
1 | [Unit] |
然后重载系统服务:
1 | systemctl daemon-reload |
现在,我们可以用下面这组命令来操作redis了:
1 | # 启动 |
执行下面的命令,可以让redis开机自启:
1 | systemctl enable redis |
Redis命令行客户端
Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:
1 | redis-cli [options] [commonds] |
其中常见的options有:
-h 127.0.0.1
:指定要连接的redis节点的IP地址,默认是127.0.0.1-p 6379
:指定要连接的redis节点的端口,默认是6379-a 123321
:指定redis的访问密码
其中的commonds就是Redis的操作命令,例如:
ping
:与redis服务端做心跳测试,服务端正常会返回pong
不指定commonds时,会进入redis-cli
的交互控制台:
图形化桌面客户端
GitHub上的大神编写了Redis的图形化桌面客户端,地址:https://github.com/uglide/RedisDesktopManager
不过该仓库提供的是RedisDesktopManager的源码,并未提供windows安装包。
在下面这个仓库可以找到安装包:https://github.com/lework/RedisDesktopManager-Windows/releases,然后安装
建立连接
点击左上角的连接到Redis服务器
按钮:
在弹出的窗口中填写Redis服务信息:
点击测试连接,看连接是否成功。
注意:需要先设置虚拟机的防火墙
1
2 sudo firewall-cmd --permanent --add-port=6379/tcp
sudo firewall-cmd --reload
点击确定后,在左侧菜单会出现这个链接:
点击即可建立连接了:
Redis默认有16个仓库,编号从0至15. 通过配置文件可以设置仓库数量,但是不超过16,并且不能自定义仓库名称。
如果是基于redis-cli连接Redis服务,可以通过select命令来选择数据库:
1 | # 选择 0号库 |
Redis命令
数据结构介绍
Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样:
GEO:地理位置
贴心小建议:命令不要死记,学会查询就好啦
Redis为了方便我们学习,将操作不同数据类型的命令也做了分组,在官网( https://redis.io/commands )可以查看到不同的命令。
或者通过Help命令来帮助我们去查看命令:
通用命令
通用指令是部分数据类型的都可以使用的指令,可以通过help @generic
查看有哪些通用命令,常见的有:
KEYS:查看符合模板的所有key
- ```sh 127.0.0.1:6379> keys *
- "name"
- "age" 127.0.0.1:6379>
查询以a开头的key
127.0.0.1:6379> keys a*- "age" 127.0.0.1:6379>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
**贴心小提示:在生产环境下,不推荐使用keys 命令,因为这个命令在key过多的情况下,效率不高,模糊查询**
- DEL:删除一个指定的key
- ```sh
127.0.0.1:6379> del name #删除单个
(integer) 1 #成功删除1个
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> MSET k1 v1 k2 v2 k3 v3 #批量添加数据
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
4) "age"
127.0.0.1:6379> del k1 k2 k3 k4
(integer) 3 #此处返回的是成功删除的key,由于redis中只有k1,k2,k3 所以只成功删除3个,最终返回
127.0.0.1:6379>
127.0.0.1:6379> keys * #再查询全部的key
1) "age" #只剩下一个了
127.0.0.1:6379>
- ```sh 127.0.0.1:6379> keys *
EXISTS:判断key是否存在
```sh 127.0.0.1:6379> help EXISTS
EXISTS key [key ...] summary: Determine if a key exists since: 1.0.0 group: generic
127.0.0.1:6379> exists age (integer) 1
127.0.0.1:6379> exists name (integer) 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- EXPIRE:给一个key设置有效期,有效期到期时该key会被自动删除(为什么会存在这个命令:因为Redis是在内存中存储的,内存非常宝贵)
- ```sh
127.0.0.1:6379> expire age 10
(integer) 1
127.0.0.1:6379> ttl age
(integer) 8
127.0.0.1:6379> ttl age
(integer) 6
127.0.0.1:6379> ttl age
(integer) -2
127.0.0.1:6379> ttl age
(integer) -2 #当这个key过期了,那么此时查询出来就是-2
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> set age 10 #如果没有设置过期时间
OK
127.0.0.1:6379> ttl age
(integer) -1 # ttl的返回值就是-1
TTL:查看一个KEY的剩余有效期
String命令
String类型,也就是字符串类型,是Redis中最简单的存储类型。
其value是字符串,不过根据字符串的格式不同,又可以分为3类:
- string:普通字符串
- int:整数类型,可以做自增,自减操作
- float:浮点类型,可以做自增,自减操作
KEY | VALUE |
---|---|
msg | hello world |
num | 10 |
score | 92.5 |
String的常见命令有:
- SET:添加或者修改已经存在的一个String类型的键值对
- GET:根据key获取String类型的value
- MSET:批量添加多个String类型的键值对
- MGET:根据多个key获取多个String类型的value
- INCR:让一个整型的key自增1
- INCRBY:让一个整型的key自增并指定步长(也可以是负数,就是自减),例如:incrby num 2 让num值自增2
- INCRBYFLOAT:让一个浮点类型的数字自增并指定步长
- SETNX:添加一个String类型的键值对,前提是这个key不存在,否则不执行
- 其实和
set 键 nx
效果一样,nx是set的参数
- 其实和
- SETEX:添加一个String类型的键值对,并且指定有效期
- 其实和
set 键 ex 数字(表时间)
- 其实和
Key的层级结构
问题:Redis没有类似MySQL中的Table的概念,我们该如何区分不同类型的key呢?例如,需要存储用户.商品信息到redis,有一个用户id是1,有一个商品id恰好也是1,此时如果使用id作为key,那就会冲突了,该怎么办?
解决方案:可以通过给key添加前缀加以区分,不过这个前缀不是随便加的,有一定的规范。
Redis的key允许有多个单词形成层级结构,多个单词之间用:
隔开,格式如下:项目名:业务名:类型:id
。这个格式并非固定,也可以根据自己的需求来删除或添加词条。
例如我们的项目名称叫 heima,有user和product两种不同类型的数据,我们可以这样定义key:
user相关的key:heima:user:1
product相关的key:heima:product:1
如果Value是一个Java对象,例如一个User对象,则可以将对象序列化为JSON字符串后存储:
KEY | VALUE |
---|---|
heima:user:1 | {"id":1, "name": "Jack", "age": 21} |
heima:product:1 | {"id":1, "name": "小米11", "price": 4999} |
一旦我们向redis采用这样的方式存储,那么在可视化界面中,redis会以层级结构来进行存储,形成类似于这样的结构,更加方便Redis获取数据。
Hash命令
Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。
String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:
Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD:
Hash类型的常见命令
HSET key field value:添加或者修改hash类型key的field的值
- ```sh 127.0.0.1:6379> HSET heima:user:3 name Lucy//大key是
heima:user:3 小key是name,小value是Lucy (integer) 1 127.0.0.1:6379>
HSET heima:user:3 age 21// 如果操作不存在的数据,则是新增 (integer) 1
127.0.0.1:6379> HSET heima:user:3 age 17
//如果操作存在的数据,则是修改 (integer) 0 127.0.0.1:6379> HGET
heima:user:3 name "Lucy" 127.0.0.1:6379> HGET heima:user:3 age "17"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
- HGET key field:获取一个hash类型key的field的值
- HMSET:批量添加多个hash类型key的field的值
- HMGET:批量获取多个hash类型key的field的值
- HGETALL:获取一个hash类型的key中的所有的field和value
- HKEYS:获取一个hash类型的key中的所有的field
- HINCRBY:让一个hash类型key的字段值自增并指定步长
- HSETNX:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行
### List命令
Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构,既可以支持正向检索和也可以支持反向检索。
特征也与LinkedList类似:
* 有序
* 元素可以重复
* 插入和删除快
* 查询速度一般
常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。
**List的常见命令有:**
- LPUSH key element ... :向列表左侧插入一个或多个元素
- LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil
- RPUSH key element ... :向列表右侧插入一个或多个元素
- RPOP key:移除并返回列表右侧的第一个元素
- LRANGE key star end:返回一段角标范围内的所有元素
- BLPOP和BRPOP:与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil
* LPUSH和RPUSH
```java
127.0.0.1:6379> LPUSH users 1 2 3
(integer) 3
127.0.0.1:6379> RPUSH users 4 5 6
(integer) 6
- ```sh 127.0.0.1:6379> HSET heima:user:3 name Lucy//大key是
heima:user:3 小key是name,小value是Lucy (integer) 1 127.0.0.1:6379>
HSET heima:user:3 age 21// 如果操作不存在的数据,则是新增 (integer) 1
127.0.0.1:6379> HSET heima:user:3 age 17
//如果操作存在的数据,则是修改 (integer) 0 127.0.0.1:6379> HGET
heima:user:3 name "Lucy" 127.0.0.1:6379> HGET heima:user:3 age "17"
以上操作完之后是
3 2 1 4 5 6
LPOP和RPOP
1 | 127.0.0.1:6379> LPOP users |
- 以上操作完之后是
2 1 4 5
- LRANGE
1 | 127.0.0.1:6379> LRANGE users 1 2 |
如何用List结构模拟一个栈?
- 入口和出口在同一边
如何用List结构模拟一个队列?
- 入口和出口在不同边
如何用List结构模拟一个阻塞队列?
- 入口和出口在不同边
- 出队时采用BLPOP或BRPOP
Set命令
Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:
- 无序
- 元素不可重复
- 查找快
- 支持交集、并集、差集等功能
Set类型的常见命令
- SADD key member ... :向set中添加一个或多个元素
- SREM key member ... : 移除set中的指定元素
- SCARD key: 返回set中元素的个数
- SISMEMBER key member:判断一个元素是否存在于set中
- SMEMBERS:获取set中的所有元素
- SINTER key1 key2 ... :求key1与key2的交集
- SDIFF key1 key2 ... :求key1与key2的差集
- SUNION key1 key2 ..:求key1和key2的并集
SortedSet命令
Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。
SortedSet具备下列特性:
- 可排序
- 元素不重复
- 查询速度快
因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。
SortedSet的常见命令有:
- ZADD key score member:添加一个或多个元素到sorted set ,如果已经存在则更新其score值
- ZREM key member:删除sorted set中的一个指定元素
- ZSCORE key member : 获取sorted set中的指定元素的score值
- ZRANK key member:获取sorted set 中的指定元素的排名
- ZCARD key:获取sorted set中的元素个数
- ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
- ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
- ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
- ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
- ZDIFF.ZINTER.ZUNION:求差集.交集.并集
注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可,例如:
- 升序获取sorted set 中的指定元素的排名:ZRANK key member
- 降序获取sorted set 中的指定元素的排名:ZREVRANK key memeber
Redis的Java客户端-Jedis
快速入门
1.创建工程
2.引入依赖
1 | <!--jedis--> |
3.建立连接
1 | private Jedis jedis; |
4.测试
1 |
|
5.释放资源
1 |
|
Jedis连接池
Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用Jedis连接池代替Jedis的直连方式
有关池化思想,并不仅仅是这里会使用,很多地方都有,比如说我们的数据库连接池,比如我们tomcat中的线程池,这些都是池化思想的体现
创建Jedis连接池
1 | public class JedisConnectionFacotry { |
说明:
JedisConnectionFacotry:工厂设计模式是实际开发中非常常用的一种设计模式,我们可以使用工厂,去降低代的耦合,比如Spring中的Bean的创建,就用到了工厂设计模式
静态代码块:随着类的加载而加载,确保只能执行一次,我们在加载当前工厂类的时候,就可以执行static的操作完成对 连接池的初始化
最后提供返回连接池中连接的方法.
改造原始代码
代码说明:
1.在我们完成了使用工厂设计模式来完成代码的编写之后,我们在获得连接时,就可以通过工厂来获得,而不用直接去new对象,降低耦合,并且使用的还是连接池对象。
2.当我们使用了连接池后,当我们关闭连接其实并不是关闭,而是将Jedis还回连接池的。
1 |
|
Redis的Java客户端-SpringDataRedis
SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https://spring.io/projects/spring-data-redis
- 提供了对不同Redis客户端的整合(Lettuce和Jedis)
- 提供了RedisTemplate统一API来操作Redis
- 支持Redis的发布订阅模型
- 支持Redis哨兵和Redis集群
- 支持基于Lettuce的响应式编程
- 支持基于JDK.JSON.字符串.Spring对象的数据序列化及反序列化
- 支持基于Redis的JDKCollection实现
SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:
快速入门
1.导入依赖
1 | <!--redis依赖--> |
2.配置文件
1 | spring: |
3.测试代码
1 |
|
数据序列化器
RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的:
缺点:
- 可读性差
- 内存占用较大
我们可以自定义RedisTemplate的序列化方式,代码如下:
1 |
|
这里采用了JSON序列化来代替默认的JDK序列化方式。最终结果如图:
整体可读性有了很大提升,并且能将Java对象自动的序列化为JSON字符串,并且查询时能自动把JSON反序列化为Java对象。不过,其中记录了序列化时对应的class名称,目的是为了查询时实现自动反序列化。这会带来额外的内存开销。
StringRedisTemplate
尽管JSON的序列化方式可以满足我们的需求,但依然存在一些问题,如上图。
为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。
为了减少内存的消耗,我们可以采用手动序列化的方式,换句话说,就是不借助默认的序列化器,而是我们自己来控制序列化的动作,同时,我们只采用String的序列化器,这样,在存储value时,我们就不需要在内存中就不用多存储数据,从而节约我们的内存空间。
这种用法比较普遍,因此SpringDataRedis就提供了RedisTemplate的子类:StringRedisTemplate,它的key和value的序列化方式默认就是String方式。
省去了我们自定义RedisTemplate的序列化方式的步骤,而是直接使用:
1 |
|
总结:
RedisTemplate的两种序列化实践方案:
- 方案一:
- 自定义RedisTemplate
- 修改RedisTemplate的序列化器为GenericJackson2JsonRedisSerializer
- 方案二:
- 使用StringRedisTemplate
- 写入Redis时,手动把对象序列化为JSON
- 读取Redis时,手动把读取到的JSON反序列化为对象
Hash结构操作
1 |
|
实战篇
短信登录
流程:
集群的session共享问题:
session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
session的替代方案应该满足:
- 数据共享
- 内存存储
- key、value结构
改进后的流程:
保存用户到Redis中不用手机号作为Redis的key是因为:要返回给前端并放在authorization中,为了以后登录时进行验证,如果用手机号的话容易泄漏。
登录拦截器的优化:
原来的登录拦截器存在的问题:不拦截一些不需要登录的api,比如首页、获取验证码等,如果将刷新token有效期的代码放在这里面,就会导致用户在访问首页或者其他不被拦截的api时token不会被刷线。
改进:
商户查询
什么是缓存
缓存就是数据交换的缓存区(称作Cache),是贮存数据的临时地方,一般读写性能较高。
缓存的作用:
- 降低后端负载
- 提高读写效率,降低响应时间
缓存的成本:
- 数据一致性成本
- 代码维护成本
- 运维成本
缓存更新策略
内存淘汰 | 超时剔除 | 主动更新 | |
---|---|---|---|
说明 | 不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。 | 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。 | 编写业务逻辑,在修改数据库的同时,更新缓存。 |
一致性 | 差 | 一般 | 好 |
维护成本 | 无 | 低 | 高 |
业务场景:
- 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存。
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存。
主动更新策略
- Cache Aside Pattern:由缓存的调用者,在更新数据库的同时更新缓存。(主要使用)
- Read/Write Through Pattern:缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。
- Write Behind Caching Pattern:调用者只操作缓存,由其他线程异步的将缓存数据持久化到数据库,保证最终一致。
Cache Aside Pattern存在的三个问题
- 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都要更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
- 如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC等分布事务方案
- 先操作缓存还是先操作数据库?
- 先删除缓存,再操作数据库
- 先操作数据库,再删除缓存
先删除缓存,再操作数据库
正常情况:
步骤 | 缓存 | 数据库 |
---|---|---|
初始 | 10 | 10 |
1 | 0 | 10 |
2 | 0 | 20 |
3 | 0 | 20 |
4 | 20 | 20 |
异常情况:
步骤 | 缓存 | 数据库 |
---|---|---|
初始 | 10 | 10 |
1 | 0 | 10 |
2 | 0 | 10 |
3 | 10 | 10 |
4 | 10 | 20 |
因为更新数据库的操作一般是很慢的,而删除缓存的操作是很快的,很可能线程1删除缓存之后,先让线程2执行。
这种情况发生的可能性比较高。
先操作数据库,再删除缓存
正常情况:
步骤 | 缓存 | 数据库 |
---|---|---|
初始 | 10 | 10 |
1 | 10 | 20 |
2 | 0 | 20 |
3 | 0 | 20 |
4 | 20 | 20 |
异常情况(假设线程1查询时,缓存恰好失效了):
步骤 | 缓存 | 数据库 |
---|---|---|
初始 | 10 | 10 |
1 | 0 | 10 |
2 | 0 | 20 |
3 | 0 | 20 |
4 | 10 | 20 |
情况发生的条件:
- 两个线程在并行执行
- 线程1查询时,缓存恰好失效
- 线程1查询后,要写入缓存之间(之间微秒级)突然来了一个线程(这里是线程2)要更新数据库,删缓存。
综合分析,可能性比较低。
缓存更新策略的最佳实践
- 删除缓存还是更新缓存?删除缓存
- 方案2(即先操作数据库,再删除缓存)发生异常情况的可能性较低。
1.低一致性需求:使用Redis自带的内存淘汰机制
2.高一致性需求:主动更新,并以超时剔除作为兜底方案
- 读操作:
- 缓存命中则直接返回
- 缓存未命中则查询数据库,并写入缓存,设定超时时间
- 写操作:
- 先写数据库,然后再删除缓存
- 要确保数据库与缓存操作的原子性
缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
危害:
常见的解决方案有两种:
- 缓存空对象
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗(比如请求一堆不存在的id,就会导致缓存中存储大量不存在id的null对象。解决办法:设置较短的TTL)
- 可能造成短期的不一致(比如请求一个不存在的id,然后缓存中存储了它的null对象,然后插入了这个id的商铺,再查询这个id,查到的是缓存中的null对象。解决办法:插入这条数据的时候,主动在缓存中覆盖数据)
- 布隆过滤
- 原理:可以简单理解成一个byte数组,把数据库中的数据基于某一种hash算法计算出hash值,然后将这些hash值转换成二进制保存到布隆过滤器里。判断数据是否存在是判断对应的位置是0还是1。这种存在判断是概率上的统计,不是百分百的准确。重点:当布隆过滤器说不存在时,百分百不存在;当布隆过滤器说存在时,不一定存在。
- 优点:内存占用较少,没有多余key
- 缺点:
- 实现复杂
- 存在误判可能(当布隆过滤器说不存在时,百分百不存在;当布隆过滤器说存在时,不一定存在。)
解决商铺查询的缓存穿透问题
总结
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存null值(被动)
- 布隆过滤(被动)
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验(比如0这种id)
- 加强用户权限校验(要登录才能访问,或者访问频率限制)
- 做好热点参数的限流
缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值(针对同一时段大量的缓存key同时失效)
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
图中查询数据库,重建缓存的线段比较长,表示所需时间比较长。
常见的解决方案有两种:
- 互斥锁(只有一个线程能获取锁,其他线程都需要等待,所以性能上会比较差)
- 逻辑过期(向redis存储数据时,不设置TTL,逻辑过期不是真正的过期,它要求我们在存储数据到redis的时候,额外的要添加一个过期时间的字段,这个key本身是不用去设置ttl的,所以它的过期时间不是由redis控制的,而是由我们程序员自己去判断它是否过期)
解决方案 | 优点 | 缺点 |
---|---|---|
互斥锁 | 1.没有额外的内存消耗 2.保证一致性 3.实现简单 |
1.线程需要等待,性能受影响 2.可能有死锁风险 |
逻辑过期 | 线程无需等待,性能较好 | 1.不保证一致性 2.有额外内存消耗 3.实现复杂 |
基于互斥锁方式解决缓存击穿问题
利用Redis中string的setnx命令来实现:setnx,对key进行赋值(当且仅当key不存在时赋值)
基于逻辑过期方式解决缓存击穿问题
优惠券秒杀
全局ID生成器
当用户抢购时,就会生成订单保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
- id的规律性太明显(用户可以间接知晓每日销量)
- 受单表数据量的限制(订单特别多,不能用单表存储,用多表存储,每个表id自增会重复)
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息:
使用数值类型存储(Long类型,8Byte)
ID的组成部分:
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
全局唯一ID生成策略:
- UUID(生成的是十六进制的数值,返回的是字符串结构,没有自增)
- Redis自增
- snowflake算法(雪花算法)
- 数据库自增(用一张表专门做自增)
Redis自增ID策略:
- 每天一个key,方便统计订单量
- ID构造是时间戳+计数器
实现秒杀下单
实现优惠券秒杀下单功能
下单需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
超卖问题
原库存是1
超卖问题是点选的多线程安全问题,针对这一问题的常见解决方案就是加锁。
悲观锁和乐观锁
悲观锁:
认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
- 例如Synchronized、Lock都属于悲观锁
乐观锁:
认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改。
- 如果没有修改则认为是安全的,自己才更新数据。
- 如果已经被其他线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁
乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:
1.版本号法:
原理:查询的版本号和修改时的版本号有没有变,每次修改版本号+1。
步骤 | id | stock | version |
---|---|---|---|
初始 | 10 | 1 | 1 |
线程1第一步 | 10 | 1 | 1 |
线程2第一步 | 10 | 1 | 1 |
线程1第二步 | 10 | 0 | 2 |
线程2第二步 | 10 | 0 | 2 |
改进:每次版本号变化的时候库存也会变化,所以可以将库存作为版本号,即每次修改的时候看与查询的时候库存有没有变,没变则修改,也就是下面的CAS法。
2.CAS法 (compare and set)
乐观锁解决超卖问题
1 | // 5.扣减库存--修改前 |
使用上面的CAS法改写代码,会发现失败率大大增加。
原因是:假设库存100,有100个线程同步执行,全部查询得到的库存都是100,最先执行的那个线程执行后库存数为99,其他99个线程查询到库存为99(与一开始查询到的库存100不同,不执行)
这里是有并发,但是没有业务安全问题,因为只要大于0就行执行。
乐观锁问题:成功率太低。
改进:
只要库存大于0就能执行。
总结
悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 性能一般
乐观锁:不加锁,在更新时判断是否有其他线程在修改
- 优点:性能好
- 缺点:存在成功率低的问题
一人一单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
并发安全问题
主要是集群中出现的。
假设图中的灰框表示一个单体
每个jvm中都有自己的锁,两个不同的锁监视器,当前jvm的锁监视器只在当前jvm中有效,所以不同单体的线程各自能获取各自的锁。
分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
MySQL | Redis | Zookeeper | |
---|---|---|---|
互斥 | 利用MySQL本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
zookeeper创建节点都是唯一的并且id单调递增。
节点的有序性:每创建一个节点,节点的id都是递增的。现在假设很多线程在zookeeper里创建节点,这样id是单调递增的,我们可以约定id最小的那个获取锁成功,这样就实现了互斥。
释放锁:删除该节点即可,这样就不是最小的了。
基于Redis的分布式锁
实现分布式锁需要实现两个基本方法:
获取锁
互斥:
1
2# 添加锁,利用setnx的互斥性质,lock是key,thread1是value,nx是互斥,ex是设置超时时间,set + nx = setnx
SET lock thread1 NX EX 10
释放锁
手动释放:
1
2# 释放锁,删除即可,lock是key
DEL lock超时释放:获取锁时添加一个超时时间
1
2# 释放锁,删除即可
DEL lock
Redis分布式误删问题
极端情况:
线程1获取了锁,但是在获取锁后业务发生了阻塞,阻塞时间甚至比锁的超时时间还久(设置锁的时候,设置了超时时间expire ex),这个时候锁提前释放,但是线程1还没执行完。假设这个时候线程2获取了锁,在线程2执行的过程中,线程1业务执行完了,会执行释放锁,也就将线程2的锁释放了。以此类推,当线程3获取了释放的锁,线程2也会释放锁。
解决方案:
释放锁的时候查看锁的标识和当前标识是否一致(之前获取锁存了线程id)。
业务流程图变化:
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程提示(可以用UUID表示,不用线程ID的原因:jvm每创建一个线程,ID都会递增。如果是在集群的模式下,有多个jvm,所以线程的id很可能冲突)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
分布式锁的原子性问题
接着上面的思路,假设一种极端情况,当线程1获取锁后执行业务后,判断当前标示和锁中标示一致后,开始释放锁,但是释放锁这个动作被阻塞了(这是很可能存在的,因为jvm的垃圾回收机制,当jvm去做full GC的时候,会阻塞所有的代码。)当释放锁这个被阻塞的时间足够长,超过锁的超时时间时,会释放锁,其他线程也就能获取锁了,但线程1释放锁的动作阻塞结束,它也就会执行释放锁的动作,也就把线程2的锁释放了。(在之前的代码中,key是一样的,value是uuid+线程id)
以下是之前的代码:
1 | public class SimpleRedisLock implements ILock { |
解决方案:
将判断标示的动作和释放锁的动作整合成一个原子动作。
redis中的事务无法看到中间结果,所以我们用Lua脚本。
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令(Redis时在执行他们时,会一次性全部执行),确保多条命令执行时的原子性。
介绍一下lua中Redis提供的调用函数,语法如下:
1 | -- 执行set name jack |
1 | -- 例如,我们要先执行set name Rose,再执行get name,则脚本如下: |
写好脚本后,需要用Redis命令来调用脚本,常见命令如下:
1 | 127.0.0.1:6379> help @scripting |
例如,我们要执行redis.call('set', 'name', 'jack')
这个脚本,语法如下:
1 | # 调用脚本 0代表脚本需要的key类型的参数个数 |
如果脚本中的key,value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
1 | # 调用脚本 |
注意:lua语言中数组下标是从1开始的,不是0
解决:
总结
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
Redisson
基于Redis的分布式锁优化
基于setnx实现的分布式锁存在下面的问题:
- 不可重入:同一个线程无法多次获取同一把锁
- 可重入:同一个线程可以多次获取同一把锁。比如一个方法a要去调一个方法b,在方法a中要先去获取锁,然后执行业务去调b,而b里又要去获取同一把锁。这种情况,如果锁是不可重入的,方法b中获取锁显然是会失败的,此时b会等待锁的释放,而锁又无法释放,因为方法a还没执行完,于是出现了死锁。
- 不可重试:获取锁只尝试一次就返回false,没有重试机制
- 超时释放:锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
- 锁时间设置得太短,业务还么执行完,锁就释放了,导致其他业务也可能在同步执行中;如果设置过长,会导致其他业务等待时间长,阻塞周期长。
- 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机,如果从并同步主中的锁数据,则会出现锁实现。
- 比如有一个线程在主节点获取了锁(因为是用setnx实现,实际上是一个写操作),尚未将这个写操作同步给从节点时,存在延迟,突然主节点就宕机了,此时会选择一个从节点作为主节点,因为这个从节点尚未完成同步,没有锁这个标识,所以其他线程可以趁虚而入,拿到锁。
介绍
Redisson是一个在Redis的基础上实现的Java驻内存数据网络。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包括了各种分布式锁的实现。
官网:https://redisson.pro/docs/
入门
1.引入依赖:
1 | <dependency> |
2.配置Redisson客户端
1 |
|
3.使用Redisson的分布式锁
1 |
|
Redisson可重入锁原理
重置锁有效期是为了给后续业务留下时间。
以上存在着多个步骤,要采用lua脚本实现,确保原子性。
Redisson分布式锁原理
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {}
参数:
- waitTime:尝试获取锁的最大等待时间
- 如果锁当前被其他线程持有,调用线程会持续尝试获取锁,直到超过
waitTime
设定的时长。若在此期间仍未获得锁,则返回false
。
- 如果锁当前被其他线程持有,调用线程会持续尝试获取锁,直到超过
- leaseTime:锁的持有时间
- 成功获取锁后,锁会在
leaseTime
时长后自动释放,即使业务逻辑未执行完毕。若leaseTime=-1
,则锁不会自动释放,需通过unlock()
显式释放,此时Redisson的看门狗机制(WatchDog)会自动续期锁,防止死锁。
- 成功获取锁后,锁会在
- unit:时间单位,用于统一
waitTime
和leaseTime
的时间尺度
源码追溯(看门狗)
基于jdk1.8
tryLock()原理
RedissonLock.class中:
1 | public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException { |
点击tryLock()
:point_down:
RedissonLock.class中:
1 | public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { |
点击tryAcquire()
:point_down:
RedissonLock.class中:
1 | private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { |
点击tryAcquireAsync()
:point_down:
RedissonLock.class中:
1 | private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { |
追溯getLockWatchdogTimeout()
:point_down:
1 | // 以下是在Config.class中 |
点击tryAcquireAsync中的tryLockInnerAsync
:point_down:
RedissonLock.class中:
1 | <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { |
往回倒到tryAcquireAsync()中
:point_up:
1 | private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { |
往回倒到tryAcquire()中
:point_up:
1 | private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { |
往回倒到tryLock()中
:point_up:
就是最初进来的地方
1 | public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { |
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间
Redisson的multiLock原理
Redisson分布式锁主从一致性问题:
每一个节点都获取锁才算成功。