package io.debezium.connector.spanner.task.leader;

import io.debezium.DebeziumException;
import io.debezium.connector.spanner.kafka.internal.KafkaConsumerAdminService;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskStateUtil;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionRebalancer;
import java.lang.Thread;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/leader/LeaderAction.class */
public class LeaderAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderAction.class);
    private static final Duration EPOCH_OFFSET_UPDATE_DURATION = Duration.ofSeconds(60);
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final KafkaConsumerAdminService kafkaAdminService;
    private final LeaderService leaderService;
    private final TaskPartitionRebalancer taskPartitonRebalancer;
    private final TaskSyncPublisher taskSyncPublisher;
    private volatile Thread leaderThread;
    private Consumer<Throwable> errorHandler;

    public LeaderAction(TaskSyncContextHolder taskSyncContextHolder, KafkaConsumerAdminService kafkaConsumerAdminService, LeaderService leaderService, TaskPartitionRebalancer taskPartitionRebalancer, TaskSyncPublisher taskSyncPublisher, Consumer<Throwable> consumer) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.kafkaAdminService = kafkaConsumerAdminService;
        this.leaderService = leaderService;
        this.taskPartitonRebalancer = taskPartitionRebalancer;
        this.taskSyncPublisher = taskSyncPublisher;
        this.errorHandler = consumer;
    }

    private Thread createLeaderThread() {
        Thread thread = new Thread(() -> {
            LOGGER.info("performLeaderAction: Task {} start leader thread", this.taskSyncContextHolder.get().getTaskUid());
            try {
                newEpoch();
                while (!Thread.interrupted()) {
                    try {
                        Thread.sleep(EPOCH_OFFSET_UPDATE_DURATION.toMillis());
                        if (this.taskSyncContextHolder.get().getRebalanceState() == RebalanceState.NEW_EPOCH_STARTED) {
                            publishEpochOffset();
                        }
                    } catch (InterruptedException e) {
                        LOGGER.info("performLeaderAction: Task {} stop leader thread", this.taskSyncContextHolder.get().getTaskUid());
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            } catch (InterruptedException e2) {
                LOGGER.info("performLeaderAction: Task {} stop leader thread", this.taskSyncContextHolder.get().getTaskUid());
                Thread.currentThread().interrupt();
            }
        }, "SpannerConnector-LeaderAction");
        thread.setUncaughtExceptionHandler((thread2, th) -> {
            LOGGER.error("Leader action execution error", th);
            this.errorHandler.accept(th);
        });
        return thread;
    }

    private TaskSyncContext publishEpochOffset() throws InterruptedException {
        TaskSyncContext updateAndGet = this.taskSyncContextHolder.updateAndGet(taskSyncContext -> {
            return taskSyncContext.toBuilder().epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncContext.getCurrentKafkaRecordOffset())).build();
        });
        this.taskSyncPublisher.send(updateAndGet.buildTaskSyncEvent(MessageTypeEnum.UPDATE_EPOCH));
        LOGGER.info("Task {} - Epoch offset has been incremented and published {}:{}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(updateAndGet.getRebalanceGenerationId()), Long.valueOf(updateAndGet.getEpochOffsetHolder().getEpochOffset())});
        return updateAndGet;
    }

    public void start() {
        if (this.leaderThread != null) {
            stop();
        }
        this.leaderThread = createLeaderThread();
        this.leaderThread.start();
    }

    public void stop() {
        if (this.leaderThread == null) {
            return;
        }
        this.leaderThread.interrupt();
        do {
        } while (!this.leaderThread.getState().equals(Thread.State.TERMINATED));
        this.leaderThread = null;
    }

    private void newEpoch() throws InterruptedException {
        LOGGER.info("performLeaderActions: new epoch initialization");
        boolean isStartFromScratch = this.leaderService.isStartFromScratch();
        Set<String> activeConsumerGroupMembers = this.kafkaAdminService.getActiveConsumerGroupMembers();
        LOGGER.info("performLeaderActions: consumers found {}", activeConsumerGroupMembers);
        Map<String, String> awaitAllNewTaskStateUpdates = this.leaderService.awaitAllNewTaskStateUpdates(activeConsumerGroupMembers, this.taskSyncContextHolder.get().getRebalanceGenerationId());
        LOGGER.info("performLeaderActions: answers received {}", awaitAllNewTaskStateUpdates);
        if (awaitAllNewTaskStateUpdates.size() < activeConsumerGroupMembers.size()) {
            LOGGER.info("TaskUid {}, Expected active consumers {}, but only received consumers {}, not sending new epoch", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), activeConsumerGroupMembers, awaitAllNewTaskStateUpdates});
            throw new DebeziumException("Task Uid " + this.taskSyncContextHolder.get().getTaskUid() + " Expected active consumers " + activeConsumerGroupMembers.toString() + " but only received consumers " + awaitAllNewTaskStateUpdates.toString() + " not sending new epoch ");
        }
        boolean z = false;
        if (this.taskSyncContextHolder.get().checkDuplication(false, "NEW EPOCH rebalance event, initial context")) {
            z = true;
        }
        TaskSyncContext updateAndGet = this.taskSyncContextHolder.updateAndGet(taskSyncContext -> {
            TaskState currentTaskState = taskSyncContext.getCurrentTaskState();
            Map<Boolean, Map<String, TaskState>> splitSurvivedAndObsoleteTaskStates = TaskStateUtil.splitSurvivedAndObsoleteTaskStates(taskSyncContext.getAllTaskStates(), awaitAllNewTaskStateUpdates.values());
            Map<String, TaskState> map = splitSurvivedAndObsoleteTaskStates.get(true);
            return taskSyncContext.toBuilder().currentTaskState(this.taskPartitonRebalancer.rebalance(currentTaskState, map, splitSurvivedAndObsoleteTaskStates.get(false))).rebalanceState(RebalanceState.NEW_EPOCH_STARTED).taskStates(TaskStateUtil.filterSurvivedTasksStates(taskSyncContext.getTaskStates(), map.keySet())).epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncContext.getCurrentKafkaRecordOffset())).build();
        });
        if (!z) {
            updateAndGet.checkDuplication(true, "NEW EPOCH rebalance event, resulting context");
        }
        TaskSyncEvent buildTaskSyncEvent = updateAndGet.buildTaskSyncEvent(MessageTypeEnum.NEW_EPOCH);
        LOGGER.debug("Task {} - sent new epoch {}", updateAndGet.getTaskUid(), buildTaskSyncEvent);
        LOGGER.info("Task {} - LeaderAction sent sync event with rebalance generation ID {}: and epoch offset {}", new Object[]{updateAndGet.getTaskUid(), Long.valueOf(updateAndGet.getRebalanceGenerationId()), Long.valueOf(updateAndGet.getEpochOffsetHolder().getEpochOffset())});
        this.taskSyncPublisher.send(buildTaskSyncEvent);
        if (isStartFromScratch) {
            this.leaderService.newParentPartition();
            LOGGER.info("performLeaderActions: newParentPartition");
        }
        LoggerUtils.debug(LOGGER, "performLeaderActions: new epoch {}", buildTaskSyncEvent);
    }
}
