package org.apache.kafka.connect.runtime.errors;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.class */
public class RetryWithToleranceOperator<T> implements AutoCloseable {
    public static final long RETRIES_DELAY_MIN_MS = 300;
    private final long errorRetryTimeout;
    private final long errorMaxDelayInMillis;
    private final ToleranceType errorToleranceType;
    private long totalFailures;
    private final Time time;
    private final ErrorHandlingMetrics errorHandlingMetrics;
    private final CountDownLatch stopRequestedLatch;
    private volatile boolean stopping;
    private List<ErrorReporter<T>> reporters;
    private static final Logger log = LoggerFactory.getLogger(RetryWithToleranceOperator.class);
    private static final Map<Stage, Class<? extends Exception>> TOLERABLE_EXCEPTIONS = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType = new int[ToleranceType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType[ToleranceType.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType[ToleranceType.ALL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public RetryWithToleranceOperator(long j, long j2, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
        this(j, j2, toleranceType, time, errorHandlingMetrics, new CountDownLatch(1));
    }

    RetryWithToleranceOperator(long j, long j2, ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics, CountDownLatch countDownLatch) {
        this.totalFailures = 0L;
        this.errorRetryTimeout = j;
        this.errorMaxDelayInMillis = j2;
        this.errorToleranceType = toleranceType;
        this.time = time;
        this.errorHandlingMetrics = errorHandlingMetrics;
        this.stopRequestedLatch = countDownLatch;
        this.stopping = false;
        this.reporters = Collections.emptyList();
    }

    public Future<Void> executeFailed(ProcessingContext<T> processingContext, Stage stage, Class<?> cls, Throwable th) {
        markAsFailed();
        processingContext.currentContext(stage, cls);
        processingContext.error(th);
        this.errorHandlingMetrics.recordFailure();
        Future<Void> report = report(processingContext);
        if (withinToleranceLimits()) {
            return report;
        }
        this.errorHandlingMetrics.recordError();
        throw new ConnectException("Tolerance exceeded in error handler", th);
    }

    synchronized Future<Void> report(ProcessingContext<T> processingContext) {
        if (this.reporters.size() == 1) {
            return new WorkerErrantRecordReporter.ErrantRecordFuture(Collections.singletonList(this.reporters.iterator().next().report(processingContext)));
        }
        List list = (List) this.reporters.stream().map(errorReporter -> {
            return errorReporter.report(processingContext);
        }).filter(future -> {
            return !future.isDone();
        }).collect(Collectors.toList());
        return list.isEmpty() ? CompletableFuture.completedFuture(null) : new WorkerErrantRecordReporter.ErrantRecordFuture(list);
    }

    public <V> V execute(ProcessingContext<T> processingContext, Operation<V> operation, Stage stage, Class<?> cls) {
        processingContext.currentContext(stage, cls);
        if (processingContext.failed()) {
            log.debug("ProcessingContext is already in failed state. Ignoring requested operation.");
            return null;
        }
        processingContext.currentContext(stage, cls);
        try {
            V v = (V) execAndHandleError(processingContext, operation, TOLERABLE_EXCEPTIONS.getOrDefault(processingContext.stage(), RetriableException.class));
            if (processingContext.failed()) {
                this.errorHandlingMetrics.recordError();
                report(processingContext);
            }
            return v;
        } catch (Throwable th) {
            if (processingContext.failed()) {
                this.errorHandlingMetrics.recordError();
                report(processingContext);
            }
            throw th;
        }
    }

    protected <V> V execAndRetry(ProcessingContext<T> processingContext, Operation<V> operation) throws Exception {
        int i = 0;
        long milliseconds = this.time.milliseconds();
        long j = this.errorRetryTimeout >= 0 ? milliseconds + this.errorRetryTimeout : Long.MAX_VALUE;
        while (true) {
            try {
                try {
                    i++;
                    V call = operation.call();
                    processingContext.attempt(i);
                    return call;
                } catch (RetriableException e) {
                    log.trace("Caught a retriable exception while executing {} operation with {}", processingContext.stage(), processingContext.executingClass());
                    this.errorHandlingMetrics.recordFailure();
                    if (this.time.milliseconds() >= j) {
                        log.trace("Can't retry. start={}, attempt={}, deadline={}", new Object[]{Long.valueOf(milliseconds), Integer.valueOf(i), Long.valueOf(j)});
                        processingContext.error(e);
                        processingContext.attempt(i);
                        return null;
                    }
                    backoff(i, j);
                    this.errorHandlingMetrics.recordRetry();
                    if (this.stopping) {
                        log.trace("Shutdown has been scheduled. Marking operation as failed.");
                        processingContext.error(e);
                        processingContext.attempt(i);
                        return null;
                    }
                    processingContext.attempt(i);
                }
            } catch (Throwable th) {
                processingContext.attempt(i);
                throw th;
            }
        }
    }

    protected <V> V execAndHandleError(ProcessingContext<T> processingContext, Operation<V> operation, Class<? extends Exception> cls) {
        try {
            V v = (V) execAndRetry(processingContext, operation);
            if (processingContext.failed()) {
                markAsFailed();
                this.errorHandlingMetrics.recordSkipped();
            }
            return v;
        } catch (Exception e) {
            this.errorHandlingMetrics.recordFailure();
            markAsFailed();
            processingContext.error(e);
            if (!cls.isAssignableFrom(e.getClass())) {
                throw new ConnectException("Unhandled exception in error handler", e);
            }
            if (!withinToleranceLimits()) {
                throw new ConnectException("Tolerance exceeded in error handler", e);
            }
            this.errorHandlingMetrics.recordSkipped();
            return null;
        }
    }

    synchronized void markAsFailed() {
        this.errorHandlingMetrics.recordErrorTimestamp();
        this.totalFailures++;
    }

    public synchronized boolean withinToleranceLimits() {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$connect$runtime$errors$ToleranceType[this.errorToleranceType.ordinal()]) {
            case 1:
                return this.totalFailures <= 0;
            case IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2 /* 2 */:
                return true;
            default:
                throw new ConfigException("Unknown tolerance type: {}", this.errorToleranceType);
        }
    }

    public ToleranceType getErrorToleranceType() {
        return this.errorToleranceType;
    }

    void backoff(int i, long j) {
        long j2 = 300 << (i - 1);
        if (j2 > this.errorMaxDelayInMillis) {
            j2 = ThreadLocalRandom.current().nextLong(this.errorMaxDelayInMillis);
        }
        long milliseconds = this.time.milliseconds();
        if (j2 + milliseconds > j) {
            j2 = Math.max(0L, j - milliseconds);
        }
        log.debug("Sleeping for up to {} millis", Long.valueOf(j2));
        try {
            this.stopRequestedLatch.await(j2, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    public String toString() {
        return "RetryWithToleranceOperator{errorRetryTimeout=" + this.errorRetryTimeout + ", errorMaxDelayInMillis=" + this.errorMaxDelayInMillis + ", errorToleranceType=" + this.errorToleranceType + ", totalFailures=" + this.totalFailures + ", time=" + this.time + '}';
    }

    public synchronized void reporters(List<ErrorReporter<T>> list) {
        this.reporters = (List) Objects.requireNonNull(list, "reporters");
    }

    public void triggerStop() {
        this.stopping = true;
        this.stopRequestedLatch.countDown();
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        ConnectException connectException = null;
        Iterator<ErrorReporter<T>> it = this.reporters.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Throwable th) {
                connectException = connectException != null ? connectException : new ConnectException("Failed to close all reporters");
                connectException.addSuppressed(th);
            }
        }
        this.reporters = Collections.emptyList();
        if (connectException != null) {
            throw connectException;
        }
    }

    static {
        TOLERABLE_EXCEPTIONS.put(Stage.TRANSFORMATION, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.HEADER_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.KEY_CONVERTER, Exception.class);
        TOLERABLE_EXCEPTIONS.put(Stage.VALUE_CONVERTER, Exception.class);
    }
}
