package io.confluent.ksql.util;

import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.QueryMetadataImpl;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/util/SandboxedSharedKafkaStreamsRuntimeImpl.class */
public class SandboxedSharedKafkaStreamsRuntimeImpl extends SharedKafkaStreamsRuntime {
    private final Logger log;

    public SandboxedSharedKafkaStreamsRuntimeImpl(SharedKafkaStreamsRuntime sharedKafkaStreamsRuntime) {
        super(sharedKafkaStreamsRuntime.getKafkaStreamsBuilder(), getSandboxStreamsProperties(sharedKafkaStreamsRuntime));
        this.log = LoggerFactory.getLogger(SandboxedSharedKafkaStreamsRuntimeImpl.class);
        Collection<?> collection = (Collection) this.collocatedQueries.values().stream().map((v0) -> {
            return v0.mo276getTopology();
        }).collect(Collectors.toSet());
        if (!collection.containsAll(sharedKafkaStreamsRuntime.kafkaStreams.getAllTopologies()) || !sharedKafkaStreamsRuntime.kafkaStreams.getAllTopologies().containsAll(collection)) {
            this.log.warn("Streams topologies and registered queries do not align. \nmetadata: {} \nstreams: {}", collection.stream().map(namedTopology -> {
                return namedTopology.name();
            }).collect(Collectors.toList()), sharedKafkaStreamsRuntime.kafkaStreams.getAllTopologies().stream().map(namedTopology2 -> {
                return namedTopology2.name();
            }).collect(Collectors.toList()));
        }
        this.collocatedQueries.putAll(sharedKafkaStreamsRuntime.collocatedQueries);
        Iterator<BinPackedPersistentQueryMetadataImpl> it = sharedKafkaStreamsRuntime.collocatedQueries.values().iterator();
        while (it.hasNext()) {
            this.kafkaStreams.addNamedTopology(it.next().getTopologyCopy(this));
        }
    }

    public SandboxedSharedKafkaStreamsRuntimeImpl(KafkaStreamsBuilder kafkaStreamsBuilder, Map<String, Object> map) {
        super(kafkaStreamsBuilder, map);
        this.log = LoggerFactory.getLogger(SandboxedSharedKafkaStreamsRuntimeImpl.class);
    }

    private static Map<String, Object> getSandboxStreamsProperties(SharedKafkaStreamsRuntime sharedKafkaStreamsRuntime) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(sharedKafkaStreamsRuntime.getStreamProperties());
        concurrentHashMap.put("application.id", sharedKafkaStreamsRuntime.getStreamProperties().get("application.id") + "-validation");
        return concurrentHashMap;
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public void register(BinPackedPersistentQueryMetadataImpl binPackedPersistentQueryMetadataImpl) {
        QueryId queryId = binPackedPersistentQueryMetadataImpl.getQueryId();
        this.log.info("Registering query {} for validation for runtime {}", queryId, getApplicationId());
        this.collocatedQueries.put(queryId, binPackedPersistentQueryMetadataImpl);
        try {
            if (this.kafkaStreams.getTopologyByName(queryId.toString()).isPresent()) {
                this.kafkaStreams.removeNamedTopology(queryId.toString(), false).all().get();
            }
            this.kafkaStreams.addNamedTopology(binPackedPersistentQueryMetadataImpl.getTopologyCopy(this)).all().get();
            this.log.info("Registered query: {}  in {} \nRuntime {} is executing these queries: {}", new Object[]{queryId, getApplicationId(), getApplicationId(), this.collocatedQueries.keySet().stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", "))});
        } catch (Throwable th) {
            throw new IllegalStateException(String.format("Encountered an error when trying to add query %s to runtime: %s", queryId, getApplicationId()), (!(th instanceof ExecutionException) || th.getCause() == null) ? th : th.getCause());
        }
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public void stop(QueryId queryId, boolean z) {
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public QueryMetadataImpl.TimeBoundedQueue getNewQueryErrorQueue() {
        return new QueryMetadataImpl.TimeBoundedQueue(Duration.ofHours(1L), 0);
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public synchronized void close() {
        this.log.info("Closing validation runtime {}", getApplicationId());
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public void start(QueryId queryId) {
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public void overrideStreamsProperties(Map<String, Object> map) {
    }

    @Override // io.confluent.ksql.util.SharedKafkaStreamsRuntime
    public void restartStreamsRuntime() {
    }
}
