package io.confluent.kafkarest.resources.v2;

import com.fasterxml.jackson.databind.node.NullNode;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMultimap;
import com.google.protobuf.ByteString;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.kafkarest.controllers.ProduceController;
import io.confluent.kafkarest.controllers.RecordSerializer;
import io.confluent.kafkarest.controllers.SchemaManager;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceResult;
import io.confluent.kafkarest.entities.RegisteredSchema;
import io.confluent.kafkarest.entities.v2.PartitionOffset;
import io.confluent.kafkarest.entities.v2.ProduceRequest;
import io.confluent.kafkarest.entities.v2.ProduceResponse;
import io.confluent.rest.exceptions.RestServerErrorException;
import jakarta.inject.Provider;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RetriableException;

/* loaded from: input_file:io/confluent/kafkarest/resources/v2/AbstractProduceAction.class */
abstract class AbstractProduceAction {
    public static final String UNEXPECTED_PRODUCER_EXCEPTION = "Unexpected non-Kafka exception returned by Kafka";
    private final Provider<SchemaManager> schemaManager;
    private final Provider<RecordSerializer> recordSerializer;
    private final Provider<ProduceController> produceController;

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:io/confluent/kafkarest/resources/v2/AbstractProduceAction$SerializedKeyAndValue.class */
    public static abstract class SerializedKeyAndValue {
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Optional<Integer> getPartitionId();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Optional<ByteString> getKey();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Optional<ByteString> getValue();

        private static SerializedKeyAndValue create(Optional<Integer> optional, Optional<ByteString> optional2, Optional<ByteString> optional3) {
            return new AutoValue_AbstractProduceAction_SerializedKeyAndValue(optional, optional2, optional3);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractProduceAction(Provider<SchemaManager> provider, Provider<RecordSerializer> provider2, Provider<ProduceController> provider3) {
        this.schemaManager = (Provider) Objects.requireNonNull(provider);
        this.recordSerializer = (Provider) Objects.requireNonNull(provider2);
        this.produceController = (Provider) Objects.requireNonNull(provider3);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final CompletableFuture<ProduceResponse> produceWithoutSchema(EmbeddedFormat embeddedFormat, String str, Optional<Integer> optional, ProduceRequest produceRequest) {
        return produceResultsToResponse(Optional.empty(), Optional.empty(), doProduce(str, serialize(embeddedFormat, str, optional, Optional.empty(), Optional.empty(), produceRequest.getRecords())));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final CompletableFuture<ProduceResponse> produceWithSchema(EmbeddedFormat embeddedFormat, String str, Optional<Integer> optional, ProduceRequest produceRequest) {
        Optional<RegisteredSchema> schema = getSchema(embeddedFormat, str, produceRequest.getKeySchemaId(), produceRequest.getKeySchema(), true);
        Optional<RegisteredSchema> schema2 = getSchema(embeddedFormat, str, produceRequest.getValueSchemaId(), produceRequest.getValueSchema(), false);
        return produceResultsToResponse(schema, schema2, doProduce(str, serialize(embeddedFormat, str, optional, schema, schema2, produceRequest.getRecords())));
    }

    private Optional<RegisteredSchema> getSchema(EmbeddedFormat embeddedFormat, String str, Optional<Integer> optional, Optional<String> optional2, boolean z) {
        return (embeddedFormat.requiresSchema() && (optional.isPresent() || optional2.isPresent())) ? Optional.of(((SchemaManager) this.schemaManager.get()).getSchema(str, optional2.map(str2 -> {
            return embeddedFormat;
        }), Optional.empty(), Optional.empty(), optional, Optional.empty(), optional2, z)) : Optional.empty();
    }

    private List<SerializedKeyAndValue> serialize(EmbeddedFormat embeddedFormat, String str, Optional<Integer> optional, Optional<RegisteredSchema> optional2, Optional<RegisteredSchema> optional3, List<ProduceRequest.ProduceRecord> list) {
        return (List) list.stream().map(produceRecord -> {
            return SerializedKeyAndValue.create((Optional) produceRecord.getPartition().map((v0) -> {
                return Optional.of(v0);
            }).orElse(optional), ((RecordSerializer) this.recordSerializer.get()).serialize(embeddedFormat, str, optional2, produceRecord.getKey().orElse(NullNode.getInstance()), true), ((RecordSerializer) this.recordSerializer.get()).serialize(embeddedFormat, str, optional3, produceRecord.getValue().orElse(NullNode.getInstance()), false));
        }).collect(Collectors.toList());
    }

    private List<CompletableFuture<ProduceResult>> doProduce(String str, List<SerializedKeyAndValue> list) {
        return (List) list.stream().map(serializedKeyAndValue -> {
            return ((ProduceController) this.produceController.get()).produce("", str, serializedKeyAndValue.getPartitionId(), ImmutableMultimap.of(), serializedKeyAndValue.getKey(), serializedKeyAndValue.getValue(), Instant.now());
        }).collect(Collectors.toList());
    }

    private static CompletableFuture<ProduceResponse> produceResultsToResponse(Optional<RegisteredSchema> optional, Optional<RegisteredSchema> optional2, List<CompletableFuture<ProduceResult>> list) {
        return CompletableFutures.allAsList((List) list.stream().map(completableFuture -> {
            return completableFuture.thenApply(produceResult -> {
                return new PartitionOffset(Integer.valueOf(produceResult.getPartitionId()), Long.valueOf(produceResult.getOffset()), null, null);
            });
        }).map(completableFuture2 -> {
            return completableFuture2.exceptionally(th -> {
                return new PartitionOffset(null, null, Integer.valueOf(errorCodeFromProducerException(th.getCause())), th.getCause().getMessage());
            });
        }).collect(Collectors.toList())).thenApply(list2 -> {
            return new ProduceResponse(list2, (Integer) optional.map((v0) -> {
                return v0.getSchemaId();
            }).orElse(null), (Integer) optional2.map((v0) -> {
                return v0.getSchemaId();
            }).orElse(null));
        });
    }

    private static int errorCodeFromProducerException(Throwable th) {
        if (th instanceof AuthenticationException) {
            return Errors.KAFKA_AUTHENTICATION_ERROR_CODE;
        }
        if (th instanceof AuthorizationException) {
            return Errors.KAFKA_AUTHORIZATION_ERROR_CODE;
        }
        if (th instanceof RetriableException) {
            return Errors.KAFKA_RETRIABLE_ERROR_ERROR_CODE;
        }
        if (th instanceof KafkaException) {
            return Errors.KAFKA_ERROR_ERROR_CODE;
        }
        throw new RestServerErrorException(UNEXPECTED_PRODUCER_EXCEPTION, RestServerErrorException.DEFAULT_ERROR_CODE, th);
    }
}
