/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.schedulers;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import kafka.restore.RestorePartitionOperatorFactory;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ReconcileFtpsRequest;
import kafka.restore.messages.ReconcileFtpsResponse;
import kafka.restore.messages.RestoreFtpsRequest;
import kafka.restore.messages.RestoreFtpsResponse;
import kafka.restore.operators.ReconcilePartitionOperator;
import kafka.restore.operators.RestorePartitionOperator;
import kafka.restore.operators.SegmentStateAndPath;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncTaskScheduler
extends AbstractAsyncServiceScheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTaskScheduler.class);
    private static final int DEFAULT_REQUEST_QUEUE_CAPACITY = 50000;
    private static final int DEFAULT_CORES_TO_POOL_SIZE_RATIO = 2;
    private ThreadPoolExecutor threadPool;
    private final int poolSize;
    private final RestorePartitionOperatorFactory restoreOperatorFactory;
    private int nextUuid = 0;

    private int getNextUuid() {
        int uuid = this.nextUuid++;
        return uuid;
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver resultsReceiver, int poolSize, RestorePartitionOperatorFactory restoreOperatorFactory) {
        this(resultsReceiver, poolSize, restoreOperatorFactory, 50000);
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver resultsReceiver, int poolSize, RestorePartitionOperatorFactory restoreOperatorFactory, int requestQueueSize) {
        super(resultsReceiver, requestQueueSize);
        this.restoreOperatorFactory = restoreOperatorFactory;
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be at least 1.");
        }
        this.poolSize = poolSize;
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver resultsReceiver, RestorePartitionOperatorFactory restoreOperatorFactory, int availableCores) {
        this(resultsReceiver, availableCores * 2, restoreOperatorFactory, 50000);
    }

    public AsyncTaskScheduler(AsyncServiceSchedulerResultsReceiver resultsReceiver, RestorePartitionOperatorFactory restoreOperatorFactory, int availableCores, int requestQueueSize) {
        this(resultsReceiver, availableCores * 2, restoreOperatorFactory, requestQueueSize);
    }

    @Override
    public MessageStatusCode submitRequest(MessageRequest request) {
        if (RestoreFtpsRequest.class.equals(request.getClass()) || ReconcileFtpsRequest.class.equals(request.getClass())) {
            return super.submitRequest(request);
        }
        throw new UnsupportedOperationException("Request type must be either KAFKA_RESTORE_FTPS or KAFKA_RECONCILE_FTPS.");
    }

    @Override
    protected void processRequestFromRequestQueue(MessageRequest request) {
        if (RestoreFtpsRequest.class.equals(request.getClass())) {
            RestoreFtpsRequest restoreFtpsRequest = (RestoreFtpsRequest)request;
            try {
                this.threadPool.execute(() -> {
                    try {
                        RestorePartitionOperator restoreOperator = this.restoreOperatorFactory.get(request.getTopicPartition(), restoreFtpsRequest.getFtpsFilePath(), restoreFtpsRequest.getFromTimestamp().getTime());
                        Map<UUID, SegmentStateAndPath> segmentStateMap = restoreOperator.restore();
                        LOGGER.info(String.format("[%s]: Found %s segments to be restored", request.getTopicPartition(), segmentStateMap.size()));
                        this.reportRestoreFtpsResponse(restoreFtpsRequest, MessageStatusCode.OK, MessageResult.SUCCESS, segmentStateMap);
                    }
                    catch (Exception e) {
                        LOGGER.error(String.format("[%s]: Exception when run RestorePartitionOperator", request.getTopicPartition()), (Throwable)e);
                        this.reportRestoreFtpsResponse(restoreFtpsRequest, MessageStatusCode.BAD_ARGUMENT_ERROR, MessageResult.FAILURE, new HashMap<UUID, SegmentStateAndPath>());
                    }
                });
            }
            catch (RejectedExecutionException e) {
                LOGGER.error(String.format("[%s]: Execution was rejected due to thread pool error", request.getTopicPartition()), (Throwable)e);
                this.reportRestoreFtpsResponse(restoreFtpsRequest, MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE, new HashMap<UUID, SegmentStateAndPath>());
            }
        } else if (ReconcileFtpsRequest.class.equals(request.getClass())) {
            ReconcileFtpsRequest reconcileFtpsRequest = (ReconcileFtpsRequest)request;
            try {
                this.threadPool.execute(() -> {
                    try {
                        ReconcilePartitionOperator reconcilePartitionOperator = new ReconcilePartitionOperator(request.getTopicPartition(), reconcileFtpsRequest.getFtpsFilePath(), reconcileFtpsRequest.getFromTimestamp());
                        Map<UUID, String> segmentStateMap = reconcilePartitionOperator.reconcile(reconcileFtpsRequest.getSegmentsMap());
                        LOGGER.info(String.format("[%s]: Changed %s segments to Fenced state", request.getTopicPartition(), segmentStateMap.size()));
                        this.reportReconcileFtpsResponse(reconcileFtpsRequest, MessageStatusCode.OK, MessageResult.SUCCESS, segmentStateMap.size());
                    }
                    catch (Exception e) {
                        LOGGER.error(String.format("[%s]: Exception when run ReconcilePartitionOperator", request.getTopicPartition()), (Throwable)e);
                        this.reportReconcileFtpsResponse(reconcileFtpsRequest, MessageStatusCode.OPERATION_NOT_SUPPORTED, MessageResult.FAILURE, 0);
                    }
                });
            }
            catch (RejectedExecutionException e) {
                LOGGER.error(String.format("[%s]: Execution was rejected due to thread pool error", request.getTopicPartition()), (Throwable)e);
                this.reportReconcileFtpsResponse(reconcileFtpsRequest, MessageStatusCode.INTERNAL_ERROR, MessageResult.FAILURE, 0);
            }
        } else {
            throw new RuntimeException("Illegal request type " + request.getClass() + " was added to request queue");
        }
    }

    @Override
    public synchronized boolean startUp() {
        boolean success = super.startUp();
        if (success) {
            this.threadPool = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1L, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
            return true;
        }
        return false;
    }

    @Override
    public synchronized boolean shutdown() {
        boolean success = super.shutdown();
        if (success) {
            this.threadPool.shutdown();
            return true;
        }
        return false;
    }

    @Override
    public synchronized boolean pause() {
        boolean success = super.pause();
        if (success) {
            this.threadPool.shutdown();
            return true;
        }
        return false;
    }

    @Override
    public synchronized boolean resume() {
        boolean success = super.resume();
        if (success) {
            this.threadPool = new ThreadPoolExecutor(this.poolSize, this.poolSize, 1L, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
            return true;
        }
        return false;
    }

    private void reportRestoreFtpsResponse(RestoreFtpsRequest restoreFtpsRequest, MessageStatusCode statusCode, MessageResult result, Map<UUID, SegmentStateAndPath> segmentPathMap) {
        RestoreFtpsResponse response = new RestoreFtpsResponse(this.getNextUuid(), restoreFtpsRequest.getTopic(), restoreFtpsRequest.getPartition(), restoreFtpsRequest.getUuid(), statusCode, result, segmentPathMap);
        this.getResultsReceiver().reportServiceSchedulerResponse(response);
    }

    private void reportReconcileFtpsResponse(ReconcileFtpsRequest reconcileFtpsRequest, MessageStatusCode statusCode, MessageResult result, int reconcileSegmentCount) {
        ReconcileFtpsResponse response = new ReconcileFtpsResponse(this.getNextUuid(), reconcileFtpsRequest.getTopic(), reconcileFtpsRequest.getPartition(), reconcileFtpsRequest.getUuid(), reconcileSegmentCount, statusCode, result);
        this.getResultsReceiver().reportServiceSchedulerResponse(response);
    }
}

