package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.class */
/**
 * A {@link TaskLocalStateStoreImpl} that additionally records, for every stored checkpoint, the
 * materialization id carried by the {@link ChangelogStateBackendHandle}s found in its snapshot.
 *
 * <p>When checkpoints are pruned (or the store is disposed), the directories of materializations
 * that are no longer referenced by any remaining checkpoint are deleted on the discard executor.
 *
 * <p>Accesses to {@code mapToMaterializationId} are guarded by the {@code lock} inherited from the
 * parent class.
 */
public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);

    /** Prefix of the per-checkpoint directory names produced by {@link #getCheckpointDirectory}. */
    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = "changelog_chk_";

    /** Maps a checkpoint id to the materialization id referenced by that checkpoint. */
    private final Map<Long, Long> mapToMaterializationId;

    /** Id of the most recent checkpoint accepted by {@link #storeLocalState}; -1 if none yet. */
    private long lastCheckpointId;

    /**
     * Creates the store; all arguments are forwarded unchanged to the parent constructor.
     *
     * @param jobID id of the job this store belongs to
     * @param allocationID id of the slot allocation
     * @param jobVertexID id of the job vertex
     * @param subtaskIndex index of the subtask
     * @param localRecoveryConfig local recovery configuration
     * @param discardExecutor executor handed to the parent class
     */
    public ChangelogTaskLocalStateStore(@Nonnull JobID jobID, @Nonnull AllocationID allocationID, @Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull Executor discardExecutor) {
        super(jobID, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, discardExecutor);
        this.lastCheckpointId = -1L;
        this.mapToMaterializationId = new HashMap<>();
    }

    /**
     * Records the materialization id referenced by the given checkpoint's snapshot.
     *
     * <p>Callers in this class invoke it while holding {@code lock}.
     *
     * @param checkpointId id of the checkpoint the snapshot belongs to
     * @param localState the snapshot to scan; {@code null} is treated as {@code NULL_DUMMY}
     * @throws IllegalStateException if the snapshot's changelog handles disagree on the
     *     materialization id for this checkpoint
     */
    private void updateReference(long checkpointId, TaskStateSnapshot localState) {
        TaskStateSnapshot snapshot = localState == null ? NULL_DUMMY : localState;
        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskState : snapshot.getSubtaskStateMappings()) {
            for (KeyedStateHandle keyedStateHandle : subtaskState.getValue().getManagedKeyedState()) {
                if (!(keyedStateHandle instanceof ChangelogStateBackendHandle)) {
                    continue;
                }
                long materializationID = ((ChangelogStateBackendHandle) keyedStateHandle).getMaterializationID();
                // Values are only ever inserted as boxed longs, so null means "no entry yet".
                Long known = this.mapToMaterializationId.get(checkpointId);
                if (known != null) {
                    Preconditions.checkState(materializationID == known.longValue(), "one checkpoint contains at most one materializationID");
                } else {
                    this.mapToMaterializationId.put(checkpointId, materializationID);
                }
            }
        }
    }

    /**
     * Builds the path of the local task-owned state directory for a job.
     *
     * <p>The allocation base directory is chosen by the non-negative hash of the job id modulo the
     * number of allocation base directories. If that directory does not exist and cannot be
     * created, an error is logged and the path is still returned.
     *
     * @param localSnapshotDirectoryProvider provider of the allocation base directories
     * @param jobID id of the job
     * @return {@code <allocationBaseDirUri>/jid_<jobID>} resolved against the task-owned state
     *     directory name
     */
    public static Path getLocalTaskOwnedDirectory(LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider, JobID jobID) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the modulo result is a valid index.
        int baseDirIndex = (jobID.hashCode() & Integer.MAX_VALUE) % localSnapshotDirectoryProvider.allocationBaseDirsCount();
        File allocationBaseDir = localSnapshotDirectoryProvider.selectAllocationBaseDirectory(baseDirIndex);
        if (!allocationBaseDir.exists() && !allocationBaseDir.mkdirs()) {
            LOG.error("Local state base directory does not exist and could not be created: {}", allocationBaseDir);
        }
        return new Path(String.format("%s/jid_%s", allocationBaseDir.toURI(), jobID), AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR);
    }

    /**
     * Stores the local state of a checkpoint and records its materialization reference.
     *
     * <p>A checkpoint whose id is smaller than the last accepted one is logged and ignored.
     *
     * @param checkpointId id of the checkpoint
     * @param localState snapshot to store, may be {@code null}
     */
    @Override // org.apache.flink.runtime.state.TaskLocalStateStoreImpl, org.apache.flink.runtime.state.TaskLocalStateStore
    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) {
        if (checkpointId < this.lastCheckpointId) {
            // Argument order matches the placeholders: current checkpoint first, last id second.
            LOG.info("Current checkpoint {} is out of order, smaller than last CheckpointId {}.", checkpointId, this.lastCheckpointId);
            return;
        }
        this.lastCheckpointId = checkpointId;
        synchronized (this.lock) {
            updateReference(checkpointId, localState);
        }
        super.storeLocalState(checkpointId, localState);
    }

    /**
     * Returns the changelog-specific directory for a checkpoint: a child named
     * {@code changelog_chk_<checkpointId>} of the subtask base directory for that checkpoint.
     *
     * @param checkpointId id of the checkpoint
     * @return the directory for the checkpoint
     */
    @Override // org.apache.flink.runtime.state.TaskLocalStateStoreImpl
    public File getCheckpointDirectory(long checkpointId) {
        return new File(getLocalRecoveryDirectoryProvider().subtaskBaseDirectory(checkpointId), CHANGE_LOG_CHECKPOINT_PREFIX + checkpointId);
    }

    /**
     * Drops the bookkeeping of every checkpoint accepted by the predicate and schedules deletion
     * of the materialization directories that are no longer referenced by a remaining checkpoint.
     *
     * @param pruningChecker selects the checkpoint ids whose entries are removed
     */
    private void deleteMaterialization(LongPredicate pruningChecker) {
        Set<Long> materializationsToRemove;
        synchronized (this.lock) {
            // Snapshot the matching keys first so the map is not modified while streaming over it.
            Set<Long> prunedCheckpointIds = this.mapToMaterializationId.keySet().stream()
                    .filter(pruningChecker::test)
                    .collect(Collectors.toSet());
            materializationsToRemove = prunedCheckpointIds.stream()
                    .map(this.mapToMaterializationId::remove)
                    .collect(Collectors.toSet());
            // Keep materializations that surviving checkpoints still point to.
            materializationsToRemove.removeAll(this.mapToMaterializationId.values());
        }
        // Materialization directories are resolved with the parent's directory layout.
        this.discardExecutor.execute(() ->
                syncDiscardFileForCollection(materializationsToRemove.stream()
                        .map(super::getCheckpointDirectory)
                        .collect(Collectors.toList())));
    }

    /**
     * Deletes every existing directory in the collection; a failure is logged as a warning and
     * does not stop processing of the remaining entries.
     *
     * @param toDiscard directories to delete
     */
    private void syncDiscardFileForCollection(Collection<File> toDiscard) {
        for (File directory : toDiscard) {
            if (!directory.exists()) {
                continue;
            }
            try {
                deleteDirectory(directory);
            } catch (IOException e) {
                LOG.warn("Exception while deleting local state directory of {} in subtask ({} - {} - {}).", directory, this.jobID, this.jobVertexID, this.subtaskIndex, e);
            }
        }
    }

    /**
     * Prunes checkpoints in the parent store and then removes the matching materialization
     * references.
     *
     * <p>The {@code breakOnceCheckerFalse} argument is not forwarded; the parent is always called
     * with {@code false}. NOTE(review): presumably so that every stored checkpoint is examined —
     * confirm against the parent implementation.
     *
     * @param pruningChecker selects the checkpoint ids to prune
     * @param breakOnceCheckerFalse ignored by this override
     */
    @Override // org.apache.flink.runtime.state.TaskLocalStateStoreImpl
    public void pruneCheckpoints(LongPredicate pruningChecker, boolean breakOnceCheckerFalse) {
        super.pruneCheckpoints(pruningChecker, false);
        deleteMaterialization(pruningChecker);
    }

    /**
     * Releases all materialization references, schedules deletion of the local task-owned
     * directory of the job, clears the bookkeeping map and delegates to the parent's dispose.
     *
     * @return the future returned by the parent's {@code dispose()}
     */
    @Override // org.apache.flink.runtime.state.TaskLocalStateStoreImpl, org.apache.flink.runtime.state.OwnedTaskLocalStateStore
    public CompletableFuture<Void> dispose() {
        deleteMaterialization(checkpointId -> true);
        this.discardExecutor.execute(() -> {
            Path taskOwnedDirectory = getLocalTaskOwnedDirectory(getLocalRecoveryDirectoryProvider(), this.jobID);
            syncDiscardFileForCollection(Collections.singleton(new File(taskOwnedDirectory.toUri())));
        });
        synchronized (this.lock) {
            this.mapToMaterializationId.clear();
        }
        return super.dispose();
    }

    @Override // org.apache.flink.runtime.state.TaskLocalStateStoreImpl
    public String toString() {
        return "ChangelogTaskLocalStateStore{jobID=" + this.jobID
                + ", jobVertexID=" + this.jobVertexID
                + ", allocationID=" + this.allocationID.toHexString()
                + ", subtaskIndex=" + this.subtaskIndex
                + ", localRecoveryConfig=" + this.localRecoveryConfig
                + ", storedCheckpointIDs=" + this.storedTaskStateByCheckpointID.keySet()
                + ", mapToMaterializationId=" + this.mapToMaterializationId.entrySet()
                + '}';
    }
}
