/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.protocols.TP;
import org.jgroups.tests.ChannelTestBase;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"stack-dependent"}, sequential=true)
public class FifoOrderTest
extends ChannelTestBase {
    // The three channels under test; assigned in setUp() and closed in tearDown().
    JChannel ch1;
    JChannel ch2;
    JChannel ch3;
    /** Same value as the number of messages each Sender sends in its loop. */
    static final int NUM = 25;
    /** 3 * NUM; same value as the per-receiver count at which MyReceiver.receive() signals the barrier. */
    static final int EXPECTED = 75;
    /** Milliseconds; same value as the delay MyReceiver.receive() sleeps for each message. */
    static final long SLEEPTIME = 100L;
    // Created with 4 parties in setUp(); awaited by the test method, the Sender threads and the receivers.
    CyclicBarrier barrier;

    /**
     * Prepares one test run: a fresh 4-party barrier plus three channels.
     * The second and third channel are obtained by passing the first one to
     * {@code createChannel}.
     *
     * @throws Exception if any channel cannot be created
     */
    @BeforeMethod
    void setUp() throws Exception {
        barrier = new CyclicBarrier(4);
        ch1 = createChannel(true, 3);
        ch2 = createChannel(ch1);
        ch3 = createChannel(ch1);
    }

    /**
     * Cleans up after a test run: closes the channels in reverse creation
     * order, then resets the barrier.
     *
     * @throws Exception declared for the test framework; propagated from cleanup
     */
    @AfterMethod
    protected void tearDown() throws Exception {
        Util.close(ch3, ch2, ch1);
        barrier.reset();
    }

    /**
     * Connects three channels to the same cluster, lets each of them send
     * {@link #NUM} numbered messages concurrently and verifies that every
     * receiver saw the messages of each individual sender in ascending order.
     * <p>
     * The barrier is used twice: first to release the three sender threads
     * together with this thread, then to wait until all three receivers have
     * counted {@link #EXPECTED} messages. If the second wait times out, the
     * FIFO check still runs on whatever was received and the timeout is
     * rethrown afterwards.
     *
     * @throws Exception on connect failure, barrier failure or receive timeout
     */
    @Test
    public void testFifoDelivery() throws Exception {
        modifyDefaultThreadPool(ch1);
        modifyDefaultThreadPool(ch2);
        modifyDefaultThreadPool(ch3);

        MyReceiver r1 = new MyReceiver("R1");
        MyReceiver r2 = new MyReceiver("R2");
        MyReceiver r3 = new MyReceiver("R3");
        ch1.setReceiver(r1);
        ch2.setReceiver(r2);
        ch3.setReceiver(r3);

        ch1.connect("ConcurrentStackTest");
        ch2.connect("ConcurrentStackTest");
        ch3.connect("ConcurrentStackTest");
        View v = ch3.getView();
        assert (v.size() == 3) : "view is " + v;

        // Plain threads suffice; Sender already implements Runnable.
        new Thread(new Sender(ch1)).start();
        new Thread(new Sender(ch2)).start();
        new Thread(new Sender(ch3)).start();

        // First rendezvous: releases the three senders at the same time.
        barrier.await();

        // Time to handle EXPECTED messages one after another at SLEEPTIME each,
        // plus 50% slack (evaluates to 11250 ms with the current constants).
        final long timeout = (long)(EXPECTED * SLEEPTIME * 1.5);

        long start = System.currentTimeMillis();
        TimeoutException ex = null;
        try {
            System.out.println("waiting for all messages to be received");
            // Second rendezvous: each receiver arrives once it has counted EXPECTED messages.
            barrier.await(timeout, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            // Remember the timeout so the per-sender output below is still printed.
            ex = e;
        }
        long stop = System.currentTimeMillis();
        long diff = stop - start;
        System.out.println("Total time: " + diff + " ms\n");

        checkFIFO(r1);
        checkFIFO(r2);
        checkFIFO(r3);

        if (ex != null) {
            throw ex;
        }
    }

    /**
     * Groups the messages recorded by {@code r} by sender, prints each
     * sender's sequence and asserts that every sequence is in ascending order.
     *
     * @param r the receiver whose recorded messages are checked
     */
    private void checkFIFO(MyReceiver r) {
        // getMessages() may hand back the very list receive() appends to, and
        // receivers can still be running after a timeout; copy it while holding
        // the list's monitor so the iteration below cannot be disturbed.
        List<Pair<Address, Integer>> recorded = r.getMessages();
        List<Pair<Address, Integer>> msgs;
        synchronized (recorded) {
            msgs = new ArrayList<Pair<Address, Integer>>(recorded);
        }

        // sender -> values in the order they were received
        Map<Address, List<Integer>> map = new HashMap<Address, List<Integer>>();
        for (Pair<Address, Integer> p : msgs) {
            List<Integer> list = map.get(p.key);
            if (list == null) {
                list = new LinkedList<Integer>();
                map.put(p.key, list);
            }
            list.add(p.val);
        }

        List<Address> incorrect_receivers = new LinkedList<Address>();
        System.out.println("Checking FIFO for " + r.getName() + ":");
        for (Map.Entry<Address, List<Integer>> entry : map.entrySet()) {
            FifoOrderTest.print(entry.getKey(), entry.getValue());
            if (!FifoOrderTest.verifyFIFO(entry.getValue())) {
                incorrect_receivers.add(entry.getKey());
            }
        }
        System.out.print("\n");
        assert incorrect_receivers.isEmpty()
            : "The following receivers didn't receive all messages in FIFO order: " + incorrect_receivers;
    }

    /**
     * Tells whether {@code list} is in non-decreasing order, i.e. equal to its
     * own sorted copy. Done in a single pass instead of copying and sorting.
     * Equal neighbours are accepted; an empty or single-element list is ordered.
     * Elements are expected to be non-null.
     *
     * @param list the values in the order they were received
     * @return {@code true} if no element is smaller than its predecessor
     */
    private static boolean verifyFIFO(List<Integer> list) {
        Integer previous = null;
        for (Integer current : list) {
            if (previous != null && previous.compareTo(current) > 0) {
                return false;
            }
            previous = current;
        }
        return true;
    }

    /**
     * Writes one line to standard output: the address, a colon and a space,
     * then every element of {@code list} followed by a single space.
     *
     * @param addr the sender the values belong to
     * @param list the values to print, in iteration order
     */
    private static void print(Address addr, List<Integer> list) {
        StringBuilder line = new StringBuilder();
        line.append(addr);
        line.append(": ");
        for (Integer value : list) {
            line.append(value);
            line.append(" ");
        }
        System.out.println(line);
    }

    /**
     * Reconfigures the default thread pool of the channel's transport: core
     * size 1, maximum size 100 (skipped if the transport reports no pool), and
     * turns the thread pool queue off.
     * NOTE(review): presumably this makes incoming messages get handled by
     * many threads at once, which is what the FIFO check stresses - confirm.
     *
     * @param ch1 the channel whose transport is reconfigured
     */
    private static void modifyDefaultThreadPool(JChannel ch1) {
        final TP tp = ch1.getProtocolStack().getTransport();
        final ThreadPoolExecutor pool = (ThreadPoolExecutor)tp.getDefaultThreadPool();
        if (pool != null) {
            pool.setCorePoolSize(1);
            pool.setMaximumPoolSize(100);
        }
        tp.setThreadPoolQueueEnabled(false);
    }

    /**
     * Receiver that records every message as a (sender, value) pair and
     * arrives at the enclosing test's barrier once it has counted
     * {@link FifoOrderTest#EXPECTED} messages.
     * {@link #receive(Message)} may be called from several threads at once;
     * the list is guarded by its own monitor and the counter is atomic.
     */
    private class MyReceiver
    extends ReceiverAdapter {
        final String name;
        final List<Pair<Address, Integer>> msgs = new LinkedList<Pair<Address, Integer>>();
        final AtomicInteger count = new AtomicInteger(0);

        public MyReceiver(String name) {
            this.name = name;
        }

        /**
         * Sleeps for {@link FifoOrderTest#SLEEPTIME} ms, records the message
         * and, when the running count has reached
         * {@link FifoOrderTest#EXPECTED}, waits on the barrier.
         *
         * @param msg the delivered message; its payload is cast to {@code Integer}
         */
        @Override
        public void receive(Message msg) {
            // Slow the handler down so deliveries overlap in time.
            Util.sleep(SLEEPTIME);
            Pair<Address, Integer> pair = new Pair<Address, Integer>(msg.getSrc(), (Integer)msg.getObject());
            synchronized (this.msgs) {
                this.msgs.add(pair);
            }
            // Keep the value this call produced so the log line matches the trigger.
            int received = this.count.incrementAndGet();
            if (received >= EXPECTED) {
                System.out.println("[" + this.name + "]: received all messages (" + received + ")");
                try {
                    FifoOrderTest.this.barrier.await();
                }
                catch (InterruptedException e) {
                    // Restore the flag for the thread that is delivering the message.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Returns a snapshot of the messages recorded so far, in arrival
         * order. The copy is taken under the same monitor that
         * {@link #receive(Message)} uses, so callers can iterate it safely
         * while messages are still arriving.
         *
         * @return a new list holding the recorded (sender, value) pairs
         */
        public List<Pair<Address, Integer>> getMessages() {
            synchronized (this.msgs) {
                return new ArrayList<Pair<Address, Integer>>(this.msgs);
            }
        }

        /** @return the label given to this receiver at construction */
        public String getName() {
            return this.name;
        }
    }

    private class Pair<K, V> {
        K key;
        V val;

        public Pair(K key, V val) {
            this.key = key;
            this.val = val;
        }

        public String toString() {
            return this.key + "::" + this.val;
        }
    }

    /**
     * Task that waits on the enclosing test's barrier and then sends the
     * integers 1..{@link FifoOrderTest#NUM}, in that order, over its channel
     * with a null destination.
     */
    private class Sender
    implements Runnable {
        final Channel ch;
        // Captured at construction; not read anywhere in this class.
        final Address local_addr;

        public Sender(Channel ch) {
            this.ch = ch;
            this.local_addr = ch.getAddress();
        }

        /**
         * Waits for the common start signal, then sends all messages. A failed
         * send is reported and the loop carries on with the next number; a
         * failed barrier wait ends the task without sending anything.
         */
        @Override
        public void run() {
            try {
                FifoOrderTest.this.barrier.await();
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                return;
            }
            catch (BrokenBarrierException e) {
                // Report instead of vanishing silently; nothing is sent in this case.
                e.printStackTrace();
                return;
            }
            for (int i = 1; i <= NUM; ++i) {
                Message msg = new Message(null, null, Integer.valueOf(i));
                try {
                    this.ch.send(msg);
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

