/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.protocols.TP;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Exercises replacing the default thread pool of a channel's transport while
 * two channels are connected, and checks that messages sent afterwards are
 * still received by both channels.
 */
@Test(groups={"stack-dependent"}, sequential=true)
public class TransportThreadPoolTest extends ChannelTestBase {
    JChannel c1;
    JChannel c2;

    /** Creates the two channels used by each test method; {@code c2} is created from {@code c1}. */
    @BeforeMethod
    protected void setUp() throws Exception {
        c1 = createChannel(true, 2);
        c2 = createChannel(c1);
    }

    /** Closes both channels, {@code c2} first. */
    @AfterMethod
    protected void tearDown() throws Exception {
        Util.close(c2, c1);
    }

    /**
     * Connects both channels under the same cluster name, waits for the views,
     * installs a fresh two-thread fixed pool as the default thread pool of each
     * channel's transport, then sends four messages (two per channel, each with
     * a null destination and null source) and expects every receiver to have
     * collected four messages.
     */
    @Test
    public void testThreadPoolReplacement() throws Exception {
        final String cluster = "TransportThreadPoolTest";
        Receiver first = new Receiver();
        Receiver second = new Receiver();

        c1.setReceiver(first);
        c2.setReceiver(second);
        c1.connect(cluster);
        c2.connect(cluster);

        Util.blockUntilViewsReceived(5000L, 500L, c1, c2);
        assert c2.getView().size() == 2 : "view is " + c2.getView() + ", but should have had a size of 2";

        // Swap the default pool on c1's transport first, then on c2's.
        for (JChannel channel : new JChannel[]{c1, c2}) {
            channel.getProtocolStack().getTransport().setDefaultThreadPool(Executors.newFixedThreadPool(2));
        }

        c1.send(null, null, "hello world");
        c2.send(null, null, "bela");
        c1.send(null, null, "message 3");
        c2.send(null, null, "message 4");

        // Each latch is waited on for at most 3000 ms; the outcome of the wait is
        // not inspected here because the size assertions below decide pass/fail.
        long start = System.currentTimeMillis();
        first.getLatch().await(3000L, TimeUnit.MILLISECONDS);
        second.getLatch().await(3000L, TimeUnit.MILLISECONDS);
        long elapsed = System.currentTimeMillis() - start;

        String report = "messages c1: " + print(first.getMsgs())
                + "\nmessages c2: " + print(second.getMsgs())
                + "\ntook " + elapsed + " ms";
        System.out.println(report);

        assert first.getMsgs().size() == 4;
        assert second.getMsgs().size() == 4;
    }

    /**
     * Renders the payload of each message wrapped in double quotes and followed
     * by a single space.
     *
     * @param msgs the messages to render
     * @return the concatenated, quoted payloads; empty if {@code msgs} is empty
     */
    private static String print(Collection<Message> msgs) {
        StringBuilder out = new StringBuilder();
        for (Message received : msgs) {
            out.append('"').append(received.getObject()).append('"').append(' ');
        }
        return out.toString();
    }

    /**
     * Receiver that stores every message it is handed and counts down a latch
     * initialised to 4 once per message.
     */
    private static class Receiver extends ReceiverAdapter {
        Collection<Message> msgs = new ConcurrentLinkedQueue<Message>();
        final CountDownLatch latch = new CountDownLatch(4);

        /** Stores the message, then counts the latch down by one. */
        @Override
        public void receive(Message msg) {
            msgs.add(msg);
            latch.countDown();
        }

        /** @return the latch that is counted down once per received message */
        public CountDownLatch getLatch() {
            return latch;
        }

        /** @return the live collection of messages received so far */
        public Collection<Message> getMsgs() {
            return msgs;
        }
    }
}

