/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.schedulers;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import kafka.restore.RestorePartitionOperatorFactory;
import kafka.restore.configmap.NodeConfig;
import kafka.restore.messages.KafkaFetchFtpsRequest;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ReconcileFtpsRequest;
import kafka.restore.messages.ReconcileFtpsResponse;
import kafka.restore.messages.RestoreFtpsRequest;
import kafka.restore.messages.RestoreFtpsResponse;
import kafka.restore.operators.RestorePartitionOperator;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.AsyncTaskScheduler;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class AsyncTaskSchedulerTest
implements AsyncServiceSchedulerResultsReceiver {
    private static final int RESPONSE_QUEUE_CAPACITY = 1;
    private static final int POOL_SIZE = 16;
    private static final String TOPIC = "fakeTopic";
    private static final int PARTITION = 1;
    private static final String FTPS_FILE_PATH = "fakeFtpsFile";
    private static final int REQUEST_UUID = 0;
    private static final NodeConfig BROKER = new NodeConfig(1, "localhost", 9072);
    private AsyncTaskScheduler asyncTaskScheduler;
    private ArrayBlockingQueue<MessageResponse> asyncTaskResponseQueue;
    private RestorePartitionOperatorFactory restoreOperatorFactory;
    private RestorePartitionOperator restoreOperator;

    @BeforeEach
    public void setUp() throws Exception {
        this.restoreOperatorFactory = (RestorePartitionOperatorFactory)Mockito.mock(RestorePartitionOperatorFactory.class);
        this.restoreOperator = (RestorePartitionOperator)Mockito.mock(RestorePartitionOperator.class);
        Mockito.when((Object)this.restoreOperatorFactory.get((TopicPartition)Mockito.any(), (String)Mockito.any(), Mockito.anyLong())).thenReturn((Object)this.restoreOperator);
        this.asyncTaskScheduler = new AsyncTaskScheduler((AsyncServiceSchedulerResultsReceiver)this, 16, this.restoreOperatorFactory);
        this.asyncTaskScheduler.startUp();
        this.asyncTaskResponseQueue = new ArrayBlockingQueue(1);
        Assertions.assertNotNull((Object)this.asyncTaskScheduler);
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
        this.asyncTaskScheduler.pause();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.PAUSED, (Object)this.asyncTaskScheduler.getStatus());
        this.asyncTaskScheduler.resume();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
    }

    @AfterEach
    public void shutDown() {
        this.asyncTaskScheduler.shutdown();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.SHUTDOWN, (Object)this.asyncTaskScheduler.getStatus());
    }

    @Test
    public void testConstructWithIllegalPoolSize() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> new AsyncTaskScheduler((AsyncServiceSchedulerResultsReceiver)this, -1, this.restoreOperatorFactory));
        Assertions.assertThrows(IllegalArgumentException.class, () -> new AsyncTaskScheduler((AsyncServiceSchedulerResultsReceiver)this, 0, this.restoreOperatorFactory));
    }

    @Test
    public void testConstructWithIllegalRequestQueueCapacity() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> new AsyncTaskScheduler((AsyncServiceSchedulerResultsReceiver)this, 16, this.restoreOperatorFactory, 0));
        Assertions.assertThrows(IllegalArgumentException.class, () -> new AsyncTaskScheduler((AsyncServiceSchedulerResultsReceiver)this, 16, this.restoreOperatorFactory, -1));
    }

    @Test
    public void testSubmitUnsupportedOperation() {
        KafkaFetchFtpsRequest request = new KafkaFetchFtpsRequest(0, TOPIC, 1, BROKER);
        Assertions.assertThrows(UnsupportedOperationException.class, () -> this.lambda$testSubmitUnsupportedOperation$4((MessageRequest)request));
    }

    @Test
    public void testSubmitNull() {
        Assertions.assertThrows(NullPointerException.class, () -> this.asyncTaskScheduler.submitRequest(null));
    }

    @Test
    public void testUuids() {
        HashSet<Integer> seenUuids = new HashSet<Integer>();
        int requestsToSend = 1000;
        for (int i = 0; i < requestsToSend; ++i) {
            ReconcileFtpsRequest request = new ReconcileFtpsRequest(i, TOPIC, 1, FTPS_FILE_PATH, 0L, new HashMap());
            this.asyncTaskScheduler.submitRequest((MessageRequest)request);
            MessageResponse response = this.getNextMessageResponse();
            Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
            this.assertValidResponseForRequest(response, (MessageRequest)request);
            seenUuids.add(response.getUuid());
        }
        Assertions.assertEquals((int)seenUuids.size(), (int)requestsToSend);
    }

    @Test
    public void testRestoreFTPS() throws Exception {
        Mockito.when((Object)this.restoreOperator.restore()).thenReturn(new HashMap());
        RestoreFtpsRequest request = new RestoreFtpsRequest(0, TOPIC, 1, FTPS_FILE_PATH, new Date());
        this.asyncTaskScheduler.submitRequest((MessageRequest)request);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
        this.assertValidResponseForRequest(response, (MessageRequest)request);
        Assertions.assertEquals(RestoreFtpsResponse.class, response.getClass());
        Assertions.assertEquals((Object)MessageResult.SUCCESS, (Object)response.getResult());
        Assertions.assertEquals((Object)MessageStatusCode.OK, (Object)response.getStatusCode());
    }

    @Test
    public void testRestoreFTPSBadFTPSFile() throws Exception {
        Mockito.when((Object)this.restoreOperator.restore()).thenThrow(new Throwable[]{new IllegalStateException("Bad FTPS File")});
        RestoreFtpsRequest request = new RestoreFtpsRequest(0, TOPIC, 1, FTPS_FILE_PATH, new Date());
        this.asyncTaskScheduler.submitRequest((MessageRequest)request);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
        this.assertValidResponseForRequest(response, (MessageRequest)request);
        Assertions.assertEquals(RestoreFtpsResponse.class, response.getClass());
        Assertions.assertEquals((Object)MessageResult.FAILURE, (Object)response.getResult());
        Assertions.assertEquals((Object)MessageStatusCode.BAD_ARGUMENT_ERROR, (Object)response.getStatusCode());
    }

    @Test
    public void testReconcileFTPS() {
        ReconcileFtpsRequest request = new ReconcileFtpsRequest(0, TOPIC, 1, FTPS_FILE_PATH, 0L, new HashMap());
        this.asyncTaskScheduler.submitRequest((MessageRequest)request);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
        this.assertValidResponseForRequest(response, (MessageRequest)request);
        Assertions.assertEquals(ReconcileFtpsResponse.class, response.getClass());
        Assertions.assertEquals((Object)MessageResult.FAILURE, (Object)response.getResult());
        Assertions.assertEquals((Object)MessageStatusCode.OPERATION_NOT_SUPPORTED, (Object)response.getStatusCode());
    }

    @Test
    public void testTaskSchedulerThreadPoolFull() {
        int i;
        ArrayList<RestoreFtpsRequest> requests = new ArrayList<RestoreFtpsRequest>();
        for (i = 0; i < 18; ++i) {
            RestoreFtpsRequest request = new RestoreFtpsRequest(i, TOPIC, 1, FTPS_FILE_PATH, new Date());
            requests.add(request);
            this.asyncTaskScheduler.submitRequest((MessageRequest)request);
        }
        for (i = 0; i < requests.size(); ++i) {
            MessageResponse response = this.getNextMessageResponse();
            Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.asyncTaskScheduler.getStatus());
            this.assertValidResponseForRequest(response, (MessageRequest)requests.get(response.getRequestID()));
            Assertions.assertEquals(RestoreFtpsResponse.class, response.getClass());
        }
        Assertions.assertTrue((boolean)this.asyncTaskResponseQueue.isEmpty());
    }

    private void assertValidResponseForRequest(MessageResponse response, MessageRequest request) {
        Assertions.assertNotNull((Object)response);
        Assertions.assertEquals((Object)request.getTopic(), (Object)response.getTopic());
        Assertions.assertEquals((int)request.getPartition(), (int)response.getPartition());
        Assertions.assertEquals((int)request.getUuid(), (int)response.getRequestID());
    }

    private MessageResponse getNextMessageResponse() {
        try {
            return this.asyncTaskResponseQueue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("AsyncTaskScheduler results receiver was interrupted while waiting for result.");
        }
        catch (Exception e) {
            throw new RuntimeException("AsyncTaskScheduler results receiver failed while waiting for result.");
        }
    }

    public void reportServiceSchedulerResponse(MessageResponse response) {
        try {
            this.asyncTaskResponseQueue.put(response);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("AsyncTaskScheduler results receiver was interrupted while result was being reported");
        }
        catch (Exception e) {
            throw new RuntimeException("AsyncTaskScheduler results receiver failed while result was being reported");
        }
    }

    private /* synthetic */ void lambda$testSubmitUnsupportedOperation$4(MessageRequest request) throws Throwable {
        this.asyncTaskScheduler.submitRequest(request);
    }
}

