/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utilities.sources;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.GcsEventsSource;
import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link GcsEventsSource} driven by a mocked {@link PubsubMessagesFetcher}.
 *
 * <p>Each test stubs the fetcher with hand-built Pub/Sub messages, pulls a batch through the
 * source, and checks both the returned rows and the returned checkpoint (which every test here
 * expects to be {@code "0"}).
 */
public class TestGcsEventsSource
extends UtilitiesTestBase {
    @Mock
    PubsubMessagesFetcher pubsubMessagesFetcher;
    protected FilebasedSchemaProvider schemaProvider;
    private TypedProperties props;
    private static final String CHECKPOINT_VALUE_ZERO = "0";

    @BeforeAll
    public static void beforeAll() throws Exception {
        UtilitiesTestBase.initTestServices();
    }

    /** Re-creates the schema provider, the mocks and the source properties before every test. */
    @BeforeEach
    public void beforeEach() throws Exception {
        this.schemaProvider = new FilebasedSchemaProvider(UtilitiesTestBase.Helpers.setupSchemaOnDFS(), jsc);
        MockitoAnnotations.initMocks(this);
        this.props = new TypedProperties();
        this.props.put("hoodie.deltastreamer.source.gcs.project.id", "dummy-project");
        this.props.put("hoodie.deltastreamer.source.gcs.subscription.id", "dummy-subscription");
    }

    /** With no messages available, the batch is empty and the checkpoint is unchanged. */
    @Test
    public void shouldReturnEmptyOnNoMessages() {
        Mockito.when(this.pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList());
        GcsEventsSource source = this.createSource();

        Pair<Option<Dataset<Row>>, String> expected = Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO);
        Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), 100L);

        Assertions.assertEquals(expected, dataAndCheckpoint);
    }

    /** Two file-create messages yield two rows, in order, from a single fetch. */
    @Test
    public void shouldReturnDataOnValidMessages() {
        ReceivedMessage msg1 = this.fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
        ReceivedMessage msg2 = this.fileCreateMessage("objectId-2", "{'data':{'bucket':'bucket-2'}}");
        Mockito.when(this.pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(msg1, msg2));
        GcsEventsSource source = this.createSource();

        Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), 100L);
        source.onCommit(dataAndCheckpoint.getRight());

        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint.getRight());
        List<Row> result = dataAndCheckpoint.getLeft().get().collectAsList();
        // Check the size first so a short result fails with a clear message rather than an
        // IndexOutOfBoundsException from the lookups below.
        Assertions.assertEquals(2, result.size());
        this.assertBucket(result.get(0), "bucket-1");
        this.assertBucket(result.get(1), "bucket-2");
        Mockito.verify(this.pubsubMessagesFetcher).fetchMessages();
    }

    /** Consecutive batches each trigger one fetch and return that fetch's messages. */
    @Test
    public void shouldFetchMessagesInBatches() {
        ReceivedMessage msg1 = this.fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
        ReceivedMessage msg2 = this.fileCreateMessage("objectId-2", "{'data':{'bucket':'bucket-2'}}");
        ReceivedMessage msg3 = this.fileCreateMessage("objectId-3", "{'data':{'bucket':'bucket-3'}}");
        ReceivedMessage msg4 = this.fileCreateMessage("objectId-4", "{'data':{'bucket':'bucket-4'}}");
        Mockito.when(this.pubsubMessagesFetcher.fetchMessages())
            .thenReturn(Arrays.asList(msg1, msg2))
            .thenReturn(Arrays.asList(msg3, msg4));
        GcsEventsSource source = this.createSource();

        Pair<Option<Dataset<Row>>, String> dataAndCheckpoint1 =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), 100L);
        source.onCommit(dataAndCheckpoint1.getRight());
        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint1.getRight());
        List<Row> result1 = dataAndCheckpoint1.getLeft().get().collectAsList();
        Assertions.assertEquals(2, result1.size());
        this.assertBucket(result1.get(0), "bucket-1");
        this.assertBucket(result1.get(1), "bucket-2");

        Pair<Option<Dataset<Row>>, String> dataAndCheckpoint2 =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), 100L);
        source.onCommit(dataAndCheckpoint2.getRight());
        // Same checkpoint expectation as the first batch, for consistency with the other tests.
        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint2.getRight());
        List<Row> result2 = dataAndCheckpoint2.getLeft().get().collectAsList();
        Assertions.assertEquals(2, result2.size());
        this.assertBucket(result2.get(0), "bucket-3");
        this.assertBucket(result2.get(1), "bucket-4");

        Mockito.verify(this.pubsubMessagesFetcher, Mockito.times(2)).fetchMessages();
    }

    /**
     * A delete event and a create event carrying an {@code overwroteGeneration} attribute are
     * dropped; only the plain create event is returned.
     */
    @Test
    public void shouldSkipInvalidMessages1() {
        ReceivedMessage invalid1 = this.fileDeleteMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
        ReceivedMessage invalid2 = this.fileCreateMessageWithOverwroteGen("objectId-2", "{'data':{'bucket':'bucket-2'}}");
        ReceivedMessage valid1 = this.fileCreateMessage("objectId-3", "{'data':{'bucket':'bucket-3'}}");
        Mockito.when(this.pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(invalid1, valid1, invalid2));
        GcsEventsSource source = this.createSource();

        Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), 100L);
        source.onCommit(dataAndCheckpoint.getRight());

        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint.getRight());
        List<Row> result = dataAndCheckpoint.getLeft().get().collectAsList();
        Assertions.assertEquals(1, result.size());
        this.assertBucket(result.get(0), "bucket-3");
        Mockito.verify(this.pubsubMessagesFetcher).fetchMessages();
    }

    /** Two identical create messages are both returned; the source does not de-duplicate. */
    @Test
    public void shouldGcsEventsSourceDoesNotDedupeInterally() {
        ReceivedMessage dupe1 = this.fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
        ReceivedMessage dupe2 = this.fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
        Mockito.when(this.pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(dupe1, dupe2));
        GcsEventsSource source = this.createSource();

        Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
            source.fetchNextBatch(Option.of(CHECKPOINT_VALUE_ZERO), 100L);
        source.onCommit(dataAndCheckpoint.getRight());

        Assertions.assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint.getRight());
        List<Row> result = dataAndCheckpoint.getLeft().get().collectAsList();
        Assertions.assertEquals(2, result.size());
        this.assertBucket(result.get(0), "bucket-1");
        this.assertBucket(result.get(1), "bucket-1");
        Mockito.verify(this.pubsubMessagesFetcher).fetchMessages();
    }

    /** Builds the source under test from the per-test properties and the mocked fetcher. */
    private GcsEventsSource createSource() {
        // The schema-provider argument is deliberately null, as in every original call site.
        return new GcsEventsSource(this.props, jsc, sparkSession, null, this.pubsubMessagesFetcher);
    }

    /** Builds an {@code OBJECT_FINALIZE} message that also carries an {@code overwroteGeneration} attribute. */
    private ReceivedMessage fileCreateMessageWithOverwroteGen(String objectId, String payload) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("overwroteGeneration", "objectId-N");
        return ReceivedMessage.newBuilder()
            .setMessage(this.objectWithEventTypeAndAttrs(objectId, "OBJECT_FINALIZE", attrs, payload))
            .setAckId(objectId)
            .build();
    }

    /** Builds an {@code OBJECT_FINALIZE} message whose ack id is the object id. */
    private ReceivedMessage fileCreateMessage(String objectId, String payload) {
        return ReceivedMessage.newBuilder()
            .setMessage(this.objectFinalizeMessage(objectId, payload))
            .setAckId(objectId)
            .build();
    }

    /** Builds an {@code OBJECT_DELETE} message whose ack id is the object id. */
    private ReceivedMessage fileDeleteMessage(String objectId, String payload) {
        return ReceivedMessage.newBuilder()
            .setMessage(this.objectDeleteMessage(objectId, payload))
            .setAckId(objectId)
            .build();
    }

    private PubsubMessage.Builder objectFinalizeMessage(String objectId, String dataMessage) {
        return this.objectWithEventType(objectId, "OBJECT_FINALIZE", dataMessage);
    }

    private PubsubMessage.Builder objectDeleteMessage(String objectId, String dataMessage) {
        return this.objectWithEventType(objectId, "OBJECT_DELETE", dataMessage);
    }

    private PubsubMessage.Builder objectWithEventType(String objectId, String eventType, String dataMessage) {
        return this.messageWithAttrs(this.createBasicAttrs(objectId, eventType), dataMessage);
    }

    /** Like {@link #objectWithEventType} but merges {@code attrs} on top of the basic attributes. */
    private PubsubMessage.Builder objectWithEventTypeAndAttrs(String objectId, String eventType, Map<String, String> attrs, String dataMessage) {
        Map<String, String> allAttrs = this.createBasicAttrs(objectId, eventType);
        allAttrs.putAll(attrs);
        return this.messageWithAttrs(allAttrs, dataMessage);
    }

    /** Returns a fresh mutable map holding the {@code objectId} and {@code eventType} attributes. */
    private Map<String, String> createBasicAttrs(String objectId, String eventType) {
        Map<String, String> map = new HashMap<>();
        map.put("objectId", objectId);
        map.put("eventType", eventType);
        return map;
    }

    /** Builds a Pub/Sub message with the given attributes and {@code dataMessage} as its payload. */
    private PubsubMessage.Builder messageWithAttrs(Map<String, String> attrs, String dataMessage) {
        // Encode explicitly as UTF-8 so the payload bytes do not depend on the platform charset.
        return PubsubMessage.newBuilder()
            .putAllAttributes(new HashMap<>(attrs))
            .setData(ByteString.copyFromUtf8(dataMessage));
    }

    /** Asserts that {@code row.data.bucket} equals {@code expectedBucketName}. */
    private void assertBucket(Row row, String expectedBucketName) {
        Row record = row.getAs("data");
        String bucket = record.getAs("bucket");
        Assertions.assertEquals(expectedBucketName, bucket);
    }
}

