Golang 使用 Redis 解决钉钉机器人报警频率限制

August 21, 2020 默认分类

消费rabbitmq队列消息发送到钉钉机器人, 由于消息数量比较多,会很频繁地发送钉钉消息

钉钉机器人发送频率限制是 20条/每分钟,如果超过限制,会返回 send too fast 错误信息,再发,就直接 302 并限制发送10分钟

方案

由于 Redis 支持的类型丰富,用法简单,效率高 等特点, 且很多公司都会用到 Redis 无需额外安装。
利用redis的有序集合, Redis 有序集合(sorted set)和集合一样也是string类型元素的集合,且不允许重复的成员,不同的是每个元素都会关联一个double类型的分数,这个分数主要用于集合元素排序
解决思路是:

  1. 一个群最多申请六个机器人(默认有一个小钉占了一个位置),也就是只能申请五个, 把五个token保存在数组中
  2. 每次发送,依次查看每个token的 redis sorted set 集合中以最近一分钟 timestamp 为 scope 范围的数量
  3. 如果大于 19,查询下一个token,否则是可用的token

部分代码如下:

// 获取存活的机器人
func GetAvailableDingRobot(useToken, useSec *string) bool {
    var (
        redisCli = myredis.RedisClient
        //date     = time.Now().Format("2006-01-02")
        prefixKey = "DingTalkSender:Token"
        timeUN    = int(time.Now().UnixNano() / 1e6) // 毫秒时间戳
        key       string
    )

    for _, bot := range config.Config.DingTalk {
        key = fmt.Sprint(prefixKey, ":", bot.Token)
        min := strconv.Itoa(timeUN - 65000)
        max := strconv.Itoa(timeUN)
        count, err := redisCli.ZCount(ctx, key, min, max).Result()
        if err != nil {
            log.Error(err)
            continue
        }

        if count >= config.Config.Frequency {
            continue
        }
        *useToken = bot.Token
        *useSec = bot.Secret
        break
    }

    if *useToken == "" || *useSec == "" {
        return false
    }

    // 添加一次记录
    if err := redisCli.ZAdd(ctx, key, &redis.Z{
        Score:  float64(timeUN),
        Member: fmt.Sprintf("%d", timeUN),
    }).Err(); err != nil {
        log.Error(err)
        return false
    }

    // 设置过期时间 65 秒
    if err := redisCli.Expire(ctx, key, 65*time.Second).Err(); err != nil {
        return false
    }
    return true
}

因为 5 个机器人,最多一分钟就可以发送 5 * 19 = 95 条消息了,如果是单进程脚本,每次执行发送间隔只小到 ~ 50 ms, 已经基本能满足目前需要。

防止消息丢失

有一步特殊处理,如果 5 个 token 都不可用, 将消息发回mq的死信队列重试, 同时增加header计数,如果重试次数超过100则丢弃
部分代码如下:

        .......
        // 重试队列名
        retryQueue := fmt.Sprintf("Retry_%s", queue.Queue)
        // 构造header
        arguments := amqp.Table{
            "x-dead-letter-exchange":    config.Config.Exchange,
            "x-dead-letter-routing-key": queue.RoutingKey, // 原来消息队RoutingKey
        }
        p := producer.New(
            config.Config.MqAddr,
            config.Config.Exchange,
            exchangeType,
            retryQueue, // 死信队列名
            retryQueue, // 死信队列RoutingKey
            arguments,
            true,
        )
        if err := p.Start(); err != nil {
            log.Error(err)
            return
        }
        .......
        .......
        .......
        死信队列默认超时时间6秒
        retryNum, err := strconv.Atoi(fmt.Sprintf("%v", data.Headers["Retry_Num"]))
        retryNum += 1
        headers := amqp.Table{
            "Retry_Num": retryNum,
        }
        if err != nil {
            return p.Push(data.Body, "6000", headers)
        }
        // 发送重试次数达到100就丢弃
        if retryNum >= 100 {
            return nil
        }
        // 每次加1秒, 可以适当延长
        return p.Push(data.Body, strconv.Itoa(6000+(1000*retryNum)), headers)

相关参考:

添加新评论