golang实现rabbitmq之topic模式
上一篇介绍了golang实现rabbitmq之routing模式 .接着就是要说的最后一个模式,topic模式了。这个模式也是在routing模式上进一步升华而来,通过上面的介绍我们知道routing模式最大的特点是可以从生产端来指定消费端来消费消息,是通过routingKey来指定的。那么我们可不可以通过一定的规则来指定呢?比如用通配符来的指定,当然这个也是可以的。这也就是topic模式最大的特点了。
topic模式也是在routing的模式上演化而来。不同的是我们以通配符的方式来指定我们的消费者。
来看一下topic模式的代码,注意这里创建exchange的kind则是”topic”了:
package RabbitMq
import (
"github.com/streadway/amqp"
"log"
)
//topic模式
//与routing模式不同的是这个exchange的kind是"topic"类型的。
//topic模式的特别是可以以通配符的形式来指定与之匹配的消费者。
//"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
//创建rabbitmq实例
func NewRabbitMqTopic(exchangeName string, routingKey string) *RabbitMQ {
rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
var err error
//获取connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
rabbitmq.failOnErr(err, "创建rabbit的topic模式时候连接出现问题")
//获取channel
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "创建rabbitmq的topic实例时获取channel出错")
return rabbitmq
}
//topic模式。生产者。
func (r *RabbitMQ) PublishTopic(message string) {
//第一步,尝试创建交换机,这里的kind的类型要改为topic
err := r.channel.ExchangeDeclare(
r.ExChange,
"topic",
true,
false,
false,
false,
nil,
)
r.failOnErr(err, "topic模式尝试创建exchange失败。")
//第二步,发送消息。
err = r.channel.Publish(
r.ExChange,
r.Key,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//topic模式。消费者。"*"表示匹配一个单词。“#”表示匹配多个单词,亦可以是0个。
func (r *RabbitMQ) ConsumerTopic() {
//第一步,创建交换机。这里的kind需要是“topic”类型。
err := r.channel.ExchangeDeclare(
r.ExChange,
"topic",
true, //这里需要是true
false,
false,
false,
nil,
)
r.failOnErr(err, "topic模式,消费者创建exchange失败。")
//第二步,创建队列。这里不用写队列名称。
q, err := r.channel.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
r.failOnErr(err, "topic模式,消费者创建queue失败。")
//第三步,将队列绑定到交换机里。
err = r.channel.QueueBind(
q.Name,
r.Key,
r.ExChange,
false,
nil,
)
//第四步,消费消息。
messages, err := r.channel.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range messages {
log.Printf("小杜同学写的topic模式收到了消息:%s。\n", d.Body)
}
}()
<-forever
}
生产端的代码:
package main
import (
"fmt"
"rabbitmq20181121/RabbitMq"
"strconv"
"time"
)
func main() {
one := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.Jay")
two := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Persident.XIDADA")
for i := 0; i < 100; i++ {
one.PublishTopic("小杜同学,topic模式,Jay," + strconv.Itoa(i))
two.PublishTopic("小杜同学,topic模式,All," + strconv.Itoa(i))
time.Sleep(1 * time.Second)
fmt.Printf("topic模式。这是小杜同学发布的消息%v \n", i)
}
}
消费端1的代码:
package main
import "rabbitmq20181121/RabbitMq"
func main() {
jay := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "Singer.*")
jay.ConsumerTopic()
}
消费端2的代码:
package main
import "rabbitmq20181121/RabbitMq"
func main() {
all := RabbitMq.NewRabbitMqTopic("exchangeNameTpoic1224", "#")
all.ConsumerTopic()
}
结果:发现Jay只会配置到Singer来的消息,也就是topic模式也是成功了的了。

