package org.apache.flink.streaming.api.operators.collect;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.class */
public class CollectSinkOperatorCoordinator implements OperatorCoordinator, CoordinationRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(CollectSinkOperatorCoordinator.class);
    private final int socketTimeout;
    private InetSocketAddress address;
    private SocketConnection socketConnection;
    private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests;
    private ExecutorService executorService;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator$Provider.class */
    public static class Provider implements OperatorCoordinator.Provider {
        private final OperatorID operatorId;
        private final int socketTimeout;

        public Provider(OperatorID operatorID, int i) {
            this.operatorId = operatorID;
            this.socketTimeout = i;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Provider
        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Provider
        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return new CollectSinkOperatorCoordinator(this.socketTimeout);
        }
    }

    @VisibleForTesting
    CollectSinkOperatorCoordinator() {
        this(0);
    }

    public CollectSinkOperatorCoordinator(int i) {
        this.ongoingRequests = ConcurrentHashMap.newKeySet();
        this.socketTimeout = i;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void start() throws Exception {
        this.executorService = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("collect-sink-operator-coordinator-executor-thread-pool"));
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator, java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.info("Closing the CollectSinkOperatorCoordinator.");
        this.executorService.shutdownNow();
        this.ongoingRequests.forEach(completableFuture -> {
            completableFuture.cancel(true);
        });
        this.ongoingRequests.clear();
        closeConnection();
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void handleEventFromOperator(int i, int i2, OperatorEvent operatorEvent) throws Exception {
        Preconditions.checkArgument(operatorEvent instanceof CollectSinkAddressEvent, "Operator event must be a CollectSinkAddressEvent");
        this.address = ((CollectSinkAddressEvent) operatorEvent).getAddress();
        LOG.info("Received sink socket server address: " + this.address);
    }

    @Override // org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler
    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest coordinationRequest) {
        Preconditions.checkArgument(coordinationRequest instanceof CollectCoordinationRequest, "Coordination request must be a CollectCoordinationRequest");
        CollectCoordinationRequest collectCoordinationRequest = (CollectCoordinationRequest) coordinationRequest;
        if (this.address == null) {
            return CompletableFuture.completedFuture(createEmptyResponse(collectCoordinationRequest));
        }
        CompletableFuture<CoordinationResponse> supplyAsync = FutureUtils.supplyAsync(() -> {
            return handleRequestImpl(collectCoordinationRequest, this.address);
        }, this.executorService);
        this.ongoingRequests.add(supplyAsync);
        return supplyAsync.handle((coordinationResponse, th) -> {
            this.ongoingRequests.remove(supplyAsync);
            if (coordinationResponse != null) {
                return coordinationResponse;
            }
            if (!ExceptionUtils.findThrowable(th, CancellationException.class).isPresent()) {
                if (LOG.isDebugEnabled()) {
                    LOG.warn("Collect sink coordinator encountered an unexpected error.", th);
                } else {
                    LOG.warn("Collect sink coordinator encounters a {}: {}", th.getClass().getSimpleName(), th.getMessage());
                }
                closeConnection();
            }
            return createEmptyResponse(collectCoordinationRequest);
        });
    }

    private CoordinationResponse handleRequestImpl(CollectCoordinationRequest collectCoordinationRequest, InetSocketAddress inetSocketAddress) throws IOException {
        if (inetSocketAddress == null) {
            throw new NullPointerException("No sinkAddress available.");
        }
        if (this.socketConnection == null) {
            this.socketConnection = SocketConnection.create(this.socketTimeout, inetSocketAddress);
            LOG.info("Sink connection established");
        }
        LOG.debug("Forwarding request to sink socket server");
        collectCoordinationRequest.serialize(this.socketConnection.getDataOutputView());
        LOG.debug("Fetching serialized result from sink socket server");
        return new CollectCoordinationResponse(this.socketConnection.getDataInputView());
    }

    private CollectCoordinationResponse createEmptyResponse(CollectCoordinationRequest collectCoordinationRequest) {
        return new CollectCoordinationResponse(collectCoordinationRequest.getVersion(), -1L, Collections.emptyList());
    }

    private void closeConnection() {
        if (this.socketConnection != null) {
            try {
                this.socketConnection.close();
            } catch (Exception e) {
                LOG.warn("Failed to close sink socket server connection", e);
            }
            this.socketConnection = null;
        }
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void executionAttemptFailed(int i, int i2, @Nullable Throwable th) {
        this.address = null;
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void subtaskReset(int i, long j) {
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void executionAttemptReady(int i, int i2, OperatorCoordinator.SubtaskGateway subtaskGateway) {
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        new ObjectOutputStream(byteArrayOutputStream).writeObject(this.address);
        completableFuture.complete(byteArrayOutputStream.toByteArray());
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void notifyCheckpointComplete(long j) {
    }

    @Override // org.apache.flink.runtime.operators.coordination.OperatorCoordinator
    public void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
        if (bArr != null) {
            this.address = (InetSocketAddress) new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject();
            return;
        }
        LOG.info("Any ongoing requests are cancelled due to a coordinator reset.");
        cancelOngoingRequests();
        closeConnection();
    }

    private void cancelOngoingRequests() {
        this.ongoingRequests.forEach(completableFuture -> {
            completableFuture.cancel(true);
        });
        this.ongoingRequests.clear();
    }
}
