package org.apache.flink.streaming.api.operators.collect;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectResultFetcher.class */
/**
 * Pulls collected results of a job into an {@link AbstractCollectResultBuffer} and hands them out
 * one at a time through {@link #next()}.
 *
 * <p>While the job is not in a globally terminal state, results are requested from the sink
 * operator's coordinator through the {@link CoordinationRequestGateway}. Once the job has
 * terminated, the remaining results are read from the accumulator named {@code accumulatorName}
 * in the job execution result.
 *
 * <p>A {@link JobClient} that also implements {@link CoordinationRequestGateway} must be supplied
 * through {@link #setJobClient(JobClient)} before {@link #next()} or {@link #close()} is called.
 *
 * @param <T> type of the collected records
 */
public class CollectResultFetcher<T> {
    /** Pause between two fetch attempts used when the caller does not specify one. */
    private static final int DEFAULT_RETRY_MILLIS = 100;
    private static final Logger LOG = LoggerFactory.getLogger(CollectResultFetcher.class);

    private final AbstractCollectResultBuffer<T> buffer;
    private final CompletableFuture<OperatorID> operatorIdFuture;
    private final String accumulatorName;
    /** Pause in milliseconds between two fetch attempts; values {@code <= 0} disable the pause. */
    private final int retryMillis;
    /** Upper bound in milliseconds for waiting on the job execution result. */
    private final long resultFetchTimeout;

    @Nullable
    private JobClient jobClient;

    /** The same object as {@link #jobClient}, viewed as a coordination gateway. */
    @Nullable
    private CoordinationRequestGateway gateway;

    /** Set once the job has been observed (or assumed) to be in a globally terminal state. */
    private boolean jobTerminated;
    private boolean closed;

    /**
     * Creates a fetcher that pauses {@value #DEFAULT_RETRY_MILLIS} ms between fetch attempts.
     *
     * @param buffer buffer that receives the responses and serves the records
     * @param operatorIdFuture future holding the ID of the operator to send requests to
     * @param accumulatorName name of the accumulator holding the final results
     * @param resultFetchTimeout timeout in milliseconds for fetching the job execution result
     */
    public CollectResultFetcher(
            AbstractCollectResultBuffer<T> buffer,
            CompletableFuture<OperatorID> operatorIdFuture,
            String accumulatorName,
            long resultFetchTimeout) {
        this(buffer, operatorIdFuture, accumulatorName, DEFAULT_RETRY_MILLIS, resultFetchTimeout);
    }

    /**
     * Creates a fetcher with an explicit pause between fetch attempts.
     *
     * @param buffer buffer that receives the responses and serves the records
     * @param operatorIdFuture future holding the ID of the operator to send requests to
     * @param accumulatorName name of the accumulator holding the final results
     * @param retryMillis pause in milliseconds between fetch attempts; {@code <= 0} means no pause
     * @param resultFetchTimeout timeout in milliseconds for fetching the job execution result
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public CollectResultFetcher(
            AbstractCollectResultBuffer<T> buffer,
            CompletableFuture<OperatorID> operatorIdFuture,
            String accumulatorName,
            int retryMillis,
            long resultFetchTimeout) {
        this.buffer = buffer;
        this.operatorIdFuture = operatorIdFuture;
        this.accumulatorName = accumulatorName;
        this.retryMillis = retryMillis;
        this.resultFetchTimeout = resultFetchTimeout;
        this.jobTerminated = false;
        this.closed = false;
    }

    /**
     * Supplies the client used to query job status, fetch the execution result, cancel the job and
     * send coordination requests.
     *
     * @param jobClient client of the job; must also implement {@link CoordinationRequestGateway}
     * @throws IllegalArgumentException if {@code jobClient} is not a {@link
     *     CoordinationRequestGateway}
     */
    public void setJobClient(JobClient jobClient) {
        Preconditions.checkArgument(jobClient instanceof CoordinationRequestGateway, "Job client must be a CoordinationRequestGateway. This is a bug.");
        this.jobClient = jobClient;
        this.gateway = (CoordinationRequestGateway) jobClient;
    }

    /**
     * Returns the next record, fetching more results from the job whenever the buffer is empty.
     *
     * <p>The call loops until the buffer yields a record or the job has terminated. Failures of
     * a coordination request are logged and retried after the configured pause.
     *
     * @return the next record, or {@code null} if this fetcher is closed or the job has terminated
     *     and the buffer holds no further record
     * @throws IOException if the final results cannot be fetched or deserialized after the job
     *     terminated
     */
    public T next() throws IOException {
        if (this.closed) {
            return null;
        }

        // The first fetch attempt is made immediately; every later one is preceded by a pause.
        boolean firstAttempt = true;
        while (true) {
            T record = this.buffer.next();
            if (record != null) {
                return record;
            }
            if (this.jobTerminated) {
                // The final results were already handed to the buffer and it is drained.
                return null;
            }

            if (!firstAttempt) {
                sleepBeforeRetry();
            }
            firstAttempt = false;

            if (isJobTerminated()) {
                // Coordination requests are no longer issued; read the final results from the
                // accumulator instead. The flag is set first so a failure below is not retried.
                this.jobTerminated = true;
                Tuple2<Long, CollectCoordinationResponse> finalResults = getAccumulatorResults();
                this.buffer.dealWithResponse(finalResults.f1, finalResults.f0);
                this.buffer.complete();
            } else {
                long requestOffset = this.buffer.getOffset();
                try {
                    CollectCoordinationResponse response = sendRequest(this.buffer.getVersion(), requestOffset);
                    this.buffer.dealWithResponse(response, requestOffset);
                } catch (Exception e) {
                    // NOTE(review): this also catches InterruptedException without restoring the
                    // interrupt flag; the loop simply retries. Kept as-is to preserve behavior.
                    if (ExceptionUtils.findThrowableWithMessage(e, UnavailableDispatcherOperationException.class.getName()).isPresent()) {
                        LOG.debug("The job execution has not started yet; cannot fetch results.", e);
                    } else if (ExceptionUtils.findThrowableWithMessage(e, FlinkJobNotFoundException.class.getName()).isPresent()) {
                        LOG.debug("The job cannot be found. It is very likely that the job is not in a RUNNING state.", e);
                    } else {
                        LOG.warn("An exception occurred when fetching query results", e);
                    }
                }
            }
        }
    }

    /**
     * Cancels the job unless it is already considered terminated and marks this fetcher as closed.
     * Calling it again after a successful close is a no-op.
     *
     * @throws NullPointerException if no job client has been set
     */
    public void close() {
        if (this.closed) {
            return;
        }
        cancelJob();
        this.closed = true;
    }

    /**
     * Sends one coordination request to the operator and blocks until the response arrives.
     *
     * @param version version to put into the request
     * @param offset offset to put into the request
     * @return the coordinator's response
     * @throws NullPointerException if the job client is not set or the operator ID future has not
     *     completed yet
     */
    private CollectCoordinationResponse sendRequest(String version, long offset) throws InterruptedException, ExecutionException {
        checkJobClientConfigured();
        // getNow(null) does not block: a still-pending future yields null and fails the check.
        OperatorID operatorId = this.operatorIdFuture.getNow(null);
        Preconditions.checkNotNull(operatorId, "Unknown operator ID. This is a bug.");
        CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
        return (CollectCoordinationResponse) this.gateway.sendCoordinationRequest(operatorId, request).get();
    }

    /**
     * Reads the final results from the accumulator of the job execution result, waiting at most
     * {@code resultFetchTimeout} milliseconds for the result to become available.
     *
     * @return the deserialized accumulator content
     * @throws IOException if the execution result cannot be obtained in time, the accumulator is
     *     missing, or its content cannot be deserialized
     */
    private Tuple2<Long, CollectCoordinationResponse> getAccumulatorResults() throws IOException {
        checkJobClientConfigured();

        JobExecutionResult executionResult;
        try {
            executionResult = this.jobClient.getJobExecutionResult().get(this.resultFetchTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller; it would otherwise be lost once the
            // exception is wrapped.
            Thread.currentThread().interrupt();
            throw new IOException("Failed to fetch job execution result", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IOException("Failed to fetch job execution result", e);
        }

        ArrayList<byte[]> serializedAccumulator = executionResult.getAccumulatorResult(this.accumulatorName);
        if (serializedAccumulator == null) {
            throw new IOException("Job terminated abnormally, no job execution result can be fetched");
        }

        try {
            // Only the first element of the deserialized list is used.
            byte[] serializedResult = SerializedListAccumulator.deserializeList(serializedAccumulator, BytePrimitiveArraySerializer.INSTANCE).get(0);
            return CollectSinkFunction.deserializeAccumulatorResult(serializedResult);
        } catch (IOException | ClassNotFoundException e) {
            throw new IOException("Failed to deserialize accumulator results", e);
        }
    }

    /**
     * Checks whether the job is in a globally terminal state.
     *
     * @return {@code true} if the job status is globally terminal, or if the status could not be
     *     retrieved for any reason
     */
    private boolean isJobTerminated() {
        checkJobClientConfigured();
        try {
            return this.jobClient.getJobStatus().get().isGloballyTerminalState();
        } catch (Exception e) {
            // NOTE(review): an InterruptedException lands here too and is treated like any other
            // failure, without restoring the interrupt flag. Kept as-is to preserve behavior.
            LOG.warn("Failed to get job status so we assume that the job has terminated. Some data might be lost.", e);
            return true;
        }
    }

    /** Requests cancellation of the job unless it is already considered terminated. */
    private void cancelJob() {
        checkJobClientConfigured();
        if (isJobTerminated()) {
            return;
        }
        this.jobClient.cancel();
    }

    /** Pauses for {@code retryMillis} milliseconds; does nothing if that value is not positive. */
    private void sleepBeforeRetry() {
        if (this.retryMillis <= 0) {
            return;
        }
        try {
            Thread.sleep(this.retryMillis);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is not restored, so the fetch loop keeps running
            // after an interruption. Kept as-is to preserve behavior.
            LOG.warn("Interrupted when sleeping before a retry", e);
        }
    }

    /**
     * Verifies that {@link #setJobClient(JobClient)} has been called.
     *
     * @throws NullPointerException if the job client or the gateway is not set
     */
    private void checkJobClientConfigured() {
        Preconditions.checkNotNull(this.jobClient, "Job client must be configured before first use.");
        Preconditions.checkNotNull(this.gateway, "Coordination request gateway must be configured before first use.");
    }
}
