基础篇

初识Redis

NoSQL

NoSql可以翻译做Not Only Sql(不仅仅是SQL),或者是No Sql(非Sql的)数据库。是相对于传统关系型数据库而言,有很大差异的一种特殊的数据库,因此也称之为非关系型数据库

结构化与非结构化

结构化:传统关系型数据库是结构化数据,每一张表都有严格的约束信息:字段名.字段数据类型.字段约束等等信息,插入的数据必须遵守这些约束。(缺点:表不能随意修改)

非结构化:对数据库格式没有严格约束,往往形式松散,自由。

  • 键值型:Redis
  • 文档型(字段约束非常松散):MongoDB
  • 列类型:HBase
  • 图格式:Neo4j

关联与非关联

传统数据库的表与表之间往往存在关联,例如外键。

而非关系型数据库不存在关联关系,要维护关系要么靠代码中的业务逻辑,要么靠数据之间的耦合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
id: 1,
name: "张三",
orders: [
{
id: 1,
item: {
id: 10, title: "荣耀6", price: 4999
}
},
{
id: 2,
item: {
id: 20, title: "小米11", price: 3999
}
}
]
}

此处要维护“张三”的订单与商品“荣耀”和“小米11”的关系,不得不冗余的将这两个商品保存在张三的订单文档中,不够优雅。还是建议用业务来维护关联关系。

查询方式

传统关系型数据库会基于Sql语句做查询,语法有统一标准;

而不同的非关系数据库查询语法差异极大,五花八门各种各样。

事务

传统关系型数据库能满足事务ACID的原则。

而非关系型数据库往往不支持事务,或者不能严格保证ACID的特性,特性:BASE,只能实现基本的一致性。

总结

SQL NoSQL
数据结构 结构化 非结构化
数据关联 关联的 非关联的
查询方式 SQL查询 非SQl
事务特性 ACID BASE
存储方式 磁盘 内存
扩展性 垂直 水平
使用场景 1)数据结构固定
2)相关业务对数据安全性、一致性要求较高
1)数据结构不固定
2)对一致性、安全性要求不高
3)对性能要求

认识Redis

特征:

  • 键值(key-value)型,value支持多种不同数据结构,功能丰富
  • 单线程,每个命令具备原子性
  • 低延迟,速度快(基于内存、IO多路复用、良好的编码)
  • 支持数据持久化
  • 支持主从集群、分片集群
  • 支持多语言客户端(java、Python、c等)

安装Redis

Redis安装说明

大多数企业都是基于Linux服务器来部署项目,而且Redis官方也没有提供Windows版本的安装包。因此课程中我们会基于Linux系统来安装Redis.

此处选择的Linux版本为CentOS 7

Redis的官方网站地址:https://redis.io/

单机安装Redis

步骤1.安装Redis依赖

Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖:

1
yum install -y gcc tcl
步骤2.上传安装包并解压

然后将Redis安装包上传到虚拟机的任意目录:

例如,放到了/usr/local/src 目录:

解压缩:

1
tar -xzf redis-6.2.6.tar.gz

解压后:

进入redis目录:

1
cd redis-6.2.6

运行编译命令:

1
make && make install

如果没有出错,应该就安装成功了。

默认的安装路径是在 /usr/local/bin目录下:

该目录以及默认配置到环境变量,因此可以在任意目录下运行这些命令。其中:

  • redis-cli:是redis提供的命令行客户端
  • redis-server:是redis的服务端启动脚本
  • redis-sentinel:是redis的哨兵启动脚本
步骤3.启动

redis的启动方式有很多种,例如:

  • 默认启动
  • 指定配置启动
  • 开机自启
默认启动

安装完成后,在任意目录输入redis-server命令即可启动Redis:

1
redis-server

如图:

这种启动属于前台启动,会阻塞整个会话窗口,窗口关闭或者按下CTRL + C则Redis停止。不推荐使用。

指定配置启动

如果要让Redis以后台方式启动,则必须修改Redis配置文件,就在我们之前解压的redis安装包下(/usr/local/src/redis-6.2.6),名字叫redis.conf:

我们先将这个配置文件备份一份:

1
cp redis.conf redis.conf.bck

然后修改redis.conf文件中的一些配置:

1
2
3
4
5
6
# 监听的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
bind 0.0.0.0
# 守护进程,修改为yes后即可后台运行
daemonize yes
# 密码,设置后访问Redis必须输入密码
requirepass 123321

Redis的其它常见配置:

1
2
3
4
5
6
7
8
9
10
# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"

启动Redis:

1
2
3
4
# 进入redis安装目录 
cd /usr/local/src/redis-6.2.6
# 启动
redis-server redis.conf

停止服务:

1
2
3
# 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务,
# 因为之前配置了密码,因此需要通过 -u 来指定密码
redis-cli -u 123321 shutdown
开机自启

我们也可以通过配置来实现开机自启。

首先,新建一个系统服务文件:

1
vi /etc/systemd/system/redis.service

内容如下:

1
2
3
4
5
6
7
8
9
10
11
[Unit]
Description=redis-server
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
PrivateTmp=true

[Install]
WantedBy=multi-user.target

然后重载系统服务:

1
systemctl daemon-reload

现在,我们可以用下面这组命令来操作redis了:

1
2
3
4
5
6
7
8
# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis

执行下面的命令,可以让redis开机自启:

1
systemctl enable redis

Redis命令行客户端

Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:

1
redis-cli [options] [commonds]

其中常见的options有:

  • -h 127.0.0.1:指定要连接的redis节点的IP地址,默认是127.0.0.1
  • -p 6379:指定要连接的redis节点的端口,默认是6379
  • -a 123321:指定redis的访问密码

其中的commonds就是Redis的操作命令,例如:

  • ping:与redis服务端做心跳测试,服务端正常会返回pong

不指定commonds时,会进入redis-cli的交互控制台:

图形化桌面客户端

GitHub上的大神编写了Redis的图形化桌面客户端,地址:https://github.com/uglide/RedisDesktopManager

不过该仓库提供的是RedisDesktopManager的源码,并未提供windows安装包。

在下面这个仓库可以找到安装包:https://github.com/lework/RedisDesktopManager-Windows/releases,然后安装

建立连接

点击左上角的连接到Redis服务器按钮:

在弹出的窗口中填写Redis服务信息:

点击测试连接,看连接是否成功。

注意:需要先设置虚拟机的防火墙

1
2
sudo firewall-cmd --permanent --add-port=6379/tcp 
sudo firewall-cmd --reload

点击确定后,在左侧菜单会出现这个链接:

点击即可建立连接了:

Redis默认有16个仓库,编号从0至15. 通过配置文件可以设置仓库数量,但是不超过16,并且不能自定义仓库名称。

如果是基于redis-cli连接Redis服务,可以通过select命令来选择数据库:

1
2
# 选择 0号库
select 0

Redis命令

数据结构介绍

Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样:

GEO:地理位置

贴心小建议:命令不要死记,学会查询就好啦

Redis为了方便我们学习,将操作不同数据类型的命令也做了分组,在官网( https://redis.io/commands )可以查看到不同的命令。

或者通过Help命令来帮助我们去查看命令:

通用命令

通用指令是部分数据类型的都可以使用的指令,可以通过help @generic查看有哪些通用命令,常见的有:

  • KEYS:查看符合模板的所有key

    • ```sh 127.0.0.1:6379> keys *
      1. "name"
      2. "age" 127.0.0.1:6379>

      查询以a开头的key

      127.0.0.1:6379> keys a*
      1. "age" 127.0.0.1:6379>
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28

        **贴心小提示:在生产环境下,不推荐使用keys 命令,因为这个命令在key过多的情况下,效率不高,模糊查询**

        - DEL:删除一个指定的key

        - ```sh
        127.0.0.1:6379> del name #删除单个
        (integer) 1 #成功删除1个

        127.0.0.1:6379> keys *
        1) "age"

        127.0.0.1:6379> MSET k1 v1 k2 v2 k3 v3 #批量添加数据
        OK

        127.0.0.1:6379> keys *
        1) "k3"
        2) "k2"
        3) "k1"
        4) "age"

        127.0.0.1:6379> del k1 k2 k3 k4
        (integer) 3 #此处返回的是成功删除的key,由于redis中只有k1,k2,k3 所以只成功删除3个,最终返回
        127.0.0.1:6379>

        127.0.0.1:6379> keys * #再查询全部的key
        1) "age" #只剩下一个了
        127.0.0.1:6379>
  • EXISTS:判断key是否存在

    • ```sh 127.0.0.1:6379> help EXISTS

      EXISTS key [key ...] summary: Determine if a key exists since: 1.0.0 group: generic

      127.0.0.1:6379> exists age (integer) 1

      127.0.0.1:6379> exists name (integer) 0

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27

      - EXPIRE:给一个key设置有效期,有效期到期时该key会被自动删除(为什么会存在这个命令:因为Redis是在内存中存储的,内存非常宝贵)

      - ```sh
      127.0.0.1:6379> expire age 10
      (integer) 1

      127.0.0.1:6379> ttl age
      (integer) 8

      127.0.0.1:6379> ttl age
      (integer) 6

      127.0.0.1:6379> ttl age
      (integer) -2

      127.0.0.1:6379> ttl age
      (integer) -2 #当这个key过期了,那么此时查询出来就是-2

      127.0.0.1:6379> keys *
      (empty list or set)

      127.0.0.1:6379> set age 10 #如果没有设置过期时间
      OK

      127.0.0.1:6379> ttl age
      (integer) -1 # ttl的返回值就是-1

  • TTL:查看一个KEY的剩余有效期

String命令

String类型,也就是字符串类型,是Redis中最简单的存储类型。

其value是字符串,不过根据字符串的格式不同,又可以分为3类:

  • string:普通字符串
  • int:整数类型,可以做自增,自减操作
  • float:浮点类型,可以做自增,自减操作
KEY VALUE
msg hello world
num 10
score 92.5

String的常见命令有:

  • SET:添加或者修改已经存在的一个String类型的键值对
  • GET:根据key获取String类型的value
  • MSET:批量添加多个String类型的键值对
  • MGET:根据多个key获取多个String类型的value
  • INCR:让一个整型的key自增1
  • INCRBY:让一个整型的key自增并指定步长(也可以是负数,就是自减),例如:incrby num 2 让num值自增2
  • INCRBYFLOAT:让一个浮点类型的数字自增并指定步长
  • SETNX:添加一个String类型的键值对,前提是这个key不存在,否则不执行
    • 其实和set 键 nx效果一样,nx是set的参数
  • SETEX:添加一个String类型的键值对,并且指定有效期
    • 其实和set 键 ex 数字(表时间)

Key的层级结构

问题:Redis没有类似MySQL中的Table的概念,我们该如何区分不同类型的key呢?例如,需要存储用户.商品信息到redis,有一个用户id是1,有一个商品id恰好也是1,此时如果使用id作为key,那就会冲突了,该怎么办?

解决方案:可以通过给key添加前缀加以区分,不过这个前缀不是随便加的,有一定的规范。

Redis的key允许有多个单词形成层级结构,多个单词之间用:隔开,格式如下:项目名:业务名:类型:id。这个格式并非固定,也可以根据自己的需求来删除或添加词条。

例如我们的项目名称叫 heima,有user和product两种不同类型的数据,我们可以这样定义key:

  • user相关的key:heima:user:1

  • product相关的key:heima:product:1

如果Value是一个Java对象,例如一个User对象,则可以将对象序列化为JSON字符串后存储:

KEY VALUE
heima:user:1 {"id":1, "name": "Jack", "age": 21}
heima:product:1 {"id":1, "name": "小米11", "price": 4999}

一旦我们向redis采用这样的方式存储,那么在可视化界面中,redis会以层级结构来进行存储,形成类似于这样的结构,更加方便Redis获取数据。

Hash命令

Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。

String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD:

Hash类型的常见命令

  • HSET key field value:添加或者修改hash类型key的field的值

    • ```sh 127.0.0.1:6379> HSET heima:user:3 name Lucy//大key是 heima:user:3 小key是name,小value是Lucy (integer) 1 127.0.0.1:6379> HSET heima:user:3 age 21// 如果操作不存在的数据,则是新增 (integer) 1 127.0.0.1:6379> HSET heima:user:3 age 17 //如果操作存在的数据,则是修改 (integer) 0 127.0.0.1:6379> HGET heima:user:3 name "Lucy" 127.0.0.1:6379> HGET heima:user:3 age "17"
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46

      - HGET key field:获取一个hash类型key的field的值

      - HMSET:批量添加多个hash类型key的field的值

      - HMGET:批量获取多个hash类型key的field的值

      - HGETALL:获取一个hash类型的key中的所有的field和value

      - HKEYS:获取一个hash类型的key中的所有的field

      - HINCRBY:让一个hash类型key的字段值自增并指定步长

      - HSETNX:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行



      ### List命令

      Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构,既可以支持正向检索和也可以支持反向检索。

      特征也与LinkedList类似:

      * 有序
      * 元素可以重复
      * 插入和删除快
      * 查询速度一般

      常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。

      **List的常见命令有:**

      - LPUSH key element ... :向列表左侧插入一个或多个元素
      - LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil
      - RPUSH key element ... :向列表右侧插入一个或多个元素
      - RPOP key:移除并返回列表右侧的第一个元素
      - LRANGE key star end:返回一段角标范围内的所有元素
      - BLPOP和BRPOP:与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil

      * LPUSH和RPUSH

      ```java
      127.0.0.1:6379> LPUSH users 1 2 3
      (integer) 3
      127.0.0.1:6379> RPUSH users 4 5 6
      (integer) 6
  • 以上操作完之后是3 2 1 4 5 6

  • LPOP和RPOP

1
2
3
4
127.0.0.1:6379> LPOP users
"3"
127.0.0.1:6379> RPOP users
"6"
  • 以上操作完之后是2 1 4 5
  • LRANGE
1
2
3
127.0.0.1:6379> LRANGE users 1 2
1) "1"
2) "4"

如何用List结构模拟一个栈?

  • 入口和出口在同一边

如何用List结构模拟一个队列?

  • 入口和出口在不同边

如何用List结构模拟一个阻塞队列?

  • 入口和出口在不同边
  • 出队时采用BLPOP或BRPOP

Set命令

Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:

  • 无序
  • 元素不可重复
  • 查找快
  • 支持交集、并集、差集等功能

Set类型的常见命令

  • SADD key member ... :向set中添加一个或多个元素
  • SREM key member ... : 移除set中的指定元素
  • SCARD key: 返回set中元素的个数
  • SISMEMBER key member:判断一个元素是否存在于set中
  • SMEMBERS:获取set中的所有元素
  • SINTER key1 key2 ... :求key1与key2的交集
  • SDIFF key1 key2 ... :求key1与key2的差集
  • SUNION key1 key2 ..:求key1和key2的并集

SortedSet命令

Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。

SortedSet具备下列特性:

  • 可排序
  • 元素不重复
  • 查询速度快

因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。

SortedSet的常见命令有:

  • ZADD key score member:添加一个或多个元素到sorted set ,如果已经存在则更新其score值
  • ZREM key member:删除sorted set中的一个指定元素
  • ZSCORE key member : 获取sorted set中的指定元素的score值
  • ZRANK key member:获取sorted set 中的指定元素的排名
  • ZCARD key:获取sorted set中的元素个数
  • ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
  • ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
  • ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
  • ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
  • ZDIFF.ZINTER.ZUNION:求差集.交集.并集

注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可,例如:

  • 升序获取sorted set 中的指定元素的排名:ZRANK key member
  • 降序获取sorted set 中的指定元素的排名:ZREVRANK key memeber

Redis的Java客户端-Jedis

快速入门

1.创建工程

2.引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<!--jedis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>/version>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version></version>
<scope>test</scope>
</dependency>

3.建立连接

1
2
3
4
5
6
7
8
9
10
11
12
private Jedis jedis;

@BeforeEach
void setUp() {
// 1.建立连接
// jedis = new Jedis("192.168.150.101", 6379);
jedis = JedisConnectionFactory.getJedis();
// 2.设置密码
jedis.auth("123321");
// 3.选择库
jedis.select(0);
}

4.测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
void testString() {
// 存入数据
String result = jedis.set("name", "虎哥");
System.out.println("result = " + result);
// 获取数据
String name = jedis.get("name");
System.out.println("name = " + name);
}

@Test
void testHash() {
// 插入hash数据
jedis.hset("user:1", "name", "Jack");
jedis.hset("user:1", "age", "21");

// 获取
Map<String, String> map = jedis.hgetAll("user:1");
System.out.println(map);
}

5.释放资源

1
2
3
4
5
6
@AfterEach
void tearDown() {
if (jedis != null) {
jedis.close();
}
}

Jedis连接池

Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此我们推荐大家使用Jedis连接池代替Jedis的直连方式

有关池化思想,并不仅仅是这里会使用,很多地方都有,比如说我们的数据库连接池,比如我们tomcat中的线程池,这些都是池化思想的体现

创建Jedis连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class JedisConnectionFacotry {

private static final JedisPool jedisPool;

static {
//配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(8);
poolConfig.setMaxIdle(8);
poolConfig.setMinIdle(0);
poolConfig.setMaxWait(Duration.ofMillis(1000));
//创建连接池对象
jedisPool = new JedisPool(poolConfig,
"192.168.57.130",6379,1000,"123321");
}

public static Jedis getJedis(){
return jedisPool.getResource();
}
}

说明:

  • JedisConnectionFacotry:工厂设计模式是实际开发中非常常用的一种设计模式,我们可以使用工厂,去降低代的耦合,比如Spring中的Bean的创建,就用到了工厂设计模式

  • 静态代码块:随着类的加载而加载,确保只能执行一次,我们在加载当前工厂类的时候,就可以执行static的操作完成对 连接池的初始化

  • 最后提供返回连接池中连接的方法.

改造原始代码

代码说明:

1.在我们完成了使用工厂设计模式来完成代码的编写之后,我们在获得连接时,就可以通过工厂来获得,而不用直接去new对象,降低耦合,并且使用的还是连接池对象。

2.当我们使用了连接池后,当我们关闭连接其实并不是关闭,而是将Jedis还回连接池的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 @BeforeEach
void setUp(){
//建立连接
/*jedis = new Jedis("127.0.0.1",6379);*/
jedis = JedisConnectionFacotry.getJedis();
//选择库
jedis.select(0);
}

@AfterEach
void tearDown() {
if (jedis != null) {
jedis.close();
}
}

Redis的Java客户端-SpringDataRedis

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https://spring.io/projects/spring-data-redis

  • 提供了对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群
  • 支持基于Lettuce的响应式编程
  • 支持基于JDK.JSON.字符串.Spring对象的数据序列化及反序列化
  • 支持基于Redis的JDKCollection实现

SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:

快速入门

1.导入依赖

1
2
3
4
5
6
7
8
9
10
<!--redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--common-pool-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

2.配置文件

1
2
3
4
5
6
7
8
9
10
11
12
spring:
data:
redis:
host: 192.168.57.130
port: 6379
password: 123321
lettuce:
pool:
max-active: 8 #最大连接
max-idle: 8 #最大空闲连接
min-idle: 0 #最小空闲连接
max-wait: 1000ms #最小空闲连接

3.测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
class RedisDemoApplicationTests {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Test
void testString() {
// 写入一条String数据
redisTemplate.opsForValue().set("name", "虎哥");
// 获取string数据
Object name = redisTemplate.opsForValue().get("name");
System.out.println("name = " + name);
}
}

数据序列化器

RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化,得到的结果是这样的:

缺点:

  • 可读性差
  • 内存占用较大

我们可以自定义RedisTemplate的序列化方式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
// 创建RedisTemplate对象
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 设置连接工厂
template.setConnectionFactory(connectionFactory);
// 创建JSON序列化工具
GenericJackson2JsonRedisSerializer jsonRedisSerializer =
new GenericJackson2JsonRedisSerializer();
// 设置Key的序列化
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
// 设置Value的序列化
template.setValueSerializer(jsonRedisSerializer);
template.setHashValueSerializer(jsonRedisSerializer);
// 返回
return template;
}
}

这里采用了JSON序列化来代替默认的JDK序列化方式。最终结果如图:

整体可读性有了很大提升,并且能将Java对象自动的序列化为JSON字符串,并且查询时能自动把JSON反序列化为Java对象。不过,其中记录了序列化时对应的class名称,目的是为了查询时实现自动反序列化。这会带来额外的内存开销。

StringRedisTemplate

尽管JSON的序列化方式可以满足我们的需求,但依然存在一些问题,如上图。

为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。

为了减少内存的消耗,我们可以采用手动序列化的方式,换句话说,就是不借助默认的序列化器,而是我们自己来控制序列化的动作,同时,我们只采用String的序列化器,这样,在存储value时,我们就不需要在内存中就不用多存储数据,从而节约我们的内存空间。

这种用法比较普遍,因此SpringDataRedis就提供了RedisTemplate的子类:StringRedisTemplate,它的key和value的序列化方式默认就是String方式。

省去了我们自定义RedisTemplate的序列化方式的步骤,而是直接使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@SpringBootTest
class RedisStringTests {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
void testString() {
// 写入一条String数据
stringRedisTemplate.opsForValue().set("verify:phone:13600527634", "124143");
// 获取string数据
Object name = stringRedisTemplate.opsForValue().get("name");
System.out.println("name = " + name);
}

private static final ObjectMapper mapper = new ObjectMapper();

@Test
void testSaveUser() throws JsonProcessingException {
// 创建对象
User user = new User("虎哥", 21);
// 手动序列化
String json = mapper.writeValueAsString(user);
// 写入数据
stringRedisTemplate.opsForValue().set("user:200", json);

// 获取数据
String jsonUser = stringRedisTemplate.opsForValue().get("user:200");
// 手动反序列化
User user1 = mapper.readValue(jsonUser, User.class);
System.out.println("user1 = " + user1);
}

}

总结:

RedisTemplate的两种序列化实践方案:

  • 方案一:
    • 自定义RedisTemplate
    • 修改RedisTemplate的序列化器为GenericJackson2JsonRedisSerializer
  • 方案二:
    • 使用StringRedisTemplate
    • 写入Redis时,手动把对象序列化为JSON
    • 读取Redis时,手动把读取到的JSON反序列化为对象

Hash结构操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
class RedisStringTests {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
void testHash() {
stringRedisTemplate.opsForHash().put("user:400", "name", "虎哥");
stringRedisTemplate.opsForHash().put("user:400", "age", "21");

Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries("user:400");
System.out.println("entries = " + entries);
}
}

实战篇

短信登录

流程:

集群的session共享问题:

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。

session的替代方案应该满足:

  • 数据共享
  • 内存存储
  • key、value结构

改进后的流程:

保存用户到Redis中不用手机号作为Redis的key是因为:要返回给前端并放在authorization中,为了以后登录时进行验证,如果用手机号的话容易泄漏。

登录拦截器的优化:

原来的登录拦截器存在的问题:不拦截一些不需要登录的api,比如首页、获取验证码等,如果将刷新token有效期的代码放在这里面,就会导致用户在访问首页或者其他不被拦截的api时token不会被刷线。

改进:

商户查询

什么是缓存

缓存就是数据交换的缓存区(称作Cache),是贮存数据的临时地方,一般读写性能较高。

缓存的作用:

  • 降低后端负载
  • 提高读写效率,降低响应时间

缓存的成本:

  • 数据一致性成本
  • 代码维护成本
  • 运维成本

缓存更新策略

内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。 编写业务逻辑,在修改数据库的同时,更新缓存。
一致性 一般
维护成本

业务场景:

  • 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存。
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存。

主动更新策略

  • Cache Aside Pattern:由缓存的调用者,在更新数据库的同时更新缓存。(主要使用)
  • Read/Write Through Pattern:缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。
  • Write Behind Caching Pattern:调用者只操作缓存,由其他线程异步的将缓存数据持久化到数据库,保证最终一致。

Cache Aside Pattern存在的三个问题

  • 删除缓存还是更新缓存?
    • 更新缓存:每次更新数据库都要更新缓存,无效写操作较多
    • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
  • 如何保证缓存与数据库的操作的同时成功或失败?
    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用TCC等分布事务方案
  • 先操作缓存还是先操作数据库?
    • 先删除缓存,再操作数据库
    • 先操作数据库,再删除缓存
先删除缓存,再操作数据库

正常情况:

步骤 缓存 数据库
初始 10 10
1 0 10
2 0 20
3 0 20
4 20 20

异常情况:

步骤 缓存 数据库
初始 10 10
1 0 10
2 0 10
3 10 10
4 10 20

因为更新数据库的操作一般是很慢的,而删除缓存的操作是很快的,很可能线程1删除缓存之后,先让线程2执行。

这种情况发生的可能性比较高。

先操作数据库,再删除缓存

正常情况:

步骤 缓存 数据库
初始 10 10
1 10 20
2 0 20
3 0 20
4 20 20

异常情况(假设线程1查询时,缓存恰好失效了):

步骤 缓存 数据库
初始 10 10
1 0 10
2 0 20
3 0 20
4 10 20

情况发生的条件:

  • 两个线程在并行执行
  • 线程1查询时,缓存恰好失效
  • 线程1查询后,要写入缓存之间(之间微秒级)突然来了一个线程(这里是线程2)要更新数据库,删缓存。

综合分析,可能性比较低。

缓存更新策略的最佳实践

  • 删除缓存还是更新缓存?删除缓存
  • 方案2(即先操作数据库,再删除缓存)发生异常情况的可能性较低。

1.低一致性需求:使用Redis自带的内存淘汰机制

2.高一致性需求:主动更新,并以超时剔除作为兜底方案

  • 读操作:
    • 缓存命中则直接返回
    • 缓存未命中则查询数据库,并写入缓存,设定超时时间
  • 写操作:
    • 先写数据库,然后再删除缓存
    • 要确保数据库与缓存操作的原子性

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

危害:

常见的解决方案有两种:

  • 缓存空对象
    • 优点:实现简单,维护方便
    • 缺点:
      • 额外的内存消耗(比如请求一堆不存在的id,就会导致缓存中存储大量不存在id的null对象。解决办法:设置较短的TTL)
      • 可能造成短期的不一致(比如请求一个不存在的id,然后缓存中存储了它的null对象,然后插入了这个id的商铺,再查询这个id,查到的是缓存中的null对象。解决办法:插入这条数据的时候,主动在缓存中覆盖数据)
  • 布隆过滤
    • 原理:可以简单理解成一个byte数组,把数据库中的数据基于某一种hash算法计算出hash值,然后将这些hash值转换成二进制保存到布隆过滤器里。判断数据是否存在是判断对应的位置是0还是1。这种存在判断是概率上的统计,不是百分百的准确。重点:当布隆过滤器说不存在时,百分百不存在;当布隆过滤器说存在时,不一定存在。
    • 优点:内存占用较少,没有多余key
    • 缺点:
      • 实现复杂
      • 存在误判可能(当布隆过滤器说不存在时,百分百不存在;当布隆过滤器说存在时,不一定存在。)

解决商铺查询的缓存穿透问题

总结

缓存穿透产生的原因是什么?

用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些?

  • 缓存null值(被动)
  • 布隆过滤(被动)
  • 增强id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验(比如0这种id)
  • 加强用户权限校验(要登录才能访问,或者访问频率限制)
  • 做好热点参数的限流

缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案:

  • 给不同的Key的TTL添加随机值(针对同一时段大量的缓存key同时失效)
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

缓存击穿

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

图中查询数据库,重建缓存的线段比较长,表示所需时间比较长。

常见的解决方案有两种:

  • 互斥锁(只有一个线程能获取锁,其他线程都需要等待,所以性能上会比较差)
  • 逻辑过期(向redis存储数据时,不设置TTL,逻辑过期不是真正的过期,它要求我们在存储数据到redis的时候,额外的要添加一个过期时间的字段,这个key本身是不用去设置ttl的,所以它的过期时间不是由redis控制的,而是由我们程序员自己去判断它是否过期)
解决方案 优点 缺点
互斥锁 1.没有额外的内存消耗
2.保证一致性
3.实现简单
1.线程需要等待,性能受影响
2.可能有死锁风险
逻辑过期 线程无需等待,性能较好 1.不保证一致性
2.有额外内存消耗
3.实现复杂

基于互斥锁方式解决缓存击穿问题

利用Redis中string的setnx命令来实现:setnx,对key进行赋值(当且仅当key不存在时赋值)

基于逻辑过期方式解决缓存击穿问题

优惠券秒杀

全局ID生成器

当用户抢购时,就会生成订单保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显(用户可以间接知晓每日销量)
  • 受单表数据量的限制(订单特别多,不能用单表存储,用多表存储,每个表id自增会重复)

全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息:

使用数值类型存储(Long类型,8Byte)

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

全局唯一ID生成策略:

  • UUID(生成的是十六进制的数值,返回的是字符串结构,没有自增)
  • Redis自增
  • snowflake算法(雪花算法)
  • 数据库自增(用一张表专门做自增)

Redis自增ID策略:

  • 每天一个key,方便统计订单量
  • ID构造是时间戳+计数器

实现秒杀下单

实现优惠券秒杀下单功能

下单需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

超卖问题

原库存是1

超卖问题是点选的多线程安全问题,针对这一问题的常见解决方案就是加锁。

悲观锁和乐观锁

悲观锁:

认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。

  • 例如Synchronized、Lock都属于悲观锁

乐观锁:

认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其他线程对数据做了修改。

  • 如果没有修改则认为是安全的,自己才更新数据。
  • 如果已经被其他线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁

乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见的方式有两种:

1.版本号法:

原理:查询的版本号和修改时的版本号有没有变,每次修改版本号+1。

步骤 id stock version
初始 10 1 1
线程1第一步 10 1 1
线程2第一步 10 1 1
线程1第二步 10 0 2
线程2第二步 10 0 2

改进:每次版本号变化的时候库存也会变化,所以可以将库存作为版本号,即每次修改的时候看与查询的时候库存有没有变,没变则修改,也就是下面的CAS法。

2.CAS法 (compare and set)

乐观锁解决超卖问题
1
2
3
4
5
6
7
8
9
10
11
// 5.扣减库存--修改前
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId) // where id = ?
.update();

// 5.扣减库存--修改后
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock", voucher.getStock()) // where id = ? and stock = ?
.update();

使用上面的CAS法改写代码,会发现失败率大大增加。

原因是:假设库存100,有100个线程同步执行,全部查询得到的库存都是100,最先执行的那个线程执行后库存数为99,其他99个线程查询到库存为99(与一开始查询到的库存100不同,不执行)

这里是有并发,但是没有业务安全问题,因为只要大于0就行执行。

乐观锁问题:成功率太低。

改进:

只要库存大于0就能执行。

总结

悲观锁:添加同步锁,让线程串行执行

  • 优点:简单粗暴
  • 性能一般

乐观锁:不加锁,在更新时判断是否有其他线程在修改

  • 优点:性能好
  • 缺点:存在成功率低的问题

一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

并发安全问题

主要是集群中出现的。

假设图中的灰框表示一个单体

每个jvm中都有自己的锁,两个不同的锁监视器,当前jvm的锁监视器只在当前jvm中有效,所以不同单体的线程各自能获取各自的锁。

分布式锁

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

MySQL Redis Zookeeper
互斥 利用MySQL本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

zookeeper创建节点都是唯一的并且id单调递增。

节点的有序性:每创建一个节点,节点的id都是递增的。现在假设很多线程在zookeeper里创建节点,这样id是单调递增的,我们可以约定id最小的那个获取锁成功,这样就实现了互斥。

释放锁:删除该节点即可,这样就不是最小的了。

基于Redis的分布式锁

实现分布式锁需要实现两个基本方法:

  • 获取锁

    • 互斥:

      1
      2
      # 添加锁,利用setnx的互斥性质,lock是key,thread1是value,nx是互斥,ex是设置超时时间,set + nx = setnx
      SET lock thread1 NX EX 10
  • 释放锁

    • 手动释放:

      1
      2
      # 释放锁,删除即可,lock是key
      DEL lock
    • 超时释放:获取锁时添加一个超时时间

      1
      2
      # 释放锁,删除即可
      DEL lock

Redis分布式误删问题

极端情况:

线程1获取了锁,但是在获取锁后业务发生了阻塞,阻塞时间甚至比锁的超时时间还久(设置锁的时候,设置了超时时间expire ex),这个时候锁提前释放,但是线程1还没执行完。假设这个时候线程2获取了锁,在线程2执行的过程中,线程1业务执行完了,会执行释放锁,也就将线程2的锁释放了。以此类推,当线程3获取了释放的锁,线程2也会释放锁。

解决方案

释放锁的时候查看锁的标识和当前标识是否一致(之前获取锁存了线程id)。

业务流程图变化:

需求:修改之前的分布式锁实现,满足:

1.在获取锁时存入线程提示(可以用UUID表示,不用线程ID的原因:jvm每创建一个线程,ID都会递增。如果是在集群的模式下,有多个jvm,所以线程的id很可能冲突)

2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

  • 如果一致则释放锁
  • 如果不一致则不释放锁

分布式锁的原子性问题

接着上面的思路,假设一种极端情况,当线程1获取锁后执行业务后,判断当前标示和锁中标示一致后,开始释放锁,但是释放锁这个动作被阻塞了(这是很可能存在的,因为jvm的垃圾回收机制,当jvm去做full GC的时候,会阻塞所有的代码。)当释放锁这个被阻塞的时间足够长,超过锁的超时时间时,会释放锁,其他线程也就能获取锁了,但线程1释放锁的动作阻塞结束,它也就会执行释放锁的动作,也就把线程2的锁释放了。(在之前的代码中,key是一样的,value是uuid+线程id)

以下是之前的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SimpleRedisLock implements ILock {

private String name;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程表示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 有自动拆箱的动作,万一是空的会出错,所以要用以下语句
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

解决方案:

将判断标示的动作和释放锁的动作整合成一个原子动作。

redis中的事务无法看到中间结果,所以我们用Lua脚本。

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令(Redis时在执行他们时,会一次性全部执行),确保多条命令执行时的原子性。

介绍一下lua中Redis提供的调用函数,语法如下:

1
2
-- 执行set name jack
redis.call('set', 'name', 'jack')
1
2
3
4
5
6
7
-- 例如,我们要先执行set name Rose,再执行get name,则脚本如下:
-- 先执行 set name jack
redis.call('set', 'name', 'jaclk')
-- 再执行get name
local name = redis.call('get', 'name')
-- 返回
return name

写好脚本后,需要用Redis命令来调用脚本,常见命令如下:

1
2
3
4
5
127.0.0.1:6379> help @scripting

EVAL script numkeys key [key ...] arg [arg ...]
summary: Execute a Lua script server side
since: 2.6.0

例如,我们要执行redis.call('set', 'name', 'jack')这个脚本,语法如下:

1
2
# 调用脚本 0代表脚本需要的key类型的参数个数
EVAL "return redis.call('set', 'name', 'jack')" 0

如果脚本中的key,value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

1
2
# 调用脚本
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

注意:lua语言中数组下标是从1开始的,不是0

解决:

总结

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用set nx满足互斥性
  • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用Redis集群保证高可用和高并发特性

Redisson

基于Redis的分布式锁优化

基于setnx实现的分布式锁存在下面的问题:

  • 不可重入:同一个线程无法多次获取同一把锁
    • 可重入:同一个线程可以多次获取同一把锁。比如一个方法a要去调一个方法b,在方法a中要先去获取锁,然后执行业务去调b,而b里又要去获取同一把锁。这种情况,如果锁是不可重入的,方法b中获取锁显然是会失败的,此时b会等待锁的释放,而锁又无法释放,因为方法a还没执行完,于是出现了死锁
  • 不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
    • 锁时间设置得太短,业务还么执行完,锁就释放了,导致其他业务也可能在同步执行中;如果设置过长,会导致其他业务等待时间长,阻塞周期长。
  • 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机,如果从并同步主中的锁数据,则会出现锁实现。
    • 比如有一个线程在主节点获取了锁(因为是用setnx实现,实际上是一个写操作),尚未将这个写操作同步给从节点时,存在延迟,突然主节点就宕机了,此时会选择一个从节点作为主节点,因为这个从节点尚未完成同步,没有锁这个标识,所以其他线程可以趁虚而入,拿到锁。

介绍

Redisson是一个在Redis的基础上实现的Java驻内存数据网络。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包括了各种分布式锁的实现。

官网:https://redisson.pro/docs/

入门

1.引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

2.配置Redisson客户端

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class RedisConfig {
@Bean
public Redissonclient redissonclient(){
//配置类
Config config = new Config();
//添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassowrd("123321");
//创建客户端
return Redisson.create(config);
}
}

3.使用Redisson的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
//获取锁(可重入),指定锁的名称
RLock lock = redissonclient.getLock("anyLock");
//尝试获取锁,参数分别是,获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断释放政取成功
if(isLock){
try {
System.out.println("执行业务");
}finally {
//释放锁
lock.unlock();
}
}
}

Redisson可重入锁原理

重置锁有效期是为了给后续业务留下时间。

以上存在着多个步骤,要采用lua脚本实现,确保原子性。

Redisson分布式锁原理

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {}

参数:

  • waitTime:尝试获取锁的最大等待时间
    • 如果锁当前被其他线程持有,调用线程会持续尝试获取锁,直到超过waitTime设定的时长。若在此期间仍未获得锁,则返回false
  • leaseTime:锁的持有时间
    • 成功获取锁后,锁会在leaseTime时长后自动释放,即使业务逻辑未执行完毕。若leaseTime=-1,则锁不会自动释放,需通过unlock()显式释放,此时Redisson的看门狗机制(WatchDog)会自动续期锁,防止死锁。
  • unit:时间单位,用于统一waitTimeleaseTime的时间尺度
源码追溯(看门狗)

基于jdk1.8

tryLock()原理

RedissonLock.class中:

1
2
3
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return this.tryLock(waitTime, -1L, unit);
}

点击tryLock()

:point_down:

RedissonLock.class中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); // 可以看到先将等待时间转换成了毫秒
long current = System.currentTimeMillis(); // 得到当前时间
long threadId = Thread.currentThread().getId(); // 得到线程id,其实就是以后锁里的线程标识
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); // 尝试获取锁
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}

});
}

this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}

currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);

this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}

点击tryAcquire()

:point_down:

RedissonLock.class中:

1
2
3
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

点击tryAcquireAsync()

:point_down:

RedissonLock.class中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
// 如果leaseTime有值,就用给的leaseTime值
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 无值,就使用默认值,getLockWatchdogTimeout(),看门狗超时时间
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}

}
});
return ttlRemainingFuture;
}
}

追溯getLockWatchdogTimeout()

:point_down:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 以下是在Config.class中
public long getLockWatchdogTimeout() {
return this.lockWatchdogTimeout;
}

// 点击lockWatchdogTimeout,以下是在Config.class中
private long lockWatchdogTimeout;
private boolean keepPubSubOrder;
private boolean decodeInExecutor;
private boolean useScriptCache;
private int minCleanUpDelay;
private int maxCleanUpDelay;
private int cleanUpKeysAmount;
private NettyHook nettyHook;
private boolean useThreadClassLoader;
private AddressResolverGroupFactory addressResolverGroupFactory;

public Config() {
this.transportMode = TransportMode.NIO;
this.lockWatchdogTimeout = 30000L; // 可以看到,默认时间是30s
this.keepPubSubOrder = true;
this.decodeInExecutor = false;
this.useScriptCache = false;
this.minCleanUpDelay = 5;
this.maxCleanUpDelay = 1800;
this.cleanUpKeysAmount = 100;
this.nettyHook = new DefaultNettyHook();
this.useThreadClassLoader = true;
this.addressResolverGroupFactory = new DnsAddressResolverGroupFactory();
}

点击tryAcquireAsync中的tryLockInnerAsync

:point_down:

RedissonLock.class中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
// 先把锁释放时间记录到本地的一个成员变量,叫做内部锁释放时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
// 接着执行以下这段脚本
/**
脚本先是判断锁是否存在,如果不存在,就去记录锁的标识并且次数加一,然后设置锁的有效期,返回nil;
如果存在,就先判断锁是不是自己的,如果是自己的,锁重试次数加一,并且设置有效期,返回nil;
以上两种情况都是获取锁成功的情况,返回的都是nil,失败了返回一个结果,pttl和ttl效果类似,都是获取指定key剩余有效期,只不过ttl返回的是以秒为单位,而pttl返回的是以毫秒为单位。
所以失败返回的是锁的剩余有效期。
*/
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}

private CommandBatchService createCommandBatchService() {
if (this.commandExecutor instanceof CommandBatchService) {
return (CommandBatchService)this.commandExecutor;
} else {
MasterSlaveEntry entry = this.commandExecutor.getConnectionManager().getEntry(this.getName());
BatchOptions options = BatchOptions.defaults().syncSlaves(entry.getAvailableSlaves(), 1L, TimeUnit.SECONDS);
return new CommandBatchService(this.commandExecutor.getConnectionManager(), options);
}
}

往回倒到tryAcquireAsync()中

:point_up:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
// 如果leaseTime有值,就用给的leaseTime值
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 无值,就使用默认值,getLockWatchdogTimeout(),看门狗超时时间
/*
从上面的分析知,tryLockInnerAsync返回的是一个剩余有效期,这个结果是封装在Future里面的,因为这个函数是一个异步函数(Async),代表函数执行完代表命令发出去了,结果拿没拿到还不清楚
*/
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}

}
});
// 最终返回的也是一个Future
return ttlRemainingFuture;
}
}

往回倒到tryAcquire()中

:point_up:

1
2
3
4
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 此处有个get,其实就是阻塞等待Future的结果,等待里面得到的剩余有效期
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

往回倒到tryLock()中

:point_up:

就是最初进来的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime); // 可以看到先将等待时间转换成了毫秒
long current = System.currentTimeMillis(); // 得到当前时间
long threadId = Thread.currentThread().getId(); // 得到线程id,其实就是以后锁里的线程标识
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); // 尝试获取锁,得到的结果有两种,一种是null,代表获取锁成功;一种是锁的剩余有效期,代表获取锁失败。
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}

});
}

this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}

time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}

currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}

time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);

this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间

Redisson的multiLock原理

Redisson分布式锁主从一致性问题:

每一个节点都获取锁才算成功。