golang实现rabbitmq之订阅模式

作者: adm 分类: go 发布时间: 2022-07-12

上一篇介绍了golang实现rabbitmq之work模式上面简单介绍了两种模式,一个是simple模式,另外一个是work模式,他们有一个共同的特点就是一个消息只能被一个消费者消费,那么我们的消息能不能被多个消费者消费呢,这个自然是可以的,也就是我要说的订阅模式(Publish/Subscribe)。订阅模式的特别是:一个消息被投递到多个队列,一个消息能被多个消费者获取。过程是由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。

//订阅模式需要用到exchange。
//因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//另外定义exchange时,其kind类型一定要是”fanout”,这样才是广播类型。
发布订阅模式的代码:

package RabbitMq

import (
	"fmt"
	"github.com/streadway/amqp"
)

//这里是订阅模式的相关代码。
//订阅模式需要用到exchange。
//因为其过程就是:由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
//另外定义exchange时,其kind类型一定要是"fanout",这样才是广播类型。

//获取订阅模式下的rabbitmq的实例
func NewRabbitMqSubscription(exchangeName string) *RabbitMQ {
	//创建rabbitmq实例
	rabbitmq := NewRabbitMQ("", exchangeName, "")
	var err error
	//获取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
	rabbitmq.failOnErr(err, "订阅模式连接rabbitmq失败。")
	//获取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "订阅模式获取channel失败")
	return rabbitmq
}

//订阅模式发布消息
func (r *RabbitMQ) PublishSubscription(message string) {
	//第一步,尝试连接交换机
	err := r.channel.ExchangeDeclare(
		r.ExChange,
		"fanout", //这里一定要设计为"fanout"也就是广播类型。
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "订阅模式发布方法中尝试连接交换机失败。")

	//第二步,发送消息
	err = r.channel.Publish(
		r.ExChange,
		"",
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
}

//订阅模式消费者
func (r *RabbitMQ) ConsumeSbuscription() {
	//第一步,试探性创建交换机exchange
	err := r.channel.ExchangeDeclare(
		r.ExChange,
		"fanout",
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "订阅模式消费方法中创建交换机失败。")

	//第二步,试探性创建队列queue
	q, err := r.channel.QueueDeclare(
		"", //随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "订阅模式消费方法中创建创建队列失败。")

	//第三步,绑定队列到交换机中
	err = r.channel.QueueBind(
		q.Name,
		"", //在pub/sub模式下key要为空
		r.ExChange,
		false,
		nil,
	)

	//第四步,消费消息
	messages, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	forever := make(chan bool)
	go func() {
		for d := range messages {
			fmt.Printf("小杜同学写的订阅模式收到的消息:%s\n", d.Body)
		}
	}()

	fmt.Println("订阅模式消费者已开启,退出请按 CTRL+C\n")
	<-forever

}

发布者的代码:

package main

import (
	"fmt"
	"rabbitmq20181121/RabbitMq"
	"strconv"
	"time"
)

func main() {
	rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
	for i := 0; i < 100; i++ {
		rabbitmq.PublishSubscription("订阅模式生产第" + strconv.Itoa(i) + "条数据")
		fmt.Printf("订阅模式生产第" + strconv.Itoa(i) + "条数据\n")
		time.Sleep(1 * time.Second)
	}
}

建立两个一样的消费者的代码:

package main

import "rabbitmq20181121/RabbitMq"

func main() {
	rabbitmq := RabbitMq.NewRabbitMqSubscription("duexchangeName")
	rabbitmq.ConsumeSbuscription()
}

接着,依旧是把发布者和两个消费者run起来,会发现两个消费者都同时消费了发布者发布的消息了。也就是发布订阅模式也成功了。

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!