package io.debezium.connector.spanner.task;

import io.debezium.DebeziumException;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.TaskSyncContext;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/SyncEventMerger.class */
public class SyncEventMerger {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventMerger.class);

    private SyncEventMerger() {
    }

    public static TaskSyncContext merge(TaskSyncContext taskSyncContext, TaskSyncEvent taskSyncEvent) {
        TaskState taskState;
        boolean z = false;
        if (taskSyncEvent.getMessageType() != MessageTypeEnum.REGULAR && taskSyncContext.checkDuplication(false, taskSyncEvent.getMessageType().toString())) {
            z = true;
        }
        Map<String, TaskState> taskStates = taskSyncEvent.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", taskSyncContext, taskStates);
        TaskSyncContext.TaskSyncContextBuilder builder = taskSyncContext.toBuilder();
        HashSet hashSet = new HashSet();
        for (TaskState taskState2 : taskStates.values()) {
            if (!taskState2.getTaskUid().equals(taskSyncContext.getTaskUid()) && ((taskState = taskSyncContext.getTaskStates().get(taskState2.getTaskUid())) == null || taskState2.getStateTimestamp() > taskState.getStateTimestamp())) {
                hashSet.add(taskState2.getTaskUid());
            }
        }
        if (taskSyncEvent.getMessageType() == MessageTypeEnum.UPDATE_EPOCH && !taskSyncEvent.getTaskUid().equals(taskSyncContext.getTaskUid())) {
            LOGGER.info("Task {}, updating the epoch offset from the leader's UPDATE_EPOCH message {}: {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getEpochOffset())});
            builder.epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncEvent.getEpochOffset()));
        }
        if (hashSet.isEmpty()) {
            LOGGER.debug("merge: final state is not changed");
            return builder.build();
        }
        builder.taskStates((Map) Stream.concat(taskSyncContext.getTaskStates().entrySet().stream().filter(entry -> {
            return !hashSet.contains(entry.getKey());
        }), taskStates.entrySet().stream().filter(entry2 -> {
            return hashSet.contains(entry2.getKey());
        })).collect(Collectors.toUnmodifiableMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).createdTimestamp(Long.max(taskSyncContext.getCreatedTimestamp(), taskSyncEvent.getMessageTimestamp()));
        TaskSyncContext build = builder.build();
        LoggerUtils.debug(LOGGER, "merge: final state {}, \nUpdated uids: {}, epoch: {}", build, hashSet, Long.valueOf(build.getRebalanceGenerationId()));
        if (taskSyncEvent.getMessageType() != MessageTypeEnum.REGULAR && !z && build.checkDuplication(true, taskSyncEvent.getMessageType().toString())) {
            LOGGER.info("Task {} found duplication after processing {}", taskSyncContext.getTaskUid(), taskSyncEvent);
            LOGGER.info("Task {} final message {}", taskSyncContext.getTaskUid(), build);
        }
        return build;
    }

    public static TaskSyncContext mergeNewEpoch(TaskSyncContext taskSyncContext, TaskSyncEvent taskSyncEvent) {
        TaskSyncContext.TaskSyncContextBuilder builder = taskSyncContext.toBuilder();
        Set set = (Set) taskSyncEvent.getTaskStates().values().stream().map(taskState -> {
            return taskState.getTaskUid();
        }).collect(Collectors.toSet());
        if (!(taskSyncContext.getRebalanceState() == RebalanceState.START_INITIAL_SYNC) && !set.contains(taskSyncContext.getTaskUid())) {
            LOGGER.warn("Task {} - Received new epoch message , but leader did not include the task in the new epoch message, throwing exception", taskSyncContext.getTaskUid());
            throw new DebeziumException("New epoch message does not contain task state " + taskSyncContext.getTaskUid());
        }
        boolean z = false;
        if (taskSyncContext.checkDuplication(false, "NEW_EPOCH")) {
            z = true;
        }
        if (taskSyncEvent.getTaskUid().equals(taskSyncContext.getTaskUid())) {
            return builder.build();
        }
        HashMap hashMap = new HashMap(taskSyncEvent.getTaskStates());
        hashMap.remove(taskSyncContext.getTaskUid());
        TaskState.TaskStateBuilder builder2 = taskSyncContext.getCurrentTaskState().toBuilder();
        if (RebalanceState.START_INITIAL_SYNC.equals(taskSyncContext.getRebalanceState())) {
            LOGGER.info("Task {}, updating the rebalance generation ID from the leader new epoch {}: {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId())});
            builder.rebalanceGenerationId(taskSyncEvent.getRebalanceGenerationId());
            builder2.rebalanceGenerationId(taskSyncEvent.getRebalanceGenerationId());
        } else {
            builder.rebalanceState(RebalanceState.NEW_EPOCH_STARTED);
        }
        LOGGER.info("Task {}, updating the epoch offset from the leader new epoch {}: {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getEpochOffset())});
        builder.createdTimestamp(taskSyncEvent.getMessageTimestamp()).epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncEvent.getEpochOffset())).taskStates(hashMap).currentTaskState(builder2.build());
        TaskSyncContext build = builder.build();
        if (!z && build.checkDuplication(true, "NEW_EPOCH")) {
            LOGGER.warn("Task {}, duplication exists after processing new epoch, old context {}", build.getTaskUid(), taskSyncContext);
            LOGGER.warn("Task {}, duplication exists after processing new epoch, new message {}", build.getTaskUid(), taskSyncEvent);
            LOGGER.warn("Task {}, duplication exists after processing new epoch, resulting context {}", build.getTaskUid(), build);
        }
        return build;
    }
}
