go 中使用redis 中的 list 队列实现异步处理

先建一个redis-db.go 文件用来连接redis

package orm

import (


var Redis = initRedis()

// 初始化连接

func initRedis() *redis.Client {

	Rdb := redis.NewClient(&redis.Options{

		Addr: "", //端口写你们自己的

		Password: "", // no password set

		DB: 0, // use default DB

		PoolSize: 10000, // 连接池大小

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	_, err = Rdb.Ping(ctx).Result()
	if err != nil {
		log.Fatal("initRedis client.Ping err: ", err)
	return Rdb


import (

//redisUtil Redis操作工具类
type RedisUtil struct{


const RedisPrefix = "yz"
//stringToLines string拆分多行
func stringToLines(s string) (lines []string, err error) {
	scanner := bufio.NewScanner(strings.NewReader(s))
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	err = scanner.Err()

//stringToKV string拆分key和val
func stringToKV(s string) (string, string) {
	ss := strings.Split(s, ":")
	if len(ss) < 2 {
		return s, ""
	return ss[0], ss[1]

//Info Redis服务信息
func (ru RedisUtil) Info(sections ...string) (res map[string]string) {
	infoStr, err := orm.Redis.Info(context.Background(), sections...).Result()
	res = map[string]string{}
	if err != nil {

		return res
	// string拆分多行
	lines, err := stringToLines(infoStr)
	if err != nil {

		return res
	// 解析成Map
	for i := 0; i < len(lines); i++ {
		if lines[i] == "" || strings.HasPrefix(lines[i], "# ") {
		k, v := stringToKV(lines[i])
		res[k] = v
	return res

//DBSize 当前数据库key数量
func (ru RedisUtil) DBSize() int64 {
	size, err := orm.Redis.DBSize(context.Background()).Result()
	if err != nil {

		return 0
	return size

//Set 设置键值对
func (ru RedisUtil) Set(key string, value interface{}, timeSec int) bool {
	err := orm.Redis.Set(context.Background(),
		RedisPrefix+key, value, time.Duration(timeSec)*time.Second).Err()
	if err != nil {

		return false
	return true

//Get 获取key的值
func (ru RedisUtil) Get(key string) string {
	res, err := orm.Redis.Get(context.Background(),RedisPrefix+key).Result()
	if err != nil {
		return ""
	return res

//SSet 将数据放入set缓存
func (ru RedisUtil) SSet(key string, values ...interface{}) bool {
	err := orm.Redis.SAdd(context.Background(), RedisPrefix+key, values...).Err()
	if err != nil {
		return false
	return true

//SGet 根据key获取Set中的所有值
func (ru RedisUtil) SGet(key string) []string {
	res, err := orm.Redis.SMembers(context.Background(), RedisPrefix+key).Result()
	if err != nil {
		return []string{}
	return res

//HMSet 设置key, 通过字典的方式设置多个field, value对
func (ru RedisUtil) HMSet(key string, mapping map[string]string, timeSec int) bool {
	err := orm.Redis.HSet(context.Background(), RedisPrefix+key, mapping).Err()
	if err != nil {
		return false
	if timeSec > 0 {
		if !ru.Expire(key, timeSec) {
			return false
	return true

//HSet 向hash表中放入数据,如果不存在将创建
func (ru RedisUtil) HSet(key string, field string, value string, timeSec int) bool {
	return ru.HMSet(key, map[string]string{field: value}, timeSec)

//HGet 获取key中field域的值
func (ru RedisUtil) HGet(key string, field string) string {
	res, err := orm.Redis.HGet(context.Background(), RedisPrefix+key, field).Result()
	if err != nil {

		return ""
	return res

//HExists 判断key中有没有field域名
func (ru RedisUtil) HExists(key string, field string) bool {
	res, err := orm.Redis.HExists(context.Background(), RedisPrefix+key, field).Result()
	if err != nil {
		return false
	return res

//HDel 删除hash表中的值
func (ru RedisUtil) HDel(key string, fields ...string) bool {
	err := orm.Redis.HDel(context.Background(), RedisPrefix+key, fields...).Err()
	if err != nil {
		return false
	return true

//Exists 判断多项key是否存在
func (ru RedisUtil) Exists(keys ...string) int64 {
	fullKeys := ru.toFullKeys(keys)
	cnt, err := orm.Redis.Exists(context.Background(), fullKeys...).Result()
	if err != nil {

		return -1
	return cnt

//Expire 指定缓存失效时间
func (ru RedisUtil) Expire(key string, timeSec int) bool {
	err := orm.Redis.Expire(context.Background(), RedisPrefix+key, time.Duration(timeSec)*time.Second).Err()
	if err != nil {

		return false
	return true

//TTL 根据key获取过期时间
func (ru RedisUtil) TTL(key string) int {
	td, err := orm.Redis.TTL(context.Background(), RedisPrefix+key).Result()
	if err != nil {

		return 0
	return int(td / time.Second)

//Del 删除一个或多个键
func (ru RedisUtil) Del(keys ...string) bool {
	fullKeys := ru.toFullKeys(keys)
	err := orm.Redis.Del(context.Background(), fullKeys...).Err()
	if err != nil {
		return false
	return true

//toFullKeys 为keys批量增加前缀
func (ru RedisUtil) toFullKeys(keys []string) (fullKeys []string) {
	for _, k := range keys {
		fullKeys = append(fullKeys, RedisPrefix+k)

func (ru RedisUtil) RPush(key string, data []byte) bool {
	orm.Redis.RPush(context.Background(), key, string(data))

	return true

func (ru RedisUtil) BLPop(key string) *redis.StringSliceCmd {
	data := orm.Redis.BLPop(context.Background(),time.Second*2, key)

	return data

再建一个producer.go 文件用来充当生产者

package cmd

import (

// moduleCmd represents the module command
var producerCmd = &cobra.Command{
	Use:   "producer",
	Short: "A brief description of your command",
	Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
	Run: func(cmd *cobra.Command, args []string) {

func producerRun() {

	for i := 0; i < 100; i++ {
			Value: "111",



再建一个consumer.go 文件用来充当消费者

package cmd

import (

// moduleCmd represents the module command
var consumerCmd = &cobra.Command{
	Use:   "consumer",
	Short: "A brief description of your command",
	Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
	Run: func(cmd *cobra.Command, args []string) {


func ConsumerRun() {


	var wg sync.WaitGroup


	go func() {

		for  {






再建一个Document.go 文件用来定义订单结构

package queue

import (

type Document struct {
	Id    int    `json:"id"`
	Cate  string `json:"cate"`
	Name  string `json:"name"`
	Url   string `json:"url"`
	Value string `json:"value"`
	Retry int    `json:"retry"`

func NewOrder(document Document) Document {

	return document


// 添加到队列(生产者)

func (o *Document) Add(document Document) {

	data, _ := json.Marshal(NewOrder(document))
	utils.RedisUtil{}.RPush("uploadOss", data)

// 消费队列(消费者)

func (o *Document) Consumer() {

	data := utils.RedisUtil{}.BLPop("uploadOss")

	if len(data.Val()) != 0 {
		c := data.Val()[1]
		var mess Document
		json.Unmarshal([]byte(c), &mess)
		mess.Retry = mess.Retry + 1
		if mess.Retry<3{



最后再分别启动producer.go 和consumer.go

[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":1,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":2,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":3,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":4,"cate":"","name":"","url":"","value":"111","retry":0}]
[uploadOss {"id":0,"cate":"","name":"","url":"","value":"111","retry":1}]
[uploadOss {"id":5,"cate":"","name":"","url":"","value":"111","retry":0}]
