package org.apache.flink.streaming.util.asyncprocessing;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedMultiInputStreamOperatorTestHarness.class */
/**
 * Test harness for keyed multi-input operators that routes every interaction with the operator
 * through one dedicated single-thread executor.
 *
 * <p>The harness itself is constructed on that executor (see the {@code create} factories), and
 * element / watermark / status / attribute processing, draining and closing are all submitted to
 * the same executor and awaited synchronously by the calling thread.
 *
 * @param <K> type of the key produced by the configured key selectors
 * @param <OUT> output type of the operator under test
 */
public class AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> extends MultiInputStreamOperatorTestHarness<OUT> {
    /** Executor on which the harness was built and on which all operator calls are run. */
    private final ExecutorService executor;

    /**
     * Builds a harness (or a subclass of it) on a freshly created single-thread executor.
     *
     * <p>If the supplied constructor function fails, the failure is propagated to the caller
     * (wrapped in an {@link java.util.concurrent.ExecutionException}) and the executor is shut
     * down, instead of leaving the caller blocked on a future that is never completed.
     *
     * @param functionWithException creates the harness from the executor it must use
     * @return the harness produced by {@code functionWithException}
     * @throws Exception if construction fails or the calling thread is interrupted while waiting
     */
    public static <K, OUT, OP extends AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>> OP create(FunctionWithException<ExecutorService, OP, Exception> functionWithException) throws Exception {
        ExecutorService harnessExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<OP> constructed = new CompletableFuture<>();
        harnessExecutor.execute(() -> {
            try {
                constructed.complete(functionWithException.apply(harnessExecutor));
            } catch (Throwable t) {
                // Always complete the future; otherwise the waiting caller would block forever.
                constructed.completeExceptionally(t);
            }
        });
        try {
            return constructed.get();
        } catch (Exception e) {
            // No harness exists that could shut the executor down later, so release it here.
            harnessExecutor.shutdownNow();
            throw e;
        }
    }

    /**
     * Builds a harness for the given operator factory on a freshly created single-thread
     * executor.
     *
     * @param streamOperatorFactory factory of the operator under test
     * @param typeInformation type information used to create the state key serializer
     * @param list key selectors; the selector at index {@code n} is registered for input {@code n}
     * @param i first integer argument forwarded unchanged to the superclass constructor
     * @param i2 second integer argument forwarded unchanged to the superclass constructor
     * @param i3 third integer argument forwarded unchanged to the superclass constructor
     * @return the constructed harness
     * @throws Exception if construction fails or the calling thread is interrupted while waiting
     */
    public static <K, OUT> AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT> create(StreamOperatorFactory<OUT> streamOperatorFactory, TypeInformation<K> typeInformation, List<KeySelector<?, K>> list, int i, int i2, int i3) throws Exception {
        // Explicit type arguments keep inference independent of the lambda body.
        return AsyncKeyedMultiInputStreamOperatorTestHarness
                .<K, OUT, AsyncKeyedMultiInputStreamOperatorTestHarness<K, OUT>>create(
                        harnessExecutor ->
                                new AsyncKeyedMultiInputStreamOperatorTestHarness<>(
                                        harnessExecutor, streamOperatorFactory, typeInformation, list, i, i2, i3));
    }

    /**
     * Configures the state key serializer and one key selector per entry of {@code list}, then
     * marks {@link Throwable} as the expected external failure cause on the environment.
     */
    private AsyncKeyedMultiInputStreamOperatorTestHarness(ExecutorService executorService, StreamOperatorFactory<OUT> streamOperatorFactory, TypeInformation<K> typeInformation, List<KeySelector<?, K>> list, int i, int i2, int i3) throws Exception {
        super(streamOperatorFactory, i, i2, i3);
        this.config.setStateKeySerializer(typeInformation.createSerializer(this.executionConfig.getSerializerConfig()));
        this.config.serializeAllConfigs();
        for (int inputIndex = 0; inputIndex < list.size(); inputIndex++) {
            setKeySelector(inputIndex, list.get(inputIndex));
        }
        this.executor = executorService;
        getEnvironment().setExpectedExternalFailureCause(Throwable.class);
    }

    /**
     * Registers {@code keySelector} as the state partitioner of input {@code i} and re-serializes
     * the configuration.
     *
     * @param i index of the input the selector applies to
     * @param keySelector key selector for that input; it is closure-cleaned before being stored
     */
    public void setKeySelector(int i, KeySelector<?, K> keySelector) {
        ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
        this.config.setStatePartitioner(i, keySelector);
        this.config.serializeAllConfigs();
    }

    /** Processes {@code streamRecord} on input {@code i}, running on the harness executor. */
    @Override // org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void processElement(int i, StreamRecord<?> streamRecord) throws Exception {
        Input input = (Input) getCastedOperator().getInputs().get(i);
        // The element type of the input is not known statically, hence the unchecked conversion.
        ThrowingConsumer<StreamRecord<?>, Exception> recordProcessor = RecordProcessorUtils.getRecordProcessor(input);
        executeAndGet(() -> recordProcessor.accept(streamRecord));
    }

    /** Processes {@code watermark} on input {@code i}, running on the harness executor. */
    @Override // org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness
    @SuppressWarnings("rawtypes")
    public void processWatermark(int i, Watermark watermark) throws Exception {
        Input input = (Input) getCastedOperator().getInputs().get(i);
        executeAndGet(() -> input.processWatermark(watermark));
    }

    /** Processes {@code watermarkStatus} on input {@code i}, running on the harness executor. */
    @Override // org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness
    @SuppressWarnings("rawtypes")
    public void processWatermarkStatus(int i, WatermarkStatus watermarkStatus) throws Exception {
        Input input = (Input) getCastedOperator().getInputs().get(i);
        executeAndGet(() -> input.processWatermarkStatus(watermarkStatus));
    }

    /** Processes {@code recordAttributes} on input {@code i}, running on the harness executor. */
    @Override // org.apache.flink.streaming.util.MultiInputStreamOperatorTestHarness
    @SuppressWarnings("rawtypes")
    public void processRecordAttributes(int i, RecordAttributes recordAttributes) throws Exception {
        Input input = (Input) getCastedOperator().getInputs().get(i);
        executeAndGet(() -> input.processRecordAttributes(recordAttributes));
    }

    /** Runs {@code AsyncProcessingTestUtil.drain} for the operator on the harness executor. */
    public void drainStateRequests() throws Exception {
        executeAndGet(() -> AsyncProcessingTestUtil.drain(this.operator));
    }

    /**
     * Closes the harness on the harness executor and then shuts the executor down. The executor
     * is shut down even when closing fails, so its worker thread is not leaked.
     */
    @Override // org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness, java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            executeAndGet(() -> super.close());
        } finally {
            this.executor.shutdown();
        }
    }

    /**
     * Runs {@code runnableWithException} on the harness executor and waits for it to finish. The
     * environment is checked for an externally reported failure before the action and again
     * after it.
     *
     * <p>On failure the mock task's {@code cleanUp} is invoked on the executor and the original
     * failure, passed through {@code AsyncProcessingTestUtil.unwrapAsyncException}, is rethrown.
     * A failure of the cleanup itself is attached as a suppressed exception so that it cannot
     * mask the original failure.
     */
    private void executeAndGet(RunnableWithException runnableWithException) throws Exception {
        try {
            AsyncProcessingTestUtil.execute(this.executor, () -> {
                checkEnvState();
                runnableWithException.run();
            }).get();
            checkEnvState();
        } catch (Exception e) {
            Exception cleanupFailure = null;
            try {
                AsyncProcessingTestUtil.execute(this.executor, () -> this.mockTask.cleanUp(e)).get();
            } catch (Exception ce) {
                cleanupFailure = ce;
            }
            Exception failure = AsyncProcessingTestUtil.unwrapAsyncException(e);
            if (cleanupFailure != null && cleanupFailure != failure) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
    }

    /** Fails the test if the environment has recorded an external failure cause. */
    private void checkEnvState() {
        getEnvironment()
                .getActualExternalFailureCause()
                .ifPresent(cause -> Assertions.fail("There is an error on other threads", cause));
    }
}
