/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.parseq.batching;

import com.linkedin.parseq.Context;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.batching.Batch;
import com.linkedin.parseq.batching.BatchAggregationTimeMetric;
import com.linkedin.parseq.batching.BatchImpl;
import com.linkedin.parseq.batching.BatchSizeMetric;
import com.linkedin.parseq.internal.ContextImpl;
import com.linkedin.parseq.internal.PlanContext;
import com.linkedin.parseq.promise.CountDownPromiseListener;
import com.linkedin.parseq.promise.PromiseListener;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import com.linkedin.parseq.trace.Relationship;
import com.linkedin.parseq.trace.ShallowTraceBuilder;
import com.linkedin.parseq.trace.TraceBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for strategies that gather individually requested keys into batches and execute
 * each batch as one task.
 *
 * <p>Keys are submitted through {@link #batchable}. While a plan runs, keys are accumulated per
 * plan id and, within a plan, per group as returned by {@link #classify}. A batch is executed
 * either as soon as adding a key completes it (see {@link #maxBatchSizeForGroup} and
 * {@link #keySize}) or when {@link #handleBatch} is invoked for the plan.
 *
 * @param <G> type of the group a key is classified into
 * @param <K> type of the key
 * @param <T> type of the value produced for a key
 */
public abstract class BatchingStrategy<G, K, T> {
    /** Default value returned by {@link #maxBatchSizeForGroup}. */
    public static final int DEFAULT_MAX_BATCH_SIZE = 1024;
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchingStrategy.class);
    /** Default value returned by {@link #keySize}. */
    private static final int DEFAULT_KEY_SIZE = 1;

    /** Batches under construction, keyed by the id of the plan that requested the keys. */
    private final ConcurrentMap<Long, GroupBatchBuilder> _batches = new ConcurrentHashMap<>();
    private final BatchSizeMetric _batchSizeMetric = new BatchSizeMetric();
    private final BatchAggregationTimeMetric _batchAggregationTimeMetric = new BatchAggregationTimeMetric();

    /**
     * Creates a task that, when run, registers {@code key} with the batch of its group and
     * completes with the promise associated with that key.
     *
     * <p>If registering the key yields a batch that must be executed right away, the batch task
     * is run through the current context. Should scheduling that task throw, every promise of
     * that batch is failed with the thrown error.
     *
     * @param desc name given to the returned task
     * @param key  key to be resolved as part of a batch
     * @return a task whose trace type is set to {@code "batched"}
     */
    public Task<T> batchable(String desc, K key) {
        Task<T> batchableTask = Task.async(desc, ctx -> {
            BatchImpl.BatchPromise<T> result = new BatchImpl.BatchPromise<>();
            Long planId = ctx.getPlanId();
            GroupBatchBuilder builder = this._batches.computeIfAbsent(planId, id -> new GroupBatchBuilder());
            G group = this.classify(key);
            Batch<K, T> fullBatch = builder.add(group, key, ctx.getShallowTraceBuilder(), result);
            if (fullBatch != null) {
                try {
                    ctx.run(this.taskForBatch(group, fullBatch, true));
                }
                catch (Throwable t) {
                    // The batch could not be scheduled: fail every promise that belongs to it.
                    fullBatch.failAll(t);
                }
            }
            return result;
        });
        batchableTask.getShallowTraceBuilder().setTaskType("batched");
        return batchableTask;
    }

    /**
     * Same as {@link #batchable(String, Object)} with a task name derived from the key.
     *
     * @param key key to be resolved as part of a batch
     * @return a task for the given key
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public Task<T> batchable(K key) {
        return this.batchable("batchableTaskForKey: " + key.toString(), key);
    }

    /**
     * Builds the task that executes {@code batch} and records the batch size metric.
     *
     * @param group     group the batch belongs to
     * @param batch     batch to execute
     * @param hasParent when {@code false}, the first trace builder found in the batch is linked
     *                  with {@code CHILD_OF}; every other one is linked with
     *                  {@code POTENTIAL_CHILD_OF}
     * @return the task executing the batch; its trace is marked as system hidden when it runs
     */
    private Task<?> taskForBatch(G group, Batch<K, T> batch, boolean hasParent) {
        this._batchSizeMetric.record(batch.batchSize());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(this.debugInfo(group, batch));
        }
        return Task.async(this.getBatchName(group, batch), ctx -> {
            SettablePromise<T> result = Promises.settable();
            // NOTE(review): presumably resolves `result` once keySize() entry promises have
            // been resolved - confirm against CountDownPromiseListener.
            PromiseListener<T> countDownListener =
                new CountDownPromiseListener<>(batch.keySize(), result, null);
            boolean assignedParent = false;
            TraceBuilder traceBuilder = ctx.getTraceBuilder();
            for (BatchImpl.BatchEntry<T> entry : batch.values()) {
                for (ShallowTraceBuilder shallowTraceBuilder : entry.getShallowTraceBuilders()) {
                    if (!assignedParent && !hasParent) {
                        traceBuilder.addRelationship(Relationship.CHILD_OF, ctx.getShallowTraceBuilder(), shallowTraceBuilder);
                        assignedParent = true;
                    } else {
                        traceBuilder.addRelationship(Relationship.POTENTIAL_CHILD_OF, ctx.getShallowTraceBuilder(), shallowTraceBuilder);
                    }
                }
                BatchImpl.BatchPromise<T> promise = entry.getPromise();
                promise.getInternal().addListener(countDownListener);
                // Each entry's promise is triggered only after the batch task's own promise resolves.
                result.addListener(v -> promise.trigger());
            }
            try {
                this.executeBatchWithContext(group, batch, ctx);
            }
            catch (Throwable t) {
                batch.failAll(t);
            }
            ctx.getShallowTraceBuilder().setSystemHidden(true);
            return result;
        });
    }

    /**
     * Runs {@code batch} in a plan forked from {@code planContext}. Any error thrown while
     * creating or starting the batch task fails all promises of the batch.
     */
    private void runBatch(PlanContext planContext, G group, Batch<K, T> batch) {
        try {
            Task<?> batchedTask = this.taskForBatch(group, batch, false);
            PlanContext forkedPlan = planContext.fork(batchedTask);
            new ContextImpl(forkedPlan, batchedTask).runTask();
        }
        catch (Throwable t) {
            batch.failAll(t);
        }
    }

    /**
     * Removes everything accumulated for the given plan and runs one batch per group.
     * Does nothing when no keys were accumulated for the plan.
     *
     * @param planContext context of the plan whose pending batches should be executed
     */
    void handleBatch(PlanContext planContext) {
        GroupBatchBuilder batchBuilder = this._batches.remove(planContext.getId());
        if (batchBuilder != null) {
            batchBuilder.batches().forEach((group, builder) -> this.runBatch(planContext, group, builder.build()));
        }
    }

    /** Renders the group and the keys of a batch as a multi-line string for debug logging. */
    private String debugInfo(G group, Batch<K, T> batch) {
        StringBuilder debugInfo = new StringBuilder("\n");
        debugInfo.append("group: ").append(group).append("\n").append("batch keys: \n");
        batch.keys().forEach(key -> debugInfo.append("    ").append(key).append("\n"));
        return debugInfo.toString();
    }

    /** @return the metric that records the size of every executed batch */
    public BatchSizeMetric getBatchSizeMetric() {
        return this._batchSizeMetric;
    }

    /** @return the aggregation time metric handed to every batch builder created by this strategy */
    public BatchAggregationTimeMetric getBatchAggregationTimeMetric() {
        return this._batchAggregationTimeMetric;
    }

    /**
     * Executes the given batch. Implementations are expected to complete or fail the entries of
     * {@code batch}.
     *
     * @param group group the batch belongs to
     * @param batch batch to execute
     */
    public abstract void executeBatch(G group, Batch<K, T> batch);

    /**
     * Hook that receives the context of the batch task in addition to the batch itself.
     * The default implementation ignores {@code ctx} and delegates to {@link #executeBatch}.
     *
     * @param group group the batch belongs to
     * @param batch batch to execute
     * @param ctx   context of the task executing the batch
     */
    protected void executeBatchWithContext(G group, Batch<K, T> batch, Context ctx) {
        this.executeBatch(group, batch);
    }

    /**
     * Assigns a key to a group; keys are only batched together with keys of the same group.
     *
     * @param key key to classify
     * @return the group of the key
     */
    public abstract G classify(K key);

    /**
     * Maximum size of a batch for the given group.
     *
     * @param group group of the batch
     * @return {@link #DEFAULT_MAX_BATCH_SIZE} unless overridden
     */
    public int maxBatchSizeForGroup(G group) {
        return DEFAULT_MAX_BATCH_SIZE;
    }

    /**
     * Size a single key contributes to its batch.
     *
     * @param group group of the key
     * @param key   key being added
     * @return {@code 1} unless overridden
     */
    public int keySize(G group, K key) {
        return DEFAULT_KEY_SIZE;
    }

    /**
     * Name of the task that executes the batch.
     *
     * @param group group of the batch
     * @param batch batch to be executed
     * @return a description containing the key count and the batch size
     */
    public String getBatchName(G group, Batch<K, T> batch) {
        return "batch(keys: " + batch.keySize() + ", size: " + batch.batchSize() + ")";
    }

    /** Accumulates, for one plan, a batch builder per group. */
    private class GroupBatchBuilder {
        private final Map<G, BatchImpl.BatchBuilder<K, T>> _batchesByGroup = new HashMap<>();

        /**
         * Adds a key to the builder of its group.
         *
         * @return a batch that must be executed immediately, or {@code null} when the key was
         *         merely accumulated
         */
        Batch<K, T> add(G group, K key, ShallowTraceBuilder traceBuilder, BatchImpl.BatchPromise<T> promise) {
            int size = BatchingStrategy.this.keySize(group, key);
            BatchImpl.BatchBuilder<K, T> builder = this._batchesByGroup.computeIfAbsent(group, this::createBuilder);
            if (builder.add(key, traceBuilder, promise, size)) {
                if (builder.isFull()) {
                    this._batchesByGroup.remove(group);
                    return builder.build();
                }
                return null;
            }
            // The current builder rejected the key, so the key goes into a fresh builder.
            BatchImpl.BatchBuilder<K, T> newBuilder = this.createBuilder(group);
            newBuilder.add(key, traceBuilder, promise, size);
            if (newBuilder.isFull()) {
                // The key alone fills a batch: ship it and keep accumulating in the old builder.
                return newBuilder.build();
            }
            if (builder.batchSize() > newBuilder.batchSize()) {
                // Ship the larger batch and keep the smaller one for further accumulation.
                this._batchesByGroup.put(group, newBuilder);
                return builder.build();
            }
            return newBuilder.build();
        }

        /** Creates an empty builder bounded by the maximum batch size of {@code group}. */
        private BatchImpl.BatchBuilder<K, T> createBuilder(G group) {
            return new BatchImpl.BatchBuilder<K, T>(
                BatchingStrategy.this.maxBatchSizeForGroup(group),
                BatchingStrategy.this._batchAggregationTimeMetric);
        }

        /** @return the live map of builders by group */
        Map<G, BatchImpl.BatchBuilder<K, T>> batches() {
            return this._batchesByGroup;
        }
    }
}

