/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrl;
import org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.opengauss.PGProperty;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.replication.PGReplicationStream;
import org.opengauss.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OpenGaussLogicalReplication {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(OpenGaussLogicalReplication.class);
    private static final String HA_PORT_ERROR_MESSAGE_KEY = "HA port";

    public Connection createConnection(StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
        Properties props = new Properties();
        YamlJdbcConfiguration jdbcConfig = pipelineDataSourceConfig.getJdbcConfig();
        PGProperty.USER.set(props, jdbcConfig.getUsername());
        PGProperty.PASSWORD.set(props, jdbcConfig.getPassword());
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
        PGProperty.REPLICATION.set(props, "database");
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
        try {
            return DriverManager.getConnection(jdbcConfig.getJdbcUrl(), props);
        }
        catch (SQLException ex) {
            if (this.failedBecauseOfNonHAPort(ex)) {
                log.info("Failed to connect to openGauss caused by: {} - {}. Try connecting to HA port.", (Object)ex.getSQLState(), (Object)ex.getMessage());
                return this.tryConnectingToHAPort(jdbcConfig.getJdbcUrl(), props);
            }
            throw ex;
        }
    }

    private boolean failedBecauseOfNonHAPort(SQLException ex) {
        return ex.getMessage().contains(HA_PORT_ERROR_MESSAGE_KEY);
    }

    private Connection tryConnectingToHAPort(String jdbcUrl, Properties props) throws SQLException {
        JdbcUrl parseResult = new StandardJdbcUrlParser().parse(jdbcUrl);
        PGProperty.PG_HOST.set(props, parseResult.getHostname());
        PGProperty.PG_DBNAME.set(props, parseResult.getDatabase());
        int haPort = parseResult.getPort() + 1;
        PGProperty.PG_PORT.set(props, haPort);
        return DriverManager.getConnection((String)new OpenGaussDatabaseType().getJdbcUrlPrefixes().iterator().next(), props);
    }

    public PGReplicationStream createReplicationStream(PgConnection connection, BaseLogSequenceNumber startPosition, String slotName) throws SQLException {
        return ((ChainedLogicalStreamBuilder)((ChainedLogicalStreamBuilder)connection.getReplicationAPI().replicationStream().logical().withSlotName(slotName)).withSlotOption("include-xids", true).withSlotOption("skip-empty-xacts", true).withStartPosition((LogSequenceNumber)startPosition.get())).start();
    }
}

