package org.apache.storm.multilang;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.Constants;
import org.apache.storm.drpc.PrepareRequest;
import org.apache.storm.shade.net.minidev.json.JSONObject;
import org.apache.storm.shade.net.minidev.json.JSONValue;
import org.apache.storm.shade.net.minidev.json.parser.ParseException;
import org.apache.storm.task.TopologyContext;

/* loaded from: input_file:org/apache/storm/multilang/JsonSerializer.class */
public class JsonSerializer implements ISerializer {
    public static final String DEFAULT_CHARSET = "UTF-8";
    private static final long serialVersionUID = 2548814660410474022L;
    private transient BufferedWriter processIn;
    private transient BufferedReader processOut;

    @Override // org.apache.storm.multilang.ISerializer
    public void initialize(OutputStream outputStream, InputStream inputStream) {
        try {
            this.processIn = new BufferedWriter(new OutputStreamWriter(outputStream, DEFAULT_CHARSET));
            this.processOut = new BufferedReader(new InputStreamReader(inputStream, DEFAULT_CHARSET));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.storm.multilang.ISerializer
    public Number connect(Map<String, Object> map, TopologyContext topologyContext) throws IOException, NoOutputException {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("pidDir", topologyContext.getPIDDir());
        jSONObject.put(Constants.CONF, map);
        jSONObject.put("context", topologyContext);
        writeMessage(jSONObject);
        return (Number) ((JSONObject) readMessage()).get("pid");
    }

    @Override // org.apache.storm.multilang.ISerializer
    public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put(PrepareRequest.ID_STREAM, boltMsg.getId());
        jSONObject.put("comp", boltMsg.getComp());
        jSONObject.put("stream", boltMsg.getStream());
        jSONObject.put("task", Long.valueOf(boltMsg.getTask()));
        jSONObject.put("tuple", boltMsg.getTuple());
        writeMessage(jSONObject);
    }

    @Override // org.apache.storm.multilang.ISerializer
    public void writeSpoutMsg(SpoutMsg spoutMsg) throws IOException {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("command", spoutMsg.getCommand());
        jSONObject.put(PrepareRequest.ID_STREAM, spoutMsg.getId());
        writeMessage(jSONObject);
    }

    @Override // org.apache.storm.multilang.ISerializer
    public void writeTaskIds(List<Integer> list) throws IOException {
        writeMessage(list);
    }

    private void writeMessage(Object obj) throws IOException {
        writeString(JSONValue.toJSONString(obj));
    }

    private void writeString(String str) throws IOException {
        this.processIn.write(str);
        this.processIn.write("\nend\n");
        this.processIn.flush();
    }

    @Override // org.apache.storm.multilang.ISerializer
    public ShellMsg readShellMsg() throws IOException, NoOutputException {
        Object obj;
        JSONObject jSONObject = (JSONObject) readMessage();
        ShellMsg shellMsg = new ShellMsg();
        String str = (String) jSONObject.get("command");
        shellMsg.setCommand(str);
        shellMsg.setId(jSONObject.get(PrepareRequest.ID_STREAM));
        shellMsg.setMsg((String) jSONObject.get("msg"));
        String str2 = (String) jSONObject.get("stream");
        if (str2 == null) {
            str2 = "default";
        }
        shellMsg.setStream(str2);
        Object obj2 = jSONObject.get("task");
        if (obj2 != null) {
            shellMsg.setTask(((Long) obj2).longValue());
        } else {
            shellMsg.setTask(0L);
        }
        Object obj3 = jSONObject.get("need_task_ids");
        if (obj3 == null || ((Boolean) obj3).booleanValue()) {
            shellMsg.setNeedTaskIds(true);
        } else {
            shellMsg.setNeedTaskIds(false);
        }
        shellMsg.setTuple((List) jSONObject.get("tuple"));
        Object obj4 = jSONObject.get("anchors");
        if (obj4 != null) {
            if (obj4 instanceof String) {
                obj4 = Arrays.asList(obj4);
            }
            Iterator it = ((List) obj4).iterator();
            while (it.hasNext()) {
                shellMsg.addAnchor((String) it.next());
            }
        }
        Object obj5 = jSONObject.get("name");
        String str3 = null;
        if (obj5 != null && (obj5 instanceof String)) {
            str3 = (String) obj5;
        }
        shellMsg.setMetricName(str3);
        shellMsg.setMetricParams(jSONObject.get("params"));
        if (str.equals("log") && (obj = jSONObject.get("level")) != null && (obj instanceof Long)) {
            shellMsg.setLogLevel((int) ((Long) obj).longValue());
        }
        return shellMsg;
    }

    private Object readMessage() throws IOException, NoOutputException {
        try {
            return JSONValue.parseWithException(readString());
        } catch (ParseException e) {
            throw new IOException((Throwable) e);
        }
    }

    private String readString() throws IOException, NoOutputException {
        StringBuilder sb = new StringBuilder();
        while (true) {
            String readLine = this.processOut.readLine();
            if (readLine == null) {
                StringBuilder sb2 = new StringBuilder();
                sb2.append("Pipe to subprocess seems to be broken!");
                if (sb.length() == 0) {
                    sb2.append(" No output read.\n");
                } else {
                    sb2.append(" Currently read output: " + sb.toString() + "\n");
                }
                sb2.append("Serializer Exception:\n");
                throw new NoOutputException(sb2.toString());
            }
            if (readLine.equals("end")) {
                return sb.toString();
            }
            if (sb.length() != 0) {
                sb.append("\n");
            }
            sb.append(readLine);
        }
    }
}
