package org.apache.flink.table.connector.source.lookup.cache.trigger;

import java.time.Clock;
import java.time.Duration;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.util.Preconditions;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTrigger.class */
public class TimedCacheReloadTrigger implements CacheReloadTrigger {
    private static final long serialVersionUID = 1;
    private final Temporal reloadTime;
    private final int reloadIntervalInDays;
    private transient ScheduledExecutorService scheduledExecutor;
    private transient Clock clock;

    public TimedCacheReloadTrigger(OffsetTime offsetTime, int i) {
        this((Temporal) offsetTime, i);
    }

    public TimedCacheReloadTrigger(LocalTime localTime, int i) {
        this((Temporal) localTime, i);
    }

    private TimedCacheReloadTrigger(Temporal temporal, int i) {
        Preconditions.checkArgument(i > 0, "Reload interval for Timed cache reload trigger must be at least 1 day.");
        this.reloadTime = temporal;
        this.reloadIntervalInDays = i;
    }

    @VisibleForTesting
    TimedCacheReloadTrigger(Temporal temporal, int i, ScheduledExecutorService scheduledExecutorService, Clock clock) {
        this(temporal, i);
        this.scheduledExecutor = scheduledExecutorService;
        this.clock = clock;
    }

    @Override // org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger
    public void open(CacheReloadTrigger.Context context) {
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }
        if (this.clock == null) {
            this.clock = this.reloadTime instanceof LocalTime ? Clock.systemDefaultZone() : Clock.system(((OffsetTime) this.reloadTime).getOffset());
        }
        Duration between = Duration.between(this.reloadTime instanceof LocalTime ? LocalTime.now(this.clock) : OffsetTime.now(this.clock), this.reloadTime);
        if (between.isNegative()) {
            between = between.plus(1L, ChronoUnit.DAYS);
        }
        ScheduledExecutorService scheduledExecutorService = this.scheduledExecutor;
        Objects.requireNonNull(context);
        scheduledExecutorService.execute(context::triggerReload);
        ScheduledExecutorService scheduledExecutorService2 = this.scheduledExecutor;
        Objects.requireNonNull(context);
        scheduledExecutorService2.scheduleAtFixedRate(context::triggerReload, between.toMillis(), Duration.ofDays(this.reloadIntervalInDays).toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdownNow();
        }
    }

    @VisibleForTesting
    Temporal getReloadTime() {
        return this.reloadTime;
    }

    public static TimedCacheReloadTrigger fromConfig(ReadableConfig readableConfig) {
        Preconditions.checkArgument(readableConfig.get(LookupOptions.CACHE_TYPE) == LookupOptions.LookupCacheType.FULL, "'%s' should be '%s' in order to build a Timed cache reload trigger.", LookupOptions.CACHE_TYPE.key(), LookupOptions.LookupCacheType.FULL);
        Preconditions.checkArgument(readableConfig.get(LookupOptions.FULL_CACHE_RELOAD_STRATEGY) == LookupOptions.ReloadStrategy.TIMED, "'%s' should be '%s' in order to build a Timed cache reload trigger.", LookupOptions.FULL_CACHE_RELOAD_STRATEGY.key(), LookupOptions.ReloadStrategy.TIMED);
        Preconditions.checkArgument(readableConfig.getOptional(LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME).isPresent(), "Missing '%s' in the configuration. This option is required to build a Timed cache reload trigger.", LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME.key());
        return new TimedCacheReloadTrigger((Temporal) DateTimeFormatter.ISO_TIME.parseBest((CharSequence) readableConfig.get(LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME), OffsetTime::from, LocalTime::from), ((Integer) readableConfig.get(LookupOptions.FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS)).intValue());
    }
}
