package org.correomqtt.business.dispatcher;

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.correomqtt.business.dispatcher.BaseConnectionObserver;
import org.correomqtt.business.utils.ConnectionHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MarkerFactory;

/* loaded from: input_file:org/correomqtt/business/dispatcher/BaseConnectionDispatcher.class */
public abstract class BaseConnectionDispatcher<T extends BaseConnectionObserver> extends BaseDispatcher<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseConnectionDispatcher.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public void triggerFiltered(String str, Consumer<T> consumer) {
        String callerString = getCallerString();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(MarkerFactory.getMarker(ConnectionHolder.getInstance().getConfig(str).getName()), "Trigger with connectionId {}: {}", str, callerString);
        }
        ((List) this.observer.stream().filter(baseConnectionObserver -> {
            return baseConnectionObserver.getConnectionId() != null;
        }).filter(baseConnectionObserver2 -> {
            return baseConnectionObserver2.getConnectionId().equals(str);
        }).collect(Collectors.toList())).forEach(baseConnectionObserver3 -> {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(MarkerFactory.getMarker(ConnectionHolder.getInstance().getConfig(str).getName()), "Trigger with connectionId {}: {} -> {}", new Object[]{str, callerString, baseConnectionObserver3.getClass().getSimpleName()});
            }
            consumer.accept(baseConnectionObserver3);
        });
    }
}
