package com.couchbase.client.core.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import java.util.List;
import rx.Observable;
import rx.functions.Func1;

@InterfaceAudience.Public
@InterfaceStability.Experimental
/* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator.class */
public class BucketStreamAggregator {
    public static String DEFAULT_CONNECTION_NAME = "jvmCore";
    private final ClusterFacade core;
    private final String bucket;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.core.dcp.BucketStreamAggregator$1, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator$1.class */
    public class AnonymousClass1 implements Func1<OpenConnectionResponse, Observable<DCPRequest>> {
        final /* synthetic */ BucketStreamAggregatorState val$aggregatorState;
        final /* synthetic */ String val$connectionName;

        AnonymousClass1(BucketStreamAggregatorState bucketStreamAggregatorState, String str) {
            this.val$aggregatorState = bucketStreamAggregatorState;
            this.val$connectionName = str;
        }

        public Observable<DCPRequest> call(final OpenConnectionResponse openConnectionResponse) {
            return Observable.from(this.val$aggregatorState).flatMap(new Func1<BucketStreamState, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1.2
                public Observable<StreamRequestResponse> call(final BucketStreamState bucketStreamState) {
                    return BucketStreamAggregator.this.core.send(new StreamRequestRequest(AnonymousClass1.this.val$connectionName, bucketStreamState.partition(), bucketStreamState.vbucketUUID(), bucketStreamState.startSequenceNumber(), bucketStreamState.endSequenceNumber(), bucketStreamState.snapshotStartSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket)).flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1.2.1
                        public Observable<StreamRequestResponse> call(StreamRequestResponse streamRequestResponse) {
                            long rollbackToSequenceNumber;
                            switch (AnonymousClass3.$SwitchMap$com$couchbase$client$core$message$ResponseStatus[streamRequestResponse.status().ordinal()]) {
                                case 1:
                                    rollbackToSequenceNumber = 0;
                                    break;
                                case 2:
                                    rollbackToSequenceNumber = streamRequestResponse.rollbackToSequenceNumber();
                                    break;
                                default:
                                    return Observable.just(streamRequestResponse);
                            }
                            return BucketStreamAggregator.this.core.send(new StreamRequestRequest(AnonymousClass1.this.val$connectionName, bucketStreamState.partition(), bucketStreamState.vbucketUUID(), rollbackToSequenceNumber, bucketStreamState.endSequenceNumber(), bucketStreamState.snapshotStartSequenceNumber(), bucketStreamState.snapshotEndSequenceNumber(), BucketStreamAggregator.this.bucket));
                        }
                    });
                }
            }).toList().flatMap(new Func1<List<StreamRequestResponse>, Observable<DCPRequest>>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.1.1
                public Observable<DCPRequest> call(List<StreamRequestResponse> list) {
                    return openConnectionResponse.connection().subject();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.client.core.dcp.BucketStreamAggregator$3, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/client/core/dcp/BucketStreamAggregator$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$client$core$message$ResponseStatus = new int[ResponseStatus.values().length];

        static {
            try {
                $SwitchMap$com$couchbase$client$core$message$ResponseStatus[ResponseStatus.RANGE_ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$couchbase$client$core$message$ResponseStatus[ResponseStatus.ROLLBACK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public BucketStreamAggregator(ClusterFacade clusterFacade, String str) {
        this.core = clusterFacade;
        this.bucket = str;
    }

    public Observable<DCPRequest> feed() {
        BucketStreamAggregatorState bucketStreamAggregatorState = new BucketStreamAggregatorState(DEFAULT_CONNECTION_NAME);
        int intValue = ((Integer) partitionSize().toBlocking().first()).intValue();
        short s = 0;
        while (true) {
            short s2 = s;
            if (s2 >= intValue) {
                return feed(bucketStreamAggregatorState);
            }
            bucketStreamAggregatorState.put(new BucketStreamState(s2, 0L, 0L, -1L, 0L, -1L));
            s = (short) (s2 + 1);
        }
    }

    public Observable<DCPRequest> feed(BucketStreamAggregatorState bucketStreamAggregatorState) {
        String name = bucketStreamAggregatorState.name();
        return this.core.send(new OpenConnectionRequest(name, this.bucket)).flatMap(new AnonymousClass1(bucketStreamAggregatorState, name));
    }

    private Observable<Integer> partitionSize() {
        return this.core.send(new GetClusterConfigRequest()).map(new Func1<GetClusterConfigResponse, Integer>() { // from class: com.couchbase.client.core.dcp.BucketStreamAggregator.2
            public Integer call(GetClusterConfigResponse getClusterConfigResponse) {
                return Integer.valueOf(((CouchbaseBucketConfig) getClusterConfigResponse.config().bucketConfig(BucketStreamAggregator.this.bucket)).numberOfPartitions());
            }
        });
    }
}
