package org.apache.flink.cdc.runtime.operators.schema.distributed;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.HashMultimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.Multimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.class */
public class SchemaCoordinator extends SchemaRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class);
    private transient AtomicReference<RequestStatus> evolvingStatus;
    private transient Map<Integer, Tuple2<SchemaChangeRequest, CompletableFuture<CoordinationResponse>>> pendingRequests;
    protected transient Set<Integer> flushedSinkWriters;
    private transient Table<TableId, Integer, Schema> upstreamSchemaTable;
    private transient Multimap<Tuple2<Integer, SchemaChangeEvent>, Integer> alreadyHandledSchemaChangeEvents;
    private final ExecutorService schemaChangeThreadPool;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator$RequestStatus.class */
    public enum RequestStatus {
        IDLE,
        WAITING_FOR_FLUSH,
        EVOLVING
    }

    public SchemaCoordinator(String str, OperatorCoordinator.Context context, ExecutorService executorService, MetadataApplier metadataApplier, List<RouteRule> list, SchemaChangeBehavior schemaChangeBehavior, Duration duration) {
        super(context, str, executorService, metadataApplier, list, schemaChangeBehavior, duration);
        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    public void start() throws Exception {
        super.start();
        this.evolvingStatus = new AtomicReference<>(RequestStatus.IDLE);
        this.pendingRequests = new ConcurrentHashMap();
        this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
        this.upstreamSchemaTable = HashBasedTable.create();
        this.alreadyHandledSchemaChangeEvents = HashMultimap.create();
        LOG.info("Started SchemaRegistry for {}. Parallelism: {}", this.operatorName, Integer.valueOf(this.currentParallelism));
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    public void close() throws Exception {
        super.close();
        if (this.schemaChangeThreadPool == null || this.schemaChangeThreadPool.isShutdown()) {
            return;
        }
        this.schemaChangeThreadPool.shutdownNow();
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void snapshot(CompletableFuture<byte[]> completableFuture) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
            Throwable th2 = null;
            try {
                try {
                    dataOutputStream.writeInt(SchemaManager.SERIALIZER.getVersion());
                    byte[] serialize = SchemaManager.SERIALIZER.serialize(this.schemaManager);
                    dataOutputStream.writeInt(serialize.length);
                    dataOutputStream.write(serialize);
                    completableFuture.complete(byteArrayOutputStream.toByteArray());
                    if (dataOutputStream != null) {
                        if (0 != 0) {
                            try {
                                dataOutputStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            dataOutputStream.close();
                        }
                    }
                    if (byteArrayOutputStream != null) {
                        if (0 == 0) {
                            byteArrayOutputStream.close();
                            return;
                        }
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (dataOutputStream != null) {
                    if (th2 != null) {
                        try {
                            dataOutputStream.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        dataOutputStream.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
            throw th8;
        }
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void restore(byte[] bArr) throws Exception {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        Throwable th = null;
        try {
            DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
            Throwable th2 = null;
            try {
                int readInt = dataInputStream.readInt();
                byte[] bArr2 = new byte[dataInputStream.readInt()];
                dataInputStream.readFully(bArr2);
                this.schemaManager = SchemaManager.SERIALIZER.m6deserialize(readInt, bArr2);
                if (dataInputStream != null) {
                    if (0 != 0) {
                        try {
                            dataInputStream.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        dataInputStream.close();
                    }
                }
                if (byteArrayInputStream != null) {
                    if (0 == 0) {
                        byteArrayInputStream.close();
                        return;
                    }
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (dataInputStream != null) {
                    if (0 != 0) {
                        try {
                            dataInputStream.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        dataInputStream.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (byteArrayInputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    byteArrayInputStream.close();
                }
            }
            throw th7;
        }
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void handleGetOriginalSchemaRequest(GetOriginalSchemaRequest getOriginalSchemaRequest, CompletableFuture<CoordinationResponse> completableFuture) {
        throw new UnsupportedOperationException("In distributed topology, there's no centralized upstream schema table since they may evolve independently in various partitions.");
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void handleCustomCoordinationRequest(CoordinationRequest coordinationRequest, CompletableFuture<CoordinationResponse> completableFuture) throws Exception {
        if (!(coordinationRequest instanceof SchemaChangeRequest)) {
            throw new UnsupportedOperationException("Unknown coordination request type: " + coordinationRequest);
        }
        handleSchemaEvolveRequest((SchemaChangeRequest) coordinationRequest, completableFuture);
    }

    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    protected void handleFlushSuccessEvent(FlushSuccessEvent flushSuccessEvent) throws Exception {
        LOG.info("Sink subtask {} succeed flushing.", Integer.valueOf(flushSuccessEvent.getSinkSubTaskId()));
        this.flushedSinkWriters.add(Integer.valueOf(flushSuccessEvent.getSinkSubTaskId()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry
    public void handleUnrecoverableError(String str, Throwable th) {
        super.handleUnrecoverableError(str, th);
        LOG.info("Current upstream table state: {}", this.upstreamSchemaTable);
        this.pendingRequests.forEach((num, tuple2) -> {
            ((CompletableFuture) tuple2.f1).completeExceptionally(th);
        });
    }

    private void handleSchemaEvolveRequest(SchemaChangeRequest schemaChangeRequest, CompletableFuture<CoordinationResponse> completableFuture) throws Exception {
        LOG.info("Coordinator received schema change request {}.", schemaChangeRequest);
        if (!schemaChangeRequest.isNoOpRequest()) {
            LOG.info("It's not an align request, will try to deduplicate.");
            int sourceSubTaskId = schemaChangeRequest.getSourceSubTaskId();
            int sinkSubTaskId = schemaChangeRequest.getSinkSubTaskId();
            SchemaChangeEvent schemaChangeEvent = schemaChangeRequest.getSchemaChangeEvent();
            Tuple2 of = Tuple2.of(Integer.valueOf(sourceSubTaskId), schemaChangeEvent);
            this.alreadyHandledSchemaChangeEvents.put(of, Integer.valueOf(sinkSubTaskId));
            Collection collection = this.alreadyHandledSchemaChangeEvents.get(of);
            if (collection.size() == 1) {
                LOG.info("It's a new request for {}, will handle it", of);
                updateUpstreamSchemaTable(schemaChangeEvent.tableId(), schemaChangeRequest.getSourceSubTaskId(), schemaChangeEvent);
            } else {
                LOG.info("It's an already handled event {}. It has been handled by {}", of, collection);
                schemaChangeRequest = SchemaChangeRequest.createNoOpRequest(sinkSubTaskId);
            }
            if (collection.size() == this.currentParallelism) {
                LOG.info("All sink subTasks ({}) have already reported request {}. Remove it out of tracking.", collection, of);
                this.alreadyHandledSchemaChangeEvents.removeAll(of);
            }
        }
        this.pendingRequests.put(Integer.valueOf(schemaChangeRequest.getSinkSubTaskId()), Tuple2.of(schemaChangeRequest, completableFuture));
        if (this.pendingRequests.size() == 1) {
            Preconditions.checkState(this.evolvingStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH), "Unexpected evolving status: " + this.evolvingStatus.get());
            LOG.info("Received the very-first schema change request {}. Switching from IDLE to WAITING_FOR_FLUSH.", schemaChangeRequest);
        }
        if (this.pendingRequests.size() == this.currentParallelism) {
            Preconditions.checkState(this.evolvingStatus.compareAndSet(RequestStatus.WAITING_FOR_FLUSH, RequestStatus.EVOLVING), "Unexpected evolving status: " + this.evolvingStatus.get());
            LOG.info("Received the last required schema change request {}. Switching from WAITING_FOR_FLUSH to EVOLVING.", schemaChangeRequest);
            this.schemaChangeThreadPool.submit(() -> {
                try {
                    startSchemaChange();
                } catch (Throwable th) {
                    failJob("Schema change applying task", new FlinkRuntimeException("Failed to apply schema change event.", th));
                    throw new FlinkRuntimeException("Failed to apply schema change event.", th);
                }
            });
        }
    }

    private void updateUpstreamSchemaTable(TableId tableId, int i, SchemaChangeEvent schemaChangeEvent) {
        this.upstreamSchemaTable.put(tableId, Integer.valueOf(i), SchemaUtils.applySchemaChangeEvent((Schema) this.upstreamSchemaTable.get(tableId, Integer.valueOf(i)), schemaChangeEvent));
    }

    private void startSchemaChange() throws TimeoutException {
        LOG.info("Starting to evolve schema.");
        loopWhen(() -> {
            return this.flushedSinkWriters.size() < this.currentParallelism;
        }, () -> {
            LOG.info("Not all sink writers have successfully flushed. Expected {}, actual {}", Integer.valueOf(this.currentParallelism), this.flushedSinkWriters);
        }, this.rpcTimeout, Duration.ofMillis(100L));
        LOG.info("All flushed. Going to evolve schema for pending requests: {}", this.pendingRequests);
        this.flushedSinkWriters.clear();
        Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges = deduceEvolvedSchemaChanges();
        ArrayList arrayList = new ArrayList();
        for (SchemaChangeEvent schemaChangeEvent : (List) deduceEvolvedSchemaChanges.f1) {
            if (applyAndUpdateEvolvedSchemaChange(schemaChangeEvent)) {
                arrayList.add(schemaChangeEvent);
            }
        }
        Set<TableId> set = (Set) deduceEvolvedSchemaChanges.f0;
        HashMap hashMap = new HashMap();
        for (TableId tableId : set) {
            this.schemaManager.getLatestEvolvedSchema(tableId).ifPresent(schema -> {
            });
        }
        ArrayList arrayList2 = new ArrayList(this.pendingRequests.values());
        this.pendingRequests.clear();
        LOG.info("Finished schema evolving. Switching from EVOLVING to IDLE.");
        Preconditions.checkState(this.evolvingStatus.compareAndSet(RequestStatus.EVOLVING, RequestStatus.IDLE), "RequestStatus should be EVOLVING when schema evolving finishes.");
        arrayList2.forEach(tuple2 -> {
            LOG.info("Coordinator finishes pending future from {}", Integer.valueOf(((SchemaChangeRequest) tuple2.f0).getSinkSubTaskId()));
            ((CompletableFuture) tuple2.f1).complete(CoordinationResponseUtils.wrap(new SchemaChangeResponse(hashMap, arrayList)));
        });
    }

    private Tuple2<Set<TableId>, List<SchemaChangeEvent>> deduceEvolvedSchemaChanges() {
        List list = (List) this.pendingRequests.values().stream().map(tuple2 -> {
            return (SchemaChangeRequest) tuple2.f0;
        }).filter(schemaChangeRequest -> {
            return !schemaChangeRequest.isNoOpRequest();
        }).collect(Collectors.toList());
        LOG.info("Step 1 - Start deducing evolved schema change events for {}", list);
        Set<TableId> affectedEvolvedTables = SchemaDerivator.getAffectedEvolvedTables(this.router, (Set) list.stream().map(schemaChangeRequest2 -> {
            return schemaChangeRequest2.getSchemaChangeEvent().tableId();
        }).collect(Collectors.toSet()));
        LOG.info("Step 2 - Affected sink tables are: {}", affectedEvolvedTables);
        ArrayList arrayList = new ArrayList();
        for (TableId tableId : affectedEvolvedTables) {
            Schema orElse = this.schemaManager.getLatestEvolvedSchema(tableId).orElse(null);
            LOG.info("Step 3.1 - For affected sink table {} with schema {}...", tableId, orElse);
            Set<TableId> reverseLookupDependingUpstreamTables = SchemaDerivator.reverseLookupDependingUpstreamTables(this.router, tableId, this.upstreamSchemaTable);
            Preconditions.checkState(!reverseLookupDependingUpstreamTables.isEmpty(), "An affected sink table's upstream dependency cannot be empty.");
            LOG.info("Step 3.2 - upstream dependency tables are: {}", reverseLookupDependingUpstreamTables);
            Set<Schema> reverseLookupDependingUpstreamSchemas = SchemaDerivator.reverseLookupDependingUpstreamSchemas(this.router, tableId, this.upstreamSchemaTable);
            LOG.info("Step 3.3 - Upstream dependency schemas are: {}.", reverseLookupDependingUpstreamSchemas);
            Schema schema = orElse;
            Iterator<Schema> it = reverseLookupDependingUpstreamSchemas.iterator();
            while (it.hasNext()) {
                schema = SchemaMergingUtils.getLeastCommonSchema(schema, it.next());
            }
            LOG.info("Step 3.4 - Deduced widest schema is: {}.", schema);
            List schemaDifference = SchemaMergingUtils.getSchemaDifference(tableId, orElse, schema);
            LOG.info("Step 3.5 - Corresponding schema changes are: {}.", schemaDifference);
            List<SchemaChangeEvent> normalizeSchemaChangeEvents = SchemaDerivator.normalizeSchemaChangeEvents(orElse, schemaDifference, this.behavior, this.metadataApplier);
            LOG.info("Step 3.6 - After being normalized with {} behavior, final schema change events are: {}", this.behavior, normalizeSchemaChangeEvents);
            arrayList.addAll(normalizeSchemaChangeEvents);
        }
        return Tuple2.of(affectedEvolvedTables, arrayList);
    }

    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) {
        try {
            this.metadataApplier.applySchemaChange(schemaChangeEvent);
            this.schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
            LOG.info("Successfully applied schema change event {} to external system.", schemaChangeEvent);
            return true;
        } catch (Throwable th) {
            handleUnrecoverableError("Apply schema change event - " + schemaChangeEvent, new FlinkRuntimeException("Failed to apply schema change event " + schemaChangeEvent + ".", th));
            this.context.failJob(th);
            throw th;
        }
    }

    @VisibleForTesting
    public void emplaceOriginalSchema(TableId tableId, Integer num, Schema schema) {
        this.upstreamSchemaTable.put(tableId, num, schema);
    }
}
