package com.xiaomi.infra.galaxy.sds.examples.stream;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaomi.infra.galaxy.rpc.thrift.GrantType;
import com.xiaomi.infra.galaxy.rpc.thrift.Grantee;
import com.xiaomi.infra.galaxy.sds.client.ClientFactory;
import com.xiaomi.infra.galaxy.sds.client.SdsClientConfigKeys;
import com.xiaomi.infra.galaxy.sds.thrift.AdminService;
import com.xiaomi.infra.galaxy.sds.thrift.Credential;
import com.xiaomi.infra.galaxy.sds.thrift.Datum;
import com.xiaomi.infra.galaxy.sds.thrift.DatumUtil;
import com.xiaomi.infra.galaxy.sds.thrift.IncrementRequest;
import com.xiaomi.infra.galaxy.sds.thrift.MutationLogEntry;
import com.xiaomi.infra.galaxy.sds.thrift.MutationType;
import com.xiaomi.infra.galaxy.sds.thrift.PutRequest;
import com.xiaomi.infra.galaxy.sds.thrift.RemoveRequest;
import com.xiaomi.infra.galaxy.sds.thrift.StreamSpec;
import com.xiaomi.infra.galaxy.sds.thrift.StreamViewType;
import com.xiaomi.infra.galaxy.sds.thrift.TableInfo;
import com.xiaomi.infra.galaxy.sds.thrift.TableSchema;
import com.xiaomi.infra.galaxy.sds.thrift.TableService;
import com.xiaomi.infra.galaxy.sds.thrift.UserType;
import com.xiaomi.infra.galaxy.talos.admin.TalosAdmin;
import com.xiaomi.infra.galaxy.talos.client.SimpleTopicAbnormalCallback;
import com.xiaomi.infra.galaxy.talos.client.TalosClientConfig;
import com.xiaomi.infra.galaxy.talos.consumer.MessageCheckpointer;
import com.xiaomi.infra.galaxy.talos.consumer.MessageProcessor;
import com.xiaomi.infra.galaxy.talos.consumer.MessageProcessorFactory;
import com.xiaomi.infra.galaxy.talos.consumer.TalosConsumer;
import com.xiaomi.infra.galaxy.talos.consumer.TalosConsumerConfig;
import com.xiaomi.infra.galaxy.talos.thrift.CreateTopicRequest;
import com.xiaomi.infra.galaxy.talos.thrift.CreateTopicResponse;
import com.xiaomi.infra.galaxy.talos.thrift.MessageAndOffset;
import com.xiaomi.infra.galaxy.talos.thrift.Permission;
import com.xiaomi.infra.galaxy.talos.thrift.TopicAndPartition;
import com.xiaomi.infra.galaxy.talos.thrift.TopicAttribute;
import com.xiaomi.infra.galaxy.talos.thrift.TopicInfo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/infra/galaxy/sds/examples/stream/StreamDemo.class */
public class StreamDemo {
    static final int PARTITAL_DELETE_NUMBER = 2;
    static final int ADMIN_CONN_TIMEOUT = 3000;
    static final int ADMIN_SOCKET_TIMEOUT = 50000;
    static final int TABLE_CONN_TIMEOUT = 3000;
    static final int TABLE_SOCKET_TIMEOUT = 10000;
    private static final String sdsServiceEndpoint = "$sdsServiceEndpoint";
    private static final String tableName = "streamDemoTable";
    private static final String destTableName = "destStreamDemoTable";
    private static final boolean enableEntityGroup = true;
    private static final boolean enableEntityGroupHash = true;
    private static final String accountKey = "$your_accountKey";
    private static final String accountSecret = "$your_accountSecret";
    private static final String appId = "$your_appId";
    private static final String appKey = "$your_appKey";
    private static final String appSecret = "$your_appSecret";
    private static AdminService.Iface adminClient;
    private static TableService.Iface tableClient;
    private static TableSchema tableSchema;
    private static StreamSpec streamSpec;
    private static final String talosServiceEndpoint = "$talosServiceEndpoint";
    private static final String topicName = "streamDemoTopic";
    private static final int topicPartitionNumber = 8;
    private static final String clientPrefix = "departmentName-";
    private static final String consumerGroup = "groupName";
    private static TalosAdmin talosAdmin;
    private static TopicInfo topicInfo;
    private static final Logger LOG = LoggerFactory.getLogger(StreamDemo.class);
    static Random rand = new Random(0);
    private static final StreamViewType streamViewType = StreamViewType.MUTATE_LOG;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.xiaomi.infra.galaxy.sds.examples.stream.StreamDemo$1, reason: invalid class name */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/sds/examples/stream/StreamDemo$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$xiaomi$infra$galaxy$sds$thrift$MutationType = new int[MutationType.values().length];

        static {
            try {
                $SwitchMap$com$xiaomi$infra$galaxy$sds$thrift$MutationType[MutationType.PUT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$xiaomi$infra$galaxy$sds$thrift$MutationType[MutationType.INCREMENT.ordinal()] = StreamDemo.PARTITAL_DELETE_NUMBER;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$xiaomi$infra$galaxy$sds$thrift$MutationType[MutationType.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/sds/examples/stream/StreamDemo$MutateLogProcessor.class */
    public class MutateLogProcessor implements MessageProcessor {
        private MutateLogProcessor() {
        }

        public void init(TopicAndPartition topicAndPartition, long j) {
        }

        public void process(List<MessageAndOffset> list, MessageCheckpointer messageCheckpointer) {
            try {
                if (list.size() > 0) {
                    ArrayList arrayList = new ArrayList(list.size());
                    for (MessageAndOffset messageAndOffset : list) {
                        try {
                            arrayList.add(DatumUtil.deserialize(messageAndOffset.getMessage().getMessage(), MutationLogEntry.class));
                        } catch (Exception e) {
                            StreamDemo.LOG.error("Deserialize message " + messageAndOffset + " got exception ", e);
                            throw e;
                        }
                    }
                    if (!arrayList.isEmpty()) {
                        try {
                            StreamDemo.this.replayMutationLogEntry(arrayList, StreamDemo.destTableName);
                        } catch (Exception e2) {
                            StreamDemo.LOG.error("Replay mutation log entries " + arrayList + " got exception ", e2);
                            throw e2;
                        }
                    }
                }
            } catch (Exception e3) {
                StreamDemo.LOG.error("Process mutation log entries for topic streamDemoTopic get exception ", e3);
                throw new RuntimeException(e3);
            }
        }

        public void shutdown(MessageCheckpointer messageCheckpointer) {
        }

        /* synthetic */ MutateLogProcessor(StreamDemo streamDemo, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xiaomi/infra/galaxy/sds/examples/stream/StreamDemo$MutateLogProcessorFactory.class */
    public class MutateLogProcessorFactory implements MessageProcessorFactory {
        public MutateLogProcessorFactory() {
        }

        /* renamed from: createProcessor, reason: merged with bridge method [inline-methods] */
        public MutateLogProcessor m25createProcessor() {
            return new MutateLogProcessor(StreamDemo.this, null);
        }
    }

    private Credential createSdsCredential(String str, String str2, UserType userType) {
        return new Credential().setSecretKeyId(str).setSecretKey(str2).setType(userType);
    }

    private com.xiaomi.infra.galaxy.rpc.thrift.Credential createTalosCredential(String str, String str2, com.xiaomi.infra.galaxy.rpc.thrift.UserType userType) {
        return new com.xiaomi.infra.galaxy.rpc.thrift.Credential().setSecretKeyId(str).setSecretKey(str2).setType(userType);
    }

    private AdminService.Iface createAdminClient(String str, Credential credential) {
        return new ClientFactory().setCredential(credential).newAdminClient(str + "/v1/api/admin", ADMIN_SOCKET_TIMEOUT, 3000);
    }

    private TableService.Iface createTableClient(String str, Credential credential) {
        return new ClientFactory().setCredential(credential).newTableClient(str + "/v1/api/table", 10000, 3000);
    }

    private void initSdsClient() {
        adminClient = createAdminClient(sdsServiceEndpoint, createSdsCredential(accountKey, accountSecret, UserType.DEV_XIAOMI));
        tableClient = createTableClient(sdsServiceEndpoint, createSdsCredential(appKey, appSecret, UserType.APP_SECRET));
    }

    private void initTalosClient() {
        Properties properties = new Properties();
        properties.setProperty("galaxy.talos.service.endpoint", talosServiceEndpoint);
        talosAdmin = new TalosAdmin(new TalosClientConfig(properties), createTalosCredential(accountKey, accountSecret, com.xiaomi.infra.galaxy.rpc.thrift.UserType.APP_SECRET));
    }

    public StreamDemo() {
        initSdsClient();
        initTalosClient();
    }

    private PutRequest getReplayPutRequest(String str, Map<String, Datum> map) {
        return new PutRequest().setTableName(str).setRecord(map);
    }

    private RemoveRequest getReplayRemoveRequest(String str, Map<String, Datum> map, boolean z) {
        if (z) {
            Preconditions.checkArgument(DataProvider.rowKeyDef(true).keySet().equals(map.keySet()));
            return new RemoveRequest().setTableName(str).setKeys(map);
        }
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Datum> entry : map.entrySet()) {
            if (DataProvider.rowKeyDef(true).keySet().contains(entry.getKey())) {
                hashMap.put(entry.getKey(), entry.getValue());
            } else {
                arrayList.add(entry.getKey());
            }
        }
        return new RemoveRequest().setTableName(str).setKeys(hashMap).setAttributes(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void replayMutationLogEntry(List<MutationLogEntry> list, String str) throws Exception {
        for (MutationLogEntry mutationLogEntry : list) {
            LOG.info("Consuming mutation log entry : " + mutationLogEntry);
            switch (AnonymousClass1.$SwitchMap$com$xiaomi$infra$galaxy$sds$thrift$MutationType[mutationLogEntry.getType().ordinal()]) {
                case SdsClientConfigKeys.GALAXY_SDS_CLIENT_MAX_RETRY_DEFAULT /* 1 */:
                    Preconditions.checkArgument(tableClient.put(getReplayPutRequest(str, mutationLogEntry.getRecord())).isSuccess());
                    LOG.info("record " + mutationLogEntry.getRecord() + " is put to " + str);
                    break;
                case PARTITAL_DELETE_NUMBER /* 2 */:
                    tableClient.put(getReplayPutRequest(str, mutationLogEntry.getRecord()));
                    LOG.info("record " + mutationLogEntry.getRecord() + " is increment to " + str);
                    break;
                case 3:
                    Preconditions.checkArgument(tableClient.remove(getReplayRemoveRequest(str, mutationLogEntry.getRecord(), mutationLogEntry.isRowDeleted())).isSuccess());
                    LOG.info("record " + mutationLogEntry.getRecord() + " is removed from " + str);
                    break;
                default:
                    Preconditions.checkArgument(false);
                    break;
            }
        }
    }

    private TalosConsumer createTalosConsumer() throws Exception {
        if (!streamViewType.equals(StreamViewType.MUTATE_LOG)) {
            throw new RuntimeException("Unexpected stream view type : " + streamViewType);
        }
        Properties properties = new Properties();
        properties.setProperty("galaxy.talos.service.endpoint", talosServiceEndpoint);
        return new TalosConsumer(consumerGroup, new TalosConsumerConfig(properties), createTalosCredential(appKey, appSecret, com.xiaomi.infra.galaxy.rpc.thrift.UserType.APP_SECRET), topicInfo.getTopicTalosResourceName(), new MutateLogProcessorFactory(), clientPrefix, new SimpleTopicAbnormalCallback());
    }

    private boolean shouldDo() {
        return rand.nextBoolean();
    }

    public void produceData() throws Exception {
        LOG.info("Begin to produce data for table streamDemoTable");
        for (int i = 0; i < 100; i++) {
            Map<String, Datum> randomRecord = DataProvider.randomRecord(DataProvider.attributesDef(true));
            Map<String, Datum> recordKeys = DataProvider.getRecordKeys(tableSchema, randomRecord);
            Map entriesOnlyOnLeft = Maps.difference(randomRecord, recordKeys).entriesOnlyOnLeft();
            PutRequest putRequest = new PutRequest();
            putRequest.setTableName(tableName).setRecord(randomRecord);
            Preconditions.checkArgument(tableClient.put(putRequest).isSuccess());
            if (shouldDo()) {
                Map<String, Datum> randomIncrementAmounts = DataProvider.randomIncrementAmounts(tableSchema, entriesOnlyOnLeft.keySet());
                IncrementRequest incrementRequest = new IncrementRequest();
                incrementRequest.setTableName(tableName).setKeys(recordKeys).setAmounts(randomIncrementAmounts);
                tableClient.increment(incrementRequest);
            }
            if (shouldDo()) {
                List<String> randomSelect = DataProvider.randomSelect(entriesOnlyOnLeft.keySet(), PARTITAL_DELETE_NUMBER);
                RemoveRequest removeRequest = new RemoveRequest();
                removeRequest.setTableName(tableName).setAttributes(randomSelect).setKeys(recordKeys);
                Preconditions.checkArgument(tableClient.remove(removeRequest).isSuccess());
            }
        }
        LOG.info("Produce data finished for table streamDemoTable");
    }

    public TableSchema createSrcTable(String str, String str2, StreamSpec streamSpec2) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(str2, streamSpec2);
        TableInfo createTable = adminClient.createTable(str, DataProvider.createTableSpec(appId, true, true, hashMap, null));
        LOG.info("Src table " + str + " is created");
        return createTable.getSpec().getSchema();
    }

    public void createDestTable() throws Exception {
        adminClient.createTable(destTableName, DataProvider.createTableSpec(appId, true, true));
        LOG.info("Dest table destStreamDemoTable is created");
    }

    private TopicInfo createTopic() throws Exception {
        CreateTopicRequest createTopicRequest = new CreateTopicRequest();
        createTopicRequest.setTopicName(topicName);
        HashMap hashMap = new HashMap();
        Grantee grantee = new Grantee();
        grantee.setIdentifier(appId).setType(GrantType.APP_ROOT);
        hashMap.put(grantee, Permission.TOPIC_READ_AND_MESSAGE_FULL_CONTROL);
        createTopicRequest.setAclMap(hashMap);
        TopicAttribute topicAttribute = new TopicAttribute();
        topicAttribute.setPartitionNumber(topicPartitionNumber);
        createTopicRequest.setTopicAttribute(topicAttribute);
        CreateTopicResponse createTopic = talosAdmin.createTopic(createTopicRequest);
        LOG.info("Topic streamDemoTopic is created");
        return createTopic.getTopicInfo();
    }

    private StreamSpec createStreamSpec() throws Exception {
        StreamSpec streamSpec2 = new StreamSpec();
        streamSpec2.setViewType(streamViewType).setAttributes(Lists.newArrayList(DataProvider.columnsDef().keySet())).setEnableStream(true);
        LOG.info("Stream " + streamSpec2 + " is created");
        return streamSpec2;
    }

    public void createTable() throws Exception {
        topicInfo = createTopic();
        streamSpec = createStreamSpec();
        tableSchema = createSrcTable(tableName, topicName, streamSpec);
    }

    public static void main(String[] strArr) throws Exception {
        StreamDemo streamDemo = new StreamDemo();
        streamDemo.createTable();
        streamDemo.createDestTable();
        streamDemo.createTalosConsumer();
        streamDemo.produceData();
    }
}
