package com.hazelcast.connector;

import com.hazelcast.connector.map.Hz3MapAdapter;
import com.hazelcast.connector.map.Reader;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.HeapData;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.impl.LazyMapEntry;
import com.hazelcast.nio.serialization.HazelcastSerializationException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/hazelcast/connector/ReadMapOrCacheP.class */
public final class ReadMapOrCacheP<F extends CompletableFuture, B, R> extends AbstractProcessor {
    /** Issues the per-partition batch reads and decodes their results and records. */
    private final Reader<F, B, R> reader;
    /** IDs of the partitions this processor reads; index-aligned with {@code readOffsets} and {@code readFutures}. */
    private final int[] partitionIds;
    /** Polled before each emit pass; when it returns {@code true} the processor throws a {@link RestartableException}. */
    private final BooleanSupplier migrationWatcher;
    /** Per-partition offset handed to the next read; a negative value marks the partition as fully read. */
    private final int[] readOffsets;
    /** Per-partition pending read; allocated on the first {@code complete()} call, set to {@code null} once the partition is finished. */
    private F[] readFutures;
    /** Records of the batch currently being emitted; starts out as an empty list. */
    private List<R> currentBatch;
    /** Index into {@code currentBatch} of the next record to emit. */
    private int currentBatchPosition;
    /** Index into {@code partitionIds} of the partition polled last; {@code -1} before a scan starts. */
    private int currentPartitionIndex;
    /** Number of partitions whose read reported a negative next index. */
    private int numCompletedPartitions;
    /** Decompiler-exposed assertion switch: {@code true} when assertions are disabled for this class (set in the static initializer). */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/hazelcast/connector/ReadMapOrCacheP$RemoteProcessorSupplier.class */
    /**
     * Processor supplier that builds {@link ReadMapOrCacheP} instances reading the map
     * named {@code mapName} through an {@link Hz3MapAdapter} created from {@code clientXml}.
     * The adapter, serialization service and parallelism figures are transient and are
     * populated by {@link #init}.
     */
    static class RemoteProcessorSupplier<F extends CompletableFuture, B, R> implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String mapName;
        private final String clientXml;
        private transient Hz3MapAdapter hz3MapAdapter;
        private transient InternalSerializationService serializationService;
        private transient int totalParallelism;
        private transient int baseIndex;

        /**
         * @param str  name of the map to read
         * @param str2 client XML passed to {@code Hz3Util.createMapAdapter}
         */
        public RemoteProcessorSupplier(String str, @Nonnull String str2) {
            mapName = str;
            clientXml = str2;
        }

        /**
         * Creates the map adapter and captures the serialization service and the
         * parallelism layout from the supplied context.
         */
        public void init(@Nonnull ProcessorSupplier.Context context) {
            hz3MapAdapter = Hz3Util.createMapAdapter(clientXml);
            serializationService = context.hazelcastInstance().node.getCompatibilitySerializationService();
            totalParallelism = context.totalParallelism();
            // Global index of the first processor created on this member.
            baseIndex = context.memberIndex() * context.localParallelism();
        }

        /** Shuts the map adapter down if {@link #init} managed to create one. */
        public void close(Throwable th) {
            if (hz3MapAdapter == null) {
                return;
            }
            hz3MapAdapter.shutdown();
        }

        /**
         * Creates {@code i} processors, each with its own reader and its own share of
         * the partitions; all of them share a single migration watcher.
         * (The decompiler renamed this method from {@code get}.)
         */
        @Nonnull
        public List<Processor> m1get(int i) {
            final int partitionCount = hz3MapAdapter.getPartitionCount();
            final BooleanSupplier watcher = hz3MapAdapter.createWatcher();
            return IntStream.range(0, i)
                    .mapToObj(localIndex -> newProcessor(partitionCount, watcher, localIndex))
                    .collect(Collectors.toList());
        }

        /** Builds the processor with local index {@code localIndex} on this member. */
        private Processor newProcessor(int partitionCount, BooleanSupplier watcher, int localIndex) {
            return new ReadMapOrCacheP(
                    hz3MapAdapter.reader(mapName, toObjectFn()),
                    Util.roundRobinPart(partitionCount, totalParallelism, baseIndex + localIndex),
                    watcher);
        }

        /** Returns a function wrapping a raw key/value byte pair into a {@link LazyMapEntry}. */
        private Function<Map.Entry<byte[], byte[]>, Object> toObjectFn() {
            return rawEntry -> new LazyMapEntry(
                    new HeapData(rawEntry.getKey()),
                    new HeapData(rawEntry.getValue()),
                    serializationService);
        }
    }

    private ReadMapOrCacheP(@Nonnull Reader<F, B, R> reader, @Nonnull int[] iArr, @Nonnull BooleanSupplier booleanSupplier) {
        this.currentBatch = Collections.emptyList();
        this.currentPartitionIndex = -1;
        this.reader = reader;
        this.partitionIds = iArr;
        this.migrationWatcher = booleanSupplier;
        this.readOffsets = new int[iArr.length];
        Arrays.fill(this.readOffsets, Integer.MAX_VALUE);
    }

    /**
     * Emits buffered records and fetches further batches until the outbox refuses an
     * item or a scan over the partitions yields no new batch.
     *
     * @return {@code true} only when a scan found nothing new and every partition has
     *         been read to its end; {@code false} otherwise
     */
    public boolean complete() {
        if (readFutures == null) {
            // First call: start one read per partition.
            initialRead();
        }
        for (;;) {
            if (!emitResultSet()) {
                // Outbox refused an item; resume from the same record on the next call.
                return false;
            }
            if (!tryGetNextResultSet()) {
                return numCompletedPartitions == partitionIds.length;
            }
        }
    }

    /** Allocates the future array and issues the first read for every partition. */
    private void initialRead() {
        F[] pending = (F[]) new CompletableFuture[partitionIds.length];
        // Publish the array before issuing reads, so the field is set even if a read throws.
        readFutures = pending;
        for (int slot = 0; slot < pending.length; slot++) {
            pending[slot] = reader.readBatch(partitionIds[slot], Integer.MAX_VALUE);
        }
    }

    /**
     * Emits the remaining records of the current batch after checking for a migration.
     * Records that {@code reader.toObject} maps to {@code null} are skipped.
     *
     * @return {@code true} when the whole batch has been consumed, {@code false} when
     *         {@code tryEmit} rejected a record (the position stays on that record)
     */
    private boolean emitResultSet() {
        checkMigration();
        for (; currentBatchPosition < currentBatch.size(); currentBatchPosition++) {
            Object item = reader.toObject(currentBatch.get(currentBatchPosition));
            if (item == null) {
                continue;
            }
            if (!tryEmit(item)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Scans the partitions following the one polled last, looking for a finished read.
     * Each finished read becomes the current batch and, unless the partition is done,
     * a follow-up read is issued for it. The scan stops as soon as a non-empty batch
     * is installed or all partitions have been visited.
     *
     * <p>The "empty but not terminal batch" assertion is evaluated against the freshly
     * fetched record set. Checking the previous batch, which is always exhausted at
     * this point, would fire spuriously whenever that batch happened to be empty.
     *
     * @return {@code true} when a batch with records is ready to emit; {@code false}
     *         when a full pass ended without one (the scan position is then reset)
     */
    private boolean tryGetNextResultSet() {
        while (this.currentBatch.size() == this.currentBatchPosition
                && ++this.currentPartitionIndex < this.partitionIds.length) {
            final int slot = this.currentPartitionIndex;

            if (this.readOffsets[slot] < 0) {
                // Partition already fully read: no read may be pending for it.
                if (!$assertionsDisabled && this.readFutures[slot] != null) {
                    throw new AssertionError("future not null");
                }
                continue;
            }

            F future = this.readFutures[slot];
            if (!future.isDone()) {
                // Data for this partition is not available yet; try the next one.
                continue;
            }

            B batchResult = toBatchResult(future);
            int nextIndex = this.reader.toNextIndex(batchResult);
            List<R> records = this.reader.toRecordSet(batchResult);
            if (nextIndex < 0) {
                this.numCompletedPartitions++;
            } else if (!$assertionsDisabled && records.isEmpty()) {
                throw new AssertionError("empty but not terminal batch");
            }

            this.currentBatch = records;
            this.currentBatchPosition = 0;
            this.readOffsets[slot] = nextIndex;
            // Keep the partition busy: issue the follow-up read unless it is finished.
            this.readFutures[slot] = nextIndex >= 0
                    ? this.reader.readBatch(this.partitionIds[slot], nextIndex)
                    : null;
        }

        if (this.currentPartitionIndex == this.partitionIds.length) {
            // Full pass without a usable batch: restart from the first partition next time.
            this.currentPartitionIndex = -1;
            return false;
        }
        return true;
    }

    /**
     * Extracts the batch result from a read future via {@code reader.toBatchResult}.
     *
     * @throws JetException if the peeled failure cause is a
     *                      {@link HazelcastSerializationException}; any other failure is
     *                      rethrown through {@code ExceptionUtil.rethrow}
     */
    private B toBatchResult(F f) {
        try {
            return (B) this.reader.toBatchResult(f);
        } catch (ExecutionException executionFailure) {
            Throwable cause = ExceptionUtil.peel(executionFailure);
            if (!(cause instanceof HazelcastSerializationException)) {
                throw ExceptionUtil.rethrow(cause);
            }
            throw new JetException("Serialization error when reading the map: are the key, value, predicate and projection classes visible to IMDG? You need to use User Code Deployment, adding the classes to JetConfig isn't enough", executionFailure);
        } catch (InterruptedException interrupted) {
            throw ExceptionUtil.rethrow(interrupted);
        }
    }

    /**
     * Polls the migration watcher.
     *
     * @throws RestartableException if the watcher reports {@code true}
     */
    private void checkMigration() {
        boolean migrationSeen = migrationWatcher.getAsBoolean();
        if (!migrationSeen) {
            return;
        }
        throw new RestartableException("Partition migration detected");
    }

    static {
        $assertionsDisabled = !ReadMapOrCacheP.class.desiredAssertionStatus();
    }
}
