package io.kestra.plugin.aws.kinesis;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Rethrow;
import io.kestra.plugin.aws.AbstractConnection;
import io.kestra.plugin.aws.kinesis.model.Record;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import java.beans.ConstructorProperties;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import lombok.Generated;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;

@Plugin(examples = {@Example(title = "Send multiple records as maps to Amazon Kinesis Data Streams. Check the following AWS API reference for the structure of the [PutRecordsRequestEntry](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecordsRequestEntry.html) request payload.", code = {"streamName: \"mystream\"", "records:", "  - data: \"user sign-in event\"", "    explicitHashKey: \"optional hash value overriding the partition key\"", "  - data: \"user sign-out event\"", "    partitionKey: \"user1\""}), @Example(title = "Send multiple records from an internal storage ion file to Amazon Kinesis Data Streams.", code = {"streamName: \"mystream\"", "records: kestra://myfile.ion"})})
@Schema(title = "Send multiple records to Amazon Kinesis Data Streams.")
/* loaded from: input_file:io/kestra/plugin/aws/kinesis/PutRecords.class */
public class PutRecords extends AbstractConnection implements RunnableTask<Output> {
    /**
     * Mapper obtained from {@code JacksonMapper.ofIon()}, configured to always include properties.
     * Used to convert an inline list of maps into {@link Record} instances.
     */
    private static final ObjectMapper MAPPER = JacksonMapper.ofIon().setSerializationInclusion(JsonInclude.Include.ALWAYS);

    /**
     * When {@code true}, the run throws if the response reports at least one failed record.
     * Defaults to {@code true} (see {@code $default$failOnUnsuccessfulRecords()}).
     */
    // NOTE(review): @NotNull on a primitive boolean cannot fail validation — confirm it is intentional.
    @NotNull
    @PluginProperty(dynamic = false)
    @Schema(title = "Mark the task as failed when sending a record is unsuccessful.", description = "If true, the task will fail when any record fails to be sent.")
    private boolean failOnUnsuccessfulRecords;

    /** Target stream name; only used when {@code streamArn} is null or empty. */
    @PluginProperty(dynamic = true)
    @Schema(title = "The name of the stream to add the records.", description = "Make sure to set either `streamName` or `streamArn`. One of those must be provided.")
    private String streamName;

    /** Target stream ARN; takes precedence over {@code streamName} when both are set. */
    @PluginProperty(dynamic = true)
    @Schema(title = "The ARN of the stream to add the records.", description = "Make sure to set either `streamName` or `streamArn`. One of those must be provided.")
    private String streamArn;

    /**
     * Records to send: either a {@code String} (rendered, then read as an internal storage URI)
     * or a {@code List} converted to {@link Record} objects. Other types are rejected at run time.
     */
    @NotNull
    @PluginProperty(dynamic = true)
    @Schema(title = "List of records (i.e., list of maps) or internal storage URI of the file that defines the records to be sent to AWS Kinesis Data Streams.", description = "A list of at least one record with a map including `data` and `partitionKey` properties (those two are required arguments). Check the [PutRecordsRequestEntry](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecordsRequestEntry.html) API reference for a detailed description of required fields.", anyOf = {String.class, Record[].class})
    private Object records;

    /* loaded from: input_file:io/kestra/plugin/aws/kinesis/PutRecords$Output.class */
    public static class Output implements io.kestra.core.models.tasks.Output {

        @Schema(title = "The URI of stored data", description = "The successfully and unsuccessfully ingested records.If the ingestion was successful, the output includes the record sequence number.Otherwise, the output provides the error code and error message for troubleshooting.")
        private URI uri;

        @Schema(title = "The number of failed records.")
        private int failedRecordsCount;

        @Schema(title = "The total number of records sent to AWS Kinesis Data Streams.")
        private int recordCount;

        @Generated
        /* loaded from: input_file:io/kestra/plugin/aws/kinesis/PutRecords$Output$OutputBuilder.class */
        public static class OutputBuilder {

            @Generated
            private URI uri;

            @Generated
            private int failedRecordsCount;

            @Generated
            private int recordCount;

            @Generated
            OutputBuilder() {
            }

            @Generated
            public OutputBuilder uri(URI uri) {
                this.uri = uri;
                return this;
            }

            @Generated
            public OutputBuilder failedRecordsCount(int i) {
                this.failedRecordsCount = i;
                return this;
            }

            @Generated
            public OutputBuilder recordCount(int i) {
                this.recordCount = i;
                return this;
            }

            @Generated
            public Output build() {
                return new Output(this.uri, this.failedRecordsCount, this.recordCount);
            }

            @Generated
            public String toString() {
                return "PutRecords.Output.OutputBuilder(uri=" + this.uri + ", failedRecordsCount=" + this.failedRecordsCount + ", recordCount=" + this.recordCount + ")";
            }
        }

        public Optional<State.Type> finalState() {
            return this.failedRecordsCount > 0 ? Optional.of(State.Type.WARNING) : super.finalState();
        }

        @Generated
        @ConstructorProperties({"uri", "failedRecordsCount", "recordCount"})
        Output(URI uri, int i, int i2) {
            this.uri = uri;
            this.failedRecordsCount = i;
            this.recordCount = i2;
        }

        @Generated
        public static OutputBuilder builder() {
            return new OutputBuilder();
        }

        @Generated
        public URI getUri() {
            return this.uri;
        }

        @Generated
        public int getFailedRecordsCount() {
            return this.failedRecordsCount;
        }

        @Generated
        public int getRecordCount() {
            return this.recordCount;
        }
    }

    /* loaded from: input_file:io/kestra/plugin/aws/kinesis/PutRecords$OutputEntry.class */
    /**
     * One entry of the result file: the submitted record together with the per-record
     * outcome fields (sequence number and shard on success, error code and message on failure).
     */
    public static class OutputEntry {

        @Schema(title = "The sequence number for an individual record result.")
        private final String sequenceNumber;

        @Schema(title = "The shard ID for an individual record result.")
        private final String shardId;

        @Schema(title = "The error code that indicates the failure.")
        private final String errorCode;

        @Schema(title = "The error message that explains the failure.")
        private final String errorMessage;

        @Schema(title = "The original record.")
        private final Record record;

        /** All-fields constructor; instances are normally created through {@link #builder()}. */
        @Generated
        @ConstructorProperties({"sequenceNumber", "shardId", "errorCode", "errorMessage", "record"})
        OutputEntry(String sequenceNumber, String shardId, String errorCode, String errorMessage, Record record) {
            this.sequenceNumber = sequenceNumber;
            this.shardId = shardId;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
            this.record = record;
        }

        /** @return a new, empty builder */
        @Generated
        public static OutputEntryBuilder builder() {
            return new OutputEntryBuilder();
        }

        /** @return the sequence number of the record result */
        @Generated
        public String getSequenceNumber() {
            return sequenceNumber;
        }

        /** @return the shard ID of the record result */
        @Generated
        public String getShardId() {
            return shardId;
        }

        /** @return the error code of the record result */
        @Generated
        public String getErrorCode() {
            return errorCode;
        }

        /** @return the error message of the record result */
        @Generated
        public String getErrorMessage() {
            return errorMessage;
        }

        /** @return the record that was submitted */
        @Generated
        public Record getRecord() {
            return record;
        }

        /** Fluent builder for {@link OutputEntry}. */
        @Generated
        public static class OutputEntryBuilder {

            @Generated
            private String sequenceNumber;

            @Generated
            private String shardId;

            @Generated
            private String errorCode;

            @Generated
            private String errorMessage;

            @Generated
            private Record record;

            @Generated
            OutputEntryBuilder() {
            }

            @Generated
            public OutputEntryBuilder sequenceNumber(String sequenceNumber) {
                this.sequenceNumber = sequenceNumber;
                return this;
            }

            @Generated
            public OutputEntryBuilder shardId(String shardId) {
                this.shardId = shardId;
                return this;
            }

            @Generated
            public OutputEntryBuilder errorCode(String errorCode) {
                this.errorCode = errorCode;
                return this;
            }

            @Generated
            public OutputEntryBuilder errorMessage(String errorMessage) {
                this.errorMessage = errorMessage;
                return this;
            }

            @Generated
            public OutputEntryBuilder record(Record record) {
                this.record = record;
                return this;
            }

            /** Creates the {@link OutputEntry} from the values collected so far. */
            @Generated
            public OutputEntry build() {
                return new OutputEntry(sequenceNumber, shardId, errorCode, errorMessage, record);
            }

            @Override
            @Generated
            public String toString() {
                return "PutRecords.OutputEntry.OutputEntryBuilder(sequenceNumber=" + sequenceNumber
                    + ", shardId=" + shardId
                    + ", errorCode=" + errorCode
                    + ", errorMessage=" + errorMessage
                    + ", record=" + record
                    + ")";
            }
        }
    }

    /**
     * Abstract, self-typed builder for {@link PutRecords}, layered on top of the
     * {@code AbstractConnection} builder.
     *
     * @param <C> concrete task type produced by the builder
     * @param <B> concrete builder type returned by the fluent setters
     */
    @Generated
    public static abstract class PutRecordsBuilder<C extends PutRecords, B extends PutRecordsBuilder<C, B>> extends AbstractConnection.AbstractConnectionBuilder<C, B> {

        /** Whether {@code failOnUnsuccessfulRecords} was explicitly provided. */
        @Generated
        private boolean failOnUnsuccessfulRecords$set;

        /** Explicit value of {@code failOnUnsuccessfulRecords}; only meaningful when the flag above is set. */
        @Generated
        private boolean failOnUnsuccessfulRecords$value;

        @Generated
        private String streamName;

        @Generated
        private String streamArn;

        @Generated
        private Object records;

        /** Records an explicit value so the constructor does not fall back to the default. */
        @Generated
        public B failOnUnsuccessfulRecords(boolean failOnUnsuccessfulRecords) {
            this.failOnUnsuccessfulRecords$set = true;
            this.failOnUnsuccessfulRecords$value = failOnUnsuccessfulRecords;
            return mo923self();
        }

        @Generated
        public B streamName(String streamName) {
            this.streamName = streamName;
            return mo923self();
        }

        @Generated
        public B streamArn(String streamArn) {
            this.streamArn = streamArn;
            return mo923self();
        }

        @Generated
        public B records(Object records) {
            this.records = records;
            return mo923self();
        }

        /** Returns this builder typed as {@code B} (decompiler-renamed from {@code self}). */
        @Override
        @Generated
        public abstract B mo923self();

        /** Creates the task instance (decompiler-renamed from {@code build}). */
        @Override
        @Generated
        public abstract C mo922build();

        @Override
        @Generated
        public String toString() {
            return "PutRecords.PutRecordsBuilder(super=" + super.toString()
                + ", failOnUnsuccessfulRecords$value=" + this.failOnUnsuccessfulRecords$value
                + ", streamName=" + this.streamName
                + ", streamArn=" + this.streamArn
                + ", records=" + this.records
                + ")";
        }
    }

    /**
     * Concrete builder returned by {@code PutRecords.builder()}; binds the self type to itself
     * and builds a plain {@link PutRecords}.
     */
    @Generated
    /* loaded from: input_file:io/kestra/plugin/aws/kinesis/PutRecords$PutRecordsBuilderImpl.class */
    private static final class PutRecordsBuilderImpl extends PutRecordsBuilder<PutRecords, PutRecordsBuilderImpl> {
        @Generated
        private PutRecordsBuilderImpl() {
        }

        /** Returns this builder instance. */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.kestra.plugin.aws.kinesis.PutRecords.PutRecordsBuilder, io.kestra.plugin.aws.AbstractConnection.AbstractConnectionBuilder
        @Generated
        /* renamed from: self */
        public PutRecordsBuilderImpl mo923self() {
            return this;
        }

        /** Creates a {@link PutRecords} from the values collected by this builder. */
        @Override // io.kestra.plugin.aws.kinesis.PutRecords.PutRecordsBuilder, io.kestra.plugin.aws.AbstractConnection.AbstractConnectionBuilder
        @Generated
        /* renamed from: build */
        public PutRecords mo922build() {
            return new PutRecords(this);
        }
    }

    /**
     * Task entry point (decompiler-renamed from {@code run}).
     *
     * <p>Resolves the records, sends them in a single PutRecords call, and either fails the run
     * (when {@code failOnUnsuccessfulRecords} is set and any record failed) or publishes metrics
     * and stores a per-record result file.
     *
     * @param runContext the Kestra run context
     * @return the stored result file URI together with the total and failed record counts
     * @throws Exception if resolving, sending or storing fails, or if records failed and
     *                   {@code failOnUnsuccessfulRecords} is {@code true}
     */
    public Output m964run(RunContext runContext) throws Exception {
        final long startedAt = System.nanoTime();

        List<Record> toSend = getRecordList(this.records, runContext);
        PutRecordsResponse response = putRecords(runContext, toSend);

        // Fail before any metric is emitted, as the task is configured to be strict.
        if (this.failOnUnsuccessfulRecords && response.failedRecordCount().intValue() > 0) {
            runContext.logger().error("Response show {} record failed: {}", response.failedRecordCount(), response);
            String message = String.format("Response show %d record failed: %s", response.failedRecordCount(), response);
            throw new RuntimeException(message);
        }

        Duration elapsed = Duration.ofNanos(System.nanoTime() - startedAt);
        runContext.metric(Timer.of("duration", elapsed, new String[0]));
        runContext.metric(Counter.of("failedRecordCount", response.failedRecordCount(), new String[0]));

        int total = toSend.size();
        Integer succeeded = Integer.valueOf(total - response.failedRecordCount().intValue());
        runContext.metric(Counter.of("successfulRecordCount", succeeded, new String[0]));
        runContext.metric(Counter.of("recordCount", Integer.valueOf(total), new String[0]));

        File resultFile = writeOutputFile(runContext, response, toSend);
        return Output.builder()
            .uri(runContext.putTempFile(resultFile))
            .failedRecordsCount(response.failedRecordCount().intValue())
            .recordCount(total)
            .build();
    }

    /**
     * Sends all records to Kinesis in a single PutRecords request.
     *
     * <p>The stream is addressed by {@code streamArn} when it is non-empty, otherwise by
     * {@code streamName}. Both properties are declared dynamic, so the chosen value is rendered
     * through the run context before it is placed in the request.
     *
     * @param runContext the Kestra run context, used to build the client and render values
     * @param list       the records to send
     * @return the response returned by the Kinesis client
     * @throws IllegalVariableEvaluationException if rendering a templated value fails
     * @throws IllegalArgumentException           if neither {@code streamArn} nor {@code streamName} is set
     */
    private PutRecordsResponse putRecords(RunContext runContext, List<Record> list) throws IllegalVariableEvaluationException {
        // try-with-resources closes the client on both the normal and the exceptional path.
        try (KinesisClient client = client(runContext)) {
            PutRecordsRequest.Builder builder = PutRecordsRequest.builder();

            if (!Strings.isNullOrEmpty(this.streamArn)) {
                builder.streamARN(runContext.render(this.streamArn));
            } else if (!Strings.isNullOrEmpty(this.streamName)) {
                builder.streamName(runContext.render(this.streamName));
            } else {
                throw new IllegalArgumentException("Either streamName or streamArn has to be set.");
            }

            // throwFunction lets the mapping lambda throw the checked exception out of the stream.
            builder.records((List) list.stream()
                .map(Rethrow.throwFunction(entry -> entry.toPutRecordsRequestEntry(runContext)))
                .collect(Collectors.toList()));

            // NOTE(review): mo2863build looks like a decompiler-renamed build() — confirm against the SDK.
            return client.putRecords((PutRecordsRequest) builder.mo2863build());
        }
    }

    /**
     * Resolves the {@code records} property into a list of {@link Record}.
     *
     * <ul>
     *   <li>A {@code List} is converted element by element with the Ion mapper.</li>
     *   <li>A {@code String} is rendered, parsed as a URI that must use the {@code kestra} scheme,
     *       and the referenced file is read as serialized records.</li>
     * </ul>
     *
     * @param obj        the raw {@code records} value
     * @param runContext the Kestra run context, used for rendering and storage access
     * @return the records to send
     * @throws IllegalVariableEvaluationException if {@code obj} is null or of an unsupported type,
     *                                            or if rendering fails
     * @throws URISyntaxException                 if the rendered string is not a valid URI
     * @throws IOException                        if the storage file cannot be read or closed
     * @throws IllegalArgumentException           if the URI is not a {@code kestra} URI
     */
    private List<Record> getRecordList(Object obj, RunContext runContext) throws IllegalVariableEvaluationException, URISyntaxException, IOException {
        if (obj instanceof List) {
            return (List) MAPPER.convertValue(obj, new TypeReference<List<Record>>() {
            });
        }
        if (!(obj instanceof String)) {
            // Covers null as well, which previously surfaced as a NullPointerException.
            String typeName = obj == null ? "null" : String.valueOf(obj.getClass());
            throw new IllegalVariableEvaluationException("Invalid records type '" + typeName + "'");
        }

        URI uri = new URI(runContext.render((String) obj));
        // Constant-first comparison: getScheme() is null for scheme-less values such as "myfile.ion".
        if (!"kestra".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Invalid records parameter, must be a Kestra internal storage URI, or a list of record.");
        }

        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(uri), StandardCharsets.UTF_8))) {
            return (List) Flowable.create(FileSerde.reader(reader, Record.class), BackpressureStrategy.BUFFER).toList().blockingGet();
        }
    }

    /**
     * Writes one {@link OutputEntry} per sent record to a temporary {@code .ion} file.
     *
     * <p>Each record is paired positionally with the result entry at the same index of the
     * response.
     *
     * @param runContext         the Kestra run context, used to allocate the temporary file
     * @param putRecordsResponse the response whose per-record results are written
     * @param list               the records that were sent, in request order
     * @return the temporary file containing the serialized entries
     * @throws IOException if the file cannot be opened or closed
     */
    private File writeOutputFile(RunContext runContext, PutRecordsResponse putRecordsResponse, List<Record> list) throws IOException {
        File target = runContext.tempFile(".ion").toFile();

        try (FileOutputStream sink = new FileOutputStream(target)) {
            Flowable<OutputEntry> entries = Flowable.fromIterable(list).zipWith(
                putRecordsResponse.records(),
                (sent, result) -> OutputEntry.builder()
                    .record(sent)
                    .sequenceNumber(result.sequenceNumber())
                    .shardId(result.shardId())
                    .errorCode(result.errorCode())
                    .errorMessage(result.errorMessage())
                    .build()
            );
            entries.blockingForEach(entry -> FileSerde.write(sink, entry));
        }

        return target;
    }

    /**
     * Builds a Kinesis client from this task's connection settings.
     *
     * <p>Credentials always come from {@code credentials(runContext)}; the region and endpoint
     * override are applied only when configured, after being rendered.
     *
     * @param runContext the Kestra run context, used for credentials and rendering
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalVariableEvaluationException if rendering the region or endpoint fails
     */
    protected KinesisClient client(RunContext runContext) throws IllegalVariableEvaluationException {
        KinesisClientBuilder builder = (KinesisClientBuilder) KinesisClient.builder().credentialsProvider(credentials(runContext));

        if (this.region != null) {
            String renderedRegion = runContext.render(this.region);
            builder.region(Region.of(renderedRegion));
        }

        if (this.endpointOverride != null) {
            String renderedEndpoint = runContext.render(this.endpointOverride);
            builder.endpointOverride(URI.create(renderedEndpoint));
        }

        return builder.mo2863build();
    }

    /**
     * Supplies the default for {@code failOnUnsuccessfulRecords}, which is {@code true}.
     * Used by the builder-based constructor when the flag was never set, and by the no-arg constructor.
     */
    @Generated
    private static boolean $default$failOnUnsuccessfulRecords() {
        return true;
    }

    /**
     * Builder-based constructor: copies the builder's values into the task, falling back to the
     * default for {@code failOnUnsuccessfulRecords} when it was not set explicitly.
     *
     * @param putRecordsBuilder the builder holding the configured values
     */
    @Generated
    protected PutRecords(PutRecordsBuilder<?, ?> putRecordsBuilder) {
        super(putRecordsBuilder);
        this.failOnUnsuccessfulRecords = putRecordsBuilder.failOnUnsuccessfulRecords$set
            ? putRecordsBuilder.failOnUnsuccessfulRecords$value
            : $default$failOnUnsuccessfulRecords();
        this.streamName = putRecordsBuilder.streamName;
        this.streamArn = putRecordsBuilder.streamArn;
        this.records = putRecordsBuilder.records;
    }

    /** Creates a new builder for {@link PutRecords}, backed by {@code PutRecordsBuilderImpl}. */
    @Generated
    public static PutRecordsBuilder<?, ?> builder() {
        return new PutRecordsBuilderImpl();
    }

    /** Renders the parent's string form followed by this task's own properties. */
    @Override
    @Generated
    public String toString() {
        return "PutRecords(super=" + super.toString()
            + ", failOnUnsuccessfulRecords=" + isFailOnUnsuccessfulRecords()
            + ", streamName=" + getStreamName()
            + ", streamArn=" + getStreamArn()
            + ", records=" + getRecords()
            + ")";
    }

    /**
     * Two tasks are equal when the other object is a {@link PutRecords} that accepts this one
     * via {@code canEqual}, the parent state is equal, and all four own properties match.
     */
    @Override
    @Generated
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PutRecords)) {
            return false;
        }
        PutRecords other = (PutRecords) obj;
        if (!other.canEqual(this)) {
            return false;
        }
        if (!super.equals(obj)) {
            return false;
        }
        if (isFailOnUnsuccessfulRecords() != other.isFailOnUnsuccessfulRecords()) {
            return false;
        }

        String thisStreamName = getStreamName();
        String otherStreamName = other.getStreamName();
        if (thisStreamName == null ? otherStreamName != null : !thisStreamName.equals(otherStreamName)) {
            return false;
        }

        String thisStreamArn = getStreamArn();
        String otherStreamArn = other.getStreamArn();
        if (thisStreamArn == null ? otherStreamArn != null : !thisStreamArn.equals(otherStreamArn)) {
            return false;
        }

        Object thisRecords = getRecords();
        Object otherRecords = other.getRecords();
        if (thisRecords == null) {
            return otherRecords == null;
        }
        return thisRecords.equals(otherRecords);
    }

    /** Tells {@code equals} whether {@code obj} is a {@link PutRecords}, so both sides must agree on the type. */
    @Override // io.kestra.plugin.aws.AbstractConnection
    @Generated
    protected boolean canEqual(Object obj) {
        return obj instanceof PutRecords;
    }

    /**
     * Combines the parent hash with the four own properties, using the same fields as
     * {@code equals} (multiplier 59; 43 stands in for a null property).
     */
    @Override
    @Generated
    public int hashCode() {
        final int prime = 59;
        int result = super.hashCode();
        result = result * prime + (isFailOnUnsuccessfulRecords() ? 79 : 97);

        String name = getStreamName();
        result = result * prime + (name == null ? 43 : name.hashCode());

        String arn = getStreamArn();
        result = result * prime + (arn == null ? 43 : arn.hashCode());

        Object recs = getRecords();
        result = result * prime + (recs == null ? 43 : recs.hashCode());

        return result;
    }

    /** @return whether the run fails when the response reports failed records */
    @Generated
    public boolean isFailOnUnsuccessfulRecords() {
        return this.failOnUnsuccessfulRecords;
    }

    /** @return the configured stream name as stored on the task, possibly {@code null} */
    @Generated
    public String getStreamName() {
        return this.streamName;
    }

    /** @return the configured stream ARN as stored on the task, possibly {@code null} */
    @Generated
    public String getStreamArn() {
        return this.streamArn;
    }

    /** @return the raw {@code records} value as configured (not yet converted to {@link Record}s) */
    @Generated
    public Object getRecords() {
        return this.records;
    }

    /** No-arg constructor; only initializes {@code failOnUnsuccessfulRecords} to its default. */
    @Generated
    public PutRecords() {
        this.failOnUnsuccessfulRecords = $default$failOnUnsuccessfulRecords();
    }
}
