package org.apache.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.common.guava.ThreadRenamingCallable;
import org.apache.druid.common.guava.ThreadRenamingRunnable;
import org.apache.druid.concurrent.TaskThreadPriority;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalFileTimestampVersionFinder;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
import org.apache.druid.utils.JvmUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;

/* loaded from: input_file:org/apache/druid/segment/realtime/plumber/RealtimePlumber.class */
public class RealtimePlumber implements Plumber {
    private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
    private static final int WARN_DELAY = 1000;
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    private final RejectionPolicy rejectionPolicy;
    private final FireDepartmentMetrics metrics;
    private final DataSegmentAnnouncer segmentAnnouncer;
    private final DataSegmentPusher dataSegmentPusher;
    private final SegmentPublisher segmentPublisher;
    private final SegmentHandoffNotifier handoffNotifier;
    private final QuerySegmentWalker texasRanger;
    private final Cache cache;
    private volatile IndexMerger indexMerger;
    private volatile IndexIO indexIO;
    private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
    private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";
    private final Object handoffCondition = new Object();
    private final ConcurrentMap<Long, Sink> sinks = new ConcurrentHashMap();
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER);
    private volatile long nextFlush = 0;
    private volatile boolean shuttingDown = false;
    private volatile boolean stopped = false;
    private volatile boolean cleanShutdown = true;
    private volatile ExecutorService persistExecutor = null;
    private volatile ExecutorService mergeExecutor = null;
    private volatile ScheduledExecutorService scheduledExecutor = null;

    public RealtimePlumber(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, ServiceEmitter serviceEmitter, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentAnnouncer dataSegmentAnnouncer, ExecutorService executorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, SegmentHandoffNotifier segmentHandoffNotifier, IndexMerger indexMerger, IndexIO indexIO, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, ObjectMapper objectMapper) {
        this.schema = dataSchema;
        this.config = realtimeTuningConfig;
        this.rejectionPolicy = realtimeTuningConfig.getRejectionPolicyFactory().create(realtimeTuningConfig.getWindowPeriod());
        this.metrics = fireDepartmentMetrics;
        this.segmentAnnouncer = dataSegmentAnnouncer;
        this.dataSegmentPusher = dataSegmentPusher;
        this.segmentPublisher = segmentPublisher;
        this.handoffNotifier = segmentHandoffNotifier;
        this.indexMerger = (IndexMerger) Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
        this.indexIO = (IndexIO) Preconditions.checkNotNull(indexIO, "Null IndexIO");
        this.cache = cache;
        this.texasRanger = new SinkQuerySegmentWalker(dataSchema.getDataSource(), this.sinkTimeline, objectMapper, serviceEmitter, queryRunnerFactoryConglomerate, executorService, cache, cacheConfig, cachePopulatorStats);
        log.info("Creating plumber using rejectionPolicy[%s]", new Object[]{getRejectionPolicy()});
    }

    public DataSchema getSchema() {
        return this.schema;
    }

    public RealtimeTuningConfig getConfig() {
        return this.config;
    }

    public RejectionPolicy getRejectionPolicy() {
        return this.rejectionPolicy;
    }

    public Map<Long, Sink> getSinks() {
        return this.sinks;
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public Object startJob() {
        computeBaseDir(this.schema).mkdirs();
        initializeExecutors();
        this.handoffNotifier.start();
        Object bootstrapSinksFromDisk = bootstrapSinksFromDisk();
        startPersistThread();
        mergeAndPush();
        resetNextFlush();
        return bootstrapSinksFromDisk;
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public IncrementalIndexAddResult add(InputRow inputRow, Supplier<Committer> supplier) throws IndexSizeExceededException {
        long timestampFromEpoch = inputRow.getTimestampFromEpoch();
        Sink sink = getSink(timestampFromEpoch);
        this.metrics.reportMessageMaxTimestamp(timestampFromEpoch);
        if (sink == null) {
            return Plumber.THROWAWAY;
        }
        IncrementalIndexAddResult add = sink.add(inputRow, false);
        if (this.config.isReportParseExceptions() && add.getParseException() != null) {
            throw add.getParseException();
        }
        if (!sink.canAppendRow() || System.currentTimeMillis() > this.nextFlush) {
            persist((Committer) supplier.get());
        }
        return add;
    }

    private Sink getSink(long j) {
        if (!this.rejectionPolicy.accept(j)) {
            return null;
        }
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        VersioningPolicy versioningPolicy = this.config.getVersioningPolicy();
        DateTime bucketStart = segmentGranularity.bucketStart(DateTimes.utc(j));
        Sink sink = this.sinks.get(Long.valueOf(bucketStart.getMillis()));
        if (sink == null) {
            Interval interval = new Interval(bucketStart, segmentGranularity.increment(bucketStart));
            sink = new Sink(interval, this.schema, this.config.getShardSpec(), versioningPolicy.getVersion(interval), this.config.getMaxRowsInMemory(), TuningConfigs.getMaxBytesInMemoryOrDefault(this.config.getMaxBytesInMemory()), this.config.isReportParseExceptions(), this.config.getDedupColumn());
            addSink(sink);
        }
        return sink;
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
        return this.texasRanger.getQueryRunnerForIntervals(query, query.getIntervals());
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public void persist(final Committer committer) {
        final ArrayList arrayList = new ArrayList();
        for (Sink sink : this.sinks.values()) {
            if (sink.swappable()) {
                arrayList.add(Pair.of(sink.swap(), sink.getInterval()));
            }
        }
        log.info("Submitting persist runnable for dataSource[%s]", new Object[]{this.schema.getDataSource()});
        Stopwatch createStarted = Stopwatch.createStarted();
        final Stopwatch createStarted2 = Stopwatch.createStarted();
        final ImmutableMap of = committer.getMetadata() == null ? null : ImmutableMap.of(COMMIT_METADATA_KEY, committer.getMetadata(), COMMIT_METADATA_TIMESTAMP_KEY, Long.valueOf(System.currentTimeMillis()));
        this.persistExecutor.execute(new ThreadRenamingRunnable(StringUtils.format("%s-incremental-persist", new Object[]{this.schema.getDataSource()})) { // from class: org.apache.druid.segment.realtime.plumber.RealtimePlumber.1
            public void doRun() {
                long safeGetThreadCpuTime = JvmUtils.safeGetThreadCpuTime();
                try {
                    try {
                        for (Pair pair : arrayList) {
                            RealtimePlumber.this.metrics.incrementRowOutputCount(RealtimePlumber.this.persistHydrant((FireHydrant) pair.lhs, RealtimePlumber.this.schema, (Interval) pair.rhs, of));
                        }
                        committer.run();
                        RealtimePlumber.this.metrics.incrementPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - safeGetThreadCpuTime);
                        RealtimePlumber.this.metrics.incrementNumPersists();
                        RealtimePlumber.this.metrics.incrementPersistTimeMillis(createStarted2.elapsed(TimeUnit.MILLISECONDS));
                        createStarted2.stop();
                    } catch (Exception e) {
                        RealtimePlumber.this.metrics.incrementFailedPersists();
                        throw e;
                    }
                } catch (Throwable th) {
                    RealtimePlumber.this.metrics.incrementPersistCpuTime(JvmUtils.safeGetThreadCpuTime() - safeGetThreadCpuTime);
                    RealtimePlumber.this.metrics.incrementNumPersists();
                    RealtimePlumber.this.metrics.incrementPersistTimeMillis(createStarted2.elapsed(TimeUnit.MILLISECONDS));
                    createStarted2.stop();
                    throw th;
                }
            }
        });
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(elapsed);
        if (elapsed > 1000) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", new Object[]{Long.valueOf(elapsed)});
        }
        createStarted.stop();
        resetNextFlush();
    }

    private void persistAndMerge(final long j, final Sink sink) {
        this.mergeExecutor.execute(new ThreadRenamingRunnable(StringUtils.format("%s-%s-persist-n-merge", new Object[]{this.schema.getDataSource(), DateTimes.utc(j)})) { // from class: org.apache.druid.segment.realtime.plumber.RealtimePlumber.2
            final Interval interval;
            Stopwatch mergeStopwatch = null;

            {
                this.interval = sink.getInterval();
            }

            public void doRun() {
                try {
                    try {
                        if (RealtimePlumber.this.sinks.get(Long.valueOf(j)) != sink) {
                            RealtimePlumber.log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", new Object[]{sink});
                            if (this.mergeStopwatch != null) {
                                this.mergeStopwatch.stop();
                                return;
                            }
                            return;
                        }
                        File computePersistDir = RealtimePlumber.this.computePersistDir(RealtimePlumber.this.schema, this.interval);
                        File file = new File(computePersistDir, "merged");
                        File file2 = new File(computePersistDir, "isPushedMarker");
                        if (file2.exists()) {
                            RealtimePlumber.log.info("Already pushed sink[%s]", new Object[]{sink});
                            if (this.mergeStopwatch != null) {
                                this.mergeStopwatch.stop();
                                return;
                            }
                            return;
                        }
                        RealtimePlumber.this.removeSegment(sink, file);
                        if (file.exists()) {
                            RealtimePlumber.log.wtf("Merged target[%s] exists?!", new Object[]{file});
                            if (this.mergeStopwatch != null) {
                                this.mergeStopwatch.stop();
                                return;
                            }
                            return;
                        }
                        Iterator<FireHydrant> it = sink.iterator();
                        while (it.hasNext()) {
                            FireHydrant next = it.next();
                            synchronized (next) {
                                if (!next.hasSwapped()) {
                                    RealtimePlumber.log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", new Object[]{next, sink});
                                    RealtimePlumber.this.metrics.incrementRowOutputCount(RealtimePlumber.this.persistHydrant(next, RealtimePlumber.this.schema, this.interval, null));
                                }
                            }
                        }
                        long safeGetThreadCpuTime = JvmUtils.safeGetThreadCpuTime();
                        this.mergeStopwatch = Stopwatch.createStarted();
                        ArrayList arrayList = new ArrayList();
                        Closer create = Closer.create();
                        try {
                            try {
                                Iterator<FireHydrant> it2 = sink.iterator();
                                while (it2.hasNext()) {
                                    FireHydrant next2 = it2.next();
                                    Pair<Segment, Closeable> andIncrementSegment = next2.getAndIncrementSegment();
                                    QueryableIndex asQueryableIndex = ((Segment) andIncrementSegment.lhs).asQueryableIndex();
                                    RealtimePlumber.log.info("Adding hydrant[%s]", new Object[]{next2});
                                    arrayList.add(asQueryableIndex);
                                    create.register((Closeable) andIncrementSegment.rhs);
                                }
                                File mergeQueryableIndex = RealtimePlumber.this.indexMerger.mergeQueryableIndex(arrayList, RealtimePlumber.this.schema.getGranularitySpec().isRollup(), RealtimePlumber.this.schema.getAggregators(), file, RealtimePlumber.this.config.getIndexSpec(), RealtimePlumber.this.config.getSegmentWriteOutMediumFactory());
                                create.close();
                                RealtimePlumber.this.metrics.incrementMergeCpuTime(JvmUtils.safeGetThreadCpuTime() - safeGetThreadCpuTime);
                                RealtimePlumber.this.metrics.incrementMergeTimeMillis(this.mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));
                                RealtimePlumber.log.info("Pushing [%s] to deep storage", new Object[]{sink.getSegment().getId()});
                                DataSegment push = RealtimePlumber.this.dataSegmentPusher.push(mergeQueryableIndex, sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(arrayList)), false);
                                RealtimePlumber.log.info("Inserting [%s] to the metadata store", new Object[]{sink.getSegment().getId()});
                                RealtimePlumber.this.segmentPublisher.publishSegment(push);
                                if (!file2.createNewFile()) {
                                    RealtimePlumber.log.makeAlert("Failed to create marker file for [%s]", new Object[]{RealtimePlumber.this.schema.getDataSource()}).addData("interval", sink.getInterval()).addData("partitionNum", Integer.valueOf(push.getShardSpec().getPartitionNum())).addData("marker", file2).emit();
                                }
                                if (this.mergeStopwatch != null) {
                                    this.mergeStopwatch.stop();
                                }
                            } catch (Throwable th) {
                                throw create.rethrow(th);
                            }
                        } catch (Throwable th2) {
                            create.close();
                            throw th2;
                        }
                    } catch (Exception e) {
                        RealtimePlumber.this.metrics.incrementFailedHandoffs();
                        RealtimePlumber.log.makeAlert(e, "Failed to persist merged index[%s]", new Object[]{RealtimePlumber.this.schema.getDataSource()}).addData("interval", this.interval).emit();
                        if (RealtimePlumber.this.shuttingDown) {
                            RealtimePlumber.this.cleanShutdown = false;
                            RealtimePlumber.this.abandonSegment(j, sink);
                        }
                        if (this.mergeStopwatch != null) {
                            this.mergeStopwatch.stop();
                        }
                    }
                } catch (Throwable th3) {
                    if (this.mergeStopwatch != null) {
                        this.mergeStopwatch.stop();
                    }
                    throw th3;
                }
            }
        });
        this.handoffNotifier.registerSegmentHandoffCallback(new SegmentDescriptor(sink.getInterval(), sink.getVersion(), this.config.getShardSpec().getPartitionNum()), this.mergeExecutor, new Runnable() { // from class: org.apache.druid.segment.realtime.plumber.RealtimePlumber.3
            @Override // java.lang.Runnable
            public void run() {
                RealtimePlumber.this.abandonSegment(sink.getInterval().getStartMillis(), sink);
                RealtimePlumber.this.metrics.incrementHandOffCount();
            }
        });
    }

    @Override // org.apache.druid.segment.realtime.plumber.Plumber
    public void finishJob() {
        log.info("Shutting down...", new Object[0]);
        this.shuttingDown = true;
        for (Map.Entry<Long, Sink> entry : this.sinks.entrySet()) {
            entry.getValue().clearDedupCache();
            persistAndMerge(entry.getKey().longValue(), entry.getValue());
        }
        long currentTimeMillis = System.currentTimeMillis() + this.config.getHandoffConditionTimeout();
        while (!this.sinks.isEmpty()) {
            try {
                log.info("Cannot shut down yet! Sinks remaining: %s", new Object[]{Collections2.transform(this.sinks.values(), sink -> {
                    return sink.getSegment().getId();
                })});
                synchronized (this.handoffCondition) {
                    while (!this.sinks.isEmpty()) {
                        if (this.config.getHandoffConditionTimeout() == 0) {
                            this.handoffCondition.wait();
                        } else {
                            long currentTimeMillis2 = System.currentTimeMillis();
                            if (currentTimeMillis - currentTimeMillis2 <= 0) {
                                throw new ISE("Segment handoff wait timeout. [%s] segments might not have completed handoff.", new Object[]{Integer.valueOf(this.sinks.size())});
                            }
                            this.handoffCondition.wait(currentTimeMillis - currentTimeMillis2);
                        }
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        this.handoffNotifier.close();
        shutdownExecutors();
        this.stopped = true;
        if (!this.cleanShutdown) {
            throw new ISE("Exception occurred during persist and merge.", new Object[0]);
        }
    }

    private void resetNextFlush() {
        this.nextFlush = DateTimes.nowUtc().plus(this.config.getIntermediatePersistPeriod()).getMillis();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initializeExecutors() {
        int maxPendingPersists = this.config.getMaxPendingPersists();
        if (this.persistExecutor == null) {
            this.persistExecutor = Execs.newBlockingSingleThreaded("plumber_persist_%d", maxPendingPersists, TaskThreadPriority.getThreadPriorityFromTaskPriority(this.config.getPersistThreadPriority()));
        }
        if (this.mergeExecutor == null) {
            this.mergeExecutor = Execs.newBlockingSingleThreaded("plumber_merge_%d", 1, TaskThreadPriority.getThreadPriorityFromTaskPriority(this.config.getMergeThreadPriority()));
        }
        if (this.scheduledExecutor == null) {
            this.scheduledExecutor = Execs.scheduledSingleThreaded("plumber_scheduled_%d");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutdownExecutors() {
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
            this.persistExecutor.shutdown();
            this.mergeExecutor.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Object bootstrapSinksFromDisk() {
        File[] listFiles;
        Object obj;
        VersioningPolicy versioningPolicy = this.config.getVersioningPolicy();
        File computeBaseDir = computeBaseDir(this.schema);
        if (computeBaseDir == null || !computeBaseDir.exists() || (listFiles = computeBaseDir.listFiles()) == null) {
            return null;
        }
        Object obj2 = null;
        long j = 0;
        for (File file : listFiles) {
            Interval of = Intervals.of(file.getName().replace('_', '/'));
            File[] listFiles2 = file.listFiles(new FilenameFilter() { // from class: org.apache.druid.segment.realtime.plumber.RealtimePlumber.4
                @Override // java.io.FilenameFilter
                public boolean accept(File file2, String str) {
                    return Ints.tryParse(str) != null;
                }
            });
            Arrays.sort(listFiles2, new Comparator<File>() { // from class: org.apache.druid.segment.realtime.plumber.RealtimePlumber.5
                @Override // java.util.Comparator
                public int compare(File file2, File file3) {
                    try {
                        return Ints.compare(Integer.parseInt(file2.getName()), Integer.parseInt(file3.getName()));
                    } catch (NumberFormatException e) {
                        RealtimePlumber.log.error(e, "Couldn't compare as numbers? [%s][%s]", new Object[]{file2, file3});
                        return file2.compareTo(file3);
                    }
                }
            });
            boolean z = false;
            ArrayList arrayList = new ArrayList();
            for (File file2 : listFiles2) {
                log.info("Loading previously persisted segment at [%s]", new Object[]{file2});
                if (Ints.tryParse(file2.getName()) != null) {
                    QueryableIndex queryableIndex = null;
                    try {
                        queryableIndex = this.indexIO.loadIndex(file2);
                    } catch (IOException e) {
                        log.error(e, "Problem loading segmentDir from disk.", new Object[0]);
                        z = true;
                    }
                    if (z) {
                        try {
                            File computeCorruptedFileDumpDir = computeCorruptedFileDumpDir(file2, this.schema);
                            log.info("Renaming %s to %s", new Object[]{file2.getAbsolutePath(), computeCorruptedFileDumpDir.getAbsolutePath()});
                            FileUtils.copyDirectory(file2, computeCorruptedFileDumpDir);
                            org.apache.druid.java.util.common.FileUtils.deleteDirectory(file2);
                        } catch (Exception e2) {
                            log.error(e2, "Failed to rename %s", new Object[]{file2.getAbsolutePath()});
                        }
                    } else {
                        Metadata metadata = queryableIndex.getMetadata();
                        if (metadata != null && (obj = metadata.get(COMMIT_METADATA_TIMESTAMP_KEY)) != null) {
                            long longValue = ((Long) obj).longValue();
                            if (longValue > j) {
                                log.info("Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", new Object[]{queryableIndex.getMetadata(), Long.valueOf(longValue), Long.valueOf(j)});
                                j = longValue;
                                obj2 = queryableIndex.getMetadata().get(COMMIT_METADATA_KEY);
                            }
                        }
                        arrayList.add(new FireHydrant(new QueryableIndexSegment(queryableIndex, SegmentId.of(this.schema.getDataSource(), of, versioningPolicy.getVersion(of), this.config.getShardSpec())), Integer.parseInt(file2.getName())));
                    }
                }
            }
            if (arrayList.isEmpty()) {
                log.warn("Found persisted segment directory with no intermediate segments present at %s, skipping sink creation.", new Object[]{file.getAbsolutePath()});
            } else {
                addSink(new Sink(of, this.schema, this.config.getShardSpec(), null, versioningPolicy.getVersion(of), this.config.getMaxRowsInMemory(), TuningConfigs.getMaxBytesInMemoryOrDefault(this.config.getMaxBytesInMemory()), this.config.isReportParseExceptions(), this.config.getDedupColumn(), arrayList));
            }
        }
        return obj2;
    }

    private void addSink(Sink sink) {
        this.sinks.put(Long.valueOf(sink.getInterval().getStartMillis()), sink);
        this.metrics.setSinkCount(this.sinks.size());
        this.sinkTimeline.add(sink.getInterval(), sink.getVersion(), new SingleElementPartitionChunk(sink));
        try {
            this.segmentAnnouncer.announceSegment(sink.getSegment());
        } catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", new Object[]{this.schema.getDataSource()}).addData("interval", sink.getInterval()).emit();
        }
        clearDedupCache();
    }

    protected void startPersistThread() {
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        Period windowPeriod = this.config.getWindowPeriod();
        DateTime bucketStart = segmentGranularity.bucketStart(DateTimes.nowUtc());
        long millis = windowPeriod.toStandardDuration().getMillis();
        log.info("Expect to run at [%s]", new Object[]{DateTimes.nowUtc().plus(new Duration(System.currentTimeMillis(), segmentGranularity.increment(bucketStart).getMillis() + millis))});
        ScheduledExecutors.scheduleAtFixedRate(this.scheduledExecutor, new Duration(System.currentTimeMillis(), segmentGranularity.increment(bucketStart).getMillis() + millis), new Duration(bucketStart, segmentGranularity.increment(bucketStart)), new ThreadRenamingCallable<ScheduledExecutors.Signal>(StringUtils.format("%s-overseer-%d", new Object[]{this.schema.getDataSource(), Integer.valueOf(this.config.getShardSpec().getPartitionNum())})) { // from class: org.apache.druid.segment.realtime.plumber.RealtimePlumber.6
            /* renamed from: doCall, reason: merged with bridge method [inline-methods] */
            public ScheduledExecutors.Signal m175doCall() {
                if (RealtimePlumber.this.stopped) {
                    RealtimePlumber.log.info("Stopping merge-n-push overseer thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                RealtimePlumber.this.mergeAndPush();
                if (!RealtimePlumber.this.stopped) {
                    return ScheduledExecutors.Signal.REPEAT;
                }
                RealtimePlumber.log.info("Stopping merge-n-push overseer thread", new Object[0]);
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    private void clearDedupCache() {
        long millis = getAllowedMinTime().getMillis();
        for (Map.Entry<Long, Sink> entry : this.sinks.entrySet()) {
            if (entry.getKey().longValue() < millis) {
                entry.getValue().clearDedupCache();
            }
        }
    }

    private DateTime getAllowedMinTime() {
        Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        long millis = this.config.getWindowPeriod().toStandardDuration().getMillis();
        return segmentGranularity.bucketStart(DateTimes.utc(Math.max(millis, this.rejectionPolicy.getCurrMaxTime().getMillis()) - millis));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void mergeAndPush() {
        log.info("Starting merge and push.", new Object[0]);
        DateTime allowedMinTime = getAllowedMinTime();
        long millis = allowedMinTime.getMillis();
        log.info("Found [%,d] segments. Attempting to hand off segments that start before [%s].", new Object[]{Integer.valueOf(this.sinks.size()), allowedMinTime});
        ArrayList<Map.Entry> arrayList = new ArrayList();
        for (Map.Entry<Long, Sink> entry : this.sinks.entrySet()) {
            Long key = entry.getKey();
            if (key.longValue() < millis) {
                log.info("Adding entry [%s] for merge and push.", new Object[]{entry});
                arrayList.add(entry);
                entry.getValue().clearDedupCache();
            } else {
                log.info("Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.", new Object[]{entry, DateTimes.utc(key.longValue()), allowedMinTime});
            }
        }
        log.info("Found [%,d] sinks to persist and merge", new Object[]{Integer.valueOf(arrayList.size())});
        for (Map.Entry entry2 : arrayList) {
            persistAndMerge(((Long) entry2.getKey()).longValue(), (Sink) entry2.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void abandonSegment(long j, Sink sink) {
        if (this.sinks.containsKey(Long.valueOf(j))) {
            try {
                this.segmentAnnouncer.unannounceSegment(sink.getSegment());
                removeSegment(sink, computePersistDir(this.schema, sink.getInterval()));
                log.info("Removing sinkKey %d for segment %s", new Object[]{Long.valueOf(j), sink.getSegment().getId()});
                this.sinks.remove(Long.valueOf(j));
                this.metrics.setSinkCount(this.sinks.size());
                this.sinkTimeline.remove(sink.getInterval(), sink.getVersion(), new SingleElementPartitionChunk(sink));
                Iterator<FireHydrant> it = sink.iterator();
                while (it.hasNext()) {
                    FireHydrant next = it.next();
                    this.cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(next));
                    next.swapSegment(null);
                }
                synchronized (this.handoffCondition) {
                    this.handoffCondition.notifyAll();
                }
            } catch (Exception e) {
                log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", new Object[]{this.schema.getDataSource()}).addData("interval", sink.getInterval()).emit();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public File computeBaseDir(DataSchema dataSchema) {
        return new File(this.config.getBasePersistDirectory(), dataSchema.getDataSource());
    }

    protected File computeCorruptedFileDumpDir(File file, DataSchema dataSchema) {
        return new File(StringUtils.replace(file.getAbsolutePath(), dataSchema.getDataSource(), "corrupted" + File.pathSeparator + dataSchema.getDataSource()));
    }

    protected File computePersistDir(DataSchema dataSchema, Interval interval) {
        return new File(computeBaseDir(dataSchema), interval.toString().replace('/', '_'));
    }

    protected int persistHydrant(FireHydrant fireHydrant, DataSchema dataSchema, Interval interval, Map<String, Object> map) {
        synchronized (fireHydrant) {
            if (fireHydrant.hasSwapped()) {
                log.info("DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", new Object[]{dataSchema.getDataSource(), interval, fireHydrant});
                return 0;
            }
            log.info("DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", new Object[]{dataSchema.getDataSource(), interval, map, fireHydrant});
            try {
                int size = fireHydrant.getIndex().size();
                fireHydrant.getIndex().getMetadata().putAll(map);
                fireHydrant.swapSegment(new QueryableIndexSegment(this.indexIO.loadIndex(this.indexMerger.persist(fireHydrant.getIndex(), interval, new File(computePersistDir(dataSchema, interval), String.valueOf(fireHydrant.getCount())), this.config.getIndexSpecForIntermediatePersists(), this.config.getSegmentWriteOutMediumFactory())), fireHydrant.getSegmentId()));
                return size;
            } catch (IOException e) {
                log.makeAlert("dataSource[%s] -- incremental persist failed", new Object[]{dataSchema.getDataSource()}).addData("interval", interval).addData("count", Integer.valueOf(fireHydrant.getCount())).emit();
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeSegment(Sink sink, File file) {
        if (file.exists()) {
            try {
                log.info("Deleting Index File[%s]", new Object[]{file});
                org.apache.druid.java.util.common.FileUtils.deleteDirectory(file);
            } catch (Exception e) {
                log.makeAlert(e, "Unable to remove file for dataSource[%s]", new Object[]{this.schema.getDataSource()}).addData(LocalFileTimestampVersionFinder.URI_SCHEME, file).addData("interval", sink.getInterval()).emit();
            }
        }
    }
}
