package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler.class */
public abstract class CheckpointBarrierHandler implements Closeable {
    private static final long OUTSIDE_OF_ALIGNMENT = Long.MIN_VALUE;
    private final AbstractInvokable toNotifyOnCheckpoint;
    private final Clock clock;
    private long latestCheckpointStartDelayNanos;
    private long bytesProcessedDuringAlignment;
    private CompletableFuture<Long> latestAlignmentDurationNanos = new CompletableFuture<>();
    private long startOfAlignmentTimestamp = OUTSIDE_OF_ALIGNMENT;
    private long startAlignmentCheckpointId = -1;
    private CompletableFuture<Long> latestBytesProcessedDuringAlignment = new CompletableFuture<>();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierHandler$Cancellable.class */
    interface Cancellable {
        void cancel();
    }

    public CheckpointBarrierHandler(AbstractInvokable abstractInvokable, Clock clock) {
        this.toNotifyOnCheckpoint = (AbstractInvokable) Preconditions.checkNotNull(abstractInvokable);
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    public abstract void processBarrier(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo) throws IOException;

    public abstract void processBarrierAnnouncement(CheckpointBarrier checkpointBarrier, int i, InputChannelInfo inputChannelInfo) throws IOException;

    public abstract void processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker) throws IOException;

    public abstract void processEndOfPartition() throws IOException;

    public abstract long getLatestCheckpointId();

    public long getAlignmentDurationNanos() {
        return isDuringAlignment() ? this.clock.relativeTimeNanos() - this.startOfAlignmentTimestamp : ((Long) FutureUtils.getOrDefault(this.latestAlignmentDurationNanos, 0L)).longValue();
    }

    public long getCheckpointStartDelayNanos() {
        return this.latestCheckpointStartDelayNanos;
    }

    public CompletableFuture<Void> getAllBarriersReceivedFuture(long j) {
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
        this.toNotifyOnCheckpoint.triggerCheckpointOnBarrier(new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp(), System.currentTimeMillis()), checkpointBarrier.getCheckpointOptions(), checkpointBarrier.getId() == this.startAlignmentCheckpointId ? new CheckpointMetricsBuilder().setAlignmentDurationNanos(this.latestAlignmentDurationNanos).setBytesProcessedDuringAlignment(this.latestBytesProcessedDuringAlignment).setCheckpointStartDelayNanos(this.latestCheckpointStartDelayNanos) : new CheckpointMetricsBuilder().setAlignmentDurationNanos(0L).setBytesProcessedDuringAlignment(0L).setCheckpointStartDelayNanos(0L));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyAbortOnCancellationBarrier(long j) throws IOException {
        notifyAbort(j, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyAbort(long j, CheckpointException checkpointException) throws IOException {
        this.toNotifyOnCheckpoint.abortCheckpointOnBarrier(j, checkpointException);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void markAlignmentStartAndEnd(long j, long j2) {
        markAlignmentStart(j, j2);
        markAlignmentEnd(0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void markAlignmentStart(long j, long j2) {
        this.latestCheckpointStartDelayNanos = 1000000 * Math.max(0L, this.clock.absoluteTimeMillis() - j2);
        resetAlignment();
        this.startOfAlignmentTimestamp = this.clock.relativeTimeNanos();
        this.startAlignmentCheckpointId = j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void markAlignmentEnd() {
        markAlignmentEnd(this.clock.relativeTimeNanos() - this.startOfAlignmentTimestamp);
    }

    protected void markAlignmentEnd(long j) {
        Preconditions.checkState(j >= 0, "Alignment time is less than zero({}). Is the time monotonic?", new Object[]{Long.valueOf(j)});
        this.latestAlignmentDurationNanos.complete(Long.valueOf(j));
        this.latestBytesProcessedDuringAlignment.complete(Long.valueOf(this.bytesProcessedDuringAlignment));
        this.startOfAlignmentTimestamp = OUTSIDE_OF_ALIGNMENT;
        this.bytesProcessedDuringAlignment = 0L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetAlignment() {
        markAlignmentEnd(0L);
        this.latestAlignmentDurationNanos = new CompletableFuture<>();
        this.latestBytesProcessedDuringAlignment = new CompletableFuture<>();
    }

    protected abstract boolean isCheckpointPending();

    public void addProcessedBytes(int i) {
        if (isDuringAlignment()) {
            this.bytesProcessedDuringAlignment += i;
        }
    }

    private boolean isDuringAlignment() {
        return this.startOfAlignmentTimestamp > OUTSIDE_OF_ALIGNMENT;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Clock getClock() {
        return this.clock;
    }
}
