package org.apache.flink.runtime.state;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorFileMergingManager.class */
public class TaskExecutorFileMergingManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorFileMergingManager.class);
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Map<JobID, Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>>> fileMergingSnapshotManagerByJobId = new HashMap();

    @GuardedBy("lock")
    private boolean closed = false;
    private final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);

    @Nullable
    public FileMergingSnapshotManager fileMergingSnapshotManagerForTask(@Nonnull JobID jobID, @Nonnull ResourceID resourceID, @Nonnull ExecutionAttemptID executionAttemptID, Configuration configuration, Configuration configuration2, TaskManagerJobMetricGroup taskManagerJobMetricGroup) {
        boolean booleanValue = ((Boolean) configuration2.getOptional(CheckpointingOptions.FILE_MERGING_ENABLED).orElse(configuration.get(CheckpointingOptions.FILE_MERGING_ENABLED))).booleanValue();
        synchronized (this.lock) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorFileMergingManager is already closed and cannot register a new FileMergingSnapshotManager.");
            }
            if (!booleanValue) {
                return null;
            }
            Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> tuple2 = this.fileMergingSnapshotManagerByJobId.get(jobID);
            if (tuple2 == null) {
                tuple2 = Tuple2.of(new FileMergingSnapshotManagerBuilder(jobID, resourceID, ((Boolean) configuration2.getOptional(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY).orElse(configuration.get(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY))).booleanValue() ? FileMergingType.MERGE_ACROSS_CHECKPOINT : FileMergingType.MERGE_WITHIN_CHECKPOINT).setMaxFileSize(((MemorySize) configuration2.getOptional(CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE).orElse(configuration.get(CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE))).getBytes()).setFilePoolType(((Boolean) configuration2.getOptional(CheckpointingOptions.FILE_MERGING_POOL_BLOCKING).orElse(configuration.get(CheckpointingOptions.FILE_MERGING_POOL_BLOCKING))).booleanValue() ? PhysicalFilePool.Type.BLOCKING : PhysicalFilePool.Type.NON_BLOCKING).setMaxSpaceAmplification(((Float) configuration2.getOptional(CheckpointingOptions.FILE_MERGING_MAX_SPACE_AMPLIFICATION).orElse(configuration.get(CheckpointingOptions.FILE_MERGING_MAX_SPACE_AMPLIFICATION))).floatValue()).setMetricGroup(taskManagerJobMetricGroup).build(), new HashSet());
                this.fileMergingSnapshotManagerByJobId.put(jobID, tuple2);
                LOG.info("Registered new file merging snapshot manager for job {}.", jobID);
            }
            ((Set) tuple2.f1).add(executionAttemptID);
            return (FileMergingSnapshotManager) tuple2.f0;
        }
    }

    public void releaseMergingSnapshotManagerForTask(@Nonnull JobID jobID, @Nonnull ExecutionAttemptID executionAttemptID) {
        synchronized (this.lock) {
            Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> tuple2 = this.fileMergingSnapshotManagerByJobId.get(jobID);
            if (tuple2 != null) {
                LOG.debug("Releasing file merging snapshot manager under job id {} and attempt {}.", jobID, executionAttemptID);
                ((Set) tuple2.f1).remove(executionAttemptID);
                if (((Set) tuple2.f1).isEmpty()) {
                    releaseMergingSnapshotManagerForJob(jobID);
                }
            }
        }
    }

    public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobID) {
        LOG.debug("Releasing file merging snapshot manager under job id {}.", jobID);
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            Tuple2<FileMergingSnapshotManager, Set<ExecutionAttemptID>> remove = this.fileMergingSnapshotManagerByJobId.remove(jobID);
            if (remove != null) {
                if (!((Set) remove.f1).isEmpty()) {
                    LOG.warn("The file merging snapshot manager for job {} is released before all tasks are released.", jobID);
                }
                try {
                    ((FileMergingSnapshotManager) remove.f0).close();
                } catch (Exception e) {
                    LOG.warn("Exception while closing TaskExecutorFileMergingManager for job {}.", jobID, e);
                }
            }
        }
    }

    public void shutdown() {
        synchronized (this.lock) {
            HashMap hashMap = new HashMap(this.fileMergingSnapshotManagerByJobId);
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.fileMergingSnapshotManagerByJobId.clear();
            LOG.info("Shutting down TaskExecutorFileMergingManager.");
            ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
            for (Map.Entry entry : hashMap.entrySet()) {
                if (entry.getValue() != null) {
                    try {
                        ((FileMergingSnapshotManager) ((Tuple2) entry.getValue()).f0).close();
                    } catch (Exception e) {
                        LOG.warn("Exception while closing TaskExecutorFileMergingManager for job {}.", entry.getKey(), e);
                    }
                }
            }
        }
    }
}
