package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.shaded.guava32.com.google.common.io.Closer;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

/* loaded from: input_file:org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.class */
/**
 * Base class for {@link StateChangeUploader} implementations that write state changes to a
 * stream obtained from {@link #prepareStream()}.
 *
 * <p>This class serializes every task's change sets into that single stream, registers the
 * resulting handle(s) with the {@link TaskChangelogRegistry}, and records upload metrics.
 * Subclasses only decide how the output stream is created.
 */
public abstract class AbstractStateChangeFsUploader implements StateChangeUploader {
    private final StateChangeFormat format = new StateChangeFormat();
    private final Clock clock = SystemClock.getInstance();
    private final TaskChangelogRegistry changelogRegistry;
    private final BiFunction<Path, Long, StreamStateHandle> handleFactory;
    protected final ChangelogStorageMetricGroup metrics;
    protected final boolean compression;
    protected final int bufferSize;

    /**
     * Stores the supplied collaborators and settings; performs no I/O.
     *
     * @param compression stored in {@link #compression} for use by subclasses
     * @param bufferSize stored in {@link #bufferSize} for use by subclasses
     * @param metrics metric group updated on every upload (count, latency, size, failures)
     * @param changelogRegistry registry that is told to start tracking each produced handle
     * @param handleFactory factory passed to the stream to build its state handle(s)
     */
    public AbstractStateChangeFsUploader(
            boolean compression,
            int bufferSize,
            ChangelogStorageMetricGroup metrics,
            TaskChangelogRegistry changelogRegistry,
            BiFunction<Path, Long, StreamStateHandle> handleFactory) {
        this.compression = compression;
        this.bufferSize = bufferSize;
        this.metrics = metrics;
        this.changelogRegistry = changelogRegistry;
        this.handleFactory = handleFactory;
    }

    /**
     * Creates the stream that a single upload writes all of its tasks into.
     *
     * @return a new output stream; it is closed by {@link #upload(Collection)}
     * @throws IOException if the stream cannot be created
     */
    abstract OutputStreamWithPos prepareStream() throws IOException;

    /**
     * Uploads the given tasks and records the upload count, latency and size metrics.
     *
     * <p>Latency and size are only recorded when the upload succeeds; on failure the exception
     * propagates before those metrics are touched.
     *
     * @param collection tasks whose change sets should be written
     * @return the per-task write results together with the produced handle(s)
     * @throws IOException if writing fails; every task has been notified via {@code fail} by then
     */
    @Override // org.apache.flink.changelog.fs.StateChangeUploader
    public StateChangeUploader.UploadTasksResult upload(Collection<StateChangeUploadScheduler.UploadTask> collection) throws IOException {
        this.metrics.getUploadsCounter().inc();
        long startNanos = this.clock.relativeTimeNanos();
        StateChangeUploader.UploadTasksResult result = uploadInternal(collection);
        this.metrics.getUploadLatenciesNanos().update(this.clock.relativeTimeNanos() - startNanos);
        this.metrics.getUploadSizes().update(result.getStateSize());
        return result;
    }

    /**
     * Writes all tasks into one stream and builds the result.
     *
     * <p>On an {@link IOException} the failure counter is incremented, every task is failed with
     * that exception, and the exception is rethrown.
     */
    private StateChangeUploader.UploadTasksResult uploadInternal(Collection<StateChangeUploadScheduler.UploadTask> collection) throws IOException {
        // try-with-resources closes the stream on every path. On the success paths the close
        // still happens before the result reaches the caller, exactly as before; on the failure
        // path it prevents the stream from leaking when writing or handle creation throws.
        try (OutputStreamWithPos stream = prepareStream()) {
            // NOTE(review): kept as a raw type because the element types produced by
            // StateChangeFormat#write are not visible in this file.
            HashMap tasksOffsets = new HashMap();
            for (StateChangeUploadScheduler.UploadTask task : collection) {
                tasksOffsets.put(task, this.format.write(stream, task.changeSets));
            }
            long numOfChangeSets = collection.stream()
                    .flatMap(task -> task.getChangeSets().stream())
                    .count();
            StreamStateHandle handle = stream.getHandle(this.handleFactory);
            this.changelogRegistry.startTracking(handle, numOfChangeSets);
            if (stream instanceof DuplicatingOutputStreamWithPos) {
                // A duplicating stream yields a second handle that is tracked with the same count.
                StreamStateHandle secondaryHandle =
                        ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(this.handleFactory);
                this.changelogRegistry.startTracking(secondaryHandle, numOfChangeSets);
                return new StateChangeUploader.UploadTasksResult(tasksOffsets, handle, secondaryHandle);
            }
            return new StateChangeUploader.UploadTasksResult(tasksOffsets, handle);
        } catch (IOException e) {
            this.metrics.getUploadFailuresCounter().inc();
            // The Closer is used as a "run all callbacks, then rethrow" helper. Guava documents
            // that it closes registered resources in reverse registration order, so the
            // task.fail(e) callbacks run first and the rethrow of e, registered first, runs last.
            try (Closer closer = Closer.create()) {
                closer.register(() -> {
                    throw e;
                });
                collection.forEach(task -> {
                    closer.register(() -> {
                        task.fail(e);
                    });
                });
            }
            // Not expected to be reached: closing the Closer rethrows e.
            return null;
        }
    }

    /**
     * Generates a random file name.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally declared
     * {@code protected}; it is kept {@code public} here so existing callers keep compiling.
     *
     * @return the string form of a freshly generated random {@link UUID}
     */
    public String generateFileName() {
        return UUID.randomUUID().toString();
    }
}
