package com.streamsets.pipeline.api.base.configurablestage;

import com.streamsets.pipeline.api.Source;
import com.streamsets.pipeline.api.Stage;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.ClusterSource;
import com.streamsets.pipeline.api.impl.Utils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/streamsets/pipeline/api/base/configurablestage/DClusterSourceOffsetCommitter.class */
/**
 * Offset-committing source wrapper that additionally exposes the {@link ClusterSource} contract.
 *
 * <p>Every {@link ClusterSource} method is forwarded to the wrapped source when that source itself
 * implements {@link ClusterSource}. The delegate is captured in {@link #createStage()} when possible
 * and otherwise resolved lazily (with a bounded wait) by {@link #initializeClusterSource()}.
 * When the wrapped source does not implement {@link ClusterSource}, the delegating methods return a
 * neutral value ({@code null}, {@code false} or {@code -1}) or do nothing.
 */
public abstract class DClusterSourceOffsetCommitter extends DSourceOffsetCommitter implements ClusterSource {
    private static final Logger LOG = LoggerFactory.getLogger(DClusterSourceOffsetCommitter.class);

    /** Upper bound (60 seconds, expressed in nanoseconds) on how long to poll for the wrapped source. */
    private static final long CLUSTER_SOURCE_WAIT_NANOS = 60000L * 1000000L;

    /** Pause between two polls of {@code getSource()}, in milliseconds. */
    private static final long CLUSTER_SOURCE_POLL_MILLIS = 1L;

    /** The wrapped source viewed as a {@link ClusterSource}; {@code null} until resolved. */
    private ClusterSource clusterSource;

    /**
     * Creates the underlying stage via the superclass and, when the resulting source implements
     * {@link ClusterSource}, remembers it as the delegate for the cluster methods.
     *
     * @return the stage created by the superclass
     * @throws NullPointerException if no source is available after the superclass created the stage
     */
    @Override // com.streamsets.pipeline.api.base.configurablestage.DSourceOffsetCommitter, com.streamsets.pipeline.api.base.configurablestage.DSource, com.streamsets.pipeline.api.base.configurablestage.DStage
    public Stage<Source.Context> createStage() {
        Stage<Source.Context> createStage = super.createStage();
        LOG.info("Created source of type: {}", this.source);
        if (this.source instanceof ClusterSource) {
            this.clusterSource = (ClusterSource) this.source;
        } else if (this.source == null) {
            throw new NullPointerException("Source cannot be null");
        }
        return createStage;
    }

    /**
     * @return the delegate's name, or {@code null} when the wrapped source is not a {@link ClusterSource}
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public String getName() {
        if (initializeClusterSource()) {
            return this.clusterSource.getName();
        }
        return null;
    }

    /**
     * @return the delegate's batch-mode flag, or {@code false} when the wrapped source is not a
     *     {@link ClusterSource}
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public boolean isInBatchMode() {
        if (initializeClusterSource()) {
            return this.clusterSource.isInBatchMode();
        }
        return false;
    }

    /**
     * Forwards the given entries to the delegate.
     *
     * @param list entries handed to the delegate unchanged
     * @return whatever the delegate returns, or {@code null} when the wrapped source is not a
     *     {@link ClusterSource}
     * @throws InterruptedException if the delegate is interrupted
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public Object put(List<Map.Entry> list) throws InterruptedException {
        if (initializeClusterSource()) {
            return this.clusterSource.put(list);
        }
        return null;
    }

    /**
     * Forwards the batch-completion signal to the delegate.
     *
     * <p>The delegate is resolved first, like in every other delegating method, so that calling this
     * before the delegate has been captured no longer fails with a {@link NullPointerException}.
     * Nothing happens when the wrapped source is not a {@link ClusterSource}.
     *
     * @throws InterruptedException if the delegate is interrupted
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public void completeBatch() throws InterruptedException {
        if (initializeClusterSource()) {
            this.clusterSource.completeBatch();
        }
    }

    /**
     * Ensures {@link #clusterSource} is resolved, polling {@code getSource()} for up to
     * {@link #CLUSTER_SOURCE_WAIT_NANOS} while no source is available yet.
     *
     * <p>An interruption received while polling does not abort the wait; it is recorded and the
     * thread's interrupt status is restored before this method exits, so callers can still observe it.
     *
     * @return {@code true} when a {@link ClusterSource} delegate is available; {@code false} when the
     *     wrapped source exists but does not implement {@link ClusterSource}
     * @throws RuntimeException if no source became available within the wait period
     */
    private boolean initializeClusterSource() {
        // nanoTime is used for the elapsed-time check so wall-clock adjustments cannot distort the timeout.
        long startNanos = System.nanoTime();
        boolean interrupted = false;
        try {
            while (this.clusterSource == null && System.nanoTime() - startNanos < CLUSTER_SOURCE_WAIT_NANOS) {
                try {
                    Source source = getSource();
                    if (source instanceof ClusterSource) {
                        this.clusterSource = (ClusterSource) source;
                        return true;
                    }
                    if (source != null) {
                        LOG.info(Utils.format("The instance '{}' will not call this method as it does not implement '{}'", source.getClass().getName(), ClusterSource.class.getName()));
                        return false;
                    }
                    Thread.sleep(CLUSTER_SOURCE_POLL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep waiting as before, but remember the interruption instead of losing it.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.clusterSource == null) {
            throw new RuntimeException("Could not obtain cluster source");
        }
        return true;
    }

    /**
     * @return the delegate's produced-record count, or {@code -1} when the wrapped source is not a
     *     {@link ClusterSource}
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public long getRecordsProduced() {
        if (initializeClusterSource()) {
            return this.clusterSource.getRecordsProduced();
        }
        return -1L;
    }

    /**
     * @return the delegate's error-state flag, or {@code false} when the wrapped source is not a
     *     {@link ClusterSource}
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public boolean inErrorState() {
        if (initializeClusterSource()) {
            return this.clusterSource.inErrorState();
        }
        return false;
    }

    /**
     * @return the delegate's configs to ship, or {@code null} when the wrapped source is not a
     *     {@link ClusterSource}
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public Map<String, String> getConfigsToShip() {
        if (initializeClusterSource()) {
            return this.clusterSource.getConfigsToShip();
        }
        return null;
    }

    /**
     * Forwards the post-destroy callback to the delegate; does nothing when the wrapped source is not
     * a {@link ClusterSource}.
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public void postDestroy() {
        if (initializeClusterSource()) {
            this.clusterSource.postDestroy();
        }
    }

    /**
     * @return the delegate's parallelism, or {@code -1} when the wrapped source is not a
     *     {@link ClusterSource}
     * @throws IOException if the delegate fails with an I/O error
     * @throws StageException if the delegate reports a stage error
     */
    @Override // com.streamsets.pipeline.api.impl.ClusterSource
    public int getParallelism() throws IOException, StageException {
        if (initializeClusterSource()) {
            return this.clusterSource.getParallelism();
        }
        return -1;
    }
}
