golang实现rabbitmq之topic模式

作者: adm 分类: go 发布时间: 2022-07-12

上一篇介绍了golang实现rabbitmq之routing模式 .接着就是要说的最后一个模式,topic模式了。这个模式也是在routing模式上进一步升华而来,通过上面的介绍我们知道routing模式最大的特点是可以从生产端来指定消费端来消费消息,是通过routingKey来指定的。那么我们可不可以通过一定的规则来指定呢?比如用通配符来的指定,当然这个也是可以的。这也就是topic模式最大的特点了。

topic模式也是在routing的模式上演化而来。不同的是我们以通配符的方式来指定我们的消费者。

来看一下topic模式的代码,注意这里创建exchange的kind则是”topic”了:

package RabbitMq

import (
	"github.com/streadway/amqp"
	"log"
)

//topic模式
//与routing模式不同的是这个exchange的kind是"topic"类型的。
//topic模式的特别是可以以通配符的形式来指定与之匹配的消费者。
//"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。

//创建rabbitmq实例
func NewRabbitMqTopic(exchangeName string, routingKey string) *RabbitMQ {
	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
	var err error
	//获取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
	rabbitmq.failOnErr(err, "创建rabbit的topic模式时候连接出现问题")
	//获取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "创建rabbitmq的topic实例时获取channel出错")
	return rabbitmq
}

//topic模式。生产者。
func (r *RabbitMQ) PublishTopic(message string) {
	//第一步,尝试创建交换机,这里的kind的类型要改为topic
	err := r.channel.ExchangeDeclare(
		r.ExChange,
		"topic",
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "topic模式尝试创建exchange失败。")

	//第二步,发送消息。
	err = r.channel.Publish(
		r.ExChange,
		r.Key,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
}

//topic模式。消费者。"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
func (r *RabbitMQ) ConsumerTopic() {
	//第一步,创建交换机。这里的kind需要是“topic”类型。
	err := r.channel.ExchangeDeclare(
		r.ExChange,
		"topic",
		true, //这里需要是true
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "topic模式,消费者创建exchange失败。")

	//第二步,创建队列。这里不用写队列名称。
	q, err := r.channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "topic模式,消费者创建queue失败。")

	//第三步,将队列绑定到交换机里。
	err = r.channel.QueueBind(
		q.Name,
		r.Key,
		r.ExChange,
		false,
		nil,
	)

	//第四步,消费消息。
	messages, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	forever := make(chan bool)
	go func() {
		for d := range messages {
			log.Printf("小杜同学写的topic模式收到了消息:%s。\n", d.Body)
		}
	}()
	<-forever

}

生产端的代码:

package main

import (
	"fmt"
	"rabbitmq20181121/RabbitMq"
	"strconv"
	"time"
)

func main() {
	one := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.Jay")
	two := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Persident.XIDADA")
	for i := 0; i < 100; i++ {
		one.PublishTopic("小杜同学,topic模式,Jay," + strconv.Itoa(i))
		two.PublishTopic("小杜同学,topic模式,All," + strconv.Itoa(i))
		time.Sleep(1 * time.Second)
		fmt.Printf("topic模式。这是小杜同学发布的消息%v \n", i)
	}
}

消费端1的代码:

package main

import "rabbitmq20181121/RabbitMq"

func main() {
	jay := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.*")
	jay.ConsumerTopic()
}


消费端2的代码:

package main

import "rabbitmq20181121/RabbitMq"

func main() {
	all := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "#")
	all.ConsumerTopic()
}

结果:发现Jay只会配置到Singer来的消息,也就是topic模式也是成功了的了。

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!