package org.apache.flink.table.connector.source.lookup.cache.trigger;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalUnit;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/table/connector/source/lookup/cache/trigger/TimedCacheReloadTriggerTest.class */
class TimedCacheReloadTriggerTest {
    private static final long MILLIS_DIFF = 2;
    private static final int RELOAD_INTERVAL_IN_DAYS = 7;
    private static final Clock CONSTANT_CLOCK = Clock.fixed(Instant.ofEpochMilli(10), ZoneId.systemDefault());
    private final ScheduleStrategyExecutorService scheduledExecutor = new ScheduleStrategyExecutorService();
    private final TestTriggerContext context = new TestTriggerContext();

    TimedCacheReloadTriggerTest() {
    }

    @MethodSource({"normalReloadTimes"})
    @ParameterizedTest
    void testNormalReloadTime(Temporal temporal) throws Exception {
        TimedCacheReloadTrigger timedCacheReloadTrigger = new TimedCacheReloadTrigger(temporal, RELOAD_INTERVAL_IN_DAYS, this.scheduledExecutor, CONSTANT_CLOCK);
        try {
            timedCacheReloadTrigger.open(this.context);
            Assertions.assertThat(this.scheduledExecutor.numQueuedRunnables()).isEqualTo(1);
            Assertions.assertThat(this.scheduledExecutor.getNumPeriodicTasksWithFixedDelay()).isEqualTo(0);
            Assertions.assertThat(this.scheduledExecutor.getNumPeriodicTasksWithFixedRate()).isEqualTo(1);
            ScheduledTask scheduledTask = (ScheduledTask) this.scheduledExecutor.getAllPeriodicScheduledTask().iterator().next();
            Assertions.assertThat(scheduledTask.getDelay(TimeUnit.MILLISECONDS)).isEqualTo(MILLIS_DIFF);
            Assertions.assertThat(scheduledTask.getPeriod()).isEqualTo(Duration.ofDays(7L).toMillis());
            this.scheduledExecutor.trigger();
            Assertions.assertThat(this.context.getReloadTask().getNumLoads()).isEqualTo(1);
            this.scheduledExecutor.triggerPeriodicScheduledTasks();
            Assertions.assertThat(this.context.getReloadTask().getNumLoads()).isEqualTo(2);
            timedCacheReloadTrigger.close();
            Assertions.assertThat(this.scheduledExecutor.isTerminated()).isTrue();
        } catch (Throwable th) {
            try {
                timedCacheReloadTrigger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @MethodSource({"nextDayReloadTimes"})
    @ParameterizedTest
    void testNextDayReloadTime(Temporal temporal) throws Exception {
        TimedCacheReloadTrigger timedCacheReloadTrigger = new TimedCacheReloadTrigger(temporal, RELOAD_INTERVAL_IN_DAYS, this.scheduledExecutor, CONSTANT_CLOCK);
        try {
            timedCacheReloadTrigger.open(this.context);
            Assertions.assertThat(this.scheduledExecutor.numQueuedRunnables()).isEqualTo(1);
            Assertions.assertThat(this.scheduledExecutor.getNumPeriodicTasksWithFixedDelay()).isEqualTo(0);
            Assertions.assertThat(this.scheduledExecutor.getNumPeriodicTasksWithFixedRate()).isEqualTo(1);
            ScheduledTask scheduledTask = (ScheduledTask) this.scheduledExecutor.getAllPeriodicScheduledTask().iterator().next();
            Assertions.assertThat(scheduledTask.getDelay(TimeUnit.MILLISECONDS)).isEqualTo(Duration.ofDays(1L).minus(MILLIS_DIFF, ChronoUnit.MILLIS).toMillis());
            Assertions.assertThat(scheduledTask.getPeriod()).isEqualTo(Duration.ofDays(7L).toMillis());
            this.scheduledExecutor.trigger();
            Assertions.assertThat(this.context.getReloadTask().getNumLoads()).isEqualTo(1);
            this.scheduledExecutor.triggerPeriodicScheduledTasks();
            Assertions.assertThat(this.context.getReloadTask().getNumLoads()).isEqualTo(2);
            timedCacheReloadTrigger.close();
            Assertions.assertThat(this.scheduledExecutor.isTerminated()).isTrue();
        } catch (Throwable th) {
            try {
                timedCacheReloadTrigger.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testBadReloadIntervalInDays() {
        Assertions.assertThatThrownBy(() -> {
            new TimedCacheReloadTrigger(LocalTime.now(), 0);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("at least 1 day");
        Assertions.assertThatThrownBy(() -> {
            new TimedCacheReloadTrigger(LocalTime.now(), -1);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("at least 1 day");
    }

    @MethodSource({"reloadTimeStrings"})
    @ParameterizedTest
    void testParseReloadTimeFromConf(String str, Temporal temporal) {
        Configuration createValidConf = createValidConf();
        createValidConf.set(LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME, str);
        Assertions.assertThat(TimedCacheReloadTrigger.fromConfig(createValidConf).getReloadTime()).isEqualTo(temporal);
    }

    @Test
    void testCreateFromConfig() {
        Assertions.assertThat(TimedCacheReloadTrigger.fromConfig(createValidConf())).isNotNull();
        Configuration configuration = createValidConf().set(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.PARTIAL);
        Assertions.assertThatThrownBy(() -> {
            TimedCacheReloadTrigger.fromConfig(configuration);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("should be 'FULL'");
        Configuration configuration2 = createValidConf().set(LookupOptions.FULL_CACHE_RELOAD_STRATEGY, LookupOptions.ReloadStrategy.PERIODIC);
        Assertions.assertThatThrownBy(() -> {
            TimedCacheReloadTrigger.fromConfig(configuration2);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("should be 'TIMED'");
        Configuration createValidConf = createValidConf();
        createValidConf.removeConfig(LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME);
        Assertions.assertThatThrownBy(() -> {
            TimedCacheReloadTrigger.fromConfig(createValidConf);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining("Missing '" + LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME.key() + "'");
        Configuration configuration3 = createValidConf().set(LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME, "10");
        Assertions.assertThatThrownBy(() -> {
            TimedCacheReloadTrigger.fromConfig(configuration3);
        }).isInstanceOf(DateTimeParseException.class).hasMessageContaining("could not be parsed");
    }

    static Stream<Arguments> normalReloadTimes() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{OffsetTime.now(CONSTANT_CLOCK).plus(MILLIS_DIFF, (TemporalUnit) ChronoUnit.MILLIS)}), Arguments.of(new Object[]{LocalTime.now(CONSTANT_CLOCK).plus(MILLIS_DIFF, (TemporalUnit) ChronoUnit.MILLIS)})});
    }

    static Stream<Arguments> nextDayReloadTimes() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{OffsetTime.now(CONSTANT_CLOCK).minus(MILLIS_DIFF, (TemporalUnit) ChronoUnit.MILLIS)}), Arguments.of(new Object[]{LocalTime.now(CONSTANT_CLOCK).minus(MILLIS_DIFF, (TemporalUnit) ChronoUnit.MILLIS)})});
    }

    static Stream<Arguments> reloadTimeStrings() {
        LocalTime of = LocalTime.of(10, 15);
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{"10:15", of}), Arguments.of(new Object[]{"10:15Z", OffsetTime.of(of, ZoneOffset.UTC)}), Arguments.of(new Object[]{"10:15+07:00", OffsetTime.of(of, ZoneOffset.ofHours(RELOAD_INTERVAL_IN_DAYS))})});
    }

    private static Configuration createValidConf() {
        Configuration configuration = new Configuration();
        configuration.set(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.FULL);
        configuration.set(LookupOptions.FULL_CACHE_RELOAD_STRATEGY, LookupOptions.ReloadStrategy.TIMED);
        configuration.set(LookupOptions.FULL_CACHE_TIMED_RELOAD_ISO_TIME, "10:15");
        return configuration;
    }
}
