golang实现rabbitmq之simple模式
golang实现rabbitmq之simple模式
这是最简单的模式了。也就是由生产者将消息送到队列里,然后由消费者到消息队列里来取。在这之前,我们先定义一个RabbitMQ的结构体和我们定义的函数。
package RabbitMq
import (
"fmt"
"github.com/streadway/amqp"
)
//这里主要是RabbitMQ的一些信息。包括其结构体和函数。
//连接信息
const MQURL = "amqp://guest:guest@127.0.0.1:5672/dudevirtualhost"
//RabbitMQ结构体
type RabbitMQ struct {
//连接
conn *amqp.Connection
channel *amqp.Channel
//队列
QueueName string
//交换机名称
ExChange string
//绑定的key名称
Key string
//连接的信息,上面已经定义好了
MqUrl string
}
//创建结构体实例,参数队列名称、交换机名称和bind的key(也就是几个大写的,除去定义好的常量信息)
func NewRabbitMQ(queueName string, exChange string, key string) *RabbitMQ {
return &RabbitMQ{QueueName: queueName, ExChange: exChange, Key: key, MqUrl: MQURL}
}
//关闭conn和chanel的方法
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
//错误的函数处理
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
fmt.Printf("err是:%s,小杜同学手写的信息是:%s", err, message)
}
}
接着就是simple模式代码的书写了:
package RabbitMq
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//01和02.这里是rabbitmq最简单的两个模式:simple模式以及work模式。
//simple模式也就是由生产者将消息送到队列里,然后由消费者到队列里取出来消费。
//另外这里的代码work模式也是相同的,也是可以得用的。两个的差别是:work模式在simple模式的基础上多了消费者而已。
//创建简单模式下的实例,只需要queueName这个参数,其中exchange是默认的,key则不需要。
func NewRabbitMQSimple(queueName string) *RabbitMQ {
rabbitmq := NewRabbitMQ(queueName, "", "")
var err error
//获取参数connection
rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
rabbitmq.failOnErr(err, "连接connection失败")
//获取channel参数
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取channel参数失败")
return rabbitmq
}
//直接模式,生产者.
func (r *RabbitMQ) PublishSimple(message string) {
//第一步,申请队列,如不存在,则自动创建之,存在,则路过。
_, err := r.channel.QueueDeclare(
r.QueueName,
false,
false,
false,
false,
nil,
)
if err != nil {
fmt.Printf("创建连接队列失败:%s", err)
}
//第二步,发送消息到队列中
r.channel.Publish(
r.ExChange,
r.QueueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
})
}
//直接模式,消费者
func (r *RabbitMQ) ConsumeSimple() {
//第一步,申请队列,如果队列不存在则自动创建,存在则跳过
q, err := r.channel.QueueDeclare(
r.QueueName,
//是否持久化
false,
//是否自动删除
false,
//是否具有排他性
false,
//是否阻塞处理
false,
//额外的属性
nil,
)
if err != nil {
fmt.Println(err)
}
//第二步,接收消息
msgs, err := r.channel.Consume(
q.Name,
"", //用来区分多个消费者
true, //是否自动应答,告诉我已经消费完了
false,
false, //若设置为true,则表示为不能将同一个connection中发送的消息传递给这个connection中的消费者.
false, //消费队列是否设计阻塞
nil,
)
if err != nil {
fmt.Printf("消费者接收消息出现问题:%s", err)
}
forever := make(chan bool)
//启用协程处理消息
go func() {
for d := range msgs {
log.Printf("小杜同学写的Simple(或者Work)模式接收到了消息:%s\n", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
simple模式发布者:
package main
import (
"fmt"
"rabbitmq20181121/RabbitMq"
)
func main() {
rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName1912161843")
rabbitmq.PublishSimple("他是客,你是心上人。 ---来自simple模式")
fmt.Println("发送成功!")
}
simple模式消费者:
package main
import (
"fmt"
"rabbitmq20181121/RabbitMq"
)
func main() {
rabbitmq := RabbitMq.NewRabbitMQSimple("duQueueName1912161843")
rabbitmq.ConsumeSimple()
fmt.Println("接收成功!")
}

