package cn.stylefeng.roses.kernel.sync.modular.cc;

import cn.stylefeng.roses.kernel.sync.core.util.CustomSpringContextHolder;
import cn.stylefeng.roses.kernel.sync.modular.ew.base.AbstractEntryWrapper;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.lang.Thread;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;

/* loaded from: input_file:cn/stylefeng/roses/kernel/sync/modular/cc/AbstractCanalClient.class */
/**
 * Base canal client that pulls batches of entries from a {@link CanalConnector} on a
 * dedicated worker thread and hands them to every registered {@link AbstractEntryWrapper}.
 *
 * <p>Lifecycle: supply a connector (constructor or {@link #setConnector}), call
 * {@link #start()} to launch the worker thread, and {@link #stop()} to request shutdown and
 * wait for the worker to finish.
 */
public class AbstractCanalClient {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractCanalClient.class);

    /** MDC key under which the destination name is published while the worker runs. */
    private static final String MDC_DESTINATION_KEY = "destination";

    /** Batch size passed to {@code getWithoutAck} on every fetch. */
    private static final int BATCH_SIZE = 5120;

    /** Loop flag read by the worker thread; cleared by {@link #stop()}. */
    protected volatile boolean running;
    /** Logs any throwable that escapes the worker thread. */
    protected Thread.UncaughtExceptionHandler handler;
    /** Worker thread created by {@link #start()}; {@code null} until then. */
    protected Thread thread;
    protected CanalConnector connector;
    protected String destination;

    /**
     * Creates a client without a connector; one must be supplied through
     * {@link #setConnector} before {@link #start()} is called.
     *
     * @param str the destination name, published to the MDC while processing
     */
    public AbstractCanalClient(String str) {
        this(str, null);
    }

    /**
     * Creates a client for the given destination and connector.
     *
     * @param str            the destination name, published to the MDC while processing
     * @param canalConnector the connector to read from; may be {@code null} if set later
     */
    public AbstractCanalClient(String str, CanalConnector canalConnector) {
        this.running = false;
        this.handler = (thread, th) -> {
            logger.error("parse events has an error", th);
        };
        this.thread = null;
        this.destination = str;
        this.connector = canalConnector;
    }

    /**
     * Replaces the connector used by this client.
     *
     * @param canalConnector the connector to read from
     */
    public void setConnector(CanalConnector canalConnector) {
        this.connector = canalConnector;
    }

    /**
     * Starts the worker thread that runs {@link #process()}.
     *
     * @throws IllegalArgumentException if no connector has been configured
     */
    public void start() {
        Assert.notNull(this.connector, "connector is null");
        this.thread = new Thread(this::process);
        this.thread.setUncaughtExceptionHandler(this.handler);
        // Set the flag before starting so the worker's first loop check sees it.
        this.running = true;
        this.thread.start();
    }

    /**
     * Requests the worker to stop and waits for it to terminate. Does nothing when the
     * client is not running.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status is restored so the caller can still observe it.
     */
    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        if (this.thread != null) {
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: re-assert it for the caller.
                Thread.currentThread().interrupt();
            }
        }
        MDC.remove(MDC_DESTINATION_KEY);
    }

    /**
     * Worker loop: connects and subscribes, then repeatedly fetches a batch, dispatches its
     * entries and acknowledges it. When an exception occurs the connection is dropped and,
     * as long as the client is still running, a new connection is attempted.
     */
    protected void process() {
        while (this.running) {
            try {
                MDC.put(MDC_DESTINATION_KEY, this.destination);
                this.connector.connect();
                this.connector.subscribe();
                while (this.running) {
                    Message message = this.connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    List<CanalEntry.Entry> entries = message.getEntries();
                    if (batchId != -1 && !entries.isEmpty()) {
                        processEntry(entries);
                    }
                    this.connector.ack(batchId);
                }
            } catch (Exception e) {
                logger.error("processEntrys error!", e);
            } finally {
                // Single cleanup path for normal exit, handled exceptions and errors.
                disconnectQuietly();
                MDC.remove(MDC_DESTINATION_KEY);
            }
        }
    }

    /**
     * Disconnects the connector, logging instead of propagating any failure so that a broken
     * connection cannot terminate the worker thread from inside the cleanup path.
     */
    private void disconnectQuietly() {
        try {
            this.connector.disconnect();
        } catch (Exception e) {
            logger.error("disconnect error!", e);
        }
    }

    /**
     * Passes the given entries to every {@link AbstractEntryWrapper} bean obtained from
     * {@link CustomSpringContextHolder}. Does nothing when no such bean is available.
     *
     * @param list the entries of one fetched batch
     */
    protected void processEntry(List<CanalEntry.Entry> list) {
        List<?> wrappers = CustomSpringContextHolder.getBeanOfType(AbstractEntryWrapper.class);
        if (wrappers == null || wrappers.isEmpty()) {
            return;
        }
        for (Object wrapper : wrappers) {
            ((AbstractEntryWrapper) wrapper).processEntrys(list);
        }
    }
}
