package org.apache.activemq.broker;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/broker/RecoveryStatsBrokerTest.class */
public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport {
    private RestartType restartType;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/broker/RecoveryStatsBrokerTest$RestartType.class */
    public enum RestartType {
        NORMAL,
        FULL_RECOVERY,
        UNCLEAN_SHUTDOWN
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.broker.BrokerRestartTestSupport
    public void configureBroker(BrokerService brokerService) throws Exception {
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setJournalMaxFileLength(20480);
        kahaDBPersistenceAdapter.setDirectory(brokerService.getBrokerDataDirectory());
        brokerService.setPersistenceAdapter(kahaDBPersistenceAdapter);
        brokerService.setDestinationPolicy(this.policyMap);
    }

    protected void restartBroker(RestartType restartType) throws Exception {
        if (restartType == RestartType.FULL_RECOVERY) {
            stopBroker();
            File directory = this.broker.getPersistenceAdapter().getDirectory();
            if (directory != null) {
                IOHelper.deleteFile(new File(directory, "db.data"));
            }
            this.broker.start();
            return;
        }
        if (restartType != RestartType.UNCLEAN_SHUTDOWN) {
            restartBroker();
            return;
        }
        File brokerDataDirectory = this.broker.getBrokerDataDirectory();
        File file = new File(brokerDataDirectory, "bk");
        IOHelper.mkdirs(new File(brokerDataDirectory, "bk"));
        for (File file2 : brokerDataDirectory.listFiles()) {
            if (!file2.isDirectory()) {
                IOHelper.copyFile(file2, new File(file, file2.getName()));
            }
        }
        stopBroker();
        for (File file3 : file.listFiles()) {
            if (!file3.isDirectory()) {
                IOHelper.copyFile(file3, new File(brokerDataDirectory, file3.getName()));
            }
        }
        this.broker.start();
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList(new Object[]{RestartType.NORMAL}, new Object[]{RestartType.FULL_RECOVERY}, new Object[]{RestartType.UNCLEAN_SHUTDOWN});
    }

    public RecoveryStatsBrokerTest(RestartType restartType) {
        this.restartType = restartType;
    }

    @Override // org.apache.activemq.broker.BrokerTestSupport
    @Before
    public void setUp() throws Exception {
        super.setUp();
    }

    @Override // org.apache.activemq.broker.BrokerTestSupport
    @After
    public void tearDown() throws Exception {
        super.tearDown();
    }

    @Test(timeout = 60000)
    public void testStaticsRecovery() throws Exception {
        ImmutableList of = ImmutableList.of(new ActiveMQQueue("TEST.A"), new ActiveMQQueue("TEST.B"));
        Random random = new Random();
        HashMap hashMap = new HashMap();
        of.forEach(activeMQDestination -> {
        });
        StubConnection createConnection = createConnection();
        ConnectionInfo createConnectionInfo = createConnectionInfo();
        SessionInfo createSessionInfo = createSessionInfo(createConnectionInfo);
        ProducerInfo createProducerInfo = createProducerInfo(createSessionInfo);
        createConnection.send(createConnectionInfo);
        createConnection.send(createSessionInfo);
        createConnection.send(createProducerInfo);
        for (int i = 0; i < 400; i++) {
            Iterator<ActiveMQDestination> it = of.iterator();
            while (it.hasNext()) {
                Message createMessage = createMessage(createProducerInfo, it.next());
                createMessage.setPersistent(true);
                createMessage.setProducerId(createMessage.getMessageId().getProducerId());
                createConnection.request(createMessage);
            }
        }
        Map<ActiveMQDestination, MessageStoreStatistics> currentStatistics = getCurrentStatistics(of);
        checkStatistics(of, currentStatistics);
        restartBroker(this.restartType);
        checkStatistics(of, currentStatistics);
        Iterator<ActiveMQDestination> it2 = of.iterator();
        while (it2.hasNext()) {
            consume(it2.next(), 100, false);
        }
        checkStatistics(of, currentStatistics);
        restartBroker(this.restartType);
        checkStatistics(of, currentStatistics);
        for (ActiveMQDestination activeMQDestination2 : of) {
            int nextInt = random.nextInt(DurableSubProcessWithRestartTest.CARGO_SIZE);
            consume(activeMQDestination2, nextInt, true);
            hashMap.compute(activeMQDestination2, (activeMQDestination3, num) -> {
                return Integer.valueOf(num.intValue() + nextInt);
            });
        }
        Map<ActiveMQDestination, MessageStoreStatistics> currentStatistics2 = getCurrentStatistics(of);
        Iterator<ActiveMQDestination> it3 = of.iterator();
        while (it3.hasNext()) {
            assertEquals("", DurableSubProcessWithRestartTest.CARGO_SIZE - ((Integer) hashMap.get(r0)).intValue(), currentStatistics2.get(it3.next()).getMessageCount().getCount());
        }
        checkStatistics(of, currentStatistics2);
        restartBroker(this.restartType);
        checkStatistics(of, currentStatistics2);
    }

    private Map<ActiveMQDestination, MessageStoreStatistics> getCurrentStatistics(List<ActiveMQDestination> list) {
        return (Map) list.stream().map(activeMQDestination -> {
            return getDestination(this.broker, activeMQDestination);
        }).collect(Collectors.toMap(destination -> {
            return new ActiveMQQueue(destination.getName());
        }, destination2 -> {
            return destination2.getMessageStore().getMessageStoreStatistics();
        }));
    }

    private void consume(ActiveMQDestination activeMQDestination, int i, boolean z) throws Exception {
        StubConnection createConnection = createConnection();
        ConnectionInfo createConnectionInfo = createConnectionInfo();
        SessionInfo createSessionInfo = createSessionInfo(createConnectionInfo);
        createConnection.send(createConnectionInfo);
        createConnection.send(createSessionInfo);
        ConsumerInfo createConsumerInfo = createConsumerInfo(createSessionInfo, activeMQDestination);
        createConnection.send(createConsumerInfo);
        for (int i2 = 0; i2 < i; i2++) {
            Message receiveMessage = receiveMessage(createConnection);
            assertNotNull(receiveMessage);
            if (z) {
                createConnection.request(createAck(createConsumerInfo, receiveMessage, 1, (byte) 2));
            }
        }
        createConnection.request(closeConnectionInfo(createConnectionInfo));
    }

    private void checkStatistics(List<ActiveMQDestination> list, Map<ActiveMQDestination, MessageStoreStatistics> map) {
        for (ActiveMQDestination activeMQDestination : list) {
            MessageStoreStatistics messageStoreStatistics = map.get(activeMQDestination);
            assertEquals("Have Same Count", messageStoreStatistics.getMessageCount().getCount(), getDestination(this.broker, activeMQDestination).getMessageStore().getMessageStoreStatistics().getMessageCount().getCount());
            assertEquals("Have Same TotalSize", messageStoreStatistics.getMessageSize().getTotalSize(), getDestination(this.broker, activeMQDestination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
        }
    }

    protected Destination getDestination(BrokerService brokerService, ActiveMQDestination activeMQDestination) {
        RegionBroker regionBroker = brokerService.getRegionBroker();
        return activeMQDestination.isTemporary() ? activeMQDestination.isQueue() ? (Destination) regionBroker.getTempQueueRegion().getDestinationMap().get(activeMQDestination) : (Destination) regionBroker.getTempTopicRegion().getDestinationMap().get(activeMQDestination) : activeMQDestination.isQueue() ? (Destination) regionBroker.getQueueRegion().getDestinationMap().get(activeMQDestination) : (Destination) regionBroker.getTopicRegion().getDestinationMap().get(activeMQDestination);
    }
}
