package io.kubernetes.client.informer.cache;

import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.informer.ResyncRunnable;
import io.kubernetes.client.informer.cache.DeltaFIFO;
import io.kubernetes.client.util.Namespaces;
import io.kubernetes.client.util.Threads;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.MutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kubernetes/client/informer/cache/Controller.class */
public class Controller<ApiType extends KubernetesObject, ApiListType extends KubernetesListObject> {
    private static final Logger log = LoggerFactory.getLogger(Controller.class);
    private static final long DEFAULT_PERIOD = 1000;
    private long fullResyncPeriod;
    private DeltaFIFO queue;
    private ListerWatcher<ApiType, ApiListType> listerWatcher;
    private ReflectorRunnable<ApiType, ApiListType> reflector;
    private Supplier<Boolean> resyncFunc;
    private Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> processFunc;
    private ScheduledExecutorService reflectExecutor;
    private ScheduledExecutorService resyncExecutor;
    private ScheduledFuture resyncFuture;
    private Class<ApiType> apiTypeClass;
    private ScheduledFuture reflectorFuture;
    BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

    public Controller(Class<ApiType> cls, DeltaFIFO deltaFIFO, ListerWatcher<ApiType, ApiListType> listerWatcher, Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> consumer, Supplier<Boolean> supplier, long j) {
        this(cls, deltaFIFO, listerWatcher, consumer, supplier, j, null);
    }

    public Controller(Class<ApiType> cls, DeltaFIFO deltaFIFO, ListerWatcher<ApiType, ApiListType> listerWatcher, Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> consumer, Supplier<Boolean> supplier, long j, BiConsumer<Class<ApiType>, Throwable> biConsumer) {
        this.queue = deltaFIFO;
        this.listerWatcher = listerWatcher;
        this.apiTypeClass = cls;
        this.processFunc = consumer;
        this.resyncFunc = supplier;
        this.fullResyncPeriod = j;
        this.exceptionHandler = biConsumer;
        this.reflectExecutor = Executors.newSingleThreadScheduledExecutor(Threads.threadFactory("controller-reflector-" + cls.getName() + "-%d"));
        this.resyncExecutor = Executors.newSingleThreadScheduledExecutor(Threads.threadFactory("controller-resync-" + cls.getName() + "-%d"));
    }

    public Controller(Class<ApiType> cls, DeltaFIFO deltaFIFO, ListerWatcher<ApiType, ApiListType> listerWatcher, Consumer<Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>>> consumer) {
        this(cls, deltaFIFO, listerWatcher, consumer, null, 0L);
    }

    public void run() {
        log.info("informer#Controller: ready to run resync & reflector runnable");
        if (this.fullResyncPeriod > 0) {
            ResyncRunnable resyncRunnable = new ResyncRunnable(this.queue, this.resyncFunc);
            ScheduledExecutorService scheduledExecutorService = this.resyncExecutor;
            Objects.requireNonNull(resyncRunnable);
            this.resyncFuture = scheduledExecutorService.scheduleAtFixedRate(resyncRunnable::run, this.fullResyncPeriod, this.fullResyncPeriod, TimeUnit.MILLISECONDS);
        } else {
            log.info("informer#Controller: resync skipped due to 0 full resync period");
        }
        synchronized (this) {
            this.reflector = newReflector();
            try {
                ScheduledExecutorService scheduledExecutorService2 = this.reflectExecutor;
                ReflectorRunnable<ApiType, ApiListType> reflectorRunnable = this.reflector;
                Objects.requireNonNull(reflectorRunnable);
                this.reflectorFuture = scheduledExecutorService2.scheduleWithFixedDelay(reflectorRunnable::run, 0L, DEFAULT_PERIOD, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                log.warn("reflector list-watching job exiting because the thread-pool is shutting down");
                return;
            }
        }
        processLoop();
    }

    ReflectorRunnable<ApiType, ApiListType> newReflector() {
        return new ReflectorRunnable<>(this.apiTypeClass, this.listerWatcher, this.queue, this.exceptionHandler);
    }

    public void stop() {
        synchronized (this) {
            if (this.reflectorFuture != null) {
                this.reflector.stop();
                this.reflectorFuture.cancel(true);
            }
        }
        this.reflectExecutor.shutdownNow();
        this.resyncExecutor.shutdownNow();
    }

    public boolean hasSynced() {
        return this.queue.hasSynced();
    }

    public String lastSyncResourceVersion() {
        return this.reflector == null ? Namespaces.NAMESPACE_ALL : this.reflector.getLastSyncResourceVersion();
    }

    private void processLoop() {
        while (true) {
            try {
                this.queue.pop(this.processFunc);
            } catch (InterruptedException e) {
                log.error("DefaultController#processLoop get interrupted {}", e.getMessage(), e);
                return;
            } catch (Throwable th) {
                log.error("DefaultController#processLoop recovered from crashing {}", th.getMessage(), th);
            }
        }
    }
}
