/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.StateTracker;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.tracing.Tracer;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class WorkerTask
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
    private static final String THREAD_NAME_PREFIX = "task-thread-";
    private final TaskStatus.Listener statusListener;
    private final StatusBackingStore statusBackingStore;
    protected final ConnectorTaskId id;
    protected final ClassLoader loader;
    protected final Time time;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final TaskMetricsGroup taskMetricsGroup;
    private volatile TargetState targetState;
    private volatile boolean stopping;
    private volatile boolean cancelled;
    protected final RetryWithToleranceOperator retryWithToleranceOperator;
    protected Optional<Tracer> tracer = Optional.empty();

    public WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, TargetState initialState, ClassLoader loader, ConnectMetrics connectMetrics, RetryWithToleranceOperator retryWithToleranceOperator, Time time, StatusBackingStore statusBackingStore) {
        this.id = id;
        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
        this.statusListener = this.taskMetricsGroup;
        this.loader = loader;
        this.targetState = initialState;
        this.stopping = false;
        this.cancelled = false;
        this.taskMetricsGroup.recordState(this.targetState);
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.time = time;
        this.statusBackingStore = statusBackingStore;
    }

    public ConnectorTaskId id() {
        return this.id;
    }

    public ClassLoader loader() {
        return this.loader;
    }

    public abstract void initialize(TaskConfig var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void triggerStop() {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            this.stopping = true;
            this.notifyAll();
        }
    }

    public void stop() {
        this.triggerStop();
    }

    public void cancel() {
        this.cancelled = true;
    }

    public boolean awaitStop(long timeoutMs) {
        try {
            return this.shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    public void removeMetrics() {
        this.taskMetricsGroup.close();
    }

    protected abstract void initializeAndStart();

    protected abstract void execute();

    protected abstract void close();

    protected boolean isStopping() {
        return this.stopping;
    }

    protected boolean isCancelled() {
        return this.cancelled;
    }

    public void doClose() {
        try {
            this.tracer.ifPresent(t -> Utils.closeQuietly((AutoCloseable)t, (String)("tracer id:" + t.tracingContext().traceID().toString())));
            this.close();
        }
        catch (Throwable t2) {
            log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", (Object)this, (Object)t2);
            throw t2;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void doRun() throws InterruptedException {
        try {
            WorkerTask workerTask = this;
            synchronized (workerTask) {
                if (this.stopping) {
                    // MONITOREXIT @DISABLED, blocks:[0, 6, 14] lbl5 : MonitorExitStatement: MONITOREXIT : var1_1
                    this.doClose();
                    return;
                }
                if (this.targetState == TargetState.PAUSED) {
                    this.onPause();
                    if (!this.awaitUnpause()) {
                        // MONITOREXIT @DISABLED, blocks:[0, 6, 12, 13] lbl11 : MonitorExitStatement: MONITOREXIT : var1_1
                        this.doClose();
                        return;
                    }
                }
            }
            this.tracer.ifPresent(Tracer::start);
            this.initializeAndStart();
            this.statusListener.onStartup(this.id);
            this.execute();
            return;
        }
        catch (Throwable t) {
            if (this.cancelled) {
                log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", (Object)this, (Object)t);
                return;
            }
            if (this.stopping) {
                log.warn("{} After being scheduled for shutdown, task threw an uncaught exception.", (Object)this, (Object)t);
                return;
            }
            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", (Object)this, (Object)t);
            throw t;
        }
        catch (Throwable throwable) {
            throw throwable;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onShutdown() {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            this.triggerStop();
            if (!this.cancelled) {
                this.statusListener.onShutdown(this.id);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onFailure(Throwable t) {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            this.triggerStop();
            if (!this.cancelled) {
                this.statusListener.onFailure(this.id, t);
            }
        }
    }

    protected synchronized void onPause() {
        this.statusListener.onPause(this.id);
    }

    protected synchronized void onResume() {
        this.statusListener.onResume(this.id);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        block18: {
            LoggingContext.clear();
            try (LoggingContext loggingContext = LoggingContext.forTask(this.id());){
                ClassLoader savedLoader = Plugins.compareAndSwapLoaders(this.loader);
                String savedName = Thread.currentThread().getName();
                try {
                    Thread.currentThread().setName(THREAD_NAME_PREFIX + this.id);
                    this.doRun();
                    this.onShutdown();
                    Thread.currentThread().setName(savedName);
                }
                catch (Throwable t) {
                    try {
                        this.onFailure(t);
                        if (t instanceof Error) {
                            throw (Error)t;
                        }
                        break block18;
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                    finally {
                        Thread.currentThread().setName(savedName);
                        Plugins.compareAndSwapLoaders(savedLoader);
                        this.shutdownLatch.countDown();
                    }
                }
                Plugins.compareAndSwapLoaders(savedLoader);
                this.shutdownLatch.countDown();
            }
        }
    }

    public boolean shouldPause() {
        return this.targetState == TargetState.PAUSED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean awaitUnpause() throws InterruptedException {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            while (this.targetState == TargetState.PAUSED) {
                if (this.stopping) {
                    return false;
                }
                this.wait();
            }
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void transitionTo(TargetState state) {
        WorkerTask workerTask = this;
        synchronized (workerTask) {
            if (this.stopping) {
                return;
            }
            this.targetState = state;
            this.notifyAll();
        }
    }

    @Confluent
    public WorkerTask useTracer(Tracer tracer) {
        this.tracer = Optional.of(tracer);
        return this;
    }

    protected void recordActiveTopic(String topic) {
        if (this.statusBackingStore.getTopic(this.id.connector(), topic) != null) {
            return;
        }
        this.statusBackingStore.put(new TopicStatus(topic, this.id, this.time.milliseconds()));
    }

    protected void recordCommitSuccess(long duration) {
        this.taskMetricsGroup.recordCommit(duration, true, null);
    }

    protected void recordCommitFailure(long duration, Throwable error) {
        this.taskMetricsGroup.recordCommit(duration, false, error);
    }

    protected void recordBatch(int size) {
        this.taskMetricsGroup.recordBatch(size);
    }

    TaskMetricsGroup taskMetricsGroup() {
        return this.taskMetricsGroup;
    }

    static class TaskMetricsGroup
    implements TaskStatus.Listener {
        private final TaskStatus.Listener delegateListener;
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Time time;
        private final StateTracker taskStateTimer;
        private final Sensor commitTime;
        private final Sensor batchSize;
        private final Sensor commitAttempts;

        public TaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics, TaskStatus.Listener statusListener) {
            this.delegateListener = statusListener;
            this.time = connectMetrics.time();
            this.taskStateTimer = new StateTracker();
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.taskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task()));
            this.metricGroup.close();
            this.metricGroup.addValueMetric(registry.taskStatus, now -> this.taskStateTimer.currentState().toString().toLowerCase(Locale.getDefault()));
            this.addRatioMetric(AbstractStatus.State.RUNNING, registry.taskRunningRatio);
            this.addRatioMetric(AbstractStatus.State.PAUSED, registry.taskPauseRatio);
            this.commitTime = this.metricGroup.sensor("commit-time");
            this.commitTime.add(this.metricGroup.metricName(registry.taskCommitTimeMax), (MeasurableStat)new Max());
            this.commitTime.add(this.metricGroup.metricName(registry.taskCommitTimeAvg), (MeasurableStat)new Avg());
            this.batchSize = this.metricGroup.sensor("batch-size");
            this.batchSize.add(this.metricGroup.metricName(registry.taskBatchSizeMax), (MeasurableStat)new Max());
            this.batchSize.add(this.metricGroup.metricName(registry.taskBatchSizeAvg), (MeasurableStat)new Avg());
            MetricName offsetCommitFailures = this.metricGroup.metricName(registry.taskCommitFailurePercentage);
            MetricName offsetCommitSucceeds = this.metricGroup.metricName(registry.taskCommitSuccessPercentage);
            Frequencies commitFrequencies = Frequencies.forBooleanValues((MetricName)offsetCommitFailures, (MetricName)offsetCommitSucceeds);
            this.commitAttempts = this.metricGroup.sensor("offset-commit-completion");
            this.commitAttempts.add((CompoundStat)commitFrequencies);
        }

        private void addRatioMetric(AbstractStatus.State matchingState, MetricNameTemplate template) {
            MetricName metricName = this.metricGroup.metricName(template);
            this.metricGroup.metrics().addMetricIfAbsent(metricName, null, (MetricValueProvider)((Gauge)(config, now) -> this.taskStateTimer.durationRatio(matchingState, now)));
        }

        void close() {
            this.metricGroup.close();
        }

        void recordCommit(long duration, boolean success, Throwable error) {
            if (success) {
                this.commitTime.record((double)duration);
                this.commitAttempts.record(1.0);
            } else {
                this.commitAttempts.record(0.0);
            }
        }

        void recordBatch(int size) {
            this.batchSize.record((double)size);
        }

        @Override
        public void onStartup(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
            this.delegateListener.onStartup(id);
        }

        @Override
        public void onFailure(ConnectorTaskId id, Throwable cause) {
            this.taskStateTimer.changeState(AbstractStatus.State.FAILED, this.time.milliseconds());
            this.delegateListener.onFailure(id, cause);
        }

        @Override
        public void onPause(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.PAUSED, this.time.milliseconds());
            this.delegateListener.onPause(id);
        }

        @Override
        public void onResume(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
            this.delegateListener.onResume(id);
        }

        @Override
        public void onShutdown(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.UNASSIGNED, this.time.milliseconds());
            this.delegateListener.onShutdown(id);
        }

        @Override
        public void onDeletion(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.DESTROYED, this.time.milliseconds());
            this.delegateListener.onDeletion(id);
        }

        @Override
        public void onRestart(ConnectorTaskId id) {
            this.taskStateTimer.changeState(AbstractStatus.State.RESTARTING, this.time.milliseconds());
            this.delegateListener.onRestart(id);
        }

        public void recordState(TargetState state) {
            switch (state) {
                case STARTED: {
                    this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
                    break;
                }
                case PAUSED: {
                    this.taskStateTimer.changeState(AbstractStatus.State.PAUSED, this.time.milliseconds());
                    break;
                }
            }
        }

        public AbstractStatus.State state() {
            return this.taskStateTimer.currentState();
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }
}

