package io.debezium.connector.spanner.task.leader;

import com.google.cloud.Timestamp;
import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.kafka.internal.model.PartitionStateEnum;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.RebalanceMetricEvent;
import io.debezium.connector.spanner.task.PartitionFactory;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.state.NewPartitionsEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import io.debezium.connector.spanner.task.utils.TimeoutMeter;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ErrorHandler;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/leader/LeaderService.class */
/**
 * Helper operations used by the task that currently acts as leader: detecting
 * whether any partition is still in progress, waiting for the other tasks to
 * publish their state after a rebalance, and emitting the initial parent
 * partition.
 */
public class LeaderService {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderService.class);

    /** Pause between two consecutive inspections of the shared task states. */
    private static final int POLL_INTERVAL_MILLIS = 20;

    /** Upper bound on how long {@link #awaitAllNewTaskStateUpdates} keeps polling. */
    private static final Duration AWAIT_TASK_ANSWER_DURATION = Duration.of(60, ChronoUnit.SECONDS);

    private final TaskSyncContextHolder taskSyncContextHolder;
    private final Timestamp startTime;
    // May be null when the connector configuration does not define an end time.
    private final Timestamp endTime;
    private final BlockingConsumer<TaskStateChangeEvent> eventConsumer;
    // Stored for parity with the constructor contract; not used by the methods of this class.
    private final ErrorHandler errorHandler;
    private final MetricsEventPublisher metricsEventPublisher;
    private final PartitionFactory partitionFactory;

    /**
     * Creates the service.
     *
     * @param taskSyncContextHolder  holder of the current task synchronisation context
     * @param spannerConnectorConfig connector configuration; its start and end time are read once here
     * @param blockingConsumer       receiver of the task state change events produced by this service
     * @param errorHandler           connector error handler
     * @param partitionFactory       factory used to build the initial parent partition
     * @param metricsEventPublisher  publisher for rebalance progress metrics
     */
    public LeaderService(TaskSyncContextHolder taskSyncContextHolder, SpannerConnectorConfig spannerConnectorConfig, BlockingConsumer<TaskStateChangeEvent> blockingConsumer, ErrorHandler errorHandler, PartitionFactory partitionFactory, MetricsEventPublisher metricsEventPublisher) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.startTime = spannerConnectorConfig.startTime();
        this.endTime = spannerConnectorConfig.endTime();
        this.eventConsumer = blockingConsumer;
        this.errorHandler = errorHandler;
        this.partitionFactory = partitionFactory;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    /**
     * Tells whether no task currently holds a partition that is still in progress.
     *
     * @return {@code true} when none of the owned or shared partitions of any
     *         known task is in state CREATED, READY_FOR_STREAMING, SCHEDULED or
     *         RUNNING; {@code false} as soon as one such partition exists
     */
    public boolean isStartFromScratch() {
        EnumSet<PartitionStateEnum> inProgressStates = EnumSet.of(
                PartitionStateEnum.CREATED,
                PartitionStateEnum.READY_FOR_STREAMING,
                PartitionStateEnum.SCHEDULED,
                PartitionStateEnum.RUNNING);

        return this.taskSyncContextHolder.get().getAllTaskStates().values().stream()
                .flatMap(taskState -> Stream.concat(
                        taskState.getPartitions().stream(),
                        taskState.getSharedPartitions().stream()))
                .noneMatch(partitionState -> inProgressStates.contains(partitionState.getState()));
    }

    /**
     * Polls the shared task states until every expected consumer has published
     * a state for the given rebalance generation, or until
     * {@link #AWAIT_TASK_ANSWER_DURATION} has elapsed.
     *
     * <p>At most one newly matching task is recorded per poll, and a
     * {@link RebalanceMetricEvent} with the current progress is published after
     * each poll. On timeout an error is logged and the partial result is
     * returned.
     *
     * @param consumers             consumer ids that are expected to answer
     * @param rebalanceGenerationId rebalance generation the task states must carry
     * @return mapping from consumer id to the key under which the matching task
     *         state is registered (presumably the task UID); may contain fewer
     *         entries than {@code consumers} when the wait timed out
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public Map<String, String> awaitAllNewTaskStateUpdates(Set<String> consumers, long rebalanceGenerationId) throws InterruptedException {
        Map<String, String> consumerToTask = new HashMap<>();
        LOGGER.info("awaitAllNewTaskStateUpdates: wait taskSyncContextHolder for all new task updates");
        TimeoutMeter timeout = TimeoutMeter.setTimeout(AWAIT_TASK_ANSWER_DURATION);

        while (consumerToTask.size() < consumers.size()) {
            LOGGER.info("awaitAllNewTaskStateUpdates: expected: {}, actual: {}. Expected consumers: {}", consumers.size(), consumerToTask.size(), consumers);
            if (timeout.isExpired()) {
                LOGGER.error("Task {} : Not received all answers from tasks", this.taskSyncContextHolder.get().getTaskUid());
                break;
            }

            Thread.sleep(POLL_INTERVAL_MILLIS);

            Map<String, TaskState> taskStates = this.taskSyncContextHolder.get().getAllTaskStates();
            taskStates.entrySet().stream()
                    // Skip tasks already recorded; accept only expected consumers on the requested generation.
                    .filter(entry -> !consumerToTask.containsValue(entry.getKey())
                            && consumers.contains(entry.getValue().getConsumerId())
                            && entry.getValue().getRebalanceGenerationId() == rebalanceGenerationId)
                    .findAny()
                    .ifPresent(entry -> consumerToTask.put(entry.getValue().getConsumerId(), entry.getKey()));

            this.metricsEventPublisher.publishMetricEvent(new RebalanceMetricEvent(consumerToTask.size(), consumers.size()));
        }

        LOGGER.info("awaitAllNewTaskStateUpdates: expected: {}, actual: {}. Expected consumers: {}", consumers.size(), consumerToTask.size(), consumers);
        // Count first, then the map, so the arguments line up with the placeholders.
        LOGGER.info("awaitAllNewTaskStateUpdates: new task updated the state with {} consumers: {}", consumerToTask.size(), consumerToTask);
        return consumerToTask;
    }

    /**
     * Builds the initial partition from the configured start and end time and
     * hands it to the event consumer wrapped in a {@link NewPartitionsEvent}.
     *
     * @throws InterruptedException if the event consumer is interrupted while accepting the event
     */
    public void newParentPartition() throws InterruptedException {
        Partition initPartition = this.partitionFactory.initPartition(this.startTime, this.endTime);
        LOGGER.info("New parent partition {}", initPartition);
        this.eventConsumer.accept(new NewPartitionsEvent(List.of(initPartition)));
    }
}
