package org.apache.flink.streaming.util.asyncprocessing;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.class */
/**
 * Keyed two-input operator test harness for operators that implement
 * {@link AsyncStateProcessingOperator}.
 *
 * <p>Every interaction with the operator (elements, watermarks, watermark statuses, record
 * attributes, end-of-input, draining and closing) is submitted to the {@link ExecutorService}
 * supplied at construction time and awaited by the calling thread. The {@code create} factories
 * use a fresh single-thread executor and also construct the harness on that executor.
 *
 * @param <K> key type
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> output element type
 */
public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> extends KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> {
    private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
    /** Lazily created by {@link #getRecordProcessor1()}. */
    private ThrowingConsumer<StreamRecord<IN1>, Exception> processor1;
    /** Lazily created by {@link #getRecordProcessor2()}. */
    private ThrowingConsumer<StreamRecord<IN2>, Exception> processor2;
    /** Executor on which all operator calls issued through this harness are run. */
    private final ExecutorService executor;

    /**
     * Builds a harness by running {@code functionWithException} on a newly created single-thread
     * executor and waiting for the result. The same executor is passed to the factory so that the
     * harness can keep using it afterwards.
     *
     * <p>If the factory fails, the failure is reported to the caller (wrapped by
     * {@link CompletableFuture#get()}) instead of leaving the caller blocked forever, and the
     * executor created here is shut down because no harness exists to own it.
     *
     * @param functionWithException factory that receives the executor and returns the harness
     * @return the harness produced by the factory
     * @throws Exception if the factory fails or the waiting thread is interrupted
     */
    public static <K, IN1, IN2, OUT, OP extends AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>> OP create(FunctionWithException<ExecutorService, OP, Exception> functionWithException) throws Exception {
        ExecutorService harnessExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<OP> harnessFuture = new CompletableFuture<>();
        harnessExecutor.execute(() -> {
            try {
                harnessFuture.complete(functionWithException.apply(harnessExecutor));
            } catch (Throwable t) {
                // Always complete the future; otherwise the caller's get() would never return.
                harnessFuture.completeExceptionally(t);
            }
        });
        try {
            return harnessFuture.get();
        } catch (Exception e) {
            // Creation failed: nobody will ever call close(), so release the worker thread here.
            harnessExecutor.shutdownNow();
            throw e;
        }
    }

    /**
     * Builds a harness for the given operator, passing {@code 1, 1, 0} for the three trailing
     * integer arguments of the seven-argument {@code create} overload.
     *
     * @param twoInputStreamOperator operator under test
     * @param keySelector key selector for the first input
     * @param keySelector2 key selector for the second input
     * @param typeInformation type information of the key
     * @return the new harness
     * @throws Exception if the harness cannot be constructed
     */
    public static <K, IN1, IN2, OUT> AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create(TwoInputStreamOperator<IN1, IN2, OUT> twoInputStreamOperator, KeySelector<IN1, K> keySelector, KeySelector<IN2, K> keySelector2, TypeInformation<K> typeInformation) throws Exception {
        return create(twoInputStreamOperator, keySelector, keySelector2, typeInformation, 1, 1, 0);
    }

    /**
     * Builds a harness for the given operator on a newly created single-thread executor.
     *
     * @param twoInputStreamOperator operator under test
     * @param keySelector key selector for the first input
     * @param keySelector2 key selector for the second input
     * @param typeInformation type information of the key
     * @param i forwarded unchanged to the parent harness constructor (presumably the max
     *     parallelism - TODO confirm)
     * @param i2 forwarded unchanged to the parent harness constructor (presumably the number of
     *     subtasks - TODO confirm)
     * @param i3 forwarded unchanged to the parent harness constructor (presumably the subtask
     *     index - TODO confirm)
     * @return the new harness
     * @throws Exception if the harness cannot be constructed
     */
    public static <K, IN1, IN2, OUT> AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> create(TwoInputStreamOperator<IN1, IN2, OUT> twoInputStreamOperator, KeySelector<IN1, K> keySelector, KeySelector<IN2, K> keySelector2, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
        // Explicit type arguments keep overload resolution and inference unambiguous.
        return AsyncKeyedTwoInputStreamOperatorTestHarness.<K, IN1, IN2, OUT, AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>>create(
                harnessExecutor -> new AsyncKeyedTwoInputStreamOperatorTestHarness<>(harnessExecutor, twoInputStreamOperator, keySelector, keySelector2, typeInformation, i, i2, i3));
    }

    /**
     * Creates the harness. The operator must implement {@link AsyncStateProcessingOperator}.
     * The environment is configured to expect external failures of type {@link Throwable}.
     *
     * @param executorService executor on which all operator calls will be run
     * @param twoInputStreamOperator operator under test
     * @param keySelector key selector for the first input
     * @param keySelector2 key selector for the second input
     * @param typeInformation type information of the key
     * @param i forwarded unchanged to the parent constructor (presumably the max parallelism -
     *     TODO confirm)
     * @param i2 forwarded unchanged to the parent constructor (presumably the number of subtasks
     *     - TODO confirm)
     * @param i3 forwarded unchanged to the parent constructor (presumably the subtask index -
     *     TODO confirm)
     * @throws IllegalStateException if the operator is not an {@link AsyncStateProcessingOperator}
     * @throws Exception if the parent harness cannot be constructed
     */
    public AsyncKeyedTwoInputStreamOperatorTestHarness(ExecutorService executorService, TwoInputStreamOperator<IN1, IN2, OUT> twoInputStreamOperator, KeySelector<IN1, K> keySelector, KeySelector<IN2, K> keySelector2, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
        super(twoInputStreamOperator, keySelector, keySelector2, typeInformation, i, i2, i3);
        Preconditions.checkState(twoInputStreamOperator instanceof AsyncStateProcessingOperator, "Operator is not an AsyncStateProcessingOperator");
        this.twoInputOperator = twoInputStreamOperator;
        this.executor = executorService;
        getEnvironment().setExpectedExternalFailureCause(Throwable.class);
    }

    /** Returns the record processor for input 1, creating it on first use. */
    private ThrowingConsumer<StreamRecord<IN1>, Exception> getRecordProcessor1() {
        if (this.processor1 == null) {
            this.processor1 = RecordProcessorUtils.getRecordProcessor1(this.twoInputOperator);
        }
        return this.processor1;
    }

    /** Returns the record processor for input 2, creating it on first use. */
    private ThrowingConsumer<StreamRecord<IN2>, Exception> getRecordProcessor2() {
        if (this.processor2 == null) {
            this.processor2 = RecordProcessorUtils.getRecordProcessor2(this.twoInputOperator);
        }
        return this.processor2;
    }

    /** Feeds a record to input 1 through the record processor, on the harness executor. */
    @Override
    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        executeAndGet(() -> getRecordProcessor1().accept(streamRecord));
    }

    /** Wraps the value and timestamp in a {@link StreamRecord} and feeds it to input 1. */
    @Override
    public void processElement1(IN1 in1, long j) throws Exception {
        processElement1(new StreamRecord<>(in1, j));
    }

    /** Feeds a record to input 2 through the record processor, on the harness executor. */
    @Override
    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        executeAndGet(() -> getRecordProcessor2().accept(streamRecord));
    }

    /** Wraps the value and timestamp in a {@link StreamRecord} and feeds it to input 2. */
    @Override
    public void processElement2(IN2 in2, long j) throws Exception {
        processElement2(new StreamRecord<>(in2, j));
    }

    /** Delivers a watermark to input 1 on the harness executor. */
    @Override
    public void processWatermark1(Watermark watermark) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processWatermark1(watermark));
    }

    /** Delivers a watermark to input 2 on the harness executor. */
    @Override
    public void processWatermark2(Watermark watermark) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processWatermark2(watermark));
    }

    /**
     * Delivers the same watermark to input 1 and then to input 2, as two separately awaited
     * tasks on the harness executor.
     */
    @Override
    public void processBothWatermarks(Watermark watermark) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processWatermark1(watermark));
        executeAndGet(() -> this.twoInputOperator.processWatermark2(watermark));
    }

    /** Delivers a watermark status to input 1 on the harness executor. */
    @Override
    public void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processWatermarkStatus1(watermarkStatus));
    }

    /** Delivers a watermark status to input 2 on the harness executor. */
    @Override
    public void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processWatermarkStatus2(watermarkStatus));
    }

    /** Delivers record attributes to input 1 on the harness executor. */
    @Override
    public void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processRecordAttributes1(recordAttributes));
    }

    /** Delivers record attributes to input 2 on the harness executor. */
    @Override
    public void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {
        executeAndGet(() -> this.twoInputOperator.processRecordAttributes2(recordAttributes));
    }

    /** Signals end of input 1 if the operator is a {@link BoundedMultiInput}; otherwise a no-op. */
    @Override
    public void endInput1() throws Exception {
        if (this.operator instanceof BoundedMultiInput) {
            executeAndGet(() -> ((BoundedMultiInput) this.operator).endInput(1));
        }
    }

    /** Signals end of input 2 if the operator is a {@link BoundedMultiInput}; otherwise a no-op. */
    @Override
    public void endInput2() throws Exception {
        if (this.operator instanceof BoundedMultiInput) {
            executeAndGet(() -> ((BoundedMultiInput) this.operator).endInput(2));
        }
    }

    /** Runs {@code AsyncProcessingTestUtil.drain} for the operator on the harness executor. */
    public void drainStateRequests() throws Exception {
        executeAndGet(() -> AsyncProcessingTestUtil.drain(this.operator));
    }

    /**
     * Closes the parent harness on the harness executor and then shuts the executor down. The
     * executor is shut down even when closing fails, so its worker thread is not leaked.
     */
    @Override
    public void close() throws Exception {
        try {
            executeAndGet(() -> super.close());
        } finally {
            this.executor.shutdown();
        }
    }

    /**
     * Returns the number of key/value state entries held by the heap backend that sits behind the
     * operator's async keyed state backend.
     *
     * @throws UnsupportedOperationException if the async backend is not an
     *     {@link AsyncKeyedStateBackendAdaptor} or the wrapped backend is not a
     *     {@link HeapKeyedStateBackend}
     */
    @Override
    public int numKeyedStateEntries() {
        // Held as Object so the instanceof checks below decide the outcome instead of an
        // implicit cast failing first.
        // NOTE(review): relies on the operator's declared type exposing
        // getAsyncKeyedStateBackend() - confirm against the parent harness.
        Object asyncBackend = this.operator.getAsyncKeyedStateBackend();
        if (!(asyncBackend instanceof AsyncKeyedStateBackendAdaptor)) {
            throw new UnsupportedOperationException(String.format("Unsupported async keyed state backend: %s", asyncBackend.getClass().getCanonicalName()));
        }
        Object wrappedBackend = ((AsyncKeyedStateBackendAdaptor<?>) asyncBackend).getKeyedStateBackend();
        if (wrappedBackend instanceof HeapKeyedStateBackend) {
            return ((HeapKeyedStateBackend<?>) wrappedBackend).numKeyValueStateEntries();
        }
        throw new UnsupportedOperationException(String.format("Unsupported keyed state backend: %s", wrappedBackend.getClass().getCanonicalName()));
    }

    /**
     * Runs the action on the harness executor and waits for it to finish. The environment is
     * checked for externally reported failures before the action runs and again after it
     * completes. If an {@link Exception} is raised, {@code mockTask.cleanUp} is invoked with it
     * on the executor and the exception is rethrown after passing through
     * {@code AsyncProcessingTestUtil.unwrapAsyncException}.
     */
    private void executeAndGet(RunnableWithException runnableWithException) throws Exception {
        try {
            AsyncProcessingTestUtil.execute(this.executor, () -> {
                checkEnvState();
                runnableWithException.run();
            }).get();
            checkEnvState();
        } catch (Exception e) {
            AsyncProcessingTestUtil.execute(this.executor, () -> this.mockTask.cleanUp(e)).get();
            throw AsyncProcessingTestUtil.unwrapAsyncException(e);
        }
    }

    /** Fails the test if the environment has recorded an external failure cause. */
    private void checkEnvState() {
        if (getEnvironment().getActualExternalFailureCause().isPresent()) {
            Assertions.fail("There is an error on other threads", getEnvironment().getActualExternalFailureCause().get());
        }
    }
}
