/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.service.notify.NotifyTask;
import com.alibaba.nacos.config.server.service.notify.NotifyTaskProcessor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

public class NotifySingleService {
    private static final Logger LOGGER = LogUtil.FATAL_LOG;
    private ServerMemberManager memberManager;
    private ConcurrentHashMap<String, Executor> executors = new ConcurrentHashMap();

    @Autowired
    public NotifySingleService(ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        this.setupNotifyExecutors();
    }

    private void setupNotifyExecutors() {
        ExecutorService executor;
        Collection clusterIps = this.memberManager.allMembers();
        for (Member member : clusterIps) {
            String address = member.getAddress();
            if (null != this.executors.putIfAbsent(address, executor = ExecutorFactory.newSingleScheduledExecutorService((ThreadFactory)new NameThreadFactory("com.alibaba.nacos.config.NotifySingleServiceThread-" + address)))) continue;
            LOGGER.warn("[notify-thread-pool] setup thread target ip {} ok.", (Object)address);
        }
        for (Map.Entry entry : this.executors.entrySet()) {
            String target = (String)entry.getKey();
            if (clusterIps.contains(target)) continue;
            executor = (ThreadPoolExecutor)entry.getValue();
            ((ThreadPoolExecutor)executor).shutdown();
            this.executors.remove(target);
            LOGGER.warn("[notify-thread-pool] tear down thread target ip {} ok.", (Object)target);
        }
    }

    public ConcurrentHashMap<String, Executor> getExecutors() {
        return this.executors;
    }

    static class NotifySingleTask
    extends NotifyTask
    implements Runnable {
        private static final NotifyTaskProcessorWrapper PROCESSOR = new NotifyTaskProcessorWrapper();
        private final Executor executor;
        private final String target;
        private boolean isSuccess = false;

        public NotifySingleTask(String dataId, String group, String tenant, long lastModified, String target, Executor executor) {
            super(dataId, group, tenant, lastModified);
            this.target = target;
            this.executor = executor;
        }

        @Override
        public void run() {
            try {
                this.isSuccess = PROCESSOR.process((NacosTask)this);
            }
            catch (Exception e) {
                this.isSuccess = false;
                LogUtil.NOTIFY_LOG.error("[notify-exception] target:{} dataid:{} group:{} ts:{}", new Object[]{this.target, this.getDataId(), this.getGroup(), this.getLastModified()});
                LogUtil.NOTIFY_LOG.debug("[notify-exception] target:{} dataid:{} group:{} ts:{}", (Object)new Object[]{this.target, this.getDataId(), this.getGroup(), this.getLastModified()}, (Object)e);
            }
            if (!this.isSuccess) {
                LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataid:{} group:{} ts:{}", new Object[]{this.target, this.getDataId(), this.getGroup(), this.getLastModified()});
                try {
                    ((ScheduledThreadPoolExecutor)this.executor).schedule(this, 500L, TimeUnit.MILLISECONDS);
                }
                catch (Exception e) {
                    LOGGER.warn("[notify-thread-pool] cluster remove node {}, current thread was tear down.", (Object)this.target, (Object)e);
                }
            }
        }
    }

    static class NotifyTaskProcessorWrapper
    extends NotifyTaskProcessor {
        public NotifyTaskProcessorWrapper() {
            super(null);
        }

        @Override
        public boolean process(NacosTask task) {
            NotifySingleTask notifyTask = (NotifySingleTask)task;
            return this.notifyToDump(notifyTask.getDataId(), notifyTask.getGroup(), notifyTask.getTenant(), notifyTask.getLastModified(), notifyTask.target);
        }
    }
}

