package com.jumipm.common.redisson;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/jumipm/common/redisson/RedissonDelayQueue.class */
/**
 * Delay queue built on a Redisson {@link RBlockingQueue} / {@link RDelayedQueue} pair.
 *
 * <p>Messages are offered to the delayed queue with a delay in seconds. A dedicated
 * consumer thread (started by {@link #init(String)}) blocks on the blocking queue and
 * dispatches every received {@link MessageModel} to the {@link DelayQueueConsumer}
 * registered under the message's type. Handlers run on an internal thread pool.
 *
 * <p>{@link #init(String)} or {@link #initOnlySend(String)} must be called before any
 * {@code sendDelayTask} method, because the Redisson queues are created there.
 */
public class RedissonDelayQueue {
    private static final Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);

    /** Delay, in seconds, applied by {@link #sendDelayTask(MessageModel)}. */
    private static final long DEFAULT_DELAY_SECONDS = 10L;

    /** Pause, in milliseconds, between a failed poll and the next attempt. */
    private static final long RETRY_BACKOFF_MILLIS = 1000L;

    private final RedissonClient redissonClient;
    private final ThreadPoolExecutor taskExecutor;
    /** Handlers keyed by {@link DelayQueueConsumer#getType()}. */
    private final Map<String, DelayQueueConsumer> consumers = new ConcurrentHashMap<>();
    private RDelayedQueue<MessageModel> delayQueue;
    private RBlockingQueue<MessageModel> blockingQueue;
    private String name = "Jumi_DelayQueue";

    /**
     * Registers the given consumers by type and creates the handler thread pool.
     *
     * @param list           consumers to register; may be {@code null} or empty. When two
     *                       consumers report the same type, the later one wins.
     * @param redissonClient client used to obtain the underlying queues
     */
    public RedissonDelayQueue(List<DelayQueueConsumer> list, RedissonClient redissonClient) {
        if (!CollectionUtils.isEmpty(list)) {
            for (DelayQueueConsumer consumer : list) {
                this.consumers.put(consumer.getType(), consumer);
                log.info("consumer is add : {}", consumer.getType());
            }
        }
        log.info("consumer size is : {}", this.consumers.size());
        this.redissonClient = redissonClient;
        // NOTE(review): with corePoolSize 0 and a bounded queue, ThreadPoolExecutor only adds
        // threads beyond the first once the queue is full — confirm this sizing is intended.
        // CallerRunsPolicy means a saturated pool runs the handler on the submitting thread.
        this.taskExecutor = new ThreadPoolExecutor(
                0,
                5,
                20L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(128),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Binds this instance to the named queue and starts the consumer thread.
     *
     * @param str name of the Redisson blocking queue
     */
    public void init(String str) {
        initOnlySend(str);
        startDelayQueueConsumer();
    }

    /**
     * Binds this instance to the named queue without starting a consumer thread, so the
     * instance can only produce delayed messages.
     *
     * @param str name of the Redisson blocking queue
     */
    public void initOnlySend(String str) {
        this.name = str;
        initDelayQueue();
    }

    /** Obtains the blocking queue for the current name and the delayed queue feeding it. */
    private void initDelayQueue() {
        this.blockingQueue = this.redissonClient.getBlockingQueue(this.name);
        this.delayQueue = this.redissonClient.getDelayedQueue(this.blockingQueue);
    }

    /** Starts the thread, named {@code <queue name>-Consumer}, that polls the blocking queue. */
    private void startDelayQueueConsumer() {
        new Thread(this::consumeLoop, this.name + "-Consumer").start();
    }

    /**
     * Body of the consumer thread: takes messages from the blocking queue and dispatches them.
     *
     * <p>A failed iteration is logged and retried after a short pause instead of ending the
     * thread, so a transient failure does not stop consumption for good. The loop ends when
     * the thread is interrupted or the Redisson client is shut down or shutting down.
     */
    private void consumeLoop() {
        log.info("startDelayQueueConsumer Thread>>>>>>>>>>>>>>>>");
        while (!Thread.currentThread().isInterrupted()) {
            log.info("wait message >>>>>>>>>>>>>>>>");
            try {
                MessageModel messageModel = this.blockingQueue.take();
                log.info("接收到延迟任务:{}", messageModel.getType());
                accept(messageModel);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can observe the interrupt.
                Thread.currentThread().interrupt();
                log.warn("delay queue consumer interrupted, stopping: {}", this.name);
                return;
            } catch (Exception e) {
                // Pass the throwable itself so the logger records the stack trace.
                log.error("执行失败: {}", e.getMessage(), e);
                if (this.redissonClient.isShutdown() || this.redissonClient.isShuttingDown()) {
                    log.warn("redisson client is shut down, stopping consumer: {}", this.name);
                    return;
                }
                if (!pauseBeforeRetry()) {
                    return;
                }
            }
        }
    }

    /**
     * Sleeps for the retry back-off so a persistent failure does not become a busy loop.
     *
     * @return {@code false} if the sleep was interrupted (interrupt flag restored),
     *         {@code true} otherwise
     */
    private boolean pauseBeforeRetry() {
        try {
            TimeUnit.MILLISECONDS.sleep(RETRY_BACKOFF_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Offers a message to the delayed queue.
     *
     * @param messageModel message to deliver
     * @param j            delay in seconds
     */
    public void sendDelayTask(MessageModel messageModel, long j) {
        log.info("添加延迟任务:{} 延迟时间:{}s", messageModel.getType(), j);
        this.delayQueue.offer(messageModel, j, TimeUnit.SECONDS);
    }

    /**
     * Offers a message to the delayed queue with the default delay of 10 seconds.
     *
     * @param messageModel message to deliver
     */
    public void sendDelayTask(MessageModel messageModel) {
        sendDelayTask(messageModel, DEFAULT_DELAY_SECONDS);
    }

    /**
     * Dispatches a message to its handler right away, without going through the queues.
     *
     * @param messageModel message to handle
     */
    public void sendTask(MessageModel messageModel) {
        log.info("添加实时任务:{}", messageModel.getType());
        accept(messageModel);
    }

    /**
     * Looks up the consumer registered for the message's type and runs it on the handler
     * pool. Messages without a registered consumer are logged and dropped. Handler failures
     * are logged and never propagate to the caller.
     */
    private void accept(MessageModel messageModel) {
        String type = messageModel.getType();
        log.info("MessageModel type is {}", type);
        DelayQueueConsumer consumer = this.consumers.get(type);
        if (consumer == null) {
            log.warn("no consumer type of {}", type);
            return;
        }
        CompletableFuture<Void> handling = CompletableFuture.runAsync(() -> {
            Thread worker = Thread.currentThread();
            String previousName = worker.getName();
            worker.setName(String.format("DelayQueueConsumer-%s", type));
            try {
                log.info("开始处理>>>>>>>");
                consumer.accept(messageModel);
                log.info("处理完成>>>>>>>");
            } finally {
                // The thread is pooled, or is the caller's own thread under CallerRunsPolicy,
                // so give it its name back once the handler has finished.
                worker.setName(previousName);
            }
        }, this.taskExecutor).exceptionally(th -> {
            log.error("执行失败: {}", th.getMessage(), th);
            return null;
        });
        // Snapshot only: the handler may still be running when this line is logged.
        log.info("isDone {}", handling.isDone());
    }
}
