package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.SpannerConnectorTask;
import io.debezium.connector.spanner.db.metadata.SchemaRegistry;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.kafka.KafkaAdminClientFactory;
import io.debezium.connector.spanner.kafka.internal.KafkaConsumerAdminService;
import io.debezium.connector.spanner.kafka.internal.ProducerFactory;
import io.debezium.connector.spanner.kafka.internal.RebalancingConsumerFactory;
import io.debezium.connector.spanner.kafka.internal.RebalancingEventListener;
import io.debezium.connector.spanner.kafka.internal.SyncEventConsumerFactory;
import io.debezium.connector.spanner.kafka.internal.TaskSyncEventListener;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.connector.spanner.task.leader.LeaderAction;
import io.debezium.connector.spanner.task.leader.LeaderService;
import io.debezium.connector.spanner.task.leader.LowWatermarkStampPublisher;
import io.debezium.connector.spanner.task.leader.rebalancer.LeaderRebalanceStrategy;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionEqualSharingRebalancer;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionGreedyLeaderRebalancer;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/SynchronizationTaskContext.class */
/**
 * Builds and owns the task-synchronization components of a {@link SpannerConnectorTask}:
 * the sync-topic listener and publisher, the rebalancing listener, the leader action,
 * the low-watermark calculation job and the task-state-change event processor.
 *
 * <p>The constructor only wires the components together; {@link #init()} starts them and
 * {@link #destroy()} stops them. Both lifecycle methods are {@code synchronized} on this
 * instance.
 */
public class SynchronizationTaskContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizationTaskContext.class);

    /** Selects which partition rebalancer the {@link LeaderAction} is created with. */
    private final LeaderRebalanceStrategy leaderRebalanceStrategy = LeaderRebalanceStrategy.EQUAL_SHARING;
    private final SyncEventConsumerFactory<String, byte[]> syncEventConsumerFactory;
    private final RebalancingConsumerFactory<?, ?> rebalancingConsumerFactory;
    private final ProducerFactory<String, byte[]> producerFactory;
    private final LeaderAction leaderAction;
    private final RebalancingEventListener rebalancingEventListener;
    private final TaskSyncEventListener taskSyncEventListener;
    private final TaskSyncPublisher taskSyncPublisher;
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskStateChangeEventHandler taskStateChangeEventHandler;
    private final ErrorHandler errorHandler;
    private final PartitionFactory partitionFactory;
    private final LowWatermarkStampPublisher lowWatermarkStampPublisher;
    private final Runnable finishingHandler;
    private final TaskStateChangeEventProcessor taskStateChangeEventProcessor;
    private final SyncEventHandler syncEventHandler;
    private final RebalanceHandler rebalanceHandler;
    private final LowWatermarkCalculationJob lowWatermarkCalculationJob;
    private final SchemaRegistry schemaRegistry;
    private final SpannerConnectorTask task;
    private final SpannerConnectorConfig connectorConfig;

    /**
     * A single shutdown action. It is allowed to throw so that any component's stop method
     * can be passed as a method reference regardless of its {@code throws} clause.
     */
    @FunctionalInterface
    private interface ShutdownStep {
        void run() throws Exception;
    }

    /**
     * Creates every synchronization component and connects them to each other.
     * Nothing is started here; call {@link #init()} afterwards.
     *
     * @param spannerConnectorTask    owning task; its task uid identifies the publisher and listeners
     * @param spannerConnectorConfig  connector configuration (topic names, timeouts, queue capacity)
     * @param errorHandler            receives every throwable reported by the components
     * @param partitionOffsetProvider passed to the {@link PartitionFactory} and the low-watermark calculator
     * @param changeStream            passed to the {@link TaskStateChangeEventHandler}
     * @param spannerEventDispatcher  passed to the {@link LowWatermarkStampPublisher}
     * @param kafkaAdminClientFactory supplies the admin client used by the {@link KafkaConsumerAdminService}
     * @param schemaRegistry          schema registry initialised during {@link #init()}
     * @param runnable                handler that is run when the state-change handler signals finishing
     * @param metricsEventPublisher   metrics sink shared by several components
     * @param lowWatermarkHolder      passed to the {@link LowWatermarkCalculationJob}
     */
    public SynchronizationTaskContext(
            SpannerConnectorTask spannerConnectorTask,
            SpannerConnectorConfig spannerConnectorConfig,
            ErrorHandler errorHandler,
            PartitionOffsetProvider partitionOffsetProvider,
            ChangeStream changeStream,
            SpannerEventDispatcher spannerEventDispatcher,
            KafkaAdminClientFactory kafkaAdminClientFactory,
            SchemaRegistry schemaRegistry,
            Runnable runnable,
            MetricsEventPublisher metricsEventPublisher,
            LowWatermarkHolder lowWatermarkHolder) {
        String rebalancingTopic = spannerConnectorConfig.rebalancingTopic();
        String taskSyncTopic = spannerConnectorConfig.taskSyncTopic();
        String connectorName = spannerConnectorConfig.getConnectorName();

        this.task = spannerConnectorTask;
        this.connectorConfig = spannerConnectorConfig;
        // errorHandler must be assigned before any this::onError callback can fire.
        this.errorHandler = errorHandler;
        this.finishingHandler = runnable;
        this.schemaRegistry = schemaRegistry;

        this.syncEventConsumerFactory = new SyncEventConsumerFactory<>(spannerConnectorConfig, false);
        this.rebalancingConsumerFactory = new RebalancingConsumerFactory<>(spannerConnectorConfig);
        this.producerFactory = new ProducerFactory<>(spannerConnectorConfig);
        this.taskSyncContextHolder = new TaskSyncContextHolder(metricsEventPublisher);
        this.taskSyncPublisher = new TaskSyncPublisher(
                spannerConnectorTask.getTaskUid(),
                taskSyncTopic,
                spannerConnectorConfig.syncEventPublisherWaitingTimeout(),
                this.producerFactory,
                this::onError);

        KafkaConsumerAdminService kafkaConsumerAdminService =
                new KafkaConsumerAdminService(kafkaAdminClientFactory.getAdminClient(), connectorName);
        this.partitionFactory = new PartitionFactory(partitionOffsetProvider, metricsEventPublisher);
        LeaderService leaderService = new LeaderService(
                this.taskSyncContextHolder,
                spannerConnectorConfig,
                this::publishEvent,
                errorHandler,
                this.partitionFactory,
                metricsEventPublisher);
        this.lowWatermarkStampPublisher = new LowWatermarkStampPublisher(
                spannerConnectorConfig, spannerEventDispatcher, this::onError, this.taskSyncContextHolder);
        this.leaderAction = new LeaderAction(
                this.taskSyncContextHolder,
                kafkaConsumerAdminService,
                leaderService,
                this.leaderRebalanceStrategy.equals(LeaderRebalanceStrategy.EQUAL_SHARING)
                        ? new TaskPartitionEqualSharingRebalancer()
                        : new TaskPartitionGreedyLeaderRebalancer(),
                this.taskSyncPublisher,
                this::onError);

        this.taskSyncEventListener = new TaskSyncEventListener(
                spannerConnectorTask.getTaskUid(), taskSyncTopic, this.syncEventConsumerFactory, true, this::onError);
        this.rebalancingEventListener = new RebalancingEventListener(
                spannerConnectorTask,
                connectorName,
                rebalancingTopic,
                spannerConnectorConfig.rebalancingTaskWaitingTimeout(),
                this.rebalancingConsumerFactory,
                this::onError);
        this.taskStateChangeEventHandler = new TaskStateChangeEventHandler(
                this.taskSyncContextHolder,
                this.taskSyncPublisher,
                changeStream,
                this.partitionFactory,
                this::onFinish,
                spannerConnectorConfig,
                this::onError);

        this.rebalanceHandler = new RebalanceHandler(
                this.taskSyncContextHolder, this.taskSyncPublisher, this.leaderAction, this.lowWatermarkStampPublisher);
        this.syncEventHandler =
                new SyncEventHandler(this.taskSyncContextHolder, this.taskSyncPublisher, this::publishEvent);
        this.lowWatermarkCalculationJob = new LowWatermarkCalculationJob(
                spannerConnectorConfig,
                this::onError,
                new LowWatermarkCalculator(spannerConnectorConfig, this.taskSyncContextHolder, partitionOffsetProvider),
                lowWatermarkHolder);
        this.taskStateChangeEventProcessor = new TaskStateChangeEventProcessor(
                spannerConnectorConfig.taskStateChangeEventQueueCapacity(),
                this.taskSyncContextHolder,
                this.taskStateChangeEventHandler,
                this::onError,
                metricsEventPublisher);
    }

    /**
     * Starts the synchronization components in a fixed order: seeds the sync context,
     * initialises the rebalance handler, registers the sync-event callbacks and starts the
     * sync listener, waits for the context holder to report initialization, begins listening
     * for rebalance events, starts the low-watermark job, initialises the schema registry,
     * starts the state-change processor and finally flags the context as initialized.
     *
     * <p>Nothing is thrown to the caller: any {@link Throwable} is logged and handed to the
     * error handler.
     */
    public synchronized void init() {
        try {
            this.taskSyncContextHolder.init(
                    TaskSyncContext.getInitialContext(this.task.getTaskUid(), this.connectorConfig));
            this.rebalanceHandler.init();

            // Registration order is kept exactly as-is; the listener is started only afterwards.
            this.taskSyncEventListener.subscribe(this.syncEventHandler::updateCurrentOffset);
            this.taskSyncEventListener.subscribe(this.syncEventHandler::process);
            this.taskSyncEventListener.subscribe(this.syncEventHandler::processNewEpoch);
            this.taskSyncEventListener.subscribe(this.syncEventHandler::processRebalanceAnswer);
            this.taskSyncEventListener.subscribe(this.syncEventHandler::processPreviousStates);
            this.taskSyncEventListener.start();
            this.taskSyncContextHolder.awaitInitialization();

            LOGGER.info("{}, connecting to the rebalance topic", this.task.getTaskUid());
            this.rebalancingEventListener.listen(metadata -> {
                this.rebalanceHandler.process(
                        metadata.isLeader(), metadata.getConsumerId(), metadata.getRebalanceGenerationId());
            });

            this.lowWatermarkCalculationJob.start();
            this.schemaRegistry.init();
            this.taskStateChangeEventProcessor.startProcessing();
            this.taskSyncContextHolder.update(context -> context.toBuilder().initialized(true).build());
        } catch (Throwable th) {
            LOGGER.error("Exception during SynchronizationTaskContext starting", th);
            onError(th);
        }
    }

    /**
     * Stops every component, best effort. Each step runs in isolation, so a failure in one
     * component is logged and the remaining components are still shut down. Nothing is
     * thrown to the caller.
     *
     * <p>If any step was interrupted, the thread's interrupt flag is restored once all
     * steps have been attempted, so the interruption is not lost and does not cut the
     * remaining cleanup short.
     */
    public synchronized void destroy() {
        boolean interrupted = false;
        interrupted |= runShutdownStep("TaskSyncEventListener", this.taskSyncEventListener::shutdown);
        interrupted |= runShutdownStep("rebalancingEventListener", this.rebalancingEventListener::shutdown);
        interrupted |= runShutdownStep("TaskStateChangeEventProcessor", this.taskStateChangeEventProcessor::stopProcessing);
        interrupted |= runShutdownStep("LowWatermarkCalculationJob", this.lowWatermarkCalculationJob::stop);
        interrupted |= runShutdownStep("rebalance handler", this.rebalanceHandler::destroy);
        interrupted |= runShutdownStep("TaskSyncPublisher", this.taskSyncPublisher::close);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs one shutdown step and logs its outcome.
     *
     * <p>The task uid is read from the task rather than from the sync context holder, so
     * logging cannot fail when the holder has no context yet (presumably the case if
     * {@link #init()} failed early or was never called — confirm against the holder).
     * {@link #init()} seeds the context with this same uid.
     *
     * @param componentName name used in the "Shut down ..." log line
     * @param step          the shutdown action to run
     * @return {@code true} if the step failed with an {@link InterruptedException}
     */
    private boolean runShutdownStep(String componentName, ShutdownStep step) {
        try {
            step.run();
            LOGGER.info("Task {}, Shut down {}", this.task.getTaskUid(), componentName);
            return false;
        } catch (Throwable th) {
            LOGGER.warn("Task {}, Exception during sync context destroying", this.task.getTaskUid(), th);
            return th instanceof InterruptedException;
        }
    }

    /**
     * Logs the event at debug level and passes it to the task-state-change event processor.
     *
     * @param taskStateChangeEvent event to process
     * @throws InterruptedException propagated from the processor's {@code processEvent}
     */
    public void publishEvent(TaskStateChangeEvent taskStateChangeEvent) throws InterruptedException {
        LoggerUtils.debug(LOGGER, "publishEvent: type: {}, event: {}", taskStateChangeEvent.getClass().getSimpleName(), taskStateChangeEvent);
        this.taskStateChangeEventProcessor.processEvent(taskStateChangeEvent);
    }

    /** Reports a failure from any component to the error handler. */
    private void onError(Throwable th) {
        this.errorHandler.setProducerThrowable(th);
    }

    /** Runs the finishing handler supplied at construction. */
    private void onFinish() {
        this.finishingHandler.run();
    }
}
