/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class KafkaStreams {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
    private static final String JMX_PREFIX = "kafka.streams";
    private static final int DEFAULT_CLOSE_TIMEOUT = 0;
    private GlobalStreamThread globalStreamThread;
    private final ScheduledExecutorService stateDirCleaner;
    private final StreamThread[] threads;
    private final Metrics metrics;
    private final QueryableStoreProvider queryableStoreProvider;
    private final UUID processId;
    private final String logPrefix;
    private final StreamsMetadataState streamsMetadataState;
    private final StreamsConfig config;
    private final StateDirectory stateDirectory;
    private final Object stateLock = new Object();
    private volatile State state = State.CREATED;
    private StateListener stateListener = null;

    public void setStateListener(StateListener listener) {
        this.stateListener = listener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean setState(State newState) {
        State oldState;
        Object object = this.stateLock;
        synchronized (object) {
            if (newState != State.NOT_RUNNING && (this.state == State.NOT_RUNNING || this.state == State.PENDING_SHUTDOWN)) {
                return false;
            }
            oldState = this.state;
            if (!this.state.isValidTransition(newState)) {
                log.warn("{} Unexpected state transition from {} to {}.", new Object[]{this.logPrefix, oldState, newState});
                throw new StreamsException(this.logPrefix + " Unexpected state transition from " + (Object)((Object)oldState) + " to " + (Object)((Object)newState));
            }
            log.info("{} State transition from {} to {}.", new Object[]{this.logPrefix, oldState, newState});
            this.state = newState;
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(this.state, oldState);
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setRunningFromCreated() {
        Object object = this.stateLock;
        synchronized (object) {
            if (this.state != State.CREATED) {
                log.error("{} Unexpected state transition from {} to {}", new Object[]{this.logPrefix, this.state, State.RUNNING});
                throw new IllegalStateException(this.logPrefix + " Unexpected state transition from " + (Object)((Object)this.state) + " to " + (Object)((Object)State.RUNNING));
            }
            this.state = State.RUNNING;
            this.stateLock.notifyAll();
        }
        if (this.stateListener != null) {
            this.stateListener.onChange(State.RUNNING, State.CREATED);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public State state() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state;
        }
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    public KafkaStreams(TopologyBuilder builder, Properties props) {
        this(builder, new StreamsConfig(props), new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
        this(builder, config, new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier) {
        Time time = Time.SYSTEM;
        this.processId = UUID.randomUUID();
        this.config = config;
        String applicationId = config.getString("application.id");
        builder.setApplicationId(applicationId);
        String clientId = config.getString("client.id");
        if (clientId.length() <= 0) {
            clientId = applicationId + "-" + this.processId;
        }
        this.logPrefix = String.format("stream-client [%s]", clientId);
        List reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples").intValue()).recordLevel(Sensor.RecordingLevel.forName((String)config.getString("metrics.recording.level"))).timeWindow(config.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS);
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.threads = new StreamThread[config.getInt("num.stream.threads").intValue()];
        HashMap<Long, StreamThread.State> threadState = new HashMap<Long, StreamThread.State>(this.threads.length);
        GlobalStreamThread.State globalThreadState = null;
        ArrayList<StateStoreProvider> storeProviders = new ArrayList<StateStoreProvider>();
        this.streamsMetadataState = new StreamsMetadataState(builder, KafkaStreams.parseHostInfo(config.getString("application.server")));
        ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
        if (config.getLong("cache.max.bytes.buffering") < 0L) {
            log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", (Object)this.logPrefix);
        }
        long cacheSizeBytes = Math.max(0L, config.getLong("cache.max.bytes.buffering") / (long)(config.getInt("num.stream.threads") + (globalTaskTopology == null ? 0 : 1)));
        this.stateDirectory = new StateDirectory(applicationId, config.getString("state.dir"), time);
        if (globalTaskTopology != null) {
            String globalThreadId = clientId + "-GlobalStreamThread";
            this.globalStreamThread = new GlobalStreamThread(globalTaskTopology, config, clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), this.stateDirectory, this.metrics, time, globalThreadId);
            globalThreadState = this.globalStreamThread.state();
        }
        for (int i = 0; i < this.threads.length; ++i) {
            this.threads[i] = new StreamThread(builder, config, clientSupplier, applicationId, clientId, this.processId, this.metrics, time, this.streamsMetadataState, cacheSizeBytes, this.stateDirectory);
            threadState.put(this.threads[i].getId(), this.threads[i].state());
            storeProviders.add(new StreamThreadStateStoreProvider(this.threads[i]));
        }
        StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
        if (globalTaskTopology != null) {
            this.globalStreamThread.setStateListener(streamStateListener);
        }
        for (StreamThread thread : this.threads) {
            thread.setStateListener(streamStateListener);
        }
        GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
        this.queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
        final String cleanupThreadName = clientId + "-CleanupThread";
        this.stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, cleanupThreadName);
                thread.setDaemon(true);
                return thread;
            }
        });
    }

    private static HostInfo parseHostInfo(String endPoint) {
        if (endPoint == null || endPoint.trim().isEmpty()) {
            return StreamsMetadataState.UNKNOWN_HOST;
        }
        String host = Utils.getHost((String)endPoint);
        Integer port = Utils.getPort((String)endPoint);
        if (host == null || port == null) {
            throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
        }
        return new HostInfo(host, port);
    }

    private void checkBrokerVersionCompatibility() throws StreamsException {
        StreamsKafkaClient client = new StreamsKafkaClient(this.config);
        client.checkBrokerCompatibility("exactly_once".equals(this.config.getString("processing.guarantee")));
        try {
            client.close();
        }
        catch (IOException e) {
            log.warn("{} Could not close StreamKafkaClient.", (Object)this.logPrefix, (Object)e);
        }
    }

    private void validateStartOnce() {
        try {
            this.setRunningFromCreated();
            return;
        }
        catch (StreamsException streamsException) {
            throw new IllegalStateException("Cannot start again.");
        }
    }

    public synchronized void start() throws IllegalStateException, StreamsException {
        log.debug("{} Starting Kafka Stream process.", (Object)this.logPrefix);
        this.validateStartOnce();
        this.checkBrokerVersionCompatibility();
        if (this.globalStreamThread != null) {
            this.globalStreamThread.start();
        }
        for (StreamThread thread : this.threads) {
            thread.start();
        }
        final Long cleanupDelay = this.config.getLong("state.cleanup.delay.ms");
        this.stateDirCleaner.scheduleAtFixedRate(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                Object object = KafkaStreams.this.stateLock;
                synchronized (object) {
                    if (KafkaStreams.this.state == State.RUNNING) {
                        KafkaStreams.this.stateDirectory.cleanRemovedTasks(cleanupDelay);
                    }
                }
            }
        }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
        log.info("{} Started Kafka Stream process", (Object)this.logPrefix);
    }

    public void close() {
        this.close(0L, TimeUnit.SECONDS);
    }

    private boolean checkFirstTimeClosing() {
        return this.setState(State.PENDING_SHUTDOWN);
    }

    private void closeGlobalStreamThread() {
        if (this.globalStreamThread != null) {
            this.globalStreamThread.setStateListener(null);
            this.globalStreamThread.close();
            if (!this.globalStreamThread.stillRunning()) {
                try {
                    this.globalStreamThread.join();
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            this.globalStreamThread = null;
        }
    }

    public synchronized boolean close(long timeout, TimeUnit timeUnit) {
        log.debug("{} Stopping Kafka Stream process.", (Object)this.logPrefix);
        if (!this.checkFirstTimeClosing()) {
            return true;
        }
        this.stateDirCleaner.shutdownNow();
        Thread shutdown = new Thread(new Runnable(){

            @Override
            public void run() {
                for (StreamThread thread : KafkaStreams.this.threads) {
                    thread.setStateListener(null);
                    thread.close();
                }
                KafkaStreams.this.closeGlobalStreamThread();
                for (StreamThread thread : KafkaStreams.this.threads) {
                    try {
                        if (thread.stillRunning()) continue;
                        thread.join();
                    }
                    catch (InterruptedException ex) {
                        Thread.interrupted();
                    }
                }
                KafkaStreams.this.metrics.close();
                log.info("{} Stopped Kafka Streams process.", (Object)KafkaStreams.this.logPrefix);
            }
        }, "kafka-streams-close-thread");
        shutdown.setDaemon(true);
        shutdown.start();
        try {
            shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
        this.setState(State.NOT_RUNNING);
        return !shutdown.isAlive();
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder().append(indent).append("KafkaStreams processID: ").append(this.processId).append("\n");
        for (StreamThread thread : this.threads) {
            sb.append(thread.toString(indent + "\t"));
        }
        sb.append("\n");
        return sb.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isRunning() {
        Object object = this.stateLock;
        synchronized (object) {
            return this.state.isRunning();
        }
    }

    public void cleanUp() {
        if (this.isRunning()) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        this.stateDirectory.cleanRemovedTasks(0L);
    }

    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh) {
        for (StreamThread thread : this.threads) {
            thread.setUncaughtExceptionHandler(eh);
        }
        if (this.globalStreamThread != null) {
            this.globalStreamThread.setUncaughtExceptionHandler(eh);
        }
    }

    public Collection<StreamsMetadata> allMetadata() {
        this.validateIsRunning();
        return this.streamsMetadataState.getAllMetadata();
    }

    public Collection<StreamsMetadata> allMetadataForStore(String storeName) {
        this.validateIsRunning();
        return this.streamsMetadataState.getAllMetadataForStore(storeName);
    }

    public <K> StreamsMetadata metadataForKey(String storeName, K key, Serializer<K> keySerializer) {
        this.validateIsRunning();
        return this.streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
    }

    public <K> StreamsMetadata metadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner) {
        this.validateIsRunning();
        return this.streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
    }

    public <T> T store(String storeName, QueryableStoreType<T> queryableStoreType) {
        this.validateIsRunning();
        return this.queryableStoreProvider.getStore(storeName, queryableStoreType);
    }

    private void validateIsRunning() {
        if (!this.isRunning()) {
            throw new IllegalStateException("KafkaStreams is not running. State is " + (Object)((Object)this.state) + ".");
        }
    }

    final class StreamStateListener
    implements StreamThread.StateListener {
        private final Map<Long, StreamThread.State> threadState;
        private GlobalStreamThread.State globalThreadState;

        StreamStateListener(Map<Long, StreamThread.State> threadState, GlobalStreamThread.State globalThreadState) {
            this.threadState = threadState;
            this.globalThreadState = globalThreadState;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void checkAllThreadsDeadAndSetError() {
            Object object = KafkaStreams.this.stateLock;
            synchronized (object) {
                if (KafkaStreams.this.state != State.PENDING_SHUTDOWN) {
                    for (StreamThread.State state : this.threadState.values()) {
                        if (state == StreamThread.State.DEAD) continue;
                        return;
                    }
                    log.warn("{} All stream threads have died. The Kafka Streams instance will be in an error state and should be closed.", (Object)KafkaStreams.this.logPrefix);
                    KafkaStreams.this.setState(State.ERROR);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void maybeSetErrorSinceGlobalStreamThreadIsDead() {
            Object object = KafkaStreams.this.stateLock;
            synchronized (object) {
                if (KafkaStreams.this.state != State.PENDING_SHUTDOWN) {
                    log.warn("{} Global Stream thread has died. The Kafka Streams instance will be in an error state and should be closed.", (Object)KafkaStreams.this.logPrefix);
                    KafkaStreams.this.setState(State.ERROR);
                }
            }
        }

        private void maybeSetRunning() {
            for (StreamThread.State state : this.threadState.values()) {
                if (state == StreamThread.State.RUNNING) continue;
                return;
            }
            if (this.globalThreadState != null && this.globalThreadState != GlobalStreamThread.State.RUNNING) {
                return;
            }
            KafkaStreams.this.setState(State.RUNNING);
        }

        @Override
        public synchronized void onChange(Thread thread, ThreadStateTransitionValidator abstractNewState, ThreadStateTransitionValidator abstractOldState) {
            if (thread instanceof StreamThread) {
                StreamThread.State newState = (StreamThread.State)abstractNewState;
                this.threadState.put(thread.getId(), newState);
                if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) {
                    KafkaStreams.this.setState(State.REBALANCING);
                } else if (newState == StreamThread.State.RUNNING && KafkaStreams.this.state() != State.RUNNING) {
                    this.maybeSetRunning();
                } else if (newState == StreamThread.State.DEAD) {
                    this.checkAllThreadsDeadAndSetError();
                }
            } else if (thread instanceof GlobalStreamThread) {
                GlobalStreamThread.State newState;
                this.globalThreadState = newState = (GlobalStreamThread.State)abstractNewState;
                if (newState == GlobalStreamThread.State.DEAD) {
                    this.maybeSetErrorSinceGlobalStreamThreadIsDead();
                }
            }
        }
    }

    public static interface StateListener {
        public void onChange(State var1, State var2);
    }

    public static enum State {
        CREATED(1, 2, 3, 5),
        REBALANCING(1, 2, 3, 5),
        RUNNING(1, 3, 5),
        PENDING_SHUTDOWN(4),
        NOT_RUNNING(new Integer[0]),
        ERROR(3);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            return this.equals((Object)RUNNING) || this.equals((Object)REBALANCING);
        }

        public boolean isValidTransition(State newState) {
            return this.validTransitions.contains(newState.ordinal());
        }
    }
}

