go 中使用redis 中的 list 队列实现异步处理
Redis 的链表结构可以轻松实现阻塞队列,可以使用左进右出的命令组合来完成队列的设计。比如:数据的生产者可以通过 LPUSH 命令从左边插入数据,多个数据消费者可以使用 BRPOP 命令阻塞地“抢”列表尾部的数据(下文代码使用的是等价的右进左出组合:RPUSH + BLPOP)。下面就为大家演示一下。
先建一个redis-db.go 文件用来连接redis
package orm
import (
"context"
"github.com/redis/go-redis/v9"
"log"
"time"
)
// Redis is the package-level client shared by callers of this package; it is
// produced by initRedis when the package's variables are initialised.
var Redis = initRedis()
// initRedis builds the Redis client and verifies the connection with a PING
// bounded by a 3-second timeout. If the ping fails the process is terminated
// via log.Fatal, so a returned client has answered at least one PING.
func initRedis() *redis.Client {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379", // adjust host:port for your environment
		Password: "",               // no password set
		DB:       0,                // use default DB
		PoolSize: 10000,            // connection pool size
	})

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// err is declared here; the previous code assigned to an undeclared
	// variable and therefore did not compile.
	if err := rdb.Ping(ctx).Err(); err != nil {
		log.Fatal("initRedis client.Ping err: ", err)
	}
	return rdb
}
再建一个 redis 处理工具文件(属于 utils 包,文件开头需要有 package utils 声明,下面的代码片段省略了这一行)
import (
"bufio"
"carbon/orm"
"context"
"github.com/redis/go-redis/v9"
"strings"
"time"
)
// RedisUtil is a field-less helper type whose methods wrap operations on the
// shared orm.Redis client.
type RedisUtil struct{
}
// RedisPrefix is prepended to the caller-supplied key by the helper methods
// that build their Redis key as RedisPrefix+key.
const RedisPrefix = "yz"
// stringToLines splits s into its individual lines using a bufio.Scanner.
// It returns the lines read so far together with any error the scanner
// reported; an empty input yields a nil slice.
func stringToLines(s string) (lines []string, err error) {
	reader := strings.NewReader(s)
	sc := bufio.NewScanner(reader)
	for sc.Scan() {
		line := sc.Text()
		lines = append(lines, line)
	}
	return lines, sc.Err()
}
// stringToKV splits s into a key and a value at the FIRST ":" only.
// Everything after that colon is the value, so values that themselves
// contain colons (for example "addr:127.0.0.1:6379") are kept intact.
// When s has no colon, the whole string is returned as the key and the
// value is empty.
func stringToKV(s string) (string, string) {
	key, val, _ := strings.Cut(s, ":")
	return key, val
}
// Info runs the INFO command for the given sections and returns the reply as
// a key/value map. Blank lines and lines starting with "# " are skipped. If
// the command or the line splitting fails, an empty (non-nil) map is returned.
func (ru RedisUtil) Info(sections ...string) (res map[string]string) {
	res = map[string]string{}

	raw, err := orm.Redis.Info(context.Background(), sections...).Result()
	if err != nil {
		return res
	}

	lines, err := stringToLines(raw)
	if err != nil {
		return res
	}

	for _, line := range lines {
		if line == "" || strings.HasPrefix(line, "# ") {
			continue
		}
		field, val := stringToKV(line)
		res[field] = val
	}
	return res
}
// DBSize returns the number of keys reported by the DBSIZE command, or 0 when
// the command fails.
func (ru RedisUtil) DBSize() int64 {
	if n, err := orm.Redis.DBSize(context.Background()).Result(); err == nil {
		return n
	}
	return 0
}
// Set stores value under RedisPrefix+key with an expiration of timeSec
// seconds and reports whether the command succeeded.
func (ru RedisUtil) Set(key string, value interface{}, timeSec int) bool {
	ttl := time.Duration(timeSec) * time.Second
	cmd := orm.Redis.Set(context.Background(), RedisPrefix+key, value, ttl)
	return cmd.Err() == nil
}
// Get returns the string stored under RedisPrefix+key, or "" when the command
// returns an error.
func (ru RedisUtil) Get(key string) string {
	val, err := orm.Redis.Get(context.Background(), RedisPrefix+key).Result()
	if err == nil {
		return val
	}
	return ""
}
// SSet adds values to the set stored under RedisPrefix+key via SADD and
// reports whether the command succeeded.
func (ru RedisUtil) SSet(key string, values ...interface{}) bool {
	cmd := orm.Redis.SAdd(context.Background(), RedisPrefix+key, values...)
	return cmd.Err() == nil
}
// SGet returns all members of the set stored under RedisPrefix+key. On error
// it returns an empty, non-nil slice.
func (ru RedisUtil) SGet(key string) []string {
	members, err := orm.Redis.SMembers(context.Background(), RedisPrefix+key).Result()
	if err == nil {
		return members
	}
	return []string{}
}
// HMSet writes every field/value pair of mapping into the hash stored under
// RedisPrefix+key. When timeSec is positive the key's expiration is then set
// through Expire. It returns false if either step fails.
func (ru RedisUtil) HMSet(key string, mapping map[string]string, timeSec int) bool {
	if orm.Redis.HSet(context.Background(), RedisPrefix+key, mapping).Err() != nil {
		return false
	}
	// Expire adds RedisPrefix itself, so it receives the bare key. It is only
	// invoked for a positive timeSec.
	return timeSec <= 0 || ru.Expire(key, timeSec)
}
// HSet stores a single field/value pair in the hash at key by delegating to
// HMSet with a one-entry map; timeSec is passed through unchanged.
func (ru RedisUtil) HSet(key string, field string, value string, timeSec int) bool {
	single := map[string]string{field: value}
	return ru.HMSet(key, single, timeSec)
}
// HGet returns the value of field in the hash stored under RedisPrefix+key,
// or "" when the command returns an error.
func (ru RedisUtil) HGet(key string, field string) string {
	val, err := orm.Redis.HGet(context.Background(), RedisPrefix+key, field).Result()
	if err == nil {
		return val
	}
	return ""
}
// HExists reports whether field exists in the hash stored under
// RedisPrefix+key. A command error is reported as false.
func (ru RedisUtil) HExists(key string, field string) bool {
	found, err := orm.Redis.HExists(context.Background(), RedisPrefix+key, field).Result()
	return err == nil && found
}
// HDel removes the given fields from the hash stored under RedisPrefix+key
// and reports whether the command succeeded.
func (ru RedisUtil) HDel(key string, fields ...string) bool {
	cmd := orm.Redis.HDel(context.Background(), RedisPrefix+key, fields...)
	return cmd.Err() == nil
}
// Exists returns the count reported by the EXISTS command for the given keys
// (each prefixed with RedisPrefix), or -1 when the command fails.
func (ru RedisUtil) Exists(keys ...string) int64 {
	n, err := orm.Redis.Exists(context.Background(), ru.toFullKeys(keys)...).Result()
	if err == nil {
		return n
	}
	return -1
}
// Expire sets the expiration of RedisPrefix+key to timeSec seconds and
// reports whether the command succeeded.
func (ru RedisUtil) Expire(key string, timeSec int) bool {
	ttl := time.Duration(timeSec) * time.Second
	return orm.Redis.Expire(context.Background(), RedisPrefix+key, ttl).Err() == nil
}
// TTL returns the remaining time to live of RedisPrefix+key in whole seconds,
// or 0 when the command fails.
//
// NOTE(review): durations shorter than one second truncate to 0 here; if the
// client reports "no expiry"/"missing key" as tiny negative durations they
// would also come back as 0 — confirm against the go-redis documentation.
func (ru RedisUtil) TTL(key string) int {
	remaining, err := orm.Redis.TTL(context.Background(), RedisPrefix+key).Result()
	if err != nil {
		return 0
	}
	secs := remaining / time.Second
	return int(secs)
}
// Del deletes the given keys (each prefixed with RedisPrefix) and reports
// whether the command succeeded.
func (ru RedisUtil) Del(keys ...string) bool {
	prefixed := ru.toFullKeys(keys)
	return orm.Redis.Del(context.Background(), prefixed...).Err() == nil
}
// toFullKeys returns a new slice holding every element of keys with
// RedisPrefix prepended, in the same order. An empty input yields nil.
func (ru RedisUtil) toFullKeys(keys []string) (fullKeys []string) {
	for i := range keys {
		fullKeys = append(fullKeys, RedisPrefix+keys[i])
	}
	return fullKeys
}
// RPush appends data (converted to a string) to the tail of the list stored
// at key and reports whether the command succeeded. Unlike the other
// helpers, key is used verbatim: RedisPrefix is NOT added.
//
// The command's error is now checked; previously it was discarded and the
// method always returned true, hiding failed pushes from callers.
func (ru RedisUtil) RPush(key string, data []byte) bool {
	err := orm.Redis.RPush(context.Background(), key, string(data)).Err()
	return err == nil
}
// BLPop issues a blocking left pop on the list stored at key with a 2-second
// timeout and hands back the raw command so the caller can inspect its value
// and error. key is used verbatim: RedisPrefix is not added.
func (ru RedisUtil) BLPop(key string) *redis.StringSliceCmd {
	const wait = 2 * time.Second
	return orm.Redis.BLPop(context.Background(), wait, key)
}
再建一个producer.go 文件用来充当生产者
package cmd
import (
"carbon/queue"
"github.com/spf13/cobra"
)
// producerCmd is the cobra command named "producer"; its Run handler calls
// producerRun. Short and Long still contain the generic scaffold text rather
// than a description of this command.
var producerCmd = &cobra.Command{
Use: "producer",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:
Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
producerRun()
},
}
// producerRun enqueues 100 documents with ids 0 through 99, each carrying the
// value "111", by calling Add on every document.
func producerRun() {
	const total = 100
	for id := 0; id < total; id++ {
		doc := queue.Document{Id: id, Value: "111"}
		doc.Add(doc)
	}
}
再建一个consumer.go 文件用来充当消费者
package cmd
import (
"carbon/queue"
"github.com/spf13/cobra"
"sync"
)
// consumerCmd is the cobra command named "consumer"; its Run handler calls
// ConsumerRun. Short and Long still contain the generic scaffold text rather
// than a description of this command.
var consumerCmd = &cobra.Command{
Use: "consumer",
Short: "A brief description of your command",
Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:
Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
Run: func(cmd *cobra.Command, args []string) {
ConsumerRun()
},
}
// ConsumerRun starts one goroutine that calls Consumer in an endless loop and
// then blocks the caller. The WaitGroup counter is never decremented, so Wait
// does not return and the function keeps the process alive.
func ConsumerRun() {
	var block sync.WaitGroup
	block.Add(1)

	worker := &queue.Document{}
	go func() {
		for {
			worker.Consumer()
		}
	}()

	block.Wait()
}
再建一个Document.go 文件用来定义订单结构
package queue
import (
"carbon/utils"
"encoding/json"
"fmt"
)
// Document is the queue message. It is serialised to JSON with the field
// names given in the struct tags before being pushed to Redis.
type Document struct {
Id int `json:"id"`
Cate string `json:"cate"`
Name string `json:"name"`
Url string `json:"url"`
Value string `json:"value"`
// Retry is incremented by Consumer each time the message is consumed.
Retry int `json:"retry"`
}
// NewOrder returns the document it is given, unchanged.
func NewOrder(document Document) Document { return document }
// Add is the producer side of the queue: it serialises document to JSON and
// pushes it onto the "uploadOss" list. The receiver's own fields are not
// used; only the document argument is enqueued.
//
// Marshal and push failures are printed instead of being silently dropped,
// so a lost message is at least visible in the output.
func (o *Document) Add(document Document) {
	data, err := json.Marshal(NewOrder(document))
	if err != nil {
		fmt.Println("queue: marshal document failed:", err)
		return
	}
	pushed := utils.RedisUtil{}.RPush("uploadOss", data)
	if !pushed {
		fmt.Println("queue: push to uploadOss failed for document id", document.Id)
	}
}
// Consumer is the consumer side of the queue: it performs one blocking pop on
// the "uploadOss" list and, if a message arrived, decodes it, prints the raw
// reply, bumps its Retry counter and re-queues it while Retry is below 3.
//
// A payload that is not valid JSON is reported and dropped; previously the
// decode error was ignored and a zero-value Document was re-queued in its
// place.
func (o *Document) Consumer() {
	reply := utils.RedisUtil{}.BLPop("uploadOss").Val()
	// The payload is read from index 1 (index 0 holds the list name), so a
	// shorter reply carries no message and must not be indexed.
	if len(reply) < 2 {
		return
	}

	var mess Document
	if err := json.Unmarshal([]byte(reply[1]), &mess); err != nil {
		fmt.Println("queue: dropping malformed uploadOss payload:", err)
		return
	}
	fmt.Println(reply)

	mess.Retry++
	if mess.Retry < 3 {
		o.Add(mess)
		return
	}
	// Still unfinished after 2 retries: this is where the message should be
	// written to a log for manual handling (not implemented).
}
最后再分别启动producer.go 和consumer.go
[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":1,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":2,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":3,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":4,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":1}]
[uploadOss {"id":5,"cate":"","name":"","url":"","value":"111","retry":0}]

