package io.confluent.kafkarest.resources.v3;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.UnmodifiableIterator;
import com.google.protobuf.ByteString;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.config.ConfigModule;
import io.confluent.kafkarest.controllers.ProduceController;
import io.confluent.kafkarest.controllers.RecordSerializer;
import io.confluent.kafkarest.controllers.SchemaManager;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.ProduceResult;
import io.confluent.kafkarest.entities.RegisteredSchema;
import io.confluent.kafkarest.entities.v3.ProduceBatchRequest;
import io.confluent.kafkarest.entities.v3.ProduceBatchRequestEntry;
import io.confluent.kafkarest.entities.v3.ProduceBatchResponse;
import io.confluent.kafkarest.entities.v3.ProduceBatchResponseFailureEntry;
import io.confluent.kafkarest.entities.v3.ProduceBatchResponseSuccessEntry;
import io.confluent.kafkarest.entities.v3.ProduceRequest;
import io.confluent.kafkarest.entities.v3.ProduceResponse;
import io.confluent.kafkarest.exceptions.BadRequestException;
import io.confluent.kafkarest.exceptions.StacklessCompletionException;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.extension.ResourceAccesslistFeature;
import io.confluent.kafkarest.ratelimit.DoNotRateLimit;
import io.confluent.kafkarest.ratelimit.RateLimitExceededException;
import io.confluent.kafkarest.resources.v3.V3ResourcesModule;
import io.confluent.kafkarest.response.StreamingResponse;
import io.confluent.rest.annotations.PerformanceMetric;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.validation.Valid;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.Providers;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collector;
import org.apache.kafka.common.errors.SerializationException;

@DoNotRateLimit
@Path("/v3/clusters/{clusterId}/topics/{topicName}/records:batch")
@ResourceAccesslistFeature.ResourceName("api.v3.batch-produce.*")
/* loaded from: input_file:io/confluent/kafkarest/resources/v3/ProduceBatchAction.class */
public final class ProduceBatchAction {
    public static final int BATCH_ID_MINIMUM_LENGTH = 1;
    public static final int BATCH_ID_MAXIMUM_LENGTH = 80;

    @Context
    private Providers providers;
    private static final Collector<ProduceRequest.ProduceRequestHeader, ImmutableMultimap.Builder<String, Optional<ByteString>>, ImmutableMultimap<String, Optional<ByteString>>> PRODUCE_REQUEST_HEADER_COLLECTOR = Collector.of(ImmutableMultimap::builder, (builder, produceRequestHeader) -> {
        builder.put(produceRequestHeader.getName(), produceRequestHeader.getValue());
    }, (builder2, builder3) -> {
        return builder2.putAll(builder3.build());
    }, (v0) -> {
        return v0.build();
    }, new Collector.Characteristics[0]);
    private final Provider<SchemaManager> schemaManagerProvider;
    private final Provider<RecordSerializer> recordSerializerProvider;
    private final Provider<ProduceController> produceControllerProvider;
    private final Provider<ProducerMetrics> producerMetricsProvider;
    private final ProduceRateLimiters produceRateLimiters;
    private final int produceBatchMaximumEntries;
    private final ExecutorService executorService;

    @Context
    private HttpServletRequest httpServletRequest;

    @Inject
    public ProduceBatchAction(Provider<SchemaManager> provider, Provider<RecordSerializer> provider2, Provider<ProduceController> provider3, Provider<ProducerMetrics> provider4, ProduceRateLimiters produceRateLimiters, @ConfigModule.ProduceBatchMaximumEntriesConfig Integer num, @V3ResourcesModule.ProduceResponseThreadPool ExecutorService executorService) {
        this.schemaManagerProvider = (Provider) Objects.requireNonNull(provider);
        this.recordSerializerProvider = (Provider) Objects.requireNonNull(provider2);
        this.produceControllerProvider = (Provider) Objects.requireNonNull(provider3);
        this.producerMetricsProvider = (Provider) Objects.requireNonNull(provider4);
        this.produceRateLimiters = (ProduceRateLimiters) Objects.requireNonNull(produceRateLimiters);
        this.produceBatchMaximumEntries = ((Integer) Objects.requireNonNull(num)).intValue();
        this.executorService = (ExecutorService) Objects.requireNonNull(executorService);
    }

    @Produces({Versions.JSON})
    @ResourceAccesslistFeature.ResourceName("api.v3.batch-produce.produce-to-topic")
    @POST
    @Consumes({Versions.JSON})
    @PerformanceMetric("v3.produce.produce-to-topic")
    public void produce(@Suspended AsyncResponse asyncResponse, @PathParam("clusterId") String str, @PathParam("topicName") String str2, @Valid ProduceBatchRequest produceBatchRequest) throws Exception {
        if (produceBatchRequest == null) {
            throw Errors.produceBatchException(Errors.PRODUCE_BATCH_EXCEPTION_NULL_MESSAGE);
        }
        HashSet hashSet = new HashSet();
        produceBatchRequest.getEntries().stream().forEach(produceBatchRequestEntry -> {
            if (!hashSet.add(produceBatchRequestEntry.getId().textValue())) {
                throw Errors.produceBatchException(Errors.PRODUCE_BATCH_EXCEPTION_IDS_NOT_DISTINCT_MESSAGE);
            }
        });
        int size = produceBatchRequest.getEntries().size();
        if (size == 0) {
            throw Errors.produceBatchException(Errors.PRODUCE_BATCH_EXCEPTION_EMPTY_BATCH_MESSAGE);
        }
        if (size > this.produceBatchMaximumEntries) {
            throw Errors.produceBatchException(Errors.PRODUCE_BATCH_EXCEPTION_TOO_MANY_ENTRIES_MESSAGE);
        }
        ArrayList arrayList = new ArrayList(size);
        ArrayList arrayList2 = new ArrayList(size);
        ProduceController produceController = (ProduceController) this.produceControllerProvider.get();
        ArrayList arrayList3 = new ArrayList(size);
        UnmodifiableIterator it = produceBatchRequest.getEntries().iterator();
        while (it.hasNext()) {
            arrayList3.add(produce(str, str2, (ProduceBatchRequestEntry) it.next(), produceController, (ProducerMetrics) this.producerMetricsProvider.get()));
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList3.toArray(new CompletableFuture[0])).whenCompleteAsync((r13, th) -> {
            for (int i = 0; i < size; i++) {
                CompletableFuture completableFuture = (CompletableFuture) arrayList3.get(i);
                String textValue = ((ProduceBatchRequestEntry) produceBatchRequest.getEntries().get(i)).getId().textValue();
                completableFuture.handle((produceBatchResponseSuccessEntry, th) -> {
                    if (th != null) {
                        arrayList2.add(toErrorEntryForException(textValue, th));
                        return null;
                    }
                    arrayList.add(produceBatchResponseSuccessEntry);
                    return null;
                });
            }
            asyncResponse.resume(Response.status(207, "Multi-Status").entity(ProduceBatchResponse.builder().setSuccesses(arrayList).setFailures(arrayList2).build()).build());
        }, (Executor) this.executorService);
    }

    private CompletableFuture<ProduceBatchResponseSuccessEntry> produce(String str, String str2, ProduceBatchRequestEntry produceBatchRequestEntry, ProduceController produceController, ProducerMetrics producerMetrics) {
        long nanoTime = System.nanoTime();
        try {
            try {
                this.produceRateLimiters.rateLimit(str, produceBatchRequestEntry.getOriginalSize(), this.httpServletRequest);
                recordRequestMetrics(producerMetrics, produceBatchRequestEntry.getOriginalSize());
                produceBatchRequestEntry.getPartitionId().ifPresent(num -> {
                    if (num.intValue() < 0) {
                        throw Errors.partitionNotFoundException();
                    }
                });
                Optional<RegisteredSchema> flatMap = produceBatchRequestEntry.getKey().flatMap(produceRequestData -> {
                    return getSchema(str2, true, produceRequestData);
                });
                Optional<EmbeddedFormat> optional = (Optional) flatMap.map(registeredSchema -> {
                    return Optional.of(registeredSchema.getFormat());
                }).orElse(produceBatchRequestEntry.getKey().flatMap((v0) -> {
                    return v0.getFormat();
                }));
                Optional<ByteString> serialize = serialize(str2, optional, flatMap, produceBatchRequestEntry.getKey(), true);
                Optional<RegisteredSchema> flatMap2 = produceBatchRequestEntry.getValue().flatMap(produceRequestData2 -> {
                    return getSchema(str2, false, produceRequestData2);
                });
                Optional<EmbeddedFormat> optional2 = (Optional) flatMap2.map(registeredSchema2 -> {
                    return Optional.of(registeredSchema2.getFormat());
                }).orElse(produceBatchRequestEntry.getValue().flatMap((v0) -> {
                    return v0.getFormat();
                }));
                return produceController.produce(str, str2, produceBatchRequestEntry.getPartitionId(), (Multimap) produceBatchRequestEntry.getHeaders().stream().collect(PRODUCE_REQUEST_HEADER_COLLECTOR), serialize, serialize(str2, optional2, flatMap2, produceBatchRequestEntry.getValue(), false), produceBatchRequestEntry.getTimestamp().orElse(Instant.now())).handleAsync((produceResult, th) -> {
                    if (th == null) {
                        return produceResult;
                    }
                    recordErrorMetrics(producerMetrics, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime));
                    throw new StacklessCompletionException(th);
                }, (Executor) this.executorService).thenApplyAsync((Function<? super U, ? extends U>) produceResult2 -> {
                    ProduceBatchResponseSuccessEntry responseSuccessEntry = toResponseSuccessEntry(produceBatchRequestEntry.getId().textValue(), str, str2, optional, flatMap, optional2, flatMap2, produceResult2);
                    recordResponseMetrics(producerMetrics, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime));
                    return responseSuccessEntry;
                }, (Executor) this.executorService);
            } catch (RateLimitExceededException e) {
                recordRateLimitedMetrics(producerMetrics);
                throw new StacklessCompletionException(e);
            }
        } catch (Throwable th2) {
            CompletableFuture<ProduceBatchResponseSuccessEntry> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(th2);
            return completableFuture;
        }
    }

    private Optional<RegisteredSchema> getSchema(String str, boolean z, ProduceRequest.ProduceRequestData produceRequestData) {
        if (produceRequestData.getFormat().isPresent() && !produceRequestData.getFormat().get().requiresSchema()) {
            return Optional.empty();
        }
        try {
            return Optional.of(((SchemaManager) this.schemaManagerProvider.get()).getSchema(str, produceRequestData.getFormat(), produceRequestData.getSubject(), produceRequestData.getSubjectNameStrategy().map(Function.identity()), produceRequestData.getSchemaId(), produceRequestData.getSchemaVersion(), produceRequestData.getRawSchema(), z));
        } catch (IllegalArgumentException e) {
            throw new BadRequestException(e.getMessage(), e);
        } catch (SerializationException e2) {
            throw Errors.messageSerializationException(e2.getMessage());
        }
    }

    private Optional<ByteString> serialize(String str, Optional<EmbeddedFormat> optional, Optional<RegisteredSchema> optional2, Optional<ProduceRequest.ProduceRequestData> optional3, boolean z) {
        return ((RecordSerializer) this.recordSerializerProvider.get()).serialize(optional.orElse(EmbeddedFormat.BINARY), str, optional2, (JsonNode) optional3.map((v0) -> {
            return v0.getData();
        }).orElse(NullNode.getInstance()), z);
    }

    private static ProduceBatchResponseSuccessEntry toResponseSuccessEntry(String str, String str2, String str3, Optional<EmbeddedFormat> optional, Optional<RegisteredSchema> optional2, Optional<EmbeddedFormat> optional3, Optional<RegisteredSchema> optional4, ProduceResult produceResult) {
        return ProduceBatchResponseSuccessEntry.builder().setId(str).setClusterId(str2).setTopicName(str3).setPartitionId(Integer.valueOf(produceResult.getPartitionId())).setOffset(Long.valueOf(produceResult.getOffset())).setTimestamp(produceResult.getTimestamp()).setKey((Optional<ProduceResponse.ProduceResponseData>) optional.map(embeddedFormat -> {
            return ProduceResponse.ProduceResponseData.builder().setType((Optional<EmbeddedFormat>) optional).setSubject(optional2.map((v0) -> {
                return v0.getSubject();
            })).setSchemaId(optional2.map((v0) -> {
                return v0.getSchemaId();
            })).setSchemaVersion(optional2.map((v0) -> {
                return v0.getSchemaVersion();
            })).setSize(produceResult.getSerializedKeySize()).build();
        })).setValue((Optional<ProduceResponse.ProduceResponseData>) optional3.map(embeddedFormat2 -> {
            return ProduceResponse.ProduceResponseData.builder().setType((Optional<EmbeddedFormat>) optional3).setSubject(optional4.map((v0) -> {
                return v0.getSubject();
            })).setSchemaId(optional4.map((v0) -> {
                return v0.getSchemaId();
            })).setSchemaVersion(optional4.map((v0) -> {
                return v0.getSchemaVersion();
            })).setSize(produceResult.getSerializedValueSize()).build();
        })).build();
    }

    private ProduceBatchResponseFailureEntry toErrorEntryForException(String str, Throwable th) {
        ErrorResponse errorResponse = th instanceof CompletionException ? StreamingResponse.toErrorResponse(((CompletionException) th).getCause()) : StreamingResponse.toErrorResponse(th);
        ProduceBatchResponseFailureEntry.Builder builder = ProduceBatchResponseFailureEntry.builder();
        builder.setId(str);
        builder.setErrorCode(errorResponse.getErrorCode());
        builder.setMessage(errorResponse.getMessage());
        return builder.build();
    }

    private void recordResponseMetrics(ProducerMetrics producerMetrics, long j) {
        producerMetrics.recordResponse();
        producerMetrics.recordRequestLatency(j);
    }

    private void recordErrorMetrics(ProducerMetrics producerMetrics, long j) {
        producerMetrics.recordError();
        producerMetrics.recordRequestLatency(j);
    }

    private void recordRateLimitedMetrics(ProducerMetrics producerMetrics) {
        producerMetrics.recordRateLimited();
    }

    private void recordRequestMetrics(ProducerMetrics producerMetrics, long j) {
        producerMetrics.recordRequest();
        producerMetrics.recordRequestSize(j);
    }
}
