package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.client.impl.protocol.util.PropertiesUtil;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;

/* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/ConnectorWrapper.class */
public class ConnectorWrapper {
    private static final ILogger LOGGER = Logger.getLogger(ConnectorWrapper.class);
    private final SourceConnector connector;
    private final int tasksMax;
    private final List<TaskRunner> taskRunners = new CopyOnWriteArrayList();
    private final AtomicInteger taskIdGenerator = new AtomicInteger();
    private final ReentrantLock reconfigurationLock = new ReentrantLock();
    private final State state = new State();
    private final String name;

    /* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/ConnectorWrapper$JetConnectorContext.class */
    private class JetConnectorContext implements ConnectorContext {
        private JetConnectorContext() {
        }

        public void requestTaskReconfiguration() {
            ConnectorWrapper.this.requestTaskReconfiguration();
        }

        public void raiseError(Exception exc) {
            throw ExceptionUtil.rethrow(exc);
        }
    }

    public ConnectorWrapper(Properties properties) {
        String checkRequiredProperty = Preconditions.checkRequiredProperty(properties, "connector.class");
        this.name = Preconditions.checkRequiredProperty(properties, "name");
        this.tasksMax = Integer.parseInt(properties.getProperty("tasks.max", "1"));
        this.connector = newConnectorInstance(checkRequiredProperty);
        LOGGER.fine("Initializing connector '" + this.name + "'");
        this.connector.initialize(new JetConnectorContext());
        LOGGER.fine("Starting connector '" + this.name + "'");
        this.connector.start(PropertiesUtil.toMap(properties));
    }

    private static SourceConnector newConnectorInstance(String str) {
        try {
            return (SourceConnector) ReflectionUtils.newInstance(Thread.currentThread().getContextClassLoader(), str);
        } catch (Exception e) {
            if (e instanceof ClassNotFoundException) {
                throw new HazelcastException("Connector class '" + str + "' not found. Did you add the connector jar to the job?", e);
            }
            throw ExceptionUtil.rethrow(e);
        }
    }

    public void stop() {
        LOGGER.fine("Stopping connector '" + this.name + "'");
        this.connector.stop();
        LOGGER.fine("Connector '" + this.name + "' stopped");
    }

    public TaskRunner createTaskRunner() {
        TaskRunner taskRunner = new TaskRunner(this.name + "-task-" + this.taskIdGenerator.getAndIncrement(), this.state, this::createSourceTask);
        this.taskRunners.add(taskRunner);
        requestTaskReconfiguration();
        return taskRunner;
    }

    private SourceTask createSourceTask() {
        return (SourceTask) ReflectionUtils.newInstance(Thread.currentThread().getContextClassLoader(), this.connector.taskClass().asSubclass(SourceTask.class).getName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void requestTaskReconfiguration() {
        if (this.taskRunners.isEmpty()) {
            return;
        }
        try {
            this.reconfigurationLock.lock();
            LOGGER.fine("Updating tasks configuration");
            int size = this.taskRunners.size();
            List taskConfigs = this.connector.taskConfigs(Math.min(this.tasksMax, size));
            int i = 0;
            while (i < size) {
                this.taskRunners.get(i).updateTaskConfig(i < taskConfigs.size() ? (Map) taskConfigs.get(i) : null);
                i++;
            }
        } finally {
            this.reconfigurationLock.unlock();
        }
    }

    public String toString() {
        return "ConnectorWrapper{name='" + this.name + "'}";
    }
}
