/*
 * Decompiled with CFR 0.152.
 */
package com.xiaomi.infra.galaxy.sds.examples.stream;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaomi.infra.galaxy.rpc.thrift.Credential;
import com.xiaomi.infra.galaxy.rpc.thrift.GrantType;
import com.xiaomi.infra.galaxy.rpc.thrift.Grantee;
import com.xiaomi.infra.galaxy.rpc.thrift.UserType;
import com.xiaomi.infra.galaxy.sds.client.ClientFactory;
import com.xiaomi.infra.galaxy.sds.examples.stream.DataProvider;
import com.xiaomi.infra.galaxy.sds.thrift.AdminService;
import com.xiaomi.infra.galaxy.sds.thrift.Datum;
import com.xiaomi.infra.galaxy.sds.thrift.DatumUtil;
import com.xiaomi.infra.galaxy.sds.thrift.IncrementRequest;
import com.xiaomi.infra.galaxy.sds.thrift.MutationLogEntry;
import com.xiaomi.infra.galaxy.sds.thrift.PutRequest;
import com.xiaomi.infra.galaxy.sds.thrift.PutResult;
import com.xiaomi.infra.galaxy.sds.thrift.RemoveRequest;
import com.xiaomi.infra.galaxy.sds.thrift.RemoveResult;
import com.xiaomi.infra.galaxy.sds.thrift.StreamSpec;
import com.xiaomi.infra.galaxy.sds.thrift.StreamViewType;
import com.xiaomi.infra.galaxy.sds.thrift.TableInfo;
import com.xiaomi.infra.galaxy.sds.thrift.TableSchema;
import com.xiaomi.infra.galaxy.sds.thrift.TableService;
import com.xiaomi.infra.galaxy.sds.thrift.TableSpec;
import com.xiaomi.infra.galaxy.talos.admin.TalosAdmin;
import com.xiaomi.infra.galaxy.talos.client.SimpleTopicAbnormalCallback;
import com.xiaomi.infra.galaxy.talos.client.TalosClientConfig;
import com.xiaomi.infra.galaxy.talos.client.TopicAbnormalCallback;
import com.xiaomi.infra.galaxy.talos.consumer.MessageCheckpointer;
import com.xiaomi.infra.galaxy.talos.consumer.MessageProcessor;
import com.xiaomi.infra.galaxy.talos.consumer.MessageProcessorFactory;
import com.xiaomi.infra.galaxy.talos.consumer.TalosConsumer;
import com.xiaomi.infra.galaxy.talos.consumer.TalosConsumerConfig;
import com.xiaomi.infra.galaxy.talos.thrift.CreateTopicRequest;
import com.xiaomi.infra.galaxy.talos.thrift.CreateTopicResponse;
import com.xiaomi.infra.galaxy.talos.thrift.MessageAndOffset;
import com.xiaomi.infra.galaxy.talos.thrift.Permission;
import com.xiaomi.infra.galaxy.talos.thrift.TopicAndPartition;
import com.xiaomi.infra.galaxy.talos.thrift.TopicAttribute;
import com.xiaomi.infra.galaxy.talos.thrift.TopicInfo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example that connects an SDS table to a Talos topic through a table stream.
 *
 * <p>The flow driven by {@link #main(String[])} is:
 * <ol>
 *   <li>create a Talos topic, a {@link StreamSpec} and a source table whose stream
 *       specs are keyed by the topic name;</li>
 *   <li>create a destination table;</li>
 *   <li>construct a {@link TalosConsumer} whose processors deserialize each message
 *       into a {@link MutationLogEntry} and replay it against the destination
 *       table;</li>
 *   <li>write random puts / increments / partial removes to the source table.</li>
 * </ol>
 *
 * <p>Clients, schema and topic information are held in static fields that are
 * assigned by instance methods, so the class is not designed for more than one
 * instance or for concurrent use.
 */
public class StreamDemo {
    private static final Logger LOG = LoggerFactory.getLogger(StreamDemo.class);

    /** Fixed seed, so the sequence of {@link #shouldDo()} decisions is repeatable. */
    static Random rand = new Random(0L);

    /** Number of attributes removed by each partial delete in {@link #produceData()}. */
    static final int PARTITAL_DELETE_NUMBER = 2;
    static final int ADMIN_CONN_TIMEOUT = 3000;
    static final int ADMIN_SOCKET_TIMEOUT = 50000;
    static final int TABLE_CONN_TIMEOUT = 3000;
    static final int TABLE_SOCKET_TIMEOUT = 10000;

    /** Number of records written by {@link #produceData()}. */
    private static final int PRODUCED_RECORD_COUNT = 100;

    private static final String sdsServiceEndpoint = "$sdsServiceEndpoint";
    private static final String tableName = "streamDemoTable";
    private static final String destTableName = "destStreamDemoTable";
    // NOTE(review): the mapping of these two flags onto the boolean arguments of the
    // DataProvider helpers is inferred from their names; both are true, so the values
    // passed are the same as the original literal `true` arguments either way.
    private static final boolean enableEntityGroup = true;
    private static final boolean enableEntityGroupHash = true;
    private static final StreamViewType streamViewType = StreamViewType.MUTATE_LOG;
    private static final String accountKey = "$your_accountKey";
    private static final String accountSecret = "$your_accountSecret";
    private static final String appId = "$your_appId";
    private static final String appKey = "$your_appKey";
    private static final String appSecret = "$your_appSecret";
    private static AdminService.Iface adminClient;
    private static TableService.Iface tableClient;
    private static TableSchema tableSchema;
    private static StreamSpec streamSpec;
    private static final String talosServiceEndpoint = "$talosServiceEndpoint";
    private static final String topicName = "streamDemoTopic";
    private static final int topicPartitionNumber = 8;
    private static final String clientPrefix = "departmentName-";
    private static final String consumerGroup = "groupName";
    private static TalosAdmin talosAdmin;
    private static TopicInfo topicInfo;

    /**
     * Builds an SDS credential. The SDS and Talos SDKs each declare their own
     * {@code Credential} / {@code UserType}, hence the fully qualified names.
     */
    private com.xiaomi.infra.galaxy.sds.thrift.Credential createSdsCredential(String secretKeyId, String secretKey, com.xiaomi.infra.galaxy.sds.thrift.UserType userType) {
        return new com.xiaomi.infra.galaxy.sds.thrift.Credential()
                .setSecretKeyId(secretKeyId)
                .setSecretKey(secretKey)
                .setType(userType);
    }

    /** Builds a Talos (rpc) credential from the given key pair and user type. */
    private Credential createTalosCredential(String secretKeyId, String secretKey, UserType userType) {
        return new Credential()
                .setSecretKeyId(secretKeyId)
                .setSecretKey(secretKey)
                .setType(userType);
    }

    /** Creates an SDS admin client for {@code endpoint + "/v1/api/admin"}. */
    private AdminService.Iface createAdminClient(String endpoint, com.xiaomi.infra.galaxy.sds.thrift.Credential credential) {
        ClientFactory clientFactory = new ClientFactory().setCredential(credential);
        // Same values as the original literals (50000, 3000), now taken from the constants.
        return clientFactory.newAdminClient(endpoint + "/v1/api/admin", ADMIN_SOCKET_TIMEOUT, ADMIN_CONN_TIMEOUT);
    }

    /** Creates an SDS table client for {@code endpoint + "/v1/api/table"}. */
    private TableService.Iface createTableClient(String endpoint, com.xiaomi.infra.galaxy.sds.thrift.Credential credential) {
        ClientFactory clientFactory = new ClientFactory().setCredential(credential);
        // Same values as the original literals (10000, 3000), now taken from the constants.
        return clientFactory.newTableClient(endpoint + "/v1/api/table", TABLE_SOCKET_TIMEOUT, TABLE_CONN_TIMEOUT);
    }

    /**
     * Initializes the static SDS clients: the admin client with the account key pair
     * ({@code DEV_XIAOMI}) and the table client with the app key pair ({@code APP_SECRET}).
     */
    private void initSdsClient() {
        com.xiaomi.infra.galaxy.sds.thrift.Credential adminCredential = this.createSdsCredential(
                accountKey, accountSecret, com.xiaomi.infra.galaxy.sds.thrift.UserType.DEV_XIAOMI);
        adminClient = this.createAdminClient(sdsServiceEndpoint, adminCredential);

        com.xiaomi.infra.galaxy.sds.thrift.Credential tableCredential = this.createSdsCredential(
                appKey, appSecret, com.xiaomi.infra.galaxy.sds.thrift.UserType.APP_SECRET);
        tableClient = this.createTableClient(sdsServiceEndpoint, tableCredential);
    }

    /**
     * Initializes the static Talos admin client with the account key pair.
     *
     * <p>The account key pair is a developer credential (it is paired with
     * {@code DEV_XIAOMI} in {@link #initSdsClient()}), so it is tagged
     * {@code DEV_XIAOMI} here as well. It was previously tagged {@code APP_SECRET},
     * which is the type this class uses only for the app key pair.
     */
    private void initTalosClient() {
        Properties pro = new Properties();
        pro.setProperty("galaxy.talos.service.endpoint", talosServiceEndpoint);
        TalosClientConfig talosClientConfig = new TalosClientConfig(pro);
        Credential credential = this.createTalosCredential(accountKey, accountSecret, UserType.DEV_XIAOMI);
        talosAdmin = new TalosAdmin(talosClientConfig, credential);
    }

    /** Creates the demo and initializes the static SDS and Talos clients. */
    public StreamDemo() {
        this.initSdsClient();
        this.initTalosClient();
    }

    /** Builds the put request that replays {@code record} into {@code tableName}. */
    private PutRequest getReplayPutRequest(String tableName, Map<String, Datum> record) {
        return new PutRequest().setTableName(tableName).setRecord(record);
    }

    /**
     * Builds the remove request that replays a DELETE entry into {@code tableName}.
     *
     * @param tableName  table the request targets
     * @param record     record carried by the mutation log entry
     * @param rowDeleted when {@code true} the record must contain exactly the row-key
     *                   attributes and the whole row is removed; otherwise the row-key
     *                   attributes become the keys and all remaining attribute names are
     *                   the attributes to remove
     * @throws IllegalArgumentException if {@code rowDeleted} is set and the record's key
     *                                  set differs from the row-key definition
     */
    private RemoveRequest getReplayRemoveRequest(String tableName, Map<String, Datum> record, boolean rowDeleted) {
        // Looked up once instead of once per record entry.
        java.util.Set<?> rowKeyNames = DataProvider.rowKeyDef(enableEntityGroup).keySet();
        if (rowDeleted) {
            Preconditions.checkArgument(rowKeyNames.equals(record.keySet()),
                    "Row delete record %s does not match row key definition %s", record.keySet(), rowKeyNames);
            return new RemoveRequest().setTableName(tableName).setKeys(record);
        }
        List<String> attributes = new ArrayList<String>();
        Map<String, Datum> keys = new HashMap<String, Datum>();
        for (Map.Entry<String, Datum> entry : record.entrySet()) {
            if (rowKeyNames.contains(entry.getKey())) {
                keys.put(entry.getKey(), entry.getValue());
            } else {
                attributes.add(entry.getKey());
            }
        }
        return new RemoveRequest().setTableName(tableName).setKeys(keys).setAttributes(attributes);
    }

    /**
     * Replays mutation log entries, in order, against {@code destTableName} using the
     * static table client.
     *
     * <p>PUT and INCREMENT entries are both replayed as a put of the entry's record;
     * DELETE entries are replayed through {@link #getReplayRemoveRequest}.
     *
     * @throws IllegalArgumentException if a put or remove reports failure, or an entry
     *                                  has a type other than PUT, INCREMENT or DELETE
     * @throws Exception                whatever the table client throws
     */
    private void replayMutationLogEntry(List<MutationLogEntry> messages, String destTableName) throws Exception {
        for (MutationLogEntry entry : messages) {
            LOG.info("Consuming mutation log entry : {}", entry);
            switch (entry.getType()) {
                case PUT: {
                    PutRequest put = this.getReplayPutRequest(destTableName, entry.getRecord());
                    PutResult putResult = tableClient.put(put);
                    Preconditions.checkArgument(putResult.isSuccess(),
                            "Put of record %s to %s failed", entry.getRecord(), destTableName);
                    LOG.info("record {} is put to {}", entry.getRecord(), destTableName);
                    break;
                }
                case INCREMENT: {
                    // Replayed as a put; the result is deliberately not checked, as before.
                    PutRequest put = this.getReplayPutRequest(destTableName, entry.getRecord());
                    tableClient.put(put);
                    LOG.info("record {} is increment to {}", entry.getRecord(), destTableName);
                    break;
                }
                case DELETE: {
                    RemoveRequest remove = this.getReplayRemoveRequest(destTableName, entry.getRecord(), entry.isRowDeleted());
                    RemoveResult removeResult = tableClient.remove(remove);
                    Preconditions.checkArgument(removeResult.isSuccess(),
                            "Remove of record %s from %s failed", entry.getRecord(), destTableName);
                    LOG.info("record {} is removed from {}", entry.getRecord(), destTableName);
                    break;
                }
                default:
                    // Same exception type as the former checkArgument(false), now with a message.
                    throw new IllegalArgumentException("Unexpected mutation type : " + entry.getType());
            }
        }
    }

    /**
     * Creates the Talos consumer for the demo topic, authenticated with the app key
     * pair and using {@link MutateLogProcessorFactory} to process messages.
     *
     * <p>Requires {@link #createTable()} to have run, since it reads the static
     * {@code topicInfo}.
     *
     * @throws RuntimeException if the configured stream view type is not MUTATE_LOG
     */
    private TalosConsumer createTalosConsumer() throws Exception {
        if (streamViewType != StreamViewType.MUTATE_LOG) {
            throw new RuntimeException("Unexpected stream view type : " + streamViewType);
        }
        Properties pro = new Properties();
        pro.setProperty("galaxy.talos.service.endpoint", talosServiceEndpoint);
        TalosConsumerConfig talosConsumerConfig = new TalosConsumerConfig(pro);
        Credential credential = this.createTalosCredential(appKey, appSecret, UserType.APP_SECRET);
        return new TalosConsumer(
                consumerGroup,
                talosConsumerConfig,
                credential,
                topicInfo.getTopicTalosResourceName(),
                new MutateLogProcessorFactory(),
                clientPrefix,
                new SimpleTopicAbnormalCallback());
    }

    /** Returns the next pseudo-random decision from the seeded generator. */
    private boolean shouldDo() {
        return rand.nextBoolean();
    }

    /**
     * Writes random records to the source table. After each put, an increment of the
     * non-key attributes and a partial remove of {@link #PARTITAL_DELETE_NUMBER}
     * non-key attributes are each applied when {@link #shouldDo()} says so.
     *
     * @throws IllegalArgumentException if a put or remove reports failure
     * @throws Exception                whatever the table client throws
     */
    public void produceData() throws Exception {
        LOG.info("Begin to produce data for table {}", tableName);
        for (int i = 0; i < PRODUCED_RECORD_COUNT; ++i) {
            Map<String, Datum> record = DataProvider.randomRecord(DataProvider.attributesDef(enableEntityGroup));
            Map<String, Datum> keys = DataProvider.getRecordKeys(tableSchema, record);
            // Entries of the record that are not part of the key.
            Map<String, Datum> dataRecord = Maps.difference(record, keys).entriesOnlyOnLeft();

            PutRequest put = new PutRequest();
            put.setTableName(tableName).setRecord(record);
            PutResult putResult = tableClient.put(put);
            Preconditions.checkArgument(putResult.isSuccess(), "Put of record %s to %s failed", record, tableName);

            if (this.shouldDo()) {
                Map<String, Datum> amounts = DataProvider.randomIncrementAmounts(tableSchema, dataRecord.keySet());
                IncrementRequest increment = new IncrementRequest();
                increment.setTableName(tableName).setKeys(keys).setAmounts(amounts);
                tableClient.increment(increment);
            }

            if (this.shouldDo()) {
                List<String> attributes = DataProvider.randomSelect(dataRecord.keySet(), PARTITAL_DELETE_NUMBER);
                RemoveRequest remove = new RemoveRequest();
                remove.setTableName(tableName).setAttributes(attributes).setKeys(keys);
                RemoveResult removeResult = tableClient.remove(remove);
                Preconditions.checkArgument(removeResult.isSuccess(),
                        "Remove of attributes %s from %s failed", attributes, tableName);
            }
        }
        LOG.info("Produce data finished for table {}", tableName);
    }

    /**
     * Creates the source table with {@code streamSpec} registered under {@code topicName}.
     *
     * @return the schema of the created table
     */
    public TableSchema createSrcTable(String tableName, String topicName, StreamSpec streamSpec) throws Exception {
        Map<String, StreamSpec> streamSpecs = new HashMap<String, StreamSpec>();
        streamSpecs.put(topicName, streamSpec);
        TableSpec tableSpec = DataProvider.createTableSpec(appId, enableEntityGroup, enableEntityGroupHash, streamSpecs, null);
        TableInfo tableInfo = adminClient.createTable(tableName, tableSpec);
        LOG.info("Src table {} is created", tableName);
        return tableInfo.getSpec().getSchema();
    }

    /** Creates the destination table that replayed mutations are written to. */
    public void createDestTable() throws Exception {
        TableSpec tableSpec = DataProvider.createTableSpec(appId, enableEntityGroup, enableEntityGroupHash);
        adminClient.createTable(destTableName, tableSpec);
        LOG.info("Dest table {} is created", destTableName);
    }

    /**
     * Creates the demo topic with {@link #topicPartitionNumber} partitions and an ACL
     * entry granting {@code TOPIC_READ_AND_MESSAGE_FULL_CONTROL} to the app.
     *
     * @return the topic info reported by the create-topic response
     */
    private TopicInfo createTopic() throws Exception {
        Grantee grantee = new Grantee();
        grantee.setIdentifier(appId).setType(GrantType.APP_ROOT);
        Map<Grantee, Permission> aclMap = new HashMap<Grantee, Permission>();
        aclMap.put(grantee, Permission.TOPIC_READ_AND_MESSAGE_FULL_CONTROL);

        TopicAttribute topicAttribute = new TopicAttribute();
        topicAttribute.setPartitionNumber(topicPartitionNumber);

        CreateTopicRequest createTopicRequest = new CreateTopicRequest();
        createTopicRequest.setTopicName(topicName);
        createTopicRequest.setAclMap(aclMap);
        createTopicRequest.setTopicAttribute(topicAttribute);

        CreateTopicResponse createTopicResponse = talosAdmin.createTopic(createTopicRequest);
        LOG.info("Topic {} is created", topicName);
        return createTopicResponse.getTopicInfo();
    }

    /**
     * Builds an enabled stream spec with the configured view type, covering every
     * column name returned by {@code DataProvider.columnsDef()}.
     */
    private StreamSpec createStreamSpec() throws Exception {
        List<String> attributes = Lists.newArrayList(DataProvider.columnsDef().keySet());
        StreamSpec streamSpec = new StreamSpec();
        streamSpec.setViewType(streamViewType).setAttributes(attributes).setEnableStream(true);
        LOG.info("Stream {} is created", streamSpec);
        return streamSpec;
    }

    /**
     * Creates the topic, the stream spec and the source table, storing the results in
     * the static {@code topicInfo}, {@code streamSpec} and {@code tableSchema} fields.
     */
    public void createTable() throws Exception {
        topicInfo = this.createTopic();
        streamSpec = this.createStreamSpec();
        tableSchema = this.createSrcTable(tableName, topicName, streamSpec);
    }

    /** Runs the demo end to end; see the class documentation for the sequence. */
    public static void main(String[] args) throws Exception {
        StreamDemo streamDemo = new StreamDemo();
        streamDemo.createTable();
        streamDemo.createDestTable();
        // The consumer reference is not retained.
        // NOTE(review): presumably the consumer starts consuming on construction — confirm.
        streamDemo.createTalosConsumer();
        streamDemo.produceData();
    }

    /** Supplies a new {@link MutateLogProcessor} bound to this demo instance. */
    private class MutateLogProcessorFactory
    implements MessageProcessorFactory {
        @Override
        public MutateLogProcessor createProcessor() {
            return new MutateLogProcessor();
        }
    }

    /**
     * Deserializes Talos messages into mutation log entries and replays them against
     * the destination table.
     */
    private class MutateLogProcessor
    implements MessageProcessor {
        private MutateLogProcessor() {
        }

        /** No per-partition setup is needed. */
        @Override
        public void init(TopicAndPartition topicAndPartition, long messageOffset) {
        }

        /**
         * Deserializes every message and replays the resulting entries as one batch.
         * The checkpointer is not used.
         *
         * @throws RuntimeException wrapping any failure during deserialization or replay
         */
        @Override
        public void process(List<MessageAndOffset> messages, MessageCheckpointer messageCheckpointer) {
            try {
                if (messages.size() <= 0) {
                    return;
                }
                List<MutationLogEntry> logEntries = this.deserialize(messages);
                // logEntries has one element per message here, so it cannot be empty.
                try {
                    StreamDemo.this.replayMutationLogEntry(logEntries, destTableName);
                } catch (Exception e) {
                    LOG.error("Replay mutation log entries " + logEntries + " got exception ", e);
                    throw e;
                }
            } catch (Exception e) {
                LOG.error("Process mutation log entries for topic " + topicName + " get exception ", e);
                throw new RuntimeException(e);
            }
        }

        /** Deserializes each message payload, logging and rethrowing the first failure. */
        private List<MutationLogEntry> deserialize(List<MessageAndOffset> messages) throws Exception {
            List<MutationLogEntry> logEntries = new ArrayList<MutationLogEntry>(messages.size());
            for (MessageAndOffset message : messages) {
                try {
                    MutationLogEntry logEntry = (MutationLogEntry) DatumUtil.deserialize(
                            message.getMessage().getMessage(), MutationLogEntry.class);
                    logEntries.add(logEntry);
                } catch (Exception e) {
                    LOG.error("Deserialize message " + message + " got exception ", e);
                    throw e;
                }
            }
            return logEntries;
        }

        /** No resources to release. */
        @Override
        public void shutdown(MessageCheckpointer messageCheckpointer) {
        }
    }
}

