点评项目
点评项目
短信登录功能
基于Session实现登录
Session原理
- Session会为每一次会话分配一个Session对象
- 同一个浏览器发起的多次请求,同属于一次会话(Session)
- 首次使用到Session时,服务器会自动创建Session,并创建Cookie存储SessionId发送回客户端
Session在项目中的使用
- 保存验证码到session:session.setAttribute(“code”, code)
- 校验验证码:session.getAttribute(“code”)获取前面生成的验证码,与提交表单中的验证码进行比较,相同则说明输入正确
- 保存用户到session:session.setAttribute(“user”,BeanUtil.copyProperties(user,UserDTO.class))
- 其中这里保存的并非User对象,而是UserDTO对象
- 这是因为user对象中包含密码等敏感信息,若直接将User对象保存到session不安全,故将UserDTO对象保存到session中,用于后面的登录校验
登录校验功能
登录校验功能用于检验用户的登录状态,因为某些功能需要用户登录才能实现,比如:用户主页、用户私信。但是每次执行这些功能,都需要在对应功能的service实现类中实现从session中获取用户的操作,这样会很麻烦。
于是,我们选择在前面加一层拦截器,用于实现session中获取用户的操作,具体实现如下:
拦截器的实现
我们用于实现登录状态校验的拦截器LoginInterceptor,需要实现HandlerInterceptor,拦截器的执行流程分为三个主要阶段:preHandle、postHandle和afterCompletion。
- 在请求处理的前期,preHandle方法会被调用,如果返回true,则继续执行后续的拦截器和请求处理器;如果返回false,则中断执行流程。
- 在请求处理完毕后,postHandle方法会被调用,用于进行后处理操作。
- 最后,在请求完成后,afterCompletion方法会被调用,用于进行资源清理等操作。
拦截器的注册
1 |
|
保存用户到ThreadLocal
ThreadLocal的三个方法:
1 |
|
所以我们可以借助这个ThreadLocal来存储登录用户的信息,在一个请求中,所有调用的方法都在同一个线程中去处理,这样就实现了在任何地方都可以获取到用户信息了,从而摆脱了HttpServletRequest的束缚。
基于MybatisPlus实现的增删改查操作
项目中使用MybatisPlus,简化了对数据库的增删改查操作,这是由service实现类通过继承MybatisPlus的ServiceImpl<UserMapper, User>实现的
根据手机号查询用户:User user = query().eq(“phone”, phone).one()
根据手机号生成新用户:
1
2
3
4
5
6// 1.创建用户
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
// 2.保存用户
save(user);
集群的Session共享问题
当使用Tomcat集群时,多台Tomcat并不共享session的存储空间,当请求切换到不同tomcat服务时会导致数据丢失问题。如:用户登录时,使用Tomcat1,其对应的用户登录信息存储在Tomcat1的session中,当用户访问其他功能时,系统负载均衡分配了Tomcat2,这时的Tomcat2中不存在该用户存在session中的信息,于是会导致用户即使刚刚才登录,此时又需要再登录一遍。
session的替代方案应满足如下需求:
- 数据共享
- 内存存储:session是基于内存存储的,它的读写效率高。而登录校验的访问频率非常高
- key、value结构
——Redis
基于Redis实现共享session登录
redis在项目中的使用
保存验证码到redis:stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES),key设置为如上格式,能够清晰读懂redis各条键值对的功能,同时设置验证码的过期时间
校验验证码:stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone)
保存用户到redis:以随机token为key存储用户数据,用户对象使用Hash类型存储
1
2
3
4
5
6
7
8
9
10
11
12// 7.保存用户信息到redis中
// 7.1随机生成token作为登陆令牌
String token = UUID.randomUUID().toString(true);
// 7.2 将User对象转换为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);
userMap.replace("id", userDTO.getId().toString());
// 7.3存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 7.4设置有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);刷新过期时间:在拦截器中更新用户的过期时间
通过上述改动,我们完成了使用redis实现共享session登录
刷新过期时间问题
因为刷新过期时间是在拦截器中实现的,因此只有请求通过拦截器才会刷新过期时间,即使用户一直浏览主界面也不会刷新过期时间。为了解决这个问题,我们又添加了一个拦截器在前面
- 第一个拦截器RefreshTokenInterceptor的核心工作是得到用户、保存起来并刷新过期时间
- 第二个拦截器LoginInterceptor的核心工作是对不存在的用户进行拦截
商户查询缓存
添加商户缓存
使用redis中的string类型存储商铺缓存:
- 若redis中存在要查询的商户,利用redis查到的json字符串结合hutool中的JSONUtil工具实现:Shop shop = JSONUtil.toBean(shopJson, Shop.class)
- 若redis中不存在要查询的商户,到数据库中去查找,将找到的Shop对象通过hutool中的JSONUtil工具转换为json字符串:JSONUtil.toJsonStr(shop)
添加商户类型缓存
查询商户类型返回的是多个商户类型,即List
先在缓存中查询:List
shopTypeJsonList = stringRedisTemplate.opsForList().range(CACHE_SHOPTYPE_LIST, 0, -1),0、-1表示从头查到尾==(注意,list类型查询返回只能得到一个List 类型的集合,因此我们每一个对象都要先转为json字符串再存储)== 若redis中存在,即shopTypeJsonList非空,则将这个json字符串集合一个个转换为ShopType对象后,返回一个ShopTypeList集合
若redis中不存在,则需要到数据库中查找,注意,我们需要按照“sort”字段排序查找
查找到之后,将这些ShopType对象的集合一个个push到redis中去:
1
2
3for(ShopType shopType : shopTypeList) {
stringRedisTemplate.opsForList().rightPush(CACHE_SHOPTYPE_LIST, JSONUtil.toJsonStr(shopType));
}最后返回一个ShopTypeList集合
缓存更新策略
- 低一致性需求:使用内存淘汰机制。如商户类型查询的缓存。
- 高一致性需求:主动更新,并以超时剔除作为兜底方案。如店铺详情查询的缓存。
操作缓存和数据库需要考虑的问题
- 删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效的写操作较多。
- ==删除缓存==:更新数据库时让缓存失效,查询时再更新缓存。
- 如何保证缓存和数据库的操作同时成功或失败?
- 单体系统:将缓存和数据库的操作放在同一个事务内。如放在同一个方法内,并用**@Transactional**修饰这个方法。
- 分布式系统:利用TCC等分布式事务。
- 先操作缓存还是先操作数据库?
- 先删除缓存再操作数据库
- ==先操作数据库再删除缓存==
- 左边为正常情况,右边为异常情况
- 相比之下,前者出现异常情况的概率较大,而后者出现异常情况的概率较小,这是因为更新数据库的耗时相对而言较长导致的,因此可以选择后者。
缓存穿透
缓存穿透是指客户端请求的数据在缓存和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。常见的解决方案有两种:
缓存空对象:将这种数据在redis中使用一个空对象如 “” 缓存起来,并设置一个较短的TTL,当下次再有这样的请求过来时就不会打到数据库了。
- 优点:实现简单,维护方便
- 缺点:额外的内存消耗、可能造成短期的不一致
布隆过滤
- 布隆过滤算法是一种数据统计算法,用于检索一个元素是否在集合中。但是布隆过滤无需存储元素到集合,而是把元素映射到一个很长的二进制数位上。利用布隆过滤算法,在请求进入redis之前先判断是否存在,如果不存在则直接拒绝请求
- 优点:内存占用较少,没有多余的key
- 缺点:实现复杂、存在误判可能
其他
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者redis服务宕机,导致大量请求打到数据库,带来巨大压力。解决方案:
- 给不同的key的TTL添加随机值
- 利用redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
缓存击穿

缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务比较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大冲击。常见的解决方案有两种:

- 互斥锁:给第一个未命中缓存的线程加锁、查询数据库并写入缓存后再释放锁,其他线程在此期间需要等待。
- 优点:1. 没有额外的内存消耗 2. 保证一致性 3. 实现简单
- 缺点:1. 线程需要等待,性能受影响 2. 可能有死锁风险
- 逻辑过期:不设置TTL,而是设置一个逻辑过期时间,首个发现逻辑时间过期的线程会开启一个新的线程用于更新数据,其本身以及在此期间查询的其他线程则会返回当下的过期数据。
- 优点:线程无需等待,性能良好
- 缺点:1. 不保证一致性 2. 有额外内存消耗 3. 实现复杂
利用互斥锁解决缓存击穿问题

- 利用redis中的==setnx(SET if Not eXists)命令==实现互斥锁。
- setnx基本语法:SETNX KEY_NAME VALUE 在指定的 key 不存在时,为 key 设置指定的值,返回设置的值;若指定的key存在时,返回0。
- Java中如何使用setnx方法实现获取互斥锁?stringRedisTemplate.opsForValue().setIfAbsent,这个方法返回一个Boolean值,在需要获取互斥锁时,在redis中设置setnx lockKey 1(TTL = 20s),如果设置成功,则返回true;而其他想要获取互斥锁的线程都会在setnx lockKey 1这一步中被堵住,故只有一个最早的线程能够到达后面的数据库,并进行缓存重建。
- Java中如何使用setnx方法实现释放互斥锁?只需要进行缓存重建的线程在重建完毕后,将lockKey删除即可,这样后面想要获取互斥锁的线程就能够得到互斥锁了。注意,为了避免互斥锁无法释放,一般将释放锁操作放在finally代码块中执行。
- 由于缓存重建的过程需要先查找数据库,再写入redis,这一过程需要花费一定时间,在这段时间中,因为缓存未命中而想要重建缓存的线程都会被互斥锁挡住,直到缓存重建完毕,因此不会存在不一致的情况,即这段时间内所有的线程返回的值都是缓存重建完后的值。
利用逻辑过期解决缓存击穿问题

- 因为要处理的数据一般为热点数据,所以会提前写入缓存中预热,且只有逻辑过期时间、没有TTL,永不过期。因此,如果发现缓存未命中,说明不存在这样的数据,则直接返回空。
- 由于存到redis中的数据还需要包含一个逻辑过期的时间,因此创建一个新的对象redisData将商铺对象、逻辑过期时间包含起来。
- 将取得的逻辑过期时间与当前时间对比,若未过期,说明当前的商铺信息仍在有效期内,故直接返回。
- 若已过期,则先获取一个lockKey的互斥锁,获取方法同上。然后开启一个独立的线程,进行缓存重建,重建完释放互斥锁。而当前线程则直接返回获取到的过期数据。
- 在缓存重建期间,其他线程到达想要获取互斥锁,获取失败后也直接返回过期数据。
- 因为该方法存在返回过期数据的情况,因此这个方法会有不一致的情况出现。
- 注意⚠️:由于本项目中选择的缓存更新策略是——更新数据库时让缓存失效,查询时再更新缓存。但是在本方法中,删除缓存之后会导致其他线程无法命中缓存就直接返回空值,因此逻辑过期不适用于当前方法。可以采用更新缓存的策略——每次更新数据库都更新缓存。
缓存工具封装
==泛型==
优惠券秒杀
全局唯一ID生成策略
当用户购买商品时,就会生成订单并保存到订单表中,而如果订单表采用数据库自增ID就会出现一些问题:
- ID的规律性太过明显,不安全
- 受单表数据量的限制,因为订单是会不断累积的,而单表所能容纳的数据量是有限的,后期会需要用到分库分表。而如果此时还使用数据库的自增ID,就会出现多个订单有相同的订单ID。
故我们需要选择一种方法实现全局ID的生成,这种方法需要满足:唯一性、高可用、高性能、递增性、安全性的特点——redis。
- 递增性:INCR key。当执行 INCR 命令时,如果键不存在,Redis 会先创建一个新的键,并将其初始值设置为0,然后再进行自增操作。
- 安全性:INCR 命令是原子操作,这意味着当多个客户端同时对同一个键执行 INCR 命令时,Redis 会确保操作的原子性。这意味着在多线程或并发环境下,不会出现竞态条件或数据不一致的情况。为了增加ID的安全性,我们可以不直接使用redis自增的数值,而是拼接一些其他的信息,如:时间戳(31位)+计数位(32位)
添加优惠券
本项目中的优惠券分为普通优惠券与秒杀优惠券,普通优惠券不限量且没有购买时间要求,而秒杀优惠券有库存,且需要在指定时间范围内才能购买。两者的数据表如下所示:
1 |
|
1 |
|
两张表是关联的,添加秒杀优惠券只需先保存到优惠券表中,然后再将部分字段保存到秒杀优惠券表中即可。
实现秒杀优惠券下单

扣减库存选择直接使用MybatisPlus中的update操作:seckillVoucherService.update().setSql(“stock = stock - 1”).eq(“voucher_id”, voucherId).update()。
超卖问题
但是这样的做法会导致超卖现象的产生:即在线程1扣减库存之前,其他线程查到了未扣减之前的库存,导致最终库存变为了负数。

超卖问题是典型的多线程安全问题,常见的解决方案就是加锁。而锁又可分为**==悲观锁和乐观锁==**:
- 悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。Synchronized、Lock都属于悲观锁。优点:简单;缺点:性能一般。
- 乐观锁:认为线程安全问题不一定会发生,因此==不加锁==,而是在更新数据的时候去判断有没有其他线程对数据进行修改。优点:性能好;缺点:存在成功率低的问题。
- 如果没有修改,则认为是安全的,更新数据。
- 如果发现数据被修改了,说明发生了线程安全问题,此时可以重试或报异常。
- 乐观锁的常见实现方式有两种:版本号法、CAS(Compare and Swap)
- 版本号法:在原有数据基础上,为每个数据添加一个版本号version,数据每进行一次修改就使版本号增加。当要修改数据时,比较之前查询该数据得到的版本号与当前的版本号是否一致,若不一致则说明数据出现了修改。
- CAS:在版本号的基础上,直接拿数据中要进行修改的字段进行比较,若前后不一致则说明发生了修改。
CAS解决超卖问题时出现的问题
我们选择使用上方的CAS方法解决超卖问题,在jMeter压力测试中,选择使用200个线程同时对100张秒杀优惠券进行抢购,这一次优惠券的库存没有变为负数,但是优惠券只卖出了20张,秒杀成功率大大减少了。
这是因为当有多个线程同时查到了同样的库存时,只有一个线程能够抢到优惠券,其他线程会因为当前剩余库存与前面查询到的库存不一致导致秒杀失败。
解决方案:将sql语句中的:stock = 前面查询到的stock 更改为 stock > 0 即可。
实现一人一单功能

相较于上面主要的改动就是在扣减库存之前多了一道判断订单是否存在的步骤:判断数据库中是否已存在相同user_id和相同voucher_id的订单,如果这样的订单数量大于0,说明该用户已经买过这张优惠券,返回异常值。
1️⃣但是这样的操作会导致==线程安全问题==:一个用户在多个线程中同时进行了订单是否存在的判断,而此时还没创建订单,因此判断的结果都说明数据库中不存在这样的订单,则这多个线程会同时对库存进行扣减,还是没达到一人一单的效果。为了解决这个线程安全问题,我们可以加锁。由于乐观锁是在数据发生修改时才生效的,因此无法用于本问题里,于是我们选择悲观锁——synchronized用于解决线程安全问题。
由于前面的判断环节不会产生线程安全问题,为了方便synchronized关键字的使用,我们选择将查询订单到返回订单id这一段代码独立出来成为一个方法createVoucherOrder,该方法带有**@Transactional注释**。
2️⃣现在就有一个问题,==synchronized关键字的位置应该放在什么位置呢==?是用于修饰方法createVoucherOrder呢?还是修饰方法内的一段代码块呢?解决这个问题的关键在于我们引入锁的初衷——实现一人一单功能。
- 如果我们将synchronized用于修饰方法,那么当一个用户执行这个方法时,其他用户无法执行这个方法,只能等待,这显然大大削减了性能。而我们的目的仅仅只是让一个用户的多个线程无法同时进行方法的执行。
- 因此我们选择将synchronized用于修饰代码块,同时将**==userId作为对象传入synchronized实现对单个用户加锁==**。
- 为了实现对单个用户加锁,不同的用户不会被锁定,那么同一个用户传入synchronized的Long userId对象就需要是同一个:
- 如果仅仅只传userId,多个线程下查到的userId地址不同,是不同的userId对象❌
- 如果传入userId.toString(),多个线程下查到的字符串地址不同,是不同的字符串对象❌
- 因此需要传入的是userId.toString().intern(),String.intern()是一个Native方法,它的作用是:如果字符常量池中已经包含一个等于此String对象的字符串,则返回常量池中字符串的引用。总之就是源于String对象的字符串是存储在==常量池==中的,如果多个String对象的字符串长得一样,那么他们在常量池中都指向同一个字符串,这就保证了传入synchronized关键字的是同一个对象。
✅
3️⃣但是,这样也会产生新的问题。由于方法是被@Transactional注释所修饰的,因此如果synchronized修饰方法内的一段代码块,就会出现**==先释放锁,再提交事务==的现象。如果一个线程释放锁但还没提交事务,这时有一个新的线程获取了锁,由于事务尚未提交,如果这时这个新的线程去查询订单,查询到的可能就是前一个线程未修改前的数据,这就产生了线程安全问题。这说明==锁的范围小了,应该在事务提交之后再释放锁==**,于是synchronized应该修饰函数调用所在的代码块,即:
4️⃣经过上述操作,确保了线程安全,但是随之而来又有事务方面的问题。注意到,我们是对当前的createVoucherOrder方法进行了@Transactional的注释,而没有给外面的函数seckillVoucher加事务,而外面的函数seckillVoucher在执行上面这段代码时,等价于是这样执行的: 这里的**==this==代表的是当前VoucherOrderServiceImpl这个类的对象,而不是它的代理对象**。

而事务要想生效,是因为spring对VoucherOrderServiceImpl这个类进行了动态代理,拿到了它的==代理对象==,用这个代理对象来去做事务处理;而现在这个this指的是非代理对象,是不具有事务功能的。因此我们需要拿到这个代理对象,获取方法如下: 当然,上面的解决代理对象问题还需要如下两个步骤:
- pom文件中引入aspectjweaver依赖。
- 启动类添加@EnableAspectJAutoProxy(exposeProxy = true)注解,用于暴露代理对象。
这样就解决了事务问题。
集群下一人一单的并发安全问题
上面的处理方式,在单体部署的情况下是没有问题的,因为此时只有一台Tomcat1,即只有一台JVM1,线程获取的锁都是这台JVM1中的同一把锁(锁的UUID保存在常量池中),故多个线程竞争这一把锁,保证了线程安全。
但是如果在集群部署的情况下,就说明有多台Tomcat提供服务,即有多台JVM,故Tomcat1中的线程竞争的是JVM1中的锁,而Tomcat2中的线程竞争的是JVM2中的锁,此时有多把锁。故此时如果一个用户在两台Tomcat中都实现了下单操作,则两边都能获取到锁,故生成了两个订单,违背了一人一单的规定,这就导致了集群下一人一单问题的并发安全问题。

Redis的分布式锁实现

为了解决上述的并发安全,我们就需要实现多台JVM下的线程竞争同一把锁,即分布式锁——满足分布式系统或集群模式下多进程可见并且互斥的锁,本项目中我们选择使用redis的setnx关键字来解决分布式锁的实现。
而为了实现redis分布式锁只对同一个用户产生作用,设置的key值就需要区分不同用户,以此来实现对同一个用户的不同进程加锁,而不同用户获取的锁不同。对于下单功能,本项目设置的key值为:“lock:order:”+userId。
1️⃣而使用redis实现分布式锁就会出现一个问题:当一个进程占有锁时,若此时redis宕机了,就会导致锁无法被释放,造成死锁现象的产生。解决这个问题也很简单,我们只需要给这个锁设置一个过期时间,超时自动释放锁,就不会出现由于redis宕机导致的死锁现象。
2️⃣但是,正是由于给锁设置了过期时间,新的问题产生了——锁的误删问题。如下图所示,当线程1获取锁但是业务阻塞导致超时释放锁,在线程1业务完成之前,线程2趁虚而入拿到了锁并开始执行业务,这时候线程1完成了业务并按部就班去释放锁,但是这时候占用锁的是线程2,也就是说线程1把线程2的锁给释放了,这时候如果又有一个线程3来获取锁是能够获取成功的,这就导致了线程2、线程3同时执行业务,产生了并发安全问题。

解决锁的误删问题可以采用如下方法:在给锁设置value值时,使用线程ID作为锁的value值,这样就能知道当前的锁是不是本线程所设置的,当线程业务执行完毕想要释放锁时,先执行一个判断,判断当前锁的value值与自身线程ID是否相同,如果相同说明是同一把锁可以释放,否则说明是别的线程的锁,不做操作,这样就避免了锁的误删问题。
当然,仅仅只使用线程ID作为value值是不够的,因为在不同的进程之间可能存在相同的线程ID,有小概率出现混淆的情况,我们可以选择在线程ID之前拼接一个UUID确保唯一性,将拼接的结果作为锁的value值。

3️⃣但是,还会有一个问题:当线程1获取锁之后,未执行业务就发生了阻塞,此时如果锁释放了,线程2来获取锁是能够获取得到的,这就会造成线程1、2同时执行业务的情况出现,还是会发生一个用户下了多个订单的情况。如上图中线程1、2执行业务有重叠的部分。
4️⃣此外,由于判断锁标识是否一致与释放锁不是一个原子操作,如果判断完后想要释放锁时发生了阻塞,会触发锁的超时释放,此时线程2来获取了锁,线程1阻塞结束后会使用之前的判断结果去把线程2的锁释放,这就又导致的并发安全问题,如下图所示:

为了解决原子性的问题,我们可以选择使用redis提供的lua脚本功能,在一个脚本中编写多条redis命令,确保多条命令执行的原子性。
Redisson分布式锁
基于SETNX实现的分布式锁存在的问题
- 重入问题
- 重入问题是指获取锁的线程,可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,例如在HashTable这样的代码中,它的方法都是使用synchronized修饰的,加入它在一个方法内调用另一个方法,如果此时是不可重入的,那就死锁了。所以可重入锁的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
- 不可重试
- 我们编写的分布式锁只能尝试一次,失败了就返回false,没有重试机制。但合理的情况应该是:当线程获取锁失败后,他应该能再次尝试获取锁
- 超时释放
- 我们在加锁的时候增加了TTL,这样我们可以防止死锁,但是如果卡顿(阻塞)时间太长,也会导致锁的释放。虽然我们采用Lua脚本来防止删锁的时候,误删别人的锁,但现在的新问题是没锁住,也有安全隐患,也就是上面的问题3️⃣
- 主从一致性
- 如果Redis提供了主从集群,那么当我们向集群写数据时,主机需要异步的将数据同步给从机,万一在同步之前,主机宕机了(主从同步存在延迟,虽然时间很短,但还是发生了),那么又会出现死锁问题
Redisson可重入锁原理

method1在方法内部调用method2,method1和method2出于同一个线程,那么method1已经拿到一把锁了,想进入method2中拿另外一把锁,必然是拿不到的,于是就出现了死锁
所以我们需要额外判断,method1和method2是否处于同一线程,如果是同一个线程,则可以拿到锁,但是state会
+1
,之后执行method2中的方法,释放锁,释放锁的时候也只是将state进行-1
,只有减至0,才会真正释放锁由于我们需要额外存储一个state,所以用字符串型
SET NX EX
是不行的,需要用到**Hash
结构**,但是Hash
结构又没有NX
这种方法,所以我们需要将原有的逻辑拆开,进行手动判断,如上图所示为了保证原子性,所以流程图中的业务逻辑也是需要我们用Lua来实现的
Redisson锁重试和WatchDog机制

- 锁重试:利用信号量、发布消息publish、订阅消息subscribe功能,实现获取锁失败后的一段时间(ttl)内重新尝试获取锁。而重新尝试获取锁并不是立刻重新尝试,而是通过订阅释放锁的消息,接收到锁释放的消息后去重试,减轻了cpu的负担,因此在线程释放锁后需要向外发布释放锁的消息。
- WatchDog机制:给锁添加过期时间,虽然能够解决死锁的问题,但是如果事务发生了阻塞导致超时释放锁,还是会出现多个线程同时执行业务的情况,失去了锁的作用,造成了一人多单的情况。因此,关键点就是**==不要让事务阻塞导致超时释放锁,超时释放只应该在redis服务宕机、或持有锁的线程挂掉时起作用==**,于是就引出了WatchDog机制。
- WatchDog就是持有锁的线程给锁加了一条看门狗,只要这个线程存在,狗就会不断给锁续期不让它过期,==直到线程执行完事务并亲自释放锁==。
- 既然WatchDog会给锁不断续期,那么锁设置过期时间还有意义吗?答案是有的。因为这个过期时间主要是为了防止线程挂掉、redis宕机导致的死锁,过期时间只应在这些情况下释放锁,如果过期时间是因为线程事务发生阻塞超时释放锁,就会产生上面的并发问题,而WatchDog就是引进来不让这种情况发生的。
Redisson锁的MutiLock原理
- 为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例
- 此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了
- 哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题
- 为了解决这个问题。Redisson提出来了MutiLock锁,使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就**==需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功==**,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性
Redisson小结
- 不可重入Redis分布式锁
- 原理:利用SETNX的互斥性;利用EX避免死锁;释放锁时判断线程标识
- 缺陷:不可重入、无法重试、锁超时失效
- 可重入Redis分布式锁
- 原理:利用Hash结构,记录线程标识与重入次数;利用WatchDog延续锁时间;利用信号量控制锁重试等待
- 缺陷:Redis宕机引起锁失效问题
- Redisson的multiLock
- 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
- 缺陷:运维成本高、实现复杂
秒杀优化
异步秒杀思路
我们先来回顾一下下单流程,当用户发起请求,此时会先请求Nginx,Nginx反向代理到Tomcat,而Tomcat中的程序,会进行串行操作,分为如下几个步骤:
- 查询优惠券
- 判断秒杀库存是否足够
- 查询订单
- 校验是否一人一单
- 扣减库存
- 创建订单
在这六个步骤中,有很多操作都是要去操作数据库的,而且还是一个线程串行执行,这样就会导致我们的程序执行很慢,所以我们需要异步程序执行,那么如何加速呢?

- 优化方案:我们将耗时较短的逻辑判断放到Redis中,例如:库存是否充足,是否一人一单这样的操作,只要满足这两条操作,那我们是一定可以下单成功的,不用等数据真的写进数据库,我们直接告诉用户下单成功就好了。然后后台再开一个线程,后台线程再去慢慢执行队列里的消息,这样我们就能很快的完成下单业务。
- 我们现在来看整体思路:当用户下单之后,判断库存是否充足,只需要取Redis中根据key找对应的value是否大于0即可,如果不充足,则直接结束。如果充足,则在Redis中判断用户是否可以下单,如果set集合中没有该用户的下单数据,则可以下单,并将userId和优惠券存入到Redis中,并且返回0,整个过程需要保证是原子性的,所以我们要用Lua来操作
- 我们只需要判断Lua脚本的返回值是否为0,如果是0,则表示可以下单,将信息保存到queue中去,然后返回,开一个线程来异步下单
异步秒杀小结
- 秒杀业务的优化思路是什么?
- 先利用Redis完成库存容量、一人一单的判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
- 基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题:
- 我们现在使用的是JDK里的阻塞队列,它使用的是JVM的内存,如果在高并发的条件下,无数的订单都会放在阻塞队列里,可能就会造成内存溢出,所以我们在创建阻塞队列时,设置了一个长度,但是如果真的存满了,再有新的订单来往里塞,那就塞不进去了,存在内存限制问题
- 数据安全问题:
- 经典服务器宕机了,用户明明下单了,但是数据库里没看到
- 内存限制问题:
Redis消息队列
认识消息队列
- 什么是消息队列?字面意思就是存放消息的队列,最简单的消息队列模型包括3个角色
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
- 使用队列的好处在于
解耦
:举个例子,快递员(生产者)把快递放到驿站/快递柜里去(Message Queue)去,我们(消费者)从快递柜/驿站去拿快递,这就是一个异步,如果耦合,那么快递员必须亲自上楼把快递递到你手里,服务当然好,但是万一我不在家,快递员就得一直等我,浪费了快递员的时间。所以解耦还是非常有必要的
基于List实现消息队列
- 基于List结构模拟消息队列
- 消息队列(Message Queue),字面意思就是存放消息的队列,而Redis的list数据结构是一个双向链表,很容易模拟出队列的效果
- 队列的入口和出口不在同一边,所以我们可以利用:LPUSH结合RPOP或者RPUSH结合LPOP来实现消息队列。
- 不过需要注意的是,当队列中没有消息时,RPOP和LPOP操作会返回NULL,而不像JVM阻塞队列那样会阻塞,并等待消息,所以我们这里应该使用BRPOP或者BLPOP来实现阻塞效果
- 基于List的消息队列有哪些优缺点?
- 优点
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保障
- 可以满足消息有序性
- 缺点
- 无法避免消息丢失(经典服务器宕机)
- 只支持单消费者(一个消费者把消息拿走了,其他消费者就看不到这条消息了)
- 优点
基于PubSub的消息队列
- PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费和可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息
SUBSCRIBE channel [channel]
:订阅一个或多个频道PUBLISH channel msg
:向一个频道发送消息PSUBSCRIBE pattern [pattern]
:订阅与pattern格式匹配的所有频道- 基于PubSub的消息队列有哪些优缺点
- 优点:
- 采用发布订阅模型,支持多生产,多消费
- 缺点:
- 不支持数据持久化
- 无法避免消息丢失(如果向频道发送了消息,却没有人订阅该频道,那发送的这条消息就丢失了)
- 消息堆积有上限,超出时数据丢失(消费者拿到数据的时候处理的太慢,而发送消息发的太快)
- 优点:
基于Stream的消息队列
- 发送消息的命令
1 |
|
- 读取消息的命令
1 |
|
- STREAM类型消息队列的XREAD命令特点
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有漏读消息的风险
基于Stream的消息队列—消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列,具备以下特点
- 消息分流
- 队列中的消息会分留给组内的不同消费者,而不是重复消费者,从而加快消息处理的速度
- 消息标识
- 消费者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费
- 消息确认
- 消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完成后,需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移除
- 消息分流
创建消费者组
1
2
3
4
5
6
7
8
9
10XGROUP CREATE key groupName ID [MKSTREAM]
key
队列名称
groupName
消费者组名称
ID
起始ID标识,$代表队列中的最后一个消息,0代表队列中的第一个消息
MKSTREAM
队列不存在时自动创建队列从消费者组中读取消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [keys ...] ID [ID ...]
group
消费者组名称
consumer
消费者名,如果消费者不存在,会自动创建一个消费者
count
本次查询的最大数量
BLOCK milliseconds
当前没有消息时的最大等待时间
NOACK
无需手动ACK,获取到消息后自动确认(一般不用,我们都是手动确认)
STREAMS key
指定队列名称
ID
获取消息的起始ID
>:从下一个未消费的消息开始(pending-list中)
其他:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始STREAM类型消息队列的XREADGROUP命令的特点
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
三种方式实现消息队列对比
List | PubSub | Stream | |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间, 可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度, 可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
Redis消息队列实现异步秒杀
需求:
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
业务实现伪代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29while(true){
// 尝试监听队列,使用阻塞模式,最大等待时长为2000ms
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >")
if(msg == null){
// 没监听到消息,重试
continue;
}
try{
//处理消息,完成后要手动确认ACK
handleMessage(msg);
} catch(Exception e){
while(true){
//0表示从pending-list中的第一个消息开始,如果前面都ACK了,那么这里就不会监听到消息
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if(msg == null){
//null表示没有异常消息,所有消息均已确认,结束循环
break;
}
try{
//说明有异常消息,再次处理
handleMessage(msg);
} catch(Exception e){
//再次出现异常,记录日志,继续循环
log.error("..");
continue;
}
}
}
}
使用RabbitMQ实现异步秒杀
具体实现:
- 首先还是先使用lua脚本,在redis中实现耗时较短的逻辑判断:判断库存是否充足、判断用户是否下单、扣减redis中的库存、将用户加入下单名单里
- 如果上面判断出用户还未下过单,则能够进行下一步扣减数据库中的库存
- 数据库的扣减就交给消息队列去实现,实现了如下几个程序:
- MQSender:将信息封装成一个优惠券订单对象后,转换为JSON字符串,使用这个MQSender发送出去
- MQReceiver:rabbitmq在接收到这个消息后,将接收到的json字符串解析为订单对象,在数据库中判断一人一单、基于CAS实现库存扣减,然后扣减库存。
1 |
|
1 |
|
Blog点赞功能
一人一赞
当前的业务下,点赞功能直接在controller层中,update数据库中blog的点赞数,这会导致一个用户可以给一篇blog无限点赞,这是不合理的。我们的需求是:
- 同一个用户只能对同一篇笔记点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现的方法如下:
- 修改点赞功能,利用redis中的set集合的sismember方法来判断是否点赞过,未点赞则点赞数+1,已点赞则点赞数-1
- 修改根据id查询blog的业务,判断当前用户是否点赞过,赋值给blog对象的isLike字段
- 修改分页查询blog业务,判断当前用户是否点赞过,赋值给blog对象的isLike字段
点赞排行榜
当我们点击探店笔记详情页面时,应该按点赞顺序展示点赞用户,比如显示最早点赞的TOP5,形成点赞排行榜。之前的点赞是放到Set集合中,但是Set集合又不能排序,所以这个时候,我们就可以改用SortedSet(Zset),将时间戳作为zset对应用户id的得分,根据得分排序即可实现显示最早点赞的top5。
而Zset没有ismember的方法,我们可以选择score方法,该方法查询对应用户ID的score,如果没有这个用户,就返回空值。
// 查询点赞排行榜 @Override public Result queryBlogLikes(Integer id) { String key = BLOG_LIKED_KEY + id; //zrange key 0 4 查询zset中前5个元素 Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4); //如果是空的(可能没人点赞),直接返回一个空集合 if (top5 == null || top5.isEmpty()) { return Result.ok(Collections.emptyList()); } List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList()); //将ids使用`,`拼接,SQL语句查询出来的结果并不是按照我们期望的方式进行排 //所以我们需要用order by field来指定排序方式,期望的排序方式就是按照查询出来的id进行排序 String idsStr = StrUtil.join(",", ids); //select * from tb_user where id in (ids[0], ids[1] ...) order by field(id, ids[0], ids[1] ...) List<UserDTO> userDTOS = userService.query().in("id", ids) .last("order by field(id," + idsStr + ")") .list().stream() .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .collect(Collectors.toList()); return Result.ok(userDTOS); }
好友关注功能
关注与取关
关注与取关会传入一个isFollow参数,true表示关注,false表示取关
- 关注只需要创建一个Follow对象,将关注者(当前用户)id与被关注者id赋给这个Follow对象,然后直接保存到数据库中即可
- 同理,取关只需要把数据库中
user_id = userId
且follow_user_id = followUserId
的记录删除即可。
共同关注
共同关注可以利用redis中set数据类型,对两个key的set取交集来实现
- key用于区分用户,模式为
follow:userId
- value则是对应用户的关注对象的set集合
- 因此,需要在关注时,同步将关注信息传入redis中;同理取关时也要将被关注者从当前用户的set集合中删除
- 使用set数据结构的intersect功能来实现取交集
- 取得共同关注id集合(String集合)后,要将id集合解析(String转化为Long),然后查询各id对应的用户信息user并封装到userDTO中确保安全,然后返回。
关注推送
- 需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
- 收件箱满足可以根据时间戳排序,必须使用Redis的数据结构实现
- 查询收件箱数据时,实现分页查询
- 实现:
- 基于redis实现收件箱:在redis为每个用户设置一个收件箱,key模式为
FEED_KEY + userId
,每当一个用户发布一条blog时,在tb_follow表中查询他的粉丝,并将blog id推送到每个粉丝的收件箱 - 由于要按照时间戳排序,我们选择redis数据结构为zset,score使用时间戳来表示
- 基于redis实现收件箱:在redis为每个用户设置一个收件箱,key模式为