package io.zbus.proxy.http;

import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.zbus.kit.ThreadKit;
import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.MqClient;
import io.zbus.transport.Client;
import io.zbus.transport.CodecInitializer;
import io.zbus.transport.EventLoop;
import io.zbus.transport.Session;
import io.zbus.transport.http.Message;
import io.zbus.transport.http.MessageCodec;
import io.zbus.transport.tcp.TcpClient;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/zbus/proxy/http/ProxyClient.class */
public class ProxyClient extends TcpClient<Message, Message> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProxyClient.class);
    private Map<String, Context> requestTable;
    private boolean targetMessageIdentifiable;
    private MessageFilter recvFilter;
    private int defaultTimeout;
    private Context context;
    private ThreadKit.ManualResetEvent ready;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zbus/proxy/http/ProxyClient$Context.class */
    public static class Context {
        String topic;
        String msgId;
        String sender;
        MqClient senderClient;

        private Context() {
        }
    }

    public ProxyClient(String str, final EventLoop eventLoop) {
        super(str, eventLoop);
        this.requestTable = new ConcurrentHashMap();
        this.targetMessageIdentifiable = false;
        this.defaultTimeout = 10000;
        this.ready = new ThreadKit.ManualResetEvent(true);
        codec(new CodecInitializer() { // from class: io.zbus.proxy.http.ProxyClient.1
            @Override // io.zbus.transport.CodecInitializer
            public void initPipeline(List<ChannelHandler> list) {
                list.add(new HttpRequestEncoder());
                list.add(new HttpResponseDecoder());
                list.add(new HttpObjectAggregator(eventLoop.getPackageSizeLimit()));
                list.add(new MessageCodec());
            }
        });
        onDisconnected(new Client.DisconnectedHandler() { // from class: io.zbus.proxy.http.ProxyClient.2
            @Override // io.zbus.transport.Client.DisconnectedHandler
            public void onDisconnected() throws IOException {
                ProxyClient.log.info("Disconnected from(%s) ID=%s", ProxyClient.this.serverAddress(), ProxyClient.this.clientId);
                ProxyClient.this.close();
            }
        });
        onError(new Client.ErrorHandler() { // from class: io.zbus.proxy.http.ProxyClient.3
            @Override // io.zbus.transport.Client.ErrorHandler
            public void onError(Throwable th, Session session) throws IOException {
                ProxyClient.this.close();
            }
        });
    }

    @Override // io.zbus.transport.AbstractClient, io.zbus.transport.IoAdaptor
    public void onMessage(Object obj, Session session) throws IOException {
        Context context;
        io.zbus.mq.Message message = new io.zbus.mq.Message((Message) obj);
        String id = message.getId();
        if (this.targetMessageIdentifiable) {
            context = this.requestTable.remove(id);
        } else {
            context = this.context;
            this.context = null;
        }
        if (context == null) {
            log.warn("Message from target without context: " + message);
            return;
        }
        message.setId(context.msgId);
        message.setTopic(context.topic);
        message.setReceiver(context.sender);
        if (!this.targetMessageIdentifiable) {
            this.ready.set();
        }
        if (this.recvFilter == null || this.recvFilter.filter(message, context.senderClient)) {
            try {
                context.senderClient.route(message);
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    public void sendMessage(io.zbus.mq.Message message, MqClient mqClient, int i) throws IOException, InterruptedException {
        if (this.targetMessageIdentifiable) {
            sendMessageUnsafe(message, mqClient, i);
            return;
        }
        synchronized (this.ready) {
            sendMessageUnsafe(message, mqClient, i);
            this.ready.await(i, TimeUnit.MILLISECONDS);
            if (!this.ready.isSignalled()) {
                throw new IOException("waiting result timeout, should close connection");
            }
        }
    }

    private void sendMessageUnsafe(io.zbus.mq.Message message, MqClient mqClient, int i) throws IOException, InterruptedException {
        Context context = new Context();
        context.msgId = message.getId();
        context.topic = message.getTopic();
        context.sender = message.getSender();
        context.senderClient = mqClient;
        if (this.targetMessageIdentifiable) {
            this.requestTable.put(context.msgId, context);
        } else {
            this.context = context;
            this.ready.reset();
        }
        super.sendMessage(message);
    }

    public void sendMessage(io.zbus.mq.Message message, MqClient mqClient) throws IOException, InterruptedException {
        sendMessage(message, mqClient, this.defaultTimeout);
    }

    public MessageFilter getRecvFilter() {
        return this.recvFilter;
    }

    public void setRecvFilter(MessageFilter messageFilter) {
        this.recvFilter = messageFilter;
    }

    public int getDefaultTimeout() {
        return this.defaultTimeout;
    }

    public void setDefaultTimeout(int i) {
        this.defaultTimeout = i;
    }

    public void setTargetMessageIdentifiable(boolean z) {
        this.targetMessageIdentifiable = z;
    }
}
