/*
 * Decompiled with CFR 0.152.
 */
package com.wavefront.agent.handlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.wavefront.agent.api.APIContainer;
import com.wavefront.agent.data.EntityProperties;
import com.wavefront.agent.data.EntityPropertiesFactory;
import com.wavefront.agent.data.QueueingReason;
import com.wavefront.agent.handlers.AbstractSenderTask;
import com.wavefront.agent.handlers.EventSenderTask;
import com.wavefront.agent.handlers.HandlerKey;
import com.wavefront.agent.handlers.LineDelimitedSenderTask;
import com.wavefront.agent.handlers.LogSenderTask;
import com.wavefront.agent.handlers.SenderTask;
import com.wavefront.agent.handlers.SenderTaskFactory;
import com.wavefront.agent.handlers.SourceTagSenderTask;
import com.wavefront.agent.queueing.QueueController;
import com.wavefront.agent.queueing.QueueingFactory;
import com.wavefront.agent.queueing.TaskQueueFactory;
import com.wavefront.agent.queueing.TaskSizeEstimator;
import com.wavefront.api.ProxyV2API;
import com.wavefront.common.Managed;
import com.wavefront.common.NamedThreadFactory;
import com.wavefront.common.TaggedMetricName;
import com.wavefront.data.ReportableEntityType;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class SenderTaskFactoryImpl
implements SenderTaskFactory {
    private final Logger log = Logger.getLogger(SenderTaskFactoryImpl.class.getCanonicalName());
    private final Map<String, List<ReportableEntityType>> entityTypes = new ConcurrentHashMap<String, List<ReportableEntityType>>();
    private final Map<HandlerKey, ScheduledExecutorService> executors = new ConcurrentHashMap<HandlerKey, ScheduledExecutorService>();
    private final Map<HandlerKey, List<SenderTask<?>>> managedTasks = new ConcurrentHashMap();
    private final Map<HandlerKey, QueueController> managedServices = new ConcurrentHashMap<HandlerKey, QueueController>();
    private final Map<HandlerKey, TaskSizeEstimator> taskSizeEstimators = new ConcurrentHashMap<HandlerKey, TaskSizeEstimator>();
    private final APIContainer apiContainer;
    private final UUID proxyId;
    private final TaskQueueFactory taskQueueFactory;
    private final QueueingFactory queueingFactory;
    private final Map<String, EntityPropertiesFactory> entityPropsFactoryMap;

    public SenderTaskFactoryImpl(APIContainer apiContainer, UUID proxyId, TaskQueueFactory taskQueueFactory, @Nullable QueueingFactory queueingFactory, Map<String, EntityPropertiesFactory> entityPropsFactoryMap) {
        this.apiContainer = apiContainer;
        this.proxyId = proxyId;
        this.taskQueueFactory = taskQueueFactory;
        this.queueingFactory = queueingFactory;
        this.entityPropsFactoryMap = entityPropsFactoryMap;
        Metrics.newGauge((MetricName)new TaggedMetricName("buffer", "fill-rate"), (Gauge)new Gauge<Long>(){

            public Long value() {
                List sizes = SenderTaskFactoryImpl.this.taskSizeEstimators.values().stream().map(TaskSizeEstimator::getBytesPerMinute).filter(Objects::nonNull).collect(Collectors.toList());
                return sizes.size() == 0 ? null : Long.valueOf(sizes.stream().mapToLong(x -> x).sum());
            }
        });
    }

    public Map<String, Collection<SenderTask<?>>> createSenderTasks(@Nonnull HandlerKey handlerKey) {
        ReportableEntityType entityType = handlerKey.getEntityType();
        String handle = handlerKey.getHandle();
        HashMap toReturn = Maps.newHashMap();
        for (String tenantName : this.apiContainer.getTenantNameList()) {
            int numThreads = this.entityPropsFactoryMap.get(tenantName).get(entityType).getFlushThreads();
            HandlerKey tenantHandlerKey = HandlerKey.of(entityType, handle, tenantName);
            ScheduledExecutorService scheduler = this.executors.computeIfAbsent(tenantHandlerKey, x -> Executors.newScheduledThreadPool(numThreads, (ThreadFactory)new NamedThreadFactory("submitter-" + tenantHandlerKey.getEntityType() + "-" + tenantHandlerKey.getHandle())));
            toReturn.put(tenantName, this.generateSenderTaskList(tenantHandlerKey, numThreads, scheduler));
        }
        return toReturn;
    }

    private Collection<SenderTask<?>> generateSenderTaskList(HandlerKey handlerKey, int numThreads, ScheduledExecutorService scheduler) {
        String tenantName = handlerKey.getTenantName();
        if (tenantName == null) {
            throw new IllegalArgumentException("Tenant name in handlerKey should not be null when generating sender task list.");
        }
        TaskSizeEstimator taskSizeEstimator = new TaskSizeEstimator(handlerKey.getHandle());
        this.taskSizeEstimators.put(handlerKey, taskSizeEstimator);
        ReportableEntityType entityType = handlerKey.getEntityType();
        ArrayList senderTaskList = new ArrayList(numThreads);
        ProxyV2API proxyV2API = this.apiContainer.getProxyV2APIForTenant(tenantName);
        EntityProperties properties = this.entityPropsFactoryMap.get(tenantName).get(entityType);
        for (int threadNo = 0; threadNo < numThreads; ++threadNo) {
            AbstractSenderTask senderTask;
            switch (entityType) {
                case POINT: 
                case DELTA_COUNTER: {
                    senderTask = new LineDelimitedSenderTask(handlerKey, "wavefront", proxyV2API, this.proxyId, properties, scheduler, threadNo, taskSizeEstimator, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                case HISTOGRAM: {
                    senderTask = new LineDelimitedSenderTask(handlerKey, "histogram", proxyV2API, this.proxyId, properties, scheduler, threadNo, taskSizeEstimator, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                case SOURCE_TAG: {
                    senderTask = new SourceTagSenderTask(handlerKey, this.apiContainer.getSourceTagAPIForTenant(tenantName), threadNo, properties, scheduler, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                case TRACE: {
                    senderTask = new LineDelimitedSenderTask(handlerKey, "trace", proxyV2API, this.proxyId, properties, scheduler, threadNo, taskSizeEstimator, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                case TRACE_SPAN_LOGS: {
                    senderTask = new LineDelimitedSenderTask(handlerKey, "spanLogs", proxyV2API, this.proxyId, properties, scheduler, threadNo, taskSizeEstimator, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                case EVENT: {
                    senderTask = new EventSenderTask(handlerKey, this.apiContainer.getEventAPIForTenant(tenantName), this.proxyId, threadNo, properties, scheduler, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                case LOGS: {
                    senderTask = new LogSenderTask(handlerKey, this.apiContainer.getLogAPI(), this.proxyId, threadNo, this.entityPropsFactoryMap.get(tenantName).get(entityType), scheduler, this.taskQueueFactory.getTaskQueue(handlerKey, threadNo));
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unexpected entity type " + handlerKey.getEntityType().name() + " for " + handlerKey.getHandle());
                }
            }
            senderTaskList.add(senderTask);
            senderTask.start();
        }
        if (this.queueingFactory != null) {
            QueueController controller = this.queueingFactory.getQueueController(handlerKey, numThreads);
            this.managedServices.put(handlerKey, controller);
            controller.start();
        }
        this.managedTasks.put(handlerKey, senderTaskList);
        this.entityTypes.computeIfAbsent(handlerKey.getHandle(), x -> new ArrayList()).add(handlerKey.getEntityType());
        return senderTaskList;
    }

    @Override
    public void shutdown() {
        this.managedTasks.values().stream().flatMap(Collection::stream).forEach(Managed::stop);
        this.taskSizeEstimators.values().forEach(TaskSizeEstimator::shutdown);
        this.managedServices.values().forEach(Managed::stop);
        this.executors.values().forEach(x -> {
            try {
                x.shutdown();
                x.awaitTermination(1000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown(@Nonnull String handle) {
        for (String tenantName : this.apiContainer.getTenantNameList()) {
            String tenantHandlerKey = HandlerKey.generateTenantSpecificHandle(handle, tenantName);
            List<ReportableEntityType> types = this.entityTypes.get(tenantHandlerKey);
            if (types == null) {
                return;
            }
            try {
                types.forEach(x -> this.taskSizeEstimators.remove(HandlerKey.of(x, handle, tenantName)).shutdown());
                types.forEach(x -> this.managedServices.remove(HandlerKey.of(x, handle, tenantName)).stop());
                types.forEach(x -> this.managedTasks.remove(HandlerKey.of(x, handle, tenantName)).forEach(t -> {
                    t.stop();
                    t.drainBuffersToQueue(null);
                }));
                types.forEach(x -> this.executors.remove(HandlerKey.of(x, handle, tenantName)).shutdown());
            }
            finally {
                this.entityTypes.remove(tenantHandlerKey);
            }
        }
    }

    @Override
    public void drainBuffersToQueue(QueueingReason reason) {
        this.managedTasks.values().stream().flatMap(Collection::stream).forEach(x -> x.drainBuffersToQueue(reason));
    }

    @Override
    public void truncateBuffers() {
        this.managedServices.entrySet().forEach(handlerKeyManagedEntry -> {
            System.out.println("Truncating buffers: Queue with handlerKey " + handlerKeyManagedEntry.getKey());
            this.log.info("Truncating buffers: Queue with handlerKey " + handlerKeyManagedEntry.getKey());
            QueueController pp = (QueueController)handlerKeyManagedEntry.getValue();
            pp.truncateBuffers();
        });
    }

    @VisibleForTesting
    public void flushNow(@Nonnull HandlerKey handlerKey) {
        ReportableEntityType entityType = handlerKey.getEntityType();
        String handle = handlerKey.getHandle();
        for (String tenantName : this.apiContainer.getTenantNameList()) {
            HandlerKey tenantHandlerKey = HandlerKey.of(entityType, handle, tenantName);
            this.managedTasks.get(tenantHandlerKey).forEach(task -> {
                if (task instanceof AbstractSenderTask) {
                    ((AbstractSenderTask)task).run();
                }
            });
        }
    }
}

