package org.nutz.plugins.event.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.nutz.integration.jedis.RedisService;
import org.nutz.ioc.Ioc;
import org.nutz.json.Json;
import org.nutz.json.JsonFormat;
import org.nutz.lang.Lang;
import org.nutz.lang.Strings;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.nutz.plugins.event.Event;
import org.nutz.plugins.event.EventBus;
import org.nutz.plugins.event.EventListener;

/* loaded from: input_file:org/nutz/plugins/event/impl/RedisEventBus.class */
/**
 * {@link EventBus} implementation that uses Redis lists as per-topic event queues.
 *
 * <p>{@link #fireEvent(Event)} serializes an event to compact JSON and LPUSHes it onto the list
 * named {@code prefix + event.getTopic()}. {@link #init()} starts one polling task for every
 * {@link EventListener} bean name known to the IoC container; each task RPOPs from the list named
 * {@code prefix + listener.subscribeTopic()} and passes the decoded event to that listener.
 *
 * <p>NOTE(review): {@code ioc}, {@code redisService} and {@code executorService} have no setters
 * in this class; presumably they are populated by the IoC container through field injection -
 * confirm against the bean definition.
 */
public class RedisEventBus implements EventBus {
    private final Log log = Logs.get();

    /** Prepended to every topic name to form the Redis list key. */
    protected String prefix = "nutzevent-";

    /** Milliseconds a polling task waits after a failed poll before trying again. */
    protected long errorSleep = 10000;

    /**
     * Milliseconds a polling task waits after finding its queue empty. RPOP does not block, so
     * without this pause an idle queue would be polled in a tight loop. Values {@code <= 0}
     * disable the pause.
     */
    protected long idleSleep = 100;

    private Ioc ioc;
    private RedisService redisService;
    private ExecutorService executorService;

    /** Cleared by {@link #depose()} so that polling tasks leave their loop. */
    private volatile boolean running;

    /**
     * Starts one polling task per {@link EventListener} bean name returned by the IoC container.
     * A cached thread pool is created when no executor has been supplied.
     */
    @Override // org.nutz.plugins.event.EventBus
    public void init() {
        if (this.executorService == null) {
            this.executorService = Executors.newCachedThreadPool();
        }
        this.running = true;
        for (final String beanName : this.ioc.getNamesByType(EventListener.class)) {
            final EventListener listener = (EventListener) this.ioc.get(EventListener.class, beanName);
            final String queueKey = this.prefix + listener.subscribeTopic();
            this.executorService.submit(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    RedisEventBus.this.pollQueue(beanName, queueKey);
                }
            });
        }
    }

    /**
     * Stops the polling tasks and shuts the executor down.
     *
     * <p>Tasks are first asked to stop via the {@code running} flag and an orderly
     * {@code shutdown()}, so a listener that is in the middle of handling an event can finish.
     * Tasks still alive after 5 seconds (for example, waiting out {@code errorSleep}) are
     * interrupted with {@code shutdownNow()}.
     */
    @Override // org.nutz.plugins.event.EventBus
    public void depose() {
        this.running = false;
        if (this.executorService == null) {
            return;
        }
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // The caller wants us to stop waiting: force the workers down and keep the
            // interrupt visible to the caller instead of swallowing it.
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes an event by pushing its compact JSON form onto the Redis list for its topic.
     *
     * @param event the event to publish; its topic selects the target list
     */
    @Override // org.nutz.plugins.event.EventBus
    public void fireEvent(Event event) {
        this.redisService.lpush(this.prefix + event.getTopic(), Json.toJson(event, JsonFormat.compact()));
    }

    /**
     * Polling loop of a single listener. Runs until {@link #depose()} clears the running flag or
     * the worker thread is interrupted.
     *
     * @param beanName IoC name of the listener bean that receives the events
     * @param queueKey Redis list key to pop messages from
     */
    private void pollQueue(String beanName, String queueKey) {
        while (this.running && !Thread.currentThread().isInterrupted()) {
            long pauseMillis = 0;
            try {
                String message = this.redisService.rpop(queueKey);
                if (Strings.isBlank(message)) {
                    // Nothing queued: back off briefly instead of spinning on Redis.
                    pauseMillis = this.idleSleep;
                } else {
                    this.log.debugf("on %s -> %s", queueKey, message);
                    dispatch(beanName, message);
                }
            } catch (Exception e) {
                this.log.warnf("on %s error : %s", queueKey, e.getMessage());
                pauseMillis = this.errorSleep;
            }
            if (!pause(pauseMillis)) {
                return;
            }
        }
    }

    /**
     * Decodes one message and hands it to the listener bean. Failures are logged and never
     * propagate, so one bad message or failing listener does not stop the polling loop.
     */
    private void dispatch(String beanName, String message) {
        try {
            EventListener listener = (EventListener) this.ioc.get(EventListener.class, beanName);
            listener.onEvent((Event) Json.fromJson(Event.class, message));
        } catch (Exception e) {
            this.log.error("event listener error!", e);
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis time to sleep; values {@code <= 0} return immediately
     * @return {@code false} if the thread was interrupted while sleeping (the interrupt flag is
     *     restored), {@code true} otherwise
     */
    private boolean pause(long millis) {
        if (millis <= 0) {
            return true;
        }
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
