package com.alibaba.otter.common.push.media;

import com.alibaba.otter.common.push.AbstractSubscribeManager;
import com.alibaba.otter.common.push.SubscribeCallback;
import com.alibaba.otter.shared.arbitrate.impl.communication.ArbitrateCommmunicationClient;
import com.alibaba.otter.shared.common.model.config.ConfigException;
import com.alibaba.otter.shared.common.utils.cache.RefreshMemoryMirror;
import com.alibaba.otter.shared.common.utils.thread.NamedThreadFactory;
import com.alibaba.otter.shared.communication.model.config.FindMediaEvent;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/common/push/media/MediaSubscribeManager.class */
public class MediaSubscribeManager extends AbstractSubscribeManager {
    private static final Long DEFAULT_PERIOD = 60000L;
    private static final Logger logger = LoggerFactory.getLogger(MediaSubscribeManager.class);
    private ArbitrateCommmunicationClient arbitrateCommmunicationClient;
    private ScheduledThreadPoolExecutor executor;
    private ConcurrentMap<String, Object> mutexes = new ConcurrentHashMap();
    private Map<String, Runnable> runnableMap = new ConcurrentHashMap();
    private Long timeout = DEFAULT_PERIOD;
    private int poolSize = 8;
    private RefreshMemoryMirror<String, String> matrixCache = new RefreshMemoryMirror<>(this.timeout, new RefreshMemoryMirror.ComputeFunction<String, String>() { // from class: com.alibaba.otter.common.push.media.MediaSubscribeManager.1
        public String apply(final String str, String str2) {
            FindMediaEvent findMediaEvent = new FindMediaEvent();
            findMediaEvent.setDataId(str);
            try {
                Object callManager = MediaSubscribeManager.this.arbitrateCommmunicationClient.callManager(findMediaEvent);
                if (callManager == null || !(callManager instanceof String)) {
                    throw new ConfigException("No Such dataId[" + str + "]");
                }
                final String str3 = (String) callManager;
                if (!StringUtils.equalsIgnoreCase(str2, str3)) {
                    MediaSubscribeManager.this.executor.submit(new Runnable() { // from class: com.alibaba.otter.common.push.media.MediaSubscribeManager.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            Iterator it = MediaSubscribeManager.this.getCallbacks(str, null).iterator();
                            while (it.hasNext()) {
                                ((SubscribeCallback) it.next()).callback(str3);
                            }
                        }
                    });
                }
                return str3;
            } catch (Exception e) {
                MediaSubscribeManager.logger.error("call_manager_error", findMediaEvent.toString(), e);
                return str2;
            }
        }
    });

    @Override // com.alibaba.otter.common.push.AbstractSubscribeManager
    protected void doInit() {
        if (this.executor != null) {
            return;
        }
        this.executor = new ScheduledThreadPoolExecutor(this.poolSize, new NamedThreadFactory("canal-media-callback-worker"), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @Override // com.alibaba.otter.common.push.AbstractSubscribeManager
    protected void doShutdown() {
        if (this.executor != null) {
            return;
        }
        this.executor.shutdown();
    }

    @Override // com.alibaba.otter.common.push.SubscribeManager
    public String fetchConfig(String str) {
        return fetchConfig(str, 0L);
    }

    @Override // com.alibaba.otter.common.push.SubscribeManager
    public String fetchConfig(String str, long j) {
        return (String) this.matrixCache.get(str);
    }

    @Override // com.alibaba.otter.common.push.SubscribeManager
    public String fetchConfig(String str, String str2) {
        return fetchConfig(str);
    }

    @Override // com.alibaba.otter.common.push.SubscribeManager
    public String fetchConfig(String str, String str2, long j) {
        return fetchConfig(str, 0L);
    }

    @Override // com.alibaba.otter.common.push.AbstractSubscribeManager
    protected void postRegisterCallback(String str, String str2, SubscribeCallback subscribeCallback) {
        String generateKey = generateKey(str, str2);
        if (this.mutexes.putIfAbsent(generateKey, new Object()) == null) {
            synchronized (this.mutexes.get(generateKey)) {
                submitSchedule(str, str2);
            }
        }
    }

    @Override // com.alibaba.otter.common.push.AbstractSubscribeManager
    protected void doWhenCallbackEmpty(String str, String str2, SubscribeCallback subscribeCallback) {
        String generateKey = generateKey(str, str2);
        Object putIfAbsent = this.mutexes.putIfAbsent(generateKey, new Object());
        if (putIfAbsent == null) {
            putIfAbsent = this.mutexes.get(generateKey);
        }
        synchronized (putIfAbsent) {
            colseSchedule(str, str2);
            this.matrixCache.remove(str);
        }
    }

    private void submitSchedule(final String str, String str2) {
        String generateKey = generateKey(str, str2);
        if (this.runnableMap.get(generateKey) == null) {
            Runnable runnable = new Runnable() { // from class: com.alibaba.otter.common.push.media.MediaSubscribeManager.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        MediaSubscribeManager.this.matrixCache.get(str);
                    } catch (Throwable th) {
                        MediaSubscribeManager.logger.error("reload failed", th);
                    }
                }
            };
            this.runnableMap.put(generateKey, runnable);
            this.executor.scheduleAtFixedRate(runnable, this.timeout.longValue(), this.timeout.longValue(), TimeUnit.MILLISECONDS);
        }
    }

    private void colseSchedule(String str, String str2) {
        Runnable remove = this.runnableMap.remove(generateKey(str, str2));
        if (remove != null) {
            this.executor.remove(remove);
        }
    }

    public void setArbitrateCommmunicationClient(ArbitrateCommmunicationClient arbitrateCommmunicationClient) {
        this.arbitrateCommmunicationClient = arbitrateCommmunicationClient;
    }
}
