package org.apache.flink.table.gateway.service.session;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/gateway/service/session/SessionManagerImpl.class */
/**
 * {@link SessionManager} implementation that keeps opened sessions in an in-memory concurrent map.
 *
 * <p>When both the idle timeout and the check interval are positive, {@link #start()} schedules a
 * periodic task that closes every session whose last access time is older than the idle timeout.
 * {@link #start()} also creates the executor that is handed to each new {@link SessionContext}.
 */
public class SessionManagerImpl implements SessionManager {
    private static final Logger LOG = LoggerFactory.getLogger(SessionManagerImpl.class);
    private static final String OPERATION_POOL_NAME = "sql-gateway-operation-pool";

    private final DefaultContext defaultContext;

    /** Idle timeout in milliseconds; a non-positive value disables expiration. */
    private final long idleTimeout;

    /** Period of the expiration check in milliseconds; a non-positive value disables the check. */
    private final long checkInterval;

    /** Upper bound on concurrently opened sessions; a non-positive value disables the limit. */
    private final int maxSessionCount;

    private final Map<SessionHandle, Session> sessions;

    /** Created in {@link #start()}; {@code null} until then. */
    private ExecutorService operationExecutorService;

    @Nullable
    private ScheduledExecutorService cleanupService;

    @Nullable
    private ScheduledFuture<?> timeoutCheckerFuture;

    /**
     * Creates a session manager configured from the Flink configuration of the given context.
     *
     * @param defaultContext context whose configuration supplies the idle timeout, the check
     *     interval and the maximum session count
     */
    public SessionManagerImpl(DefaultContext defaultContext) {
        this.defaultContext = defaultContext;
        Configuration flinkConfig = defaultContext.getFlinkConfig();
        this.idleTimeout =
                flinkConfig
                        .get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT)
                        .toMillis();
        this.checkInterval =
                flinkConfig
                        .get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL)
                        .toMillis();
        this.maxSessionCount =
                flinkConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM);
        this.sessions = new ConcurrentHashMap<>();
    }

    /**
     * Starts the periodic expiration check (only if both the check interval and the idle timeout
     * are positive) and creates the operation thread pool.
     */
    @Override // org.apache.flink.table.gateway.service.session.SessionManager
    public void start() {
        if (this.checkInterval > 0 && this.idleTimeout > 0) {
            this.cleanupService = Executors.newSingleThreadScheduledExecutor();
            this.timeoutCheckerFuture =
                    this.cleanupService.scheduleAtFixedRate(
                            this::cleanupExpiredSessions,
                            this.checkInterval,
                            this.checkInterval,
                            TimeUnit.MILLISECONDS);
        }
        Configuration flinkConfig = this.defaultContext.getFlinkConfig();
        this.operationExecutorService =
                ThreadUtils.newThreadPool(
                        flinkConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN),
                        flinkConfig.get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX),
                        flinkConfig
                                .get(SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME)
                                .toMillis(),
                        OPERATION_POOL_NAME);
    }

    /**
     * Cancels the expiration check and shuts down both executors, if they were created. Sessions
     * that are still registered are not closed by this method.
     */
    @Override // org.apache.flink.table.gateway.service.session.SessionManager
    public void stop() {
        if (this.cleanupService != null) {
            // The future is only assigned after the executor, so it may still be null if
            // scheduling itself failed in start().
            if (this.timeoutCheckerFuture != null) {
                this.timeoutCheckerFuture.cancel(true);
            }
            this.cleanupService.shutdown();
        }
        if (this.operationExecutorService != null) {
            this.operationExecutorService.shutdown();
        }
        LOG.info("SessionManager is stopped.");
    }

    /**
     * Looks up a session and refreshes its last access time via {@code Session#touch()}.
     *
     * @param sessionHandle handle of the requested session
     * @return the registered session
     * @throws SqlGatewayException if no session is registered under the handle
     */
    @Override // org.apache.flink.table.gateway.service.session.SessionManager
    public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException {
        Session session = this.sessions.get(sessionHandle);
        if (session == null) {
            String message = String.format("Session '%s' does not exist.", sessionHandle);
            LOG.warn(message);
            throw new SqlGatewayException(message);
        }
        session.touch();
        return session;
    }

    /**
     * Opens a new session under a freshly created, unused handle and registers it.
     *
     * <p>The method is synchronized so that the session-count check and the registration are not
     * interleaved with another {@code openSession} call.
     *
     * @param sessionEnvironment environment used to build the session context
     * @return the opened session
     * @throws SqlGatewayException if the maximum session count is already reached
     */
    @Override // org.apache.flink.table.gateway.service.session.SessionManager
    public synchronized Session openSession(SessionEnvironment sessionEnvironment)
            throws SqlGatewayException {
        checkSessionCount();

        // Re-draw the handle until it does not collide with a registered session.
        SessionHandle sessionHandle;
        do {
            sessionHandle = SessionHandle.create();
        } while (this.sessions.containsKey(sessionHandle));

        SessionContext sessionContext =
                SessionContext.create(
                        this.defaultContext,
                        sessionHandle,
                        sessionEnvironment,
                        this.operationExecutorService);
        Session session = new Session(sessionContext);
        session.open();
        this.sessions.put(sessionHandle, session);
        LOG.info(
                "Session {} is opened, and the number of current sessions is {}.",
                session.getSessionHandle(),
                this.sessions.size());
        return session;
    }

    /**
     * Closes and unregisters the session identified by the handle.
     *
     * @param sessionHandle handle of the session to close
     * @throws SqlGatewayException if no session is registered under the handle
     */
    @Override // org.apache.flink.table.gateway.service.session.SessionManager
    public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException {
        closeSession(getSession(sessionHandle));
    }

    /**
     * Body of the periodic expiration task: closes every registered session that is expired.
     *
     * <p>A failure while closing one session is logged and does not abort the sweep. This matters
     * because an exception escaping a task scheduled with {@code scheduleAtFixedRate} suppresses
     * all of its subsequent executions, which would silently disable expiration for good.
     */
    private void cleanupExpiredSessions() {
        LOG.debug(
                "Start to cleanup expired sessions, current session count: {}",
                this.sessions.size());
        for (Map.Entry<SessionHandle, Session> entry : this.sessions.entrySet()) {
            SessionHandle sessionHandle = entry.getKey();
            Session session = entry.getValue();
            try {
                if (isSessionExpired(session)) {
                    LOG.info("Session {} is expired, closing it...", sessionHandle);
                    closeSession(session);
                }
            } catch (RuntimeException e) {
                LOG.warn("Failed to close expired session {}.", sessionHandle, e);
            }
        }
        LOG.debug(
                "Removing expired session finished, current session count: {}",
                this.sessions.size());
    }

    /**
     * Rejects the creation of a new session when the configured limit is reached.
     *
     * @throws SqlGatewayException if a positive limit is configured and already reached
     */
    private void checkSessionCount() throws SqlGatewayException {
        if (this.maxSessionCount > 0 && this.sessions.size() >= this.maxSessionCount) {
            String message =
                    String.format(
                            "Failed to create session, the count of active sessions exceeds the max count: %s",
                            this.maxSessionCount);
            LOG.warn(message);
            throw new SqlGatewayException(message);
        }
    }

    /** Returns whether the session has been idle for longer than the configured idle timeout. */
    private boolean isSessionExpired(Session session) {
        return this.idleTimeout > 0
                && System.currentTimeMillis() - session.getLastAccessTime() > this.idleTimeout;
    }

    /**
     * Unregisters the session and closes it.
     *
     * <p>The conditional removal guarantees that only the caller that actually removed the mapping
     * closes the session, so the expiration task and an explicit close request racing on the same
     * session do not both invoke {@code Session#close()}.
     */
    private void closeSession(Session session) {
        SessionHandle sessionHandle = session.getSessionHandle();
        if (!this.sessions.remove(sessionHandle, session)) {
            LOG.debug("Session: {} has already been closed.", sessionHandle);
            return;
        }
        session.close();
        LOG.info("Session: {} is closed.", sessionHandle);
    }

    /** Returns whether a session is currently registered under the handle. */
    @VisibleForTesting
    public boolean isSessionAlive(SessionHandle sessionHandle) {
        return this.sessions.containsKey(sessionHandle);
    }

    /** Returns the number of currently registered sessions. */
    @VisibleForTesting
    public int currentSessionCount() {
        return this.sessions.size();
    }

    /**
     * Returns the operation count reported by the session's operation manager. The lookup goes
     * through {@link #getSession(SessionHandle)} and therefore also refreshes the session's last
     * access time.
     */
    @VisibleForTesting
    public int getOperationCount(SessionHandle sessionHandle) {
        return getSession(sessionHandle).getOperationManager().getOperationCount();
    }
}
