/*
 * Decompiled with CFR 0.152.
 */
package kafka.restore.schedulers;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ObjectStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreResponse;
import kafka.restore.messages.UploadFtpsToStoreRequest;
import kafka.restore.messages.UploadFtpsToStoreResponse;
import kafka.restore.operators.SegmentStateAndPath;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.ObjectStoreManager;
import kafka.restore.schedulers.ObjectStorePool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class ObjectStoreManagerTest
implements AsyncServiceSchedulerResultsReceiver {
    private static final int RESPONSE_QUEUE_CAPACITY = 1;
    private static final String SEGMENT_OBJECT_STORE_PATH = "fakePath";
    private static final int REQUEST_UUID = 0;
    private static final String TOPIC = "fakeTopic";
    private static final int PARTITION = 1;
    private static final String FTPS_FILE_PATH = "ftpsFilePath";
    private ArrayBlockingQueue<MessageResponse> responseQueue;
    private ObjectStorePool objectStorePool;
    private ObjectStoreManager objectStoreManager;

    @BeforeEach
    public void setUp() {
        this.responseQueue = new ArrayBlockingQueue(1);
        this.objectStorePool = (ObjectStorePool)Mockito.mock(ObjectStorePool.class);
        this.objectStoreManager = new ObjectStoreManager((AsyncServiceSchedulerResultsReceiver)this, this.objectStorePool);
        this.objectStoreManager.startUp();
        Assertions.assertNotNull((Object)this.objectStoreManager);
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.objectStoreManager.getStatus());
        this.objectStoreManager.pause();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.PAUSED, (Object)this.objectStoreManager.getStatus());
        this.objectStoreManager.resume();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING, (Object)this.objectStoreManager.getStatus());
    }

    @AfterEach
    public void shutdown() {
        this.objectStoreManager.shutdown();
        Assertions.assertEquals((Object)AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.SHUTDOWN, (Object)this.objectStoreManager.getStatus());
    }

    @Test
    public void testSubmitNonBatchedRestoreObjectsInStoreRequest() {
        int mapSize = 198;
        RestoreObjectsInStoreRequest request = this.constructRestoreObjectsInStoreRequest(mapSize);
        this.mockObjectStorePoolToReplyWithSuccessfulResponse();
        this.objectStoreManager.submitRequest((MessageRequest)request);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals(RestoreObjectsInStoreResponse.class, response.getClass());
        this.assertCorrectResponse(response, TOPIC, 1, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
    }

    @Test
    public void testSubmitBatchedRestoreObjectsInStoreRequest() {
        int numBatches = 4;
        int mapSize = 200 * numBatches;
        RestoreObjectsInStoreRequest request = this.constructRestoreObjectsInStoreRequest(mapSize);
        this.mockObjectStorePoolToReplyWithSuccessfulResponse();
        this.objectStoreManager.submitRequest((MessageRequest)request);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals(RestoreObjectsInStoreResponse.class, response.getClass());
        this.assertCorrectResponse(response, TOPIC, 1, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
    }

    @Test
    public void testSubmitUploadFtpsToStoreRequest() {
        UploadFtpsToStoreRequest request = this.constructUploadFtpsToStoreRequest();
        this.mockObjectStorePoolToReplyWithSuccessfulResponse();
        this.objectStoreManager.submitRequest((MessageRequest)request);
        MessageResponse response = this.getNextMessageResponse();
        Assertions.assertEquals(UploadFtpsToStoreResponse.class, response.getClass());
        this.assertCorrectResponse(response, TOPIC, 1, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
    }

    private Map<UUID, SegmentStateAndPath> createSegmentPathMap(int mapSize) {
        HashMap<UUID, SegmentStateAndPath> segmentPathMap = new HashMap<UUID, SegmentStateAndPath>();
        for (int i = 0; i < mapSize; ++i) {
            segmentPathMap.put(UUID.randomUUID(), new SegmentStateAndPath(null, null, SEGMENT_OBJECT_STORE_PATH));
        }
        return segmentPathMap;
    }

    private MessageResponse getNextMessageResponse() {
        MessageResponse response;
        try {
            response = this.responseQueue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException("KafkaManager results receiver was interrupted prior to receiving result.");
        }
        return response;
    }

    private RestoreObjectsInStoreRequest constructRestoreObjectsInStoreRequest(int mapSize) {
        Map<UUID, SegmentStateAndPath> segmentPathMap = this.createSegmentPathMap(mapSize);
        return new RestoreObjectsInStoreRequest(0, TOPIC, 1, segmentPathMap);
    }

    private UploadFtpsToStoreRequest constructUploadFtpsToStoreRequest() {
        return new UploadFtpsToStoreRequest(0, TOPIC, 1, FTPS_FILE_PATH);
    }

    private void assertCorrectResponse(MessageResponse response, String topic, int partition, int requestUuid, MessageStatusCode statusCode, MessageResult result) {
        Assertions.assertEquals((Object)topic, (Object)response.getTopic());
        Assertions.assertEquals((int)partition, (int)response.getPartition());
        Assertions.assertEquals((int)requestUuid, (int)response.getRequestID());
        Assertions.assertEquals((Object)statusCode, (Object)response.getStatusCode());
        Assertions.assertEquals((Object)result, (Object)response.getResult());
    }

    private void mockObjectStorePoolToReplyWithSuccessfulResponse() {
        ((ObjectStorePool)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                RestoreObjectsInStoreResponse response;
                ObjectStoreRequest request = (ObjectStoreRequest)invocation.getArgument(0);
                if (request instanceof RestoreObjectsInStoreRequest) {
                    response = new RestoreObjectsInStoreResponse(0, ObjectStoreManagerTest.TOPIC, 1, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS, new HashSet());
                } else if (request instanceof UploadFtpsToStoreRequest) {
                    response = new UploadFtpsToStoreResponse(0, ObjectStoreManagerTest.TOPIC, 1, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
                } else {
                    throw new RuntimeException("ObjectStorePool received request of unrecognized type.");
                }
                ObjectStoreManagerTest.this.objectStoreManager.reportServiceSchedulerResponse((MessageResponse)response);
                return null;
            }
        }).when((Object)this.objectStorePool)).submitObjectStoreRequest((ObjectStoreRequest)Mockito.any());
    }

    public void reportServiceSchedulerResponse(MessageResponse response) {
        try {
            this.responseQueue.put(response);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("S3Manager results receiver was interrupted while result was being reported");
        }
        catch (Exception e) {
            throw new RuntimeException("S3Manager results receiver failed while result was being reported");
        }
    }
}

