package org.apache.beam.sdk.io.aws.dynamodb;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.aws.dynamodb.AutoValue_DynamoDBIO_Read;
import org.apache.beam.sdk.io.aws.dynamodb.AutoValue_DynamoDBIO_RetryConfiguration;
import org.apache.beam.sdk.io.aws.dynamodb.AutoValue_DynamoDBIO_Write;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.class */
public final class DynamoDBIO {

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Read.class */
    public static abstract class Read<T> extends PTransform<PBegin, PCollection<T>> {

        /**
         * AutoValue builder for {@link Read}. Each setter records one property and
         * {@link #build()} produces the configured transform.
         *
         * <p>NOTE(review): the decompiler reports that this class was originally package-private.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Read$Builder.class */
        public static abstract class Builder<T> {
            /** Sets the provider that the read uses to create its DynamoDB client. */
            abstract Builder<T> setAwsClientsProvider(AwsClientsProvider awsClientsProvider);

            /** Sets the function that supplies the {@link ScanRequest}; it is invoked with a null argument. */
            abstract Builder<T> setScanRequestFn(SerializableFunction<Void, ScanRequest> serializableFunction);

            /** Sets the scan segment this instance reads; applied to the request via {@code setSegment}. */
            abstract Builder<T> setSegmentId(Integer num);

            /** Sets the function that converts each {@link ScanResult} page into an output element. */
            abstract Builder<T> setScanResultMapperFn(SerializableFunction<ScanResult, T> serializableFunction);

            /** Sets the coder assigned to the output collection. */
            abstract Builder<T> setCoder(Coder<T> coder);

            /** Creates the {@link Read} from the properties set so far. */
            abstract Read<T> build();
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Read$ItemsMapper.class */
        static final class ItemsMapper implements SerializableFunction<ScanResult, List<Map<String, AttributeValue>>> {
            ItemsMapper() {
            }

            public List<Map<String, AttributeValue>> apply(ScanResult scanResult) {
                return scanResult == null ? Collections.emptyList() : scanResult.getItems();
            }
        }

        /**
         * Scans one segment of the table described by the incoming {@link Read} and emits one
         * mapped element per result page.
         *
         * <p>NOTE(review): the decompiler reports that this class was originally private.
         */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Read$ReadFn.class */
        public static class ReadFn<T> extends DoFn<Read<T>, T> {
            private ReadFn() {
            }

            /**
             * Pages through the segment identified by {@code read.getSegmentId()}.
             *
             * @param read spec carrying the client provider, request function, segment id and mapper
             * @param outputReceiver receives the mapped value of every page
             */
            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Read<T> read, DoFn.OutputReceiver<T> outputReceiver) {
                // NOTE(review): a client is created per element and is not shut down here; whether
                // it is safe to shut it down depends on the provider (it may hand out a shared client).
                AmazonDynamoDB client = read.getAwsClientsProvider().createDynamoDB();
                Map<String, AttributeValue> lastEvaluatedKey = null;
                do {
                    // A fresh request is obtained for every page; the function ignores its (null) input.
                    ScanRequest scanRequest = read.getScanRequestFn().apply(null);
                    scanRequest.setSegment(read.getSegmentId());
                    if (lastEvaluatedKey != null) {
                        scanRequest.withExclusiveStartKey(lastEvaluatedKey);
                    }
                    ScanResult page = client.scan(scanRequest);
                    outputReceiver.output(read.getScanResultMapperFn().apply(page));
                    lastEvaluatedKey = page.getLastEvaluatedKey();
                    // An empty key also means "last page"; resuming from it would never terminate.
                } while (lastEvaluatedKey != null && !lastEvaluatedKey.isEmpty());
            }
        }

        /**
         * Fans a single {@link Read} out into one copy per scan segment.
         *
         * <p>NOTE(review): the decompiler reports that this class was originally private.
         */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Read$SplitFn.class */
        public static class SplitFn<T> extends DoFn<Read<T>, Read<T>> {
            private SplitFn() {
            }

            /**
             * Emits {@code read.withSegmentId(i)} for every {@code i} in {@code [0, totalSegments)}.
             *
             * @param read spec whose request function reports the total number of segments
             * @param outputReceiver receives one segment-specific spec per segment
             */
            @DoFn.ProcessElement
            public void processElement(@DoFn.Element Read<T> read, DoFn.OutputReceiver<Read<T>> outputReceiver) {
                ScanRequest scanRequest = read.getScanRequestFn().apply(null);
                // Read the bound once; it does not change while the segments are emitted.
                int totalSegments = scanRequest.getTotalSegments().intValue();
                for (int segment = 0; segment < totalSegments; segment++) {
                    outputReceiver.output(read.withSegmentId(Integer.valueOf(segment)));
                }
            }
        }

        /**
         * Provider used to create the DynamoDB client; {@code expand} rejects a null value.
         * NOTE(review): decompiler reports the original access was package-private.
         */
        public abstract AwsClientsProvider getAwsClientsProvider();

        /**
         * Function supplying the {@link ScanRequest}; callers in this class invoke it with null.
         * {@code expand} rejects a null value. Decompiler: originally package-private.
         */
        public abstract SerializableFunction<Void, ScanRequest> getScanRequestFn();

        /**
         * Segment this instance scans; assigned per segment by {@code SplitFn} through
         * {@code withSegmentId}. Decompiler: originally package-private.
         */
        public abstract Integer getSegmentId();

        /**
         * Function that maps each {@link ScanResult} page to an output element.
         * Decompiler: originally package-private.
         */
        public abstract SerializableFunction<ScanResult, T> getScanResultMapperFn();

        /**
         * Coder set on the output collection by {@code expand}.
         * Decompiler: originally package-private.
         */
        public abstract Coder<T> getCoder();

        /** Returns a builder for deriving a modified copy (presumably seeded from this instance, per AutoValue convention). */
        abstract Builder<T> toBuilder();

        /**
         * Returns a copy of this transform that creates its DynamoDB client through the given provider.
         */
        public Read<T> withAwsClientsProvider(AwsClientsProvider awsClientsProvider) {
            Builder<T> updated = toBuilder().setAwsClientsProvider(awsClientsProvider);
            return updated.build();
        }

        /**
         * Convenience overload that wraps the arguments in a {@code BasicDynamoDBProvider}.
         * NOTE(review): the arguments are presumably access key, secret key, region and an optional
         * service endpoint; confirm against {@code BasicDynamoDBProvider}.
         */
        public Read<T> withAwsClientsProvider(String str, String str2, Regions regions, String str3) {
            AwsClientsProvider provider = new BasicDynamoDBProvider(str, str2, regions, str3);
            return withAwsClientsProvider(provider);
        }

        /**
         * Same as the four-argument overload with a null last argument.
         */
        public Read<T> withAwsClientsProvider(String str, String str2, Regions regions) {
            final String unspecified = null;
            return withAwsClientsProvider(str, str2, regions, unspecified);
        }

        /**
         * Returns a copy of this transform that obtains its {@link ScanRequest} from the given function.
         * {@code expand} requires the produced request to declare a positive total segment count.
         */
        public Read<T> withScanRequestFn(SerializableFunction<Void, ScanRequest> serializableFunction) {
            Builder<T> updated = toBuilder().setScanRequestFn(serializableFunction);
            return updated.build();
        }

        /**
         * Returns a copy of this transform bound to one scan segment.
         * NOTE(review): the decompiler reports that this method was originally private.
         *
         * @throws IllegalArgumentException if {@code num} is null
         */
        public Read<T> withSegmentId(Integer num) {
            boolean present = num != null;
            Preconditions.checkArgument(present, "segmentId can not be null");
            Builder<T> updated = toBuilder().setSegmentId(num);
            return updated.build();
        }

        /**
         * Returns a copy of this transform that maps every scan page with the given function.
         *
         * @throws IllegalArgumentException if the function is null
         */
        public Read<T> withScanResultMapperFn(SerializableFunction<ScanResult, T> serializableFunction) {
            boolean present = serializableFunction != null;
            Preconditions.checkArgument(present, "scanResultMapper can not be null");
            Builder<T> updated = toBuilder().setScanResultMapperFn(serializableFunction);
            return updated.build();
        }

        /**
         * Returns a copy of this transform that emits each scan page as its raw list of items,
         * using {@link ItemsMapper} and a matching list-of-maps coder.
         */
        @SuppressWarnings("unchecked")
        public Read<List<Map<String, AttributeValue>>> items() {
            // Unchecked but safe: the mapper and the coder, the only T-typed properties,
            // are both replaced below with values of the new element type.
            Read<List<Map<String, AttributeValue>>> self = (Read<List<Map<String, AttributeValue>>>) this;
            return self.withScanResultMapperFn(new ItemsMapper())
                    .withCoder(ListCoder.of(MapCoder.of(StringUtf8Coder.of(), AttributeValueCoder.of())));
        }

        /**
         * Returns a copy of this transform whose output collection uses the given coder.
         *
         * @throws IllegalArgumentException if {@code coder} is null
         */
        public Read<T> withCoder(Coder<T> coder) {
            boolean present = coder != null;
            Preconditions.checkArgument(present, "coder can not be null");
            Builder<T> updated = toBuilder().setCoder(coder);
            return updated.build();
        }

        /**
         * Builds the read pipeline: create a single spec, split it into one spec per segment,
         * reshuffle the specs, then scan each segment.
         *
         * @param pBegin pipeline start
         * @return the mapped scan pages, encoded with {@link #getCoder()}
         * @throws IllegalArgumentException if the scan request function or the client provider is
         *     missing, or the scan request does not declare a positive total segment count
         */
        @Override
        public PCollection<T> expand(PBegin pBegin) {
            Preconditions.checkArgument(getScanRequestFn() != null, "withScanRequestFn() is required");
            Preconditions.checkArgument(getAwsClientsProvider() != null, "withAwsClientsProvider() is required");
            ScanRequest scanRequest = getScanRequestFn().apply(null);
            Preconditions.checkArgument(scanRequest.getTotalSegments() != null && scanRequest.getTotalSegments().intValue() > 0, "TotalSegments is required with withScanRequestFn() and greater zero");

            PCollection<Read<T>> splits = pBegin
                    .apply("Create", Create.of(this))
                    .apply("Split", ParDo.of(new SplitFn<T>()));
            // Read<T> is generic, so the coder is pinned explicitly through a type descriptor.
            splits.setCoder(SerializableCoder.of(new TypeDescriptor<Read<T>>() {
            }));

            // Reshuffle between splitting and reading so the per-segment specs are redistributed.
            PCollection<T> output = splits
                    .apply("Reshuffle", Reshuffle.viaRandomKey())
                    .apply("Read", ParDo.of(new ReadFn<T>()));
            output.setCoder(getCoder());
            return output;
        }
    }

    /**
     * Retry settings consumed by the write path: maximum attempts, maximum cumulative backoff,
     * initial backoff and the predicate deciding which failures are retried.
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$RetryConfiguration.class */
    public static abstract class RetryConfiguration implements Serializable {
        /** Initial backoff used by the two-argument {@link #create(int, Duration)}. */
        private static final Duration DEFAULT_INITIAL_DURATION = Duration.standardSeconds(5);

        /** Predicate installed by both {@code create} factories. */
        @VisibleForTesting
        static final RetryPredicate DEFAULT_RETRY_PREDICATE = new DefaultRetryPredicate();

        /**
         * AutoValue builder for {@link RetryConfiguration}.
         * NOTE(review): the decompiler reports that this class was originally package-private.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$RetryConfiguration$Builder.class */
        public static abstract class Builder {
            abstract Builder setMaxAttempts(int i);

            abstract Builder setMaxDuration(Duration duration);

            abstract Builder setInitialDuration(Duration duration);

            abstract Builder setRetryPredicate(RetryPredicate retryPredicate);

            abstract RetryConfiguration build();
        }

        /** Default predicate: retries on I/O failures and on any DynamoDB service exception. */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$RetryConfiguration$DefaultRetryPredicate.class */
        private static class DefaultRetryPredicate implements RetryPredicate {
            // NOTE(review): not consulted by test() below, because every AmazonDynamoDBException
            // is already accepted regardless of status code. Confirm whether restricting retries
            // to these codes was the intent.
            private static final ImmutableSet<Integer> ELIGIBLE_CODES = ImmutableSet.of(503);

            private DefaultRetryPredicate() {
            }

            /**
             * @param th failure raised by a write attempt
             * @return true when {@code th} is an {@link IOException} or an {@link AmazonDynamoDBException}
             */
            @Override // java.util.function.Predicate
            public boolean test(Throwable th) {
                if (th instanceof IOException) {
                    return true;
                }
                return th instanceof AmazonDynamoDBException;
            }
        }

        /**
         * Serializable predicate over the failure of a write attempt.
         * NOTE(review): the decompiler reports that this interface was originally package-private.
         */
        @FunctionalInterface
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$RetryConfiguration$RetryPredicate.class */
        public interface RetryPredicate extends Predicate<Throwable>, Serializable {
        }

        /** Maximum number of attempts. Decompiler: originally package-private. */
        public abstract int getMaxAttempts();

        /** Upper bound on cumulative backoff. Decompiler: originally package-private. */
        public abstract Duration getMaxDuration();

        /** First backoff interval. Decompiler: originally package-private. */
        public abstract Duration getInitialDuration();

        /** Predicate deciding whether a failure is retried. Decompiler: originally package-private. */
        public abstract RetryPredicate getRetryPredicate();

        abstract Builder builder();

        /**
         * Creates a configuration with the default initial backoff.
         *
         * @param i maximum number of attempts, must be positive
         * @param duration maximum cumulative backoff, must be longer than zero
         */
        public static RetryConfiguration create(int i, Duration duration) {
            return create(i, duration, DEFAULT_INITIAL_DURATION);
        }

        /**
         * Creates a configuration with an explicit initial backoff and the default retry predicate.
         *
         * @param i maximum number of attempts, must be positive
         * @param duration maximum cumulative backoff, must be longer than zero
         * @param duration2 initial backoff, must be longer than zero
         * @throws IllegalArgumentException if any argument violates its constraint
         */
        static RetryConfiguration create(int i, Duration duration, Duration duration2) {
            Preconditions.checkArgument(i > 0, "maxAttempts should be greater than 0");

            boolean maxDurationValid = duration != null && duration.isLongerThan(Duration.ZERO);
            Preconditions.checkArgument(maxDurationValid, "maxDuration should be greater than 0");

            boolean initialDurationValid = duration2 != null && duration2.isLongerThan(Duration.ZERO);
            Preconditions.checkArgument(initialDurationValid, "initialDuration should be greater than 0");

            Builder configured = new AutoValue_DynamoDBIO_RetryConfiguration.Builder()
                    .setMaxAttempts(i)
                    .setMaxDuration(duration)
                    .setInitialDuration(duration2)
                    .setRetryPredicate(DEFAULT_RETRY_PREDICATE);
            return configured.build();
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Write.class */
    public static abstract class Write<T> extends PTransform<PCollection<T>, PCollection<Void>> {

        /**
         * AutoValue builder for {@link Write}. Each setter records one property and
         * {@link #build()} produces the configured transform.
         *
         * <p>NOTE(review): the decompiler reports that this class was originally package-private.
         */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Write$Builder.class */
        public static abstract class Builder<T> {
            /** Sets the provider that the write uses to create its DynamoDB client. */
            abstract Builder<T> setAwsClientsProvider(AwsClientsProvider awsClientsProvider);

            /** Sets the retry settings; when absent, {@code WriteFn} configures zero retries. */
            abstract Builder<T> setRetryConfiguration(RetryConfiguration retryConfiguration);

            /** Sets the function mapping an input element to a keyed {@link WriteRequest}. */
            abstract Builder<T> setWriteItemMapperFn(SerializableFunction<T, KV<String, WriteRequest>> serializableFunction);

            /** Sets the attribute names used to de-duplicate requests within a batch. */
            abstract Builder<T> setDeduplicateKeys(List<String> list);

            /** Creates the {@link Write} from the properties set so far. */
            abstract Write<T> build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO$Write$WriteFn.class */
        public static class WriteFn<T> extends DoFn<T, Void> {

            // SLF4J template (attempt count, request items) for a write that could not be completed.
            @VisibleForTesting
            static final String RETRY_ERROR_LOG = "Error writing items to DynamoDB [attempts:{}]: {}";
            // SLF4J template logged by flushBatch when unprocessed items remain after the resume backoff.
            private static final String RESUME_ERROR_LOG = "Error writing remaining unprocessed items to DynamoDB: {}";
            // Exception message for a failure that the retry predicate does not accept.
            private static final String ERROR_NO_RETRY = "Error writing to DynamoDB. No attempt made to retry";
            // String.format template (attempt count) for a retryable failure whose backoff is exhausted.
            private static final String ERROR_RETRIES_EXCEEDED = "Error writing to DynamoDB after %d attempt(s). No more attempts allowed";
            // Exception message thrown by flushBatch when unprocessed items remain.
            private static final String ERROR_UNPROCESSED_ITEMS = "Error writing to DynamoDB. Unprocessed items remaining";
            // Backoff between re-submissions of unprocessed items; built in setup().
            private transient FluentBackoff resumeBackoff;
            // Backoff between attempts of a failing batch call; built in setup().
            private transient FluentBackoff retryBackoff;
            private static final Logger LOG = LoggerFactory.getLogger(WriteFn.class);
            // Incremented once for every batch that ultimately fails.
            private static final Counter DYNAMO_DB_WRITE_FAILURES = Metrics.counter(WriteFn.class, "DynamoDB_Write_Failures");
            // Flush threshold; presumably mirrors the BatchWriteItem limit of 25 requests per call (confirm).
            private static final int BATCH_SIZE = 25;
            // Created in setup() and shut down in tearDown().
            private transient AmazonDynamoDB client;
            private final Write<T> spec;
            // Pending requests keyed by (mapper key, de-duplication attributes); a later element with
            // the same key replaces the earlier one. Re-created at the start of every bundle.
            private Map<KV<String, Map<String, AttributeValue>>, KV<String, WriteRequest>> batch;

            /**
             * @param write transform configuration this function executes
             */
            WriteFn(Write<T> write) {
                spec = write;
            }

            /**
             * Creates the client and derives both backoff policies. Without a retry configuration
             * the defaults are used and the retry backoff allows zero retries.
             */
            @DoFn.Setup
            public void setup() {
                client = spec.getAwsClientsProvider().createDynamoDB();

                FluentBackoff resume = FluentBackoff.DEFAULT;
                FluentBackoff retry = FluentBackoff.DEFAULT.withMaxRetries(0);

                RetryConfiguration retryConfig = spec.getRetryConfiguration();
                if (retryConfig != null) {
                    resume = resume.withInitialBackoff(retryConfig.getInitialDuration());
                    // maxAttempts counts the first call, so one fewer retry is allowed.
                    int maxRetries = retryConfig.getMaxAttempts() - 1;
                    retry = retry.withMaxRetries(maxRetries)
                            .withInitialBackoff(retryConfig.getInitialDuration())
                            .withMaxCumulativeBackoff(retryConfig.getMaxDuration());
                }

                resumeBackoff = resume;
                retryBackoff = retry;
            }

            /** Starts every bundle with an empty batch buffer. */
            @DoFn.StartBundle
            public void startBundle(DoFn<T, Void>.StartBundleContext startBundleContext) {
                batch = new HashMap<>();
            }

            /**
             * Maps the element to a keyed write request, buffers it under its de-duplication key
             * (replacing any buffered request with the same key) and flushes once the buffer
             * holds {@code BATCH_SIZE} entries.
             */
            @DoFn.ProcessElement
            public void processElement(DoFn<T, Void>.ProcessContext processContext) throws Exception {
                KV<String, WriteRequest> keyedRequest = spec.getWriteItemMapperFn().apply(processContext.element());
                Map<String, AttributeValue> dedupValues = extractDeduplicateKeyValues(keyedRequest.getValue());
                batch.put(KV.of(keyedRequest.getKey(), dedupValues), keyedRequest);
                if (batch.size() < BATCH_SIZE) {
                    return;
                }
                flushBatch();
            }

            /**
             * Computes the attribute map identifying a request for de-duplication.
             *
             * <p>The candidate attributes are the put item, else the delete key, else an empty map.
             * When either the candidates or the configured de-duplication keys are empty, the
             * candidates are returned unchanged; otherwise only the configured keys are kept.
             *
             * @param writeRequest request produced by the mapper function
             * @return attributes used as the de-duplication key
             */
            private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest writeRequest) {
                List<String> deduplicateKeys = spec.getDeduplicateKeys();

                Map<String, AttributeValue> attributes;
                if (writeRequest.getPutRequest() != null) {
                    attributes = writeRequest.getPutRequest().getItem();
                } else if (writeRequest.getDeleteRequest() != null) {
                    attributes = writeRequest.getDeleteRequest().getKey();
                } else {
                    attributes = Collections.emptyMap();
                }

                if (attributes.isEmpty() || deduplicateKeys.isEmpty()) {
                    return attributes;
                }
                return attributes.entrySet().stream()
                        .filter(attribute -> deduplicateKeys.contains(attribute.getKey()))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            }

            /**
             * Writes whatever is still buffered when the bundle ends.
             *
             * @throws Exception if the remaining batch cannot be written
             */
            @DoFn.FinishBundle
            public void finishBundle(DoFn<T, Void>.FinishBundleContext finishBundleContext) throws Exception {
                flushBatch();
            }

            /**
             * Sends the buffered requests, grouped by their mapper key, and keeps re-submitting
             * whatever the service reports as unprocessed until nothing remains or the resume
             * backoff is exhausted. The buffer is cleared in every case.
             *
             * @throws IOException if unprocessed items remain, or a batch call ultimately fails
             * @throws InterruptedException if interrupted while backing off
             */
            private void flushBatch() throws IOException, InterruptedException {
                if (batch.isEmpty()) {
                    return;
                }
                try {
                    Map<String, List<WriteRequest>> pending = batch.values().stream()
                            .collect(Collectors.groupingBy(KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
                    BackOff resume = resumeBackoff.backoff();
                    do {
                        pending = writeWithRetries(new BatchWriteItemRequest(pending)).getUnprocessedItems();
                        // Short-circuit: the backoff is only consumed while items are still pending.
                    } while (!pending.isEmpty() && BackOffUtils.next(Sleeper.DEFAULT, resume));

                    if (!pending.isEmpty()) {
                        DYNAMO_DB_WRITE_FAILURES.inc();
                        LOG.error(RESUME_ERROR_LOG, pending);
                        throw new IOException(ERROR_UNPROCESSED_ITEMS);
                    }
                } finally {
                    batch.clear();
                }
            }

            /**
             * Issues one batch write, retrying failed calls for as long as the failure is
             * retryable and the retry backoff allows another attempt.
             *
             * <p>Reconstructed from the decompiler's instruction dump; the previous body was a
             * stub that unconditionally threw {@code UnsupportedOperationException}.
             *
             * @param r9 batch request to send
             * @return the result of the first successful call
             * @throws java.io.IOException wrapping the last failure, when the failure is not
             *     retryable or no further attempts are allowed
             * @throws java.lang.InterruptedException if interrupted while backing off
             */
            private com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult writeWithRetries(com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest r9) throws java.io.IOException, java.lang.InterruptedException {
                BackOff backoff = retryBackoff.backoff();
                Exception lastFailure;
                int attempts = 0;
                do {
                    attempts++;
                    try {
                        return client.batchWriteItem(r9);
                    } catch (Exception e) {
                        lastFailure = e;
                    }
                    // The backoff is only consumed when the failure is retryable.
                } while (canRetry(lastFailure) && BackOffUtils.next(Sleeper.DEFAULT, backoff));

                DYNAMO_DB_WRITE_FAILURES.inc();
                LOG.warn(RETRY_ERROR_LOG, Integer.valueOf(attempts), r9.getRequestItems());
                // A retryable failure reaching this point means the attempts were exhausted.
                String message = canRetry(lastFailure)
                        ? String.format(ERROR_RETRIES_EXCEEDED, Integer.valueOf(attempts))
                        : ERROR_NO_RETRY;
                throw new IOException(message, lastFailure);
            }

            /**
             * @param exc failure raised by a write attempt
             * @return true only when a retry configuration exists and its predicate accepts {@code exc}
             */
            private boolean canRetry(Exception exc) {
                RetryConfiguration retryConfig = spec.getRetryConfiguration();
                if (retryConfig == null) {
                    return false;
                }
                return retryConfig.getRetryPredicate().test(exc);
            }

            /** Shuts the client down, if one was created, and drops the reference. */
            @DoFn.Teardown
            public void tearDown() {
                if (client == null) {
                    return;
                }
                client.shutdown();
                client = null;
            }
        }

        /**
         * Provider used by {@code WriteFn.setup} to create the DynamoDB client.
         * NOTE(review): decompiler reports the original access was package-private.
         */
        public abstract AwsClientsProvider getAwsClientsProvider();

        /**
         * Retry settings; {@code WriteFn} treats null as "never retry".
         * Decompiler: originally package-private.
         */
        public abstract RetryConfiguration getRetryConfiguration();

        /**
         * Function mapping an input element to a keyed {@link WriteRequest}.
         * Decompiler: originally package-private.
         */
        public abstract SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn();

        /**
         * Attribute names used to de-duplicate requests within a batch; {@code write()} starts
         * with an empty list. Decompiler: originally package-private.
         */
        public abstract List<String> getDeduplicateKeys();

        /** Returns a builder for deriving a modified copy (presumably seeded from this instance, per AutoValue convention). */
        abstract Builder<T> builder();

        /**
         * Returns a copy of this transform that creates its DynamoDB client through the given provider.
         */
        public Write<T> withAwsClientsProvider(AwsClientsProvider awsClientsProvider) {
            Builder<T> updated = builder().setAwsClientsProvider(awsClientsProvider);
            return updated.build();
        }

        /**
         * Convenience overload that wraps the arguments in a {@code BasicDynamoDBProvider}.
         * NOTE(review): the arguments are presumably access key, secret key, region and an optional
         * service endpoint; confirm against {@code BasicDynamoDBProvider}.
         */
        public Write<T> withAwsClientsProvider(String str, String str2, Regions regions, String str3) {
            AwsClientsProvider provider = new BasicDynamoDBProvider(str, str2, regions, str3);
            return withAwsClientsProvider(provider);
        }

        /**
         * Same as the four-argument overload with a null last argument.
         */
        public Write<T> withAwsClientsProvider(String str, String str2, Regions regions) {
            final String unspecified = null;
            return withAwsClientsProvider(str, str2, regions, unspecified);
        }

        /**
         * Returns a copy of this transform that retries failed writes according to the given settings.
         *
         * @throws IllegalArgumentException if {@code retryConfiguration} is null
         */
        public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) {
            boolean present = retryConfiguration != null;
            Preconditions.checkArgument(present, "retryConfiguration is required");
            Builder<T> updated = builder().setRetryConfiguration(retryConfiguration);
            return updated.build();
        }

        /**
         * Returns a copy of this transform that converts each input element with the given
         * function into a keyed {@link WriteRequest}; requests are grouped by that key when sent.
         */
        public Write<T> withWriteRequestMapperFn(SerializableFunction<T, KV<String, WriteRequest>> serializableFunction) {
            Builder<T> updated = builder().setWriteItemMapperFn(serializableFunction);
            return updated.build();
        }

        /**
         * Returns a copy of this transform that de-duplicates buffered requests on the given
         * attribute names; with an empty list the full attribute map is compared instead.
         */
        public Write<T> withDeduplicateKeys(List<String> list) {
            Builder<T> updated = builder().setDeduplicateKeys(list);
            return updated.build();
        }

        /**
         * Applies the batching write function to the input collection.
         *
         * @param pCollection elements to convert and write
         * @return the (empty) output of the write step
         * @throws IllegalArgumentException if the client provider or the mapper function is
         *     missing; both are dereferenced unconditionally by {@code WriteFn}, so failing here
         *     reports the mistake at construction time, as {@code Read.expand} does
         */
        @Override
        public PCollection<Void> expand(PCollection<T> pCollection) {
            Preconditions.checkArgument(getAwsClientsProvider() != null, "withAwsClientsProvider() is required");
            Preconditions.checkArgument(getWriteItemMapperFn() != null, "withWriteRequestMapperFn() is required");
            return pCollection.apply(ParDo.of(new WriteFn<T>(this)));
        }
    }

    /**
     * Creates an unconfigured {@link Read}; a scan request function and a client provider
     * must be supplied before the transform is applied.
     *
     * @param <T> element type produced by the read
     */
    public static <T> Read<T> read() {
        return new AutoValue_DynamoDBIO_Read.Builder<T>().build();
    }

    /**
     * Creates an unconfigured {@link Write} with an empty list of de-duplication keys.
     *
     * @param <T> element type consumed by the write
     */
    public static <T> Write<T> write() {
        return new AutoValue_DynamoDBIO_Write.Builder<T>().setDeduplicateKeys(new ArrayList<>()).build();
    }
}
