RabbitMq入门
Contents
RabbitMq
RabbitMq 是一个消息队列,它接受并传递消息,可以认为它是一个邮局,但是它不传递纸,传递的是二进制的数据块,也就是消息。
有一些rabbitMq的术语,生产者值得是发送消息的人,队列指的是rabbitMq里面的一个消息盒子,消息都存储在这个队列里,队列受主机的内存和硬盘限制。实质上就是一个很大的消息缓冲区。许多生产者可以把消息都放入这个队列中,需要消费者可以从这个队列中取出消息。通常情况下,消费者,生产者,和消息队列三者都不在一个主机上。
Hello world
这里写两个程序,生产者发送一个单个的消息,然后消费者接受消息然后把他们打印。
RabbitMq支持许多种协议,这里使用的是AMQP 0-9-1,是一个开源的广泛使用的消息协议。The RabbitMQ Java client is also in the central Maven repository, with the groupId **com.rabbitmq** and the artifactId **amqp-client**
.
下面用send代表生产者,用recv代表消费者。
1 | public class Send { |
分开介绍:
建立一个到服务器的连接
1 | ConnectionFactory factory = new ConnectionFactory(); |
Connection抽象了socket连接,主要帮助我们关心协议的版本沟通和认证。这里连接到的队列是本地的机器,如果想连接其他的机器的话应该填写具体的名字或者是IP地址。
接下来,创建一个通道(channel),绝大部分API方法需要通过调用它来完成。
为了发送信息,我们需要声明一个queue来帮助我们发送,然后我们就开始发送信息了。
1 | channel.queueDeclare(QUEUE_NAME, false, false, false, null); |
声明一个队列是幂等的,它只会在不存在的时候创建,消息的内容是一个二进制的数组,最后关闭通道(channel)和连接(connection)。
1 | channel.close(); |
以上就是我们的发送者。RabbitMQ会把消息推送给接收者,所以不同于只发了一条信息的发送者,我们会让接收者一直监听消息并打印出来。
下面是消息的接收者,消费者从rabbitmq中拉取消息,需要持续的监听队列并打印其中的消息。
和生产者一样,我们打开一个连接和一个channel,然后声明一个我们将要去消费的队列,需要注意的是要和生产者生产的队列对应。
1 | import com.rabbitmq.client.*; |
work queues
创建一个工作队列来分发任务到不同的执行单元中。
工作队列的主要思想就是避免避免占资源的任务的长时间阻塞,导致后面的任务的长时间等待。我们安排这个任务一会再做。我们把任务概述成信息然后发送到消息队列中,一个后台的工作单元将会取出这个任务然后执行它。当有许多个工作单元时,任务在他们之间是分享的。这个概念在Web应用中十分的有用。
准备
前面部分中我们发送了一个Helloworld,现在我们发送字符串来代表复杂的信息,并用Thread.sleep
来代替复杂的信息。我们用字符串后面的.
的个数来代替这个任务需要执行的时间。
1 | String message = getMessage(argv); |
改进后的消费者
1 | final Consumer consumer = new DefaultConsumer(channel) { |
循环调度
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。
首先,我们先同时运行两个 worker实例,它们都会从队列中获取消息,到底是不是这样呢?我们看看。
然后需要打开三个终端,两个用来运行 worker程序,这两个终端就是我们的两个消费者(consumers)—C1 和 C2。
默认来说,RabbitMQ 会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。
消息确认
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被 RabbitMQ 发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。
我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
为了防止消息丢失,RabbitMQ 提供了消息响应(acknowledgments)。消费者会通过一个 ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ 就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工作者与它断开的时候,RabbitMQ 会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
消息响应默认是开启的。之前的例子中我们可以使用 no_ack=True 标识把它关闭。当它为false时,当工作者(worker)完成了任务,就发送一个响应。
1 | channel.basicQos(1); // accept only one unack-ed message at a time (see below) |
运用上面的代码,即使杀死一个任务,也不会丢失消息。
忘记确认
一个很容易犯的错误就是忘了 basic_ack,后果很严重。消息在你的程序退出之后就会重新发送,如果它不能够释放没响应的消息,RabbitMQ 就会占用越来越多的内存。
为了排除这种错误,你可以使用 rabbitmqctl 命令,输出 messages_unacknowledged 字段:
消息持久化
如果你没有特意告诉 RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
首先,为了不让队列消失,需要把队列声明为持久化(durable):
1 | boolean durable = true; |
尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello 的非持久化队列。RabbitMq 不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue。
1 | boolean durable = true; |
这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。这时候,我们就可以确保在 RabbitMq 重启之后 queue_declare 队列不会丢失。另外,我们需要把我们的消息也要设为持久化——将 delivery_mode 的属性设为2。
公平调度
你应该已经发现,它仍旧没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ 并不知道这些,它仍然一如既往的派发消息。
这是因为 RabbitMQ 只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第 n-th 条消息发给第 n-th 个消费者。
我们可以使用 basic.qos 方法,并设置 prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ 就会把消息分发给下一个空闲的工作者(worker)。
1 | channel.basicQos(1); |
发布订阅
分发一个消息给多个消费者(consumers)。这种模式被称为“发布/订阅”。
为了描述这种模式,将会构建一个简单的日志系统。它包括两个程序,第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。
在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver)把日志输出到屏幕上。
最终,日志消息被广播给所有的接受者(receivers)。
RabbitMQ 消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。
有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), 头交换机(headers)和 扇型交换机(fanout)。我们在这里主要说明最后一个—-扇型交换机(fanout)。先创建一个fanout类型的交换机,命名为logs:
1 | channel.exchangeDeclare("logs", "fanout"); |
扇型交换机(fanout)很简单,它把消息发送给它所知道的所有队列。这正是我们的日志系统所需要的。
现在,我们就可以发送消息到一个具名交换机了:
1 | channel.basicPublish( "logs", "", null, message.getBytes()); |
临时队列
给一个队列命名是很重要的——我们需要把工作者(workers)指向正确的队列。如果你打算在发布者(producers)和消费者(consumers)之间共享同队列的话,给队列命名是十分重要的。
但是这并不适用于我们的日志系统。我们打算接收所有的日志消息,而不仅仅是一小部分。我们关心的是最新的消息而不是旧的。为了解决这个问题,我们需要做两件事情。
首先,当我们连接上 RabbitMQ 的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只需要在调用 queueDeclare 方法的时候,不提供 queue 参数就可以了:
1 | String queueName = channel.queueDeclare().getQueue(); |
这时候我们可以通过 result.method.queue 获得已经生成的随机队列名。它可能是这样子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。exclusive 标识符即可达到此目的.
绑定
我们已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)。1
channel.queueBind(queueName, "logs", "");
现在,logs 交换机将会把消息添加到我们的队列中。
绑定列表
你可以使用 rabbitmqctl list_bindings
列出所有现存的绑定。
1 | 交换器与队列之间的连接叫做绑定,每一个绑定都有一个绑定键,叫做binding key,当消息发送过来时,消息会带有一个路由的标识,叫做routing key,交换器会根据这两个值来决定将消息发给哪个队列。 |
路由
我们打算新增一个功能 —— 使得它能够只订阅消息的一个子集。例如,我们只需要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中。
绑定
前面的例子,我们已经创建过绑定(bindings),代码如下:
1 | channel.queue_bind(exchange=exchange_name,queue=queue_name) |
绑定(binding)是指交换机(exchange)和队列(queue)的关系。可以简单理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。
绑定的时候可以带上一个额外的 routing_key 参数。为了避免与basic_publish的参数混淆,我们把它叫做绑定键(binding key)。以下是如何创建一个带绑定键的绑定。
1 | channel.queue_bind(exchange=exchange_name, |
绑定键的意义取决于交换机(exchange)的类型。我们之前使用过的扇型交换机(fanout exchanges)会忽略这个值。
直连交换机
我们的日志系统广播所有的消息给所有的消费者(consumers)。我们打算扩展它,使其基于日志的严重程度进行消息过滤。例如我们也许只是希望将比较严重的错误(error)日志写入磁盘,以免在警告(warning)或者信息(info)日志上浪费磁盘空间。
我们使用的扇型交换机(fanout exchange)没有足够的灵活性 —— 它能做的仅仅是广播。
我们将会使用直连交换机(direct exchange)来代替。路由的算法很简单 —— 交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列。
在这个场景中,我们可以看到直连交换机 X 和两个队列进行了绑定。第一个队列使用 orange 作为绑定键,第二个队列有两个绑定,一个使用 black 作为绑定键,另外一个使用 green。
这样以来,当路由键为 orange 的消息发布到交换机,就会被路由到队列 Q1。路由键为 black 或者 green 的消息就会路由到 Q2。其他的所有消息都将会被丢弃。
多个绑定
多个队列使用相同的绑定键是合法的。这个例子中,我们可以添加一个 X 和 Q1 之间的绑定,使用 black 绑定键。这样一来,直连交换机就和扇型交换机的行为一样,会将消息广播到所有匹配的队列。带有 black 路由键的消息会同时发送到 Q1 和 Q2。
发送日志
我们将会发送消息到一个直连交换机,把日志级别作为路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。我们先看看发送日志。我们需要创建一个交换机(exchange)
1 | channel.exchange_declare(exchange='direct_logs', |
然后我们发送一则消息:
1 | channel.basic_publish(exchange='direct_logs', |
我们先假设 “severity” 的值是 info、warning、error 中的一个。
订阅
处理接收消息的方式和之前差不多,只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定。
1 | result = channel.queue_declare(exclusive=True) |
主题交换机
我们改进了我们的日志系统。我们使用直连交换机替代了扇型交换机,从只能盲目的广播消息改进为有可能选择性的接收日志。
尽管直连交换机能够改善我们的系统,但是它也有它的限制—-没办法基于多个标准执行路由操作。
在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix 工具 syslog就是同时基于严重程度 -severity (info/warn/crit…) 和 设备 -facility (auth/cron/kern…) 来路由日志的。
如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于 “cron” 的严重程度为 “critical errors” 的日志,也可以监听来源于 “kern” 的所有日志。
为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— 主题交换机。
发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。词语的个数可以随意,但是不要超过 255 字节。
绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:
- 星号用来表示一个单词
- 井号用来表示任意数量的单词
这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>
我们创建了三个绑定:Q1 的绑定键为.orange.
,Q2 的绑定键为 ..rabbit
和 lazy.#
。
这三个绑定键被可以总结为:
- Q1 对所有的桔黄色动物都感兴趣。
- Q2 则是对所有的兔子和所有懒惰的动物感兴趣。
一个携带有 quick.orange.rabbit
的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant
的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox
的消息会投递给第一个队列,携带有 lazy.brown.fox
的消息会投递给第二个队列。携带有 lazy.pink.rabbit
的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox
的消息不会投递给任何一个队列。
如果我们违反约定,发送了一个携带有一个单词或者四个单词orange
or quick.orange.male.rabbit
的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。
但是另一方面,即使 lazy.orange.male.rabbit
有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。
主题交换机是很强大的,它可以表现出跟其他交换机类似的行为
当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
Author: corn1ng
Link: https://corn1ng.github.io/2018/03/27/RabbitMq/
License: 知识共享署名-非商业性使用 4.0 国际许可协议