/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Optional;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;

/**
 * Pauses {@link MessageListenerContainer}s (or individual partitions of a container) and
 * uses a {@link TaskScheduler} to schedule the matching resume once a given duration has
 * elapsed.
 *
 * <p>Containers can be addressed directly or, when a {@link ListenerContainerRegistry} was
 * supplied at construction time, by listener id.
 */
public class ListenerContainerPauseService {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(ListenerContainerPauseService.class));

    /** Registry used to look containers up by id; {@code null} disables the id-based methods. */
    @Nullable
    private final ListenerContainerRegistry registry;

    /** Scheduler on which the delayed resume tasks are submitted. */
    private final TaskScheduler scheduler;

    /**
     * Create a pause service.
     *
     * @param registry the registry used to resolve listener ids; may be {@code null}, in which
     * case only the overloads taking a {@link MessageListenerContainer} can be used
     * @param scheduler the scheduler used to run the delayed resume tasks; must not be {@code null}
     * @throws IllegalArgumentException if {@code scheduler} is {@code null}
     */
    public ListenerContainerPauseService(@Nullable ListenerContainerRegistry registry, TaskScheduler scheduler) {
        Assert.notNull(scheduler, "'scheduler' cannot be null");
        this.registry = registry;
        this.scheduler = scheduler;
    }

    /**
     * Pause the container registered under the given id for the given duration.
     * If the registry has no container for that id, a warning is logged and nothing else happens.
     *
     * @param listenerId the id used to look the container up in the registry
     * @param pauseDuration how long to wait before the scheduled resume
     * @throws IllegalArgumentException if this service was created without a registry
     */
    public void pause(String listenerId, Duration pauseDuration) {
        Assert.notNull(this.registry, "Pause by id is only supported when a registry is provided");
        this.getListenerContainer(listenerId).ifPresent(container -> this.pause(container, pauseDuration));
    }

    /**
     * Pause the given container and schedule a resume after the given duration.
     * If the container already reports a pause request, only a debug message is logged:
     * the container is not paused again and no resume task is scheduled.
     *
     * @param messageListenerContainer the container to pause
     * @param pauseDuration how long to wait before the scheduled resume
     */
    public void pause(MessageListenerContainer messageListenerContainer, Duration pauseDuration) {
        if (messageListenerContainer.isPauseRequested()) {
            LOGGER.debug(() -> "Container " + messageListenerContainer + " already has pause requested");
        } else {
            // Millisecond precision: any sub-millisecond part of the duration is dropped.
            Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
            LOGGER.debug(() -> "Pausing container " + messageListenerContainer + ", resume scheduled for " + resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
            messageListenerContainer.pause();
            this.scheduler.schedule(() -> {
                // This task runs at resume time; the message previously said "Pausing container".
                LOGGER.debug(() -> "Scheduled resume triggered for container " + messageListenerContainer);
                // Delegate to resume(...) so the call is skipped if no pause is requested any more.
                this.resume(messageListenerContainer);
            }, resumeAt);
        }
    }

    /**
     * Pause a single partition of the given container and schedule the partition's resume
     * after the given duration. Unlike {@link #pause(MessageListenerContainer, Duration)},
     * no check is made for an already existing pause before pausing and scheduling.
     *
     * @param messageListenerContainer the container that owns the partition
     * @param partition the partition to pause
     * @param pauseDuration how long to wait before the scheduled resume
     */
    public void pausePartition(MessageListenerContainer messageListenerContainer, TopicPartition partition, Duration pauseDuration) {
        // Millisecond precision: any sub-millisecond part of the duration is dropped.
        Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
        LOGGER.debug(() -> "Pausing container: " + messageListenerContainer + " partition: " + partition + ", resume scheduled for " + resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
        messageListenerContainer.pausePartition(partition);
        this.scheduler.schedule(() -> {
            LOGGER.debug(() -> "Resuming container: " + messageListenerContainer + " partition: " + partition);
            messageListenerContainer.resumePartition(partition);
        }, resumeAt);
    }

    /**
     * Resume the container registered under the given id.
     * If the registry has no container for that id, a warning is logged and nothing else happens.
     *
     * @param listenerId the id used to look the container up in the registry
     * @throws IllegalArgumentException if this service was created without a registry
     */
    public void resume(String listenerId) {
        Assert.notNull(this.registry, "Resume by id is only supported when a registry is provided");
        this.getListenerContainer(listenerId).ifPresent(this::resume);
    }

    /**
     * Resume the given container if it currently reports a pause request; otherwise only
     * log a debug message.
     *
     * @param messageListenerContainer the container to resume
     */
    public void resume(MessageListenerContainer messageListenerContainer) {
        if (messageListenerContainer.isPauseRequested()) {
            LOGGER.debug(() -> "Resuming container " + messageListenerContainer);
            messageListenerContainer.resume();
        } else {
            LOGGER.debug(() -> "Container " + messageListenerContainer + " was not paused");
        }
    }

    /**
     * Look up a container by id in the registry, logging a warning when none is found.
     * Callers are expected to have verified that the registry is not {@code null}.
     *
     * @param listenerId the id to look up
     * @return the container, or an empty {@link Optional} if the registry returned {@code null}
     */
    private Optional<MessageListenerContainer> getListenerContainer(String listenerId) {
        MessageListenerContainer messageListenerContainer = this.registry.getListenerContainer(listenerId);
        if (messageListenerContainer == null) {
            LOGGER.warn(() -> "MessageListenerContainer " + listenerId + " does not exist");
        }
        return Optional.ofNullable(messageListenerContainer);
    }
}

