package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/PartitionGroupTest.class */
public class PartitionGroupTest {
    // Shared fixtures. These are instance fields with initializers, so declaration order matters:
    // later initializers (partitions, queues, serialized key/value) read fields declared above them.

    // NOTE(review): not referenced in the visible part of this file; presumably consumed by
    // getBasicGroup() - confirm. The tests that build a PartitionGroup by hand pass -1L or 0L directly.
    private final long maxTaskIdleMs = -1;
    // Every log message asserted in this class is expected to start with this "[test] " prefix.
    private final LogContext logContext = new LogContext("[test] ");
    // Source of the wall-clock time handed to PartitionGroup.nextRecord(...).
    private final Time time = new MockTime();
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
    // A partition that is never registered with any group; used by the "unknown partition" tests.
    private final TopicPartition unknownPartition = new TopicPartition("unknown-partition", 0);
    // Exception message the "unknown partition" tests expect from PartitionGroup.
    private final String errMessage = "Partition " + this.unknownPartition + " not found.";
    private final String[] topics = {AssignmentTestUtils.TOPIC_PREFIX};
    // Must be initialized after 'topics': the factory methods read topics[0].
    private final TopicPartition partition1 = createPartition1();
    private final TopicPartition partition2 = createPartition2();
    // Must be initialized after the partitions, deserializer, timestamp extractor and log context above.
    private final RecordQueue queue1 = createQueue1();
    private final RecordQueue queue2 = createQueue2();
    // Serialized payload shared by every ConsumerRecord created in the tests (value 10, key 1).
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);
    private final Metrics metrics = new Metrics();
    // Sensor with a random name, passed as the enforced-processing sensor when building groups by hand.
    private final Sensor enforcedProcessingSensor = this.metrics.sensor(UUID.randomUUID().toString());
    // Name of the metric the tests read back to check the last recorded lateness value.
    private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", Utils.mkMap(new Map.Entry[0]));

    /**
     * Looks up (or creates) the sensor named after {@code metricName} and attaches the metric
     * to it as a {@link Value} stat.
     *
     * @param metrics    registry the sensor is obtained from
     * @param metricName metric to attach; its {@code name()} is also used as the sensor name
     * @return the sensor with the metric attached
     */
    private static Sensor getValueSensor(Metrics metrics, MetricName metricName) {
        final String sensorName = metricName.name();
        final Sensor valueSensor = metrics.sensor(sensorName);
        valueSensor.add(metricName, new Value());
        return valueSensor;
    }

    /**
     * Runs both record batches against a single group. The second batch asserts on state
     * left behind by the first one, so the two helpers must run in this order.
     */
    @Test
    public void testTimeTracking() {
        final PartitionGroup group = getBasicGroup();

        testFirstBatch(group);
        testSecondBatch(group);
    }

    /**
     * Builds a fresh record queue for {@code partition1}.
     * Also invoked from a field initializer, so it only reads fields declared before {@code queue1}.
     */
    private RecordQueue createQueue1() {
        return new RecordQueue(
            partition1,
            new MockSourceNode(intDeserializer, intDeserializer),
            timestampExtractor,
            new LogAndContinueExceptionHandler(),
            new InternalMockProcessorContext(),
            logContext
        );
    }

    /**
     * Builds a fresh record queue for {@code partition2}.
     * Also invoked from a field initializer, so it only reads fields declared before {@code queue2}.
     */
    private RecordQueue createQueue2() {
        return new RecordQueue(
            partition2,
            new MockSourceNode(intDeserializer, intDeserializer),
            timestampExtractor,
            new LogAndContinueExceptionHandler(),
            new InternalMockProcessorContext(),
            logContext
        );
    }

    /** Returns a new {@link TopicPartition} for partition number 1 of the first configured topic. */
    private TopicPartition createPartition1() {
        final String topic = topics[0];
        return new TopicPartition(topic, 1);
    }

    /** Returns a new {@link TopicPartition} for partition number 2 of the first configured topic. */
    private TopicPartition createPartition2() {
        final String topic = topics[0];
        return new TopicPartition(topic, 2);
    }

    /**
     * Buffers three records on each partition (offsets 1, 3, 5 on partition1 and 2, 4, 6 on partition2),
     * polls two of them, and checks partition timestamps, head offsets, stream time, buffer sizes and
     * the last-lateness metric after every step.
     *
     * <p>The assertions expect each polled record's timestamp to equal its offset
     * (presumably a property of {@code MockTimestampExtractor} - confirm).
     *
     * @param partitionGroup an empty group containing partition1 and partition2
     */
    private void testFirstBatch(PartitionGroup partitionGroup) {
        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
        MatcherAssert.assertThat(partitionGroup.numBuffered(), CoreMatchers.is(0));

        // Fill both partitions.
        final List<ConsumerRecord<byte[], byte[]>> firstPartitionRecords = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 3L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        partitionGroup.addRawRecords(partition1, firstPartitionRecords);

        final List<ConsumerRecord<byte[], byte[]>> secondPartitionRecords = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 2L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 4L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 6L, recordKey, recordValue));
        partitionGroup.addRawRecords(partition2, secondPartitionRecords);

        // Nothing polled yet: 1:[1, 3, 5], 2:[2, 4, 6]; all times are still unknown (-1).
        verifyBuffered(6, 3, 3, partitionGroup);
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(1L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(2L));
        MatcherAssert.assertThat(partitionGroup.streamTime(), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));

        // First poll comes from partition1: 1:[3, 5], 2:[2, 4, 6].
        StampedRecord polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition1));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(1L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(3L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(2L));
        verifyTimes(polled, 1L, 1L, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));

        // Second poll comes from partition2: 1:[3, 5], 2:[4, 6].
        polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition2));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(1L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(2L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(3L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(4L));
        verifyTimes(polled, 2L, 2L, partitionGroup);
        verifyBuffered(4, 2, 2, partitionGroup);
        Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(lastLatenessValue).metricValue());
    }

    /**
     * Continues from the state left by {@code testFirstBatch}: adds two more records (offsets 2 and 4)
     * to partition1, then drains the group completely, checking partition timestamps, head offsets,
     * stream time, buffer sizes and the last-lateness metric after every poll.
     *
     * <p>The late records (2 and 4, polled after stream time reached 5) are expected to leave stream
     * time unchanged and to report a lateness of 3 and 1 respectively.
     *
     * @param partitionGroup the group previously exercised by {@code testFirstBatch}
     */
    private void testSecondBatch(PartitionGroup partitionGroup) {
        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();

        // Append two out-of-order records to partition1: 1:[3, 5, 2, 4], 2:[4, 6].
        final List<ConsumerRecord<byte[], byte[]>> lateRecords = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 2L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 4L, recordKey, recordValue));
        partitionGroup.addRawRecords(partition1, lateRecords);

        verifyBuffered(6, 4, 2, partitionGroup);
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(1L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(2L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(3L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(4L));
        MatcherAssert.assertThat(partitionGroup.streamTime(), CoreMatchers.is(2L));
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));

        // Poll 3 from partition1: 1:[5, 2, 4], 2:[4, 6].
        StampedRecord polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition1));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(3L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(2L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(5L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(4L));
        verifyTimes(polled, 3L, 3L, partitionGroup);
        verifyBuffered(5, 3, 2, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));

        // Poll 4 from partition2: 1:[5, 2, 4], 2:[6].
        polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition2));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(3L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(4L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(5L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(6L));
        verifyTimes(polled, 4L, 4L, partitionGroup);
        verifyBuffered(4, 3, 1, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));

        // Poll 5 from partition1: 1:[2, 4], 2:[6].
        polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition1));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(5L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(4L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(2L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(6L));
        verifyTimes(polled, 5L, 5L, partitionGroup);
        verifyBuffered(3, 2, 1, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));

        // Poll late record 2 from partition1: 1:[4], 2:[6]; times stay put, lateness is 5 - 2 = 3.
        polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition1));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(5L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(4L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition1), CoreMatchers.is(4L));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(6L));
        verifyTimes(polled, 2L, 5L, partitionGroup);
        verifyBuffered(2, 1, 1, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(3.0));

        // Poll late record 4 from partition1: 1:[], 2:[6]; lateness is 5 - 4 = 1.
        polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition1));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(5L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(4L));
        Assert.assertNull(partitionGroup.headRecordOffset(partition1));
        MatcherAssert.assertThat(partitionGroup.headRecordOffset(partition2), CoreMatchers.is(6L));
        verifyTimes(polled, 4L, 5L, partitionGroup);
        verifyBuffered(1, 0, 1, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(1.0));

        // Poll 6 from partition2: both partitions are now empty.
        polled = partitionGroup.nextRecord(info, time.milliseconds());
        MatcherAssert.assertThat(info.partition(), Matchers.equalTo(partition2));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition1), CoreMatchers.is(5L));
        MatcherAssert.assertThat(partitionGroup.partitionTimestamp(partition2), CoreMatchers.is(6L));
        Assert.assertNull(partitionGroup.headRecordOffset(partition1));
        Assert.assertNull(partitionGroup.headRecordOffset(partition2));
        verifyTimes(polled, 6L, 6L, partitionGroup);
        verifyBuffered(0, 0, 0, partitionGroup);
        MatcherAssert.assertThat(metrics.metric(lastLatenessValue).metricValue(), CoreMatchers.is(0.0));
    }

    /**
     * Verifies that the next record is chosen by comparing the head record of each partition:
     * with only partition1 buffered (offsets 1, 5, 3) records come out in arrival order, and once
     * partition2 (2, 4, 6) is added its head (2) is served before partition1's remaining head (3).
     */
    @Test
    public void shouldChooseNextRecordBasedOnHeadTimestamp() {
        final PartitionGroup group = getBasicGroup();
        Assert.assertEquals(0L, group.numBuffered());

        // Out-of-order records on partition1 only.
        final List<ConsumerRecord<byte[], byte[]>> firstPartitionRecords = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 3L, recordKey, recordValue));
        group.addRawRecords(partition1, firstPartitionRecords);

        verifyBuffered(3, 3, 0, group);
        Assert.assertEquals(-1L, group.streamTime());
        Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(lastLatenessValue).metricValue());

        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();

        // The first two records can only come from partition1. Expected value goes first so that
        // a failure reports "expected"/"actual" the right way round.
        Assert.assertEquals(1L, group.nextRecord(info, time.milliseconds()).timestamp);
        Assert.assertEquals(5L, group.nextRecord(info, time.milliseconds()).timestamp);

        // Now buffer partition2 as well.
        final List<ConsumerRecord<byte[], byte[]>> secondPartitionRecords = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 2L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 4L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 6L, recordKey, recordValue));
        group.addRawRecords(partition2, secondPartitionRecords);

        // Head of partition2 (2) is served before the remaining head of partition1 (3).
        Assert.assertEquals(2L, group.nextRecord(info, time.milliseconds()).timestamp);
        Assert.assertEquals(3L, group.nextRecord(info, time.milliseconds()).timestamp);
    }

    /**
     * Asserts the timestamp of a polled record and the group's current stream time.
     *
     * @param stampedRecord  record returned by the last poll
     * @param j              expected timestamp of {@code stampedRecord}
     * @param j2             expected stream time of {@code partitionGroup}
     * @param partitionGroup group whose stream time is checked
     */
    private void verifyTimes(StampedRecord stampedRecord, long j, long j2, PartitionGroup partitionGroup) {
        final long recordTimestamp = stampedRecord.timestamp;
        MatcherAssert.assertThat(recordTimestamp, CoreMatchers.is(j));

        final long groupStreamTime = partitionGroup.streamTime();
        MatcherAssert.assertThat(groupStreamTime, CoreMatchers.is(j2));
    }

    /**
     * Asserts how many records are buffered in total and per partition.
     *
     * @param i              expected total number of buffered records
     * @param i2             expected number of records buffered for partition1
     * @param i3             expected number of records buffered for partition2
     * @param partitionGroup group under test
     */
    private void verifyBuffered(int i, int i2, int i3, PartitionGroup partitionGroup) {
        final int totalBuffered = partitionGroup.numBuffered();
        Assert.assertEquals(i, totalBuffered);

        final int bufferedInFirst = partitionGroup.numBuffered(partition1);
        Assert.assertEquals(i2, bufferedInFirst);

        final int bufferedInSecond = partitionGroup.numBuffered(partition2);
        Assert.assertEquals(i3, bufferedInSecond);
    }

    /**
     * Setting a partition time must be reflected by {@code partitionTimestamp}; the expected stream
     * time stays at 100 after a second partition is set to the smaller value 50.
     */
    @Test
    public void shouldSetPartitionTimestampAndStreamTime() {
        final PartitionGroup group = getBasicGroup();

        group.setPartitionTime(partition1, 100L);
        final long firstPartitionTime = group.partitionTimestamp(partition1);
        Assert.assertEquals(100L, firstPartitionTime);
        Assert.assertEquals(100L, group.streamTime());

        // A smaller time on the other partition must not pull stream time back.
        group.setPartitionTime(partition2, 50L);
        final long secondPartitionTime = group.partitionTimestamp(partition2);
        Assert.assertEquals(50L, secondPartitionTime);
        Assert.assertEquals(100L, group.streamTime());
    }

    /** Adding records for a partition the group does not contain must fail with the "not found" message. */
    @Test
    public void shouldThrowIllegalStateExceptionUponAddRecordsIfPartitionUnknown() {
        final PartitionGroup group = getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.addRawRecords(unknownPartition, null));

        // Actual value first, expected value in the matcher, so failure output is labelled correctly.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(errMessage));
    }

    /** Querying the buffer size of a partition the group does not contain must fail with the "not found" message. */
    @Test
    public void shouldThrowIllegalStateExceptionUponNumBufferedIfPartitionUnknown() {
        final PartitionGroup group = getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.numBuffered(unknownPartition));

        // Actual value first, expected value in the matcher, so failure output is labelled correctly.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(errMessage));
    }

    /** Setting the time of a partition the group does not contain must fail with the "not found" message. */
    @Test
    public void shouldThrowIllegalStateExceptionUponSetPartitionTimestampIfPartitionUnknown() {
        final PartitionGroup group = getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.setPartitionTime(unknownPartition, 0L));

        // Actual value first, expected value in the matcher, so failure output is labelled correctly.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(errMessage));
    }

    /** Reading the time of a partition the group does not contain must fail with the "not found" message. */
    @Test
    public void shouldThrowIllegalStateExceptionUponGetPartitionTimestampIfPartitionUnknown() {
        final PartitionGroup group = getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.partitionTimestamp(unknownPartition));

        // Actual value first, expected value in the matcher, so failure output is labelled correctly.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(errMessage));
    }

    /** Reading the head offset of a partition the group does not contain must fail with the "not found" message. */
    @Test
    public void shouldThrowIllegalStateExceptionUponGetHeadRecordOffsetIfPartitionUnknown() {
        final PartitionGroup group = getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.headRecordOffset(unknownPartition));

        // Actual value first, expected value in the matcher, so failure output is labelled correctly.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(errMessage));
    }

    /**
     * After {@code clear()} the group must report no buffered records, unknown (-1) stream and
     * partition time, and return no record on poll, while still accepting records for partition1.
     */
    @Test
    public void shouldEmptyPartitionsOnClear() {
        final PartitionGroup group = getBasicGroup();

        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 3L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, records);

        // Poll twice so that the group has advanced its times before it is cleared.
        group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
        group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());

        group.clear();

        MatcherAssert.assertThat(group.numBuffered(), Matchers.equalTo(0));
        MatcherAssert.assertThat(group.streamTime(), Matchers.equalTo(-1L));
        final StampedRecord afterClear = group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());
        MatcherAssert.assertThat(afterClear, Matchers.nullValue());
        MatcherAssert.assertThat(group.partitionTimestamp(partition1), Matchers.equalTo(-1L));

        // partition1 must still be known to the group: this call must not throw.
        group.addRawRecords(partition1, records);
    }

    /**
     * Shrinks the group from {partition1, partition2} to {partition2} and verifies that partition1's
     * records and timestamp are dropped while partition2's buffered records stay available.
     */
    @Test
    public void shouldUpdatePartitionQueuesShrink() {
        final PartitionGroup group = getBasicGroup();

        // Keep the lists in locals: their sizes are the expected buffer counts below.
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, list1);
        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 2L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 4L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 6L, recordKey, recordValue));
        group.addRawRecords(partition2, list2);

        Assert.assertEquals(list1.size() + list2.size(), group.numBuffered());
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());

        // Shrink to partition2 only; its queue already exists, so the factory must never be called.
        group.updatePartitions(Utils.mkSet(createPartition2()), tp -> {
            Assert.fail("should not create any queues");
            return null;
        });

        // No new (empty) partition was added, so everything is still buffered locally.
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        // Only partition2's records remain; nothing has been polled from it yet.
        Assert.assertEquals(list2.size(), group.numBuffered());
        Assert.assertEquals(1L, group.streamTime());
        Assert.assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1));
        // Buffered records of the surviving partition are still reachable.
        MatcherAssert.assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), Matchers.notNullValue());
        MatcherAssert.assertThat(group.partitionTimestamp(partition2), Matchers.equalTo(2L));
    }

    /**
     * Expands the group from {partition1} to {partition1, partition2} and verifies that the existing
     * partition keeps its buffered record and timestamp while the new partition starts out empty.
     */
    @Test
    public void shouldUpdatePartitionQueuesExpand() {
        final PartitionGroup group = new PartitionGroup(
            logContext,
            Utils.mkMap(Utils.mkEntry(partition1, queue1)),
            tp -> OptionalLong.of(0L),
            getValueSensor(metrics, lastLatenessValue),
            enforcedProcessingSensor,
            -1L
        );

        // Keep the list in a local: its size is the expected buffer count below.
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, list1);

        Assert.assertEquals(list1.size(), group.numBuffered());
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());

        // Expand; the factory must only be asked for the queue of the newly added partition2.
        group.updatePartitions(Utils.mkSet(createPartition1(), createPartition2()), partition -> {
            Assert.assertEquals(createPartition2(), partition);
            return createQueue2();
        });

        // The newly added partition has nothing buffered yet.
        Assert.assertFalse(group.allPartitionsBufferedLocally());
        Assert.assertEquals(1L, group.numBuffered());
        Assert.assertEquals(1L, group.streamTime());
        MatcherAssert.assertThat(group.partitionTimestamp(partition1), Matchers.equalTo(1L));
        MatcherAssert.assertThat(group.partitionTimestamp(partition2), Matchers.equalTo(-1L));
        // The record still buffered on partition1 remains reachable.
        MatcherAssert.assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), Matchers.notNullValue());
    }

    /**
     * Replaces the group's only partition (partition1) with partition2 and verifies that partition1's
     * buffered record and timestamp are gone while the new partition starts out empty.
     */
    @Test
    public void shouldUpdatePartitionQueuesShrinkAndExpand() {
        final PartitionGroup group = new PartitionGroup(
            logContext,
            Utils.mkMap(Utils.mkEntry(partition1, queue1)),
            tp -> OptionalLong.of(0L),
            getValueSensor(metrics, lastLatenessValue),
            enforcedProcessingSensor,
            -1L
        );

        // Keep the list in a local: its size is the expected buffer count below.
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, list1);

        Assert.assertEquals(list1.size(), group.numBuffered());
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds());

        // Swap partition1 for partition2; the factory must only be asked for partition2's queue.
        group.updatePartitions(Utils.mkSet(createPartition2()), partition -> {
            Assert.assertEquals(createPartition2(), partition);
            return createQueue2();
        });

        // The newly added partition has nothing buffered yet, and partition1's record is gone.
        Assert.assertFalse(group.allPartitionsBufferedLocally());
        Assert.assertEquals(0L, group.numBuffered());
        Assert.assertEquals(1L, group.streamTime());
        Assert.assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1));
        MatcherAssert.assertThat(group.partitionTimestamp(partition2), Matchers.equalTo(-1L));
        // Nothing is left to poll.
        MatcherAssert.assertThat(group.nextRecord(new PartitionGroup.RecordInfo(), time.milliseconds()), Matchers.nullValue());
    }

    /**
     * With a max idle time of -1 the group must be ready to process even though partition2 has
     * nothing buffered, and must log at TRACE level that idling is disabled.
     */
    @Test
    public void shouldNeverWaitIfIdlingIsDisabled() {
        final PartitionGroup group = new PartitionGroup(
            logContext,
            Utils.mkMap(
                Utils.mkEntry(partition1, queue1),
                Utils.mkEntry(partition2, queue2)
            ),
            tp -> OptionalLong.of(0L),
            getValueSensor(metrics, lastLatenessValue),
            enforcedProcessingSensor,
            -1L
        );

        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, list1);

        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        // try-with-resources closes the appender exactly once, whether or not an assertion fails.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(true));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] Ready for processing because max.task.idle.ms is disabled.\n\tThere may be out-of-order processing for this task as a result.\n\tBuffered partitions: [topic-1]\n\tNon-buffered partitions: [topic-2]"))
                ))
            );
        }
    }

    /**
     * With records buffered on every partition the group must be ready to process and must log at
     * TRACE level that all partitions were buffered locally.
     */
    @Test
    public void shouldBeReadyIfAllPartitionsAreBuffered() {
        final PartitionGroup group = new PartitionGroup(
            logContext,
            Utils.mkMap(
                Utils.mkEntry(partition1, queue1),
                Utils.mkEntry(partition2, queue2)
            ),
            tp -> OptionalLong.of(0L),
            getValueSensor(metrics, lastLatenessValue),
            enforcedProcessingSensor,
            0L
        );

        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, list1);

        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 2, 5L, recordKey, recordValue));
        group.addRawRecords(partition2, list2);

        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(true));

        // try-with-resources closes the appender exactly once, whether or not an assertion fails.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(true));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] All partitions were buffered locally, so this task is ready for processing."))
                ))
            );
        }
    }

    /**
     * While the lag of the non-buffered partition2 is unknown the group must not be ready and must
     * log at TRACE level that it is waiting to fetch data; once a lag of 0 is reported for
     * partition2 the group must become ready.
     */
    @Test
    public void shouldWaitForFetchesWhenMetadataIsIncomplete() {
        // Lags reported to the group; a missing entry is reported as "unknown" (empty).
        final Map<TopicPartition, OptionalLong> lags = new HashMap<>();
        final PartitionGroup group = new PartitionGroup(
            logContext,
            Utils.mkMap(
                Utils.mkEntry(partition1, queue1),
                Utils.mkEntry(partition2, queue2)
            ),
            tp -> lags.getOrDefault(tp, OptionalLong.empty()),
            getValueSensor(metrics, lastLatenessValue),
            enforcedProcessingSensor,
            0L
        );

        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, list1);

        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        // try-with-resources closes the appender exactly once; the follow-up check below runs
        // outside of it so that a failure there cannot trigger a second close().
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(false));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] Waiting to fetch data for topic-2"))
                ))
            );
        }

        // Once partition2 reports a lag of 0, the group no longer needs to wait.
        lags.put(partition2, OptionalLong.of(0L));
        MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(true));
    }

    /**
     * Checks readiness when an empty partition reports a non-zero lag.
     *
     * <p>Records are buffered for {@code partition1} only, and a lag of 1 is recorded for
     * {@code partition2}. The group must report that it is not ready to process and log a TRACE
     * message stating that the lag for {@code topic-2} is 1 while no data is buffered locally.
     */
    @Test
    public void shouldWaitForPollWhenLagIsNonzero() {
        // Lag reported per partition; a missing entry is answered with OptionalLong.empty().
        final Map<Object, OptionalLong> lags = new HashMap<>();
        final PartitionGroup group = new PartitionGroup(
            this.logContext,
            Utils.mkMap(Utils.mkEntry(this.partition1, this.queue1), Utils.mkEntry(this.partition2, this.queue2)),
            tp -> lags.getOrDefault(tp, OptionalLong.empty()),
            getValueSensor(this.metrics, this.lastLatenessValue),
            this.enforcedProcessingSensor,
            0L
        );

        // Raw ConsumerRecord is kept on purpose: the key/value field types are declared outside this method.
        group.addRawRecords(this.partition1, Arrays.asList(
            new ConsumerRecord(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, this.recordKey, this.recordValue),
            new ConsumerRecord(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, this.recordKey, this.recordValue)));

        lags.put(this.partition2, OptionalLong.of(1L));

        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        // try-with-resources closes the appender exactly once, whether or not an assertion fails.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(false));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] Lag for topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records.")))));
        }
    }

    /**
     * Checks the idling behaviour when an empty partition reports a lag of zero.
     *
     * <p>The group is built with a lag supplier that always answers zero and with {@code 1L} as its
     * final constructor argument (the expected log messages refer to it as the configured idle
     * time / {@code max.task.idle.ms}). Records are buffered for {@code partition1} only. The test
     * then probes {@code readyToProcess} at three wall-clock values:
     * <ul>
     *   <li>0: not ready, TRACE message about waiting until the deadline of 1;</li>
     *   <li>1: ready, TRACE message about continuing although {@code topic-2} is empty;</li>
     *   <li>2: ready, same message with the updated wall-clock time.</li>
     * </ul>
     * Each probe uses its own log appender, which is closed before the next probe starts.
     */
    @Test
    public void shouldIdleAsSpecifiedWhenLagIsZero() {
        final PartitionGroup group = new PartitionGroup(
            this.logContext,
            Utils.mkMap(Utils.mkEntry(this.partition1, this.queue1), Utils.mkEntry(this.partition2, this.queue2)),
            tp -> OptionalLong.of(0L),
            getValueSensor(this.metrics, this.lastLatenessValue),
            this.enforcedProcessingSensor,
            1L
        );

        // Raw ConsumerRecord is kept on purpose: the key/value field types are declared outside this method.
        group.addRawRecords(this.partition1, Arrays.asList(
            new ConsumerRecord(AssignmentTestUtils.TOPIC_PREFIX, 1, 1L, this.recordKey, this.recordValue),
            new ConsumerRecord(AssignmentTestUtils.TOPIC_PREFIX, 1, 5L, this.recordKey, this.recordValue)));

        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        // Wall-clock time 0: the deadline (1) has not been reached yet, so the group keeps waiting.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(false));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] Lag for topic-2 is currently 0 and current time is 0. Waiting for new data to be produced for configured idle time 1 (deadline is 1).")))));
        }

        // Wall-clock time 1: the deadline is reached, processing is allowed to continue.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(1L), CoreMatchers.is(true));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] Continuing to process although some partitions are empty on the broker.\n\tThere may be out-of-order processing for this task as a result.\n\tPartitions with local data: [topic-1].\n\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n\tConfigured max.task.idle.ms: 1.\n\tCurrent wall-clock time: 1.")))));
        }

        // Wall-clock time 2: still past the same deadline; only the reported wall-clock time changes.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(2L), CoreMatchers.is(true));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo("[test] Continuing to process although some partitions are empty on the broker.\n\tThere may be out-of-order processing for this task as a result.\n\tPartitions with local data: [topic-1].\n\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n\tConfigured max.task.idle.ms: 1.\n\tCurrent wall-clock time: 2.")))));
        }
    }

    /**
     * Builds a {@link PartitionGroup} over the two test partitions and their queues.
     *
     * <p>The lag supplier answers zero for every partition, the lateness sensor comes from
     * {@code getValueSensor}, and {@code -1L} is passed as the final constructor argument.
     *
     * @return a freshly constructed group; every call creates a new instance
     */
    private PartitionGroup getBasicGroup() {
        return new PartitionGroup(
            logContext,
            Utils.mkMap(Utils.mkEntry(partition1, queue1), Utils.mkEntry(partition2, queue2)),
            anyPartition -> OptionalLong.of(0L),
            getValueSensor(metrics, lastLatenessValue),
            enforcedProcessingSensor,
            -1L
        );
    }
}
