/*
 * Decompiled with CFR 0.152.
 */
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.GetRecordsRetrievalStrategy;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStreamExtended;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskResult;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.TaskType;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Task that invokes {@code shutdown()} on the record processor of one shard and,
 * when the shutdown reason is {@link ShutdownReason#TERMINATE}, afterwards asks
 * {@link ShardSyncer} to check for and create leases for new shards.
 */
class ShutdownTask implements ITask {
    private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
    private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";

    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final ShutdownReason reason;
    private final IKinesisProxy kinesisProxy;
    private final ILeaseManager<KinesisClientLease> leaseManager;
    private final InitialPositionInStreamExtended initialPositionInStream;
    private final boolean cleanupLeasesOfCompletedShards;
    private final TaskType taskType = TaskType.SHUTDOWN;
    private final long backoffTimeMillis;
    private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;

    /**
     * Creates a shutdown task. All arguments are stored as-is; no validation is performed here.
     *
     * @param shardInfo shard whose id and concurrency token are used in log messages
     * @param recordProcessor processor whose {@code shutdown(ShutdownInput)} is invoked
     * @param recordProcessorCheckpointer checkpointer handed to the processor via the shutdown input
     * @param reason why the processor is being shut down
     * @param kinesisProxy proxy passed to {@link ShardSyncer} on TERMINATE
     * @param initialPositionInStream initial position passed to {@link ShardSyncer} on TERMINATE
     * @param cleanupLeasesOfCompletedShards flag passed to {@link ShardSyncer} on TERMINATE
     * @param leaseManager lease manager passed to {@link ShardSyncer} on TERMINATE
     * @param backoffTimeMillis how long {@link #call()} sleeps after a failure before returning
     * @param getRecordsRetrievalStrategy retrieval strategy shut down after the processor has shut down
     */
    ShutdownTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownReason reason, IKinesisProxy kinesisProxy, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, ILeaseManager<KinesisClientLease> leaseManager, long backoffTimeMillis, GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
        this.shardInfo = shardInfo;
        this.recordProcessor = recordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.reason = reason;
        this.kinesisProxy = kinesisProxy;
        this.initialPositionInStream = initialPositionInStream;
        this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
        this.leaseManager = leaseManager;
        this.backoffTimeMillis = backoffTimeMillis;
        this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
    }

    /**
     * Runs the shutdown sequence.
     *
     * <p>On TERMINATE the checkpointer is first primed so that the processor is permitted to
     * checkpoint at {@link ExtendedSequenceNumber#SHARD_END}; after the processor's
     * {@code shutdown()} returns, a last checkpoint value other than {@code SHARD_END} is
     * treated as a failure. Any exception is logged, followed by a backoff sleep, and is
     * reported through the returned {@link TaskResult} rather than thrown.
     *
     * @return a {@link TaskResult} built with {@code null} on success, or with the caught exception
     */
    @Override
    public TaskResult call() {
        // Distinguishes failures raised while the record processor was shutting down
        // from failures in the surrounding framework steps; only affects the log message.
        boolean applicationException = false;
        try {
            if (reason == ShutdownReason.TERMINATE) {
                recordProcessorCheckpointer.setSequenceNumberAtShardEnd(recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
                recordProcessorCheckpointer.setLargestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
            }
            LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken " + shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason);
            ShutdownInput shutdownInput = new ShutdownInput().withShutdownReason(reason).withCheckpointer(recordProcessorCheckpointer);
            long recordProcessorStartTimeMillis = System.currentTimeMillis();
            try {
                recordProcessor.shutdown(shutdownInput);
                ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
                boolean checkpointedAtShardEnd = lastCheckpointValue != null && lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
                if (reason == ShutdownReason.TERMINATE && !checkpointedAtShardEnd) {
                    throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + shardInfo.getShardId());
                }
                LOG.debug("Shutting down retrieval strategy.");
                getRecordsRetrievalStrategy.shutdown();
                LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
            } catch (Exception e) {
                applicationException = true;
                throw e;
            } finally {
                // Latency is recorded whether or not the processor shutdown succeeded.
                MetricsHelper.addLatency(RECORD_PROCESSOR_SHUTDOWN_METRIC, recordProcessorStartTimeMillis, MetricsLevel.SUMMARY);
            }
            if (reason == ShutdownReason.TERMINATE) {
                LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
                ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards);
                LOG.debug("Finished checking for child shards of shard " + shardInfo.getShardId());
            }
            return new TaskResult(null);
        } catch (Exception e) {
            if (applicationException) {
                LOG.error("Application exception. ", e);
            } else {
                LOG.error("Caught exception: ", e);
            }
            backoff();
            return new TaskResult(e);
        }
    }

    /**
     * Sleeps for {@code backoffTimeMillis} after a failure. If the sleep is interrupted the
     * interruption is logged and the thread's interrupt status is restored, so that whoever
     * runs this task can still observe the interrupt request.
     */
    private void backoff() {
        try {
            Thread.sleep(backoffTimeMillis);
        } catch (InterruptedException ie) {
            LOG.debug("Interrupted sleep", ie);
            // Thread.sleep clears the interrupt status when it throws; set it again
            // instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the type of this task, always {@link TaskType#SHUTDOWN}
     */
    @Override
    public TaskType getTaskType() {
        return taskType;
    }

    /**
     * @return the shutdown reason this task was constructed with
     */
    @VisibleForTesting
    ShutdownReason getReason() {
        return reason;
    }
}

