package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSession.class */
public class ReactorSession implements AmqpSession {
    /** Link name and entity path used for the transaction coordinator send link. */
    private static final String TRANSACTION_LINK_NAME = "coordinator";

    private final ClientLogger logger;
    /** Session handler endpoint states mapped to {@link AmqpEndpointState}; the latest value is cached. */
    private final Flux<AmqpEndpointState> endpointStates;
    private final AmqpConnection amqpConnection;
    /** Underlying Proton-J session that links are created on. */
    private final Session session;
    private final SessionHandler sessionHandler;
    private final String sessionName;
    private final ReactorProvider provider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    /** Message handed to the retry helper while waiting for the session to become ACTIVE. */
    private final String activeTimeoutMessage;
    private final AmqpRetryOptions retryOptions;
    private final ReactorHandlerProvider handlerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
    private final Flux<AmqpShutdownSignal> shutdownSignals;

    /** Open send links, keyed by link name. */
    private final ConcurrentMap<String, LinkSubscription<AmqpSendLink>> openSendLinks = new ConcurrentHashMap<>();
    /** Open receive links, keyed by link name. */
    private final ConcurrentMap<String, LinkSubscription<AmqpReceiveLink>> openReceiveLinks = new ConcurrentHashMap<>();
    /** Scheduler handed to every {@link ReactorSender} created by this session. */
    private final Scheduler timeoutScheduler = Schedulers.parallel();
    /** Set once closing has started; guards against closing twice. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    /** Held while links are removed individually or closed in bulk during disposal. */
    private final Object closeLock = new Object();
    /** Completes once the dispose work has finished. */
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    /** Subscriptions owned by this session; disposed when the session finishes closing. */
    private final Disposable.Composite subscriptions = Disposables.composite();
    /** Lazily created transaction coordinator; {@code null} until first requested. */
    private final AtomicReference<TransactionCoordinator> transactionCoordinator = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSession$LinkSubscription.class */
    /**
     * Couples a link with the subscription to its endpoint states so both can be released together.
     *
     * @param <T> Type of link being tracked.
     */
    public static final class LinkSubscription<T extends AmqpLink> {
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private final T link;
        private final Disposable subscription;
        private final String errorMessage;

        private LinkSubscription(T t, Disposable disposable, String str) {
            this.link = t;
            this.subscription = disposable;
            this.errorMessage = str;
        }

        /**
         * Gets the tracked link.
         *
         * @return The link held by this subscription.
         */
        public T getLink() {
            return link;
        }

        /**
         * Disposes the endpoint-state subscription and closes the link. Only the first call does any work.
         *
         * @param errorCondition Condition forwarded to the link when it is a {@link ReactorReceiver} or
         *     {@link ReactorSender}; may be {@code null}.
         * @return The link's close operation, or an empty Mono when already closed or when the link is of
         *     another type (in which case it is disposed synchronously).
         */
        Mono<Void> closeAsync(ErrorCondition errorCondition) {
            // Only the first caller flips the flag and proceeds.
            if (!isDisposed.compareAndSet(false, true)) {
                return Mono.empty();
            }

            subscription.dispose();

            if (link instanceof ReactorReceiver) {
                final ReactorReceiver receiver = (ReactorReceiver) link;
                return receiver.closeAsync(errorMessage, errorCondition);
            } else if (link instanceof ReactorSender) {
                final ReactorSender sender = (ReactorSender) link;
                return sender.closeAsync(errorMessage, errorCondition);
            }

            link.dispose();
            return Mono.empty();
        }
    }

    public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler, String str, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, Mono<ClaimsBasedSecurityNode> mono, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions amqpRetryOptions) {
        this.amqpConnection = amqpConnection;
        this.session = session;
        this.sessionHandler = sessionHandler;
        this.handlerProvider = reactorHandlerProvider;
        this.sessionName = str;
        this.provider = reactorProvider;
        this.cbsNodeSupplier = mono;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.retryOptions = amqpRetryOptions;
        this.activeTimeoutMessage = String.format("ReactorSession connectionId[%s], session[%s]: Retries exhausted waiting for ACTIVE endpoint state.", sessionHandler.getConnectionId(), str);
        this.logger = new ClientLogger(ReactorSession.class, AmqpLoggingUtils.createContextWithConnectionId(this.sessionHandler.getConnectionId()));
        this.endpointStates = sessionHandler.getEndpointStates().map(endpointState -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.SESSION_NAME_KEY, str).addKeyValue("state", endpointState).log("Got endpoint state.");
            return AmqpEndpointStateUtil.getConnectionState(endpointState);
        }).doOnError(th -> {
            handleError(th);
        }).doOnComplete(() -> {
            handleClose();
        }).cache(1);
        this.shutdownSignals = amqpConnection.getShutdownSignals();
        this.subscriptions.add(this.endpointStates.subscribe());
        this.subscriptions.add(this.shutdownSignals.flatMap(amqpShutdownSignal -> {
            return closeAsync("Shutdown signal received", null, false);
        }).subscribe());
        session.open();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the underlying Proton-J session.
     *
     * @return The session passed to the constructor.
     */
    public Session session() {
        return session;
    }

    /**
     * Gets the endpoint states of this session.
     *
     * @return The cached stream of states mapped from the session handler.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return endpointStates;
    }

    /**
     * Indicates whether closing of this session has been started.
     *
     * @return {@code true} once a close has been requested; {@code false} otherwise.
     */
    public boolean isDisposed() {
        return isDisposed.get();
    }

    /**
     * Closes the session, blocking for at most the configured try timeout.
     */
    public void dispose() {
        final Mono<Void> closing = closeAsync();
        closing.block(retryOptions.getTryTimeout());
    }

    /**
     * Gets the name of this session.
     *
     * @return The session name passed to the constructor.
     */
    @Override
    public String getSessionName() {
        return sessionName;
    }

    /**
     * Gets the timeout applied to operations on this session.
     *
     * @return The try timeout of the session's retry options.
     */
    @Override
    public Duration getOperationTimeout() {
        final Duration tryTimeout = retryOptions.getTryTimeout();
        return tryTimeout;
    }

    /**
     * Declares a new transaction through the session's transaction coordinator.
     *
     * @return A Mono emitting the declared transaction.
     */
    @Override
    public Mono<AmqpTransaction> createTransaction() {
        final Mono<? extends AmqpTransactionCoordinator> coordinatorMono = getOrCreateTransactionCoordinator();
        return coordinatorMono.flatMap(coordinator -> coordinator.declare());
    }

    /**
     * Commits the given transaction by discharging it through the transaction coordinator.
     *
     * @param amqpTransaction Transaction to commit.
     * @return A Mono that completes when the discharge operation completes.
     */
    @Override
    public Mono<Void> commitTransaction(AmqpTransaction amqpTransaction) {
        final Mono<? extends AmqpTransactionCoordinator> coordinatorMono = getOrCreateTransactionCoordinator();
        return coordinatorMono.flatMap(coordinator -> coordinator.discharge(amqpTransaction, true));
    }

    /**
     * Rolls back the given transaction by discharging it through the transaction coordinator.
     *
     * @param amqpTransaction Transaction to roll back.
     * @return A Mono that completes when the discharge operation completes.
     */
    @Override
    public Mono<Void> rollbackTransaction(AmqpTransaction amqpTransaction) {
        final Mono<? extends AmqpTransactionCoordinator> coordinatorMono = getOrCreateTransactionCoordinator();
        return coordinatorMono.flatMap(coordinator -> coordinator.discharge(amqpTransaction, false));
    }

    /**
     * Creates (or returns the existing) send link, failing early if the session or connection closes first.
     *
     * @param str Name of the link.
     * @param str2 Entity path the link targets.
     * @param duration Try timeout to apply to the link's retry options; ignored when {@code null}.
     * @param amqpRetryPolicy Policy whose retry options seed the link's options; may be {@code null}.
     * @return A Mono emitting the send link, or an error if the session closes while waiting.
     */
    @Override
    public Mono<AmqpLink> createProducer(String str, String str2, Duration duration,
        AmqpRetryPolicy amqpRetryPolicy) {
        final Mono<AmqpLink> producer = createProducer(str, str2, duration, amqpRetryPolicy, null);
        final Mono<AmqpLink> closedError
            = onClosedError("Connection closed while waiting for new producer link.", str2, str);

        // Whichever signals first wins.
        return producer.or(closedError);
    }

    /**
     * Creates (or returns the existing) receive link with unsettled sender mode and second receiver settle
     * mode, failing early if the session or connection closes first.
     *
     * @param str Name of the link.
     * @param str2 Entity path the link receives from.
     * @param duration Timeout forwarded to the link creation overload.
     * @param amqpRetryPolicy Retry policy forwarded to the link creation overload.
     * @return A Mono emitting the receive link, or an error if the session closes while waiting.
     */
    @Override
    public Mono<AmqpLink> createConsumer(String str, String str2, Duration duration,
        AmqpRetryPolicy amqpRetryPolicy) {
        final Mono<AmqpReceiveLink> consumer = createConsumer(str, str2, duration, amqpRetryPolicy, null, null,
            null, SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND);
        final Mono<AmqpReceiveLink> closedError
            = onClosedError("Connection closed while waiting for new receive link.", str2, str);

        // Whichever signals first wins.
        return consumer.or(closedError).cast(AmqpLink.class);
    }

    /**
     * Removes and closes the link with the given name, looking at send links first and then receive links.
     *
     * @param str Name of the link to remove.
     * @return {@code true} if a link with that name was found and removed; {@code false} otherwise.
     */
    @Override
    public boolean removeLink(String str) {
        if (removeLink(openSendLinks, str)) {
            return true;
        }
        return removeLink(openReceiveLinks, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets a Mono that completes once this session has finished closing.
     *
     * @return A view of the session's closed sink.
     */
    public Mono<Void> isClosed() {
        final Mono<Void> closed = isClosedMono.asMono();
        return closed;
    }

    /**
     * Closes the session and all of its links, without a message or error condition.
     *
     * @return A Mono that completes when the session has finished closing.
     */
    @Override
    public Mono<Void> closeAsync() {
        final String noMessage = null;
        final ErrorCondition noCondition = null;
        return closeAsync(noMessage, noCondition, true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts closing the session. Only the first call schedules the dispose work; later calls simply return
     * the Mono that completes when closing has finished.
     *
     * @param str Message logged with the close request; may be {@code null}.
     * @param errorCondition Condition to set on the session; may be {@code null}.
     * @param z Whether open links should be closed as part of disposing the session.
     * @return A Mono that completes when the session has finished closing.
     */
    public Mono<Void> closeAsync(String str, ErrorCondition errorCondition, boolean z) {
        final boolean alreadyClosing = isDisposed.getAndSet(true);
        if (alreadyClosing) {
            return isClosedMono.asMono();
        }

        AmqpLoggingUtils.addErrorCondition(logger.atVerbose(), errorCondition)
            .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
            .log("Setting error condition and disposing session. {}", str);

        final Runnable scheduleDisposal = () -> {
            try {
                provider.getReactorDispatcher().invoke(() -> disposeWork(errorCondition, z));
            } catch (IOException e) {
                // The work could not be scheduled, so it is run directly on the current thread.
                logger.atInfo()
                    .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                    .log("Error while scheduling work. Manually disposing.", new Object[] {e});
                disposeWork(errorCondition, z);
            } catch (RejectedExecutionException e) {
                logger.atInfo()
                    .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                    .log("RejectedExecutionException when scheduling work.");
                disposeWork(errorCondition, z);
            }
        };

        return Mono.fromRunnable(scheduleDisposal).then(isClosedMono.asMono());
    }

    /**
     * Gets the session's transaction coordinator, creating its send link on first use.
     *
     * @return A Mono emitting the coordinator, or an error if the session is closed or closes while the
     *     coordinator link is being created.
     */
    @Override
    public Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
        if (isDisposed()) {
            final AmqpException closedError = new AmqpException(true,
                String.format("Cannot create coordinator send link %s from a closed session.",
                    TRANSACTION_LINK_NAME),
                sessionHandler.getErrorContext());
            return Mono.error(logger.atError()
                .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                .log(closedError));
        }

        final TransactionCoordinator existing = this.transactionCoordinator.get();
        if (existing != null) {
            logger.atVerbose()
                .addKeyValue(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME)
                .log("Returning existing transaction coordinator.");
            return Mono.just(existing);
        }

        // The coordinator link is created without a token manager (last argument is false).
        final Mono<TransactionCoordinator> created = createProducer(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME,
            new Coordinator(), retryOptions, null, false)
            .map(sendLink -> {
                final TransactionCoordinator candidate = new TransactionCoordinator(sendLink, messageSerializer);
                // If another caller published a coordinator first, use theirs.
                if (this.transactionCoordinator.compareAndSet(null, candidate)) {
                    return candidate;
                }
                return this.transactionCoordinator.get();
            });

        return created.or(onClosedError("Connection closed while waiting for transaction coordinator creation.",
            ClientConstants.NOT_APPLICABLE, ClientConstants.NOT_APPLICABLE));
    }

    /**
     * Creates a receive link, or returns the one already open under the same name. The link is created on
     * the reactor dispatcher after the session is active and the token manager has authorized.
     *
     * @param str Name of the link.
     * @param str2 Entity path to receive from; also used to obtain the token manager.
     * @param duration Not used by this implementation.
     * @param amqpRetryPolicy Not used by this implementation.
     * @param map Filter applied to the link's source; may be {@code null}.
     * @param map2 Properties set on the link; may be {@code null}.
     * @param symbolArr Desired capabilities for the link; may be {@code null}.
     * @param senderSettleMode Sender settle mode for the link.
     * @param receiverSettleMode Receiver settle mode for the link.
     * @return A Mono emitting the receive link. On error the token manager is closed before the error is
     *     propagated.
     */
    protected Mono<AmqpReceiveLink> createConsumer(String str, String str2, Duration duration,
        AmqpRetryPolicy amqpRetryPolicy, Map<Symbol, Object> map, Map<Symbol, Object> map2, Symbol[] symbolArr,
        SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
        if (isDisposed()) {
            final RuntimeException closedError = Exceptions.propagate(new AmqpException(true,
                "Cannot create receive link from a closed session.", sessionHandler.getErrorContext()));
            return Mono.error(logger.atError()
                .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, str2)
                .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                .log(closedError));
        }

        final LinkSubscription<AmqpReceiveLink> existing = openReceiveLinks.get(str);
        if (existing != null) {
            logger.atInfo()
                .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, str2)
                .log("Returning existing receive link.");
            return Mono.just(existing.getLink());
        }

        final TokenManager tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier, str2);
        final Mono<Void> ready = Mono.when(onActiveEndpoint(), tokenManager.authorize());

        final Mono<AmqpReceiveLink> createLink = Mono.create(sink -> {
            try {
                // The link is created through the reactor dispatcher.
                provider.getReactorDispatcher().invoke(() -> {
                    final LinkSubscription<AmqpReceiveLink> computed
                        = openReceiveLinks.compute(str, (linkName, current) -> {
                            if (current != null) {
                                // Lost the race with another caller; this token manager is not needed.
                                logger.atInfo()
                                    .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                                    .log("Another receive link exists. Disposing of new one.");
                                tokenManager.close();
                                return current;
                            }

                            logger.atInfo()
                                .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                                .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                                .log("Creating a new receiver link.");
                            return getSubscription(linkName, str2, map, map2, symbolArr, senderSettleMode,
                                receiverSettleMode, tokenManager);
                        });
                    sink.success(computed.getLink());
                });
            } catch (IOException | RejectedExecutionException e) {
                sink.error(e);
            }
        });

        return ready.then(createLink).onErrorResume(error -> Mono.error(() -> {
            tokenManager.close();
            return error;
        }));
    }

    /**
     * Wraps an opened Proton-J receiver in a {@link ReactorReceiver}.
     *
     * @param str Entity path of the receiver; also used to look up the metric provider.
     * @param receiver Proton-J receiver backing the link.
     * @param receiveLinkHandler Handler attached to the receiver.
     * @param tokenManager Token manager associated with the link.
     * @param reactorProvider Provider of the reactor dispatcher handed to the receiver.
     * @return The newly constructed receiver.
     */
    protected ReactorReceiver createConsumer(String str, Receiver receiver, ReceiveLinkHandler receiveLinkHandler,
        TokenManager tokenManager, ReactorProvider reactorProvider) {
        final ReactorDispatcher dispatcher = reactorProvider.getReactorDispatcher();
        final String namespace = amqpConnection.getFullyQualifiedNamespace();

        return new ReactorReceiver(amqpConnection, str, receiver, receiveLinkHandler, tokenManager, dispatcher,
            retryOptions, handlerProvider.getMetricProvider(namespace, str));
    }

    /**
     * Creates a send link that targets the given entity path and requires authorization.
     *
     * @param str Name of the link.
     * @param str2 Entity path used as the target address.
     * @param duration Try timeout to set on the link's retry options; ignored when {@code null}.
     * @param amqpRetryPolicy Policy whose retry options are copied for the link; defaults are used when
     *     {@code null}.
     * @param map Properties for the link; may be {@code null}.
     * @return A Mono emitting the send link.
     */
    protected Mono<AmqpLink> createProducer(String str, String str2, Duration duration,
        AmqpRetryPolicy amqpRetryPolicy, Map<Symbol, Object> map) {
        final AmqpRetryOptions options;
        if (amqpRetryPolicy == null) {
            options = new AmqpRetryOptions();
        } else {
            // Copy so the caller's options are not modified by the timeout override below.
            options = new AmqpRetryOptions(amqpRetryPolicy.getRetryOptions());
        }
        if (duration != null) {
            options.setTryTimeout(duration);
        }

        final Target target = new Target();
        target.setAddress(str2);

        return createProducer(str, str2, target, options, map, true).cast(AmqpLink.class);
    }

    /**
     * Creates a send link, or returns the one already open under the same name. The link is created on the
     * reactor dispatcher after the session is active and, when required, the token manager has authorized.
     *
     * @param str Name of the link.
     * @param str2 Entity path of the link; also used to obtain the token manager.
     * @param target Target set on the Proton-J sender.
     * @param amqpRetryOptions Retry options handed to the created sender.
     * @param map Properties for the link; may be {@code null}.
     * @param z Whether a token manager must be obtained and authorized before the link is created.
     * @return A Mono emitting the send link. On error the token manager, if any, is closed before the error
     *     is propagated.
     */
    private Mono<AmqpSendLink> createProducer(String str, String str2,
        org.apache.qpid.proton.amqp.transport.Target target, AmqpRetryOptions amqpRetryOptions,
        Map<Symbol, Object> map, boolean z) {
        if (isDisposed()) {
            return Mono.error(logger.atError()
                .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, str2)
                .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                .log(Exceptions.propagate(new AmqpException(true, "Cannot create send link from a closed session.",
                    sessionHandler.getErrorContext()))));
        }

        final LinkSubscription<AmqpSendLink> existing = openSendLinks.get(str);
        if (existing != null) {
            logger.atVerbose()
                .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                .log("Returning existing send link.");
            return Mono.just(existing.getLink());
        }

        final TokenManager tokenManager;
        final Mono<Long> authorize;
        if (z) {
            tokenManager = tokenManagerProvider.getTokenManager(cbsNodeSupplier, str2);
            authorize = tokenManager.authorize();
        } else {
            tokenManager = null;
            authorize = Mono.empty();
        }

        final Mono<Void> ready = Mono.when(onActiveEndpoint(), authorize);

        final Mono<AmqpSendLink> createLink = Mono.create(sink -> {
            try {
                // The link is created through the reactor dispatcher.
                provider.getReactorDispatcher().invoke(() -> {
                    final LinkSubscription<AmqpSendLink> computed
                        = openSendLinks.compute(str, (linkName, current) -> {
                            if (current == null) {
                                logger.atInfo()
                                    .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                                    .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                                    .log("Creating a new send link.");
                                return getSubscription(str, str2, target, map, amqpRetryOptions, tokenManager);
                            }

                            // Lost the race with another caller; this token manager is not needed.
                            logger.atInfo()
                                .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                                .log("Another send link exists. Disposing of new one.");
                            if (tokenManager != null) {
                                tokenManager.close();
                            }
                            return current;
                        });
                    sink.success(computed.getLink());
                });
            } catch (IOException | RejectedExecutionException e) {
                sink.error(e);
            }
        });

        // Mirror the receive path: if the endpoint never becomes active, authorization fails, or the work
        // cannot be scheduled, no link takes ownership of the token manager, so release it here.
        return ready.then(createLink).onErrorResume(error -> Mono.error(() -> {
            if (tokenManager != null) {
                tokenManager.close();
            }
            return error;
        }));
    }

    /**
     * Creates and opens a Proton-J sender, wraps it in a {@link ReactorSender} and subscribes to its endpoint
     * states so the link is removed from the open send links when it errors or completes.
     *
     * @param str Name of the link.
     * @param str2 Entity path of the link.
     * @param target Target set on the sender.
     * @param map Properties for the link; may be {@code null}. The map is not modified.
     * @param amqpRetryOptions Retry options handed to the {@link ReactorSender}.
     * @param tokenManager Token manager handed to the {@link ReactorSender}; may be {@code null}.
     * @return The link together with the subscription to its endpoint states.
     */
    private LinkSubscription<AmqpSendLink> getSubscription(String str, String str2,
        org.apache.qpid.proton.amqp.transport.Target target, Map<Symbol, Object> map,
        AmqpRetryOptions amqpRetryOptions, TokenManager tokenManager) {
        final Sender sender = session.sender(str);
        sender.setTarget(target);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);

        final Source source = new Source();
        if (map != null && !map.isEmpty()) {
            // Work on a copy: the client identifier is moved to the source address and must not be stripped
            // from the caller's map, which may be immutable or reused for a later link.
            final Map<Symbol, Object> linkProperties = new HashMap<>(map);
            final String clientId = (String) linkProperties.get(AmqpConstants.CLIENT_IDENTIFIER);
            if (!CoreUtils.isNullOrEmpty(clientId)) {
                source.setAddress(clientId);
                linkProperties.remove(AmqpConstants.CLIENT_IDENTIFIER);
            }
            sender.setProperties(linkProperties);
        }
        sender.setSource(source);

        final SendLinkHandler sendLinkHandler = handlerProvider.createSendLinkHandler(
            sessionHandler.getConnectionId(), sessionHandler.getHostname(), str, str2);
        BaseHandler.setHandler(sender, sendLinkHandler);
        sender.open();

        final ReactorSender reactorSender = new ReactorSender(amqpConnection, str2, sender, sendLinkHandler,
            provider, tokenManager, messageSerializer, amqpRetryOptions, timeoutScheduler,
            handlerProvider.getMetricProvider(amqpConnection.getFullyQualifiedNamespace(), str2));

        // When the session itself is closing, the dispose work closes the links, so nothing is removed here.
        final Disposable subscription = reactorSender.getEndpointStates().subscribe(
            state -> {
            },
            error -> {
                if (!isDisposed.get()) {
                    removeLink(openSendLinks, str);
                }
            },
            () -> {
                if (!isDisposed.get()) {
                    logger.atInfo()
                        .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                        .log("Complete. Removing and disposing send link.");
                    removeLink(openSendLinks, str);
                }
            });

        return new LinkSubscription<>(reactorSender, subscription,
            String.format("connectionId[%s] session[%s]: Setting error on send link.",
                sessionHandler.getConnectionId(), sessionName));
    }

    /**
     * Creates and opens a Proton-J receiver, wraps it via the receiver factory method and subscribes to its
     * endpoint states so the link is removed from the open receive links when it errors or completes.
     *
     * @param str Name of the link.
     * @param str2 Entity path used as the source address.
     * @param map Filter set on the source; skipped when {@code null} or empty.
     * @param map2 Properties set on the receiver; skipped when {@code null} or empty.
     * @param symbolArr Desired capabilities; skipped when {@code null} or empty.
     * @param senderSettleMode Sender settle mode for the receiver.
     * @param receiverSettleMode Receiver settle mode for the receiver.
     * @param tokenManager Token manager handed to the created receive link.
     * @return The link together with the subscription to its endpoint states.
     */
    private LinkSubscription<AmqpReceiveLink> getSubscription(String str, String str2, Map<Symbol, Object> map,
        Map<Symbol, Object> map2, Symbol[] symbolArr, SenderSettleMode senderSettleMode,
        ReceiverSettleMode receiverSettleMode, TokenManager tokenManager) {
        final Receiver receiver = session.receiver(str);

        final Source source = new Source();
        source.setAddress(str2);
        final boolean hasFilter = map != null && !map.isEmpty();
        if (hasFilter) {
            source.setFilter(map);
        }
        receiver.setSource(source);
        receiver.setSenderSettleMode(senderSettleMode);
        receiver.setReceiverSettleMode(receiverSettleMode);

        final Target target = new Target();
        final boolean hasProperties = map2 != null && !map2.isEmpty();
        if (hasProperties) {
            receiver.setProperties(map2);
            // A receiver identifier in the properties, when present, becomes the target address.
            final String receiverId = (String) map2.get(AmqpConstants.CLIENT_RECEIVER_IDENTIFIER);
            if (!CoreUtils.isNullOrEmpty(receiverId)) {
                target.setAddress(receiverId);
            }
        }
        receiver.setTarget(target);

        final boolean hasCapabilities = symbolArr != null && symbolArr.length > 0;
        if (hasCapabilities) {
            receiver.setDesiredCapabilities(symbolArr);
        }

        final ReceiveLinkHandler linkHandler = handlerProvider.createReceiveLinkHandler(
            sessionHandler.getConnectionId(), sessionHandler.getHostname(), str, str2);
        BaseHandler.setHandler(receiver, linkHandler);
        receiver.open();

        final ReactorReceiver reactorReceiver = createConsumer(str2, receiver, linkHandler, tokenManager, provider);

        // When the session itself is closing, the dispose work closes the links, so nothing is removed here.
        final Disposable subscription = reactorReceiver.getEndpointStates().subscribe(
            state -> {
            },
            error -> {
                if (!isDisposed.get()) {
                    removeLink(openReceiveLinks, str);
                }
            },
            () -> {
                if (!isDisposed.get()) {
                    logger.atInfo()
                        .addKeyValue(ClientConstants.LINK_NAME_KEY, str)
                        .addKeyValue(ClientConstants.ENTITY_PATH_KEY, str2)
                        .log("Complete. Removing receive link.");
                    removeLink(openReceiveLinks, str);
                }
            });

        final String closeMessage = String.format("connectionId[%s] sessionName[%s]: Setting error on receive link.",
            amqpConnection.getId(), sessionName);
        return new LinkSubscription<>(reactorReceiver, subscription, closeMessage);
    }

    /**
     * Creates a Mono that errors as soon as this session is closed or the connection emits a shutdown signal.
     *
     * @param str Message appended to the error description.
     * @param str2 Entity path reported in the error description.
     * @param str3 Link name reported in the error description.
     * @param <T> Element type of the Mono this error is raced against.
     * @return A Mono that never emits a value and fails with a non-transient {@link AmqpException}.
     */
    private <T> Mono<T> onClosedError(String str, String str2, String str3) {
        // Every caller passes (message, entityPath, linkName); the format arguments follow that order so the
        // entity path and link name land under their own labels.
        final String description = String.format(
            "connectionId[%s] entityPath[%s] linkName[%s] Connection closed. %s",
            sessionHandler.getConnectionId(), str2, str3, str);

        return Mono.firstWithSignal(isClosedMono.asMono(), shutdownSignals.next())
            .then(Mono.error(new AmqpException(false, description, sessionHandler.getErrorContext())));
    }

    /**
     * Waits for the session to report the ACTIVE endpoint state, applying the session's retry options.
     *
     * @return A Mono that completes once the retry-wrapped endpoint state stream completes.
     */
    private Mono<Void> onActiveEndpoint() {
        final Flux<AmqpEndpointState> untilActive = getEndpointStates()
            .takeUntil(state -> AmqpEndpointState.ACTIVE == state);

        return RetryUtil.withRetry(untilActive, retryOptions, activeTimeoutMessage).then();
    }

    /**
     * Invoked when the endpoint states complete; closes the session and its links.
     */
    private void handleClose() {
        logger.atVerbose()
            .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
            .log("Disposing of active send and receive links due to session close.");

        final Mono<Void> closing = closeAsync();
        closing.subscribe();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Invoked when the endpoint states fail; closes the session and its links, propagating an error condition
     * derived from the failure when it is an {@link AmqpException}.
     *
     * @param th Error emitted by the endpoint states.
     */
    private void handleError(Throwable th) {
        logger.atVerbose()
            .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
            .log("Disposing of active links due to error.");

        final ErrorCondition errorCondition;
        if (th instanceof AmqpException) {
            final AmqpException amqpException = (AmqpException) th;
            final String conditionName = amqpException.getErrorCondition() != null
                ? amqpException.getErrorCondition().getErrorCondition()
                : "UNKNOWN";
            errorCondition = new ErrorCondition(Symbol.getSymbol(conditionName), amqpException.getMessage());
        } else {
            errorCondition = null;
        }

        // A single close request is enough: a second one would only observe the disposed flag and return.
        closeAsync(th.getMessage(), errorCondition, true).subscribe();
    }

    /**
     * Closes the Proton-J session, optionally closes every open link, and then signals that the session is
     * closed and releases the session handler and subscriptions.
     *
     * @param errorCondition Condition set on the session when it has none yet; may be {@code null}.
     * @param z Whether the open send and receive links should be closed.
     */
    private void disposeWork(ErrorCondition errorCondition, boolean z) {
        if (session.getLocalState() != EndpointState.CLOSED) {
            session.close();
            if (errorCondition != null && session.getCondition() == null) {
                session.setCondition(errorCondition);
            }
        }

        final List<Mono<Void>> closingLinks = new ArrayList<>();
        if (z) {
            // Same lock as individual link removal, so a link is not removed while it is being collected.
            synchronized (closeLock) {
                for (LinkSubscription<AmqpReceiveLink> receiveLink : openReceiveLinks.values()) {
                    closingLinks.add(receiveLink.closeAsync(errorCondition));
                }
                for (LinkSubscription<AmqpSendLink> sendLink : openSendLinks.values()) {
                    closingLinks.add(sendLink.closeAsync(errorCondition));
                }
            }
        }

        // Wait (bounded by the try timeout) for the links to close; failures are logged and ignored so the
        // closed signal is always emitted.
        final Disposable closeSubscription = Mono.when(closingLinks)
            .timeout(retryOptions.getTryTimeout())
            .onErrorResume(error -> {
                logger.atWarning()
                    .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                    .log("Timed out waiting for all links to close.", new Object[] {error});
                return Mono.empty();
            })
            .then(Mono.fromRunnable(() -> {
                isClosedMono.emitEmpty((signalType, emitResult) -> {
                    AmqpLoggingUtils.addSignalTypeAndResult(logger.atWarning(), signalType, emitResult)
                        .addKeyValue(ClientConstants.SESSION_NAME_KEY, sessionName)
                        .log("Unable to emit shutdown signal.");
                    // Returning false means the emission is not retried.
                    return false;
                });
                sessionHandler.close();
                subscriptions.dispose();
            }))
            .subscribe();

        subscriptions.add(closeSubscription);
    }

    /**
     * Removes the named link from the given map and starts closing it.
     *
     * @param concurrentMap Map of open links to remove from.
     * @param str Name of the link; {@code null} is treated as "not found".
     * @param <T> Type of link held by the map.
     * @return {@code true} if a link was removed; {@code false} otherwise.
     */
    private <T extends AmqpLink> boolean removeLink(ConcurrentMap<String, LinkSubscription<T>> concurrentMap,
        String str) {
        if (str == null) {
            return false;
        }

        synchronized (closeLock) {
            final LinkSubscription<T> removed = concurrentMap.remove(str);
            if (removed == null) {
                return false;
            }

            // Fire-and-forget: the close is started but not awaited.
            removed.closeAsync(null).subscribe();
            return true;
        }
    }
}
