/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileReader;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.SortMergeSubpartitionReader;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SortMergeResultPartitionReadScheduler
implements Runnable,
BufferRecycler {
    /** Logger used for buffer-request and read failures. */
    private static final Logger LOG = LoggerFactory.getLogger(SortMergeResultPartitionReadScheduler.class);
    /** Buffer request timeout applied when the three-argument constructor is used (5 minutes). */
    private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5L);
    /** Header buffer handed to every {@link PartitionedFileReader} created by this scheduler. */
    private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
    /** 16-byte direct buffer passed to {@code initRegionIndex} when a file reader is created. */
    private final ByteBuffer indexEntryBufferInit = ByteBuffer.allocateDirect(16);
    /** 16-byte direct buffer passed to the {@link PartitionedFileReader} constructor. */
    private final ByteBuffer indexEntryBufferRead = ByteBuffer.allocateDirect(16);
    /** Monitor supplied by the caller; guards all fields annotated with {@code @GuardedBy("lock")}. */
    private final Object lock;
    /** Completed (with {@code null}) once the scheduler is released and no readers remain. */
    // NOTE(review): raw-type instantiation is a decompiler artifact; consider the diamond operator.
    private final CompletableFuture<?> releaseFuture = new CompletableFuture();
    /** Pool from which read buffers are requested and to which they are recycled. */
    private final BatchShuffleReadBufferPool bufferPool;
    /** Executor on which {@link #run()} is scheduled. */
    private final Executor ioExecutor;
    /** Added to the pool's last buffer operation timestamp to compute the request deadline. */
    private final Duration bufferRequestTimeout;
    /** Readers that failed or were released but have not yet been removed from {@link #allReaders}. */
    @GuardedBy(value="lock")
    private final Set<SortMergeSubpartitionReader> failedReaders = new HashSet<SortMergeSubpartitionReader>();
    /** All readers created by this scheduler that have not yet been removed as finished or failed. */
    @GuardedBy(value="lock")
    private final Set<SortMergeSubpartitionReader> allReaders = new HashSet<SortMergeSubpartitionReader>();
    /** Readers waiting to be served, polled according to the readers' natural ordering. */
    @GuardedBy(value="lock")
    private final Queue<SortMergeSubpartitionReader> sortedReaders = new PriorityQueue<SortMergeSubpartitionReader>();
    /** Channel of the data file; opened for the first reader and closed when no readers remain. */
    @GuardedBy(value="lock")
    private FileChannel dataFileChannel;
    /** Channel of the index file; opened and closed together with {@link #dataFileChannel}. */
    @GuardedBy(value="lock")
    private FileChannel indexFileChannel;
    /** True from the moment a read round is submitted to the executor until its bookkeeping is done. */
    @GuardedBy(value="lock")
    private boolean isRunning;
    /** Increased by the number of buffers read per round and decreased by one on every recycle. */
    @GuardedBy(value="lock")
    private volatile int numRequestedBuffers;
    /** Set by {@link #release()}; also read without the lock while waiting for buffers. */
    @GuardedBy(value="lock")
    private volatile boolean isReleased;

    /**
     * Creates a scheduler that uses {@link #DEFAULT_BUFFER_REQUEST_TIMEOUT} as buffer request timeout.
     *
     * @param bufferPool pool providing the read buffers
     * @param ioExecutor executor on which read rounds are run
     * @param lock monitor guarding the scheduler's mutable state
     */
    SortMergeResultPartitionReadScheduler(BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) {
        this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
    }

    /**
     * Creates a scheduler with an explicit buffer request timeout.
     *
     * <p>All arguments are null-checked, and both index entry buffers are passed through
     * {@code BufferReaderWriterUtil.configureByteBuffer}.
     *
     * @param bufferPool pool providing the read buffers
     * @param ioExecutor executor on which read rounds are run
     * @param lock monitor guarding the scheduler's mutable state
     * @param bufferRequestTimeout duration added to the pool's last buffer operation timestamp
     *     to obtain the buffer request deadline
     */
    SortMergeResultPartitionReadScheduler(BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock, Duration bufferRequestTimeout) {
        // checkNotNull is generic, so the decompiler-inserted casts are unnecessary.
        this.lock = Preconditions.checkNotNull(lock);
        this.bufferPool = Preconditions.checkNotNull(bufferPool);
        this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
        this.bufferRequestTimeout = Preconditions.checkNotNull(bufferRequestTimeout);

        BufferReaderWriterUtil.configureByteBuffer(this.indexEntryBufferInit);
        BufferReaderWriterUtil.configureByteBuffer(this.indexEntryBufferRead);
    }

    /**
     * Performs one read round: requests buffers, lets the queued readers fill them, returns the
     * unused buffers to the pool and finally updates the reader bookkeeping.
     *
     * <p>The method is synchronized on this scheduler instance, so two rounds never overlap.
     */
    @Override
    public synchronized void run() {
        Queue<MemorySegment> buffers;
        HashSet<SortMergeSubpartitionReader> finishedReaders = new HashSet<SortMergeSubpartitionReader>();
        try {
            buffers = this.allocateBuffers();
        }
        catch (Throwable throwable) {
            // Buffer allocation failed: fail every currently registered reader and run the
            // bookkeeping with zero buffers read (which also resets the running flag).
            LOG.error("Failed to request buffers for data reading.", throwable);
            this.failSubpartitionReaders(this.getAllReaders(), throwable);
            this.removeFinishedAndFailedReaders(0, finishedReaders);
            return;
        }
        Preconditions.checkState((!buffers.isEmpty() ? 1 : 0) != 0, (Object)"No buffer available.");
        int numBuffersAllocated = buffers.size();
        ArrayList<SortMergeSubpartitionReader> unfinishedReaders = new ArrayList<SortMergeSubpartitionReader>();
        SortMergeSubpartitionReader subpartitionReader = this.getNextReader();
        while (subpartitionReader != null) {
            try {
                // A false result from readBuffers marks the reader as finished; otherwise it
                // is kept so it can be re-queued for a later read.
                if (!subpartitionReader.readBuffers(buffers, this)) {
                    finishedReaders.add(subpartitionReader);
                } else {
                    unfinishedReaders.add(subpartitionReader);
                }
            }
            catch (Throwable throwable) {
                // Only the reader that caused the error is failed; the round continues.
                this.failSubpartitionReaders(Collections.singletonList(subpartitionReader), throwable);
                LOG.debug("Failed to read shuffle data.", throwable);
            }
            // Stop as soon as all allocated buffers have been handed out.
            if (buffers.isEmpty()) break;
            subpartitionReader = this.getNextReader();
            if (subpartitionReader != null || unfinishedReaders.isEmpty()) continue;
            // The queue is drained but buffers are left: re-queue the unfinished readers and
            // keep serving them within this round.
            this.returnUnfinishedReaders(unfinishedReaders);
            subpartitionReader = this.getNextReader();
        }
        // Buffers still in the queue were not consumed by any reader and go back to the pool.
        int numBuffersRead = numBuffersAllocated - buffers.size();
        this.releaseBuffers(buffers);
        this.returnUnfinishedReaders(unfinishedReaders);
        this.removeFinishedAndFailedReaders(numBuffersRead, finishedReaders);
    }

    /*
     * Unable to fully structure code
     */
    @VisibleForTesting
    Queue<MemorySegment> allocateBuffers() throws Exception {
        timeoutTime = this.getBufferRequestTimeoutTime();
        do lbl-1000:
        // 3 sources

        {
            if (!(buffers = this.bufferPool.requestBuffers()).isEmpty()) {
                return new ArrayDeque<MemorySegment>(buffers);
            }
            Preconditions.checkState((boolean)(this.isReleased == false), (Object)"Result partition has been already released.");
            if (System.currentTimeMillis() < timeoutTime) ** GOTO lbl-1000
            timeoutTime = this.getBufferRequestTimeoutTime();
        } while (System.currentTimeMillis() < timeoutTime);
        throw new TimeoutException(String.format("Buffer request timeout, this means there is a fierce contention of the batch shuffle read memory, please increase '%s'.", new Object[]{TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()}));
    }

    /**
     * Computes the deadline for the current buffer request.
     *
     * @return the pool's last buffer operation timestamp plus the configured request timeout in
     *     milliseconds
     */
    private long getBufferRequestTimeoutTime() {
        long lastBufferOperation = this.bufferPool.getLastBufferOperationTimestamp();
        long timeoutMillis = this.bufferRequestTimeout.toMillis();
        return lastBufferOperation + timeoutMillis;
    }

    /**
     * Hands all buffers remaining in the queue back to the buffer pool and empties the queue.
     * Any error raised while doing so is forwarded to the fatal exit handler.
     *
     * @param buffers buffers left over from a read round; may be empty
     */
    private void releaseBuffers(Queue<MemorySegment> buffers) {
        if (buffers.isEmpty()) {
            return;
        }
        try {
            this.bufferPool.recycle(buffers);
            buffers.clear();
        } catch (Throwable fatal) {
            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), fatal);
        }
    }

    /**
     * Marks the given readers as failed and notifies each of them about the failure cause.
     *
     * <p>The readers are recorded in {@code failedReaders} while holding the lock; the
     * {@code fail} callbacks are invoked afterwards, outside the lock. A throwable raised by a
     * callback is forwarded to the fatal exit handler.
     *
     * @param readers readers to fail
     * @param failureCause cause passed to every reader
     */
    private void failSubpartitionReaders(Collection<SortMergeSubpartitionReader> readers, Throwable failureCause) {
        // Synchronize on the lock object itself; it is a plain Object, not an Iterator.
        synchronized (this.lock) {
            this.failedReaders.addAll(readers);
        }
        for (SortMergeSubpartitionReader reader : readers) {
            try {
                reader.fail(failureCause);
            } catch (Throwable throwable) {
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
            }
        }
    }

    /**
     * Bookkeeping performed at the end of a read round, entirely under the lock.
     *
     * <p>Finished and failed readers are dropped from {@code allReaders} and both input sets are
     * cleared. If no reader is left, this scheduler is unregistered from the buffer pool, the
     * file channels are closed and the reader queue is emptied. Afterwards the requested-buffer
     * counter is increased, the running flag is reset, and a new round and/or the release
     * notification is triggered if their conditions hold.
     *
     * @param numBuffersRead number of buffers consumed by readers in this round
     * @param finishedReaders readers that finished in this round; cleared by this method
     */
    private void removeFinishedAndFailedReaders(int numBuffersRead, Set<SortMergeSubpartitionReader> finishedReaders) {
        synchronized (this.lock) {
            finishedReaders.forEach(this.allReaders::remove);
            finishedReaders.clear();

            this.failedReaders.forEach(this.allReaders::remove);
            this.failedReaders.clear();

            boolean noReaderLeft = this.allReaders.isEmpty();
            if (noReaderLeft) {
                this.bufferPool.unregisterRequester(this);
                this.closeFileChannels();
                this.sortedReaders.clear();
            }

            this.numRequestedBuffers += numBuffersRead;
            this.isRunning = false;
            this.mayTriggerReading();
            this.mayNotifyReleased();
        }
    }

    /**
     * Completes the release future once the scheduler has been released and no reader is
     * registered any more. Must be called with the lock held.
     */
    @GuardedBy(value="lock")
    private void mayNotifyReleased() {
        assert Thread.holdsLock(this.lock);
        if (!this.isReleased || !this.allReaders.isEmpty()) {
            return;
        }
        this.releaseFuture.complete(null);
    }

    /**
     * Takes a snapshot of the registered readers under the lock.
     *
     * @return a new queue containing all registered readers, or an empty queue if the scheduler
     *     has already been released
     */
    private Queue<SortMergeSubpartitionReader> getAllReaders() {
        synchronized (this.lock) {
            return this.isReleased
                    ? new ArrayDeque<SortMergeSubpartitionReader>()
                    : new ArrayDeque<SortMergeSubpartitionReader>(this.allReaders);
        }
    }

    /**
     * Polls the reader queue under the lock, discarding readers that are marked as failed.
     *
     * @return the next reader that is not in {@code failedReaders}, or {@code null} if the queue
     *     is exhausted
     */
    @Nullable
    private SortMergeSubpartitionReader getNextReader() {
        synchronized (this.lock) {
            SortMergeSubpartitionReader candidate;
            do {
                candidate = this.sortedReaders.poll();
            } while (candidate != null && this.failedReaders.contains(candidate));
            return candidate;
        }
    }

    /**
     * Puts the given readers back into the reader queue under the lock and clears the list.
     * Does nothing for a {@code null} or empty list.
     *
     * @param readers readers that still have data to read; emptied by this method
     */
    private void returnUnfinishedReaders(ArrayList<SortMergeSubpartitionReader> readers) {
        if (readers == null || readers.isEmpty()) {
            return;
        }
        synchronized (this.lock) {
            this.sortedReaders.addAll(readers);
            readers.clear();
        }
    }

    /**
     * Creates a new subpartition reader, registers it with this scheduler and triggers a read
     * round if the scheduling conditions are met. Runs entirely under the lock.
     *
     * <p>When the new reader is the first one, this scheduler is registered as requester at the
     * buffer pool. A callback on the reader's release future marks the reader as failed if it is
     * still registered at that time.
     *
     * @param availabilityListener listener passed to the new reader
     * @param indexSet subpartition indexes the reader is created for
     * @param resultFile partitioned file to read from
     * @param subpartitionOrderRotationIndex rotation index passed to the file reader
     * @return the newly created and registered reader
     * @throws IOException if creating the underlying file reader fails
     */
    SortMergeSubpartitionReader createSubpartitionReader(BufferAvailabilityListener availabilityListener, ResultSubpartitionIndexSet indexSet, PartitionedFile resultFile, int subpartitionOrderRotationIndex) throws IOException {
        synchronized (this.lock) {
            Preconditions.checkState(!this.isReleased, "Partition is already released.");

            // The file reader must be created before the new reader is added to allReaders,
            // because createFileReader opens the channels only while allReaders is empty.
            PartitionedFileReader fileReader = this.createFileReader(resultFile, indexSet, subpartitionOrderRotationIndex);
            int bufferSize = this.bufferPool.getBufferSize();
            SortMergeSubpartitionReader newReader = new SortMergeSubpartitionReader(bufferSize, availabilityListener, fileReader);

            if (this.allReaders.isEmpty()) {
                this.bufferPool.registerRequester(this);
            }
            this.allReaders.add(newReader);
            this.sortedReaders.add(newReader);

            newReader.getReleaseFuture().thenRun(() -> this.releaseSubpartitionReader(newReader));
            this.mayTriggerReading();
            return newReader;
        }
    }

    /**
     * Marks a released reader as failed if it is still registered, so that it is skipped by
     * subsequent polls and removed during the next bookkeeping pass.
     *
     * @param subpartitionReader the reader whose release future completed
     */
    private void releaseSubpartitionReader(SortMergeSubpartitionReader subpartitionReader) {
        synchronized (this.lock) {
            boolean stillRegistered = this.allReaders.contains(subpartitionReader);
            if (stillRegistered) {
                this.failedReaders.add(subpartitionReader);
            }
        }
    }

    /**
     * Creates a file reader on the shared file channels and initializes its region index. The
     * channels are opened first if no reader is registered yet. Must be called with the lock held.
     *
     * <p>If anything fails while no reader is registered, the file channels are closed again
     * before the error is rethrown.
     *
     * @param resultFile partitioned file to read from
     * @param indexSet subpartition indexes the reader is created for
     * @param subpartitionOrderRotationIndex rotation index passed to the file reader
     * @return the initialized file reader
     * @throws IOException if opening the channels or initializing the reader fails
     */
    @GuardedBy(value="lock")
    private PartitionedFileReader createFileReader(PartitionedFile resultFile, ResultSubpartitionIndexSet indexSet, int subpartitionOrderRotationIndex) throws IOException {
        assert Thread.holdsLock(this.lock);
        try {
            if (this.allReaders.isEmpty()) {
                this.openFileChannels(resultFile);
            }
            PartitionedFileReader reader = new PartitionedFileReader(
                    resultFile,
                    indexSet,
                    this.dataFileChannel,
                    this.indexFileChannel,
                    this.headerBuf,
                    this.indexEntryBufferRead,
                    subpartitionOrderRotationIndex);
            reader.initRegionIndex(this.indexEntryBufferInit);
            return reader;
        } catch (Throwable failure) {
            if (this.allReaders.isEmpty()) {
                this.closeFileChannels();
            }
            throw failure;
        }
    }

    /**
     * Closes any previously opened channels and opens fresh read channels for the data file and
     * the index file of the given partitioned file. Must be called with the lock held.
     *
     * @param resultFile partitioned file whose data and index files are opened
     * @throws IOException if one of the files cannot be opened
     */
    @GuardedBy(value="lock")
    private void openFileChannels(PartitionedFile resultFile) throws IOException {
        assert Thread.holdsLock(this.lock);
        this.closeFileChannels();

        Path dataFilePath = resultFile.getDataFilePath();
        this.dataFileChannel = openFileChannel(dataFilePath);

        Path indexFilePath = resultFile.getIndexFilePath();
        this.indexFileChannel = openFileChannel(indexFilePath);
    }

    /**
     * Passes both file channels to {@code IOUtils.closeAllQuietly} and resets the channel fields
     * to {@code null}. Must be called with the lock held.
     */
    @GuardedBy(value="lock")
    private void closeFileChannels() {
        assert Thread.holdsLock(this.lock);
        AutoCloseable[] openChannels = new AutoCloseable[] {this.dataFileChannel, this.indexFileChannel};
        IOUtils.closeAllQuietly(openChannels);
        this.dataFileChannel = null;
        this.indexFileChannel = null;
    }

    /**
     * Returns a single buffer to the buffer pool, decrements the requested-buffer counter and
     * triggers a new read round if the scheduling conditions are met. Runs under the lock.
     *
     * @param segment the memory segment to recycle
     */
    @Override
    public void recycle(MemorySegment segment) {
        synchronized (this.lock) {
            this.bufferPool.recycle(segment);
            this.numRequestedBuffers--;
            this.mayTriggerReading();
        }
    }

    /**
     * Submits a new read round to the I/O executor if all of the following hold: no round is
     * running, at least one reader is registered, another request would not exceed the maximum
     * number of requested buffers, and the number of requested buffers is below the pool's
     * average per requester. Must be called with the lock held.
     *
     * <p>The maximum is the larger of four times the pool's buffers per request and two times
     * the number of registered readers.
     */
    @GuardedBy(value="lock")
    private void mayTriggerReading() {
        assert Thread.holdsLock(this.lock);
        int maxRequestedBuffers = Math.max(4 * this.bufferPool.getNumBuffersPerRequest(), 2 * this.allReaders.size());

        if (this.isRunning || this.allReaders.isEmpty()) {
            return;
        }
        if (this.numRequestedBuffers + this.bufferPool.getNumBuffersPerRequest() > maxRequestedBuffers) {
            return;
        }
        if (this.numRequestedBuffers >= this.bufferPool.getAverageBuffersPerRequester()) {
            return;
        }

        this.isRunning = true;
        this.ioExecutor.execute(() -> {
            try {
                this.run();
            } catch (Throwable fatal) {
                // Anything escaping a read round is treated as fatal.
                FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), fatal);
            }
        });
    }

    /**
     * Releases this scheduler. On the first call the released flag is set, all registered
     * readers are marked as failed and then notified (outside the lock) with an
     * {@link IllegalStateException}. Subsequent calls only return the release future.
     *
     * @return the future that is completed once the scheduler is released and no reader remains
     */
    CompletableFuture<?> release() {
        ArrayList<SortMergeSubpartitionReader> readersToFail;
        synchronized (this.lock) {
            if (this.isReleased) {
                return this.releaseFuture;
            }
            this.isReleased = true;
            this.failedReaders.addAll(this.allReaders);
            readersToFail = new ArrayList<>(this.allReaders);
            this.mayNotifyReleased();
        }
        IllegalStateException cause = new IllegalStateException("Result partition has been already released.");
        this.failSubpartitionReaders(readersToFail, cause);
        return this.releaseFuture;
    }

    /**
     * Opens a read-only file channel for the given path.
     *
     * @param path file to open
     * @return a channel opened with {@link StandardOpenOption#READ}
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /**
     * Test accessor, read under the lock.
     *
     * @return the number of readers currently registered with this scheduler
     */
    @VisibleForTesting
    int getNumPendingReaders() {
        synchronized (this.lock) {
            return this.allReaders.size();
        }
    }

    /**
     * Test accessor, read under the lock.
     *
     * @return the current data file channel, or {@code null} if none is open
     */
    @VisibleForTesting
    FileChannel getDataFileChannel() {
        synchronized (this.lock) {
            return this.dataFileChannel;
        }
    }

    /**
     * Test accessor, read under the lock.
     *
     * @return the current index file channel, or {@code null} if none is open
     */
    @VisibleForTesting
    FileChannel getIndexFileChannel() {
        synchronized (this.lock) {
            return this.indexFileChannel;
        }
    }

    /**
     * Test accessor for the release future.
     *
     * @return the future completed once the scheduler is released and no reader remains
     */
    @VisibleForTesting
    CompletableFuture<?> getReleaseFuture() {
        return this.releaseFuture;
    }

    /**
     * Test accessor, read under the lock.
     *
     * @return whether a read round is currently submitted or in progress
     */
    @VisibleForTesting
    boolean isRunning() {
        synchronized (this.lock) {
            return this.isRunning;
        }
    }
}

