Contents

用户等级 非定时更新 队列优化

背景

关于用户等级,有的app会在每天某个时间点定时更新,这样可以省去处理各种触发用户升级的事件的麻烦,那如果要尽可能即时更新用户等级,可以怎么优化呢

问题

因为各种触发用户升级的事件基本都是要查数据库, 以判断用户是否达到升级要求,所以要放到队列中执行。实践中发现,存在某个时间段触发某个用户很多个升级事件的行为,造成不必要的sql查询:即用户让同事朋友为其点赞收藏评论等。

思路

可以这样优化:记录当前job的编号以及存在的最大job编号,然后执行到这个人的升级任务的时候,只执行最后一个job

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

class GrowthPlanLevelListener
{
    const TAG = 'GrowthPlanLevelListener: ';

    /**
     * @param HasOwner|Event $event
     */
    public function handle($event)
    {
        /** @var User $user */
        $user = $event->getOwner();
        // phpcs:ignore
        // 此处获取版本信息,需要可序列化,所以不可以把这个listener实现Illuminate\Contracts\Queue\ShouldQueue
        // 解决方式是先获取$withJpush,再生成一个实现ShouldQueue的job,并把$withJpush传进去
        $withJpush = !LevelManager::isIncompatibleAgent();
        info(sprintf(
            '%s user %d check and upgrade growth plan level.',
            self::TAG,
            $user->id
        ));
        dispatch(new GrowthPlanLevelJob(
            $user,
            $withJpush,
            $this->getNum(sprintf(GROWTH_PLAN_JOB_KEY, $user->id))
        ));
    }

    private function getNum($key)
    {
        return Redis::connection()->eval(
            $this->preventRepeated(),
            1,
            $key
        );
    }

    // 不能让前一个job刚set完,后一个job没读到又set一次
    private function preventRepeated()
    {
        return <<<'LUA'
            local hash = redis.call('get', KEYS[1])
            local num = 1
            
            if hash then
                num = redis.call('incr',KEYS[1])
            else
                redis.call('set',KEYS[1],1)
            end
            
            return num
LUA;
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class GrowthPlanLevelJob extends Job implements ShouldQueue
{
    const TAG = 'GrowthPlanLevelJob';

    /**
     * @var User $user
     */
    private $user;

    private $withJpush;

    /**
     * 由于存在某个时间段触发某个用户很多个升级事件的行为,造成不必要的sql查询
     * 所以需要记录当前job的编号以及存在的最大job编号,然后执行到这个人的升级任务的时候,只执行最后一个job
     * @var int $jobNum
     */
    private $jobNum;

    public $tries = 3;
    
    public $timeout = 180;

    public function __construct($user, $withJpush, $jobNum = 1)
    {
        $this->user = $user;
        $this->withJpush = $withJpush;
        $this->jobNum = $jobNum;
    }

    /**
     * @throws \Illuminate\Contracts\Redis\LimiterTimeoutException
     */
    public function handle()
    {
        $jobKey = sprintf(GROWTH_PLAN_JOB_KEY, $this->user->id);
        $count = Redis::connection()->get($jobKey);
        $info = [
            'user_id' => $this->user->id,
            'count'   => $count,
            'job_num' => $this->jobNum
        ];
        if (!$count) {
            info(self::TAG . ' $count is invalid.', $info);
            return;
        }
        if ($count > $this->jobNum) {
            info(self::TAG . ' remain other job of upgrade this user growth plan.', $info);
            return;
        }
        info(self::TAG . ' start executing job.', $info);
        Redis::connection()
            ->funnel($funnelKey = self::TAG . $this->user->id)
            ->limit($limit = 1)
            ->then(function () use ($funnelKey, $limit) {
                $levelManager = new LevelManager($this->user);
                $levelManager->checkAndUpLevel($this->withJpush);
            });
        Redis::connection()->del($jobKey);
    }
}
coffee