package io.vertx.ext.stomp.impl;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/ext/stomp/impl/TransactionsTest.class */
public class TransactionsTest {
    private Vertx vertx;
    private StompServer server;
    private List<StompClient> clients = new ArrayList();

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        AsyncLock asyncLock = new AsyncLock();
        this.vertx = Vertx.vertx();
        this.server = StompServer.create(this.vertx).handler(StompServerHandler.create(this.vertx)).listen(asyncLock.handler());
        asyncLock.waitForSuccess();
    }

    @After
    public void tearDown() {
        this.clients.forEach((v0) -> {
            v0.close();
        });
        this.clients.clear();
        AsyncLock asyncLock = new AsyncLock();
        this.server.close(asyncLock.handler());
        asyncLock.waitForSuccess();
        AsyncLock asyncLock2 = new AsyncLock();
        this.vertx.close(asyncLock2.handler());
        asyncLock2.waitForSuccess();
    }

    @Test
    public void testBasicTransaction() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.commit("my-tx");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 3 && copyOnWriteArrayList2.isEmpty());
        });
        Iterator it = copyOnWriteArrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((Frame) it.next()).getHeader("transaction")).isEqualTo("my-tx");
        }
    }

    @Test
    public void testAbortedTransaction() throws InterruptedException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.abort("my-tx");
        }));
        Thread.sleep(2000L);
        Assertions.assertThat(copyOnWriteArrayList2).isEmpty();
        Assertions.assertThat(copyOnWriteArrayList).isEmpty();
    }

    @Test
    public void testTransactionDeliveringToTwoClients() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList3 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult3.result();
            copyOnWriteArrayList3.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.commit("my-tx");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 3 && copyOnWriteArrayList2.size() == 3 && copyOnWriteArrayList3.isEmpty());
        });
        Iterator it = copyOnWriteArrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((Frame) it.next()).getHeader("transaction")).isEqualTo("my-tx");
        }
        Iterator it2 = copyOnWriteArrayList2.iterator();
        while (it2.hasNext()) {
            Assertions.assertThat(((Frame) it2.next()).getHeader("transaction")).isEqualTo("my-tx");
        }
    }

    @Test
    public void testThatYouCannotBeginTwoTransactionsWithTheSameId() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.beginTX("my-tx");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!copyOnWriteArrayList2.isEmpty());
        });
        Assertions.assertThat(copyOnWriteArrayList.isEmpty());
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).getHeader("message")).containsIgnoringCase("Already existing transaction");
    }

    @Test
    public void testThatTransactionIDCanBeReusedAfterCommit() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.commit("my-tx");
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.commit("my-tx");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 3 && copyOnWriteArrayList2.isEmpty());
        });
        Iterator it = copyOnWriteArrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((Frame) it.next()).getHeader("transaction")).isEqualTo("my-tx");
        }
    }

    @Test
    public void testAutoAbortOnClose() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.close();
            atomicBoolean.set(true);
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Transactions.instance().getTransactionCount() == 0 && atomicBoolean.get());
        });
        Assertions.assertThat(copyOnWriteArrayList).isEmpty();
        Assertions.assertThat(copyOnWriteArrayList2).isEmpty();
    }

    @Test
    public void testAutoAbortOnDisconnect() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx", frame -> {
                stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
                stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
                stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
                stompClientConnection.disconnect();
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Transactions.instance().getTransactionCount() == 0);
        });
        Assertions.assertThat(copyOnWriteArrayList).isEmpty();
        Assertions.assertThat(copyOnWriteArrayList2).isEmpty();
    }

    @Test
    public void testCommitWithIllegalId() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.commit("illegal");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() >= 1);
        });
        Assertions.assertThat(copyOnWriteArrayList).isEmpty();
        Assertions.assertThat(Transactions.instance().getTransactionCount()).isEqualTo(0);
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).toString()).containsIgnoringCase("Unknown transaction");
    }

    @Test
    public void testAbortWithBadTransactionId() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.abort("illegal");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() >= 1);
        });
        Assertions.assertThat(copyOnWriteArrayList).isEmpty();
        Assertions.assertThat(Transactions.instance().getTransactionCount()).isEqualTo(0);
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).toString()).containsIgnoringCase("Unknown transaction");
    }

    @Test
    public void testNumberOfFramesInTransaction() {
        this.server.options().setMaxFrameInTransaction(2);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            copyOnWriteArrayList2.getClass();
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx");
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World")));
            stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!")));
            stompClientConnection.commit("my-tx");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Transactions.instance().getTransactionCount() == 0);
        });
        Assertions.assertThat(copyOnWriteArrayList).isEmpty();
        Assertions.assertThat(copyOnWriteArrayList2).hasSize(1);
    }

    @Test
    public void testTransactionChunk() {
        this.server.options().setTransactionChunkSize(100);
        this.server.options().setMaxFrameInTransaction(10000);
        CopyOnWriteArrayList<Frame> copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            copyOnWriteArrayList.getClass();
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            stompClientConnection.beginTX("my-tx");
            for (int i = 0; i < 5000; i++) {
                stompClientConnection.send(new Frame().setCommand(Frame.Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello-" + i)));
            }
            stompClientConnection.commit("my-tx");
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 5000);
        });
        int i = 0;
        for (Frame frame : copyOnWriteArrayList) {
            Assertions.assertThat(frame.getHeader("transaction")).isEqualTo("my-tx");
            Assertions.assertThat(frame.getBodyAsString()).isEqualTo("Hello-" + i);
            i++;
        }
    }
}
