package com.linkedin.r2.disruptor;

import com.linkedin.r2.disruptor.DisruptContexts;
import com.linkedin.r2.filter.NextFilter;
import com.linkedin.r2.filter.message.rest.RestFilter;
import com.linkedin.r2.filter.message.stream.StreamFilter;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.Response;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.util.ArgumentUtil;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/r2/disruptor/DisruptFilter.class */
public class DisruptFilter implements StreamFilter, RestFilter {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptFilter.class);
    private final ScheduledExecutorService _scheduler;
    private final ExecutorService _executor;
    private final int _requestTimeout;

    public DisruptFilter(ScheduledExecutorService scheduledExecutorService, ExecutorService executorService, int i) {
        ArgumentUtil.notNull(scheduledExecutorService, "scheduler");
        ArgumentUtil.notNull(executorService, "executor");
        this._scheduler = scheduledExecutorService;
        this._executor = executorService;
        this._requestTimeout = i;
    }

    public void onStreamRequest(StreamRequest streamRequest, RequestContext requestContext, Map<String, String> map, NextFilter<StreamRequest, StreamResponse> nextFilter) {
        doDisrupt(streamRequest, requestContext, map, nextFilter);
    }

    public void onRestRequest(RestRequest restRequest, RequestContext requestContext, Map<String, String> map, NextFilter<RestRequest, RestResponse> nextFilter) {
        doDisrupt(restRequest, requestContext, map, nextFilter);
    }

    private <REQ extends Request, RES extends Response> void doDisrupt(REQ req, RequestContext requestContext, Map<String, String> map, NextFilter<REQ, RES> nextFilter) {
        DisruptContext disruptContext = (DisruptContext) requestContext.getLocalAttr(DisruptContext.DISRUPT_CONTEXT_KEY);
        if (disruptContext == null) {
            nextFilter.onRequest(req, requestContext, map);
            return;
        }
        try {
            switch (disruptContext.mode()) {
                case DELAY:
                    this._scheduler.schedule(() -> {
                        try {
                            this._executor.execute(() -> {
                                nextFilter.onRequest(req, requestContext, map);
                            });
                        } catch (RejectedExecutionException e) {
                            LOG.error("Unable to continue filter chain execution after {} disrupt.", disruptContext.mode(), e);
                        }
                    }, ((DisruptContexts.DelayDisruptContext) disruptContext).delay(), TimeUnit.MILLISECONDS);
                    break;
                case ERROR:
                    this._scheduler.schedule(() -> {
                        try {
                            DisruptedException disruptedException = new DisruptedException("Request is disrupted with an error response");
                            this._executor.execute(() -> {
                                nextFilter.onError(disruptedException, requestContext, map);
                            });
                        } catch (RejectedExecutionException e) {
                            LOG.error("Unable to continue filter chain execution after {} disrupt.", disruptContext.mode(), e);
                        }
                    }, ((DisruptContexts.ErrorDisruptContext) disruptContext).latency(), TimeUnit.MILLISECONDS);
                    break;
                case TIMEOUT:
                    this._scheduler.schedule(() -> {
                        try {
                            this._executor.execute(() -> {
                                nextFilter.onError(new TimeoutException("Exceeded request timeout of " + this._requestTimeout + "ms due to disrupt"), requestContext, map);
                            });
                        } catch (RejectedExecutionException e) {
                            LOG.error("Unable to continue filter chain execution after {} disrupt.", disruptContext.mode(), e);
                        }
                    }, this._requestTimeout, TimeUnit.MILLISECONDS);
                    break;
                default:
                    LOG.warn("Unrecognized disrupt mode {}", disruptContext.mode());
                    nextFilter.onRequest(req, requestContext, map);
                    break;
            }
        } catch (RejectedExecutionException e) {
            LOG.warn("Unable to perform {} disrupt", disruptContext.mode(), e);
            nextFilter.onRequest(req, requestContext, map);
        }
    }
}
