时间:2021-05-20
前言
RabbitMQ 是使用 Erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(Advanced Message Queuing Protocol, AMQP)。
与 Kafka 等消息队列相比,RabbitMQ 最大的优势在于其较高的可靠性:
因为具有较高可靠性和一致性, RabbitMQ 可以胜任订单处理、秒杀等一致性要求较高的业务场景。
RabbitMQ 概念与机制
RabbitMQ 中的概念模型:
交换机(Exchange)
生产者发送的消息会首先送到交换机(Exchange), 交换机根据自身类型和消息的 routing-key 等信息将消息投递到绑定的消息队列中。
RabbitMQ中的四种标准交换机:
direct: 如果消息的 routing-key 与队列的 binding-key 完全相同,direct类型的交换机则会将消息投递到该队列中。
topic: 允许队列的 binding-key 中包含通配符*和#, topic 交换机会将消息投递到 binding-key 与 routing-key 匹配的队列中。
fanout: fanout 交换机不进行任何匹配, 将消息投递到所有绑定的队列
header: header 交换机根据消息头进行投递,现在已较少使用
我们可以使用 RabbitMQ 的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的delayed-message-exchange。
消息头中的delivery-mode可以设置为 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理, 即使 RabbitMQ 崩溃也不会丢失。
消费者客户端通常使用的channel.basicConsume使用推(push)模式投递消息, 即当有新消息时 Broker 通过 channel 主动向客户端发送消息。客户端也可以使用channel.basicGet从 Broker 拉取消息。
ACK机制
RabbitMQ 提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失。
确认送达的回执有三种:
RabbitMQ 的 Queue 可以设置 no_ack=true, 则消息被投递后即删除不等待回执。
channel.basicConsume 可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。
Java 代码示例
首先安装并启动RabbitMQ实例, Mac用户可以使用 Homebrew 进行安装:
brew install rabbitmq启动服务:
brew services start rabbitmq或者使用官方docker镜像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-managementRabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。
RabbitMQ默认TCP端口为5672, Web控制台默认端口15672。
在Maven中添加依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version></dependency>编写生产者:
package rabbit;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * @author finley */public class RabbitProducer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) { String exchangeName = "test-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hello"; byte[] msg = "hello world".getBytes(); AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); propsBuilder.deliveryMode(2); // persistent propsBuilder.priority(0); // normal propsBuilder.contentType("text/plain"); channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg); } }}编写消费者:
package rabbit;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.*;/** * @author finley */public class RabbitConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) { String exchangeName = "test-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String queueName = channel.queueDeclare().getQueue(); String bindingKey = "hello"; channel.queueBind(queueName, exchangeName, bindingKey); while(true) { channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); String bodyStr = new String(body, "UTF-8"); System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr); long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } }); } } }}RabbitMQ 的消息为字节, 可以将 Java 对象序列化后作为消息体发送。
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
(一)安装一个消息中间件,如:rabbitMQ(二)生产者sendmq.pyimportpikaimportsysimporttime#远程rabbitmq服务
目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消
1:RabbitMQ是个啥?(专业术语参考自网络) RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。 Rabb
因为MyCat是一个分布式数据库中间件,要理解MyCat,那你就得先知道到底什么是中间件!说起中间件,很多人首先想到的就是消息中间件,那么除了消息中间件呢?其实
环境Win10Python3.6.6Django2.1.3中间件作用中间件用于全局修改Django的输入或输出。中间件常见用途缓存会话认证日志记录异常中间件执行