# 猿口碑 **Repository Path**: giteeygq/ape-reputation ## Basic Information - **Project Name**: 猿口碑 - **Description**: 本项目是基于SpringBoot实现的一个点评类网站。通过Redis实现分布式session保存集群下用户数据,使用Redission分布式锁解决并发安全问题,使用缓存保存热点数据提高响应速度并降低数据库压力;RabbitMQ消息队列实现订单异步响应,使用户体验更加友好,并使用消息确认机制确保系统数据安全。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2023-08-14 - **Last Updated**: 2023-09-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: SpringBoot, MybatisPlus, Redis, RabbitMQ, Hutool ## README # 猿口碑 本项目基于 Spring Boot + MyBatisPlus + Redis + RabbitMQ 实现的店铺点评类网站,用户可以查询店铺、发布点评、抢购商品优惠券、点赞关注等功能。 ![image-20230901184426537](README.assets/image-20230901184426537.png) ## 技术选型 - Java Spring Boot - MySQL 数据库 - MyBatis-Plus 及 MyBatis X 自动生成 - Redis 缓存及会话 - Redisson 分布式锁 - RabbitMQ 消息队列 - Hutool、Gson 等工具库 ## Focal Point ### 1. 使用Redis代替Session登录 #### 1.1 Session登录存在的问题 - Session的数据都保存在服务器中数据量很大的时候可能导致服务器内存不足 - 在分布式系统下每一个服务器的Session数据是独立的用户在不同服务器之间切换的时候可能因为session问题而需要反复登录 #### 1.2 使用Redis代替Session登录分析 **为什么可以是用Redis代替Session** - Redis是基于内存的读写速度非常快和Session类似 - 多个服务器访问的是同一个Redis就实现了数据的共享。此外Redis集群内部的数据一致性机制也很棒 **Redis中数据结构和key的选择** - 保存验证码到Redis由于在登录中还需要根据用户手机号来从Redis中获取验证码并比对并且保存到信息相对简单因此采用的key就是手机号value保存验证码。采用String类型保存。 - 登录key设计登录key需要满足脱敏性和唯一性。因此使用UUID-token作为key,value是经过脱敏后的用户Hash存储。(注意这里边key相当于JSESSIONID,也就是说使用不同的token来实现共享session) #### 1.3 使用Redis登录的流程 **发送验证码流程** - 验证手机号 - 获取验证码 - 将验证码保存到Redis中。key为手机号 **登录流程** - 验证手机号 - 获取用户输入验证码并根据手机号从Redis中获取验证码进行比较 - 通过后生成UUID的token。将用户信息脱敏后以token为key保存到Redis中 - 向客户端返回token **发送验证码** ```java @Override public Result sendCode(String phone) { //1.效验手机号 if(RegexUtils.isPhoneInvalid(phone)){ //2.不符合 return Result.fail("手机号格式错误!"); } //3.符合,生成验证码 String code = RandomUtil.randomNumbers(6); //4.保存到redis中 stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES); //5.发送验证码 log.debug("发送短信验证码成功,验证码:{}", code); //返回ok return Result.ok(); } ``` **登录流程** ```java @Override public Result login(LoginFormDTO loginForm) { String phone = loginForm.getPhone(); String code = loginForm.getCode(); //1.效验手机号 if(RegexUtils.isPhoneInvalid(phone)){ //2.不符合 return Result.fail("手机号格式错误!"); } //3.效验验证码 String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone); if(cacheCode == null || !cacheCode.equals(code)){ //4.不一致 return Result.fail("验证码失效"); } //5.根据手机号查询用户 User user = this.query().eq("phone", phone).one(); //6.不存在 if(user == null){ user = createUserWithPhone(phone); } // 7.保存用户信息到 redis中 // 7.1.随机生成token,作为登录令牌 String token = UUID.randomUUID().toString(true); // 7.2.将User对象转为HashMap存储 UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create() .setIgnoreNullValue(true) .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); // 7.3.存储 String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); // 7.4.设置token有效期 stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES); // 8.返回token return Result.ok(token); } ``` #### 1.4 解决Redis中有效期问题 **存在的问题** 根据上述逻辑Redis中保存用户信息的记录只在用户登录时设置了30min。用户访问其他网页时记录并不会向session一样续期。 - 方案一在登录拦截器中从Redis中获取到用户信息然后续期。但是对于这个项目来说查看商店信息等请求并不会被拦截因为不登录也可以看。当登录用户访问这些请求的时候Redis并不会续期导致记录过期。 - 方案二在登录拦截器之间加一个全局拦截器这个拦截器拦截所有请求并放行所有请求。它主要的作用时判断用户是否登录如果登录则续期。 **全局响应拦截器** ```java public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.获取请求头中的token String token = request.getHeader("authorization"); if (StrUtil.isBlank(token)) { return true; } // 2.基于TOKEN获取redis中的用户 String key = LOGIN_USER_KEY + token; Map userMap = stringRedisTemplate.opsForHash().entries(key); // 3.判断用户是否存在 if (userMap.isEmpty()) { return true; } // 5.将查询到的hash数据转为UserDTO UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); // 6.存在,保存用户信息到 ThreadLocal UserHolder.saveUser(userDTO); // 7.刷新token有效期 stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES); // 8.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { // 移除用户 UserHolder.removeUser(); } } ``` **登录拦截器** ```java public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.判断是否需要拦截(ThreadLocal中是否有用户) if (UserHolder.getUser() == null) { // 没有,需要拦截,设置状态码 response.setStatus(401); // 拦截 return false; } // 有用户,则放行 return true; } } ``` ### 2.使用Redis作为缓存 #### 2.1 为什么使用缓存 在高并发的场景下用户对于一些热点数据如商品信息等访问请求量很大如果每一次请求都访问数据库那么对数据库等压力是非常大的。因此考虑使用缓存。 #### 2.2 缓存策略 考虑了使用缓存就不得不谈缓存和数据库的数据一致性问题。在这个项目中采用了在代码中手动更新的方式来保证数据的一致性。下面还有一些细节问题 **问题1: 当数据库中的数据发生变化的时候是删除缓存还是更新缓存** 应该是删除缓存。因为当数据库中的某一个数据发生多次变化而在这期间没有请求访问数据库那么更新缓存的操作只有最后一次有效因此应该选择删除缓存。当有访问请求的时候再去重建缓存。 **问题2: 如何保证缓存和数据库的操作同时成功** 在单体项目中使用事务注解@Transactional在分布式中使用分布式事务框架 ```java @Override @Transactional public Result update(Shop shop) { Long id = shop.getId(); if(id == null){ Result.fail("商品id不能为空"); } //1.更新数据库 this.updateById(shop); //2.删除缓存 stringRedisTemplate.delete(CACHE_SHOP_KEY + id); return Result.ok(); } ``` **问题3: 先操作数据库还是先操作缓存** 应该是先操作数据库。因为如果先操作缓存那么把缓存删除掉此时有一个请求访问缓存发现缓存中没有数据那么就访问数据库重建缓存而此时数据库还没有进行修改。当数据修改完成后又有新的请求访问一看缓存中有那么直接读取之间的脏数据。 #### 2.3 解决缓存穿透 **什么是缓存穿透** 一个请求始终访问数据库和缓存中都没有的记录导致每一次请求都需要到数据库中进行查询给数据库造成巨大压力。这种现象就是缓存穿透。 **解决方案1:** 本项目中采用缓存空对象的方法解决这个问题。 - 当一个请求请求某一个数据然后查询到缓存中没有则到数据库查询 - 如果数据库中也没有查询到则在缓存中创建一个空对象。 - 这样当这个请求再次发送时缓存中就有了一个空对象数据。我们在查询缓存时判断如果从缓存中查询到的结果是空对象那么就直接返回查询不到即可。 这样就解决了缓存穿透的问题。但是这个方案还有一些缺点比如如果一开始请求的这个数据不存在后期存在了然而缓存中保存的还是空对象就导致这个新增的数据查询不到。我们可以通过合理的设置空对象的过期时间来缓解这个问题。 除此之外在项目中增加ID值的长度这样的话如果有人恶意的访问某一个不存在的数据在前端检测的时候直接发现ID不合法那么也可以避免对数据库的访问。 ```java @Service public class ShopTypeServiceImpl extends ServiceImpl implements IShopTypeService { @Resource StringRedisTemplate stringRedisTemplate; @Override public Result queryTypeList() { // 1.从 Redis 中查询商铺缓存 String shopTypeJson = stringRedisTemplate.opsForValue().get(CACHE_SHOPTYPE_KEY); // 2.判断 Redis 中是否存在数据 if (StrUtil.isNotBlank(shopTypeJson)) { // 2.1.存在,则返回 List shopTypes = JSONUtil.toList(shopTypeJson, ShopType.class); return Result.ok(shopTypes); } // 2.2.Redis 中不存在,则从数据库中查询 List shopTypes = query().orderByAsc("sort").list(); // 3.判断数据库中是否存在 if (shopTypes == null) { // 3.1.数据库中也不存在,则返回 false return Result.fail("分类不存在!"); } // 3.2.数据库中存在,则将查询到的信息存入 Redis stringRedisTemplate.opsForValue().set(CACHE_SHOPTYPE_KEY, JSONUtil.toJsonStr(shopTypes)); // 3.3返回 return Result.ok(shopTypes); } } ``` **解决方案2:** 利用布隆过滤算法,在请求进入Redis之前判断是否存在,不存在直接拒绝请求,但布隆过滤器实现比较复杂。 #### 2.4 解决缓存雪崩 **什么是缓存雪崩** 缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机导致大量请求到达数据库带来巨大压力。、 解决方案给不同的Key的TTL添加随机值。保证key不会同时失效。利用Redis集群提高服务的可用性 #### 2.5 解决缓存击穿 **什么是缓存击穿** 缓存击穿也叫做热点key问题。在高并发的情况下某一个热点key缓存过期导致大量的请求直接发送到数据库导致数据库压力过大。 比如说商品列表同时有很多人访问。一旦商品列表的缓存过期那么很多人的请求就直接打到数据库上数据库就很有可能出现宕机的情况。 **解决方案1:** 可以利用互斥锁的方式解决。 - 当一个请求发现热点key过期以后直接获取互斥锁一旦获取到互斥锁那么就开始访问数据库然后重建缓存。 - 其他的请求发现缓存过期但是已经有其他的请求获取到了互斥锁所以这个请求只能等待。 - 当重建缓存的请求结束以后。其他的请求又发过来此时缓存已经建立完毕所以请求不会到数据库中。 这个互斥锁可以使用setnx命令来实现因为setnx命令只允许一个请求成功其他的都失败。 > 由于使用了互斥锁并发性会降低一些。 ```java private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock(String key) { stringRedisTemplate.delete(key); } ``` **解决方案2:** 使用逻辑过期的方式。 这种方案创建的热点key的TTL是-1。从而不会因为缓存失效而缓存击穿 - 在缓存中保存数据的同时会保存一个逻辑过期时间的时间戳。 - 当从缓存中查询到这个数据的时候首先获取它的逻辑过期时间。 - 如果已经过期单独的去开辟一个线程用来重建数据。当前线程直接返回缓存中已经过期的数据。 > 这种方法的好处是保证了程序的并发性(异步的构建缓存)但是在某一段时间内程序读取到的数据是脏数据。 ```java private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); public Shop queryWithLogicalExpire( Long id ) { String key = CACHE_SHOP_KEY + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isBlank(json)) { // 3.存在,直接返回 return null; } // 4.命中,需要先把json反序列化为对象 RedisData redisData = JSONUtil.toBean(json, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); // 5.判断是否过期 if(expireTime.isAfter(LocalDateTime.now())) { // 5.1.未过期,直接返回店铺信息 return shop; } // 5.2.已过期,需要缓存重建 // 6.缓存重建 // 6.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); // 6.2.判断是否获取锁成功 if (isLock){ CACHE_REBUILD_EXECUTOR.submit( ()->{ try{ //重建缓存 this.saveShop2Redis(id,20L); }catch (Exception e){ throw new RuntimeException(e); }finally { unlock(lockKey); } }); } // 6.4.返回过期的商铺信息 return shop; } ``` #### 2.6 缓存穿透和缓存击穿有什么区别? 缓存穿透中,请求的 key 既不存在于缓存中,也不存在于数据库中。 缓存击穿中,请求的 key 对应的是 **热点数据** ,该数据 **存在于数据库中,但不存在于缓存中(通常是因为缓存中的那份数据已经过期)** 。 #### 2.7 Redis工具类 基于StringRedisTemplate封装一个缓存工具类,满足下列需求: * 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间(或随机时间) * 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题 * 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题 * 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题 ### 3.全局唯一ID生成器 假设一个场景,用户在对优惠券抢购时一个订单就对应一个订单ID在高并发情况下需要保证ID的一下特点 - 唯一性ID必须保证唯一 - 高可用可以以极快的速度生成唯一ID - 递增型递增的ID有利于在数据库中建立索引 在本项目中自己实现了一个ID生成器。生成的ID结构如下 - 第一位符号位永远为0 - 时间戳位31bit以秒为单位可以使用69年。当前时间戳–自定义的起始时间戳 - 32bit秒内的计数器支持每秒产生2^32个不同ID。这个使用Redis中的incr函数实现 Key的设计`incr业务名日期` 如果一个业务只建立一个key那么随着时间的推移redis中的value会达到上限此时ID生成器就不可用了。 ```java @Component public class RedisIdWorker { /** * 开始时间戳 */ private static final long BEGIN_TIMESTAMP = 1640995200L; /** * 序列号的位数 */ private static final int COUNT_BITS = 32; @Resource private StringRedisTemplate stringRedisTemplate; public long nextId(String keyPrefix) { // 1.生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; // 2.生成序列号 // 2.1.获取当前日期,精确到天 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 2.2.自增长 long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); // 3.拼接并返回 return timestamp << COUNT_BITS | count; } } ``` ### 4.优惠券功能 #### **4.1 集群下一人一单和超卖问题的产生** 一人一单:如果不对同一用户进行加锁,那么同一用户可以无限次抢购。 超卖问题:假设目前库存位为1在高并发环境下 - 线程1执行判断是否有库存的操作然后时间片结束。结束的时候线程1并没有完成创建订单的操作。 - 此时线程2获得时间片开始判断是否有库存由于线程1并没有完成下单操作因此此时库存仍然为1。 - 这时线程1获得时间片完成下单操作。 - 线程2获得时间片完成下单操作 那么就会出现超卖问题此时库存应该为-1 #### 4.2 解决办法:Redisson分布式锁 - 利用哈希结构实现重入 - 利用信号量和PubSub控制锁重试 - 利用看门狗机制实现续期 ### 5.使用RabbitMQ消息队列优化业务 **问题分析** 在之前的秒杀业务中我们发现需要多次访问数据库并且业务也是串行执行的。但是分析一下我们可以发现我们可以将业务拆分成两个子业务 - 一个业务只负责判断是否有购买资格如果有购买资格则直接创建订单信息到消息队列。此时并没有真正的访问数据库创建订单因此效率会非常高。 - 第二个业务开辟一个单独的线程从消息队列中读取数据保存的数据库。 第二个业务并不需要很高的即时性当第一个业务判断完用户有购买资格后直接返回通知用户下单成功即可。 **具体实现** - 首先这些优惠券的热点信息包括库存等信息需要提前保存的Redis中 - 在Redis中一个优惠券对应一个键value保存库存量 - 在Redis中一个优惠券对应一个set集合里面保存不可重复列表列表中每一个元素保存下单成功的用户id - 用户下单可以直接访问Redis先判断是否有库存然后判断是否下过单如果条件都满足则直接通知用户下单成功。向订单信息保存到消息队列中由独立进程慢慢的将所有的订单信息都写入数据库。 **发送消息至消息队列** ```java @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } // 3.往消息队列发送消息 sendVoucherOrder2Queue(orderId, userId, voucherId); // 4.返回订单id return Result.ok(orderId); } private void sendVoucherOrder2Queue(Long orderId, Long userId, Long voucherId) { // 1.消息 VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); // 2.交换机 String exchange = "flash.direct"; rabbitTemplate.convertAndSend(exchange, "all", voucherOrder); } ``` **从消息队列中取出数据** ```java //短信通知业务 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "message.queue"), exchange = @Exchange(name = "flash.direct", type = ExchangeTypes.DIRECT), key = {"all", "message"} )) public void listenMessageQueue(VoucherOrder voucherOrder) { log.debug("恭喜用户" + voucherOrder.getUserId() + "抢到了大额秒杀券!!!"); } //创建订单业务 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "order.queue"), exchange = @Exchange(name = "flash.direct", type = ExchangeTypes.DIRECT), key = {"all", "order"} )) public void listenOrderQueue(VoucherOrder voucherOrder) { createVoucherOrder(voucherOrder); } private void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); // 创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 尝试获取锁 boolean isLock = redisLock.tryLock(); // 判断 if (!isLock) { // 获取锁失败,直接返回失败或者重试 log.error("不允许重复下单!"); return; } try { // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("不允许重复下单!"); return; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 log.error("库存不足!"); return; } // 7.创建订单 save(voucherOrder); } finally { // 释放锁 redisLock.unlock(); } } ``` 消息队列的好处在于解耦。最简单的例子生活中取快递的例子。消息队列就相当于菜鸟驿站。快递员就相当于生产者快递员(生产者)把快递放到快递柜里边(Message Queue)去我们(消费者)从快递柜里边去拿东西这就是一个异步两者之间没有耦合。但是如果去掉菜鸟驿站让快递员亲手交给我那么如果我不在家那么快递员就只能等待这就浪费了大量的时间耦合性高。 #### 4.1 生成者消息确认 像目前这样的订单业务时非常重要的,我们要保证消息发送到消费者(生产者、消费者消息确认)。RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。 返回结果有两种方式: - publisher-confirm,发送者确认 - 消息成功投递到交换机,返回ack - 消息未投递到交换机,返回nack - publisher-return,发送者回执 - 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。 **ConfirmCallback** ```java private void sendVoucherOrder2Queue(Long orderId, Long userId, Long voucherId) { // 1.消息 VoucherOrder voucherOrder = new VoucherOrder(); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); // 2.全局唯一的消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 3.添加callback correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // 3.1.ack,消息成功 log.debug("消息发送成功, ID:{}", correlationData.getId()); }else{ // 3.2.nack,消息失败 log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 4.交换机 String exchange = "flash.direct"; rabbitTemplate.convertAndSend(exchange, "all", voucherOrder, correlationData); } ``` **ReturnCallback** ```java @Slf4j @Configuration public class RabbitMQConfig implements ApplicationContextAware { /** * 目的是以Json序列化的方式存储消息Message * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 发送者回执Return * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate对象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判断是否是延迟消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) { // 是一个延迟消息,忽略这个错误提示 return; } // 记录日志 log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的话,重发消息 }); } } ``` #### 4.2 消费者消息确认 RabbitMQ是**阅后即焚**机制,RabbitMQ确认消息被消费者消费后会立刻删除。 而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。 设想这样的场景: - 1)RabbitMQ投递消息给消费者 - 2)消费者获取消息后,返回ACK给RabbitMQ - 3)RabbitMQ删除消息 - 4)消费者宕机,消息尚未处理 这样,消息就丢失了。因此消费者返回ACK的时机非常重要。 而SpringAMQP则允许配置三种确认模式: •manual:手动ack,需要在业务代码结束后,调用api发送ack。 •auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack •none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除 一般,我们都是使用默认的auto即可。 Auto模式下当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。 修改consumer服务的application.yml文件,添加内容: ```yaml spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false ``` 重启consumer服务,重复之前的测试。可以发现: - 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了 - 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了 所以为了确保消息被管理或消费,我们可以给队列添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。 ```java @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue"), exchange = @Exchange(name = "dl.direct", type = ExchangeTypes.DIRECT), key = "error" )) public void listenDeadLetter(VoucherOrder voucherOrder) { log.warn("死信交换机创建订单..."); createVoucherOrder(voucherOrder); } ```