/*
 * Decompiled with CFR 0.152.
 */
package com.xiaomi.infra.galaxy.sds.examples.stream;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaomi.infra.galaxy.rpc.thrift.Credential;
import com.xiaomi.infra.galaxy.rpc.thrift.GrantType;
import com.xiaomi.infra.galaxy.rpc.thrift.Grantee;
import com.xiaomi.infra.galaxy.sds.client.ClientFactory;
import com.xiaomi.infra.galaxy.sds.examples.stream.DataProvider;
import com.xiaomi.infra.galaxy.sds.examples.stream.StreamDemo;
import com.xiaomi.infra.galaxy.sds.thrift.AdminService;
import com.xiaomi.infra.galaxy.sds.thrift.Datum;
import com.xiaomi.infra.galaxy.sds.thrift.IncrementRequest;
import com.xiaomi.infra.galaxy.sds.thrift.PointInTimeRecovery;
import com.xiaomi.infra.galaxy.sds.thrift.PutRequest;
import com.xiaomi.infra.galaxy.sds.thrift.PutResult;
import com.xiaomi.infra.galaxy.sds.thrift.RemoveRequest;
import com.xiaomi.infra.galaxy.sds.thrift.RemoveResult;
import com.xiaomi.infra.galaxy.sds.thrift.StreamCheckpoint;
import com.xiaomi.infra.galaxy.sds.thrift.StreamSpec;
import com.xiaomi.infra.galaxy.sds.thrift.StreamViewType;
import com.xiaomi.infra.galaxy.sds.thrift.TableInfo;
import com.xiaomi.infra.galaxy.sds.thrift.TableSchema;
import com.xiaomi.infra.galaxy.sds.thrift.TableService;
import com.xiaomi.infra.galaxy.sds.thrift.TableSnapshots;
import com.xiaomi.infra.galaxy.sds.thrift.TableSpec;
import com.xiaomi.infra.galaxy.sds.thrift.UserType;
import com.xiaomi.infra.galaxy.talos.admin.TalosAdmin;
import com.xiaomi.infra.galaxy.talos.client.TalosClientConfig;
import com.xiaomi.infra.galaxy.talos.thrift.CreateTopicRequest;
import com.xiaomi.infra.galaxy.talos.thrift.CreateTopicResponse;
import com.xiaomi.infra.galaxy.talos.thrift.Permission;
import com.xiaomi.infra.galaxy.talos.thrift.TopicAttribute;
import com.xiaomi.infra.galaxy.talos.thrift.TopicInfo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example that walks through point-in-time recovery (PITR) of an SDS table.
 *
 * <p>The flow driven by {@link #main(String[])} is: create a Talos topic, create a source
 * table that streams into that topic with PITR enabled, write two batches of data, list the
 * table's snapshots, fetch the latest stream checkpoint and finally ask the admin service to
 * recover the table into {@code destPitrDemoTable}.
 *
 * <p>The {@code $...} placeholders (endpoints, account and app credentials) must be replaced
 * with real values before the example can run.
 *
 * <p>Clients and the objects created along the way are kept in static fields, so this class
 * is meant to be driven by a single instance from a single thread.
 */
public class PitrDemo {
    // Was LoggerFactory.getLogger(StreamDemo.class): log lines were attributed to the wrong class.
    private static final Logger LOG = LoggerFactory.getLogger(PitrDemo.class);
    // Fixed seed, so the sequence of shouldDo() decisions is the same on every run.
    static Random rand = new Random(0L);
    // Number of attributes removed from a record when a partial delete is performed.
    static final int PARTITAL_DELETE_NUMBER = 2;
    static final int ADMIN_CONN_TIMEOUT = 3000;
    static final int ADMIN_SOCKET_TIMEOUT = 50000;
    static final int TABLE_CONN_TIMEOUT = 3000;
    static final int TABLE_SOCKET_TIMEOUT = 10000;
    // Number of records written by one call to produceData().
    private static final int RECORDS_PER_BATCH = 1000;
    // Pause between the two data batches in main().
    // NOTE(review): presumably gives the service time to persist a checkpoint/snapshot - confirm.
    private static final long PAUSE_BETWEEN_BATCHES_MILLIS = 120000L;
    // Subtracted from the latest checkpoint timestamp to pick the recovery point.
    // NOTE(review): unit follows StreamCheckpoint.getTimestamp(), which is not visible here.
    private static final long RECOVER_TIMESTAMP_OFFSET = 10L;
    private static final String sdsServiceEndpoint = "$sdsServiceEndpoint";
    private static final String tableName = "pitrDemoTable";
    private static final String destTableName = "destPitrDemoTable";
    private static final boolean enableEntityGroup = true;
    private static final boolean enableEntityGroupHash = true;
    private static final StreamViewType streamViewType = StreamViewType.MUTATE_LOG;
    private static final String accountKey = "$your_accountKey";
    private static final String accountSecret = "$your_accountSecret";
    private static final String appId = "$your_appId";
    private static final String appKey = "$your_appKey";
    private static final String appSecret = "$your_appSecret";
    private static AdminService.Iface adminClient;
    private static TableService.Iface tableClient;
    private static TableSchema tableSchema;
    private static StreamSpec streamSpec;
    private static PointInTimeRecovery pitr;
    private static final String talosServiceEndpoint = "$talosServiceEndpoint";
    private static final String topicName = "pitrDemoTopic";
    private static final int topicPartitionNumber = 8;
    private static TalosAdmin talosAdmin;
    private static TopicInfo topicInfo;

    /**
     * Builds an SDS credential from the given key pair and user type.
     */
    private com.xiaomi.infra.galaxy.sds.thrift.Credential createSdsCredential(String secretKeyId, String secretKey, UserType userType) {
        return new com.xiaomi.infra.galaxy.sds.thrift.Credential()
                .setSecretKeyId(secretKeyId)
                .setSecretKey(secretKey)
                .setType(userType);
    }

    /**
     * Builds a Talos (rpc) credential from the given key pair and user type.
     */
    private Credential createTalosCredential(String secretKeyId, String secretKey, com.xiaomi.infra.galaxy.rpc.thrift.UserType userType) {
        return new Credential()
                .setSecretKeyId(secretKeyId)
                .setSecretKey(secretKey)
                .setType(userType);
    }

    /**
     * Creates an SDS admin client for {@code endpoint}, using the admin socket and connection
     * timeouts declared on this class.
     */
    private AdminService.Iface createAdminClient(String endpoint, com.xiaomi.infra.galaxy.sds.thrift.Credential credential) {
        ClientFactory clientFactory = new ClientFactory().setCredential(credential);
        return clientFactory.newAdminClient(endpoint + "/v1/api/admin", ADMIN_SOCKET_TIMEOUT, ADMIN_CONN_TIMEOUT);
    }

    /**
     * Creates an SDS table client for {@code endpoint}, using the table socket and connection
     * timeouts declared on this class.
     */
    private TableService.Iface createTableClient(String endpoint, com.xiaomi.infra.galaxy.sds.thrift.Credential credential) {
        ClientFactory clientFactory = new ClientFactory().setCredential(credential);
        return clientFactory.newTableClient(endpoint + "/v1/api/table", TABLE_SOCKET_TIMEOUT, TABLE_CONN_TIMEOUT);
    }

    /**
     * Initializes the static SDS clients: the admin client is built with the developer account
     * credential, the table client with the application credential.
     */
    private void initSdsClient() {
        com.xiaomi.infra.galaxy.sds.thrift.Credential accountCredential =
                this.createSdsCredential(accountKey, accountSecret, UserType.DEV_XIAOMI);
        adminClient = this.createAdminClient(sdsServiceEndpoint, accountCredential);

        com.xiaomi.infra.galaxy.sds.thrift.Credential appCredential =
                this.createSdsCredential(appKey, appSecret, UserType.APP_SECRET);
        tableClient = this.createTableClient(sdsServiceEndpoint, appCredential);
    }

    /**
     * Initializes the static Talos admin client with the developer account credential.
     */
    private void initTalosClient() {
        Properties pro = new Properties();
        pro.setProperty("galaxy.talos.service.endpoint", talosServiceEndpoint);
        TalosClientConfig talosClientConfig = new TalosClientConfig(pro);
        Credential credential = this.createTalosCredential(accountKey, accountSecret, com.xiaomi.infra.galaxy.rpc.thrift.UserType.DEV_XIAOMI);
        talosAdmin = new TalosAdmin(talosClientConfig, credential);
    }

    /**
     * Creates the demo and (re)initializes the shared SDS and Talos clients.
     */
    public PitrDemo() {
        this.initSdsClient();
        this.initTalosClient();
    }

    /**
     * Returns the next pseudo-random boolean from the shared, fixed-seed generator.
     */
    private boolean shouldDo() {
        return rand.nextBoolean();
    }

    /**
     * Fetches the snapshots of the source table from the admin service and logs them.
     *
     * @throws Exception if the admin call fails
     */
    public void listSnapshots() throws Exception {
        TableSnapshots tableSnapshots = adminClient.listSnapshots(tableName);
        LOG.info("table {} snapshots are {}", tableName, tableSnapshots);
    }

    /**
     * Returns the latest stream checkpoint of the source table for the demo topic.
     *
     * @throws Exception if the admin call fails
     */
    public StreamCheckpoint getLatestCheckpoint() throws Exception {
        return adminClient.getLatestStreamCheckpoint(tableName, topicName);
    }

    /**
     * Writes one batch of random records into the source table.
     *
     * <p>Every record is put; afterwards, each decided independently by {@link #shouldDo()},
     * its non-key attributes may be incremented and a few of them may be removed.
     * {@link #createTable()} must have been called first because the table schema is needed
     * to derive the record keys.
     *
     * @throws IllegalArgumentException if a put or remove reports that it was not successful
     * @throws Exception if a table service call fails
     */
    public void produceData() throws Exception {
        LOG.info("Begin to produce data for table {}", tableName);
        for (int i = 0; i < RECORDS_PER_BATCH; ++i) {
            Map<String, Datum> record = DataProvider.randomRecord(DataProvider.attributesDef(true));
            Map<String, Datum> keys = DataProvider.getRecordKeys(tableSchema, record);
            // Entries of the record that are not part of its key.
            Map<String, Datum> dataRecord = Maps.difference(record, keys).entriesOnlyOnLeft();

            PutRequest put = new PutRequest();
            put.setTableName(tableName).setRecord(record);
            PutResult putResult = tableClient.put(put);
            // checkArgument is kept (rather than checkState) so the thrown exception type is unchanged.
            Preconditions.checkArgument(putResult.isSuccess(),
                    "put of record #%s into table %s was not successful", i, tableName);

            if (this.shouldDo()) {
                Map<String, Datum> amounts = DataProvider.randomIncrementAmounts(tableSchema, dataRecord.keySet());
                IncrementRequest increment = new IncrementRequest();
                increment.setTableName(tableName).setKeys(keys).setAmounts(amounts);
                tableClient.increment(increment);
            }

            if (this.shouldDo()) {
                List<String> attributes = DataProvider.randomSelect(dataRecord.keySet(), PARTITAL_DELETE_NUMBER);
                RemoveRequest remove = new RemoveRequest();
                remove.setTableName(tableName).setAttributes(attributes).setKeys(keys);
                RemoveResult removeResult = tableClient.remove(remove);
                Preconditions.checkArgument(removeResult.isSuccess(),
                        "remove on record #%s of table %s was not successful", i, tableName);
            }
        }
        LOG.info("Produce data finished for table {}", tableName);
    }

    /**
     * Creates the source table with the current stream spec (registered under the demo topic
     * name) and PITR settings, and returns the schema reported by the admin service.
     *
     * @throws Exception if the admin call fails
     */
    public TableSchema createSrcTable() throws Exception {
        HashMap<String, StreamSpec> streamSpecs = new HashMap<String, StreamSpec>();
        streamSpecs.put(topicName, streamSpec);
        // NOTE(review): the two flags are presumably (enableEntityGroup, enableEntityGroupHash);
        // both are true, so behavior is the same either way - confirm against DataProvider.
        TableSpec tableSpec = DataProvider.createTableSpec(appId, enableEntityGroup, enableEntityGroupHash, streamSpecs, pitr);
        TableInfo tableInfo = adminClient.createTable(tableName, tableSpec);
        LOG.info("Src table {} is created", tableName);
        return tableInfo.getSpec().getSchema();
    }

    /**
     * Creates the Talos topic used by the table stream and grants the demo application
     * {@code TOPIC_READ_AND_MESSAGE_FULL_CONTROL} on it.
     *
     * @throws Exception if the Talos admin call fails
     */
    private TopicInfo createTopic() throws Exception {
        Grantee grantee = new Grantee();
        grantee.setIdentifier(appId).setType(GrantType.APP_ROOT);
        Map<Grantee, Permission> aclMap = new HashMap<Grantee, Permission>();
        aclMap.put(grantee, Permission.TOPIC_READ_AND_MESSAGE_FULL_CONTROL);

        TopicAttribute topicAttribute = new TopicAttribute();
        topicAttribute.setPartitionNumber(topicPartitionNumber);

        CreateTopicRequest createTopicRequest = new CreateTopicRequest();
        createTopicRequest.setTopicName(topicName);
        createTopicRequest.setAclMap(aclMap);
        createTopicRequest.setTopicAttribute(topicAttribute);

        CreateTopicResponse createTopicResponse = talosAdmin.createTopic(createTopicRequest);
        LOG.info("Topic {} is created", topicName);
        return createTopicResponse.getTopicInfo();
    }

    /**
     * Builds an enabled stream spec with the configured view type, covering the attribute
     * names returned by {@code DataProvider.columnsDef()}.
     */
    private StreamSpec createStreamSpec() throws Exception {
        StreamSpec streamSpec = new StreamSpec();
        // The raw (List) cast is kept on purpose: the key type of columnsDef() is not visible here.
        streamSpec.setViewType(streamViewType).setAttributes((List)Lists.newArrayList(DataProvider.columnsDef().keySet())).setEnableStream(true);
        LOG.info("Stream {} is created", streamSpec);
        return streamSpec;
    }

    /**
     * Builds the PITR settings for the source table, bound to the demo topic.
     */
    private PointInTimeRecovery createPitr() {
        PointInTimeRecovery pitr = new PointInTimeRecovery();
        pitr.setEnablePointInTimeRecovery(true);
        pitr.setTopicName(topicName);
        // NOTE(review): presumably seconds (1 day / 7 days) - confirm against the SDS docs.
        pitr.setSnapshotPeriod(86400L);
        pitr.setTtl(604800L);
        return pitr;
    }

    /**
     * Creates everything the demo needs, in dependency order: topic, stream spec, PITR
     * settings and finally the source table. The results are stored in static fields.
     *
     * @throws Exception if any admin call fails
     */
    public void createTable() throws Exception {
        topicInfo = this.createTopic();
        streamSpec = this.createStreamSpec();
        pitr = this.createPitr();
        tableSchema = this.createSrcTable();
    }

    /**
     * Asks the admin service to recover the source table into the destination table as of
     * {@code timestamp}, using the demo topic.
     *
     * @param timestamp recovery point, in the unit used by stream checkpoints
     * @throws Exception if the admin call fails
     */
    public void recoverTable(long timestamp) throws Exception {
        adminClient.recoverTable(tableName, destTableName, topicName, timestamp);
    }

    /**
     * Runs the demo end to end: create the table, write two batches with a pause in between,
     * list snapshots and recover to a point just before the latest stream checkpoint.
     */
    public static void main(String[] args) throws Exception {
        PitrDemo pitrDemo = new PitrDemo();
        pitrDemo.createTable();
        pitrDemo.produceData();
        Thread.sleep(PAUSE_BETWEEN_BATCHES_MILLIS);
        pitrDemo.produceData();
        pitrDemo.listSnapshots();
        StreamCheckpoint latestCheckpoint = pitrDemo.getLatestCheckpoint();
        pitrDemo.recoverTable(latestCheckpoint.getTimestamp() - RECOVER_TIMESTAMP_OFFSET);
    }
}

