package org.apache.flink.runtime.operators.util;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/util/OutputEmitterTest.class */
class OutputEmitterTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/util/OutputEmitterTest$RecordType.class */
    public enum RecordType {
        STRING,
        INTEGER
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/util/OutputEmitterTest$TestIntComparator.class */
    private static class TestIntComparator extends TypeComparator<Integer> {
        private TypeComparator[] comparators;

        private TestIntComparator() {
            this.comparators = new TypeComparator[]{new IntComparator(true)};
        }

        public int hash(Integer num) {
            return num.intValue();
        }

        public void setReference(Integer num) {
            throw new UnsupportedOperationException();
        }

        public boolean equalToReference(Integer num) {
            throw new UnsupportedOperationException();
        }

        public int compareToReference(TypeComparator<Integer> typeComparator) {
            throw new UnsupportedOperationException();
        }

        public int compare(Integer num, Integer num2) {
            throw new UnsupportedOperationException();
        }

        public int compareSerialized(DataInputView dataInputView, DataInputView dataInputView2) {
            throw new UnsupportedOperationException();
        }

        public boolean supportsNormalizedKey() {
            throw new UnsupportedOperationException();
        }

        public boolean supportsSerializationWithKeyNormalization() {
            throw new UnsupportedOperationException();
        }

        public int getNormalizeKeyLen() {
            throw new UnsupportedOperationException();
        }

        public boolean isNormalizedKeyPrefixOnly(int i) {
            throw new UnsupportedOperationException();
        }

        public void putNormalizedKey(Integer num, MemorySegment memorySegment, int i, int i2) {
            throw new UnsupportedOperationException();
        }

        public void writeWithKeyNormalization(Integer num, DataOutputView dataOutputView) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Integer readWithKeyDenormalization(Integer num, DataInputView dataInputView) throws IOException {
            throw new UnsupportedOperationException();
        }

        public boolean invertNormalizedKey() {
            throw new UnsupportedOperationException();
        }

        public TypeComparator<Integer> duplicate() {
            throw new UnsupportedOperationException();
        }

        public int extractKeys(Object obj, Object[] objArr, int i) {
            objArr[i] = obj;
            return 1;
        }

        public TypeComparator[] getFlatComparators() {
            return this.comparators;
        }
    }

    OutputEmitterTest() {
    }

    @Test
    void testPartitionHash() {
        verifyPartitionHashSelectedChannels(50000, 100, RecordType.INTEGER);
        verifyPartitionHashSelectedChannels(10000, 100, RecordType.STRING);
        ChannelSelector createChannelSelector = createChannelSelector(ShipStrategyType.PARTITION_HASH, new TestIntComparator(), 100);
        SerializationDelegate<Integer> serializationDelegate = new SerializationDelegate<>(new IntSerializer());
        assertPartitionHashSelectedChannels(createChannelSelector, serializationDelegate, Integer.MIN_VALUE, 100);
        assertPartitionHashSelectedChannels(createChannelSelector, serializationDelegate, -1, 100);
        assertPartitionHashSelectedChannels(createChannelSelector, serializationDelegate, 0, 100);
        assertPartitionHashSelectedChannels(createChannelSelector, serializationDelegate, 1, 100);
        assertPartitionHashSelectedChannels(createChannelSelector, serializationDelegate, Integer.MAX_VALUE, 100);
    }

    @Test
    void testForward() {
        verifyForwardSelectedChannels(50050, 100, RecordType.INTEGER);
        verifyForwardSelectedChannels(10050, 100, RecordType.STRING);
    }

    @Test
    void testForcedRebalance() {
        int i = 50000 + 33;
        SerializationDelegate<Record> serializationDelegate = new SerializationDelegate<>(new RecordSerializerFactory().getSerializer());
        OutputEmitter outputEmitter = new OutputEmitter(ShipStrategyType.PARTITION_FORCED_REBALANCE, 85 + 100);
        outputEmitter.setup(100);
        int[] selectedChannelsHitCount = getSelectedChannelsHitCount(outputEmitter, serializationDelegate, RecordType.INTEGER, i, 100);
        int i2 = 0;
        for (int i3 = 0; i3 < selectedChannelsHitCount.length; i3++) {
            if (85 <= i3 || i3 < (85 + 33) - 100) {
                Assertions.assertThat(selectedChannelsHitCount[i3]).isEqualTo((i / 100) + 1);
            } else {
                Assertions.assertThat(selectedChannelsHitCount[i3]).isEqualTo(i / 100);
            }
            i2 += selectedChannelsHitCount[i3];
        }
        Assertions.assertThat(i2).isEqualTo(i);
        int i4 = 10000 + 22;
        OutputEmitter outputEmitter2 = new OutputEmitter(ShipStrategyType.PARTITION_FORCED_REBALANCE, 20 + 200);
        outputEmitter2.setup(100);
        int[] selectedChannelsHitCount2 = getSelectedChannelsHitCount(outputEmitter2, serializationDelegate, RecordType.STRING, i4, 100);
        int i5 = 0;
        for (int i6 = 0; i6 < selectedChannelsHitCount2.length; i6++) {
            if (20 > i6 || i6 >= 20 + 22) {
                Assertions.assertThat(selectedChannelsHitCount2[i6]).isEqualTo(i4 / 100);
            } else {
                Assertions.assertThat(selectedChannelsHitCount2[i6]).isEqualTo((i4 / 100) + 1);
            }
            i5 += selectedChannelsHitCount2[i6];
        }
        Assertions.assertThat(i5).isEqualTo(i4);
    }

    @Test
    void testBroadcast() {
        verifyBroadcastSelectedChannels(100, 50000, RecordType.INTEGER);
        verifyBroadcastSelectedChannels(100, 50000, RecordType.STRING);
    }

    @Test
    void testMultiKeys() {
        ChannelSelector createChannelSelector = createChannelSelector(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0, 1, 3}, new Class[]{IntValue.class, StringValue.class, DoubleValue.class}).m680createComparator(), 100);
        SerializationDelegate serializationDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int[] iArr = new int[100];
        for (int i = 0; i < 5000; i++) {
            Record record = new Record(4);
            record.setField(0, new IntValue(i));
            record.setField(1, new StringValue("AB" + i + "CD" + i));
            record.setField(3, new DoubleValue(i * 3.141d));
            serializationDelegate.setInstance(record);
            int selectChannel = createChannelSelector.selectChannel(serializationDelegate);
            iArr[selectChannel] = iArr[selectChannel] + 1;
        }
        int i2 = 0;
        for (int i3 : iArr) {
            Assertions.assertThat(i3).isGreaterThan(0);
            i2 += i3;
        }
        Assertions.assertThat(i2).isEqualTo(5000);
    }

    @Test
    void testMissingKey() {
        Assertions.assertThat(verifyWrongPartitionHashKey(1, 0)).withFailMessage("Expected a KeyFieldOutOfBoundsException.", new Object[0]).isTrue();
    }

    @Test
    void testNullKey() {
        Assertions.assertThat(verifyWrongPartitionHashKey(0, 1)).withFailMessage("Expected a NullKeyFieldException.", new Object[0]).isTrue();
    }

    @Test
    void testWrongKeyClass() throws Exception {
        ChannelSelector createChannelSelector = createChannelSelector(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0}, new Class[]{DoubleValue.class}).m680createComparator(), 100);
        SerializationDelegate serializationDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        PipedInputStream pipedInputStream = new PipedInputStream(1048576);
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(pipedInputStream);
        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(new PipedOutputStream(pipedInputStream));
        Record record = new Record(1);
        record.setField(0, new IntValue());
        record.write(dataOutputViewStreamWrapper);
        Record record2 = new Record();
        record2.read(dataInputViewStreamWrapper);
        serializationDelegate.setInstance(record2);
        Assertions.assertThatThrownBy(() -> {
            createChannelSelector.selectChannel(serializationDelegate);
        }).isInstanceOf(DeserializationException.class);
    }

    private void verifyPartitionHashSelectedChannels(int i, int i2, Enum r9) {
        int i3 = 0;
        for (int i4 : getSelectedChannelsHitCount(ShipStrategyType.PARTITION_HASH, i, i2, r9)) {
            Assertions.assertThat(i4).isGreaterThan(0);
            i3 += i4;
        }
        Assertions.assertThat(i3).isEqualTo(i);
    }

    private void verifyForwardSelectedChannels(int i, int i2, Enum r9) {
        int[] selectedChannelsHitCount = getSelectedChannelsHitCount(ShipStrategyType.FORWARD, i, i2, r9);
        Assertions.assertThat(selectedChannelsHitCount[0]).isEqualTo(i);
        for (int i3 = 1; i3 < selectedChannelsHitCount.length; i3++) {
            Assertions.assertThat(selectedChannelsHitCount[i3]).isZero();
        }
    }

    private void verifyBroadcastSelectedChannels(int i, int i2, Enum r8) {
        Assertions.assertThatThrownBy(() -> {
            getSelectedChannelsHitCount(ShipStrategyType.BROADCAST, i, i2, r8);
        }).isInstanceOf(UnsupportedOperationException.class);
    }

    private boolean verifyWrongPartitionHashKey(int i, int i2) {
        ChannelSelector createChannelSelector = createChannelSelector(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{i}, new Class[]{IntValue.class}).m680createComparator(), 100);
        SerializationDelegate serializationDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        Record record = new Record(2);
        record.setField(i2, new IntValue(1));
        serializationDelegate.setInstance(record);
        try {
            createChannelSelector.selectChannel(serializationDelegate);
            return false;
        } catch (NullKeyFieldException e) {
            Assertions.assertThat(e.getFieldNumber()).isEqualTo(i);
            return true;
        }
    }

    private int[] getSelectedChannelsHitCount(ShipStrategyType shipStrategyType, int i, int i2, Enum r13) {
        int[] iArr = {0};
        Class[] clsArr = new Class[1];
        clsArr[0] = r13 == RecordType.INTEGER ? IntValue.class : StringValue.class;
        return getSelectedChannelsHitCount(createChannelSelector(shipStrategyType, new RecordComparatorFactory(iArr, clsArr).m680createComparator(), i2), new SerializationDelegate<>(new RecordSerializerFactory().getSerializer()), r13, i, i2);
    }

    private ChannelSelector createChannelSelector(ShipStrategyType shipStrategyType, TypeComparator typeComparator, int i) {
        OutputEmitter outputEmitter = new OutputEmitter(shipStrategyType, typeComparator);
        outputEmitter.setup(i);
        Assertions.assertThat(shipStrategyType == ShipStrategyType.BROADCAST).isEqualTo(outputEmitter.isBroadcast());
        return outputEmitter;
    }

    private int[] getSelectedChannelsHitCount(ChannelSelector<SerializationDelegate<Record>> channelSelector, SerializationDelegate<Record> serializationDelegate, Enum r8, int i, int i2) {
        int[] iArr = new int[i2];
        for (int i3 = 0; i3 < i; i3++) {
            serializationDelegate.setInstance(new Record(r8 == RecordType.INTEGER ? new IntValue(i3) : new StringValue(i3 + "")));
            int selectChannel = channelSelector.selectChannel(serializationDelegate);
            iArr[selectChannel] = iArr[selectChannel] + 1;
        }
        return iArr;
    }

    private void assertPartitionHashSelectedChannels(ChannelSelector channelSelector, SerializationDelegate<Integer> serializationDelegate, int i, int i2) {
        serializationDelegate.setInstance(Integer.valueOf(i));
        Assertions.assertThat(channelSelector.selectChannel(serializationDelegate)).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(i2 - 1);
    }
}
