/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.BufferWithChannel;
import org.apache.flink.runtime.io.network.partition.PartitionSortedBuffer;
import org.apache.flink.runtime.io.network.partition.PartitionedFile;
import org.apache.flink.runtime.io.network.partition.PartitionedFileWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SortBuffer;
import org.apache.flink.runtime.io.network.partition.SortMergeResultPartitionReadScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

@NotThreadSafe
public class SortMergeResultPartition
extends ResultPartition {
    private static final int NUM_WRITE_BUFFER_BYTES = 0x1000000;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private PartitionedFile resultFile;
    @GuardedBy(value="lock")
    private final List<MemorySegment> writeBuffers = new ArrayList<MemorySegment>();
    private final int networkBufferSize;
    private final PartitionedFileWriter fileWriter;
    private final int[] subpartitionOrder;
    private final SortMergeResultPartitionReadScheduler readScheduler;
    private int numBuffersForSort;
    private SortBuffer broadcastSortBuffer;
    private SortBuffer unicastSortBuffer;

    public SortMergeResultPartition(String owningTaskName, int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, int numSubpartitions, int numTargetKeyGroups, BatchShuffleReadBufferPool readBufferPool, Executor readIOExecutor, ResultPartitionManager partitionManager, String resultFileBasePath, @Nullable BufferCompressor bufferCompressor, SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        super(owningTaskName, partitionIndex, partitionId, partitionType, numSubpartitions, numTargetKeyGroups, partitionManager, bufferCompressor, bufferPoolFactory);
        this.networkBufferSize = readBufferPool.getBufferSize();
        this.subpartitionOrder = this.getRandomSubpartitionOrder(numSubpartitions);
        this.readScheduler = new SortMergeResultPartitionReadScheduler(readBufferPool, readIOExecutor, this.lock);
        PartitionedFileWriter fileWriter = null;
        try {
            fileWriter = new PartitionedFileWriter(numSubpartitions, 0x400000, resultFileBasePath);
        }
        catch (Throwable throwable) {
            ExceptionUtils.rethrow((Throwable)throwable);
        }
        this.fileWriter = fileWriter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setup() throws IOException {
        int numRequiredBuffer;
        int numWriteBuffers;
        super.setup();
        int expectedWriteBuffers = 0x1000000 / this.networkBufferSize;
        if (this.networkBufferSize > 0x1000000) {
            expectedWriteBuffers = 1;
        }
        if ((numWriteBuffers = Math.min((numRequiredBuffer = this.bufferPool.getNumberOfRequiredMemorySegments()) / 2, expectedWriteBuffers)) < 1) {
            throw new IOException(String.format("Too few sort buffers, please increase %s.", NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS));
        }
        this.numBuffersForSort = numRequiredBuffer - numWriteBuffers;
        Object object = this.lock;
        synchronized (object) {
            try {
                for (int i = 0; i < numWriteBuffers; ++i) {
                    MemorySegment segment = this.bufferPool.requestBufferBuilderBlocking().getMemorySegment();
                    this.writeBuffers.add(segment);
                }
            }
            catch (InterruptedException exception) {
                throw new IOException(exception);
            }
        }
        LOG.info("Sort-merge partition {} initialized, num sort buffers: {}, num write buffers: {}.", new Object[]{this.getPartitionId(), this.numBuffersForSort, numWriteBuffers});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void releaseInternal() {
        Object object = this.lock;
        synchronized (object) {
            if (this.resultFile == null) {
                this.fileWriter.releaseQuietly();
            }
            this.readScheduler.release().thenRun(() -> {
                Object object = this.lock;
                synchronized (object) {
                    if (this.resultFile != null) {
                        this.resultFile.deleteQuietly();
                        this.resultFile = null;
                    }
                }
            });
        }
    }

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        this.emit(record, targetSubpartition, Buffer.DataType.DATA_BUFFER, false);
    }

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException {
        this.broadcast(record, Buffer.DataType.DATA_BUFFER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
        Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
        try {
            ByteBuffer serializedEvent = buffer.getNioBufferReadable();
            this.broadcast(serializedEvent, buffer.getDataType());
        }
        finally {
            buffer.recycleBuffer();
        }
    }

    private void broadcast(ByteBuffer record, Buffer.DataType dataType) throws IOException {
        this.emit(record, 0, dataType, true);
    }

    private void emit(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType, boolean isBroadcast) throws IOException {
        SortBuffer sortBuffer;
        this.checkInProduceState();
        SortBuffer sortBuffer2 = sortBuffer = isBroadcast ? this.getBroadcastSortBuffer() : this.getUnicastSortBuffer();
        if (sortBuffer.append(record, targetSubpartition, dataType)) {
            return;
        }
        if (!sortBuffer.hasRemaining()) {
            sortBuffer.finish();
            sortBuffer.release();
            this.writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
            return;
        }
        this.flushSortBuffer(sortBuffer, isBroadcast);
        this.emit(record, targetSubpartition, dataType, isBroadcast);
    }

    private void releaseSortBuffer(SortBuffer sortBuffer) {
        if (sortBuffer != null) {
            sortBuffer.release();
        }
    }

    private SortBuffer getUnicastSortBuffer() throws IOException {
        this.flushBroadcastSortBuffer();
        if (this.unicastSortBuffer != null && !this.unicastSortBuffer.isFinished()) {
            return this.unicastSortBuffer;
        }
        this.unicastSortBuffer = new PartitionSortedBuffer(this.lock, this.bufferPool, this.numSubpartitions, this.networkBufferSize, this.numBuffersForSort, this.subpartitionOrder);
        return this.unicastSortBuffer;
    }

    private SortBuffer getBroadcastSortBuffer() throws IOException {
        this.flushUnicastSortBuffer();
        if (this.broadcastSortBuffer != null && !this.broadcastSortBuffer.isFinished()) {
            return this.broadcastSortBuffer;
        }
        this.broadcastSortBuffer = new PartitionSortedBuffer(this.lock, this.bufferPool, this.numSubpartitions, this.networkBufferSize, this.numBuffersForSort, this.subpartitionOrder);
        return this.broadcastSortBuffer;
    }

    private void flushSortBuffer(SortBuffer sortBuffer, boolean isBroadcast) throws IOException {
        if (sortBuffer == null || sortBuffer.isReleased()) {
            return;
        }
        sortBuffer.finish();
        if (sortBuffer.hasRemaining()) {
            this.fileWriter.startNewRegion(isBroadcast);
            ArrayList<BufferWithChannel> toWrite = new ArrayList<BufferWithChannel>();
            Queue<MemorySegment> segments = this.getWriteBuffers();
            while (sortBuffer.hasRemaining()) {
                if (segments.isEmpty()) {
                    this.fileWriter.writeBuffers(toWrite);
                    toWrite.clear();
                    segments = this.getWriteBuffers();
                }
                BufferWithChannel bufferWithChannel = sortBuffer.copyIntoSegment((MemorySegment)Preconditions.checkNotNull((Object)segments.poll()));
                this.updateStatistics(bufferWithChannel.getBuffer(), isBroadcast);
                toWrite.add(this.compressBufferIfPossible(bufferWithChannel));
            }
            this.fileWriter.writeBuffers(toWrite);
        }
        this.releaseSortBuffer(sortBuffer);
    }

    private void flushBroadcastSortBuffer() throws IOException {
        this.flushSortBuffer(this.broadcastSortBuffer, true);
    }

    private void flushUnicastSortBuffer() throws IOException {
        this.flushSortBuffer(this.unicastSortBuffer, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Queue<MemorySegment> getWriteBuffers() {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.writeBuffers.isEmpty() ? 1 : 0) != 0, (Object)"Task has been canceled.");
            return new ArrayDeque<MemorySegment>(this.writeBuffers);
        }
    }

    private BufferWithChannel compressBufferIfPossible(BufferWithChannel bufferWithChannel) {
        Buffer buffer = bufferWithChannel.getBuffer();
        if (!this.canBeCompressed(buffer)) {
            return bufferWithChannel;
        }
        buffer = ((BufferCompressor)Preconditions.checkNotNull((Object)this.bufferCompressor)).compressToOriginalBuffer(buffer);
        return new BufferWithChannel(buffer, bufferWithChannel.getChannelIndex());
    }

    private void updateStatistics(Buffer buffer, boolean isBroadcast) {
        this.numBuffersOut.inc(isBroadcast ? (long)this.numSubpartitions : 1L);
        long readableBytes = buffer.readableBytes();
        this.numBytesOut.inc(isBroadcast ? readableBytes * (long)this.numSubpartitions : readableBytes);
    }

    private void writeLargeRecord(ByteBuffer record, int targetSubpartition, Buffer.DataType dataType, boolean isBroadcast) throws IOException {
        this.fileWriter.startNewRegion(isBroadcast);
        ArrayList<BufferWithChannel> toWrite = new ArrayList<BufferWithChannel>();
        Queue<MemorySegment> segments = this.getWriteBuffers();
        while (record.hasRemaining()) {
            if (segments.isEmpty()) {
                this.fileWriter.writeBuffers(toWrite);
                toWrite.clear();
                segments = this.getWriteBuffers();
            }
            int toCopy = Math.min(record.remaining(), this.networkBufferSize);
            MemorySegment writeBuffer = (MemorySegment)Preconditions.checkNotNull((Object)segments.poll());
            writeBuffer.put(0, record, toCopy);
            NetworkBuffer buffer = new NetworkBuffer(writeBuffer, buf -> {}, dataType, toCopy);
            BufferWithChannel bufferWithChannel = new BufferWithChannel(buffer, targetSubpartition);
            this.updateStatistics(buffer, isBroadcast);
            toWrite.add(this.compressBufferIfPossible(bufferWithChannel));
        }
        this.fileWriter.writeBuffers(toWrite);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void finish() throws IOException {
        this.broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
        Preconditions.checkState((this.unicastSortBuffer == null || this.unicastSortBuffer.isReleased() ? 1 : 0) != 0, (Object)"The unicast sort buffer should be either null or released.");
        this.flushBroadcastSortBuffer();
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Result partition is already released.");
            this.resultFile = this.fileWriter.finish();
            LOG.info("New partitioned file produced: {}.", (Object)this.resultFile);
        }
        super.finish();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseWriteBuffers() {
        Object object = this.lock;
        synchronized (object) {
            if (this.bufferPool != null) {
                for (MemorySegment segment : this.writeBuffers) {
                    this.bufferPool.recycle(segment);
                }
                this.writeBuffers.clear();
            }
        }
    }

    @Override
    public void close() {
        this.releaseWriteBuffers();
        this.releaseSortBuffer(this.unicastSortBuffer);
        this.releaseSortBuffer(this.broadcastSortBuffer);
        super.close();
        IOUtils.closeQuietly((AutoCloseable)this.fileWriter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ResultSubpartitionView createSubpartitionView(int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkElementIndex((int)subpartitionIndex, (int)this.numSubpartitions, (String)"Subpartition not found.");
            Preconditions.checkState((!this.isReleased() ? 1 : 0) != 0, (Object)"Partition released.");
            Preconditions.checkState((boolean)this.isFinished(), (Object)"Trying to read unfinished blocking partition.");
            return this.readScheduler.crateSubpartitionReader(availabilityListener, subpartitionIndex, this.resultFile);
        }
    }

    @Override
    public void flushAll() {
        try {
            this.flushUnicastSortBuffer();
            this.flushBroadcastSortBuffer();
        }
        catch (IOException e) {
            LOG.error("Failed to flush the current sort buffer.", (Throwable)e);
        }
    }

    @Override
    public void flush(int subpartitionIndex) {
        try {
            this.flushUnicastSortBuffer();
            this.flushBroadcastSortBuffer();
        }
        catch (IOException e) {
            LOG.error("Failed to flush the current sort buffer.", (Throwable)e);
        }
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return AVAILABLE;
    }

    @Override
    public int getNumberOfQueuedBuffers() {
        return 0;
    }

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition) {
        return 0;
    }

    private int[] getRandomSubpartitionOrder(int numSubpartitions) {
        List list = IntStream.range(0, numSubpartitions).boxed().collect(Collectors.toList());
        Collections.shuffle(list);
        return list.stream().mapToInt(Integer::intValue).toArray();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    PartitionedFile getResultFile() {
        Object object = this.lock;
        synchronized (object) {
            return this.resultFile;
        }
    }
}

