package com.hazelcast.jet.kafka.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.processor.TransactionPoolSnapshotUtility;
import com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.kafka.KafkaDataConnection;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.logging.ILogger;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

/* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaP.class */
public final class WriteKafkaP<T, K, V> implements Processor {
    public static final int TXN_POOL_SIZE = 2;
    private final FunctionEx<String, KafkaProducer<K, V>> getProducerFn;
    private final Function<? super T, ? extends ProducerRecord<K, V>> toRecordFn;
    private final boolean exactlyOnce;
    private Processor.Context context;
    private TransactionPoolSnapshotUtility<KafkaTransactionId, KafkaTransaction<K, V>> snapshotUtility;
    private final AtomicReference<Throwable> lastError;
    private final Callback callback;

    /* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransaction.class */
    private static final class KafkaTransaction<K, V> implements TwoPhaseSnapshotCommitUtility.TransactionalResource<KafkaTransactionId> {
        private final KafkaProducer<K, V> producer;
        private final ILogger logger;
        private final KafkaTransactionId transactionId;
        private boolean txnInitialized;

        private KafkaTransaction(KafkaTransactionId kafkaTransactionId, KafkaProducer<K, V> kafkaProducer, ILogger iLogger) {
            this.transactionId = kafkaTransactionId;
            this.producer = kafkaProducer;
            this.logger = iLogger;
        }

        /* renamed from: id, reason: merged with bridge method [inline-methods] */
        public KafkaTransactionId m5id() {
            return this.transactionId;
        }

        public void begin() {
            if (!this.txnInitialized) {
                LoggingUtil.logFine(this.logger, "initTransactions in begin %s", this.transactionId);
                this.txnInitialized = true;
                this.producer.initTransactions();
                this.transactionId.updateProducerAndEpoch(this.producer);
            }
            this.producer.beginTransaction();
        }

        public boolean flush() {
            this.producer.flush();
            return true;
        }

        public void commit() {
            if (this.transactionId != null) {
                this.producer.commitTransaction();
            }
        }

        public void rollback() {
            if (this.transactionId != null) {
                this.producer.abortTransaction();
            }
        }

        public void release() {
            this.producer.close(Duration.ZERO);
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransactionId.class */
    public static class KafkaTransactionId implements TwoPhaseSnapshotCommitUtility.TransactionId, Serializable {
        private static final long serialVersionUID = 1;
        private final int processorIndex;
        private long producerId = -1;
        private short epoch = -1;
        private final String kafkaId;
        private final int hashCode;

        KafkaTransactionId(long j, String str, @Nonnull String str2, int i, int i2) {
            this.processorIndex = i;
            this.kafkaId = "jet.job-" + Util.idToString(j) + '.' + sanitize(str) + '.' + sanitize(str2) + '.' + i + "-" + i2;
            this.hashCode = Objects.hash(Long.valueOf(j), str2, Integer.valueOf(i));
        }

        public int index() {
            return this.processorIndex;
        }

        long producerId() {
            return this.producerId;
        }

        short epoch() {
            return this.epoch;
        }

        void updateProducerAndEpoch(KafkaProducer<?, ?> kafkaProducer) {
            this.producerId = ResumeTransactionUtil.getProducerId(kafkaProducer);
            this.epoch = ResumeTransactionUtil.getEpoch(kafkaProducer);
        }

        public String toString() {
            return getKafkaId() + ",producerId=" + this.producerId + ",epoch=" + ((int) this.epoch);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return getKafkaId().equals(((KafkaTransactionId) obj).getKafkaId());
        }

        public int hashCode() {
            return this.hashCode;
        }

        @Nonnull
        String getKafkaId() {
            return this.kafkaId;
        }

        private static String sanitize(String str) {
            return str == null ? "" : str.replaceAll("[^\\p{Alnum}.\\-_$#/{}\\[\\]]", "_");
        }
    }

    private WriteKafkaP(@Nonnull FunctionEx<String, KafkaProducer<K, V>> functionEx, @Nonnull Function<? super T, ? extends ProducerRecord<K, V>> function, boolean z) {
        this.lastError = new AtomicReference<>();
        this.callback = (recordMetadata, exc) -> {
            if (exc != null) {
                this.lastError.compareAndSet(null, exc);
            }
        };
        this.getProducerFn = functionEx;
        this.toRecordFn = function;
        this.exactlyOnce = z;
    }

    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.context = context;
        this.snapshotUtility = new TransactionPoolSnapshotUtility<>(outbox, context, false, (context.processingGuarantee() != ProcessingGuarantee.EXACTLY_ONCE || this.exactlyOnce) ? context.processingGuarantee() : ProcessingGuarantee.AT_LEAST_ONCE, 2, (num, num2) -> {
            return new KafkaTransactionId(context.jobId(), context.jobConfig().getName(), context.vertexName(), num.intValue(), num2.intValue());
        }, kafkaTransactionId -> {
            return new KafkaTransaction(kafkaTransactionId, (KafkaProducer) this.getProducerFn.apply(kafkaTransactionId == null ? null : kafkaTransactionId.getKafkaId()), context.logger());
        }, kafkaTransactionId2 -> {
            try {
                recoverTransaction(kafkaTransactionId2, true);
            } catch (ProducerFencedException e) {
                context.logger().warning("Failed to finish the commit of a transaction ID saved in the snapshot, data loss can occur. Transaction id: " + kafkaTransactionId2.getKafkaId(), e);
            }
        }, kafkaTransactionId3 -> {
            recoverTransaction(kafkaTransactionId3, false);
        });
    }

    public boolean tryProcess() {
        checkError();
        return this.snapshotUtility.tryProcess();
    }

    public void process(int i, @Nonnull Inbox inbox) {
        KafkaTransaction kafkaTransaction = (KafkaTransaction) this.snapshotUtility.activeTransaction();
        if (kafkaTransaction == null) {
            return;
        }
        checkError();
        while (true) {
            Object peek = inbox.peek();
            if (peek == null) {
                return;
            }
            try {
                kafkaTransaction.producer.send(this.toRecordFn.apply(peek), this.callback);
                inbox.remove();
            } catch (TimeoutException e) {
                return;
            }
        }
    }

    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    public boolean complete() {
        KafkaTransaction kafkaTransaction = (KafkaTransaction) this.snapshotUtility.activeTransaction();
        if (kafkaTransaction == null) {
            return false;
        }
        kafkaTransaction.producer.flush();
        LoggingUtil.logFinest(this.context.logger(), "flush in complete() done, %s", kafkaTransaction.transactionId);
        checkError();
        this.snapshotUtility.afterCompleted();
        return true;
    }

    public boolean snapshotCommitPrepare() {
        if (!this.snapshotUtility.snapshotCommitPrepare()) {
            return false;
        }
        checkError();
        return true;
    }

    public boolean snapshotCommitFinish(boolean z) {
        return this.snapshotUtility.snapshotCommitFinish(z);
    }

    public void restoreFromSnapshot(@Nonnull Inbox inbox) {
        this.snapshotUtility.restoreFromSnapshot(inbox);
    }

    public boolean isCooperative() {
        return false;
    }

    private void recoverTransaction(KafkaTransactionId kafkaTransactionId, boolean z) {
        KafkaProducer kafkaProducer = (KafkaProducer) this.getProducerFn.apply(kafkaTransactionId.getKafkaId());
        Throwable th = null;
        try {
            try {
                if (z) {
                    ResumeTransactionUtil.resumeTransaction(kafkaProducer, kafkaTransactionId.producerId(), kafkaTransactionId.epoch());
                    try {
                        kafkaProducer.commitTransaction();
                    } catch (InvalidTxnStateException e) {
                        this.context.logger().fine("Failed to commit transaction with ID restored from the snapshot. This happens normally when the transaction was committed in phase 2 of the snapshot and can be ignored, but can happen also if the transaction wasn't committed in phase 2 and the broker lost it (in this case data written in it is lost). Transaction ID: " + kafkaTransactionId, e);
                    }
                } else {
                    kafkaProducer.initTransactions();
                }
                if (kafkaProducer != null) {
                    if (0 == 0) {
                        kafkaProducer.close();
                        return;
                    }
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (kafkaProducer != null) {
                if (th != null) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th4;
        }
    }

    public void close() {
        if (this.snapshotUtility != null) {
            this.snapshotUtility.close();
        }
    }

    private void checkError() {
        Throwable th = this.lastError.get();
        if (th != null) {
            throw ExceptionUtil.sneakyThrow(th);
        }
    }

    public static <T, K, V> SupplierEx<Processor> supplier(@Nonnull Properties properties, @Nonnull Function<? super T, ? extends ProducerRecord<K, V>> function, boolean z) {
        if (properties.containsKey("transactional.id")) {
            throw new IllegalArgumentException("Property `transactional.id` must not be set, Jet sets it as needed");
        }
        return () -> {
            return new WriteKafkaP(str -> {
                HashMap hashMap = new HashMap(properties);
                if (str != null) {
                    hashMap.put("transactional.id", str);
                }
                return new KafkaProducer(hashMap);
            }, function, z);
        };
    }

    public static <T, K, V> ProcessorSupplier supplier(@Nonnull final DataConnectionRef dataConnectionRef, @Nonnull final Function<? super T, ? extends ProducerRecord<K, V>> function, final boolean z) {
        return new ProcessorSupplier() { // from class: com.hazelcast.jet.kafka.impl.WriteKafkaP.1
            private transient KafkaDataConnection kafkaDataConnection;

            public void init(@Nonnull ProcessorSupplier.Context context) {
                this.kafkaDataConnection = context.dataConnectionService().getAndRetainDataConnection(dataConnectionRef.getName(), KafkaDataConnection.class);
            }

            @Nonnull
            public Collection<? extends Processor> get(int i) {
                IntStream range = IntStream.range(0, i);
                Function function2 = function;
                boolean z2 = z;
                return (Collection) range.mapToObj(i2 -> {
                    return new WriteKafkaP(str -> {
                        return this.kafkaDataConnection.getProducer(str);
                    }, function2, z2);
                }).collect(Collectors.toList());
            }

            public void close(@Nullable Throwable th) {
                if (this.kafkaDataConnection != null) {
                    this.kafkaDataConnection.release();
                }
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z2 = -1;
                switch (implMethodName.hashCode()) {
                    case -1680124038:
                        if (implMethodName.equals("lambda$null$ef2034f3$1")) {
                            z2 = false;
                            break;
                        }
                        break;
                }
                switch (z2) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP$1") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lorg/apache/kafka/clients/producer/KafkaProducer;")) {
                            AnonymousClass1 anonymousClass1 = (AnonymousClass1) serializedLambda.getCapturedArg(0);
                            return str -> {
                                return this.kafkaDataConnection.getProducer(str);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    public static <T, K, V> ProcessorSupplier supplier(@Nonnull final DataConnectionRef dataConnectionRef, @Nonnull final Properties properties, @Nonnull final Function<? super T, ? extends ProducerRecord<K, V>> function, final boolean z) {
        return new ProcessorSupplier() { // from class: com.hazelcast.jet.kafka.impl.WriteKafkaP.2
            private transient KafkaDataConnection kafkaDataConnection;

            public void init(@Nonnull ProcessorSupplier.Context context) {
                this.kafkaDataConnection = context.dataConnectionService().getAndRetainDataConnection(dataConnectionRef.getName(), KafkaDataConnection.class);
            }

            @Nonnull
            public Collection<? extends Processor> get(int i) {
                IntStream range = IntStream.range(0, i);
                Properties properties2 = properties;
                Function function2 = function;
                boolean z2 = z;
                return (Collection) range.mapToObj(i2 -> {
                    return new WriteKafkaP(str -> {
                        return this.kafkaDataConnection.getProducer(str, properties2);
                    }, function2, z2);
                }).collect(Collectors.toList());
            }

            public void close(@Nullable Throwable th) {
                if (this.kafkaDataConnection != null) {
                    this.kafkaDataConnection.release();
                }
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z2 = -1;
                switch (implMethodName.hashCode()) {
                    case 647304035:
                        if (implMethodName.equals("lambda$null$c2f765a8$1")) {
                            z2 = false;
                            break;
                        }
                        break;
                }
                switch (z2) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP$2") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Properties;Ljava/lang/String;)Lorg/apache/kafka/clients/producer/KafkaProducer;")) {
                            AnonymousClass2 anonymousClass2 = (AnonymousClass2) serializedLambda.getCapturedArg(0);
                            Properties properties2 = (Properties) serializedLambda.getCapturedArg(1);
                            return str -> {
                                return this.kafkaDataConnection.getProducer(str, properties2);
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -777998421:
                if (implMethodName.equals("lambda$init$b1f4d90e$1")) {
                    z = 5;
                    break;
                }
                break;
            case -741216415:
                if (implMethodName.equals("lambda$init$89868699$1")) {
                    z = false;
                    break;
                }
                break;
            case -471022139:
                if (implMethodName.equals("lambda$init$dd3778fb$1")) {
                    z = 4;
                    break;
                }
                break;
            case -56658648:
                if (implMethodName.equals("lambda$supplier$f5abb695$1")) {
                    z = 3;
                    break;
                }
                break;
            case 647304035:
                if (implMethodName.equals("lambda$null$c2f765a8$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1332500682:
                if (implMethodName.equals("lambda$init$75be80e2$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;Lcom/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransactionId;)V")) {
                    WriteKafkaP writeKafkaP = (WriteKafkaP) serializedLambda.getCapturedArg(0);
                    Processor.Context context = (Processor.Context) serializedLambda.getCapturedArg(1);
                    return kafkaTransactionId2 -> {
                        try {
                            recoverTransaction(kafkaTransactionId2, true);
                        } catch (ProducerFencedException e) {
                            context.logger().warning("Failed to finish the commit of a transaction ID saved in the snapshot, data loss can occur. Transaction id: " + kafkaTransactionId2.getKafkaId(), e);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;Ljava/lang/Integer;Ljava/lang/Integer;)Lcom/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransactionId;")) {
                    Processor.Context context2 = (Processor.Context) serializedLambda.getCapturedArg(0);
                    return (num, num2) -> {
                        return new KafkaTransactionId(context2.jobId(), context2.jobConfig().getName(), context2.vertexName(), num.intValue(), num2.intValue());
                    };
                }
                break;
            case TXN_POOL_SIZE /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Properties;Ljava/lang/String;)Lorg/apache/kafka/clients/producer/KafkaProducer;")) {
                    Properties properties = (Properties) serializedLambda.getCapturedArg(0);
                    return str -> {
                        HashMap hashMap = new HashMap(properties);
                        if (str != null) {
                            hashMap.put("transactional.id", str);
                        }
                        return new KafkaProducer(hashMap);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Properties;Ljava/util/function/Function;Z)Lcom/hazelcast/jet/core/Processor;")) {
                    Properties properties2 = (Properties) serializedLambda.getCapturedArg(0);
                    Function function = (Function) serializedLambda.getCapturedArg(1);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(2)).booleanValue();
                    return () -> {
                        return new WriteKafkaP(str2 -> {
                            HashMap hashMap = new HashMap(properties2);
                            if (str2 != null) {
                                hashMap.put("transactional.id", str2);
                            }
                            return new KafkaProducer(hashMap);
                        }, function, booleanValue);
                    };
                }
                break;
            case StreamKafkaP.PREFERRED_LOCAL_PARALLELISM /* 4 */:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;Lcom/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransactionId;)Lcom/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransaction;")) {
                    WriteKafkaP writeKafkaP2 = (WriteKafkaP) serializedLambda.getCapturedArg(0);
                    Processor.Context context3 = (Processor.Context) serializedLambda.getCapturedArg(1);
                    return kafkaTransactionId -> {
                        return new KafkaTransaction(kafkaTransactionId, (KafkaProducer) this.getProducerFn.apply(kafkaTransactionId == null ? null : kafkaTransactionId.getKafkaId()), context3.logger());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/WriteKafkaP") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/kafka/impl/WriteKafkaP$KafkaTransactionId;)V")) {
                    WriteKafkaP writeKafkaP3 = (WriteKafkaP) serializedLambda.getCapturedArg(0);
                    return kafkaTransactionId3 -> {
                        recoverTransaction(kafkaTransactionId3, false);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
