package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.operators.multipleinput.input.OneInput;
import org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest.class */
public class BatchMultipleInputStreamOperatorTest extends MultipleInputTestBase {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest$TestingBatchMultipleInputStreamOperator.class */
    /**
     * A {@link BatchMultipleInputStreamOperator} for tests that additionally remembers the tail
     * wrapper and the list collecting the emitted elements, so that test cases can inspect them.
     */
    public static class TestingBatchMultipleInputStreamOperator extends BatchMultipleInputStreamOperator {
        /** Wrapper handed to the super constructor as the tail of the operator graph. */
        private final TableOperatorWrapper<?> tailWrapper;

        /** List handed in by the creator; exposed unchanged through {@link #getOutputData()}. */
        private final List<StreamElement> outputData;

        /**
         * Creates the operator and keeps references to the tail wrapper and the output list.
         *
         * @param parameters parameters forwarded to the super constructor
         * @param inputSpecs input specs forwarded to the super constructor
         * @param headWrappers head wrappers forwarded to the super constructor
         * @param tailWrapper tail wrapper, forwarded to the super constructor and stored
         * @param outputData list stored for later inspection by the tests
         */
        public TestingBatchMultipleInputStreamOperator(
                StreamOperatorParameters<RowData> parameters,
                List<InputSpec> inputSpecs,
                List<TableOperatorWrapper<?>> headWrappers,
                TableOperatorWrapper<?> tailWrapper,
                List<StreamElement> outputData) {
            super(parameters, inputSpecs, headWrappers, tailWrapper);
            this.outputData = outputData;
            this.tailWrapper = tailWrapper;
        }

        /** Returns the output list given at construction time. */
        public List<StreamElement> getOutputData() {
            return outputData;
        }

        /** Returns the tail wrapper given at construction time. */
        public TableOperatorWrapper<?> getTailWrapper() {
            return tailWrapper;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/multipleinput/BatchMultipleInputStreamOperatorTest$TestingOutput.class */
    public static class TestingOutput extends CollectorOutput<RowData> {
        private final List<StreamElement> list;

        public TestingOutput(List<StreamElement> list) {
            super(list);
            this.list = list;
        }

        public void collect(StreamRecord<RowData> streamRecord) {
            this.list.add(streamRecord);
        }
    }

    /**
     * Verifies that {@code open()} on the multiple-input operator opens every wrapped operator.
     *
     * <p>The wrapped graph is the one built by {@code createMultipleInputStreamOperator()}: the
     * tail wrapper holds a two-input operator whose first input wrapper holds another two-input
     * operator, which in turn is fed by two one-input operators.
     */
    @Test
    public void testOpen() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        TableOperatorWrapper<?> joinWrapper2 = op.getTailWrapper();
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) joinWrapper2.getStreamOperator();

        TableOperatorWrapper<?> joinWrapper1 = joinWrapper2.getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();

        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(0).getStreamOperator();
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(1).getStreamOperator();

        // Nothing may be opened before open() is invoked on the enclosing operator.
        Assert.assertFalse(aggOp1.isOpened());
        Assert.assertFalse(aggOp2.isOpened());
        Assert.assertFalse(joinOp1.isOpened());
        Assert.assertFalse(joinOp2.isOpened());

        op.open();

        Assert.assertTrue(aggOp1.isOpened());
        Assert.assertTrue(aggOp2.isOpened());
        Assert.assertTrue(joinOp1.isOpened());
        Assert.assertTrue(joinOp2.isOpened());
    }

    /**
     * Verifies the order in which inputs are selected and how {@code endInput} is propagated to
     * the wrapped operators.
     *
     * <p>The test expects input 3 to be selected first, then input 1, then input 2, and {@link
     * InputSelection#ALL} once all three inputs have ended.
     */
    @Test
    public void testNextSelectionAndEndInput() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        TableOperatorWrapper<?> joinWrapper2 = op.getTailWrapper();
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) joinWrapper2.getStreamOperator();

        TableOperatorWrapper<?> joinWrapper1 = joinWrapper2.getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();

        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(0).getStreamOperator();
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(1).getStreamOperator();

        // Initial state: no input has ended anywhere.
        Assert.assertFalse(aggOp1.isEnd());
        Assert.assertFalse(aggOp2.isEnd());
        Assert.assertTrue(joinOp1.getEndInputs().isEmpty());
        Assert.assertTrue(joinOp2.getEndInputs().isEmpty());
        Assert.assertEquals(
                new InputSelection.Builder().select(3).build(3), op.nextSelection());

        // Ending input 3 is seen by the tail operator as the end of its input 2.
        op.endInput(3);
        Assert.assertFalse(aggOp1.isEnd());
        Assert.assertFalse(aggOp2.isEnd());
        Assert.assertTrue(joinOp1.getEndInputs().isEmpty());
        Assert.assertEquals(Collections.singletonList(2), joinOp2.getEndInputs());
        Assert.assertEquals(
                new InputSelection.Builder().select(1).build(3), op.nextSelection());

        // Ending input 1 ends the first one-input operator and input 1 of the inner join.
        op.endInput(1);
        Assert.assertTrue(aggOp1.isEnd());
        Assert.assertFalse(aggOp2.isEnd());
        Assert.assertEquals(Collections.singletonList(1), joinOp1.getEndInputs());
        Assert.assertEquals(Collections.singletonList(2), joinOp2.getEndInputs());
        Assert.assertEquals(
                new InputSelection.Builder().select(2).build(3), op.nextSelection());

        // Ending input 2 completes the inner join, which then ends input 1 of the tail operator.
        op.endInput(2);
        Assert.assertTrue(aggOp1.isEnd());
        Assert.assertTrue(aggOp2.isEnd());
        Assert.assertEquals(Arrays.asList(1, 2), joinOp1.getEndInputs());
        Assert.assertEquals(Arrays.asList(2, 1), joinOp2.getEndInputs());
        Assert.assertEquals(InputSelection.ALL, op.nextSelection());
    }

    /**
     * Verifies that {@code dispose()} on the multiple-input operator disposes every wrapped
     * operator.
     */
    @Test
    public void testDispose() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        TableOperatorWrapper<?> joinWrapper2 = op.getTailWrapper();
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) joinWrapper2.getStreamOperator();

        TableOperatorWrapper<?> joinWrapper1 = joinWrapper2.getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();

        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(0).getStreamOperator();
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(1).getStreamOperator();

        // Nothing may be disposed before dispose() is invoked on the enclosing operator.
        Assert.assertFalse(aggOp1.isDisposed());
        Assert.assertFalse(aggOp2.isDisposed());
        Assert.assertFalse(joinOp1.isDisposed());
        Assert.assertFalse(joinOp2.isDisposed());

        op.dispose();

        Assert.assertTrue(aggOp1.isDisposed());
        Assert.assertTrue(aggOp2.isDisposed());
        Assert.assertTrue(joinOp1.isDisposed());
        Assert.assertTrue(joinOp2.isDisposed());
    }

    /**
     * Verifies that {@code close()} on the multiple-input operator closes every wrapped operator.
     */
    @Test
    public void testClose() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();

        TableOperatorWrapper<?> joinWrapper2 = op.getTailWrapper();
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) joinWrapper2.getStreamOperator();

        TableOperatorWrapper<?> joinWrapper1 = joinWrapper2.getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();

        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(0).getStreamOperator();
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(1).getStreamOperator();

        // Nothing may be closed before close() is invoked on the enclosing operator.
        Assert.assertFalse(aggOp1.isClosed());
        Assert.assertFalse(aggOp2.isClosed());
        Assert.assertFalse(joinOp1.isClosed());
        Assert.assertFalse(joinOp2.isClosed());

        op.close();

        Assert.assertTrue(aggOp1.isClosed());
        Assert.assertTrue(aggOp2.isClosed());
        Assert.assertTrue(joinOp1.isClosed());
        Assert.assertTrue(joinOp2.isClosed());
    }

    /**
     * Verifies how records pushed into the three inputs travel through the wrapped operators and
     * when they reach the final output.
     *
     * <p>The assertions expect each wrapped operator to hold on to its current element until its
     * input ends, so that the output list stays empty until the last input has ended and then
     * contains three elements.
     */
    @Test
    public void testProcess() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
        List<StreamElement> outputData = op.getOutputData();

        TableOperatorWrapper<?> joinWrapper2 = op.getTailWrapper();
        TestingTwoInputStreamOperator joinOp2 =
                (TestingTwoInputStreamOperator) joinWrapper2.getStreamOperator();

        TableOperatorWrapper<?> joinWrapper1 = joinWrapper2.getInputWrappers().get(0);
        TestingTwoInputStreamOperator joinOp1 =
                (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();

        TestingOneInputStreamOperator aggOp1 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(0).getStreamOperator();
        TestingOneInputStreamOperator aggOp2 =
                (TestingOneInputStreamOperator)
                        joinWrapper1.getInputWrappers().get(1).getStreamOperator();

        List<Input> inputs = op.getInputs();
        Assert.assertEquals(3L, inputs.size());
        Input input1 = inputs.get(0);
        Input input2 = inputs.get(1);
        Input input3 = inputs.get(2);

        Assert.assertTrue(input1 instanceof OneInput);
        Assert.assertTrue(input2 instanceof OneInput);
        Assert.assertTrue(input3 instanceof SecondInputOfTwoInput);

        // Initial state: no operator has seen an element and nothing was emitted.
        Assert.assertNull(joinOp2.getCurrentElement1());
        Assert.assertNull(joinOp2.getCurrentElement2());
        Assert.assertNull(joinOp1.getCurrentElement1());
        Assert.assertNull(joinOp1.getCurrentElement2());
        Assert.assertNull(aggOp1.getCurrentElement());
        Assert.assertNull(aggOp2.getCurrentElement());
        Assert.assertTrue(outputData.isEmpty());

        // Third input: the record arrives directly at the second input of the tail operator.
        StreamRecord<RowData> element3 =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("123")), 456L);
        input3.processElement(element3);
        Assert.assertEquals(element3, joinOp2.getCurrentElement2());
        Assert.assertNull(joinOp2.getCurrentElement1());
        Assert.assertTrue(outputData.isEmpty());
        Assert.assertTrue(joinOp2.getEndInputs().isEmpty());

        op.endInput(3);
        Assert.assertTrue(outputData.isEmpty());
        Assert.assertEquals(Collections.singletonList(2), joinOp2.getEndInputs());

        // First input: the record stays in the first one-input operator until input 1 ends.
        StreamRecord<RowData> element1 =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("124")), 457L);
        input1.processElement(element1);
        Assert.assertEquals(element1, aggOp1.getCurrentElement());
        Assert.assertNull(joinOp1.getCurrentElement1());
        Assert.assertNull(joinOp2.getCurrentElement1());
        Assert.assertTrue(outputData.isEmpty());
        Assert.assertTrue(joinOp1.getEndInputs().isEmpty());

        op.endInput(1);
        Assert.assertEquals(Collections.singletonList(1), joinOp1.getEndInputs());
        Assert.assertEquals(Collections.singletonList(2), joinOp2.getEndInputs());
        Assert.assertEquals(element1, joinOp1.getCurrentElement1());
        Assert.assertTrue(outputData.isEmpty());

        // Second input: the record stays in the second one-input operator until input 2 ends.
        StreamRecord<RowData> element2 =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("125")), 458L);
        input2.processElement(element2);
        Assert.assertEquals(element2, aggOp2.getCurrentElement());
        Assert.assertNull(joinOp1.getCurrentElement2());
        Assert.assertNull(joinOp2.getCurrentElement1());
        Assert.assertTrue(outputData.isEmpty());
        Assert.assertEquals(Collections.singletonList(1), joinOp1.getEndInputs());

        // Ending the last input flushes everything through to the final output.
        op.endInput(2);
        Assert.assertEquals(Arrays.asList(1, 2), joinOp1.getEndInputs());
        Assert.assertEquals(Arrays.asList(2, 1), joinOp2.getEndInputs());
        Assert.assertEquals(element2, joinOp1.getCurrentElement2());
        Assert.assertEquals(3L, outputData.size());
    }

    /**
     * Builds the operator under test from the following transformation graph:
     *
     * <pre>
     * source1 -> agg1 --+
     *                   +--> join1 --+
     * source2 -> agg2 --+            +--> join2
     *                     source3 ---+
     * </pre>
     *
     * <p>The read orders passed to the generator are {@code {1, 2, 0}} for source1, source2 and
     * source3 respectively. The list backing the {@link TestingOutput} is also handed to the
     * returned operator so tests can read what was emitted.
     *
     * @return a freshly constructed operator wrapping the graph above
     * @throws Exception declared because the helper methods used here may throw
     */
    private TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator()
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = createSource(env, "source1");
        Transformation<RowData> source2 = createSource(env, "source2");
        Transformation<RowData> source3 = createSource(env, "source3");

        // Every transformation in the graph produces rows with a single STRING field.
        InternalTypeInfo<RowData> outputType =
                InternalTypeInfo.of(
                        RowType.of(new LogicalType[] {DataTypes.STRING().getLogicalType()}));

        Transformation<RowData> agg1 =
                createOneInputTransform(
                        source1, "agg1", new TestingOneInputStreamOperator(true), outputType);
        Transformation<RowData> agg2 =
                createOneInputTransform(
                        source2, "agg2", new TestingOneInputStreamOperator(true), outputType);
        Transformation<RowData> join1 =
                createTwoInputTransform(
                        agg1, agg2, "join1", new TestingTwoInputStreamOperator(true), outputType);
        Transformation<RowData> join2 =
                createTwoInputTransform(
                        join1,
                        source3,
                        "join2",
                        new TestingTwoInputStreamOperator(true),
                        outputType);

        TableOperatorWrapperGenerator generator =
                new TableOperatorWrapperGenerator(
                        Arrays.asList(source1, source2, source3), join2, new int[] {1, 2, 0});
        generator.generate();

        // Only the InputSpec half of each (transformation, spec) pair is needed here.
        List<InputSpec> inputSpecs =
                generator.getInputTransformAndInputSpecPairs().stream()
                        .map(pair -> pair.getValue())
                        .collect(Collectors.toList());

        List<StreamElement> outputData = new ArrayList<>();
        return new TestingBatchMultipleInputStreamOperator(
                createStreamOperatorParameters(new TestingOutput(outputData)),
                inputSpecs,
                generator.getHeadWrappers(),
                generator.getTailWrapper(),
                outputData);
    }
}
