/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CompositeStateHandle;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SharedStateRegistryImpl
implements SharedStateRegistry {
    /** Logger used by the registry and by its nested disposal runnable. */
    private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistryImpl.class);
    /**
     * All currently registered shared state, keyed by registration key. This map is also the
     * monitor object of every {@code synchronized} block in this class.
     */
    private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates;
    /**
     * Sharing-files strategy of every checkpoint passed to {@code registerAllAfterRestored},
     * keyed by checkpoint ID. The value is empty when the checkpoint exposed no restored
     * properties.
     */
    private final Map<Long, Optional<SnapshotType.SharingFilesStrategy>> restoredCheckpointSharingStrategies = new HashMap<Long, Optional<SnapshotType.SharingFilesStrategy>>();
    /** Set to {@code true} on construction and to {@code false} by {@code close()}. */
    private boolean open;
    /** Executor that receives the disposal tasks for state handles that are no longer used. */
    private final Executor asyncDisposalExecutor;
    /**
     * Highest ID among restored checkpoints that were registered with a mode other than
     * {@code CLAIM}, or -1 if there is none. Unused entries created by a checkpoint with an ID up
     * to and including this value are dropped from the registry without scheduling a delete.
     */
    private long highestNotClaimedCheckpointID = -1L;

    /**
     * Creates a registry whose disposal tasks are handed to the executor returned by
     * {@code Executors.directExecutor()}.
     */
    public SharedStateRegistryImpl() {
        this(Executors.directExecutor());
    }

    /**
     * Creates an empty registry in the open state.
     *
     * @param asyncDisposalExecutor executor that receives the disposal tasks for unused state
     *     handles; checked with {@code Preconditions.checkNotNull}
     */
    public SharedStateRegistryImpl(Executor asyncDisposalExecutor) {
        this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
        this.registeredStates = new HashMap<>();
        this.open = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a reference to shared state under {@code registrationKey} on behalf of the given
     * checkpoint and returns the handle the registry holds for that key afterwards.
     *
     * If the key is unknown, a new entry is created for {@code newHandle} (which must not be a
     * placeholder) and returned immediately; {@code preventDiscardingCreatedCheckpoint} is not
     * consulted on this path. If the key is already known, the new handle must be the same
     * object, an equal object, or a placeholder. The single exception is an existing
     * {@link EmptyDiscardStateObjectForRegister}, which is replaced by {@code newHandle}.
     *
     * The whole operation, including reading the handle that is returned, runs while holding the
     * {@code registeredStates} monitor, so the returned value is the one this call observed or
     * installed.
     *
     * @param registrationKey key under which the state is registered
     * @param newHandle handle to register; checked with {@code Preconditions.checkNotNull}
     * @param checkpointID ID of the checkpoint that references the state
     * @param preventDiscardingCreatedCheckpoint if {@code true} and the key was already
     *     registered, marks the entry so that its creating checkpoint can be reported as in use
     * @return the handle stored in the registry for {@code registrationKey}
     * @throws IllegalStateException if a different, non-placeholder handle is registered for a
     *     key whose current handle is not an {@link EmptyDiscardStateObjectForRegister}
     */
    @Override
    public StreamStateHandle registerReference(SharedStateRegistryKey registrationKey, StreamStateHandle newHandle, long checkpointID, boolean preventDiscardingCreatedCheckpoint) {
        Preconditions.checkNotNull(newHandle, "State handle should not be null.");

        synchronized (registeredStates) {
            Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");

            SharedStateEntry entry = registeredStates.get(registrationKey);
            if (entry == null) {
                // A placeholder stands in for state that is already registered, so it can never
                // be the first registration under a key.
                Preconditions.checkState(!isPlaceholder(newHandle), "Attempt to reference unknown state: " + registrationKey);
                LOG.trace("Registered new shared state {} under key {}.", newHandle, registrationKey);
                entry = new SharedStateEntry(newHandle, checkpointID);
                registeredStates.put(registrationKey, entry);
                // First registration: nothing to advance or mark.
                return entry.stateHandle;
            }

            if (entry.stateHandle == newHandle) {
                LOG.info("Duplicated registration under key {} with the same object: {}", registrationKey, newHandle);
            } else if (Objects.equals(entry.stateHandle, newHandle)) {
                LOG.trace("Duplicated registration under key {} with the new object: {}.", registrationKey, newHandle);
            } else if (isPlaceholder(newHandle)) {
                LOG.trace("Duplicated registration under key {} with a placeholder (normal case)", registrationKey);
            } else {
                LOG.info("the registered handle should equal to the previous one or is a placeholder, register key:{}, handle:{}", registrationKey, newHandle);
                if (!(entry.stateHandle instanceof EmptyDiscardStateObjectForRegister)) {
                    throw new IllegalStateException("StateObjects underlying same key should be equal !");
                }
                // The stored handle has a no-op discard; keep the new handle instead.
                entry.stateHandle = newHandle;
            }

            LOG.trace("Updating last checkpoint for {} from {} to {}", registrationKey, entry.lastUsedCheckpointID, checkpointID);
            entry.advanceLastUsingCheckpointID(checkpointID);
            if (preventDiscardingCreatedCheckpoint) {
                entry.preventDiscardingCreatedCheckpoint();
            }
            // Read the (mutable) handle while still holding the lock so that a concurrent
            // registration cannot change it between the update above and this return.
            return entry.stateHandle;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every entry whose last using checkpoint is older than {@code lowestCheckpointID}
     * and schedules the disposal of the removed handles.
     *
     * A removed handle is only scheduled for disposal when its entry was created by a checkpoint
     * newer than {@code highestNotClaimedCheckpointID}. Disposal is scheduled after the registry
     * monitor has been released.
     *
     * @param lowestCheckpointID entries last used by a checkpoint below this ID are removed
     * @return IDs of the creating checkpoints of all retained entries for which
     *     {@code preventsDiscardingCreatedCheckpoint} holds
     */
    @Override
    public Set<Long> unregisterUnusedState(long lowestCheckpointID) {
        Set<Long> checkpointsStillInUse = new HashSet<>();
        LOG.debug("Discard state created before checkpoint {} and not used afterwards", lowestCheckpointID);
        ArrayList<StreamStateHandle> handlesToDiscard = new ArrayList<>();

        synchronized (registeredStates) {
            for (Iterator<SharedStateEntry> entries = registeredStates.values().iterator(); entries.hasNext(); ) {
                SharedStateEntry candidate = entries.next();

                boolean stillReferenced = candidate.lastUsedCheckpointID >= lowestCheckpointID;
                if (stillReferenced) {
                    // Entry stays; report its creating checkpoint if that one must be kept.
                    if (preventsDiscardingCreatedCheckpoint(candidate)) {
                        checkpointsStillInUse.add(candidate.createdByCheckpointID);
                    }
                    continue;
                }

                entries.remove();
                // Handles created at or before the highest not-claimed checkpoint are only
                // forgotten, never deleted.
                if (candidate.createdByCheckpointID > highestNotClaimedCheckpointID) {
                    handlesToDiscard.add(candidate.stateHandle);
                }
            }
        }

        LOG.trace("Discard {} state asynchronously", handlesToDiscard.size());
        for (StreamStateHandle unusedHandle : handlesToDiscard) {
            scheduleAsyncDelete(unusedHandle);
        }
        return checkpointsStillInUse;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Asks every given composite handle to register its shared state with this registry for the
     * given checkpoint. All handles are processed while holding the registry monitor.
     *
     * @param stateHandles handles to register; {@code null} is accepted and does nothing
     * @param checkpointID ID of the checkpoint the handles belong to
     */
    @Override
    public void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID) {
        if (stateHandles != null) {
            synchronized (registeredStates) {
                for (CompositeStateHandle handle : stateHandles) {
                    handle.registerSharedStates(this, checkpointID);
                }
            }
        }
    }

    /**
     * Registers the shared state of a restored checkpoint and records how that checkpoint has to
     * be treated when unused state is unregistered later.
     *
     * Besides registering all operator states, this stores the checkpoint's sharing-files
     * strategy (empty if the checkpoint has no restored properties) and, for every mode other
     * than {@code CLAIM}, raises {@code highestNotClaimedCheckpointID} to at least this
     * checkpoint's ID.
     *
     * The whole method runs under the {@code registeredStates} monitor. Both pieces of
     * bookkeeping are read under that monitor by {@code unregisterUnusedState}, so updating them
     * under the same lock keeps the plain {@code HashMap} and the {@code long} field consistent
     * and makes registration and bookkeeping one atomic step. The monitor is reentrant, so the
     * nested {@code registerAll} call is unaffected.
     *
     * @param checkpoint the restored checkpoint
     * @param mode claim mode the checkpoint was restored with
     */
    @Override
    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, RecoveryClaimMode mode) {
        synchronized (registeredStates) {
            registerAll(checkpoint.getOperatorStates().values(), checkpoint.getCheckpointID());
            restoredCheckpointSharingStrategies.put(
                    checkpoint.getCheckpointID(),
                    checkpoint.getRestoredProperties().map(props -> props.getCheckpointType().getSharingFilesStrategy()));
            // Only state of checkpoints restored in CLAIM mode may be deleted by this registry;
            // for any other mode remember the ID so that unregisterUnusedState skips deletion.
            if (mode != RecoveryClaimMode.CLAIM) {
                highestNotClaimedCheckpointID = Math.max(highestNotClaimedCheckpointID, checkpoint.getCheckpointID());
            }
        }
    }

    /**
     * Notification that a checkpoint completed. This implementation does nothing with it.
     *
     * @param checkpointId ID of the completed checkpoint (unused)
     */
    @Override
    public void checkpointCompleted(long checkpointId) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Renders the registry together with all registered entries. The map is formatted while
     * holding the registry monitor.
     */
    @Override
    public String toString() {
        synchronized (registeredStates) {
            return "SharedStateRegistry{registeredStates=" + registeredStates + "}";
        }
    }

    /**
     * Hands the disposal of the given handle to the disposal executor. {@code null} handles and
     * placeholders are skipped.
     *
     * @param streamStateHandle handle to dispose, may be {@code null}
     */
    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
        if (streamStateHandle == null || isPlaceholder(streamStateHandle)) {
            return;
        }

        LOG.debug("Scheduled delete of state handle {}.", streamStateHandle);
        AsyncDisposalRunnable disposalTask = new AsyncDisposalRunnable(streamStateHandle);
        try {
            asyncDisposalExecutor.execute(disposalTask);
        } catch (RejectedExecutionException rejected) {
            // The executor refused the task; dispose directly in the calling thread instead.
            disposalTask.run();
        }
    }

    /**
     * Tells whether the given handle is a {@link PlaceholderStreamStateHandle}.
     *
     * @param stateHandle handle to inspect; {@code null} yields {@code false}
     * @return {@code true} if the handle is a placeholder
     */
    private boolean isPlaceholder(StreamStateHandle stateHandle) {
        return stateHandle instanceof PlaceholderStreamStateHandle;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks the registry as closed. Registered entries are left untouched; only the
     * {@code open} flag is cleared, under the registry monitor.
     */
    @Override
    public void close() {
        synchronized (registeredStates) {
            open = false;
        }
    }

    /**
     * Decides whether the checkpoint that created the given entry has to be reported as still in
     * use.
     *
     * That is the case when the entry was explicitly marked and its creating checkpoint is a
     * known restored checkpoint, or when the creating checkpoint was recorded with the
     * {@code NO_SHARING} sharing-files strategy.
     *
     * @param entry registry entry to inspect
     * @return {@code true} if the creating checkpoint must be reported as in use
     */
    private boolean preventsDiscardingCreatedCheckpoint(SharedStateEntry entry) {
        final long creatingCheckpoint = entry.createdByCheckpointID;

        boolean markedAndRestored =
                entry.preventDiscardingCreatedCheckpoint
                        && restoredCheckpointSharingStrategies.containsKey(creatingCheckpoint);
        if (markedAndRestored) {
            return true;
        }

        Optional<SnapshotType.SharingFilesStrategy> recordedStrategy =
                restoredCheckpointSharingStrategies.getOrDefault(creatingCheckpoint, Optional.empty());
        return recordedStrategy
                .filter(strategy -> strategy == SnapshotType.SharingFilesStrategy.NO_SHARING)
                .isPresent();
    }

    /**
     * A {@link StreamStateHandle} that is identified only by a {@link StateHandleID}. Discarding
     * it does nothing, and every method that would expose size, content or a physical ID throws
     * {@link UnsupportedOperationException}.
     */
    public static class EmptyDiscardStateObjectForRegister implements StreamStateHandle {
        private static final long serialVersionUID = 1L;

        /** Identity of this handle; the sole input to equals, hashCode and toString. */
        private StateHandleID stateHandleID;

        public EmptyDiscardStateObjectForRegister(StateHandleID stateHandleID) {
            this.stateHandleID = stateHandleID;
        }

        /** Does nothing. */
        @Override
        public void discardState() throws Exception {
            // nothing to discard
        }

        @Override
        public long getStateSize() {
            throw new UnsupportedOperationException("Should not call here.");
        }

        @Override
        public FSDataInputStream openInputStream() throws IOException {
            throw new UnsupportedOperationException("Should not call here.");
        }

        @Override
        public Optional<byte[]> asBytesIfInMemory() {
            throw new UnsupportedOperationException("Should not call here.");
        }

        @Override
        public PhysicalStateHandleID getStreamStateHandleID() {
            throw new UnsupportedOperationException("Should not call here.");
        }

        /** Two instances are equal when they are of the same class and carry equal IDs. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            boolean sameClass = o != null && getClass() == o.getClass();
            return sameClass
                    && Objects.equals(stateHandleID, ((EmptyDiscardStateObjectForRegister) o).stateHandleID);
        }

        @Override
        public int hashCode() {
            return Objects.hash(stateHandleID);
        }

        @Override
        public String toString() {
            return "EmptyDiscardStateObject{" + stateHandleID + "}";
        }
    }

    /** Bookkeeping record the registry keeps for one registered state handle. */
    private static final class SharedStateEntry {
        /** Handle currently stored for the key; not final, the registry may replace it. */
        StreamStateHandle stateHandle;
        /** ID of the checkpoint whose registration created this entry. */
        private final long createdByCheckpointID;
        /** Highest checkpoint ID that has referenced this entry so far. */
        private long lastUsedCheckpointID;
        /** Becomes {@code true} once {@link #preventDiscardingCreatedCheckpoint()} is called. */
        private boolean preventDiscardingCreatedCheckpoint = false;

        SharedStateEntry(StreamStateHandle value, long checkpointID) {
            createdByCheckpointID = checkpointID;
            lastUsedCheckpointID = checkpointID;
            stateHandle = value;
        }

        @Override
        public String toString() {
            return "SharedStateEntry{stateHandle=" + stateHandle
                    + ", createdByCheckpointID=" + createdByCheckpointID
                    + ", lastUsedCheckpointID=" + lastUsedCheckpointID
                    + "}";
        }

        /** Raises the last-used checkpoint ID; it never moves backwards. */
        private void advanceLastUsingCheckpointID(long checkpointID) {
            if (checkpointID > lastUsedCheckpointID) {
                lastUsedCheckpointID = checkpointID;
            }
        }

        /** Sets the prevent-discarding mark on this entry. */
        private void preventDiscardingCreatedCheckpoint() {
            preventDiscardingCreatedCheckpoint = true;
        }
    }

    /**
     * Task that discards one state object. Any exception thrown by the discard is logged as a
     * warning and not propagated.
     */
    private static final class AsyncDisposalRunnable implements Runnable {
        /** State object to discard when the task runs. */
        private final StateObject toDispose;

        public AsyncDisposalRunnable(StateObject toDispose) {
            this.toDispose = Preconditions.checkNotNull(toDispose);
        }

        @Override
        public void run() {
            try {
                toDispose.discardState();
            } catch (Exception discardFailure) {
                LOG.warn("A problem occurred during asynchronous disposal of a shared state object: {}", toDispose, discardFailure);
            }
        }
    }
}

