golang实现rabbitmq之订阅模式
上一篇介绍了golang实现rabbitmq之work模式上面简单介绍了两种模式,一个是simple模式,另外一个是work模式,他们有一个共同的特点就是一个消息只能被一个消费者消费,那么我们的消息能不能被多个消费者消费呢,这个自然是可以的,也就是我要说的订阅模式(Publish/Subscribe)。订阅模式的特别是:一个消息被投递到多个队列,一个消息能被多个消费者获取。过程是由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//订阅模式需要用到exchange。
//因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//另外定义exchange时,其kind类型一定要是”fanout”,这样才是广播类型。
发布订阅模式的代码:
package RabbitMq
import (
"fmt"
"github.com/streadway/amqp"
)
//这里是订阅模式的相关代码。
//订阅模式需要用到exchange。
//因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//另外定义exchange时,其kind类型一定要是"fanout",这样才是广播类型。
//获取订阅模式下的rabbitmq的实例
func NewRabbitMqSubscription(exchangeName string) *RabbitMQ {
//创建rabbitmq实例
rabbitmq := NewRabbitMQ("", exchangeName, "")
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
rabbitmq.failOnErr(err, "订阅模式连接rabbitmq失败。")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "订阅模式获取channel失败")
return rabbitmq
}
//订阅模式发布消息
func (r *RabbitMQ) PublishSubscription(message string) {
//第一步,尝试连接交换机
err := r.channel.ExchangeDeclare(
r.ExChange,
"fanout", //这里一定要设计为"fanout"也就是广播类型。
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "订阅模式发布方法中尝试连接交换机失败。")
//第二步,发送消息
err = r.channel.Publish(
r.ExChange,
"",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//订阅模式消费者
func (r *RabbitMQ) ConsumeSbuscription() {
//第一步,试探性创建交换机exchange
err := r.channel.ExchangeDeclare(
r.ExChange,
"fanout",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "订阅模式消费方法中创建交换机失败。")
//第二步,试探性创建队列queue
q, err := r.channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "订阅模式消费方法中创建创建队列失败。")
//第三步,绑定队列到交换机中
err = r.channel.QueueBind(
q.Name,
"", //在pub/sub模式下key要为空
r.ExChange,
false,
nil,
)
//第四步,消费消息
messages, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range messages {
fmt.Printf("小杜同学写的订阅模式收到的消息:%s\n", d.Body)
}
}()
fmt.Println("订阅模式消费者已开启,退出请按 CTRL+C\n")
<-forever
}
发布者的代码:
package main
import (
"fmt"
"rabbitmq20181121/RabbitMq"
"strconv"
"time"
)
func main() {
rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
for i := 0; i < 100; i++ {
rabbitmq.PublishSubscription("订阅模式生产第" + strconv.Itoa(i) + "条数据")
fmt.Printf("订阅模式生产第" + strconv.Itoa(i) + "条数据\n")
time.Sleep(1 * time.Second)
}
}
建立两个一样的消费者的代码:
package main
import "rabbitmq20181121/RabbitMq"
func main() {
rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
rabbitmq.ConsumeSbuscription()
}
接着,依旧是把发布者和两个消费者run起来,会发现两个消费者都同时消费了发布者发布的消息了。也就是发布订阅模式也成功了。

