package org.apache.druid.segment.realtime.appenderator;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.plumber.Committers;
import org.apache.druid.segment.realtime.plumber.Plumber;
import org.apache.druid.segment.realtime.plumber.RejectionPolicy;
import org.apache.druid.segment.realtime.plumber.VersioningPolicy;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.class */
public class AppenderatorPlumber implements Plumber {
    private static final EmittingLogger log = new EmittingLogger(AppenderatorPlumber.class);
    private static final int WARN_DELAY = 1000;
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    private final RejectionPolicy rejectionPolicy;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final SegmentPublisher segmentPublisher;
    private final SegmentHandoffNotifier handoffNotifier;
    private final Appenderator appenderator;
    private final Object handoffCondition = new Object();
    private final ConcurrentMap<Long, SegmentIdWithShardSpec> segments = new ConcurrentHashMap();
    private volatile boolean shuttingDown = false;
    private volatile boolean stopped = false;
    private volatile boolean cleanShutdown = true;
    private volatile ScheduledExecutorService scheduledExecutor = null;
    private volatile Supplier<Committer> lastCommitterSupplier = null;

    public AppenderatorPlumber(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, DataSegmentAnnouncer dataSegmentAnnouncer, SegmentPublisher segmentPublisher, SegmentHandoffNotifier segmentHandoffNotifier, Appenderator appenderator) {
        this.schema = dataSchema;
        this.config = realtimeTuningConfig;
        this.rejectionPolicy = realtimeTuningConfig.getRejectionPolicyFactory().create(realtimeTuningConfig.getWindowPeriod());
        this.metrics = fireDepartmentMetrics;
        this.segmentAnnouncer = dataSegmentAnnouncer;
        this.segmentPublisher = segmentPublisher;
        this.handoffNotifier = segmentHandoffNotifier;
        this.appenderator = appenderator;
        log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
    }

    public Map<Long, SegmentIdWithShardSpec> getSegmentsView() {
        return ImmutableMap.copyOf((Map) this.segments);
    }

    public DataSchema getSchema() {
        return this.schema;
    }

    public RealtimeTuningConfig getConfig() {
        return this.config;
    }

    public RejectionPolicy getRejectionPolicy() {
        return this.rejectionPolicy;
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public Object startJob() {
        this.handoffNotifier.start();
        Object startJob = this.appenderator.startJob();
        initializeExecutors();
        startPersistThread();
        mergeAndPush();
        return startJob;
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public IncrementalIndexAddResult add(InputRow inputRow, Supplier<Committer> supplier) throws IndexSizeExceededException {
        SegmentIdWithShardSpec segmentIdentifier = getSegmentIdentifier(inputRow.getTimestampFromEpoch());
        if (segmentIdentifier == null) {
            return Plumber.THROWAWAY;
        }
        try {
            Appenderator.AppenderatorAddResult add = this.appenderator.add(segmentIdentifier, inputRow, supplier);
            this.lastCommitterSupplier = supplier;
            return new IncrementalIndexAddResult(add.getNumRowsInSegment(), 0L);
        } catch (SegmentNotWritableException e) {
            return Plumber.NOT_WRITABLE;
        }
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        return new QueryRunner<T>() { // from class: org.apache.druid.segment.realtime.appenderator.AppenderatorPlumber.1
            @Override // org.apache.druid.query.QueryRunner
            public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext) {
                return queryPlus.run(AppenderatorPlumber.this.appenderator, responseContext);
            }
        };
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public void persist(Committer committer) {
        Stopwatch createStarted = Stopwatch.createStarted();
        this.appenderator.persistAll(committer);
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(elapsed);
        if (elapsed > 1000) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", Long.valueOf(elapsed));
        }
        createStarted.stop();
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public void finishJob() {
        log.info("Shutting down...", new Object[0]);
        this.shuttingDown = true;
        List<SegmentIdWithShardSpec> segments = this.appenderator.getSegments();
        if (segments.isEmpty()) {
            log.info("No segments to hand off.", new Object[0]);
        } else {
            log.info("Pushing segments: %s", Joiner.on(", ").join(segments));
        }
        try {
            try {
                if (this.lastCommitterSupplier != null) {
                    mergeAndPush();
                }
                synchronized (this.handoffCondition) {
                    while (!this.segments.isEmpty()) {
                        log.info("Waiting to hand off: %s", Joiner.on(", ").join(segments));
                        this.handoffCondition.wait();
                        segments = this.appenderator.getSegments();
                    }
                }
                if (!this.cleanShutdown) {
                    throw new ISE("Exception occurred during persist and merge.", new Object[0]);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            this.stopped = true;
            this.handoffNotifier.close();
            shutdownExecutors();
            this.appenderator.close();
        }
    }

    private SegmentIdWithShardSpec getSegmentIdentifier(long j) {
        if (!this.rejectionPolicy.accept(j)) {
            return null;
        }
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        VersioningPolicy versioningPolicy = this.config.getVersioningPolicy();
        DateTime bucketStart = segmentGranularity.bucketStart(DateTimes.utc(j));
        SegmentIdWithShardSpec segmentIdWithShardSpec = this.segments.get(Long.valueOf(bucketStart.getMillis()));
        if (segmentIdWithShardSpec == null) {
            Interval interval = new Interval(bucketStart, segmentGranularity.increment(bucketStart));
            segmentIdWithShardSpec = new SegmentIdWithShardSpec(this.schema.getDataSource(), interval, versioningPolicy.getVersion(interval), this.config.getShardSpec());
            addSegment(segmentIdWithShardSpec);
        }
        return segmentIdWithShardSpec;
    }

    protected void initializeExecutors() {
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
        }
    }

    protected void shutdownExecutors() {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
        }
    }

    private void addSegment(SegmentIdWithShardSpec segmentIdWithShardSpec) {
        this.segments.put(Long.valueOf(segmentIdWithShardSpec.getInterval().getStartMillis()), segmentIdWithShardSpec);
        try {
            this.segmentAnnouncer.announceSegment(new DataSegment(segmentIdWithShardSpec.getDataSource(), segmentIdWithShardSpec.getInterval(), segmentIdWithShardSpec.getVersion(), ImmutableMap.of(), ImmutableList.of(), ImmutableList.of(), segmentIdWithShardSpec.getShardSpec(), null, 0L));
        } catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", segmentIdWithShardSpec.getDataSource()).addData("interval", segmentIdWithShardSpec.getInterval()).emit();
        }
    }

    public void dropSegment(final SegmentIdWithShardSpec segmentIdWithShardSpec) {
        log.info("Dropping segment: %s", segmentIdWithShardSpec);
        this.segments.remove(Long.valueOf(segmentIdWithShardSpec.getInterval().getStartMillis()));
        Futures.addCallback(this.appenderator.drop(segmentIdWithShardSpec), new FutureCallback<Object>() { // from class: org.apache.druid.segment.realtime.appenderator.AppenderatorPlumber.2
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(Object obj) {
                AppenderatorPlumber.log.info("Dropped segment: %s", segmentIdWithShardSpec);
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                AppenderatorPlumber.log.warn(th, "Failed to drop segment: %s", segmentIdWithShardSpec);
            }
        }, MoreExecutors.directExecutor());
    }

    private void startPersistThread() {
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        Period windowPeriod = this.config.getWindowPeriod();
        DateTime bucketStart = segmentGranularity.bucketStart(DateTimes.nowUtc());
        long millis = windowPeriod.toStandardDuration().getMillis();
        log.info("Expect to run at [%s]", DateTimes.nowUtc().plus(new Duration(System.currentTimeMillis(), segmentGranularity.increment(bucketStart).getMillis() + millis)));
        ScheduledExecutors.scheduleAtFixedRate(this.scheduledExecutor, new Duration(System.currentTimeMillis(), segmentGranularity.increment(bucketStart).getMillis() + millis), new Duration(bucketStart, segmentGranularity.increment(bucketStart)), new ThreadRenamingCallable<ScheduledExecutors.Signal>(StringUtils.format("%s-overseer-%d", this.schema.getDataSource(), Integer.valueOf(this.config.getShardSpec().getPartitionNum()))) { // from class: org.apache.druid.segment.realtime.appenderator.AppenderatorPlumber.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.druid.common.guava.ThreadRenamingCallable
            public ScheduledExecutors.Signal doCall() {
                if (AppenderatorPlumber.this.stopped) {
                    AppenderatorPlumber.log.info("Stopping merge-n-push overseer thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                AppenderatorPlumber.this.mergeAndPush();
                if (!AppenderatorPlumber.this.stopped) {
                    return ScheduledExecutors.Signal.REPEAT;
                }
                AppenderatorPlumber.log.info("Stopping merge-n-push overseer thread", new Object[0]);
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void mergeAndPush() {
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        long millis = this.config.getWindowPeriod().toStandardDuration().getMillis();
        log.info("Starting merge and push.", new Object[0]);
        DateTime bucketStart = segmentGranularity.bucketStart(DateTimes.utc(Math.max(millis, this.rejectionPolicy.getCurrMaxTime().getMillis()) - millis));
        long millis2 = bucketStart.getMillis();
        List<SegmentIdWithShardSpec> segments = this.appenderator.getSegments();
        final ArrayList arrayList = new ArrayList();
        if (this.shuttingDown) {
            log.info("Found [%,d] segments. Attempting to hand off all of them.", Integer.valueOf(segments.size()));
            arrayList.addAll(segments);
        } else {
            log.info("Found [%,d] segments. Attempting to hand off segments that start before [%s].", Integer.valueOf(segments.size()), bucketStart);
            for (SegmentIdWithShardSpec segmentIdWithShardSpec : segments) {
                Long valueOf = Long.valueOf(segmentIdWithShardSpec.getInterval().getStartMillis());
                if (valueOf.longValue() < millis2) {
                    log.info("Adding entry [%s] for merge and push.", segmentIdWithShardSpec);
                    arrayList.add(segmentIdWithShardSpec);
                } else {
                    log.info("Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.", segmentIdWithShardSpec, DateTimes.utc(valueOf.longValue()), bucketStart);
                }
            }
        }
        log.info("Found [%,d] segments to persist and merge", Integer.valueOf(arrayList.size()));
        final Function<Throwable, Void> function = new Function<Throwable, Void>() { // from class: org.apache.druid.segment.realtime.appenderator.AppenderatorPlumber.4
            @Override // com.google.common.base.Function, java.util.function.Function
            public Void apply(Throwable th) {
                AppenderatorPlumber.log.makeAlert(th, "Failed to publish merged indexes[%s]", AppenderatorPlumber.this.schema.getDataSource()).addData("segments", Lists.transform(arrayList, (v0) -> {
                    return v0.toString();
                })).emit();
                if (!AppenderatorPlumber.this.shuttingDown) {
                    return null;
                }
                AppenderatorPlumber.this.cleanShutdown = false;
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    AppenderatorPlumber.this.dropSegment((SegmentIdWithShardSpec) it2.next());
                }
                return null;
            }
        };
        Futures.addCallback(this.appenderator.push(arrayList, Committers.nil(), false), new FutureCallback<SegmentsAndCommitMetadata>() { // from class: org.apache.druid.segment.realtime.appenderator.AppenderatorPlumber.5
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(SegmentsAndCommitMetadata segmentsAndCommitMetadata) {
                Iterator<DataSegment> it2 = segmentsAndCommitMetadata.getSegments().iterator();
                while (it2.hasNext()) {
                    try {
                        AppenderatorPlumber.this.segmentPublisher.publishSegment(it2.next());
                    } catch (Exception e) {
                        function.apply(e);
                    }
                }
                AppenderatorPlumber.log.info("Published [%,d] sinks.", Integer.valueOf(arrayList.size()));
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                AppenderatorPlumber.log.warn(th, "Failed to push [%,d] segments.", Integer.valueOf(arrayList.size()));
                function.apply(th);
            }
        }, MoreExecutors.directExecutor());
    }
}
