package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import lombok.NonNull;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ConsumerStates;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.retrieval.RecordsPublisher;

@KinesisClientInternalApi
/* loaded from: input_file:software/amazon/kinesis/lifecycle/ShardConsumer.class */
public class ShardConsumer {
    private static final Logger log = LoggerFactory.getLogger(ShardConsumer.class);
    public static final int MAX_TIME_BETWEEN_REQUEST_RESPONSE = 60000;
    private final RecordsPublisher recordsPublisher;
    private final ExecutorService executorService;
    private final ShardInfo shardInfo;
    private final ShardConsumerArgument shardConsumerArgument;

    @NonNull
    private final Optional<Long> logWarningForTaskAfterMillis;
    private final int bufferSize;
    private final TaskExecutionListener taskExecutionListener;
    private final String streamIdentifier;
    private ConsumerTask currentTask;
    private TaskOutcome taskOutcome;
    private CompletableFuture<Boolean> stateChangeFuture;
    private boolean needsInitialization;
    private volatile Instant taskDispatchedAt;
    private volatile boolean taskIsRunning;
    private ConsumerState currentState;
    private final Object shutdownLock;
    private volatile ShutdownReason shutdownReason;
    private volatile ShutdownNotification shutdownNotification;
    private final ShardConsumerSubscriber subscriber;
    private ProcessRecordsInput shardEndProcessRecordsInput;

    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> optional, ShardConsumerArgument shardConsumerArgument, TaskExecutionListener taskExecutionListener, int i) {
        this(recordsPublisher, executorService, shardInfo, optional, shardConsumerArgument, ConsumerStates.INITIAL_STATE, 8, taskExecutionListener, i);
    }

    public ShardConsumer(RecordsPublisher recordsPublisher, ExecutorService executorService, ShardInfo shardInfo, Optional<Long> optional, ShardConsumerArgument shardConsumerArgument, ConsumerState consumerState, int i, TaskExecutionListener taskExecutionListener, int i2) {
        this.needsInitialization = true;
        this.taskIsRunning = false;
        this.shutdownLock = new Object();
        this.recordsPublisher = recordsPublisher;
        this.executorService = executorService;
        this.shardInfo = shardInfo;
        this.streamIdentifier = shardInfo.streamIdentifierSerOpt().orElse("single_stream_mode");
        this.shardConsumerArgument = shardConsumerArgument;
        this.logWarningForTaskAfterMillis = optional;
        this.taskExecutionListener = taskExecutionListener;
        this.currentState = consumerState;
        this.subscriber = new ShardConsumerSubscriber(recordsPublisher, executorService, i, this, i2);
        this.bufferSize = i;
        if (this.shardInfo.isCompleted()) {
            markForShutdown(ShutdownReason.SHARD_END);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void handleInput(ProcessRecordsInput processRecordsInput, Subscription subscription) {
        if (isShutdownRequested()) {
            subscription.cancel();
            return;
        }
        processData(processRecordsInput);
        if (this.taskOutcome != TaskOutcome.END_OF_SHARD) {
            subscription.request(1L);
            return;
        }
        markForShutdown(ShutdownReason.SHARD_END);
        this.shardEndProcessRecordsInput = processRecordsInput;
        subscription.cancel();
    }

    public void executeLifecycle() {
        if (isShutdown()) {
            return;
        }
        if (this.stateChangeFuture == null || this.stateChangeFuture.isDone()) {
            try {
                if (isShutdownRequested()) {
                    this.stateChangeFuture = shutdownComplete();
                } else if (this.needsInitialization) {
                    if (this.stateChangeFuture != null && this.stateChangeFuture.get().booleanValue()) {
                        subscribe();
                        this.needsInitialization = false;
                    }
                    this.stateChangeFuture = initializeComplete();
                }
            } catch (InterruptedException e) {
            } catch (ExecutionException e2) {
                throw new RuntimeException(e2);
            } catch (RejectedExecutionException e3) {
                this.taskOutcome = TaskOutcome.FAILURE;
            }
            if (ConsumerStates.ShardConsumerState.PROCESSING.equals(this.currentState.state())) {
                Throwable healthCheck = healthCheck();
                if (healthCheck instanceof Error) {
                    throw ((Error) healthCheck);
                }
            }
        }
    }

    @VisibleForTesting
    Throwable healthCheck() {
        logNoDataRetrievedAfterTime();
        logLongRunningTask();
        Throwable healthCheck = this.subscriber.healthCheck(60000L);
        if (healthCheck != null) {
            return healthCheck;
        }
        Throwable andResetDispatchFailure = this.subscriber.getAndResetDispatchFailure();
        if (andResetDispatchFailure == null) {
            return null;
        }
        log.warn("{} : Exception occurred while dispatching incoming data.  The incoming data has been skipped", this.streamIdentifier, andResetDispatchFailure);
        return andResetDispatchFailure;
    }

    Duration taskRunningTime() {
        if (this.taskDispatchedAt == null || !this.taskIsRunning) {
            return null;
        }
        return Duration.between(this.taskDispatchedAt, Instant.now());
    }

    String longRunningTaskMessage(Duration duration) {
        if (duration != null) {
            return String.format("Previous %s task still pending for shard %s since %s ago. ", this.currentTask.taskType(), this.shardInfo.shardId(), duration);
        }
        return null;
    }

    private void logNoDataRetrievedAfterTime() {
        this.logWarningForTaskAfterMillis.ifPresent(l -> {
            Instant lastDataArrival = this.subscriber.lastDataArrival();
            if (lastDataArrival != null) {
                Duration between = Duration.between(this.subscriber.lastDataArrival(), Instant.now());
                if (between.toMillis() > l.longValue()) {
                    log.warn("{} : Last time data arrived: {} ({})", new Object[]{this.streamIdentifier, lastDataArrival, between});
                }
            }
        });
    }

    private void logLongRunningTask() {
        Duration taskRunningTime = taskRunningTime();
        if (taskRunningTime != null) {
            String longRunningTaskMessage = longRunningTaskMessage(taskRunningTime);
            if (log.isDebugEnabled()) {
                log.debug("{} : {} Not submitting new task.", this.streamIdentifier, longRunningTaskMessage);
            }
            this.logWarningForTaskAfterMillis.ifPresent(l -> {
                if (taskRunningTime.toMillis() > l.longValue()) {
                    log.warn("{} : {}", this.streamIdentifier, longRunningTaskMessage);
                }
            });
        }
    }

    @VisibleForTesting
    void subscribe() {
        this.subscriber.startSubscriptions();
    }

    @VisibleForTesting
    synchronized CompletableFuture<Boolean> initializeComplete() {
        if (!this.needsInitialization) {
            return CompletableFuture.completedFuture(true);
        }
        if (this.taskOutcome != null) {
            updateState(this.taskOutcome);
        }
        return this.currentState.state() == ConsumerStates.ShardConsumerState.PROCESSING ? CompletableFuture.completedFuture(true) : CompletableFuture.supplyAsync(() -> {
            if (isShutdownRequested()) {
                throw new IllegalStateException("Shutdown requested while initializing");
            }
            executeTask(null);
            if (isShutdownRequested()) {
                throw new IllegalStateException("Shutdown requested while initializing");
            }
            return false;
        }, this.executorService);
    }

    @VisibleForTesting
    CompletableFuture<Boolean> shutdownComplete() {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (this) {
                if (this.taskOutcome != null) {
                    updateState(this.taskOutcome);
                } else {
                    updateState(TaskOutcome.SUCCESSFUL);
                }
                if (isShutdown()) {
                    return true;
                }
                executeTask(this.shardEndProcessRecordsInput);
                if (this.currentState.state() == ConsumerStates.ShardConsumerState.SHUTTING_DOWN && this.taskOutcome == TaskOutcome.SUCCESSFUL && this.shutdownNotification != null) {
                    this.shutdownNotification.shutdownComplete();
                }
                return false;
            }
        }, this.executorService);
    }

    private synchronized void processData(ProcessRecordsInput processRecordsInput) {
        executeTask(processRecordsInput);
    }

    private synchronized void executeTask(ProcessRecordsInput processRecordsInput) {
        TaskExecutionListenerInput build = TaskExecutionListenerInput.builder().shardInfo(this.shardInfo).taskType(this.currentState.taskType()).build();
        this.taskExecutionListener.beforeTaskExecution(build);
        ConsumerTask createTask = this.currentState.createTask(this.shardConsumerArgument, this, processRecordsInput);
        if (createTask != null) {
            this.taskDispatchedAt = Instant.now();
            this.currentTask = createTask;
            this.taskIsRunning = true;
            try {
                TaskResult call = createTask.call();
                this.taskIsRunning = false;
                this.taskOutcome = resultToOutcome(call);
                build = build.toBuilder().taskOutcome(this.taskOutcome).build();
            } catch (Throwable th) {
                this.taskIsRunning = false;
                throw th;
            }
        }
        this.taskExecutionListener.afterTaskExecution(build);
    }

    private TaskOutcome resultToOutcome(TaskResult taskResult) {
        if (taskResult.getException() == null) {
            return taskResult.isShardEndReached() ? TaskOutcome.END_OF_SHARD : TaskOutcome.SUCCESSFUL;
        }
        logTaskException(taskResult);
        return TaskOutcome.FAILURE;
    }

    private synchronized void updateState(TaskOutcome taskOutcome) {
        ConsumerState consumerState = this.currentState;
        switch (taskOutcome) {
            case SUCCESSFUL:
                consumerState = this.currentState.successTransition();
                break;
            case END_OF_SHARD:
                markForShutdown(ShutdownReason.SHARD_END);
                break;
            case FAILURE:
                consumerState = this.currentState.failureTransition();
                break;
            default:
                log.error("{} : No handler for outcome of {}", this.streamIdentifier, taskOutcome.name());
                consumerState = this.currentState.failureTransition();
                break;
        }
        this.currentState = handleShutdownTransition(taskOutcome, consumerState);
    }

    private ConsumerState handleShutdownTransition(TaskOutcome taskOutcome, ConsumerState consumerState) {
        synchronized (this.shutdownLock) {
            if (!isShutdownRequested() || taskOutcome == TaskOutcome.FAILURE) {
                return consumerState;
            }
            return this.currentState.shutdownTransition(this.shutdownReason);
        }
    }

    private void logTaskException(TaskResult taskResult) {
        if (log.isDebugEnabled()) {
            if (taskResult.getException() instanceof BlockedOnParentShardException) {
                log.debug("{} : Shard {} is blocked on completion of parent shard.", this.streamIdentifier, this.shardInfo.shardId());
            } else {
                log.debug("{} : Caught exception running {} task: ", new Object[]{this.streamIdentifier, this.currentTask.taskType(), taskResult.getException()});
            }
        }
    }

    public void gracefulShutdown(ShutdownNotification shutdownNotification) {
        if (this.subscriber != null) {
            this.subscriber.cancel();
        }
        if (shutdownNotification != null) {
            this.shutdownNotification = shutdownNotification;
        }
        markForShutdown(ShutdownReason.REQUESTED);
    }

    public boolean leaseLost() {
        log.debug("{} : Shutdown({}): Lease lost triggered.", this.streamIdentifier, this.shardInfo.shardId());
        if (this.subscriber != null) {
            this.subscriber.cancel();
            log.debug("{} : Shutdown({}): Subscriber cancelled.", this.streamIdentifier, this.shardInfo.shardId());
        }
        markForShutdown(ShutdownReason.LEASE_LOST);
        return isShutdown();
    }

    void markForShutdown(ShutdownReason shutdownReason) {
        synchronized (this.shutdownLock) {
            if (this.shutdownReason == null || this.shutdownReason.canTransitionTo(shutdownReason)) {
                this.shutdownReason = shutdownReason;
            }
        }
    }

    public boolean isShutdown() {
        return this.currentState.isTerminal();
    }

    @VisibleForTesting
    public boolean isShutdownRequested() {
        boolean z;
        synchronized (this.shutdownLock) {
            z = this.shutdownReason != null;
        }
        return z;
    }

    RecordsPublisher recordsPublisher() {
        return this.recordsPublisher;
    }

    ExecutorService executorService() {
        return this.executorService;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardInfo shardInfo() {
        return this.shardInfo;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardConsumerArgument shardConsumerArgument() {
        return this.shardConsumerArgument;
    }

    @NonNull
    Optional<Long> logWarningForTaskAfterMillis() {
        return this.logWarningForTaskAfterMillis;
    }

    int bufferSize() {
        return this.bufferSize;
    }

    TaskExecutionListener taskExecutionListener() {
        return this.taskExecutionListener;
    }

    String streamIdentifier() {
        return this.streamIdentifier;
    }

    ConsumerTask currentTask() {
        return this.currentTask;
    }

    TaskOutcome taskOutcome() {
        return this.taskOutcome;
    }

    CompletableFuture<Boolean> stateChangeFuture() {
        return this.stateChangeFuture;
    }

    boolean needsInitialization() {
        return this.needsInitialization;
    }

    Instant taskDispatchedAt() {
        return this.taskDispatchedAt;
    }

    boolean taskIsRunning() {
        return this.taskIsRunning;
    }

    ConsumerState currentState() {
        return this.currentState;
    }

    Object shutdownLock() {
        return this.shutdownLock;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShutdownNotification shutdownNotification() {
        return this.shutdownNotification;
    }

    ShardConsumerSubscriber subscriber() {
        return this.subscriber;
    }

    ProcessRecordsInput shardEndProcessRecordsInput() {
        return this.shardEndProcessRecordsInput;
    }

    public ShutdownReason shutdownReason() {
        return this.shutdownReason;
    }
}
