/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.sequencedmultisetstate;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Function;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.sequencedmultisetstate.ValueStateMultiSetState;
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.LinkedMultiSetState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SequencedMultiSetState} that delegates to one of two underlying implementations and
 * migrates the contents between them depending on the number of stored elements.
 *
 * <p>Mutating operations ({@link #add}, {@link #append}, {@link #remove}) are applied to the
 * currently active state. After the operation, the resulting size is compared against the
 * configured thresholds:
 *
 * <ul>
 *   <li>while the small state is active and the size reaches {@code switchToLargeThreshold}
 *       (inclusive), all elements are moved to the large state;
 *   <li>while the large state is active and the size drops to {@code switchToSmallThreshold}
 *       (inclusive), all elements are moved back to the small state.
 * </ul>
 *
 * <p>The constructor requires {@code switchToLargeThreshold > switchToSmallThreshold}, so the two
 * switch conditions can never hold for the same size.
 */
class AdaptiveSequencedMultiSetState
implements SequencedMultiSetState<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSequencedMultiSetState.class);

    /** Default size at which a heap/hashmap backend switches to the large state. */
    private static final long ADAPTIVE_HEAP_HIGH_THRESHOLD = 400L;
    /** Default size at which a heap/hashmap backend switches back to the small state. */
    private static final long ADAPTIVE_HEAP_LOW_THRESHOLD = 300L;
    /** Default size at which any non-heap backend switches to the large state. */
    private static final long ADAPTIVE_ROCKSDB_HIGH_THRESHOLD = 50L;
    /** Default size at which any non-heap backend switches back to the small state. */
    private static final long ADAPTIVE_ROCKSDB_LOW_THRESHOLD = 40L;

    private final ValueStateMultiSetState smallState;
    private final LinkedMultiSetState largeState;
    private final long switchToLargeThreshold;
    private final long switchToSmallThreshold;

    /**
     * Creates the adaptive state on top of the two given delegates.
     *
     * @param smallState state used while the number of elements is low
     * @param largeState state used while the number of elements is high
     * @param switchToLargeThreshold size (inclusive) at which elements are moved from the small
     *     to the large state
     * @param switchToSmallThreshold size (inclusive) at which elements are moved from the large
     *     back to the small state; must be strictly less than {@code switchToLargeThreshold}
     * @throws IllegalArgumentException if {@code switchToLargeThreshold <= switchToSmallThreshold}
     */
    AdaptiveSequencedMultiSetState(ValueStateMultiSetState smallState, LinkedMultiSetState largeState, long switchToLargeThreshold, long switchToSmallThreshold) {
        Preconditions.checkArgument(switchToLargeThreshold > switchToSmallThreshold);
        this.smallState = smallState;
        this.largeState = largeState;
        this.switchToLargeThreshold = switchToLargeThreshold;
        this.switchToSmallThreshold = switchToSmallThreshold;
        LOG.info("Created {} with thresholds: {}=>large, {}=>small", this.getClass().getSimpleName(), switchToLargeThreshold, switchToSmallThreshold);
    }

    /** Applies {@code add} to the active state and switches states if a threshold is reached. */
    @Override
    public SequencedMultiSetState.StateChangeInfo<RowData> add(RowData element, long timestamp) throws Exception {
        return this.execute(state -> state.add(element, timestamp), SequencedMultiSetState.StateChangeInfo::getSizeAfter, "add");
    }

    /** Applies {@code append} to the active state and switches states if a threshold is reached. */
    @Override
    public SequencedMultiSetState.StateChangeInfo<RowData> append(RowData element, long timestamp) throws Exception {
        return this.execute(state -> state.append(element, timestamp), SequencedMultiSetState.StateChangeInfo::getSizeAfter, "append");
    }

    /**
     * Returns the iterator of the small state unless it reports itself empty, in which case the
     * iterator of the large state is returned.
     */
    @Override
    public Iterator<Tuple2<RowData, Long>> iterator() throws Exception {
        if (this.smallState.isEmpty()) {
            return this.largeState.iterator();
        }
        return this.smallState.iterator();
    }

    /** Returns {@code true} only if both underlying states report themselves empty. */
    @Override
    public boolean isEmpty() throws IOException {
        return this.largeState.isEmpty() && this.smallState.isEmpty();
    }

    /** Applies {@code remove} to the active state and switches states if a threshold is reached. */
    @Override
    public SequencedMultiSetState.StateChangeInfo<RowData> remove(RowData element) throws Exception {
        return this.execute(state -> state.remove(element), SequencedMultiSetState.StateChangeInfo::getSizeAfter, "remove");
    }

    /** Drops the caches of both underlying states and then clears both of them. */
    @Override
    public void clear() {
        this.clearCache();
        this.smallState.clear();
        this.largeState.clear();
    }

    /** Loads the caches of both underlying states. */
    @Override
    public void loadCache() throws IOException {
        this.smallState.loadCache();
        this.largeState.loadCache();
    }

    /** Clears the caches of both underlying states. */
    @Override
    public void clearCache() {
        this.smallState.clearCache();
        this.largeState.clearCache();
    }

    /**
     * Runs {@code stateOp} against the currently active state and migrates all elements to the
     * other state when the resulting size reaches the relevant threshold.
     *
     * @param stateOp operation to apply to the active state
     * @param getNewSize extracts the state size after the operation from its result
     * @param action operation name, used only in the debug log message
     * @param <T> result type of the operation
     * @return whatever {@code stateOp} returned
     * @throws Exception if the operation, the size lookup or the migration fails
     */
    private <T> T execute(FunctionWithException<SequencedMultiSetState<RowData>, T, Exception> stateOp, Function<T, Long> getNewSize, String action) throws Exception {
        boolean isUsingLarge = this.isIsUsingLargeState();
        SequencedMultiSetState<RowData> currentState = isUsingLarge ? this.largeState : this.smallState;
        SequencedMultiSetState<RowData> otherState = isUsingLarge ? this.smallState : this.largeState;

        T result = stateOp.apply(currentState);
        long sizeAfter = getNewSize.apply(result);

        // The direction of the comparison depends on which state is active: shrinking below the
        // low threshold leaves the large state, growing past the high threshold leaves the small.
        boolean thresholdReached = isUsingLarge
                ? sizeAfter <= this.switchToSmallThreshold
                : sizeAfter >= this.switchToLargeThreshold;
        if (thresholdReached) {
            LOG.debug("Switch {} -> {} because '{}' resulted in state size reaching {} elements", currentState.getClass().getSimpleName(), otherState.getClass().getSimpleName(), action, sizeAfter);
            this.switchState(currentState, otherState);
        }
        // isIsUsingLargeState() loaded caches above; release them once the operation is done.
        this.clearCache();
        return result;
    }

    /**
     * Determines which underlying state currently holds the elements.
     *
     * <p>Loads the small state's cache first; the large state's cache is only loaded when the
     * small state is empty. Callers are responsible for clearing the caches afterwards.
     *
     * @return {@code true} if the small state is empty and the large state is not; {@code false}
     *     otherwise (including when both are empty)
     */
    @VisibleForTesting
    boolean isIsUsingLargeState() throws IOException {
        this.smallState.loadCache();
        if (!this.smallState.isEmpty()) {
            return false;
        }
        this.largeState.loadCache();
        return !this.largeState.isEmpty();
    }

    /**
     * Moves every element of {@code src} to {@code dst}, in the iteration order of {@code src} and
     * keeping each element's timestamp, then clears {@code src}.
     */
    private void switchState(SequencedMultiSetState<RowData> src, SequencedMultiSetState<RowData> dst) throws Exception {
        Iterator<Tuple2<RowData, Long>> it = src.iterator();
        while (it.hasNext()) {
            Tuple2<RowData, Long> next = it.next();
            dst.append(next.f0, next.f1);
        }
        src.clear();
    }

    /**
     * Creates an adaptive state, choosing thresholds from the config overrides when present and
     * from backend-specific defaults otherwise.
     *
     * @param sequencedMultiSetStateConfig source of the optional threshold overrides
     * @param backendTypeIdentifier state backend identifier; {@code "hashmap"} and {@code "heap"}
     *     (case-insensitive, surrounding whitespace ignored) select the heap defaults, anything
     *     else selects the RocksDB defaults
     * @param smallState state used while the number of elements is low
     * @param largeState state used while the number of elements is high
     * @return the configured adaptive state
     */
    public static AdaptiveSequencedMultiSetState create(SequencedMultiSetStateConfig sequencedMultiSetStateConfig, String backendTypeIdentifier, ValueStateMultiSetState smallState, LinkedMultiSetState largeState) {
        boolean heap = AdaptiveSequencedMultiSetState.isHeap(backendTypeIdentifier);
        long defaultHigh = heap ? ADAPTIVE_HEAP_HIGH_THRESHOLD : ADAPTIVE_ROCKSDB_HIGH_THRESHOLD;
        long defaultLow = heap ? ADAPTIVE_HEAP_LOW_THRESHOLD : ADAPTIVE_ROCKSDB_LOW_THRESHOLD;
        return new AdaptiveSequencedMultiSetState(smallState, largeState, sequencedMultiSetStateConfig.getAdaptiveHighThresholdOverride().orElse(defaultHigh), sequencedMultiSetStateConfig.getAdaptiveLowThresholdOverride().orElse(defaultLow));
    }

    /** Returns whether the given backend identifier denotes a heap-based ("hashmap"/"heap") backend. */
    private static boolean isHeap(String stateBackend) {
        String trim = stateBackend.trim();
        return trim.equalsIgnoreCase("hashmap") || trim.equalsIgnoreCase("heap");
    }
}

