package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Unit tests for {@link ShareFetchCollector}.
 *
 * <p>Each test calls {@link #buildDependencies()} to create a fresh collector, buffer and
 * supporting objects, subscribes to a single partition, places a {@link ShareCompletedFetch}
 * in the {@link ShareFetchBuffer} and then checks what {@code collect} returns or throws.
 */
public class ShareFetchCollectorTest {
    private static final int DEFAULT_RECORD_COUNT = 10;
    private static final int DEFAULT_MAX_POLL_RECORDS = 500;
    private final TopicIdPartition topicAPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
    private LogContext logContext;
    private SubscriptionState subscriptions;
    private FetchConfig fetchConfig;
    private ConsumerMetadata metadata;
    private ShareFetchBuffer fetchBuffer;
    private Deserializers<String, String> deserializers;
    private ShareFetchCollector<String, String> fetchCollector;
    private ShareCompletedFetchBuilder completedFetchBuilder;
    private ShareFetchMetricsAggregator shareFetchMetricsAggregator;

    /**
     * Fluent builder for the {@link ShareCompletedFetch} instances used by the tests.
     *
     * <p>It is an inner (non-static) class because {@link #build()} reads the enclosing test's
     * {@code logContext}, {@code topicAPartition0} and {@code shareFetchMetricsAggregator}.
     * The class and its fluent methods were private in the original source; the decompiler
     * widened their access modifiers.
     */
    public class ShareCompletedFetchBuilder {
        private int recordCount;
        private Errors error;

        private ShareCompletedFetchBuilder() {
            this.recordCount = ShareFetchCollectorTest.DEFAULT_RECORD_COUNT;
            this.error = null;
        }

        /**
         * Sets how many records the built fetch contains.
         *
         * @param i number of records to append
         * @return this builder
         */
        public ShareCompletedFetchBuilder recordCount(int i) {
            this.recordCount = i;
            return this;
        }

        /**
         * Sets the error whose code is written into the built partition data.
         *
         * @param errors error to report, or {@code null} to leave the error code unset
         * @return this builder
         */
        public ShareCompletedFetchBuilder error(Errors errors) {
            this.error = errors;
            return this;
        }

        /**
         * Builds a {@link ShareCompletedFetch} for {@code topicAPartition0} holding
         * {@code recordCount} records with key {@code "key"} and values {@code "value-<index>"}.
         *
         * @return the completed fetch, carrying the configured error code if one was set
         */
        public ShareCompletedFetch build() {
            MemoryRecords records;
            // try-with-resources guarantees the builder is closed whether or not appending fails.
            try (MemoryRecordsBuilder builder = MemoryRecords.builder(
                    ByteBuffer.allocate(KafkaChannelTest.MAX_RECEIVE_SIZE),
                    Compression.NONE,
                    TimestampType.CREATE_TIME,
                    0L)) {
                for (int i = 0; i < this.recordCount; i++) {
                    builder.append(0L, "key".getBytes(), ("value-" + i).getBytes());
                }
                records = builder.build();
            }

            ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                    .setPartitionIndex(ShareFetchCollectorTest.this.topicAPartition0.partition())
                    .setRecords(records)
                    .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(0L, this.recordCount));
            if (this.error != null) {
                partitionData.setErrorCode(this.error.code());
            }

            return new ShareCompletedFetch(
                    ShareFetchCollectorTest.this.logContext,
                    BufferSupplier.create(),
                    ShareFetchCollectorTest.this.topicAPartition0,
                    partitionData,
                    ShareFetchCollectorTest.this.shareFetchMetricsAggregator,
                    ApiKeys.SHARE_FETCH.latestVersion());
        }
    }

    /**
     * Collects a fetch holding {@code DEFAULT_MAX_POLL_RECORDS} records and checks that the first
     * {@code collect} returns all of them, and that a second {@code collect} returns nothing and
     * leaves the completed fetch marked as consumed.
     */
    @Test
    public void testFetchNormal() {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        ShareCompletedFetch completedFetch = this.completedFetchBuilder.recordCount(DEFAULT_MAX_POLL_RECORDS).build();

        // The buffer starts empty; adding the fetch must not initialize it.
        Assertions.assertTrue(this.fetchBuffer.isEmpty());
        this.fetchBuffer.add(completedFetch);
        Assertions.assertFalse(this.fetchBuffer.isEmpty());
        Assertions.assertFalse(completedFetch.isInitialized());

        // First collect returns every record and initializes, but does not yet consume, the fetch.
        ShareFetch firstFetch = this.fetchCollector.collect(this.fetchBuffer);
        Assertions.assertFalse(firstFetch.isEmpty());
        Assertions.assertEquals(DEFAULT_MAX_POLL_RECORDS, firstFetch.numRecords());
        Assertions.assertTrue(completedFetch.isInitialized());
        Assertions.assertFalse(completedFetch.isConsumed());

        // The queue is drained, yet the fetch is still held as the "next in line" fetch.
        Assertions.assertTrue(this.fetchBuffer.isEmpty());
        Assertions.assertNull(this.fetchBuffer.peek());
        Assertions.assertNull(this.fetchBuffer.poll());
        Assertions.assertNotNull(this.fetchBuffer.nextInLineFetch());

        // Second collect finds nothing left and the fetch is now consumed.
        ShareFetch secondFetch = this.fetchCollector.collect(this.fetchBuffer);
        Assertions.assertEquals(0, secondFetch.numRecords());
        Assertions.assertTrue(secondFetch.isEmpty());
        Assertions.assertTrue(completedFetch.isConsumed());
    }

    /**
     * Replaces the collector with one whose {@code initialize} always throws the supplied
     * exception and checks that {@code collect} throws an exception of that class.
     *
     * @param runtimeException exception thrown from the overridden {@code initialize}
     */
    @MethodSource({"testErrorInInitializeSource"})
    @ParameterizedTest
    public void testErrorInInitialize(final RuntimeException runtimeException) {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        this.fetchCollector = new ShareFetchCollector<String, String>(this.logContext, this.metadata, this.subscriptions, this.fetchConfig, this.deserializers) {
            @Override
            protected ShareCompletedFetch initialize(ShareCompletedFetch shareCompletedFetch) {
                throw runtimeException;
            }
        };
        this.fetchBuffer.add(this.completedFetchBuilder.recordCount(DEFAULT_RECORD_COUNT).build());
        Assertions.assertFalse(this.fetchBuffer.isEmpty());
        Assertions.assertThrows(runtimeException.getClass(), () -> this.fetchCollector.collect(this.fetchBuffer));
    }

    /** Checks that a fetch with {@code TOPIC_AUTHORIZATION_FAILED} makes {@code collect} throw {@link TopicAuthorizationException}. */
    @Test
    public void testFetchWithTopicAuthorizationFailed() {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        this.fetchBuffer.add(this.completedFetchBuilder.error(Errors.TOPIC_AUTHORIZATION_FAILED).build());
        Assertions.assertThrows(TopicAuthorizationException.class, () -> this.fetchCollector.collect(this.fetchBuffer));
    }

    /** Checks that a fetch with {@code UNKNOWN_LEADER_EPOCH} makes {@code collect} return an empty fetch. */
    @Test
    public void testFetchWithUnknownLeaderEpoch() {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        this.fetchBuffer.add(this.completedFetchBuilder.error(Errors.UNKNOWN_LEADER_EPOCH).build());
        Assertions.assertTrue(this.fetchCollector.collect(this.fetchBuffer).isEmpty());
    }

    /** Checks that a fetch with {@code UNKNOWN_SERVER_ERROR} makes {@code collect} return an empty fetch. */
    @Test
    public void testFetchWithUnknownServerError() {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        this.fetchBuffer.add(this.completedFetchBuilder.error(Errors.UNKNOWN_SERVER_ERROR).build());
        Assertions.assertTrue(this.fetchCollector.collect(this.fetchBuffer).isEmpty());
    }

    /** Checks that a fetch with {@code CORRUPT_MESSAGE} makes {@code collect} throw {@link KafkaException}. */
    @Test
    public void testFetchWithCorruptMessage() {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        this.fetchBuffer.add(this.completedFetchBuilder.error(Errors.CORRUPT_MESSAGE).build());
        Assertions.assertThrows(KafkaException.class, () -> this.fetchCollector.collect(this.fetchBuffer));
    }

    /**
     * Checks that every error supplied by {@link #testFetchWithOtherErrorsSource()} makes
     * {@code collect} throw {@link IllegalStateException}.
     *
     * @param errors error code placed in the completed fetch
     */
    @MethodSource({"testFetchWithOtherErrorsSource"})
    @ParameterizedTest
    public void testFetchWithOtherErrors(Errors errors) {
        buildDependencies();
        subscribeAndAssign(this.topicAPartition0);
        this.fetchBuffer.add(this.completedFetchBuilder.error(errors).build());
        Assertions.assertThrows(IllegalStateException.class, () -> this.fetchCollector.collect(this.fetchBuffer));
    }

    /**
     * Creates fresh instances of every collaborator field used by the tests: log context,
     * deserializers, metrics aggregator, subscription state, fetch config, metadata, collector,
     * buffer and completed-fetch builder.
     */
    private void buildDependencies() {
        this.logContext = new LogContext();

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A consumer configuration needs deserializer classes, not serializer classes.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_RECORDS));
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);

        this.deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
        ShareFetchMetricsManager metricsManager =
                ConsumerUtils.createShareFetchMetricsManager(ConsumerUtils.createMetrics(consumerConfig, Time.SYSTEM));
        this.shareFetchMetricsAggregator = new ShareFetchMetricsAggregator(
                metricsManager,
                new HashSet<>(Collections.singleton(this.topicAPartition0.topicPartition())));
        this.subscriptions = ConsumerUtils.createSubscriptionState(consumerConfig, this.logContext);
        this.fetchConfig = new FetchConfig(consumerConfig);
        this.metadata = new ConsumerMetadata(0L, 1000L, 10000L, false, false, this.subscriptions, this.logContext, new ClusterResourceListeners());
        this.fetchCollector = new ShareFetchCollector<>(this.logContext, this.metadata, this.subscriptions, this.fetchConfig, this.deserializers);
        this.fetchBuffer = new ShareFetchBuffer(this.logContext);
        this.completedFetchBuilder = new ShareCompletedFetchBuilder();
    }

    /**
     * Subscribes to the partition's topic and then assigns the partition itself.
     *
     * @param topicIdPartition partition to subscribe to and assign
     */
    private void subscribeAndAssign(TopicIdPartition topicIdPartition) {
        this.subscriptions.subscribe(Collections.singleton(topicIdPartition.topic()), Optional.empty());
        this.subscriptions.assignFromSubscribed(Collections.singleton(topicIdPartition.topicPartition()));
    }

    /**
     * Supplies every {@link Errors} value except the ones listed in the {@code removeAll} call
     * below, each wrapped as a single-argument {@link Arguments}.
     *
     * @return one argument set per remaining error
     */
    private static Stream<Arguments> testFetchWithOtherErrorsSource() {
        ArrayList<Errors> remaining = new ArrayList<>(Arrays.asList(Errors.values()));
        remaining.removeAll(Arrays.asList(
                Errors.NONE,
                Errors.NOT_LEADER_OR_FOLLOWER,
                Errors.REPLICA_NOT_AVAILABLE,
                Errors.KAFKA_STORAGE_ERROR,
                Errors.FENCED_LEADER_EPOCH,
                Errors.OFFSET_NOT_AVAILABLE,
                Errors.UNKNOWN_TOPIC_OR_PARTITION,
                Errors.UNKNOWN_TOPIC_ID,
                Errors.INCONSISTENT_TOPIC_ID,
                Errors.OFFSET_OUT_OF_RANGE,
                Errors.TOPIC_AUTHORIZATION_FAILED,
                Errors.UNKNOWN_LEADER_EPOCH,
                Errors.UNKNOWN_SERVER_ERROR,
                Errors.CORRUPT_MESSAGE));
        return remaining.stream().map(Arguments::of);
    }

    /**
     * Supplies the exceptions thrown from the overridden {@code initialize} in
     * {@link #testErrorInInitialize(RuntimeException)}.
     *
     * @return one argument set holding a {@link RuntimeException} and one holding a {@link KafkaException}
     */
    private static Stream<Arguments> testErrorInInitializeSource() {
        return Stream.of(
                Arguments.of(new RuntimeException()),
                Arguments.of(new KafkaException()));
    }
}
