RabbitMQ 是一个开源的、使用最广的消息队列。

  • Erlang 开发,对高并发、路由、负载均衡、数据持久化有很好的支持。
  • 支持的协议:AMQP,XMPP, SMTP, STOMP
  • 支持集群部署(私有云或公有云)
  • 可使用 HTTP-API、命令行工具和 UI 界面以便于管理和监控

RabbitMQ

RabbitMQ 消息转发器:可用来接收、存储和转发消息(binary blobs of data ‒ messages)

RabbitMQ 中的几个术语:

  • 生产者:只负责发送消息的程序

  • 队列:一个很大的消息缓存池,大小取决于宿主机的内存和磁盘容量;多个生产者可同时发消息给一个队列,多个消费者也可以同时从一个队列中取消息

  • 消费者:只负责接收消息的程序

生产者、队列及消费者通常会运行在不同机器上;而且同一个应用程序即可包含生产者也可包含消费者

Running RabbitMQ

Ref: Running RabbitMQ With Management Plugin

? Note:

  • RabbitMQ 会使用容器的主机名称(Hostname) 生成数据存储目录,因此我们需要在运行容器服务时指定一个具体的名称(--hostname my-rabbit)以便后续查看数据, 如果不指定则会使用一个随机名称。
  • RabbitMQ server addr for connecting: amqp://guest:guest@host_ip:5672/
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# This will start a RabbitMQ container listening on the default port of 5672

$ docker run -d --hostname my-rabbit -p 5672:5672 --name rabbitmq rabbitmq:3
# - -p: 做端口转发,让宿主机外的机器可以通过 ip:port 方式访问 

$ docker logs rabbitmq
...
              Starting broker...
2019-03-28 13:14:38.489 [info] <0.216.0>
 node           : rabbit@my-rabbit
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : rWhct608elv6zt7P4yeo0A==
 log(s)         : <stdout>
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@my-rabbit # 数据存放目录 -> Queue Buffer
 
# 开启带管理插件(Web)的 RabbitMQ Server

$ docker run --restart=always -d --hostname my-rabbit --name rabbitmq-m -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# RabbitMQ Manager Web: http://127.0.0.1:15672
# RabbitMQ Manager Account:guest@guest

Using the Go RabbitMQ client

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
Design 

> 客户端协议实现说明

Most other broker clients publish to queues, but in AMQP, clients publish
Exchanges(交易中心) instead.  AMQP is programmable, meaning that both the producers and
consumers agree on(达成一致) the configuration of the broker, instead requiring an
operator or system configuration that declares the logical topology in the
broker.  The routing between producers and consumer queues is via Bindings.
These bindings form the logical topology(拓扑) of the broker.

> Hello World Example Routing

                       +---------------------------------+
           Publishing  +                                 + Delivery
Publisher ------------>+--Exchange --> Routes --> Queue--+----------> Consumer
                       +                                 +
                       +---------------------------------+

In this library, a message sent from publisher is called a "Publishing" and a
message received to a consumer is called a "Delivery".  The fields of
Publishings and Deliveries are close(接近) but not exact mappings to the underlying
wire format to maintain stronger types.  Many other libraries will combine
message properties with message headers.  In this library, the message well
known properties are strongly typed fields on the Publishings and Deliveries,
whereas the user defined headers are in the Headers field.

> Publishings 和 Deliveries 类型定义相关说明

The method naming closely matches the protocol's method name with positional
parameters mapping(映射) to named protocol message fields.  The motivation here is to
present a comprehensive(全面的) view over all possible interactions with the server.

Generally, methods that map to protocol methods of the "basic" class will be
elided(消隐) in this interface, and "select" methods of various channel mode selectors
will be elided(隐藏) for example Channel.Confirm and Channel.Tx.

> 接口命名尽量和 AMQP 协议保持了一致 

The library is intentionally(有意的) designed to be synchronous, where responses for
each protocol message are required to be received in an RPC manner.  Some
methods have a noWait parameter like Channel.QueueDeclare, and some methods are
asynchronous like Channel.Publish.  The error values should still be checked for
these methods as they will indicate IO failures like when the underlying
connection closes.

> 同步通信;但也有例外,如 Channel.Publish 属于异步,也需要检查 err 是否是 IO 错误(在底层连接关闭时就会抛出) 

Asynchronous Events

> 异步事件机制相关说明

Clients of this library may be interested in receiving some of the protocol
messages other than Deliveries like basic.ack methods while a channel is in
confirm mode(确认模式).

The Notify* methods with Connection and Channel receivers model(模拟) the pattern of
asynchronous events like closes due to exceptions, or messages that are sent out
of band from an RPC call like basic.ack or basic.flow.

Any asynchronous events, including Deliveries and Publishings must always have
a receiver until the corresponding chans are closed.  Without asynchronous
receivers, the sychronous methods will block.

> 异步事件必须有接收者,等待 chan 被关闭;同步方法会阻塞

Use Case

It's important as a client to an AMQP topology to ensure the state of the
broker matches your expectations.  For both publish and consume use cases,
make sure you declare the queues, exchanges and bindings you expect to exist
prior to calling Channel.Publish or Channel.Consume.

Hello World

  • send.go: 实现生产者代码,发送一条 hello 然后退出.
  • recv.go: 实现消费者代码,从队列中持续取数据

    1
    2
    3
    4
    
    hello world data flow diagram
    
    p -> [][][][][][] -> c
            queue

发送

send.go:连接到RabbitMQ后,发送一条消息后,退出。

  • 连接到 RabbitMQ 服务器:即:建立 Socket 连接,处理协议转换、版本对接以及一些登陆授权问题 For Us.

    1
    2
    
    conn, err := amqp.Dial(url)
    defer conn.Close()
    • 打开通道,然后通过这个 ch 来实现我们生产者业务相关的 API
    1
    2
    
    ch, err := conn.Channel()
    defer ch.Close()
  • 创建队列,用于接收、存储生产者消息

    1
    
    q, err := ch. QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

    Note: 声明队列的操作幂等,即多次创建只会返回一个名为 ‘name’ 的 queue,不存在时则会创建。

    • 发布消息
    1
    
    err = ch.Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error

    Note: 消息的内容是字节数组,因此可以按照业务需求进行编码。

接收

recv.go: 连接到 RabbitMQ 服务器,然后从 queue 中不断地读取消息

  • 连接到 RabbitMQ 服务器:即:建立 Socket 连接,处理协议转换、版本对接以及一些登陆授权问题 For Us.

    1
    2
    
    conn, err := amqp.Dial(url)
    defer conn.Close()
    • 打开通道,然后通过这个 ch 来实现我们消费者业务相关的 API
    1
    2
    
    ch, err := conn.Channel()
    defer ch.Close()
  • 创建队列,用于存储、转发消息给消费者

    1
    
    q, err := ch. QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

    Note: 这里创建的队列名称必须和生产者程序 (send.go) 中的队列名一致, 否则无法完成 发送/接收 绑定。

    • 将消费者程序中的 queue 注册到 RabbitMQ 服务器中,然后准备接收来自 RabbitMQ 转发的消息;消息发送时异步通信,在 Go 中消息将被发送到 chan amqp.Delivery 通道中, 因此我们需要通过 range 从这个通道来持续取消息
    1
    2
    3
    4
    
    consumeCh, err := ch.Consume(q.Name,"",true, false, false, false, nil)
    for d = range consumeCh {
        // do something with msg
    }

运行

  • Run Consumer

    1
    2
    
    # Open terminal A
    $ go run recv.go
    • Run Producer

      1
      2
      
      # Open terminal B
      $ go run send.go
    • Result

    Note:如果运行了带 管理插件 的 RabbitMQ Docker Container, 可以访问 http://host_ip:15672 查看队列中的详细信息. 如下图:

    Hello World Code Click Here

    Hello Wrold 这种简单队列的不足

    • 耦合性高:生产者和消费者一一对应(如果有多个消费者要来消费队列中的消息,简单队列就无法满足了)
    • 如果队列名变更,就会引起消费者和生产者代码同时修改

    See Also