package org.apache.flink.state.rocksdb.ttl;

import java.io.IOException;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlUtils;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.FlinkCompactionFilter;
import org.rocksdb.InfoLogLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager.class */
public class RocksDbTtlCompactFiltersManager {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactionFilter.class);
    private final TtlTimeProvider ttlTimeProvider;
    private final LinkedHashMap<String, FlinkCompactionFilter.FlinkCompactionFilterFactory> compactionFilterFactories = new LinkedHashMap<>();
    private final LinkedHashMap<String, ColumnFamilyOptions> columnFamilyOptionsMap = new LinkedHashMap<>();
    private final long queryTimeAfterNumEntries;
    private final Duration periodicCompactionTime;

    /* loaded from: input_file:org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager$ListElementFilter.class */
    private static class ListElementFilter<T> implements FlinkCompactionFilter.ListElementFilter {
        private final TypeSerializer<T> serializer;
        private DataInputDeserializer input = new DataInputDeserializer();

        private ListElementFilter(TypeSerializer<T> typeSerializer) {
            this.serializer = typeSerializer;
        }

        public int nextUnexpiredOffset(byte[] bArr, long j, long j2) {
            this.input.setBuffer(bArr);
            int i = 0;
            while (this.input.available() > 0) {
                try {
                    if (!TtlUtils.expired(nextElementLastAccessTimestamp(), j, j2)) {
                        break;
                    }
                    i = this.input.getPosition();
                } catch (IOException e) {
                    throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", e);
                }
            }
            return i;
        }

        private long nextElementLastAccessTimestamp() throws IOException {
            TtlValue ttlValue = (TtlValue) this.serializer.deserialize(this.input);
            if (this.input.available() > 0) {
                this.input.skipBytesToRead(1);
            }
            return ttlValue.getLastAccessTimestamp();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager$ListElementFilterFactory.class */
    public static class ListElementFilterFactory<T> implements FlinkCompactionFilter.ListElementFilterFactory {
        private final TypeSerializer<T> serializer;

        private ListElementFilterFactory(TypeSerializer<T> typeSerializer) {
            this.serializer = typeSerializer;
        }

        public FlinkCompactionFilter.ListElementFilter createListElementFilter() {
            return new ListElementFilter(this.serializer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/rocksdb/ttl/RocksDbTtlCompactFiltersManager$TimeProviderWrapper.class */
    public static class TimeProviderWrapper implements FlinkCompactionFilter.TimeProvider {
        private final TtlTimeProvider ttlTimeProvider;

        private TimeProviderWrapper(TtlTimeProvider ttlTimeProvider) {
            this.ttlTimeProvider = ttlTimeProvider;
        }

        public long currentTimestamp() {
            return this.ttlTimeProvider.currentTimestamp();
        }
    }

    public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider, long j, Duration duration) {
        this.ttlTimeProvider = ttlTimeProvider;
        this.queryTimeAfterNumEntries = j;
        this.periodicCompactionTime = duration;
    }

    public void setAndRegisterCompactFilterIfStateTtl(@Nonnull RegisteredStateMetaInfoBase registeredStateMetaInfoBase, @Nonnull ColumnFamilyOptions columnFamilyOptions) {
        if ((registeredStateMetaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) && TtlAwareSerializer.isSerializerTtlEnabled(((RegisteredKeyValueStateBackendMetaInfo) registeredStateMetaInfoBase).getStateSerializer())) {
            createAndSetCompactFilterFactory(registeredStateMetaInfoBase.getName(), columnFamilyOptions);
        }
    }

    private void createAndSetCompactFilterFactory(String str, @Nonnull ColumnFamilyOptions columnFamilyOptions) {
        FlinkCompactionFilter.FlinkCompactionFilterFactory flinkCompactionFilterFactory = new FlinkCompactionFilter.FlinkCompactionFilterFactory(new TimeProviderWrapper(this.ttlTimeProvider), createRocksDbNativeLogger());
        columnFamilyOptions.setCompactionFilterFactory(flinkCompactionFilterFactory);
        this.compactionFilterFactories.put(str, flinkCompactionFilterFactory);
        this.columnFamilyOptionsMap.put(str, columnFamilyOptions);
    }

    private static org.rocksdb.Logger createRocksDbNativeLogger() {
        if (!LOG.isDebugEnabled()) {
            return null;
        }
        DBOptions infoLogLevel = new DBOptions().setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
        try {
            org.rocksdb.Logger logger = new org.rocksdb.Logger(infoLogLevel) { // from class: org.apache.flink.state.rocksdb.ttl.RocksDbTtlCompactFiltersManager.1
                protected void log(InfoLogLevel infoLogLevel2, String str) {
                    RocksDbTtlCompactFiltersManager.LOG.debug("RocksDB filter native code log: " + str);
                }
            };
            if (infoLogLevel != null) {
                infoLogLevel.close();
            }
            return logger;
        } catch (Throwable th) {
            if (infoLogLevel != null) {
                try {
                    infoLogLevel.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void configCompactFilter(@Nonnull StateDescriptor<?, ?> stateDescriptor, TypeSerializer<?> typeSerializer) {
        FlinkCompactionFilter.Config createForMap;
        StateTtlConfig ttlConfig = stateDescriptor.getTtlConfig();
        if (ttlConfig.isEnabled() && ttlConfig.getCleanupStrategies().inRocksdbCompactFilter()) {
            FlinkCompactionFilter.FlinkCompactionFilterFactory flinkCompactionFilterFactory = this.compactionFilterFactories.get(stateDescriptor.getName());
            Preconditions.checkNotNull(flinkCompactionFilterFactory);
            long millis = ttlConfig.getTimeToLive().toMillis();
            ColumnFamilyOptions columnFamilyOptions = this.columnFamilyOptionsMap.get(stateDescriptor.getName());
            Preconditions.checkNotNull(columnFamilyOptions);
            StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy = ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();
            Duration duration = this.periodicCompactionTime;
            long j = this.queryTimeAfterNumEntries;
            if (rocksdbCompactFilterCleanupStrategy != null) {
                duration = rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime();
                j = rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
            }
            if (duration != null) {
                columnFamilyOptions.setPeriodicCompactionSeconds(duration.getSeconds());
            }
            if (stateDescriptor instanceof ListStateDescriptor) {
                TypeSerializer elementSerializer = ((ListSerializer) typeSerializer).getElementSerializer();
                int length = elementSerializer.getLength();
                createForMap = length > 0 ? FlinkCompactionFilter.Config.createForFixedElementList(millis, j, length + 1) : FlinkCompactionFilter.Config.createForList(millis, j, new ListElementFilterFactory(elementSerializer.duplicate()));
            } else {
                createForMap = stateDescriptor instanceof MapStateDescriptor ? FlinkCompactionFilter.Config.createForMap(millis, j) : FlinkCompactionFilter.Config.createForValue(millis, j);
            }
            flinkCompactionFilterFactory.configure(createForMap);
        }
    }

    public void disposeAndClearRegisteredCompactionFactories() {
        Iterator<FlinkCompactionFilter.FlinkCompactionFilterFactory> it = this.compactionFilterFactories.values().iterator();
        while (it.hasNext()) {
            IOUtils.closeQuietly(it.next());
        }
        this.compactionFilterFactories.clear();
        this.columnFamilyOptionsMap.clear();
    }
}
