/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;

/**
 * Writes records to the output channels of a {@link ResultPartitionWriter}.
 *
 * <p>One {@link RecordSerializer} is kept per output channel. A record is added to the
 * serializer of each target channel; whenever that serializer reports a full buffer, the
 * buffer is handed to the target partition and a new buffer is requested from the
 * partition's buffer provider.
 *
 * <p>Every method that adds records, writes buffers, or clears serializer state does so
 * while holding the monitor of the affected channel's serializer. {@link #setReporter} and
 * {@link #setMetricGroup} do not synchronize.
 *
 * @param <T> the type of record that can be emitted with this writer
 */
public class RecordWriter<T extends IOReadableWritable> {
    /** The partition that buffers and events are written to. */
    protected final ResultPartitionWriter targetPartition;

    /** Decides which channel(s) a record emitted via {@link #emit} goes to. */
    private final ChannelSelector<T> channelSelector;

    /** Number of output channels, as reported by the partition writer at construction time. */
    private final int numChannels;

    /** One serializer per output channel, indexed by channel number. */
    private final RecordSerializer<T>[] serializers;

    /**
     * Creates a writer that uses a {@link RoundRobinChannelSelector} to pick target channels.
     *
     * @param writer the partition writer to send buffers and events to
     */
    public RecordWriter(ResultPartitionWriter writer) {
        this(writer, new RoundRobinChannelSelector<T>());
    }

    /**
     * Creates a writer with an explicit channel selector and one
     * {@link SpanningRecordSerializer} per output channel of {@code writer}.
     *
     * @param writer the partition writer to send buffers and events to
     * @param channelSelector the selector consulted by {@link #emit}
     */
    public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
        this.targetPartition = writer;
        this.channelSelector = channelSelector;
        this.numChannels = writer.getNumberOfOutputChannels();

        // Generic array creation is not allowed in Java, so the array is created with the raw
        // element type. This is safe because only SpanningRecordSerializer<T> instances are
        // ever stored in it (see the loop below).
        @SuppressWarnings("unchecked")
        RecordSerializer<T>[] perChannel = new SpanningRecordSerializer[this.numChannels];
        for (int i = 0; i < this.numChannels; ++i) {
            perChannel[i] = new SpanningRecordSerializer<T>();
        }
        this.serializers = perChannel;
    }

    /**
     * Sends the record to every channel returned by the channel selector for this record.
     *
     * @param record the record to emit
     * @throws IOException if writing a buffer to the target partition fails
     * @throws InterruptedException propagated from the underlying buffer/partition calls
     */
    public void emit(T record) throws IOException, InterruptedException {
        for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
            this.sendToTarget(record, targetChannel);
        }
    }

    /**
     * Sends the record to all output channels, bypassing the channel selector.
     *
     * @param record the record to emit
     * @throws IOException if writing a buffer to the target partition fails
     * @throws InterruptedException propagated from the underlying buffer/partition calls
     */
    public void broadcastEmit(T record) throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            this.sendToTarget(record, targetChannel);
        }
    }

    /**
     * Adds the record to the serializer of {@code targetChannel} and, while the serializer
     * reports a full buffer, alternates between writing out the current buffer and supplying
     * a newly requested one.
     *
     * @param record the record to serialize
     * @param targetChannel index of the channel (and serializer) to use
     * @throws IOException if writing a buffer to the target partition fails
     * @throws InterruptedException propagated from the underlying buffer/partition calls
     */
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        final RecordSerializer<T> serializer = this.serializers[targetChannel];

        synchronized (serializer) {
            RecordSerializer.SerializationResult result = serializer.addRecord(record);

            while (result.isFullBuffer()) {
                Buffer buffer = serializer.getCurrentBuffer();

                if (buffer != null) {
                    this.writeAndClearBuffer(buffer, targetChannel, serializer);

                    // 'result' is not updated on this path, so without the break the loop
                    // would iterate again. Presumably the serializer then has no current
                    // buffer, so that iteration would request a fresh buffer although the
                    // record is already completely written.
                    if (result.isFullRecord()) {
                        break;
                    }
                } else {
                    // No buffer attached to the serializer: request one and continue
                    // serializing into it.
                    buffer = this.targetPartition.getBufferProvider().requestBufferBlocking();
                    result = serializer.setNextBuffer(buffer);
                }
            }
        }
    }

    /**
     * Writes the given event to every output channel. Before the event is written to a
     * channel, the current buffer of that channel's serializer (if any) is written out.
     *
     * @param event the event to write to all channels
     * @throws IOException if writing a buffer or the event fails
     * @throws InterruptedException propagated from the underlying partition calls
     * @throws IllegalStateException if a serializer has no current buffer but reports
     *         buffered data
     */
    public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            final RecordSerializer<T> serializer = this.serializers[targetChannel];

            synchronized (serializer) {
                Buffer buffer = serializer.getCurrentBuffer();
                if (buffer != null) {
                    this.writeAndClearBuffer(buffer, targetChannel, serializer);
                } else if (serializer.hasData()) {
                    throw new IllegalStateException("No buffer, but serializer has buffered data.");
                }

                this.targetPartition.writeEvent(event, targetChannel);
            }
        }
    }

    /**
     * Writes out the current buffer of every channel's serializer (where one exists) and
     * then signals the end of the superstep on the target partition.
     *
     * @throws IOException if writing a buffer or the end-of-superstep marker fails
     * @throws InterruptedException propagated from the underlying partition calls
     */
    public void sendEndOfSuperstep() throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            final RecordSerializer<T> serializer = this.serializers[targetChannel];

            synchronized (serializer) {
                Buffer buffer = serializer.getCurrentBuffer();
                if (buffer != null) {
                    this.writeAndClearBuffer(buffer, targetChannel, serializer);
                }
            }
        }

        this.targetPartition.writeEndOfSuperstep();
    }

    /**
     * Writes out the current buffer of every channel's serializer (where one exists) and
     * clears each serializer. The serializer is cleared even if writing its buffer throws.
     *
     * @throws IOException if writing a buffer to the target partition fails
     */
    public void flush() throws IOException {
        for (int targetChannel = 0; targetChannel < this.numChannels; ++targetChannel) {
            final RecordSerializer<T> serializer = this.serializers[targetChannel];

            synchronized (serializer) {
                try {
                    Buffer buffer = serializer.getCurrentBuffer();
                    if (buffer != null) {
                        this.writeAndClearBuffer(buffer, targetChannel, serializer);
                    }
                } finally {
                    serializer.clear();
                }
            }
        }
    }

    /**
     * Recycles the current buffer of every channel's serializer without writing it to the
     * target partition, and clears each serializer. The serializer is cleared even if
     * recycling throws.
     */
    public void clearBuffers() {
        for (RecordSerializer<T> serializer : this.serializers) {
            synchronized (serializer) {
                try {
                    Buffer buffer = serializer.getCurrentBuffer();
                    if (buffer != null) {
                        buffer.recycle();
                    }
                } finally {
                    serializer.clear();
                }
            }
        }
    }

    /**
     * Passes the given reporter to every channel's serializer.
     *
     * @param reporter the reporter to set on each serializer
     */
    public void setReporter(AccumulatorRegistry.Reporter reporter) {
        for (RecordSerializer<T> serializer : this.serializers) {
            serializer.setReporter(reporter);
        }
    }

    /**
     * Asks every channel's serializer to instantiate its metrics in the given group.
     *
     * @param metrics the metric group handed to each serializer
     */
    public void setMetricGroup(IOMetricGroup metrics) {
        for (RecordSerializer<T> serializer : this.serializers) {
            serializer.instantiateMetrics(metrics);
        }
    }

    /**
     * Writes {@code buffer} to {@code targetChannel} of the target partition and then clears
     * the serializer's current buffer. The clear happens in a {@code finally} block, so it is
     * performed even when the write throws.
     *
     * @param buffer the buffer to write
     * @param targetChannel index of the channel to write to
     * @param serializer the serializer whose current buffer is cleared afterwards
     * @throws IOException if writing the buffer to the target partition fails
     */
    private void writeAndClearBuffer(Buffer buffer, int targetChannel, RecordSerializer<T> serializer) throws IOException {
        try {
            this.targetPartition.writeBuffer(buffer, targetChannel);
        } finally {
            serializer.clearCurrentBuffer();
        }
    }
}

