package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.SyncEventMetadata;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.state.SyncEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import io.debezium.function.BlockingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/SyncEventHandler.class */
/**
 * Applies incoming {@link TaskSyncEvent}s to the task sync context held by
 * {@link TaskSyncContextHolder}.
 *
 * <p>Every handler ignores events whose rebalance generation ID is lower than the
 * generation ID currently stored in the context (see
 * {@link #skipFromPreviousGeneration(TaskSyncEvent)}). Handlers that merge an event into
 * the context do so between {@code lock()} and {@code unlock()} calls on the holder, with
 * the unlock always performed in a {@code finally} block.
 */
public class SyncEventHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventHandler.class);

    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskSyncPublisher taskSyncPublisher;
    private final BlockingConsumer<TaskStateChangeEvent> eventConsumer;

    /**
     * @param taskSyncContextHolder holder of the context that the handlers read and update
     * @param taskSyncPublisher publisher used to send this task's sync event after a new epoch is merged
     * @param blockingConsumer consumer that receives a {@link SyncEvent} after a regular sync event is merged
     */
    public SyncEventHandler(TaskSyncContextHolder taskSyncContextHolder, TaskSyncPublisher taskSyncPublisher, BlockingConsumer<TaskStateChangeEvent> blockingConsumer) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.taskSyncPublisher = taskSyncPublisher;
        this.eventConsumer = blockingConsumer;
    }

    /**
     * Stores the offset carried by {@code syncEventMetadata} as the context's current Kafka
     * record offset.
     *
     * @param taskSyncEvent the received event; {@code null} or previous-generation events are ignored
     * @param syncEventMetadata metadata supplying the offset to record
     */
    public void updateCurrentOffset(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) {
        if (taskSyncEvent == null || skipFromPreviousGeneration(taskSyncEvent)) {
            return;
        }
        this.taskSyncContextHolder.update(context -> context.toBuilder()
                .currentKafkaRecordOffset(syncEventMetadata.getOffset())
                .build());
        LOGGER.debug("Task {} - update task sync topic offset with {}", this.taskSyncContextHolder.get().getTaskUid(), syncEventMetadata.getOffset());
    }

    /**
     * Handles an event while the context is in {@link RebalanceState#START_INITIAL_SYNC}.
     *
     * <p>A non-null event is merged into the context (using the new-epoch merge for
     * {@link MessageTypeEnum#NEW_EPOCH} messages). Afterwards, if the metadata reports that
     * rebalancing can be initiated, the context is moved to
     * {@link RebalanceState#INITIAL_INCREMENTED_STATE_COMPLETED} and its epoch offset holder is
     * advanced to the current Kafka record offset.
     *
     * @param taskSyncEvent the received event; may be {@code null}, in which case nothing is merged
     * @param syncEventMetadata metadata of the received record
     */
    public void processPreviousStates(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) {
        if (!RebalanceState.START_INITIAL_SYNC.equals(this.taskSyncContextHolder.get().getRebalanceState())) {
            return;
        }
        if (skipFromPreviousGeneration(taskSyncEvent)) {
            return;
        }

        // The null check lives inside the try so that the lock is released on every path;
        // previously a null event acquired the lock and never unlocked it.
        this.taskSyncContextHolder.lock();
        try {
            if (taskSyncEvent != null) {
                if (taskSyncEvent.getMessageType() == MessageTypeEnum.NEW_EPOCH) {
                    LOGGER.info("Task {} - processPreviousStates - merge new epoch with rebalance generation ID {}", this.taskSyncContextHolder.get().getTaskUid(), taskSyncEvent.getRebalanceGenerationId());
                    this.taskSyncContextHolder.update(context -> SyncEventMerger.mergeNewEpoch(context, taskSyncEvent));
                }
                else {
                    this.taskSyncContextHolder.update(context -> SyncEventMerger.merge(context, taskSyncEvent));
                }
            }
        }
        finally {
            this.taskSyncContextHolder.unlock();
        }

        if (syncEventMetadata.isCanInitiateRebalancing()) {
            LOGGER.info("task {}, finished processing all previous sync event messages with end offset {}, can initiate rebalancing", this.taskSyncContextHolder.get().getTaskUid(), syncEventMetadata.getOffset());
            this.taskSyncContextHolder.update(context -> context.toBuilder()
                    .rebalanceState(RebalanceState.INITIAL_INCREMENTED_STATE_COMPLETED)
                    .epochOffsetHolder(context.getEpochOffsetHolder().nextOffset(context.getCurrentKafkaRecordOffset()))
                    .build());
            LOGGER.info("Task {} - now initialized with epoch offset {} and context {}", this.taskSyncContextHolder.get().getTaskUid(), this.taskSyncContextHolder.get().getEpochOffsetHolder().getEpochOffset(), this.taskSyncContextHolder.get());
            this.taskSyncContextHolder.get().checkDuplication(true, "Finished Initializing Task State");
        }
    }

    /**
     * Merges a {@link MessageTypeEnum#NEW_EPOCH} event into the context and publishes this
     * task's resulting sync event.
     *
     * <p>The merge happens only when the context is in
     * {@link RebalanceState#INITIAL_INCREMENTED_STATE_COMPLETED} and the event's rebalance
     * generation ID is not lower than the context's.
     *
     * @param taskSyncEvent the received event; {@code null} or previous-generation events are ignored
     * @param syncEventMetadata metadata of the received record (used for logging only)
     * @throws InterruptedException propagated from the publisher's {@code send} call
     */
    public void processNewEpoch(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) throws InterruptedException {
        if (taskSyncEvent == null || skipFromPreviousGeneration(taskSyncEvent)) {
            return;
        }
        this.taskSyncContextHolder.lock();
        try {
            // Generation is re-checked here because the first check ran before the lock was taken.
            long currentGenerationId = this.taskSyncContextHolder.get().getRebalanceGenerationId();
            long incomingGenerationId = taskSyncEvent.getRebalanceGenerationId();
            boolean readyForNewEpoch = this.taskSyncContextHolder.get().getRebalanceState() == RebalanceState.INITIAL_INCREMENTED_STATE_COMPLETED
                    && taskSyncEvent.getMessageType() == MessageTypeEnum.NEW_EPOCH
                    && incomingGenerationId >= currentGenerationId;
            if (readyForNewEpoch) {
                LOGGER.debug("Task {} - processNewEpoch {}", this.taskSyncContextHolder.get().getTaskUid(), taskSyncEvent);
                LOGGER.info("Task {} - processNewEpoch : metadata {}, rebalanceId: {}", this.taskSyncContextHolder.get().getTaskUid(), syncEventMetadata, taskSyncEvent.getRebalanceGenerationId());
                this.taskSyncContextHolder.update(context -> SyncEventMerger.mergeNewEpoch(context, taskSyncEvent));
                LOGGER.info("Task {} - SyncEventHandler sent response for new epoch", this.taskSyncContextHolder.get().getTaskUid());
                this.taskSyncPublisher.send(this.taskSyncContextHolder.get().buildTaskSyncEvent());
            }
        }
        finally {
            this.taskSyncContextHolder.unlock();
        }
    }

    /**
     * Merges a sync event into the context and notifies the event consumer with a new
     * {@link SyncEvent}, provided the context is in {@link RebalanceState#NEW_EPOCH_STARTED}.
     *
     * @param taskSyncEvent the received event; {@code null} or previous-generation events are ignored
     * @param syncEventMetadata metadata of the received record (not used by this method)
     * @throws InterruptedException propagated from the event consumer's {@code accept} call
     */
    public void process(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) throws InterruptedException {
        if (taskSyncEvent == null || skipFromPreviousGeneration(taskSyncEvent)) {
            return;
        }
        this.taskSyncContextHolder.lock();
        try {
            if (this.taskSyncContextHolder.get().getRebalanceState().equals(RebalanceState.NEW_EPOCH_STARTED)) {
                LOGGER.debug("Task {} - process sync event", this.taskSyncContextHolder.get().getTaskUid());
                this.taskSyncContextHolder.update(context -> SyncEventMerger.merge(context, taskSyncEvent));
                this.eventConsumer.accept(new SyncEvent());
            }
        }
        finally {
            this.taskSyncContextHolder.unlock();
        }
    }

    /**
     * Merges a rebalance answer into the context, but only when this task is the leader and
     * the context is in {@link RebalanceState#INITIAL_INCREMENTED_STATE_COMPLETED}.
     *
     * @param taskSyncEvent the received event; {@code null} or previous-generation events are ignored
     * @param syncEventMetadata metadata of the received record (not used by this method)
     */
    public void processRebalanceAnswer(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) {
        if (taskSyncEvent == null || skipFromPreviousGeneration(taskSyncEvent)) {
            return;
        }
        this.taskSyncContextHolder.lock();
        try {
            if (this.taskSyncContextHolder.get().isLeader()
                    && this.taskSyncContextHolder.get().getRebalanceState().equals(RebalanceState.INITIAL_INCREMENTED_STATE_COMPLETED)) {
                LOGGER.info("Task {} - process sync event - rebalance answer", this.taskSyncContextHolder.get().getTaskUid());
                this.taskSyncContextHolder.update(context -> SyncEventMerger.merge(context, taskSyncEvent));
            }
        }
        finally {
            this.taskSyncContextHolder.unlock();
        }
    }

    /**
     * Tells whether an event belongs to an earlier rebalance generation than the context's.
     *
     * @param taskSyncEvent the event to inspect; may be {@code null}
     * @return {@code true} when the event's rebalance generation ID is strictly lower than the
     *         context's; {@code false} otherwise, including for a {@code null} event
     */
    private boolean skipFromPreviousGeneration(TaskSyncEvent taskSyncEvent) {
        if (taskSyncEvent == null) {
            return false;
        }
        long incomingGenerationId = taskSyncEvent.getRebalanceGenerationId();
        long currentGenerationId = this.taskSyncContextHolder.get().getRebalanceGenerationId();
        if (incomingGenerationId >= currentGenerationId) {
            return false;
        }
        LOGGER.debug("skipFromPreviousGeneration: currentGen: {}, inGen: {}, inTaskUid: {}", currentGenerationId, incomingGenerationId, taskSyncEvent.getTaskUid());
        return true;
    }
}
