/*
 * Decompiled with CFR 0.152.
 */
package org.apache.eventmesh.client.tcp.impl.cloudevent;

import com.google.common.base.Preconditions;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.provider.EventFormatProvider;
import io.netty.channel.ChannelHandlerContext;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.client.tcp.EventMeshTCPSubClient;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.RequestContext;
import org.apache.eventmesh.client.tcp.common.TcpClient;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.impl.AbstractEventMeshTCPSubHandler;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CloudEventTCPSubClient
extends TcpClient
implements EventMeshTCPSubClient<CloudEvent> {
    private static final Logger log = LoggerFactory.getLogger(CloudEventTCPSubClient.class);
    private final List<SubscriptionItem> subscriptionItems = Collections.synchronizedList(new LinkedList());
    private ReceiveMsgHook<CloudEvent> callback;

    public CloudEventTCPSubClient(EventMeshTCPClientConfig eventMeshTcpClientConfig) {
        super(eventMeshTcpClientConfig);
    }

    @Override
    public void init() throws EventMeshException {
        try {
            this.open(new CloudEventTCPSubHandler(this.contexts));
            this.hello();
            this.heartbeat();
            log.info("SimpleSubClientImpl|{}|started!", (Object)this.clientNo);
        }
        catch (Exception ex) {
            throw new EventMeshException("Initialize EventMeshMessageTcpSubClient error", (Throwable)ex);
        }
    }

    @Override
    public void reconnect() throws EventMeshException {
        try {
            super.reconnect();
            this.hello();
            if (!CollectionUtils.isEmpty(this.subscriptionItems)) {
                for (SubscriptionItem item : this.subscriptionItems) {
                    Package request = MessageUtils.subscribe(item.getTopic(), item.getMode(), item.getType());
                    this.io(request, 20000L);
                }
            }
            this.listen();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public void subscribe(String topic, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType) throws EventMeshException {
        try {
            this.subscriptionItems.add(new SubscriptionItem(topic, subscriptionMode, subscriptionType));
            Package request = MessageUtils.subscribe(topic, subscriptionMode, subscriptionType);
            this.io(request, 20000L);
        }
        catch (Exception ex) {
            throw new EventMeshException("Subscribe error", (Throwable)ex);
        }
    }

    @Override
    public void unsubscribe() throws EventMeshException {
        try {
            Package request = MessageUtils.unsubscribe();
            this.io(request, 20000L);
        }
        catch (Exception ex) {
            throw new EventMeshException("Unsubscribe error", (Throwable)ex);
        }
    }

    @Override
    public void listen() throws EventMeshException {
        try {
            Package request = MessageUtils.listen();
            this.io(request, 20000L);
        }
        catch (Exception ex) {
            throw new EventMeshException("Listen error", (Throwable)ex);
        }
    }

    @Override
    public void registerBusiHandler(ReceiveMsgHook<CloudEvent> handler) throws EventMeshException {
        this.callback = handler;
    }

    @Override
    public void close() {
        try {
            this.goodbye();
            super.close();
        }
        catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    private class CloudEventTCPSubHandler
    extends AbstractEventMeshTCPSubHandler<CloudEvent> {
        public CloudEventTCPSubHandler(ConcurrentHashMap<Object, RequestContext> contexts) {
            super(contexts);
        }

        @Override
        public CloudEvent getProtocolMessage(Package tcpPackage) {
            EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat("application/cloudevents+json");
            Preconditions.checkNotNull((Object)eventFormat, (Object)String.format("Cannot find the cloudevent format: %s", "application/cloudevents+json"));
            return eventFormat.deserialize(tcpPackage.getBody().toString().getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void callback(CloudEvent cloudEvent, ChannelHandlerContext ctx) {
            if (CloudEventTCPSubClient.this.callback != null) {
                CloudEventTCPSubClient.this.callback.handle(cloudEvent).ifPresent(responseMessage -> ctx.writeAndFlush((Object)MessageUtils.buildPackage(responseMessage, Command.RESPONSE_TO_SERVER)));
            }
        }

        @Override
        public void response(Package tcpPackage) {
            try {
                CloudEventTCPSubClient.this.send(tcpPackage);
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

