曾彪彪的个人网站
首页
文章列表
>>
文章详情
RabbitMQ可靠性投递方案设计
作者:
曾彪彪
日期:
2023-05-16 05:11:22
阅读(653)
分类:
消息中间件
读书笔记
#### 消息可靠性投递1 使用数据库,投递之前先记录消息元数据(exchange, routing key, body, delivery count, next delivery time, delivery status),并使用conform进行消息确认,在收到confirm后,设置delivery status为success。如果没收到confirm,则在超过next delivery time后进行重头,并增加投递次数。在进行一定次数的重投后,如果仍不成功,在设置delivery status为fail。 这种方式可以保证消息100%投递成功,但是频繁操作数据库,会有一定的性能瓶颈。 #### 消息可靠性投递2 延迟二次投递,发送消息时,第一条消息发给consumer,间隔一段时间后,发送check message给callback service。当consumer接收到消息后,发送消费成功消息给callback service。callback server将消费成功消息入库。 当producer发送check message给callback service时,去数据库查询消息是否消费成功。如果消费成功,则丢弃。如果消费不成功,则使用RPC告诉producer进行消息重投。 这种方式无法保证消息100%投递成功,比如producer的消息无法到达broker时,则消息无法成功投递。 但是这种消息一定程度上保证了消息的可靠性投递,另外也能保证消息一定被consumer消费了。还有使用callback server专门处理消息重投,使得消息重投与producer解耦。另外也减少了对数据库的操作,提高了性能。互联网公司大都采用这种方案。 #### 消息幂等性处理 数据库乐观锁机制,使用version对每条记录进行版本管控,在更新时带上版本号,这样在并发的情况下,只有一个客户端能更新成功。 ``` UPDATE Repository SET COUNT=COUNT-1,VERSION=VERSION+1 WHERE VERSION=10 ``` 如果消费者重复接收消息,则需要进行消息幂等性处理,确保消息只被处理一次。 消息幂等性处理思路是生成唯一消息ID,并借助数据库主键去重。这种方案可能存在的问题是,在并发量大时,会出现数据库读写瓶颈,可以使用分库+hash命中方案解决。 还有一种方案是使用Redis的原子性进行解决,但是需要考虑一些其它问题,如数据是否落库,落库时数据库数据与Redis数据的一致性等等。 #### Confirm/Return RabbitMQ 支持消息确认,用于保证消息可靠性投递。 另外也支持投递反馈,如果消息不可达,producer可以收到消息投递失败通知。需要设置发送消息的机制为mandatory,才支持消息return通知。 #### 消费端消息限流 如果Rabbit MQ中堆积了大量消息,并且Consumer中使用了actoAck机制,那么可能造成consumer瞬间接收大量消息,造成consumer端应用crash。 所以需要在consumer中使用manual ack机制对消息进行限流。限流的相关参数包括prefetch size, prefetch count, global等参数。一般prefetch count设置为1。 size设置为0表示不做限制。global设置为false,表示scope为consumer而不是channel。 生产环境中,都会使用manual ack。 #### TTL Time to live表示消息的存活时间。可以在queue上设置TTL时间,当消息在队列中存活一定时间后仍未被消费,则消息会被自动删除。 也可以为消息设置expiration,当消息在一定时间内未被消费,Rabbit MQ会删除该消息。 #### DLX Dead letter exchange是Rabbit MQ中的死信队列。当下边情况发生时,消息会进入死信队列: 1. TTL超时 2. 队列满 3. 消费端使用nack签收并且没有requeue. Rabbit可以在队列上添加x-dead-letter-exchange属性,用于处理死信消息。 死信exchange需要是一个独立的exchange,并且绑定一个队里,任何发送给exchange的消息都投递给该队列。 ### 第四章 整合Rabbit MQ & Spring家族 #### Spring AMQP 要在Spring中使用Rabbit MQ,需要注入以下几个类。 ``` @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("192.168.11.76:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } ``` 如果需要消费消息,则需要注入以下类 ``` @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); container.setDefaultRequeueRejected(false); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setExposeListenerChannel(true); container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); ``` 可以设置container的MessageListener消费消息,但推荐的方式是MessageListenerAdapter。代码如下 ``` container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消费者: " + msg); } }); ``` ``` MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter); ``` MessageConverter可以进行消息的转换,在Spring Boot中,已经做了充分的优化和自动转换,所以一般情况下其实不用,但如果需要通过消息中渐渐传递一些特殊消息,如文件等,那么可以使用MessageConverter。以下是一个全局消息转换器的定义。 ``` //1.4 ext convert MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的转换器: ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter); ``` Message Property和Arguments功能比较强大,可以设置Rabbit MQ的属性,也可以自定义一些信息,在消息的发送和接收过程中,要灵活应用。
评论(0)
评论(必填)
名称(必填)
联系方式(可选)
验证码(必填)
提交
评论(必填)
名称(必填)
联系方式(可选)
验证码(必填)