分布式系统中必备的一个中间件就是消息队列,通过消息队列你能对服务间进行异步解耦、流量消峰、实现最终一致性。
目前市面上已经有RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人会问:“Redis 适合做消息队列么?”
在回答这个问题之前,你先从本质思考。
我将结合消息队列的特点,分析使用 Redis 的 List 作为消息队列的实现原理,并分享如何把 SpringBoot 与 Redission 整合来操作 Redis 运用到项目中。
消息队列是一种异步的服务间通信方式,适用于分布式和微服务架构。消息在被处理和删除之前一直存储在队列上。
每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。
消息队列在实际应用中包括如下四个场景。
消息队列满足哪些特性
消息有序性
消息是异步处理的,但是消费者需要按照生产者发送消息的顺序来消费,避免出现后发送的消息被先处理的情况。
重复消息处理
生产者可能因为网络问题出现消息重传导致消费者可能会收到多条重复消息。
同样的消息重复多次的话可能会造成一业务逻辑多次执行,需要确保如何避免重复消费问题。
可靠性
一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。
当消费者重启后,可以继续读取消息进行处理,防止消息遗漏。
生产者使用LPUSH key element[element...]将消息插入到队列的头部,如果 key 不存在则会创建一个空的队列再插入消息。
如下,生产者向队列 queue 先后插入了 “Java”、“码哥字节”、“Go”,返回值表示消息插入队列后的个数。
> LPUSH queue Java 码哥字节 Go(integer) 3
确实,List 并没有提供类似于 Kafka 的 ConsumeGroup ,会使用多个消费者策划给你续组成一个消费组来分担处理队列消息。不过在 Redis 5.0 之后,提供了 Streams 数据类型,后面我会介绍到。
消费者使用RPOP key依次读取队列的消息,先进先出,所以 “Java”会先读取消费:
> RPOP queue"Java"> RPOP queue"码哥字节"> RPOP queue"Go"
图2-13
别高兴的太早,LPUSH、RPOP存在一个性能风险,生产者向队列插入数据的时候,List 并不会主动通知消费者及时消费。
程序需要不断轮询并判断是否为空再执行消费逻辑,这就会导致即使没有新消息写入队列,消费者也在不停地调用RPOP命令占用CPU资源。
请叫我贴心哥 Redis,我提供了BLPOP、BRPOP阻塞读取的命令,消费者在读取队列没有数据的时候自动阻塞,直到有新的消息写入队列,才会继续读取新消息执行业务逻辑。
BRPOP queue 0
参数 0 表示阻塞等待时间无止期,哪怕是烟花易冷人事易分,雨纷纷旧故里草木深,斑驳的城门盘踞着老树根,石板上回荡的是再等,一直等到“心上人”来。
其实这就是幂等,对于同一条消息,消费者收到后处理一次的结果和多次的结果是一致的。
本质就是消费者在处理消息的时候崩溃了,无法再读取消息,缺乏一个消息确认可靠机制。
我提供了BRPOPLPUSH source destination timeout指令,含义是阻塞的方式从source队列读取消息的同时把这条消息复制到另一个destination队列中(备份),并且是原子操作。
不过这个指令在 6.2 版本被BLMOVE取代。接下来,上才艺!生产者使用LPUSH把消息依次从存入order:pay队列队头(左端)。
LPUSH order:pay "谢霸戈"LPUSH order:pay "肖材吉"
消费者消费消息的时候在while循环使用BLMOVE以阻塞的方式从队列order:pay队尾(右端)弹出消息“谢霸戈”,同时把该消息复制到队列order:pay:back队头(左端),该操作是原子性的,最后一个参数 timeout = 0 表示持续等待。
BLMOVE order:pay order:pay:back RIGHT LEFT 0
如果消费消息“谢霸戈”成功,那就使用LREM把队列order:pay:back的“谢霸戈”消息删除,从而实现 ACK 确认机制。
LREM order:pay:back 0 "谢霸戈"
倒数第二个参数 count 的含义如下。
消费异常的话,应用程序使用BRPOP order:pay:back从备份队列再次读取消息处理即可。
在 Java 中,你可以利用 Redission 封装的 API 来快速实现队列,接下来我将基于 SpringBoot 2.1.4 版本来教你如何整合 Redisson。
添加依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.3</version></dependency>
application.yaml引入 Redisson 配置文件。
spring:application:name: redissionredis:redisson:file: classpath:redisson-config.yaml
创建redisson-config.yaml配置。
singleServerConfig:idleConnectionTimeout: 10000connectTimeout: 10000timeout: 3000retryAttempts: 3retryInterval: 1500password: magebytesubscriptionsPerConnection: 5clientName: redissonClientaddress: "redis://127.0.0.1:6379"subscriptionConnectionMinimumIdleSize: 1subscriptionConnectionPoolSize: 50connectionMinimumIdleSize: 24connectionPoolSize: 64database: 0dnsMonitoringInterval: 5000threads: 16nettyThreads: 32codec: !<org.redisson.codec.Kryo5Codec> {}transportMode: "NIO"
在代码中,我使用的是阻塞双端队列,消费者开启死循环,执行指令。
@Slf4j@Servicepublic class QueueService {@Autowiredprivate RedissonClient redissonClient;private static final String ORDER_PAY_QUEUE = "order:pay";private static final String ORDER_PAY_BACK_QUEUE = "order:pay:back";/*** 生产者发送消息到队列头部** @param message*/public void sendMessage(String message) {RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);try {orderPayQueue.putFirst(message);log.info("将消息: {} 插入到队列 {}。", message, ORDER_PAY_QUEUE);} catch (InterruptedException e) {e.printStackTrace();}}/*** 消费者消费消息*/public void onMessage() {RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);while (true) {// BLMOVE order:pay order:pay:back RIGHT LEFT 0String message = orderPayQueue.move(Duration.ofSeconds(0), DequeMoveArgs.pollLast().addFirstTo(ORDER_PAY_BACK_QUEUE));log.info("从队列 {} 中读取到消息:{},并把消息复制到 {} 队列.", ORDER_PAY_QUEUE, message, ORDER_PAY_BACK_QUEUE);// 消费正常,从 ORDER_PAY_BACK_QUEUE 删除这条消息,LREM order:pay:back 0 messageremoveBackQueueMessage(message, ORDER_PAY_BACK_QUEUE);}}/*** 从队列中删除消息* @param message* @param queueName*/private void removeBackQueueMessage(String message, String queueName) {RBlockingDeque<String> orderPayBackDeque = redissonClient.getBlockingDeque(queueName);boolean remove = orderPayBackDeque.remove(message);log.info("消费正常,删除队列 {} 的消息 {}。", queueName, message);}}
单元测试
RunWith(SpringRunner.class)@SpringBootTest(classes = RedissionApplication.class)public class RedissionApplicationTests {@Autowiredprivate QueueService queueService;@Testpublic void testQueue() throws InterruptedException {new Thread(() -> {for (int i = 0; i < 1000; i++) {queueService.sendMessage("消息" + i);}}).start();new Thread(() -> queueService.onMessage()).start();Thread.currentThread().join();}}
可以使用 List 数据结构来实现消息队列,满足先进先出。
Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。
而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。
需要注意的是,我们要避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存。
在消息量不大的情况下使用 Redis 作为消息队列,他能给我们带来高性能的消息读写,这似乎也是一个很好消息队列解决方案。
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://www.jmbhsh.com/shumazixun/34392.html