package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.class */
public class SubscriptionStateTest {
    // Object under test; created with EARLIEST as its offset reset strategy.
    private SubscriptionState state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
    // Topic names used by the fixtures below.
    private final String topic = "test";
    private final String topic1 = "test1";
    // Partitions 0 and 1 of "test", and partition 0 of "test1".
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final TopicPartition t1p0 = new TopicPartition("test1", 0);
    // Listener handed to subscribe(...) calls in the tests.
    private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
    // Leader/epoch placeholder obtained from Metadata.LeaderAndEpoch.noLeaderOrEpoch().
    private final Metadata.LeaderAndEpoch leaderAndEpoch = Metadata.LeaderAndEpoch.noLeaderOrEpoch();

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/SubscriptionStateTest$MockRebalanceListener.class */
    private static class MockRebalanceListener implements ConsumerRebalanceListener {
        Collection<TopicPartition> revoked;
        public Collection<TopicPartition> assigned;
        int revokedCount;
        int assignedCount;

        private MockRebalanceListener() {
            this.revokedCount = 0;
            this.assignedCount = 0;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            this.assigned = collection;
            this.assignedCount++;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.revoked = collection;
            this.revokedCount++;
        }
    }

    /**
     * Checks a manual assignment of a single partition: it is reported as
     * assigned, is fetchable after a seek, and disappears again once the
     * assignment is replaced by an empty set.
     */
    @Test
    public void partitionAssignment() {
        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);

        state.assignFromUser(onlyTp0);
        Assertions.assertEquals(onlyTp0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());
        // Nothing has been seeked yet, so not every partition has a fetch position.
        Assertions.assertFalse(state.hasAllFetchPositions());

        state.seek(tp0, 1L);
        Assertions.assertTrue(state.isFetchable(tp0));
        Assertions.assertEquals(1L, state.position(tp0).offset);

        // Replace the assignment with nothing.
        state.assignFromUser(Collections.emptySet());
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());
        Assertions.assertFalse(state.isAssigned(tp0));
        Assertions.assertFalse(state.isFetchable(tp0));
    }

    /**
     * Checks how the assigned partitions change when switching from a manual
     * assignment to a topic subscription, and that re-subscribing to another
     * topic leaves the current assignment untouched until it is replaced.
     */
    @Test
    public void partitionAssignmentChangeOnTopicSubscription() {
        // Diamond instead of a raw HashSet: keeps the element type and avoids an unchecked conversion.
        state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
        Assertions.assertEquals(2, state.assignedPartitions().size());
        Assertions.assertEquals(2, state.numAssignedPartitions());
        Assertions.assertTrue(state.assignedPartitions().contains(tp0));
        Assertions.assertTrue(state.assignedPartitions().contains(tp1));

        state.unsubscribe();
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());

        // Subscribing alone does not assign anything.
        state.subscribe(Collections.singleton(topic1), Optional.of(rebalanceListener));
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());

        Set<TopicPartition> onlyT1p0 = Collections.singleton(t1p0);
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(onlyT1p0));
        state.assignFromSubscribed(onlyT1p0);
        Assertions.assertEquals(onlyT1p0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        // Changing the subscription keeps the existing assignment in place.
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        Assertions.assertEquals(onlyT1p0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        state.unsubscribe();
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());
    }

    /**
     * Checks the return value of {@code groupSubscribe} and the resulting
     * {@code metadataTopics()} across a sequence of group and user
     * subscription changes.
     */
    @Test
    public void testGroupSubscribe() {
        Set<String> onlyTopic1 = Collections.singleton(topic1);
        Set<String> bothTopics = Utils.mkSet(topic, topic1);

        state.subscribe(onlyTopic1, Optional.of(rebalanceListener));
        Assertions.assertEquals(onlyTopic1, state.metadataTopics());

        // Group subscription equal to the user subscription: reported as false.
        Assertions.assertFalse(state.groupSubscribe(Collections.singleton(topic1)));
        Assertions.assertEquals(onlyTopic1, state.metadataTopics());

        // Group subscription containing an extra topic: reported as true.
        Assertions.assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1)));
        Assertions.assertEquals(bothTopics, state.metadataTopics());

        // A later, smaller group subscription replaces the earlier one.
        Assertions.assertFalse(state.groupSubscribe(Collections.singleton(topic1)));
        Assertions.assertEquals(onlyTopic1, state.metadataTopics());

        state.subscribe(Collections.singleton("anotherTopic"), Optional.of(rebalanceListener));
        Assertions.assertEquals(Utils.mkSet(topic1, "anotherTopic"), state.metadataTopics());

        Assertions.assertFalse(state.groupSubscribe(Collections.singleton("anotherTopic")));
        Assertions.assertEquals(Collections.singleton("anotherTopic"), state.metadataTopics());
    }

    /**
     * Drives a pattern subscription through several assignments and one
     * pattern change, checking assigned partitions and the subscription after
     * every step.
     */
    @Test
    public void partitionAssignmentChangeOnPatternSubscription() {
        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        Set<TopicPartition> onlyTp1 = Collections.singleton(tp1);
        Set<TopicPartition> onlyT1p0 = Collections.singleton(t1p0);

        // Subscribing by pattern assigns nothing.
        state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());

        // Resolving the pattern to concrete topics assigns nothing either.
        state.subscribeFromPattern(Collections.singleton(topic));
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());

        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(onlyTp1));
        state.assignFromSubscribed(onlyTp1);
        Assertions.assertEquals(onlyTp1, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());
        Assertions.assertEquals(Collections.singleton(topic), state.subscription());

        // t1p0's topic is not in subscription(), yet the check is expected to pass
        // (presumably because the ".*" pattern matches it).
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(onlyT1p0));
        state.assignFromSubscribed(onlyT1p0);
        Assertions.assertEquals(onlyT1p0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());
        Assertions.assertEquals(Collections.singleton(topic), state.subscription());

        // Switching to a different pattern keeps the current assignment.
        state.subscribe(Pattern.compile(".*t"), Optional.of(rebalanceListener));
        Assertions.assertEquals(onlyT1p0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        state.subscribeFromPattern(Collections.singleton(topic));
        Assertions.assertEquals(onlyT1p0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(onlyTp0));
        state.assignFromSubscribed(onlyTp0);
        Assertions.assertEquals(onlyTp0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());
        Assertions.assertEquals(Collections.singleton(topic), state.subscription());

        state.unsubscribe();
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());
    }

    /**
     * Checks that {@code assignmentId()} starts at 0 and is 1, 2 and 3 after a
     * manual assignment, an unsubscribe and an assignment from a subscription
     * respectively.
     */
    @Test
    public void verifyAssignmentId() {
        Assertions.assertEquals(0, state.assignmentId());

        // Parameterized Set instead of a raw type, so the calls below are type-checked.
        Set<TopicPartition> userAssignment = Utils.mkSet(tp0, tp1);
        state.assignFromUser(userAssignment);
        Assertions.assertEquals(1, state.assignmentId());
        Assertions.assertEquals(userAssignment, state.assignedPartitions());

        state.unsubscribe();
        Assertions.assertEquals(2, state.assignmentId());
        Assertions.assertEquals(Collections.emptySet(), state.assignedPartitions());

        Set<TopicPartition> autoAssignment = Utils.mkSet(t1p0);
        state.subscribe(Collections.singleton(topic1), Optional.of(rebalanceListener));
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(autoAssignment));
        state.assignFromSubscribed(autoAssignment);
        Assertions.assertEquals(3, state.assignmentId());
        Assertions.assertEquals(autoAssignment, state.assignedPartitions());
    }

    /**
     * Checks that requesting an offset reset makes the partition unfetchable
     * with no position, and that a later seek makes it fetchable again.
     */
    @Test
    public void partitionReset() {
        state.assignFromUser(Collections.singleton(tp0));
        state.seek(tp0, 5L);
        Assertions.assertEquals(5L, state.position(tp0).offset);

        state.requestOffsetReset(tp0);
        Assertions.assertFalse(state.isFetchable(tp0));
        Assertions.assertTrue(state.isOffsetResetNeeded(tp0));
        Assertions.assertNull(state.position(tp0));

        // Seeking while a reset is pending clears the reset flag.
        state.seek(tp0, 0L);
        Assertions.assertTrue(state.isFetchable(tp0));
        Assertions.assertFalse(state.isOffsetResetNeeded(tp0));
    }

    /**
     * Checks a topic subscription followed by two successive assignments: the
     * second assignment replaces the first, and the newly assigned partition
     * is not fetchable.
     */
    @Test
    public void topicSubscription() {
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        Assertions.assertEquals(1, state.subscription().size());
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());
        Assertions.assertTrue(state.hasAutoAssignedPartitions());

        Set<TopicPartition> firstAssignment = Collections.singleton(tp0);
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(firstAssignment));
        state.assignFromSubscribed(firstAssignment);
        state.seek(tp0, 1L);
        Assertions.assertEquals(1L, state.position(tp0).offset);

        Set<TopicPartition> secondAssignment = Collections.singleton(tp1);
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(secondAssignment));
        state.assignFromSubscribed(secondAssignment);
        Assertions.assertTrue(state.isAssigned(tp1));
        Assertions.assertFalse(state.isAssigned(tp0));
        // tp1 was never seeked in this test.
        Assertions.assertFalse(state.isFetchable(tp1));
        Assertions.assertEquals(secondAssignment, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());
    }

    /**
     * Checks that pausing a partition makes it unfetchable and resuming it
     * makes it fetchable again.
     */
    @Test
    public void partitionPause() {
        state.assignFromUser(Collections.singleton(tp0));
        state.seek(tp0, 100L);
        Assertions.assertTrue(state.isFetchable(tp0));

        state.pause(tp0);
        Assertions.assertFalse(state.isFetchable(tp0));

        state.resume(tp0);
        Assertions.assertTrue(state.isFetchable(tp0));
    }

    /**
     * Checks that a partition marked as pending revocation is no longer
     * fetchable but is not reported as paused.
     */
    @Test
    public void testMarkingPartitionPending() {
        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.assignFromUser(onlyTp0);
        state.seek(tp0, 100L);
        Assertions.assertTrue(state.isFetchable(tp0));

        state.markPendingRevocation(Collections.singleton(tp0));
        Assertions.assertFalse(state.isFetchable(tp0));
        Assertions.assertFalse(state.isPaused(tp0));
    }

    /**
     * Checks that a position set by a seek while the partition is still
     * awaiting its callback is kept once the partition is enabled.
     */
    @Test
    public void testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        state.assignFromSubscribedAwaitingCallback(Collections.singleton(tp0), Collections.singleton(tp0));
        assertAssignmentAppliedAwaitingCallback(tp0);

        // Seek before the partition is enabled.
        state.seek(tp0, 100L);
        state.enablePartitionsAwaitingCallback(Collections.singleton(tp0));

        Assertions.assertEquals(0, state.initializingPartitions().size());
        Assertions.assertTrue(state.isFetchable(tp0));
        Assertions.assertTrue(state.hasAllFetchPositions());
        Assertions.assertEquals(100L, state.position(tp0).offset);
    }

    /**
     * Checks that a partition enabled without a position is still listed as
     * initializing, and becomes fetchable once a position is set by a seek.
     */
    @Test
    public void testAssignedPartitionsAwaitingCallbackInitializePositionsWhenCallbackCompletes() {
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        state.assignFromSubscribedAwaitingCallback(Collections.singleton(tp0), Collections.singleton(tp0));
        assertAssignmentAppliedAwaitingCallback(tp0);

        // Enable first; no position has been provided yet.
        state.enablePartitionsAwaitingCallback(Collections.singleton(tp0));
        Assertions.assertEquals(1, state.initializingPartitions().size());

        state.seek(tp0, 100L);
        Assertions.assertTrue(state.isFetchable(tp0));
        Assertions.assertTrue(state.hasAllFetchPositions());
        Assertions.assertEquals(100L, state.position(tp0).offset);
    }

    /**
     * Checks that adding a new partition that awaits its callback leaves an
     * already owned, fetchable partition fetchable throughout.
     */
    @Test
    public void testAssignedPartitionsAwaitingCallbackDoesNotAffectPreviouslyOwnedPartitions() {
        // First assignment: tp0 only, enabled and positioned.
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        state.assignFromSubscribedAwaitingCallback(Collections.singleton(tp0), Collections.singleton(tp0));
        assertAssignmentAppliedAwaitingCallback(tp0);
        state.enablePartitionsAwaitingCallback(Collections.singleton(tp0));
        state.seek(tp0, 100L);
        Assertions.assertTrue(state.isFetchable(tp0));

        // Second assignment: tp0 is kept, tp1 is the only added partition.
        state.assignFromSubscribedAwaitingCallback(Utils.mkSet(tp0, tp1), Collections.singleton(tp1));
        Assertions.assertTrue(state.isFetchable(tp0));
        Assertions.assertFalse(state.isFetchable(tp1));
        Assertions.assertEquals(1, state.initializingPartitions().size());

        // Enabling tp1 leaves it as the single initializing partition until it gets a position.
        state.enablePartitionsAwaitingCallback(Collections.singleton(tp1));
        Assertions.assertEquals(1, state.initializingPartitions().size());
        Assertions.assertEquals(tp1, state.initializingPartitions().iterator().next());

        state.seek(tp1, 200L);
        Assertions.assertTrue(state.isFetchable(tp1));
    }

    /**
     * Asserts that {@code topicPartition} is the only assigned partition, that
     * its topic is the only subscribed topic, and that the partition is
     * neither fetchable nor paused while one partition is still initializing.
     *
     * @param topicPartition the partition expected to be assigned
     */
    private void assertAssignmentAppliedAwaitingCallback(TopicPartition topicPartition) {
        Set<TopicPartition> expectedAssignment = Collections.singleton(topicPartition);
        Assertions.assertEquals(expectedAssignment, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        Set<String> expectedSubscription = Collections.singleton(topicPartition.topic());
        Assertions.assertEquals(expectedSubscription, state.subscription());

        Assertions.assertFalse(state.isFetchable(topicPartition));
        Assertions.assertEquals(1, state.initializingPartitions().size());
        Assertions.assertFalse(state.isPaused(topicPartition));
    }

    /**
     * Checks that updating the position of an assigned partition that has no
     * position yet throws {@link IllegalStateException}.
     */
    @Test
    public void invalidPositionUpdate() {
        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(onlyTp0));
        state.assignFromSubscribed(onlyTp0);

        Assertions.assertThrows(IllegalStateException.class,
            () -> state.position(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), leaderAndEpoch)));
    }

    /**
     * Checks that an assignment containing a partition of a topic that is not
     * subscribed does not match the subscription.
     */
    @Test
    public void cantAssignPartitionForUnsubscribedTopics() {
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        boolean matched = state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0));
        Assertions.assertFalse(matched);
    }

    /**
     * Checks that, under the pattern {@code ".*t"}, an assignment for a
     * partition of "test1" does not match the subscription.
     */
    @Test
    public void cantAssignPartitionForUnmatchedPattern() {
        Pattern endsWithT = Pattern.compile(".*t");
        state.subscribe(endsWithT, Optional.of(rebalanceListener));
        state.subscribeFromPattern(Collections.singleton(topic));

        boolean matched = state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0));
        Assertions.assertFalse(matched);
    }

    /**
     * Checks that setting a position for a partition that was never assigned
     * throws {@link IllegalStateException}.
     */
    @Test
    public void cantChangePositionForNonAssignedPartition() {
        Assertions.assertThrows(IllegalStateException.class,
            () -> state.position(tp0, new SubscriptionState.FetchPosition(1L, Optional.empty(), leaderAndEpoch)));
    }

    /**
     * Checks that subscribing by pattern after subscribing by topic name
     * throws {@link IllegalStateException}.
     */
    @Test
    public void cantSubscribeTopicAndPattern() {
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        Assertions.assertThrows(IllegalStateException.class,
            () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
    }

    /**
     * Checks that subscribing by pattern after a manual partition assignment
     * throws {@link IllegalStateException}.
     */
    @Test
    public void cantSubscribePartitionAndPattern() {
        state.assignFromUser(Collections.singleton(tp0));
        Assertions.assertThrows(IllegalStateException.class,
            () -> state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
    }

    /**
     * Checks that subscribing by topic name after subscribing by pattern
     * throws {@link IllegalStateException}.
     */
    @Test
    public void cantSubscribePatternAndTopic() {
        state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
        Assertions.assertThrows(IllegalStateException.class,
            () -> state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener)));
    }

    /**
     * Checks that a manual partition assignment after subscribing by pattern
     * throws {@link IllegalStateException}.
     */
    @Test
    public void cantSubscribePatternAndPartition() {
        state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
        Assertions.assertThrows(IllegalStateException.class,
            () -> state.assignFromUser(Collections.singleton(tp0)));
    }

    /**
     * Checks that resolving a pattern subscription to two topics results in a
     * subscription of size two.
     */
    @Test
    public void patternSubscription() {
        state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
        // Diamond instead of a raw HashSet: keeps the element type and avoids an unchecked conversion.
        state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
        Assertions.assertEquals(2, state.subscription().size(), "Expected subscribed topics count is incorrect");
    }

    /**
     * Checks that after unsubscribing from a manual assignment, a topic
     * subscription can be made and is reflected by {@code subscription()}.
     */
    @Test
    public void unsubscribeUserAssignment() {
        // Diamond instead of a raw HashSet: keeps the element type and avoids an unchecked conversion.
        state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
        state.unsubscribe();
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        Assertions.assertEquals(Collections.singleton(topic), state.subscription());
    }

    /**
     * Checks that after unsubscribing from a topic subscription, a manual
     * assignment can be made and is reflected by the assigned partitions.
     */
    @Test
    public void unsubscribeUserSubscribe() {
        state.subscribe(Collections.singleton(topic), Optional.of(rebalanceListener));
        state.unsubscribe();

        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.assignFromUser(onlyTp0);
        Assertions.assertEquals(onlyTp0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());
    }

    /**
     * Checks that {@code unsubscribe()} clears both the subscription and the
     * assignment, first after a pattern subscription and then after a manual
     * assignment.
     */
    @Test
    public void unsubscription() {
        state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
        // Diamond instead of a raw HashSet: keeps the element type and avoids an unchecked conversion.
        state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));

        Set<TopicPartition> onlyTp1 = Collections.singleton(tp1);
        Assertions.assertTrue(state.checkAssignmentMatchedSubscription(onlyTp1));
        state.assignFromSubscribed(onlyTp1);
        Assertions.assertEquals(onlyTp1, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        state.unsubscribe();
        Assertions.assertEquals(0, state.subscription().size());
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());

        Set<TopicPartition> onlyTp0 = Collections.singleton(tp0);
        state.assignFromUser(onlyTp0);
        Assertions.assertEquals(onlyTp0, state.assignedPartitions());
        Assertions.assertEquals(1, state.numAssignedPartitions());

        state.unsubscribe();
        Assertions.assertEquals(0, state.subscription().size());
        Assertions.assertTrue(state.assignedPartitions().isEmpty());
        Assertions.assertEquals(0, state.numAssignedPartitions());
    }

    /**
     * Checks the preferred read replica lookup against the time value supplied
     * on update: the replica is returned for query times up to and including
     * that value, is absent afterwards, is absent after an explicit clear, and
     * can be replaced by a later update.
     */
    @Test
    public void testPreferredReadReplicaLease() {
        state.assignFromUser(Collections.singleton(tp0));

        // Nothing set yet.
        Assertions.assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());

        // Replica 42 with supplied time 10: present at 9 and 10, absent at 11.
        // assertEquals takes (expected, actual); the arguments were previously swapped.
        state.updatePreferredReadReplica(tp0, 42, () -> 10L);
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L),
            replicaId -> Assertions.assertEquals(42, replicaId.intValue()));
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L),
            replicaId -> Assertions.assertEquals(42, replicaId.intValue()));
        Assertions.assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());

        // After clearing, nothing is returned for any time.
        state.clearPreferredReadReplica(tp0);
        Assertions.assertFalse(state.preferredReadReplica(tp0, 9L).isPresent());
        Assertions.assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());

        // Replica 43 with supplied time 20: present at 11 and 20, absent at 21.
        state.updatePreferredReadReplica(tp0, 43, () -> 20L);
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L),
            replicaId -> Assertions.assertEquals(43, replicaId.intValue()));
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L),
            replicaId -> Assertions.assertEquals(43, replicaId.intValue()));
        Assertions.assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());

        // Replica 44 with supplied time 30: present at 30, absent at 31.
        state.updatePreferredReadReplica(tp0, 44, () -> 30L);
        TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L),
            replicaId -> Assertions.assertEquals(44, replicaId.intValue()));
        Assertions.assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
    }

    /**
     * Checks that an unvalidated seek whose position carries no offset epoch
     * yields a valid position immediately, and that later leader updates do
     * not start validation.
     */
    @Test
    public void testSeekUnvalidatedWithNoOffsetEpoch() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        // Position without an offset epoch.
        Metadata.LeaderAndEpoch seekLeader = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(5));
        SubscriptionState.FetchPosition noEpochPosition =
            new SubscriptionState.FetchPosition(0L, Optional.empty(), seekLeader);
        state.seekUnvalidated(tp0, noEpochPosition);
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));

        ApiVersions apiVersions = new ApiVersions();
        apiVersions.update(broker.idString(), NodeApiVersions.create());

        // Leader update without an epoch.
        Metadata.LeaderAndEpoch leaderWithoutEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.empty());
        Assertions.assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, leaderWithoutEpoch));
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));

        // Leader update with a newer epoch.
        Metadata.LeaderAndEpoch leaderWithEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10));
        Assertions.assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, leaderWithEpoch));
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
    }

    /**
     * Checks that a second unvalidated seek without an offset epoch clears the
     * awaiting-validation state left by an earlier seek that had one.
     */
    @Test
    public void testSeekUnvalidatedWithNoEpochClearsAwaitingValidation() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        // Seek with an offset epoch: validation becomes pending.
        SubscriptionState.FetchPosition withEpoch = new SubscriptionState.FetchPosition(
            0L, Optional.of(2), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(5)));
        state.seekUnvalidated(tp0, withEpoch);
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // Seek without an offset epoch: the position is valid right away.
        SubscriptionState.FetchPosition withoutEpoch = new SubscriptionState.FetchPosition(
            0L, Optional.empty(), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(5)));
        state.seekUnvalidated(tp0, withoutEpoch);
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
    }

    /**
     * Checks that an unvalidated seek with an offset epoch keeps the partition
     * awaiting validation across leader updates that carry an epoch, and that
     * a leader update without an epoch makes the position valid.
     */
    @Test
    public void testSeekUnvalidatedWithOffsetEpoch() {
        Node broker = new Node(1, "localhost", 9092);
        ApiVersions apiVersions = new ApiVersions();
        apiVersions.update(broker.idString(), NodeApiVersions.create());

        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition withEpoch = new SubscriptionState.FetchPosition(
            0L, Optional.of(2), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(5)));
        state.seekUnvalidated(tp0, withEpoch);
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // Same leader epoch as in the position.
        Metadata.LeaderAndEpoch sameEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(5));
        Assertions.assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, sameEpoch));
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // Newer leader epoch.
        Metadata.LeaderAndEpoch newerEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(15));
        Assertions.assertTrue(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, newerEpoch));
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // Leader without an epoch: validation is no longer pending.
        Metadata.LeaderAndEpoch noEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.empty());
        Assertions.assertFalse(state.maybeValidatePositionForCurrentLeader(apiVersions, tp0, noEpoch));
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
    }

    /**
     * Checks that {@code seekValidated} replaces a position that is awaiting
     * validation with a valid one at the new offset.
     */
    @Test
    public void testSeekValidatedShouldClearAwaitingValidation() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition unvalidated = new SubscriptionState.FetchPosition(
            10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, unvalidated);
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));
        Assertions.assertEquals(10L, state.position(tp0).offset);

        SubscriptionState.FetchPosition validated = new SubscriptionState.FetchPosition(
            8L, Optional.of(4), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekValidated(tp0, validated);
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertEquals(8L, state.position(tp0).offset);
    }

    /**
     * Checks that {@code completeValidation} turns a position that is awaiting
     * validation into a valid one without changing its offset.
     */
    @Test
    public void testCompleteValidationShouldClearAwaitingValidation() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition unvalidated = new SubscriptionState.FetchPosition(
            10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, unvalidated);
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));
        Assertions.assertEquals(10L, state.position(tp0).offset);

        state.completeValidation(tp0);
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
        // Offset is unchanged by completing validation.
        Assertions.assertEquals(10L, state.position(tp0).offset);
    }

    /**
     * Checks that requesting an offset reset while validation is pending
     * clears the awaiting-validation state and marks a reset as needed.
     */
    @Test
    public void testOffsetResetWhileAwaitingValidation() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition unvalidated = new SubscriptionState.FetchPosition(
            10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, unvalidated);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertTrue(state.isOffsetResetNeeded(tp0));
    }

    /**
     * Checks that {@code maybeCompleteValidation} returns an empty Optional
     * and ends validation, leaving the position unchanged, when the response
     * reports the same epoch and an end offset beyond the position.
     */
    @Test
    public void testMaybeCompleteValidation() {
        Node broker = new Node(1, "localhost", 9092);
        long initialOffset = 10L;
        int initialOffsetEpoch = 5;

        state.assignFromUser(Collections.singleton(tp0));

        Metadata.LeaderAndEpoch currentLeader = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10));
        SubscriptionState.FetchPosition initialPosition =
            new SubscriptionState.FetchPosition(initialOffset, Optional.of(initialOffsetEpoch), currentLeader);
        state.seekUnvalidated(tp0, initialPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        OffsetForLeaderEpochResponseData.EpochEndOffset endOffset = new OffsetForLeaderEpochResponseData.EpochEndOffset()
            .setLeaderEpoch(initialOffsetEpoch)
            .setEndOffset(initialOffset + 5);
        Assertions.assertEquals(Optional.empty(), state.maybeCompleteValidation(tp0, initialPosition, endOffset));

        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertEquals(initialPosition, state.position(tp0));
    }

    /**
     * Checks {@code maybeValidatePositionForCurrentLeader} in three cases: a
     * broker advertising OFFSET_FOR_LEADER_EPOCH versions 0-2, a broker with
     * the default API versions, and a partition that is not assigned.
     */
    @Test
    public void testMaybeValidatePositionForCurrentLeader() {
        NodeApiVersions limitedVersions = NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2);
        ApiVersions apiVersions = new ApiVersions();
        apiVersions.update("1", limitedVersions);

        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        // With versions 0-2 no validation is started and the position counts as valid.
        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(
            10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10))));
        boolean startedWithLimitedVersions = state.maybeValidatePositionForCurrentLeader(
            apiVersions, tp0, new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        Assertions.assertFalse(startedWithLimitedVersions);
        Assertions.assertTrue(state.hasValidPosition(tp0));

        // With the default versions validation is started and the position is not yet valid.
        apiVersions.update("1", NodeApiVersions.create());
        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(
            10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10))));
        boolean startedWithDefaultVersions = state.maybeValidatePositionForCurrentLeader(
            apiVersions, tp0, new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        Assertions.assertTrue(startedWithDefaultVersions);
        Assertions.assertFalse(state.hasValidPosition(tp0));

        // tp1 is not assigned: the call returns false and does not assign it.
        boolean startedForUnassigned = state.maybeValidatePositionForCurrentLeader(
            apiVersions, tp1, new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        Assertions.assertFalse(startedForUnassigned);
        Assertions.assertFalse(state.assignedPartitions().contains(tp1));
    }

    /**
     * Completing validation for a position that has since been replaced by another unvalidated seek
     * must return an empty result, keep the partition awaiting validation, and keep the newer
     * position in place.
     */
    @Test
    public void testMaybeCompleteValidationAfterPositionChange() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition supersededPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, supersededPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // A second seek replaces the position before the first validation completes.
        SubscriptionState.FetchPosition latestPosition = new SubscriptionState.FetchPosition(
                20L, Optional.of(8), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, latestPosition);

        // The response is matched against the superseded position, not the latest one.
        OffsetForLeaderEpochResponseData.EpochEndOffset lateResponse =
                new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(15L);
        Assertions.assertEquals(Optional.empty(), state.maybeCompleteValidation(tp0, supersededPosition, lateResponse));

        Assertions.assertTrue(state.awaitingValidation(tp0));
        Assertions.assertEquals(latestPosition, state.position(tp0));
    }

    /**
     * Completing validation after an offset reset was requested must return an empty result and
     * leave the partition not awaiting validation, still needing a reset, and with a null position.
     */
    @Test
    public void testMaybeCompleteValidationAfterOffsetReset() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition pendingPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, pendingPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // The reset is requested while validation of pendingPosition is still outstanding.
        state.requestOffsetReset(tp0);

        OffsetForLeaderEpochResponseData.EpochEndOffset lateResponse =
                new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(5).setEndOffset(15L);
        Assertions.assertEquals(Optional.empty(), state.maybeCompleteValidation(tp0, pendingPosition, lateResponse));

        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertTrue(state.isOffsetResetNeeded(tp0));
        Assertions.assertNull(state.position(tp0));
    }

    /**
     * With the class-level state (built with {@code OffsetResetStrategy.EARLIEST}), a validation
     * response of epoch 7 / end offset 5 for a position at offset 10 / epoch 5 must return an empty
     * result, end validation, and move the position to offset 5 with epoch 7 under the same leader.
     */
    @Test
    public void testTruncationDetectionWithResetPolicy() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition seekPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, seekPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // The reported end offset (5) is below the position being validated (10).
        OffsetForLeaderEpochResponseData.EpochEndOffset divergentEnd =
                new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(7).setEndOffset(5L);
        Assertions.assertEquals(Optional.empty(), state.maybeCompleteValidation(tp0, seekPosition, divergentEnd));
        Assertions.assertFalse(state.awaitingValidation(tp0));

        SubscriptionState.FetchPosition expectedPosition = new SubscriptionState.FetchPosition(
                5L, Optional.of(7), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        Assertions.assertEquals(expectedPosition, state.position(tp0));
    }

    /**
     * With {@code OffsetResetStrategy.NONE}, a validation response of epoch 7 / end offset 5 for a
     * position at offset 10 / epoch 5 must return a {@code LogTruncation} that carries the divergent
     * offset (5, epoch 7, empty metadata) and the original fetch position. The partition must still
     * be awaiting validation afterwards.
     */
    @Test
    public void testTruncationDetectionWithoutResetPolicy() {
        Node broker = new Node(1, "localhost", 9092);
        state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition seekPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, seekPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        OffsetForLeaderEpochResponseData.EpochEndOffset divergentEnd =
                new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(7).setEndOffset(5L);
        // Parameterised Optional: no raw type and no unchecked downcast of get().
        Optional<SubscriptionState.LogTruncation> truncationOpt =
                state.maybeCompleteValidation(tp0, seekPosition, divergentEnd);
        Assertions.assertTrue(truncationOpt.isPresent());

        SubscriptionState.LogTruncation truncation = truncationOpt.get();
        Assertions.assertEquals(Optional.of(new OffsetAndMetadata(5L, Optional.of(7), "")), truncation.divergentOffsetOpt);
        Assertions.assertEquals(seekPosition, truncation.fetchPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));
    }

    /**
     * With {@code OffsetResetStrategy.EARLIEST}, a validation response whose epoch and end offset
     * are both -1 must return an empty result, end validation, and leave the partition needing an
     * offset reset with the EARLIEST strategy.
     */
    @Test
    public void testTruncationDetectionUnknownDivergentOffsetWithResetPolicy() {
        Node broker = new Node(1, "localhost", 9092);
        state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition seekPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, seekPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // NOTE(review): -1 / -1 presumably stands for "undefined epoch / undefined offset" -- confirm.
        OffsetForLeaderEpochResponseData.EpochEndOffset unknownEnd =
                new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(-1).setEndOffset(-1L);
        Assertions.assertEquals(Optional.empty(), state.maybeCompleteValidation(tp0, seekPosition, unknownEnd));

        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertTrue(state.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(OffsetResetStrategy.EARLIEST, state.resetStrategy(tp0));
    }

    /**
     * With {@code OffsetResetStrategy.NONE}, a validation response whose epoch and end offset are
     * both -1 must return a {@code LogTruncation} with an empty divergent offset and the original
     * fetch position. The partition must still be awaiting validation afterwards.
     */
    @Test
    public void testTruncationDetectionUnknownDivergentOffsetWithoutResetPolicy() {
        Node broker = new Node(1, "localhost", 9092);
        state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        state.assignFromUser(Collections.singleton(tp0));

        SubscriptionState.FetchPosition seekPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(5), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(10)));
        state.seekUnvalidated(tp0, seekPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));

        // NOTE(review): -1 / -1 presumably stands for "undefined epoch / undefined offset" -- confirm.
        OffsetForLeaderEpochResponseData.EpochEndOffset unknownEnd =
                new OffsetForLeaderEpochResponseData.EpochEndOffset().setLeaderEpoch(-1).setEndOffset(-1L);
        // Parameterised Optional: no raw type and no unchecked downcast of get().
        Optional<SubscriptionState.LogTruncation> truncationOpt =
                state.maybeCompleteValidation(tp0, seekPosition, unknownEnd);
        Assertions.assertTrue(truncationOpt.isPresent());

        SubscriptionState.LogTruncation truncation = truncationOpt.get();
        Assertions.assertEquals(Optional.empty(), truncation.divergentOffsetOpt);
        Assertions.assertEquals(seekPosition, truncation.fetchPosition);
        Assertions.assertTrue(state.awaitingValidation(tp0));
    }

    /**
     * Walks one partition through reset and seek transitions, checking {@code hasValidPosition},
     * {@code awaitingValidation} and {@code isOffsetResetNeeded} after each step, with node "1"
     * advertising OFFSET_FOR_LEADER_EPOCH versions 0 through 2.
     *
     * <p>Steps: (1) reset requested, then a validation attempt; (2) seek to a position built from an
     * offset only; (3) another validation attempt; (4) reset followed by a seek carrying an offset
     * epoch and a leader epoch; (5) a final validation attempt. Every validation attempt is expected
     * to return {@code false}.
     */
    @Test
    public void resetOffsetNoValidation() {
        Node broker = new Node(1, "localhost", 9092);
        state.assignFromUser(Collections.singleton(tp0));

        // (1) A pending reset: the validation attempt returns false and the reset stays pending.
        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        ApiVersions brokerVersions = new ApiVersions();
        brokerVersions.update("1", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
        Metadata.LeaderAndEpoch leaderDuringReset = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.empty());
        Assertions.assertFalse(state.maybeValidatePositionForCurrentLeader(brokerVersions, tp0, leaderDuringReset));
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertTrue(state.isOffsetResetNeeded(tp0));

        // (2) Seeking to an offset-only position clears the reset and yields a valid position.
        SubscriptionState.FetchPosition offsetOnlyPosition = new SubscriptionState.FetchPosition(10L);
        state.seekUnvalidated(tp0, offsetOnlyPosition);
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertFalse(state.isOffsetResetNeeded(tp0));

        // (3) A validation attempt against a leader without an epoch changes nothing.
        Metadata.LeaderAndEpoch leaderWithoutEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.empty());
        Assertions.assertFalse(state.maybeValidatePositionForCurrentLeader(brokerVersions, tp0, leaderWithoutEpoch));
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertFalse(state.isOffsetResetNeeded(tp0));

        // (4) Reset again, then seek with epochs: the partition now awaits validation.
        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        SubscriptionState.FetchPosition epochPosition = new SubscriptionState.FetchPosition(
                10L, Optional.of(10), new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(2)));
        state.seekUnvalidated(tp0, epochPosition);
        Assertions.assertFalse(state.hasValidPosition(tp0));
        Assertions.assertTrue(state.awaitingValidation(tp0));
        Assertions.assertFalse(state.isOffsetResetNeeded(tp0));

        // (5) The validation attempt returns false and the position is reported valid afterwards.
        Metadata.LeaderAndEpoch leaderWithEpoch = new Metadata.LeaderAndEpoch(Optional.of(broker), Optional.of(2));
        Assertions.assertFalse(state.maybeValidatePositionForCurrentLeader(brokerVersions, tp0, leaderWithEpoch));
        Assertions.assertTrue(state.hasValidPosition(tp0));
        Assertions.assertFalse(state.awaitingValidation(tp0));
        Assertions.assertFalse(state.isOffsetResetNeeded(tp0));
    }

    /**
     * For an assigned partition that was never given a position, {@code partitionLag} must return
     * null for both isolation levels, before and after the high watermark and last stable offset
     * are updated.
     */
    @Test
    public void nullPositionLagOnNoPosition() {
        IsolationLevel[] levels = {IsolationLevel.READ_UNCOMMITTED, IsolationLevel.READ_COMMITTED};
        state.assignFromUser(Collections.singleton(tp0));

        for (IsolationLevel level : levels) {
            Assertions.assertNull(state.partitionLag(tp0, level));
        }

        state.updateHighWatermark(tp0, 1L);
        state.updateLastStableOffset(tp0, 1L);

        // Lag is still null after the end offsets are known, since no position was ever set.
        for (IsolationLevel level : levels) {
            Assertions.assertNull(state.partitionLag(tp0, level));
        }
    }

    /**
     * {@code positionOrNull} must return the sought position for an assigned partition and null for
     * a partition that is not assigned.
     */
    @Test
    public void testPositionOrNull() {
        TopicPartition notAssigned = new TopicPartition("unassigned", 0);
        state.assignFromUser(Collections.singleton(tp0));
        state.seek(tp0, 5L);

        SubscriptionState.FetchPosition assignedPosition = state.positionOrNull(tp0);
        Assertions.assertEquals(5L, assignedPosition.offset);
        Assertions.assertNull(state.positionOrNull(notAssigned));
    }

    /**
     * {@code tryUpdatingHighWatermark} must return true for an assigned partition, with the value
     * then visible as the READ_UNCOMMITTED end offset, and false for an unassigned partition.
     */
    @Test
    public void testTryUpdatingHighWatermark() {
        TopicPartition notAssigned = new TopicPartition("unassigned", 0);
        state.assignFromUser(Collections.singleton(tp0));

        Assertions.assertTrue(state.tryUpdatingHighWatermark(tp0, 10L));
        Assertions.assertEquals(10L, state.partitionEndOffset(tp0, IsolationLevel.READ_UNCOMMITTED));

        Assertions.assertFalse(state.tryUpdatingHighWatermark(notAssigned, 10L));
    }

    /**
     * {@code tryUpdatingLogStartOffset} must return true for an assigned partition, giving a
     * partition lead of 15 for a position of 25 and a log start offset of 10, and false for an
     * unassigned partition.
     */
    @Test
    public void testTryUpdatingLogStartOffset() {
        TopicPartition notAssigned = new TopicPartition("unassigned", 0);
        state.assignFromUser(Collections.singleton(tp0));
        state.seek(tp0, 25L);

        Assertions.assertTrue(state.tryUpdatingLogStartOffset(tp0, 10L));
        Assertions.assertEquals(15L, state.partitionLead(tp0));

        Assertions.assertFalse(state.tryUpdatingLogStartOffset(notAssigned, 10L));
    }

    /**
     * {@code tryUpdatingLastStableOffset} must return true for an assigned partition, with the
     * value then visible as the READ_COMMITTED end offset, and false for an unassigned partition.
     */
    @Test
    public void testTryUpdatingLastStableOffset() {
        TopicPartition notAssigned = new TopicPartition("unassigned", 0);
        state.assignFromUser(Collections.singleton(tp0));

        Assertions.assertTrue(state.tryUpdatingLastStableOffset(tp0, 10L));
        Assertions.assertEquals(10L, state.partitionEndOffset(tp0, IsolationLevel.READ_COMMITTED));

        Assertions.assertFalse(state.tryUpdatingLastStableOffset(notAssigned, 10L));
    }

    /**
     * {@code tryUpdatingPreferredReadReplica} must return true for an assigned partition, after
     * which replica 10 is reported as preferred, and false for an unassigned partition, which then
     * reports no preferred replica.
     */
    @Test
    public void testTryUpdatingPreferredReadReplica() {
        TopicPartition notAssigned = new TopicPartition("unassigned", 0);
        // Supplies a timestamp 60 seconds after the moment it is invoked.
        LongSupplier oneMinuteAhead = () -> System.currentTimeMillis() + 60000;
        state.assignFromUser(Collections.singleton(tp0));

        Assertions.assertTrue(state.tryUpdatingPreferredReadReplica(tp0, 10, oneMinuteAhead));
        Assertions.assertEquals(Optional.of(10), state.preferredReadReplica(tp0, System.currentTimeMillis()));

        Assertions.assertFalse(state.tryUpdatingPreferredReadReplica(notAssigned, 10, oneMinuteAhead));
        Assertions.assertEquals(Optional.empty(), state.preferredReadReplica(notAssigned, System.currentTimeMillis()));
    }

    /**
     * {@code requestOffsetResetIfPartitionAssigned} must mark an assigned partition as needing a
     * reset and must not throw for an unassigned one. Querying {@code isOffsetResetNeeded} for that
     * unassigned partition afterwards must throw {@code IllegalStateException}.
     */
    @Test
    public void testRequestOffsetResetIfPartitionAssigned() {
        TopicPartition notAssigned = new TopicPartition("unassigned", 0);
        state.assignFromUser(Collections.singleton(tp0));

        state.requestOffsetResetIfPartitionAssigned(tp0);
        Assertions.assertTrue(state.isOffsetResetNeeded(tp0));

        state.requestOffsetResetIfPartitionAssigned(notAssigned);
        Assertions.assertThrows(IllegalStateException.class, () -> state.isOffsetResetNeeded(notAssigned));
    }
}
