/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskChangelogRegistry} implementation that keeps one reference count per
 * {@link PhysicalStateHandleID} in a concurrent map and hands a state handle to the configured
 * {@link Executor} for discarding once its count drops to zero.
 */
@Internal
@ThreadSafe
class TaskChangelogRegistryImpl
implements TaskChangelogRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(TaskChangelogRegistryImpl.class);

    /** Remaining reference count per tracked state, keyed by the handle's physical ID. */
    private final Map<PhysicalStateHandleID, Long> entries = new ConcurrentHashMap<>();

    /** Runs the discard tasks created by {@link #scheduleDiscard(StreamStateHandle)}. */
    private final Executor executor;

    /**
     * Creates a registry that discards unused state on the given executor.
     *
     * @param executor executor that receives the discard tasks
     */
    public TaskChangelogRegistryImpl(Executor executor) {
        this.executor = executor;
    }

    /**
     * Starts tracking the given state with the given initial reference count. An existing entry
     * for the same handle ID is overwritten.
     *
     * @param handle state to track; its {@code getStreamStateHandleID()} is used as the key
     * @param refCount initial number of references, must be positive
     * @throws IllegalStateException if {@code refCount} is not positive (raised by
     *     {@code Preconditions.checkState})
     */
    @Override
    public void startTracking(StreamStateHandle handle, long refCount) {
        Preconditions.checkState(refCount > 0L, "Initial refCount of state must larger than zero");
        LOG.debug("start tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
        entries.put(handle.getStreamStateHandleID(), refCount);
    }

    /**
     * Removes the entry of the given state, whatever its current reference count is. Nothing is
     * scheduled for discard by this method.
     *
     * @param handle state to stop tracking
     */
    @Override
    public void stopTracking(StreamStateHandle handle) {
        LOG.debug("stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
        entries.remove(handle.getStreamStateHandleID());
    }

    /**
     * Decrements the reference count of the given state by one. When the count reaches zero the
     * entry is removed and the state is scheduled for discard on the executor. Releasing a state
     * that is not tracked only logs a warning.
     *
     * <p>The discard is scheduled after the map update has completed, so if the executor rejects
     * the task the resulting exception reaches the caller with the entry already removed.
     *
     * @param handle state whose reference count is decremented
     */
    @Override
    public void release(StreamStateHandle handle) {
        PhysicalStateHandleID key = handle.getStreamStateHandleID();
        LOG.debug("state reference count decreased by one, key: {}, state: {}", key, handle);

        // Single-element holder: lets the remapping function report its decision to the
        // enclosing method, which needs an effectively final local to capture.
        boolean[] lastReferenceReleased = {false};

        // Keep the remapping function limited to the bookkeeping. Calling the executor from
        // inside compute() would run foreign code (possibly discardState() itself, with a
        // same-thread executor) while the map bin is locked.
        entries.compute(key, (handleID, refCount) -> {
            if (refCount == null) {
                LOG.warn("state is not in tracking, key: {}, state: {}", key, handle);
                return null;
            }
            long newRefCount = refCount - 1L;
            if (newRefCount == 0L) {
                lastReferenceReleased[0] = true;
                // Returning null removes the mapping.
                return null;
            }
            return newRefCount;
        });

        if (lastReferenceReleased[0]) {
            LOG.debug("state is not used by any backend, schedule discard: {}/{}", key, handle);
            scheduleDiscard(handle);
        }
    }

    /**
     * Submits a task to the executor that calls {@code discardState()} on the handle. A failure
     * of the discard itself is logged as a warning and not propagated.
     *
     * @param handle state to discard
     */
    private void scheduleDiscard(StreamStateHandle handle) {
        executor.execute(() -> {
            try {
                LOG.trace("discard uploaded but unused state changes: {}", handle);
                handle.discardState();
            }
            catch (Exception e) {
                // Best effort: a failed discard must not take down the executor thread.
                LOG.warn("unable to discard uploaded but unused state changes", e);
            }
        });
    }
}

