package io.zbus.proxy.http;

import io.zbus.kit.logging.Logger;
import io.zbus.kit.logging.LoggerFactory;
import io.zbus.mq.Broker;
import io.zbus.mq.Consumer;
import io.zbus.mq.ConsumerConfig;
import io.zbus.mq.Message;
import io.zbus.mq.MessageHandler;
import io.zbus.mq.MqClient;
import io.zbus.proxy.http.ProxyConfig;
import io.zbus.transport.Client;
import io.zbus.transport.Session;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/zbus/proxy/http/ProxyHandler.class */
/**
 * Consumes messages from a zbus topic and forwards them to a target server, routing the
 * target's responses back to the client that delivered the original message.
 *
 * <p>The target is described by {@code ProxyHandlerConfig.targetUrl}: an optional
 * {@code http://} scheme, then the server address, then an optional path. Incoming message
 * URLs must start with {@code "/" + topic}; that prefix is stripped and the remainder is
 * appended to the target path before the message is forwarded.
 *
 * <p>Message handling blocks until a target connection has signalled that it is connected
 * (see the {@code ready} latch).
 */
public class ProxyHandler implements MessageHandler, Closeable {
    protected static final Logger log = LoggerFactory.getLogger(ProxyHandler.class);
    private final String topic;
    /** URL prefix ({@code "/" + topic}) that every proxied message URL must start with. */
    private final String prefix;
    /** Server part of the configured target URL (text before the first {@code '/'}). */
    private final String targetServer;
    /** Path part of the configured target URL (text after the first {@code '/'}), may be empty. */
    private final String targetUrl;
    private final Broker broker;
    /** Null until {@link #start()} is called and again after {@link #close()}. */
    private Consumer consumer;
    /** Null until {@link #start()} has successfully started the consumer. */
    private List<HttpClient> targetClients;
    // NOTE(review): updated without synchronization; the stored value is always a valid index,
    // but the rotation may be uneven if handle() runs on several threads - confirm threading.
    private int currentClient = 0;
    /** Replaced with a fresh latch on disconnect and counted down on connect. */
    private final AtomicReference<CountDownLatch> ready = new AtomicReference<>(new CountDownLatch(1));
    private final ProxyConfig.ProxyHandlerConfig config;

    /**
     * One connection to the target server plus the bookkeeping needed to pair each response
     * with the request that caused it.
     */
    public class HttpClient implements Closeable {
        MqClient client;
        /** Pending requests in send order; used when a response cannot be matched by id. */
        Queue<Context> requests = new ConcurrentLinkedQueue<>();
        /** Pending requests keyed by message id; requests without an id are not stored here. */
        Map<String, Context> requestTable = new ConcurrentHashMap<>();

        /** Routing details of one forwarded request, restored onto its response. */
        public class Context {
            String topic;
            String msgId;
            String sender;
            MqClient senderClient;

            Context() {
            }
        }

        HttpClient() {
            this.client = new MqClient(ProxyHandler.this.targetServer,
                    ProxyHandler.this.broker.getEventLoop(),
                    ProxyHandler.this.config.heartbeatInterval);

            this.client.onDisconnected(new Client.DisconnectedHandler() {
                @Override
                public void onDisconnected() throws IOException {
                    // Make handle() block again until a connection is re-established.
                    ProxyHandler.this.ready.set(new CountDownLatch(1));
                    // close() nulls the consumer; guard in case this callback fires afterwards.
                    Consumer active = ProxyHandler.this.consumer;
                    if (active != null) {
                        active.pause();
                    }
                    HttpClient.this.client.ensureConnectedAsync();
                }
            });

            this.client.onConnected(new Client.ConnectedHandler() {
                @Override
                public void onConnected() throws IOException {
                    ProxyHandler.this.ready.get().countDown();
                    Consumer active = ProxyHandler.this.consumer;
                    if (active != null) {
                        active.resume();
                    }
                }
            });

            this.client.onMessage(new io.zbus.transport.MessageHandler<Message>() {
                @Override
                public void handle(Message message, Session session) throws IOException {
                    Context context = takeContext(message.getId());
                    if (context == null) {
                        // Nothing pending: there is nobody to route this response to.
                        return;
                    }
                    relay(message, context);
                }
            });
        }

        /**
         * Removes and returns the pending request for a response.
         *
         * <p>Looks the request up by {@code id} first; if that finds nothing, falls back to
         * the oldest pending request.
         *
         * @param id id carried by the response, may be null
         * @return the matching pending request, or null when none is pending
         */
        private Context takeContext(String id) {
            Context matched = (id == null) ? null : this.requestTable.remove(id);
            if (matched != null) {
                this.requests.remove(matched);
                return matched;
            }
            Context oldest = this.requests.poll();
            if (oldest != null) {
                // Context does not override equals, so this drops exactly that instance's entry.
                this.requestTable.values().remove(oldest);
            }
            return oldest;
        }

        /**
         * Restores the original routing fields on a response and routes it through the client
         * that delivered the request, unless the configured receive filter rejects it.
         */
        private void relay(Message message, Context context) {
            if (ProxyHandler.this.config.recvFilter != null
                    && !ProxyHandler.this.config.recvFilter.filter(message, context.senderClient)) {
                return;
            }
            message.setId(context.msgId);
            message.setTopic(context.topic);
            message.setReceiver(context.sender);
            try {
                context.senderClient.route(message);
            } catch (IOException e) {
                ProxyHandler.log.error(e.getMessage(), e);
            }
        }

        /**
         * Records the routing details of {@code message} and sends it to the target.
         *
         * <p>If the send throws, the recorded details are removed again so that a later
         * response is not paired with a request that was never delivered.
         *
         * @param mqClient client the message was received from; used to route the response
         * @param message  message to forward
         * @throws IOException          if the underlying client fails to send
         * @throws InterruptedException if the underlying client is interrupted while sending
         */
        void sendMessage(MqClient mqClient, Message message) throws IOException, InterruptedException {
            Context context = new Context();
            context.msgId = message.getId();
            context.topic = message.getTopic();
            context.sender = message.getSender();
            context.senderClient = mqClient;

            this.requests.add(context);
            // ConcurrentHashMap rejects null keys; id-less requests are matched in FIFO order.
            if (context.msgId != null) {
                this.requestTable.put(context.msgId, context);
            }

            boolean sent = false;
            try {
                this.client.sendMessage(message);
                sent = true;
            } finally {
                if (!sent) {
                    this.requests.remove(context);
                    this.requestTable.values().remove(context);
                }
            }
        }

        void ensureConnectedAsync() throws IOException {
            this.client.ensureConnectedAsync();
        }

        @Override
        public void close() throws IOException {
            this.client.close();
        }
    }

    /**
     * Creates a handler from its configuration. No connection is opened until
     * {@link #start()} is called.
     *
     * @param proxyHandlerConfig supplies the topic, the broker, the target URL and the
     *                           remaining settings read by this class
     */
    public ProxyHandler(ProxyConfig.ProxyHandlerConfig proxyHandlerConfig) {
        this.config = proxyHandlerConfig;
        this.topic = proxyHandlerConfig.topic;
        this.prefix = "/" + this.topic;
        this.broker = proxyHandlerConfig.broker;

        String target = proxyHandlerConfig.targetUrl;
        if (target.startsWith("http://")) {
            target = target.substring("http://".length());
        }
        // Split once on the first '/': server on the left, path (if any) on the right.
        String[] parts = target.split("/", 2);
        this.targetServer = parts[0].trim();
        this.targetUrl = parts.length > 1 ? parts[1].trim() : "";
    }

    /**
     * Starts the topic consumer and opens {@code config.connectionCount} connections to the
     * target. Does nothing when a consumer already exists. I/O failures are logged rather
     * than thrown.
     */
    public synchronized void start() {
        if (this.consumer != null) {
            return;
        }
        ConsumerConfig consumerConfig = new ConsumerConfig(this.broker);
        consumerConfig.setTopic(this.topic);
        consumerConfig.setConnectionCount(this.config.connectionCount);
        consumerConfig.setTopicMask(5);
        consumerConfig.setMaxInFlightMessage(1);
        consumerConfig.setConsumeTimeout(this.config.consumeTimeout);
        consumerConfig.setToken(this.config.token);
        this.consumer = new Consumer(consumerConfig);
        this.consumer.setMessageHandler(this);
        try {
            this.consumer.start(true);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            return;
        }

        this.targetClients = new ArrayList<>();
        for (int i = 0; i < this.config.connectionCount; i++) {
            this.targetClients.add(new HttpClient());
        }
        for (HttpClient targetClient : this.targetClients) {
            try {
                targetClient.ensureConnectedAsync();
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Forwards one consumed message to the next target connection in rotation.
     *
     * <p>Waits for the target to be connected, rewrites the message URL, applies the
     * configured send filter and sends the message. If forwarding fails, a 404 (for
     * {@link FileNotFoundException}) or 500 reply is routed back through {@code mqClient}.
     *
     * @param message  message taken from the topic
     * @param mqClient client the message was received from
     */
    @Override
    public void handle(Message message, MqClient mqClient) throws IOException {
        try {
            this.ready.get().await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
            return;
        }

        String url = message.getUrl();
        if (url == null) {
            log.error("missing url");
            return;
        }
        if (!url.startsWith(this.prefix)) {
            log.error("Url unmatched");
            return;
        }
        message.setUrl(rewriteUrl(url));

        // Capture the routing fields up front so an error reply can be addressed
        // regardless of what the send attempt does to the message.
        String requestId = message.getId();
        String requestTopic = message.getTopic();
        String requestSender = message.getSender();
        try {
            if (this.config.sendFilter != null && !this.config.sendFilter.filter(message, mqClient)) {
                return;
            }
            int next = (this.currentClient + 1) % this.targetClients.size();
            this.currentClient = next;
            this.targetClients.get(next).sendMessage(mqClient, message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            replyError(mqClient, requestId, requestTopic, requestSender, e);
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Maps an incoming URL (already known to start with the prefix) to the target path. */
    private String rewriteUrl(String url) {
        String path = url.substring(this.prefix.length());
        if (!path.startsWith("/")) {
            path = "/" + path;
        }
        String rewritten = this.targetUrl;
        if (!"/".equals(path)) {
            rewritten = rewritten + path;
        }
        if (!rewritten.startsWith("/")) {
            rewritten = "/" + rewritten;
        }
        return rewritten;
    }

    /** Builds the error reply for a failed forward and routes it back to the sender. */
    private void replyError(MqClient mqClient, String requestId, String requestTopic,
                            String requestSender, Exception cause) {
        Message reply = new Message();
        if (cause instanceof FileNotFoundException) {
            reply.setStatus(404);
            reply.setBody(cause.getMessage() + " Not Found");
        } else {
            reply.setStatus(500);
            reply.setBody(String.format("Target(%s/%s) invoke error, reason: %s",
                    this.targetServer, this.targetUrl, cause.toString()));
        }
        reply.setId(requestId);
        reply.setTopic(requestTopic);
        reply.setReceiver(requestSender);
        try {
            mqClient.route(reply);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Closes every target connection and then the consumer. Safe to call when
     * {@link #start()} was never called. All resources are attempted even if one fails.
     *
     * @throws IOException the first failure encountered; later failures are logged
     */
    @Override
    public void close() throws IOException {
        IOException firstFailure = null;

        List<HttpClient> clients = this.targetClients;
        if (clients != null) {
            for (HttpClient targetClient : clients) {
                try {
                    targetClient.close();
                } catch (IOException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        log.error(e.getMessage(), e);
                    }
                }
            }
        }

        if (this.consumer != null) {
            try {
                this.consumer.close();
                this.consumer = null;
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    log.error(e.getMessage(), e);
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
