package com.hazelcast.jet.kafka;

import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.dataconnection.DataConnectionBase;
import com.hazelcast.dataconnection.DataConnectionResource;
import com.hazelcast.jet.impl.util.ConcurrentMemoizingSupplier;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.kafka.impl.NonClosingKafkaProducer;
import com.hazelcast.spi.annotation.Beta;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

@Beta
/* loaded from: input_file:com/hazelcast/jet/kafka/KafkaDataConnection.class */
public class KafkaDataConnection extends DataConnectionBase {
    private static final Properties EMPTY_PROPERTIES = new Properties();
    private volatile ConcurrentMemoizingSupplier<NonClosingKafkaProducer<?, ?>> producerSupplier;

    public KafkaDataConnection(@Nonnull DataConnectionConfig dataConnectionConfig) {
        super(dataConnectionConfig);
        this.producerSupplier = new ConcurrentMemoizingSupplier<>(() -> {
            return new NonClosingKafkaProducer(dataConnectionConfig.getProperties(), this::release);
        });
    }

    @Nonnull
    public Collection<DataConnectionResource> listResources() {
        try {
            AdminClient create = AdminClient.create(getConfig().getProperties());
            Throwable th = null;
            try {
                try {
                    Collection<DataConnectionResource> collection = (Collection) ((Set) create.listTopics().names().get()).stream().sorted().map(str -> {
                        return new DataConnectionResource("topic", str);
                    }).collect(Collectors.toList());
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    return collection;
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new HazelcastException("Could not get list of topics for DataConnection " + getConfig().getName(), e);
        }
    }

    @Nonnull
    public Collection<String> resourceTypes() {
        return Collections.singleton("Topic");
    }

    @Nonnull
    public <K, V> Consumer<K, V> newConsumer() {
        return newConsumer(EMPTY_PROPERTIES);
    }

    @Nonnull
    public <K, V> Consumer<K, V> newConsumer(Properties properties) {
        if (getConfig().isShared()) {
            throw new IllegalArgumentException("KafkaConsumer is not thread-safe and can't be used with shared DataConnection '" + getConfig().getName() + "'");
        }
        return new KafkaConsumer(Util.mergeProps(getConfig().getProperties(), properties));
    }

    @Nonnull
    public <K, V> KafkaProducer<K, V> getProducer(@Nullable String str) {
        if (getConfig().isShared()) {
            if (str != null) {
                throw new IllegalArgumentException("Cannot use transactions with shared KafkaProducer for DataConnection" + getConfig().getName());
            }
            retain();
            return (KafkaProducer) this.producerSupplier.get();
        }
        if (str == null) {
            return new KafkaProducer<>(getConfig().getProperties());
        }
        HashMap hashMap = new HashMap(getConfig().getProperties());
        hashMap.put("transactional.id", str);
        return new KafkaProducer<>(hashMap);
    }

    @Nonnull
    public <K, V> KafkaProducer<K, V> getProducer(@Nullable String str, @Nonnull Properties properties) {
        Properties properties2 = getConfig().getProperties();
        boolean inputPropsAreSubsetOfConfigProps = inputPropsAreSubsetOfConfigProps(properties);
        if (!getConfig().isShared()) {
            Properties mergeProps = Util.mergeProps(properties2, properties);
            if (str != null) {
                mergeProps.put("transactional.id", str);
            }
            return new KafkaProducer<>(mergeProps);
        }
        if (!properties.isEmpty() && !inputPropsAreSubsetOfConfigProps) {
            throw new HazelcastException("For shared Kafka producer, please provide all serialization optionsat the DATA CONNECTION level (i.e. 'key.serializer'). Only 'keyFormat' and 'valueFormat' are required at the mapping level, however these options are ignored currently.");
        }
        retain();
        return (KafkaProducer) this.producerSupplier.get();
    }

    public synchronized void destroy() {
        if (this.producerSupplier == null || this.producerSupplier.remembered() == null) {
            return;
        }
        ((NonClosingKafkaProducer) this.producerSupplier.remembered()).doClose();
        this.producerSupplier = null;
    }

    private boolean inputPropsAreSubsetOfConfigProps(Properties properties) {
        Properties properties2 = getConfig().getProperties();
        for (Object obj : properties.keySet()) {
            if (!properties2.containsKey(obj) || !properties2.get(obj).equals(properties.get(obj))) {
                return false;
            }
        }
        return true;
    }
}
