/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the on-disk state directory layout of one application and the file locks
 * that guard it.
 *
 * <p>The layout is {@code <stateDirConfig>/<applicationId>/<taskId>} for task state and
 * {@code <stateDirConfig>/<applicationId>/global} for global state. Each of those
 * directories is guarded by a file lock taken on a {@code .lock} file inside it.
 *
 * <p>All lock/unlock/cleanup methods are {@code synchronized} on this instance. Task locks
 * are additionally tracked by the <em>name</em> of the thread that acquired them, so a lock
 * can only be re-entered or released by a thread with that same name.
 */
public class StateDirectory {
    static final String LOCK_FILE_NAME = ".lock";
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);

    /** Only sub-directories whose name matches this pattern are treated as task directories. */
    private static final Pattern TASK_DIR_NAME = Pattern.compile("\\d+_\\d+");

    /** Pause between two consecutive attempts to acquire a file lock. */
    private static final long LOCK_RETRY_BACKOFF_MS = 200L;

    private final File stateDir;
    /** Open channels on task lock files, keyed by task; reused across lock attempts. */
    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
    /** Task locks currently held through this instance, with the name of the owning thread. */
    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
    private final Time time;
    private FileChannel globalStateChannel;
    private FileLock globalStateLock;

    /**
     * Creates (if necessary) the base state directory and the application sub-directory.
     *
     * @param applicationId  name of the sub-directory created below {@code stateDirConfig}
     * @param stateDirConfig path of the base state directory; missing parents are created
     * @param time           clock used to decide whether a task directory is obsolete
     * @throws ProcessorStateException if either directory does not exist and cannot be created
     */
    public StateDirectory(String applicationId, String stateDirConfig, Time time) {
        this.time = time;
        File baseDir = new File(stateDirConfig);
        if (!ensureDirectory(baseDir, true)) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", stateDirConfig));
        }
        this.stateDir = new File(baseDir, applicationId);
        if (!ensureDirectory(this.stateDir, false)) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", this.stateDir.getPath()));
        }
    }

    /**
     * Returns the directory for the given task, creating it if it does not exist yet.
     *
     * @param taskId the task whose directory is requested
     * @return the task directory, named after {@code taskId.toString()}
     * @throws ProcessorStateException if the directory does not exist and cannot be created
     */
    public File directoryForTask(TaskId taskId) {
        File taskDir = new File(this.stateDir, taskId.toString());
        if (!ensureDirectory(taskDir, false)) {
            throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
        }
        return taskDir;
    }

    /**
     * Returns the directory for global state, creating it if it does not exist yet.
     *
     * @return the {@code global} directory below the application state directory
     * @throws ProcessorStateException if the directory does not exist and cannot be created
     */
    File globalStateDir() {
        File dir = new File(this.stateDir, "global");
        if (!ensureDirectory(dir, false)) {
            throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
        }
        return dir;
    }

    /**
     * Makes sure {@code dir} exists, creating it when missing.
     *
     * <p>This method is not synchronized, so another thread or process may create the same
     * directory between the existence check and the create call. In that case the create call
     * reports failure although the directory is there, hence the final {@code isDirectory()}
     * re-check before giving up.
     *
     * @param dir         the directory to check or create
     * @param withParents whether missing parent directories should be created as well
     * @return {@code true} if the path existed already or is a directory after the attempt
     */
    private static boolean ensureDirectory(File dir, boolean withParents) {
        if (dir.exists()) {
            return true;
        }
        boolean created = withParents ? dir.mkdirs() : dir.mkdir();
        return created || dir.isDirectory();
    }

    /** Builds the log line prefix naming the calling thread. */
    private String logPrefix() {
        return String.format("stream-thread [%s]", Thread.currentThread().getName());
    }

    /**
     * Tries to acquire the lock on the state directory of the given task.
     *
     * @param taskId the task to lock
     * @param retry  number of additional attempts made when the file lock is not available
     * @return {@code true} if the calling thread holds the lock when this method returns;
     *         {@code false} if a differently named thread holds it through this instance,
     *         the task directory or lock file could not be created, or the file lock could
     *         not be obtained
     * @throws IOException if opening the lock file or locking it fails unexpectedly
     */
    synchronized boolean lock(TaskId taskId, int retry) throws IOException {
        LockAndOwner lockAndOwner = this.locks.get(taskId);
        if (lockAndOwner != null) {
            if (lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
                log.trace("{} Found cached state dir lock for task {}", this.logPrefix(), taskId);
                return true;
            }
            // Held by another thread of this instance.
            return false;
        }

        File lockFile;
        try {
            lockFile = new File(this.directoryForTask(taskId), LOCK_FILE_NAME);
        } catch (ProcessorStateException e) {
            // The task directory could not be created; report "not locked" instead of failing.
            return false;
        }

        FileChannel channel;
        try {
            channel = this.getOrCreateFileChannel(taskId, lockFile.toPath());
        } catch (NoSuchFileException e) {
            // The task directory disappeared before the lock file could be opened.
            return false;
        }

        // NOTE(review): when the lock is not obtained the channel stays cached in `channels`
        // and is reused by the next attempt; it is only closed by a successful unlock().
        // Presumably deliberate, since closing a channel can release other locks the JVM
        // holds on the same file on some platforms -- confirm before changing.
        FileLock lock = this.tryLock(retry, channel);
        if (lock == null) {
            return false;
        }
        this.locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
        log.debug("{} Acquired state dir lock for task {}", this.logPrefix(), taskId);
        return true;
    }

    /**
     * Tries to acquire the lock on the global state directory.
     *
     * @param retry number of additional attempts made when the file lock is not available
     * @return {@code true} if the global lock is held through this instance when this method
     *         returns, {@code false} if it could not be obtained
     * @throws IOException if opening the lock file or locking it fails unexpectedly
     * @throws ProcessorStateException if the global state directory cannot be created
     */
    synchronized boolean lockGlobalState(int retry) throws IOException {
        if (this.globalStateLock != null) {
            log.trace("{} Found cached state dir lock for the global task", this.logPrefix());
            return true;
        }

        File lockFile = new File(this.globalStateDir(), LOCK_FILE_NAME);
        FileChannel channel;
        try {
            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        } catch (NoSuchFileException e) {
            return false;
        }

        FileLock fileLock = null;
        try {
            fileLock = this.tryLock(retry, channel);
        } finally {
            // Close the channel both when the lock is unavailable and when tryLock threw,
            // so that a failed attempt never leaks the file handle.
            if (fileLock == null) {
                channel.close();
            }
        }
        if (fileLock == null) {
            return false;
        }

        this.globalStateChannel = channel;
        this.globalStateLock = fileLock;
        log.debug("{} Acquired global state dir lock", this.logPrefix());
        return true;
    }

    /**
     * Releases the global state directory lock if it is held through this instance.
     *
     * <p>The channel is closed and the cached lock forgotten even when releasing the lock
     * fails, so a failure cannot leave a stale "locked" state or an open file handle behind.
     *
     * @throws IOException if releasing the lock or closing its channel fails
     */
    synchronized void unlockGlobalState() throws IOException {
        if (this.globalStateLock == null) {
            return;
        }
        try {
            this.globalStateLock.release();
        } finally {
            try {
                this.globalStateChannel.close();
            } finally {
                this.globalStateLock = null;
                this.globalStateChannel = null;
            }
        }
        log.debug("{} Released global state dir lock", this.logPrefix());
    }

    /**
     * Releases the lock on the given task's state directory if the calling thread owns it.
     *
     * <p>The cached channel is removed and closed even when releasing the lock fails.
     *
     * @param taskId the task to unlock
     * @return {@code true} if the calling thread owned the lock and it was released,
     *         {@code false} if no lock is recorded for the task or another thread owns it
     * @throws IOException if releasing the lock or closing its channel fails
     */
    synchronized boolean unlock(TaskId taskId) throws IOException {
        LockAndOwner lockAndOwner = this.locks.get(taskId);
        if (lockAndOwner == null || !lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
            return false;
        }

        this.locks.remove(taskId);
        FileChannel fileChannel = this.channels.remove(taskId);
        try {
            lockAndOwner.lock.release();
            log.debug("{} Released state dir lock for task {}", this.logPrefix(), taskId);
        } finally {
            if (fileChannel != null) {
                fileChannel.close();
            }
        }
        return true;
    }

    /**
     * Deletes task directories that are not locked through this instance, can be locked
     * right now, and were last modified more than {@code cleanupDelayMs} milliseconds ago.
     *
     * <p>Failures to lock, delete or unlock a directory are logged and do not stop the scan.
     *
     * @param cleanupDelayMs minimum age in milliseconds, measured from the directory's
     *                       last-modified time, before a directory is deleted
     */
    public synchronized void cleanRemovedTasks(long cleanupDelayMs) {
        File[] taskDirs = this.listTaskDirectories();
        if (taskDirs == null || taskDirs.length == 0) {
            return;
        }

        for (File taskDir : taskDirs) {
            String dirName = taskDir.getName();
            TaskId id = TaskId.parse(dirName);
            if (this.locks.containsKey(id)) {
                // Locked through this instance, so the task is still in use.
                continue;
            }
            try {
                if (this.lock(id, 0)) {
                    long now = this.time.milliseconds();
                    long lastModifiedMs = taskDir.lastModified();
                    if (now > lastModifiedMs + cleanupDelayMs) {
                        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", new Object[]{this.logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs});
                        Utils.delete(taskDir);
                    }
                }
            } catch (OverlappingFileLockException ignored) {
                // The lock is held elsewhere in this JVM; leave the directory alone.
            } catch (IOException e) {
                log.error("{} Failed to lock the state directory due to an unexpected exception", this.logPrefix(), e);
            } finally {
                // unlock() is a no-op returning false when the lock was never acquired.
                try {
                    this.unlock(id);
                } catch (IOException e) {
                    log.error("{} Failed to release the state directory lock", this.logPrefix(), e);
                }
            }
        }
    }

    /**
     * Lists the sub-directories of the application state directory whose names match
     * {@code \d+_\d+}.
     *
     * @return the matching directories; callers must handle a {@code null} result, which
     *         {@link File#listFiles(FileFilter)} can return
     */
    File[] listTaskDirectories() {
        return this.stateDir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return pathname.isDirectory() && TASK_DIR_NAME.matcher(pathname.getName()).matches();
            }
        });
    }

    /**
     * Attempts to lock the channel, retrying up to {@code retry} more times with a pause of
     * {@link #LOCK_RETRY_BACKOFF_MS} milliseconds between attempts.
     *
     * <p>If the thread is interrupted while pausing, its interrupt status is restored and
     * retrying stops immediately.
     *
     * @param retry   number of additional attempts after the first one
     * @param channel the channel of the lock file
     * @return the acquired lock, or {@code null} if it could not be obtained
     * @throws IOException if the underlying lock call fails
     */
    private FileLock tryLock(int retry, FileChannel channel) throws IOException {
        FileLock lock = this.tryAcquireLock(channel);
        while (lock == null && retry > 0) {
            try {
                Thread.sleep(LOCK_RETRY_BACKOFF_MS);
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller and give up on further retries.
                Thread.currentThread().interrupt();
                break;
            }
            --retry;
            lock = this.tryAcquireLock(channel);
        }
        return lock;
    }

    /**
     * Returns the cached channel for the task's lock file, opening (and creating) the file
     * on first use.
     *
     * @param taskId   the task the channel belongs to
     * @param lockPath path of the task's lock file
     * @return the open channel for the lock file
     * @throws IOException if the lock file cannot be opened
     */
    private FileChannel getOrCreateFileChannel(TaskId taskId, Path lockPath) throws IOException {
        FileChannel channel = this.channels.get(taskId);
        if (channel == null) {
            channel = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            this.channels.put(taskId, channel);
        }
        return channel;
    }

    /**
     * Makes a single, non-blocking attempt to lock the channel.
     *
     * @param channel the channel of the lock file
     * @return the lock, or {@code null} if it is not available, including the case where an
     *         overlapping lock is already held in this JVM
     * @throws IOException if the underlying lock call fails
     */
    private FileLock tryAcquireLock(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }

    /** Pairs a held file lock with the name of the thread that acquired it. */
    private static class LockAndOwner {
        final FileLock lock;
        final String owningThread;

        LockAndOwner(String owningThread, FileLock lock) {
            this.owningThread = owningThread;
            this.lock = lock;
        }
    }
}

