Publish/Subscribe: Send messages to many consumers at once.

这一小节,通过实现一个日志系统来学习 RabbitMQ p/b 的简单用法:该日志系统由2个程序构成,第一个用来发送日志消息,第二个则用来接收并将其输出到终端窗口。在开始之前,先了解下 Exchange !了解它是什么,怎么路由消息等?

Exchange

RabbitMQ 中消息传递的核心思想是:生产者永远不会直接向队列发送任何消息。实际上,通常生产者甚至不知道消息是否会被传递到任何队列。

相反,生产者只能把消息发送给 exchangeexchange 则负责了两件事:

  • Producer 接收消息
  • 把接收到的消息发送给 queues

而 Exchange 怎么处理他收到的消息,则由 Exchange Type 决定。

Exchange 可以用 X 表示,模型图如下:

Exchange Type

Exchange Type 决定了来到 Exchange 的消息该如何分发到 Queues

RabbitMQ 的 Exchange 类型有:fanoutdirecttopicheaders

  • fanout: 将所有发送到 Exchange 的消息路由到所有与该 Exchange 绑定的队列
  • direct:将消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中去
  • topic: 相对于 direct 的严格匹配来说,topic 进行了扩展:即支持模糊匹配
  • headers: headers 类型的 Exchange 不依赖于路由键的匹配规则来路由消息,而是根据发送消息的内容中的 headers 属性进行匹配

后面将在写一篇关于 Exchange Type 的笔记,辅助实例代码进行理解。这里就先做认知学习吧。

The Default Exchange

默认的 Exchange 在声明时用 Empty String "" 表示,代码描述如下:

1
2
3
4
5
6
7
8
9
err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})
  • 这段代码我们使用了空的(默认) exchange。当然,如果该值被指定为具体的值时,消息将被路由到 routing_key 表示的队列中去。

Temproary Queues

能够给要使用的队列起名字对我们来说至关重要,因为我们需要将 worker 指向同一个队列中去。其,当你想要在消费者和生产者之间共享 queue 的时候,给队列一个给定的名字也很重要。

接下来,我们看看关于日志系统需要注意的两点:

  • 首先,无论我们什么时候连接到 Rabbit,都需要一个新的且为空的队列。为了做到这个,我们可以用随机名来创建队列,或者让 Rabbit Server 为我们选择队列名也是不错的选择

  • 其次,一旦我们断开了消费者的连接,都需要将队列自动删除

在 amqp 客户端,当我们提供命名为空的队列时,我们会用随即名创建一个非持久化的队列。例如:amq.gen-JzTY20BRgKO-HjmUJj0wLg

Bindings

Bindings: Exchange 和 Queue 之间的绑定关系就叫做 bindings-绑定,代码描述如下:

1
2
3
4
5
6
7
err = ch.QueueBind {
  q.Name,   // queue name
  "",       // routing key
  "logs",   // exchange
  false,
  nil,
}

P/S模型图

模型解读:

  • 一个生产者,多个消费者
  • 每个消费者都有一个自己的队列
  • 生产者只将消息发送给 Exchange(交换机、转换器),且 Exchange 只转发消息,并不做任何存储
  • 每个消费者的队列都要绑定到 Exchange
  • 生产者发送消息 -> Exchange -> 路由到队列 -> 消费者消费

完整代码:Golang Intro: 03 - Publish/Subscribe

See Also