package org.apache.flink.runtime.asyncprocessing;

import java.util.LinkedList;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/EpochManager.class */
/**
 * Groups records into consecutive {@link Epoch}s and runs per-epoch actions once every record
 * counted against an epoch has completed.
 *
 * <p>Records are counted against the current active epoch. A non-record input closes the active
 * epoch, appends it to {@link #outputQueue} and opens a new one. Queued epochs are finished
 * strictly in queue order: only the head of the queue is ever finished, and only when its ongoing
 * record count is zero.
 *
 * <p>NOTE(review): no synchronization is visible in this class; presumably it is only accessed
 * from a single thread — confirm against the callers.
 */
public class EpochManager {
    private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);

    /** Controller asked to drain in-flight records in {@link ParallelMode#SERIAL_BETWEEN_EPOCH}. */
    final AsyncExecutionController<?> asyncExecutionController;

    /** Id that will be assigned to the next created epoch. */
    long epochNum;

    /** Closed epochs that have not finished yet, oldest first. */
    LinkedList<Epoch> outputQueue = new LinkedList<>();

    /** The open epoch that new records are counted against by default. */
    Epoch activeEpoch;

    /**
     * The epoch whose {@link Epoch#tryFinish()} is currently executing, or {@code null} otherwise.
     * While it is set, {@link #onRecord()} counts new records against it instead of the active
     * epoch.
     */
    @Nullable
    Epoch finishingEpoch;

    /** Re-entrancy guard for {@link #tryFinishInQueue()}. */
    boolean recursiveFlag;

    /** A group of records together with the actions to run once all of them have completed. */
    public static class Epoch {
        /** Identifier assigned at construction. */
        long id;

        /** Number of records counted against this epoch that have not completed yet. */
        int ongoingRecordCount = 0;

        /** Current lifecycle state; a new epoch starts as {@link EpochStatus#OPEN}. */
        EpochStatus status = EpochStatus.OPEN;

        /** Action run once, when the epoch moves from CLOSED to FINISHING. */
        @Nullable
        Runnable triggerAction = null;

        /** Action run once, when the epoch moves from FINISHING to FINISHED. */
        @Nullable
        Runnable finalAction = null;

        /**
         * Creates an open epoch with no ongoing records.
         *
         * @param id the identifier of this epoch
         */
        public Epoch(long id) {
            this.id = id;
        }

        /**
         * Advances this epoch towards {@link EpochStatus#FINISHED} if it has no ongoing records.
         *
         * <p>A CLOSED epoch first becomes FINISHING and runs the trigger action. The record count
         * is then checked again, because it may have changed while the trigger action ran; only if
         * it is still zero does the epoch become FINISHED and run the final action. An epoch that
         * is still OPEN is never advanced.
         *
         * @return {@code true} if the epoch is FINISHED when this method returns
         */
        boolean tryFinish() {
            if (ongoingRecordCount != 0) {
                return false;
            }
            if (status == EpochStatus.CLOSED) {
                transition(EpochStatus.FINISHING);
                if (triggerAction != null) {
                    triggerAction.run();
                }
            }
            // Re-check the count: the trigger action may have added records to this epoch.
            if (ongoingRecordCount == 0 && status == EpochStatus.FINISHING) {
                transition(EpochStatus.FINISHED);
                if (finalAction != null) {
                    finalAction.run();
                }
            }
            return status == EpochStatus.FINISHED;
        }

        /**
         * Sets the status, logging the change at trace level; does nothing if already in that
         * status.
         *
         * @param newStatus the status to move to
         */
        void transition(EpochStatus newStatus) {
            if (status != newStatus) {
                LOG.trace("Epoch {} transit from {} to {}", this, status, newStatus);
                status = newStatus;
            }
        }

        /**
         * Stores the actions for this epoch and marks it {@link EpochStatus#CLOSED}.
         *
         * @param triggerAction action to run when the epoch starts finishing, may be {@code null}
         * @param finalAction action to run when the epoch has finished, may be {@code null}
         */
        void close(@Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
            this.triggerAction = triggerAction;
            this.finalAction = finalAction;
            transition(EpochStatus.CLOSED);
        }

        @Override
        public String toString() {
            return String.format(
                    "Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status);
        }
    }

    /** Lifecycle states of an {@link Epoch}, in the order they are passed through. */
    public enum EpochStatus {
        /** Initial state of a newly created epoch. */
        OPEN,
        /** Set by {@link Epoch#close(Runnable, Runnable)}; the actions are known. */
        CLOSED,
        /** The trigger action has been started; the final action has not run yet. */
        FINISHING,
        /** Terminal state, set right before the final action runs. */
        FINISHED
    }

    /** Selects whether {@link #onNonRecord} drains in-flight records after switching epochs. */
    public enum ParallelMode {
        /** After switching epochs, the controller is asked to drain in-flight records. */
        SERIAL_BETWEEN_EPOCH,
        /** No drain is requested after switching epochs. */
        PARALLEL_BETWEEN_EPOCH
    }

    /**
     * Creates a manager whose first active epoch has id 0.
     *
     * @param asyncExecutionController controller used to drain in-flight records in {@link
     *     ParallelMode#SERIAL_BETWEEN_EPOCH}
     */
    public EpochManager(AsyncExecutionController<?> asyncExecutionController) {
        this.epochNum = 0L;
        this.asyncExecutionController = asyncExecutionController;
        this.activeEpoch = new Epoch(this.epochNum++);
        this.finishingEpoch = null;
        this.recursiveFlag = false;
    }

    /**
     * Counts a new record against an epoch.
     *
     * <p>If an epoch is currently being finished (its actions are running), the record is counted
     * against that epoch; otherwise it is counted against the active epoch.
     *
     * @return the epoch the record was counted against
     */
    public Epoch onRecord() {
        Epoch target = (finishingEpoch != null) ? finishingEpoch : activeEpoch;
        target.ongoingRecordCount++;
        return target;
    }

    /**
     * Counts one more ongoing record against the given epoch.
     *
     * @param epoch the epoch to count against
     * @return the same epoch
     */
    public Epoch onEpoch(Epoch epoch) {
        epoch.ongoingRecordCount++;
        return epoch;
    }

    /**
     * Handles a non-record input: closes the active epoch with the given actions, opens a new
     * active epoch, and tries to finish queued epochs.
     *
     * @param triggerAction action run when the closed epoch starts finishing, may be {@code null}
     * @param finalAction action run when the closed epoch has finished, may be {@code null}
     * @param parallelMode if {@link ParallelMode#SERIAL_BETWEEN_EPOCH}, the controller's {@code
     *     drainInflightRecords(0)} is called after the switch
     */
    public void onNonRecord(
            @Nullable Runnable triggerAction,
            @Nullable Runnable finalAction,
            ParallelMode parallelMode) {
        LOG.trace(
                "on NonRecord, old epoch: {}, outputQueue size: {}",
                activeEpoch,
                outputQueue.size());
        switchActiveEpoch(triggerAction, finalAction);
        if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
            asyncExecutionController.drainInflightRecords(0);
        }
    }

    /**
     * Marks one record of the given epoch as completed.
     *
     * <p>If that was the last ongoing record of an epoch other than the active one, queued epochs
     * are given a chance to finish. The active epoch is skipped because it has not been closed
     * and queued yet.
     *
     * @param epoch the epoch the completed record was counted against
     */
    public void completeOneRecord(Epoch epoch) {
        if (--epoch.ongoingRecordCount == 0 && epoch != activeEpoch) {
            tryFinishInQueue();
        }
    }

    /**
     * Finishes epochs from the head of {@link #outputQueue} until the head cannot finish yet or
     * the queue is empty.
     *
     * <p>The method is not re-entrant: a nested call (for example one caused by an epoch action
     * that completes a record) returns immediately and leaves the work to the outer invocation.
     */
    private void tryFinishInQueue() {
        if (recursiveFlag) {
            return;
        }
        recursiveFlag = true;
        try {
            while (!outputQueue.isEmpty()) {
                Epoch head = outputQueue.peek();
                // Records created while the head's actions run are attributed to the head.
                finishingEpoch = head;
                if (!head.tryFinish()) {
                    break;
                }
                outputQueue.pop();
            }
        } finally {
            // Always restore both markers, even if an epoch action threw. Otherwise the guard
            // would stay set and every later call would return without finishing anything.
            finishingEpoch = null;
            recursiveFlag = false;
        }
    }

    /**
     * Closes the active epoch with the given actions, queues it, opens a new active epoch with the
     * next id, and tries to finish queued epochs.
     *
     * @param triggerAction trigger action for the epoch being closed, may be {@code null}
     * @param finalAction final action for the epoch being closed, may be {@code null}
     */
    private void switchActiveEpoch(
            @Nullable Runnable triggerAction, @Nullable Runnable finalAction) {
        activeEpoch.close(triggerAction, finalAction);
        outputQueue.offer(activeEpoch);
        activeEpoch = new Epoch(epochNum++);
        tryFinishInQueue();
    }
}
