package org.springframework.integration.aws.inbound.kinesis;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.Record;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.support.ExecutorServiceAdapter;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

@IntegrationManagedResource
@ManagedResource
/* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.class */
public class KclMessageDrivenChannelAdapter extends MessageProducerSupport {
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
    private final RecordProcessorFactory recordProcessorFactory;
    private final String stream;
    private final AmazonKinesis kinesisClient;
    private final AWSCredentialsProvider kinesisProxyCredentialsProvider;
    private final AmazonCloudWatch cloudWatchClient;
    private final AmazonDynamoDB dynamoDBClient;
    private TaskExecutor executor;
    private String consumerGroup;
    private InboundMessageMapper<byte[]> embeddedHeadersMapper;
    private KinesisClientLibConfiguration config;
    private InitialPositionInStream streamInitialSequence;
    private int idleBetweenPolls;
    private int consumerBackoff;
    private Converter<byte[], Object> converter;
    private ListenerMode listenerMode;
    private long checkpointsInterval;
    private CheckpointMode checkpointMode;
    private String workerId;
    private boolean bindSourceRecord;
    private volatile Worker scheduler;

    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter$RecordProcessor.class */
    private class RecordProcessor implements IRecordProcessor {
        private String shardId;
        private long nextCheckpointTimeInMillis;

        private RecordProcessor() {
        }

        public void initialize(String str) {
            this.shardId = str;
            if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.info("Initializing record processor for shard: " + this.shardId);
            }
        }

        public void processRecords(List<Record> list, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            if (KclMessageDrivenChannelAdapter.this.logger.isDebugEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.debug("Processing " + list.size() + " records from " + this.shardId);
            }
            try {
                if (ListenerMode.record.equals(KclMessageDrivenChannelAdapter.this.listenerMode)) {
                    for (Record record : list) {
                        processSingleRecord(record, iRecordProcessorCheckpointer);
                        checkpointIfRecordMode(iRecordProcessorCheckpointer, record);
                        checkpointIfPeriodicMode(iRecordProcessorCheckpointer, record);
                    }
                } else if (ListenerMode.batch.equals(KclMessageDrivenChannelAdapter.this.listenerMode)) {
                    processMultipleRecords(list, iRecordProcessorCheckpointer);
                    checkpointIfPeriodicMode(iRecordProcessorCheckpointer, null);
                }
                checkpointIfBatchMode(iRecordProcessorCheckpointer);
                KclMessageDrivenChannelAdapter.attributesHolder.remove();
            } catch (Throwable th) {
                KclMessageDrivenChannelAdapter.attributesHolder.remove();
                throw th;
            }
        }

        private void processSingleRecord(Record record, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            performSend(prepareMessageForRecord(record), record, iRecordProcessorCheckpointer);
        }

        private void processMultipleRecords(List<Record> list, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            AbstractIntegrationMessageBuilder<?> withPayload = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(list);
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                withPayload = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload((List) list.stream().map(this::prepareMessageForRecord).map((v0) -> {
                    return v0.build();
                }).collect(Collectors.toList()));
            } else if (KclMessageDrivenChannelAdapter.this.converter != null) {
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                withPayload = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload((List) list.stream().map(record -> {
                    arrayList.add(record.getPartitionKey());
                    arrayList2.add(record.getSequenceNumber());
                    return KclMessageDrivenChannelAdapter.this.converter.convert(record.getData().array());
                }).collect(Collectors.toList())).setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, arrayList).setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, arrayList2);
            }
            performSend(withPayload, list, iRecordProcessorCheckpointer);
        }

        private AbstractIntegrationMessageBuilder<Object> prepareMessageForRecord(Record record) {
            Object array = record.getData().array();
            Message message = null;
            if (KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper != null) {
                try {
                    message = KclMessageDrivenChannelAdapter.this.embeddedHeadersMapper.toMessage((byte[]) array);
                    if (message == null) {
                        throw new IllegalStateException("The 'embeddedHeadersMapper' returned null for payload: " + Arrays.toString((byte[]) array));
                    }
                    array = message.getPayload();
                } catch (Exception e) {
                    KclMessageDrivenChannelAdapter.this.logger.warn("Could not parse embedded headers. Remain payload untouched.", e);
                }
            }
            if ((array instanceof byte[]) && KclMessageDrivenChannelAdapter.this.converter != null) {
                array = KclMessageDrivenChannelAdapter.this.converter.convert((byte[]) array);
            }
            AbstractIntegrationMessageBuilder<Object> header = KclMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(array).setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, record.getPartitionKey()).setHeader(AwsHeaders.RECEIVED_SEQUENCE_NUMBER, record.getSequenceNumber());
            if (KclMessageDrivenChannelAdapter.this.bindSourceRecord) {
                header.setHeader("sourceData", record);
            }
            if (message != null) {
                header.copyHeadersIfAbsent(message.getHeaders());
            }
            return header;
        }

        private void performSend(AbstractIntegrationMessageBuilder<?> abstractIntegrationMessageBuilder, Object obj, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            abstractIntegrationMessageBuilder.setHeader(AwsHeaders.RECEIVED_STREAM, KclMessageDrivenChannelAdapter.this.stream).setHeader(AwsHeaders.SHARD, this.shardId);
            if (CheckpointMode.manual.equals(KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                abstractIntegrationMessageBuilder.setHeader(AwsHeaders.CHECKPOINTER, iRecordProcessorCheckpointer);
            }
            Message<?> build = abstractIntegrationMessageBuilder.build();
            setAttributesIfNecessary(obj, build);
            try {
                KclMessageDrivenChannelAdapter.this.sendMessage(build);
            } catch (Exception e) {
                KclMessageDrivenChannelAdapter.this.logger.error("Got an exception during sending a '" + build + "'\nfor the '" + obj + "'.\nConsider to use 'errorChannel' flow for the compensation logic.", e);
            }
        }

        private void setAttributesIfNecessary(Object obj, Message<?> message) {
            if (KclMessageDrivenChannelAdapter.this.getErrorChannel() != null) {
                AttributeAccessor attributeAccessor = ErrorMessageUtils.getAttributeAccessor(message, (Message) null);
                KclMessageDrivenChannelAdapter.attributesHolder.set(attributeAccessor);
                attributeAccessor.setAttribute(AwsHeaders.RAW_RECORD, obj);
            }
        }

        private void checkpoint(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, @Nullable Record record) {
            if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.info("Checkpointing shard " + this.shardId);
            }
            try {
                if (record == null) {
                    iRecordProcessorCheckpointer.checkpoint();
                } else {
                    iRecordProcessorCheckpointer.checkpoint(record);
                }
            } catch (InvalidStateException e) {
                KclMessageDrivenChannelAdapter.this.logger.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
            } catch (ThrottlingException e2) {
                if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                    KclMessageDrivenChannelAdapter.this.logger.info("Transient issue when checkpointing", e2);
                }
            } catch (ShutdownException e3) {
                KclMessageDrivenChannelAdapter.this.logger.info("Caught shutdown exception, skipping checkpoint.", e3);
            }
        }

        private void checkpointIfBatchMode(IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
            if (CheckpointMode.batch.equals(KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                checkpoint(iRecordProcessorCheckpointer, null);
            }
        }

        private void checkpointIfRecordMode(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, Record record) {
            if (CheckpointMode.record.equals(KclMessageDrivenChannelAdapter.this.checkpointMode)) {
                checkpoint(iRecordProcessorCheckpointer, record);
            }
        }

        private void checkpointIfPeriodicMode(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, @Nullable Record record) {
            if (!CheckpointMode.periodic.equals(KclMessageDrivenChannelAdapter.this.checkpointMode) || System.currentTimeMillis() <= this.nextCheckpointTimeInMillis) {
                return;
            }
            checkpoint(iRecordProcessorCheckpointer, record);
            this.nextCheckpointTimeInMillis = System.currentTimeMillis() + KclMessageDrivenChannelAdapter.this.checkpointsInterval;
        }

        public void shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) {
            if (KclMessageDrivenChannelAdapter.this.logger.isInfoEnabled()) {
                KclMessageDrivenChannelAdapter.this.logger.info("Scheduler is shutting down for reason '" + shutdownReason + "'; checkpointing...");
            }
            try {
                iRecordProcessorCheckpointer.checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                KclMessageDrivenChannelAdapter.this.logger.error("Exception while checkpointing at requested shutdown. Giving up", e);
            }
        }
    }

    /* loaded from: input_file:org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter$RecordProcessorFactory.class */
    private class RecordProcessorFactory implements IRecordProcessorFactory {
        private RecordProcessorFactory() {
        }

        public IRecordProcessor createProcessor() {
            return new RecordProcessor();
        }
    }

    public KclMessageDrivenChannelAdapter(String str) {
        this(str, AmazonKinesisClientBuilder.defaultClient(), AmazonCloudWatchClientBuilder.defaultClient(), AmazonDynamoDBClientBuilder.defaultClient(), new DefaultAWSCredentialsProviderChain());
    }

    public KclMessageDrivenChannelAdapter(String str, Regions regions) {
        this(str, (AmazonKinesis) AmazonKinesisClient.builder().withRegion(regions).build(), (AmazonCloudWatch) AmazonCloudWatchClient.builder().withRegion(regions).build(), (AmazonDynamoDB) AmazonDynamoDBClient.builder().withRegion(regions).build(), new DefaultAWSCredentialsProviderChain());
    }

    public KclMessageDrivenChannelAdapter(String str, AmazonKinesis amazonKinesis, AmazonCloudWatch amazonCloudWatch, AmazonDynamoDB amazonDynamoDB, AWSCredentialsProvider aWSCredentialsProvider) {
        this.recordProcessorFactory = new RecordProcessorFactory();
        this.executor = new SimpleAsyncTaskExecutor();
        this.consumerGroup = "SpringIntegration";
        this.streamInitialSequence = InitialPositionInStream.LATEST;
        this.idleBetweenPolls = 1000;
        this.consumerBackoff = 1000;
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointsInterval = 5000L;
        this.checkpointMode = CheckpointMode.batch;
        this.workerId = UUID.randomUUID().toString();
        Assert.notNull(str, "'stream' must not be null.");
        Assert.notNull(amazonKinesis, "'kinesisClient' must not be null.");
        Assert.notNull(amazonCloudWatch, "'cloudWatchClient' must not be null.");
        Assert.notNull(amazonDynamoDB, "'dynamoDBClient' must not be null.");
        Assert.notNull(aWSCredentialsProvider, "'kinesisProxyCredentialsProvider' must not be null.");
        this.stream = str;
        this.kinesisClient = amazonKinesis;
        this.cloudWatchClient = amazonCloudWatch;
        this.dynamoDBClient = amazonDynamoDB;
        this.kinesisProxyCredentialsProvider = aWSCredentialsProvider;
    }

    public KclMessageDrivenChannelAdapter(KinesisClientLibConfiguration kinesisClientLibConfiguration) {
        this(kinesisClientLibConfiguration, AmazonKinesisClientBuilder.defaultClient(), AmazonCloudWatchClientBuilder.defaultClient(), AmazonDynamoDBClientBuilder.defaultClient());
    }

    public KclMessageDrivenChannelAdapter(KinesisClientLibConfiguration kinesisClientLibConfiguration, AmazonKinesis amazonKinesis, AmazonCloudWatch amazonCloudWatch, AmazonDynamoDB amazonDynamoDB) {
        this.recordProcessorFactory = new RecordProcessorFactory();
        this.executor = new SimpleAsyncTaskExecutor();
        this.consumerGroup = "SpringIntegration";
        this.streamInitialSequence = InitialPositionInStream.LATEST;
        this.idleBetweenPolls = 1000;
        this.consumerBackoff = 1000;
        this.converter = new DeserializingConverter();
        this.listenerMode = ListenerMode.record;
        this.checkpointsInterval = 5000L;
        this.checkpointMode = CheckpointMode.batch;
        this.workerId = UUID.randomUUID().toString();
        Assert.notNull(kinesisClientLibConfiguration, "'kinesisClientLibConfiguration' must not be null.");
        Assert.notNull(amazonKinesis, "'kinesisClient' must not be null.");
        Assert.notNull(amazonCloudWatch, "'cloudWatchClient' must not be null.");
        Assert.notNull(amazonDynamoDB, "'dynamoDBClient' must not be null.");
        this.config = kinesisClientLibConfiguration;
        this.stream = this.config.getStreamName();
        this.kinesisClient = amazonKinesis;
        this.cloudWatchClient = amazonCloudWatch;
        this.dynamoDBClient = amazonDynamoDB;
        this.kinesisProxyCredentialsProvider = null;
    }

    public void setExecutor(TaskExecutor taskExecutor) {
        Assert.notNull(taskExecutor, "'executor' must not be null.");
        this.executor = taskExecutor;
    }

    public void setConsumerGroup(String str) {
        Assert.hasText(str, "'consumerGroup' must not be empty");
        Assert.isNull(this.config, "'consumerGroup' must be configured as an application name on the provided KinesisClientLibConfiguration");
        this.consumerGroup = str;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> inboundMessageMapper) {
        this.embeddedHeadersMapper = inboundMessageMapper;
    }

    public void setStreamInitialSequence(InitialPositionInStream initialPositionInStream) {
        Assert.notNull(initialPositionInStream, "'streamInitialSequence' must not be null");
        Assert.isNull(this.config, "'streamInitialSequence' must be configured as an 'initialPositionInStream' on the provided KinesisClientLibConfiguration");
        this.streamInitialSequence = initialPositionInStream;
    }

    public void setIdleBetweenPolls(int i) {
        Assert.isNull(this.config, "'idleBetweenPolls' must be configured as an 'idleTimeBetweenReadsInMillis' on the provided KinesisClientLibConfiguration");
        this.idleBetweenPolls = Math.max(250, i);
    }

    public void setConsumerBackoff(int i) {
        Assert.isNull(this.config, "'consumerBackoff' must be configured as an 'taskBackoffTimeMillis' on the provided KinesisClientLibConfiguration");
        this.consumerBackoff = Math.max(1000, i);
    }

    public void setConverter(Converter<byte[], Object> converter) {
        this.converter = converter;
    }

    public void setListenerMode(ListenerMode listenerMode) {
        Assert.notNull(listenerMode, "'listenerMode' must not be null");
        this.listenerMode = listenerMode;
    }

    public void setCheckpointsInterval(long j) {
        this.checkpointsInterval = j;
    }

    public void setCheckpointMode(CheckpointMode checkpointMode) {
        Assert.notNull(checkpointMode, "'checkpointMode' must not be null");
        this.checkpointMode = checkpointMode;
    }

    public void setWorkerId(String str) {
        Assert.hasText(str, "'workerId' must not be null or empty");
        Assert.isNull(this.config, "'workerId' must be configured on the provided KinesisClientLibConfiguration");
        this.workerId = str;
    }

    public void setBindSourceRecord(boolean z) {
        this.bindSourceRecord = z;
    }

    protected void onInit() {
        super.onInit();
        if (this.config == null) {
            this.config = new KinesisClientLibConfiguration(this.consumerGroup, this.stream, (String) null, (String) null, this.streamInitialSequence, this.kinesisProxyCredentialsProvider, (AWSCredentialsProvider) null, (AWSCredentialsProvider) null, 10000L, this.workerId, 10000, this.idleBetweenPolls, false, 10000L, 60000L, true, new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(), this.consumerBackoff, 10000L, 10000, true, (String) null, 5000L, KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE, new SimpleRecordsFetcherFactory(), 0L, 0L, 0L);
        }
        this.consumerGroup = this.config.getApplicationName();
    }

    protected void doStart() {
        super.doStart();
        if (ListenerMode.batch.equals(this.listenerMode) && CheckpointMode.record.equals(this.checkpointMode)) {
            this.checkpointMode = CheckpointMode.batch;
            this.logger.warn("The 'checkpointMode' is overridden from [CheckpointMode.record] to [CheckpointMode.batch] because it does not make sense in case of [ListenerMode.batch].");
        }
        this.scheduler = new Worker.Builder().kinesisClient(this.kinesisClient).dynamoDBClient(this.dynamoDBClient).cloudWatchClient(this.cloudWatchClient).recordProcessorFactory(this.recordProcessorFactory).execService(new ExecutorServiceAdapter(this.executor)).config(this.config).build();
        this.executor.execute(this.scheduler);
    }

    protected void doStop() {
        super.doStop();
        this.scheduler.shutdown();
    }

    public void destroy() {
        super.destroy();
        if (isRunning()) {
            this.scheduler.shutdown();
        }
    }

    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributeAccessor = attributesHolder.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }

    public String toString() {
        return "KclMessageDrivenChannelAdapter{consumerGroup='" + this.consumerGroup + "', stream='" + this.stream + "'}";
    }
}
