package org.apache.flink.streaming.util.asyncprocessing;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;

/* loaded from: input_file:org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedOneInputStreamOperatorTestHarness.class */
/**
 * Keyed one-input operator test harness that forwards every interaction with the operator
 * (records, watermarks, watermark status, latency markers, record attributes, end of input,
 * snapshot preparation and close) to an {@link ExecutorService} instead of running it on the
 * calling thread.
 *
 * <p>Instances obtained through the static {@code create(...)} factories are constructed on, and
 * bound to, a freshly created single-thread executor. The blocking {@code processXxx} methods wait
 * for the submitted task to finish; the {@code processXxxInternal} variants return the pending
 * {@link CompletableFuture} so the caller decides when to wait.
 *
 * @param <K> type of the key
 * @param <IN> type of the input records
 * @param <OUT> type of the output records
 */
public class AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> extends KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
    /** Inputs of the operator; stays empty unless the operator is a {@link MultipleInputStreamOperator}. */
    private final List<Input<IN>> inputs;

    /** Timestamp of the watermark most recently handed to {@link #processWatermarkInternal(Watermark)}. */
    private long currentWatermark;

    /** Executor on which all operator interactions are run. */
    private final ExecutorService executor;

    /**
     * Creates a harness (or a subclass of it) on a new single-thread executor.
     *
     * @param functionWithException builds the harness from the executor it will be bound to; it is
     *     invoked on that executor's thread
     * @return the harness produced by {@code functionWithException}
     * @throws Exception if construction fails (the failure is reported by {@link
     *     CompletableFuture#get()}, i.e. wrapped in an {@code ExecutionException}) or the calling
     *     thread is interrupted while waiting
     */
    public static <K, IN, OUT, OP extends AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>> OP create(FunctionWithException<ExecutorService, OP, Exception> functionWithException) throws Exception {
        return constructOnNewExecutor(functionWithException);
    }

    /**
     * Creates a harness for the given operator, passing {@code 1, 1, 0} as the three numeric
     * arguments of {@link #create(OneInputStreamOperator, KeySelector, TypeInformation, int, int, int)}.
     *
     * @param oneInputStreamOperator operator under test
     * @param keySelector extracts the key from an input record
     * @param typeInformation type information of the key
     * @return the new harness
     * @throws Exception if construction fails or the calling thread is interrupted while waiting
     */
    public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation) throws Exception {
        return create(oneInputStreamOperator, keySelector, typeInformation, 1, 1, 0);
    }

    /**
     * Creates a harness for the given operator on a new single-thread executor.
     *
     * @param oneInputStreamOperator operator under test; wrapped with {@link SimpleOperatorFactory#of}
     * @param keySelector extracts the key from an input record
     * @param typeInformation type information of the key
     * @param i forwarded unchanged to the superclass constructor (presumably the max parallelism;
     *     confirm against {@code KeyedOneInputStreamOperatorTestHarness})
     * @param i2 forwarded unchanged to the superclass constructor (presumably the parallelism; confirm)
     * @param i3 forwarded unchanged to the superclass constructor (presumably the subtask index; confirm)
     * @return the new harness
     * @throws Exception if construction fails or the calling thread is interrupted while waiting
     */
    public static <K, IN, OUT> AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> create(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
        return constructOnNewExecutor(
                (ExecutorService harnessExecutor) ->
                        new AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>(
                                harnessExecutor,
                                SimpleOperatorFactory.of(oneInputStreamOperator),
                                keySelector,
                                typeInformation,
                                i,
                                i2,
                                i3));
    }

    /**
     * Starts a single-thread executor, runs {@code constructor} on it and waits for the result.
     *
     * <p>Any failure of {@code constructor} completes the future exceptionally, so the caller is
     * never left blocked on a future that nobody will complete. On failure the executor is shut
     * down because no harness exists that could own it.
     */
    private static <T> T constructOnNewExecutor(FunctionWithException<ExecutorService, T, Exception> constructor) throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        CompletableFuture<T> result = new CompletableFuture<>();
        newSingleThreadExecutor.execute(() -> {
            try {
                result.complete(constructor.apply(newSingleThreadExecutor));
            } catch (Throwable t) {
                // Propagate to the waiting caller instead of dying silently on the worker thread.
                result.completeExceptionally(t);
            }
        });
        try {
            return result.get();
        } catch (Exception e) {
            newSingleThreadExecutor.shutdown();
            throw e;
        }
    }

    /**
     * Builds the harness on top of the keyed one-input harness and binds it to {@code executorService}.
     *
     * @param executorService executor on which all operator interactions will be run
     * @param streamOperatorFactory factory of the operator under test
     * @param keySelector extracts the key from an input record
     * @param typeInformation type information of the key
     * @param i forwarded unchanged to the superclass constructor
     * @param i2 forwarded unchanged to the superclass constructor
     * @param i3 forwarded unchanged to the superclass constructor
     * @throws Exception if the superclass constructor or the configuration setup fails
     */
    protected AsyncKeyedOneInputStreamOperatorTestHarness(ExecutorService executorService, StreamOperatorFactory<OUT> streamOperatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, int i, int i2, int i3) throws Exception {
        super(streamOperatorFactory, keySelector, typeInformation, i, i2, i3);
        this.inputs = new ArrayList<>();
        ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
        // Register the key selector and key serializer in the stream config, then serialize it.
        this.config.setStatePartitioner(0, keySelector);
        this.config.setStateKeySerializer(typeInformation.createSerializer(this.executionConfig.getSerializerConfig()));
        this.config.serializeAllConfigs();
        this.executor = executorService;
        // Declares Throwable as the expected external failure cause on the environment;
        // checkEnvState() later inspects the actual cause recorded there.
        getEnvironment().setExpectedExternalFailureCause(Throwable.class);
    }

    /**
     * Sets up the operator and, if it is a {@link MultipleInputStreamOperator}, records its inputs
     * so that later calls are dispatched to the input instead of the operator itself.
     *
     * @param typeSerializer serializer for the output records
     */
    @Override
    public void setup(TypeSerializer<OUT> typeSerializer) {
        super.setup(typeSerializer);
        if (this.operator instanceof MultipleInputStreamOperator) {
            Preconditions.checkState(this.inputs.isEmpty());
            // getInputs() exposes raw Input instances; this harness treats them as Input<IN>.
            @SuppressWarnings({"unchecked", "rawtypes"})
            List<Input<IN>> operatorInputs = ((MultipleInputStreamOperator) this.operator).getInputs();
            this.inputs.addAll(operatorInputs);
        }
    }

    /**
     * Processes a record on the executor and waits until it has been handled.
     *
     * @param streamRecord record to process
     * @throws Exception if processing fails
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        finishFuture(processElementInternal(streamRecord));
    }

    /**
     * Submits a record for processing without waiting for completion.
     *
     * @param streamRecord record to process
     * @return future that completes when the submitted task has run
     * @throws IllegalStateException if inputs were recorded but there is not exactly one
     */
    public CompletableFuture<Void> processElementInternal(StreamRecord<IN> streamRecord) throws Exception {
        if (this.inputs.isEmpty()) {
            return execute(() -> RecordProcessorUtils.getRecordProcessor(getOneInputOperator()).accept(streamRecord));
        }
        Preconditions.checkState(this.inputs.size() == 1);
        Input<IN> input = this.inputs.get(0);
        return execute(() -> RecordProcessorUtils.getRecordProcessor(input).accept(streamRecord));
    }

    /**
     * Processes a watermark with the given timestamp on the executor and waits for completion.
     *
     * @param j watermark timestamp
     * @throws Exception if processing fails
     */
    @Override
    public void processWatermark(long j) throws Exception {
        finishFuture(processWatermarkInternal(j));
    }

    /**
     * Submits a watermark with the given timestamp without waiting for completion.
     *
     * @param j watermark timestamp
     * @return future that completes when the submitted task has run
     */
    public CompletableFuture<Void> processWatermarkInternal(long j) {
        return processWatermarkInternal(new Watermark(j));
    }

    /**
     * Processes a watermark status on the executor and waits for completion.
     *
     * @param watermarkStatus status to process
     * @throws Exception if processing fails
     */
    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        finishFuture(processWatermarkStatusInternal(watermarkStatus));
    }

    /**
     * Submits a watermark status without waiting for completion.
     *
     * @param watermarkStatus status to process
     * @return future that completes when the submitted task has run
     * @throws IllegalStateException if inputs were recorded but there is not exactly one
     */
    public CompletableFuture<Void> processWatermarkStatusInternal(WatermarkStatus watermarkStatus) {
        if (this.inputs.isEmpty()) {
            return execute(() -> getOneInputOperator().processWatermarkStatus(watermarkStatus));
        }
        Preconditions.checkState(this.inputs.size() == 1);
        Input<IN> input = this.inputs.get(0);
        return execute(() -> input.processWatermarkStatus(watermarkStatus));
    }

    /**
     * Processes a watermark on the executor and waits for completion.
     *
     * @param watermark watermark to process
     * @throws Exception if processing fails
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        finishFuture(processWatermarkInternal(watermark));
    }

    /**
     * Signals end of input to the operator if it implements {@link BoundedOneInput}; otherwise
     * does nothing.
     *
     * @throws Exception if the operator fails while handling the end of input
     */
    @Override
    public void endInput() throws Exception {
        if (this.operator instanceof BoundedOneInput) {
            executeAndGet(() -> ((BoundedOneInput) this.operator).endInput());
        }
    }

    /**
     * Submits a watermark without waiting for completion. The watermark's timestamp is recorded
     * as the current watermark immediately, before the task is submitted.
     *
     * @param watermark watermark to process
     * @return future that completes when the submitted task has run
     * @throws IllegalStateException if inputs were recorded but there is not exactly one
     */
    public CompletableFuture<Void> processWatermarkInternal(Watermark watermark) {
        this.currentWatermark = watermark.getTimestamp();
        if (this.inputs.isEmpty()) {
            return execute(() -> getOneInputOperator().processWatermark(watermark));
        }
        Preconditions.checkState(this.inputs.size() == 1);
        Input<IN> input = this.inputs.get(0);
        return execute(() -> input.processWatermark(watermark));
    }

    /**
     * Processes a latency marker on the executor and waits for completion.
     *
     * @param latencyMarker marker to process
     * @throws Exception if processing fails
     */
    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        finishFuture(processLatencyMarkerInternal(latencyMarker));
    }

    /**
     * Submits a latency marker without waiting for completion.
     *
     * @param latencyMarker marker to process
     * @return future that completes when the submitted task has run
     * @throws IllegalStateException if inputs were recorded but there is not exactly one
     */
    public CompletableFuture<Void> processLatencyMarkerInternal(LatencyMarker latencyMarker) {
        if (this.inputs.isEmpty()) {
            return execute(() -> getOneInputOperator().processLatencyMarker(latencyMarker));
        }
        Preconditions.checkState(this.inputs.size() == 1);
        Input<IN> input = this.inputs.get(0);
        return execute(() -> input.processLatencyMarker(latencyMarker));
    }

    /**
     * Processes record attributes on the executor and waits for completion.
     *
     * @param recordAttributes attributes to process
     * @throws Exception if processing fails
     */
    @Override
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        finishFuture(processRecordAttributesInternal(recordAttributes));
    }

    /**
     * Returns the timestamp of the watermark most recently passed to
     * {@link #processWatermarkInternal(Watermark)}.
     */
    @Override
    public long getCurrentWatermark() {
        return this.currentWatermark;
    }

    /**
     * Submits record attributes without waiting for completion.
     *
     * @param recordAttributes attributes to process
     * @return future that completes when the submitted task has run
     * @throws IllegalStateException if inputs were recorded but there is not exactly one
     */
    public CompletableFuture<Void> processRecordAttributesInternal(RecordAttributes recordAttributes) {
        if (this.inputs.isEmpty()) {
            return execute(() -> getOneInputOperator().processRecordAttributes(recordAttributes));
        }
        Preconditions.checkState(this.inputs.size() == 1);
        Input<IN> input = this.inputs.get(0);
        return execute(() -> input.processRecordAttributes(recordAttributes));
    }

    /**
     * Runs {@code AsyncProcessingTestUtil.drain} for the operator on the executor and waits for it
     * to finish.
     *
     * @throws Exception if draining fails
     */
    public void drainStateRequests() throws Exception {
        executeAndGet(() -> AsyncProcessingTestUtil.drain(this.operator));
    }

    /**
     * Calls the operator's {@code prepareSnapshotPreBarrier} on the executor and waits for it.
     *
     * @param j checkpoint id passed to the operator
     * @throws Exception if the operator fails
     */
    @Override
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        executeAndGet(() -> this.operator.prepareSnapshotPreBarrier(j));
    }

    /**
     * Closes the harness on the executor and then shuts the executor down. The shutdown happens
     * even when closing fails, so a failing test does not leak the executor thread.
     *
     * @throws Exception if closing the harness fails
     */
    @Override
    public void close() throws Exception {
        try {
            executeAndGet(() -> super.close());
        } finally {
            this.executor.shutdown();
        }
    }

    /**
     * Submits {@code runnableWithException} to the executor, preceded by a check that no external
     * failure has been recorded on the environment.
     */
    private CompletableFuture<Void> execute(RunnableWithException runnableWithException) {
        return AsyncProcessingTestUtil.execute(this.executor, () -> {
            checkEnvState();
            runnableWithException.run();
        });
    }

    /** Submits {@code runnableWithException} to the executor and waits for it to finish. */
    private void executeAndGet(RunnableWithException runnableWithException) throws Exception {
        finishFuture(execute(runnableWithException));
    }

    /**
     * Waits for {@code completableFuture} and re-checks the environment state afterwards. If
     * waiting throws an {@link Exception}, the mock task is cleaned up on the executor and the
     * exception is rethrown after passing through {@code AsyncProcessingTestUtil.unwrapAsyncException}.
     */
    private void finishFuture(CompletableFuture<Void> completableFuture) throws Exception {
        try {
            completableFuture.get();
            checkEnvState();
        } catch (Exception e) {
            AsyncProcessingTestUtil.execute(this.executor, () -> this.mockTask.cleanUp(e)).get();
            throw AsyncProcessingTestUtil.unwrapAsyncException(e);
        }
    }

    /** Fails the test if the environment has recorded an external failure cause. */
    private void checkEnvState() {
        if (getEnvironment().getActualExternalFailureCause().isPresent()) {
            Assertions.fail("There is an error on other threads", getEnvironment().getActualExternalFailureCause().get());
        }
    }
}
