001: package org.jgroups.tests;
002:
003: import org.jgroups.*;
004: import org.jgroups.blocks.RpcDispatcher;
005:
006: /*
007: * @author Bob Stevenson - HAMMER
008: * @author Ananda Bollu - FLOW_CONTROL
009: */
010: public class HammerListener implements ChannelListener,
011: MembershipListener {
012: private static JChannel channel = null;
013: private static int SEND_COUNT = 100000;
014: private static int counter = 0;
015: private static long startTime = 0;
016: static {
017: initCommChannel();
018: }
019:
020: public void channelConnected(Channel ch) {
021: }
022:
023: public void channelDisconnected(Channel ch) {
024: }
025:
026: public void channelClosed(Channel ch) {
027: }
028:
029: public void channelShunned() {
030: }
031:
032: public void channelReconnected(Address addr) {
033: }
034:
035: /**
036: * this class initializes the communication channel to the broadcast
037: * group defined, this is a two-way communication channel with full error-recovery
038: * auto-resend capabilities and group auto-discovery built in it uses udp multi-cast, where multiple users can
039: * all listen for broadcasts on the same port Everyone that's interested in these messages, just joins the group
040: * and they will receive these messages
041: */
042: static private void initCommChannel() {
043: // preload all the static ip's, we only do this once, of course
044: String props = "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32;"
045: + "ucast_recv_buf_size=16000;ucast_send_buf_size=16000;"
046: + "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"
047: + "PING(timeout=2000;num_initial_members=3):"
048: + "MERGE2(min_interval=5000;max_interval=10000):"
049: + "FD:"
050: + "VERIFY_SUSPECT(timeout=1500):"
051: + "pbcast.STABLE(desired_avg_gossip=10000):"
052: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=1000,1500,2000,3000;max_xmit_size=8192):"
053: + "UNICAST(timeout=1000,1500,2000,3000):"
054: + "FLOW_CONTROL(window_size=1000;fwd_mrgn=200;rttweight=0.125;reduction=0.75;expansion=1.25):"
055: + "FRAG(frag_size=8192;down_thread=false;up_thread=false):"
056: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"
057: + "pbcast.STATE_TRANSFER";
058: try {
059: channel = new JChannel(props);
060: HammerListener listener = new HammerListener();
061: new RpcDispatcher(channel, null, listener, listener);
062: channel.connect("BOSGroup");
063:
064: } catch (org.jgroups.ChannelException ce) {
065: System.err.println("Channel Error" + ce);
066: }
067: }
068:
069: static public int printnum(Integer number) throws Exception {
070: counter++;
071: if (counter >= SEND_COUNT) {
072: long endTime = System.currentTimeMillis();
073: System.out.println("Messages received " + counter);
074: System.out.println("Messages succesfully trasmitted in "
075: + (endTime - startTime));
076: System.exit(0);
077: }
078: return number.intValue() * 2;
079: }
080:
081: public void viewAccepted(View new_view) {
082: System.out.println("Accepted view (" + new_view.size()
083: + new_view.getMembers() + ')');
084: }
085:
086: public void suspect(Address suspected_mbr) {
087: System.out.println("-- suspected " + suspected_mbr);
088: }
089:
090: public void block() {
091: ;
092: }
093:
094: /** creates a new commandlistener and kick start's the thread */
095: public HammerListener() {
096: System.out.println("HammerListener loaded");
097: }
098:
099: static public void main(String[] args) {
100: startTime = System.currentTimeMillis();
101: System.out.println("startTime " + startTime);
102: for (int i = 0; i < SEND_COUNT; i++) {
103: HammerSender.executeDistributedCommand();
104: }
105: }
106:
107: /** lets' clean up when we are done */
108: protected void finalizer() {
109: channel.disconnect();
110: channel.close();
111: }
112:
113: }
|