Rabbitmq | 01 - Hello World
Contents
RabbitMQ 是一个开源的、使用最广的消息队列。
- Erlang 开发,对高并发、路由、负载均衡、数据持久化有很好的支持。
- 支持的协议:AMQP,XMPP, SMTP, STOMP
- 支持集群部署(私有云或公有云)
- 可使用 HTTP-API、命令行工具和 UI 界面以便于管理和监控
RabbitMQ
RabbitMQ 消息转发器:可用来接收、存储和转发消息(binary blobs of data ‒ messages)
RabbitMQ 中的几个术语:
生产者:只负责发送消息的程序
队列:一个很大的消息缓存池,大小取决于宿主机的内存和磁盘容量;多个生产者可同时发消息给一个队列,多个消费者也可以同时从一个队列中取消息
消费者:只负责接收消息的程序
生产者、队列及消费者通常会运行在不同机器上;而且同一个应用程序即可包含生产者也可包含消费者
Running RabbitMQ
Ref: Running RabbitMQ With Management Plugin
? Note:
- RabbitMQ 会使用容器的主机名称(Hostname) 生成数据存储目录,因此我们需要在运行容器服务时指定一个具体的名称(
--hostname my-rabbit
)以便后续查看数据, 如果不指定则会使用一个随机名称。- RabbitMQ server addr for connecting:
amqp://guest:guest@host_ip:5672/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# This will start a RabbitMQ container listening on the default port of 5672 $ docker run -d --hostname my-rabbit -p 5672:5672 --name rabbitmq rabbitmq:3 # - -p: 做端口转发,让宿主机外的机器可以通过 ip:port 方式访问 $ docker logs rabbitmq ... Starting broker... 2019-03-28 13:14:38.489 [info] <0.216.0> node : rabbit@my-rabbit home dir : /var/lib/rabbitmq config file(s) : /etc/rabbitmq/rabbitmq.conf cookie hash : rWhct608elv6zt7P4yeo0A== log(s) : <stdout> database dir : /var/lib/rabbitmq/mnesia/rabbit@my-rabbit # 数据存放目录 -> Queue Buffer # 开启带管理插件(Web)的 RabbitMQ Server $ docker run --restart=always -d --hostname my-rabbit --name rabbitmq-m -p 5672:5672 -p 15672:15672 rabbitmq:3-management # RabbitMQ Manager Web: http://127.0.0.1:15672 # RabbitMQ Manager Account:guest@guest |
Using the Go RabbitMQ client
|
|
Hello World
- send.go: 实现生产者代码,发送一条
hello
然后退出. recv.go: 实现消费者代码,从队列中持续取数据
1 2 3 4
hello world data flow diagram p -> [][][][][][] -> c queue
发送
send.go:连接到RabbitMQ后,发送一条消息后,退出。
连接到 RabbitMQ 服务器:即:建立 Socket 连接,处理协议转换、版本对接以及一些登陆授权问题 For Us.
1 2
conn, err := amqp.Dial(url) defer conn.Close()
- 打开通道,然后通过这个 ch 来实现我们生产者业务相关的 API
1 2
ch, err := conn.Channel() defer ch.Close()
创建队列,用于接收、存储生产者消息
1
q, err := ch. QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
Note: 声明队列的操作幂等,即多次创建只会返回一个名为 ‘name’ 的 queue,不存在时则会创建。
- 发布消息
1
err = ch.Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
Note: 消息的内容是字节数组,因此可以按照业务需求进行编码。
接收
recv.go: 连接到 RabbitMQ 服务器,然后从 queue 中不断地读取消息
连接到 RabbitMQ 服务器:即:建立 Socket 连接,处理协议转换、版本对接以及一些登陆授权问题 For Us.
1 2
conn, err := amqp.Dial(url) defer conn.Close()
- 打开通道,然后通过这个 ch 来实现我们消费者业务相关的 API
1 2
ch, err := conn.Channel() defer ch.Close()
创建队列,用于存储、转发消息给消费者
1
q, err := ch. QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
Note: 这里创建的队列名称必须和生产者程序
(send.go)
中的队列名一致, 否则无法完成发送/接收
绑定。- 将消费者程序中的 queue 注册到 RabbitMQ 服务器中,然后准备接收来自 RabbitMQ 转发的消息;消息发送时异步通信,在 Go 中消息将被发送到
chan amqp.Delivery
通道中, 因此我们需要通过range
从这个通道来持续取消息
1 2 3 4
consumeCh, err := ch.Consume(q.Name,"",true, false, false, false, nil) for d = range consumeCh { // do something with msg }
- 将消费者程序中的 queue 注册到 RabbitMQ 服务器中,然后准备接收来自 RabbitMQ 转发的消息;消息发送时异步通信,在 Go 中消息将被发送到
运行
Run Consumer
1 2
# Open terminal A $ go run recv.go
Run Producer
1 2
# Open terminal B $ go run send.go
Result
Note:如果运行了带
管理插件
的 RabbitMQ Docker Container, 可以访问http://host_ip:15672
查看队列中的详细信息. 如下图:Hello Wrold 这种简单队列的不足
- 耦合性高:生产者和消费者一一对应(如果有多个消费者要来消费队列中的消息,简单队列就无法满足了)
- 如果队列名变更,就会引起消费者和生产者代码同时修改
See Also