package io.trino.tempto.fulfillment.table.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Shorts;
import com.google.inject.Injector;
import io.trino.tempto.configuration.Configuration;
import io.trino.tempto.fulfillment.table.MutableTableRequirement;
import io.trino.tempto.fulfillment.table.TableDefinition;
import io.trino.tempto.fulfillment.table.TableHandle;
import io.trino.tempto.fulfillment.table.TableInstance;
import io.trino.tempto.fulfillment.table.TableManager;
import io.trino.tempto.internal.fulfillment.table.TableName;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

@Singleton
@TableManager.Descriptor(tableDefinitionClass = KafkaTableDefinition.class, type = "KAFKA")
/* loaded from: input_file:io/trino/tempto/fulfillment/table/kafka/KafkaTableManager.class */
public class KafkaTableManager implements TableManager<KafkaTableDefinition> {
    private final String databaseName;
    private final Configuration brokerConfiguration;

    @Inject
    public KafkaTableManager(@Named("databaseName") String str, @Named("broker") Configuration configuration, Injector injector) {
        this.databaseName = (String) Objects.requireNonNull(str, "databaseName is null");
        this.brokerConfiguration = (Configuration) Objects.requireNonNull(configuration, "brokerConfiguration is null");
        Objects.requireNonNull(injector, "injector is null");
    }

    public TableInstance<KafkaTableDefinition> createImmutable(KafkaTableDefinition kafkaTableDefinition, TableHandle tableHandle) {
        deleteTopic(kafkaTableDefinition.getTopic());
        createTopic(kafkaTableDefinition.getTopic(), kafkaTableDefinition.getPartitionsCount(), kafkaTableDefinition.getReplicationLevel());
        insertDataIntoTopic(kafkaTableDefinition.getTopic(), kafkaTableDefinition.getDataSource());
        return new KafkaTableInstance(new TableName((String) tableHandle.getDatabase().orElse(getDatabaseName()), tableHandle.getSchema(), tableHandle.getName(), tableHandle.getName()), kafkaTableDefinition);
    }

    private void deleteTopic(String str) {
        try {
            AdminClient adminClient = getAdminClient();
            try {
                if (((Set) adminClient.listTopics().names().get()).contains(str)) {
                    adminClient.deleteTopics(ImmutableList.of(str)).all().get();
                }
                if (adminClient != null) {
                    adminClient.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not delete topic " + str, e);
        }
    }

    private void createTopic(String str, int i, int i2) {
        try {
            AdminClient adminClient = getAdminClient();
            try {
                adminClient.createTopics(ImmutableList.of(new NewTopic(str, i, Shorts.checkedCast(i2)))).all().get();
                if (adminClient != null) {
                    adminClient.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void insertDataIntoTopic(String str, KafkaDataSource kafkaDataSource) {
        KafkaProducer kafkaProducer = new KafkaProducer(getKafkaProperties());
        Iterator<KafkaMessage> messages = kafkaDataSource.getMessages();
        while (messages.hasNext()) {
            KafkaMessage next = messages.next();
            try {
                kafkaProducer.send(new ProducerRecord(str, next.getPartition().isPresent() ? Integer.valueOf(next.getPartition().getAsInt()) : null, next.getKey().orElse(null), next.getValue())).get();
            } catch (Exception e) {
                throw new RuntimeException("could not send message to topic " + str);
            }
        }
    }

    private Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerConfiguration.getStringMandatory("host") + ":" + this.brokerConfiguration.getIntMandatory("port"));
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        for (String str : this.brokerConfiguration.listKeys()) {
            if (!str.equals("host") && !str.equals("port")) {
                properties.put(str, this.brokerConfiguration.getStringMandatory(str));
            }
        }
        return properties;
    }

    public TableInstance<KafkaTableDefinition> createMutable(KafkaTableDefinition kafkaTableDefinition, MutableTableRequirement.State state, TableHandle tableHandle) {
        throw new IllegalArgumentException("Mutable tables are not supported by KafkaTableManager");
    }

    public void dropTable(TableName tableName) {
        throw new IllegalArgumentException("dropTable not supported by KafkaTableManager");
    }

    public void dropStaleMutableTables() {
    }

    public String getDatabaseName() {
        return this.databaseName;
    }

    public Class<? extends TableDefinition> getTableDefinitionClass() {
        return KafkaTableDefinition.class;
    }

    private AdminClient getAdminClient() {
        return KafkaAdminClient.create(getKafkaProperties());
    }
}
