Source Code Cross Referenced for STREAMING_STATE_TRANSFER.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » pbcast » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols.pbcast 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        package org.jgroups.protocols.pbcast;
0002:
0003:        import java.io.BufferedInputStream;
0004:        import java.io.BufferedOutputStream;
0005:        import java.io.DataInputStream;
0006:        import java.io.DataOutputStream;
0007:        import java.io.IOException;
0008:        import java.io.InputStream;
0009:        import java.io.ObjectInput;
0010:        import java.io.ObjectInputStream;
0011:        import java.io.ObjectOutput;
0012:        import java.io.ObjectOutputStream;
0013:        import java.io.OutputStream;
0014:        import java.net.InetAddress;
0015:        import java.net.InetSocketAddress;
0016:        import java.net.ServerSocket;
0017:        import java.net.Socket;
0018:        import java.net.UnknownHostException;
0019:        import java.util.HashMap;
0020:        import java.util.HashSet;
0021:        import java.util.Iterator;
0022:        import java.util.Map;
0023:        import java.util.Properties;
0024:        import java.util.Set;
0025:        import java.util.Vector;
0026:
0027:        import org.jgroups.Address;
0028:        import org.jgroups.Channel;
0029:        import org.jgroups.Event;
0030:        import org.jgroups.Global;
0031:        import org.jgroups.Header;
0032:        import org.jgroups.Message;
0033:        import org.jgroups.TimeoutException;
0034:        import org.jgroups.View;
0035:        import org.jgroups.stack.IpAddress;
0036:        import org.jgroups.stack.Protocol;
0037:        import org.jgroups.stack.StateTransferInfo;
0038:        import org.jgroups.util.Promise;
0039:        import org.jgroups.util.Streamable;
0040:        import org.jgroups.util.Util;
0041:
0042:        import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
0043:        import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
0044:
0045:        /**
0046:         * <code>STREAMING_STATE_TRANSFER</code>, as its name implies, allows a streaming 
0047:         * state transfer between two channel instances. 
0048:         * 
0049:         * <p>
0050:         * 
0051:         * Major advantage of this approach is that transfering application state to a 
0052:         * joining member of a group does not entail loading of the complete application 
0053:         * state into memory. Application state, for example, might be located entirely 
0054:         * on some form of disk based storage. The default <code>STATE_TRANSFER</code> 
0055:         * requires this state to be loaded entirely into memory before being transferred 
0056:         * to a group member while <code>STREAMING_STATE_TRANSFER</code> does not. 
0057:         * Thus <code>STREAMING_STATE_TRANSFER</code> protocol is able to transfer 
0058:         * application state that is very large (>1Gb) without a likelihood of such transfer 
0059:         * resulting in OutOfMemoryException.
0060:         * 
0061:         * <p>
0062:         * 
0063:         * Channel instance can be configured with either <code>STREAMING_STATE_TRANSFER</code> 
0064:         * or <code>STATE_TRANSFER</code> but not both protocols at the same time. 
0065:         * 
0066:         * <p>
0067:         * 
0068:         * In order to process streaming state transfer an application has to implement 
0069:         * <code>ExtendedMessageListener</code> if it is using channel in a push style 
0070:         * mode or it has to process <code>StreamingSetStateEvent</code> and 
0071:         * <code>StreamingGetStateEvent</code> if it is using channel in a pull style mode.    
0072:         * 
0073:         * 
0074:         * @author Vladimir Blagojevic
0075:         * @see org.jgroups.ExtendedMessageListener
0076:         * @see org.jgroups.StreamingGetStateEvent
0077:         * @see org.jgroups.StreamingSetStateEvent
0078:         * @see org.jgroups.protocols.pbcast.STATE_TRANSFER
0079:         * @since 2.4
0080:         * 
0081:         * @version $Id$
0082:         * 
0083:         */
0084:        public class STREAMING_STATE_TRANSFER extends Protocol {
0085:            private Address local_addr = null;
0086:
0087:            private final Vector members = new Vector();
0088:
0089:            private final Map state_requesters = new HashMap();
0090:
0091:            private Digest digest = null;
0092:
0093:            private final HashMap map = new HashMap(); // to store configuration information
0094:
0095:            private int num_state_reqs = 0;
0096:
0097:            private long num_bytes_sent = 0;
0098:
0099:            private double avg_state_size = 0;
0100:
0101:            private final static String NAME = "STREAMING_STATE_TRANSFER";
0102:
0103:            private InetAddress bind_addr;
0104:
0105:            private int bind_port = 0;
0106:
0107:            private StateProviderThreadSpawner spawner;
0108:
0109:            private int max_pool = 5;
0110:
0111:            private long pool_thread_keep_alive;
0112:
0113:            private int socket_buffer_size = 8 * 1024;
0114:
0115:            private boolean use_reading_thread;
0116:
0117:            private final Promise flush_promise = new Promise();;
0118:
0119:            private volatile boolean use_flush;
0120:
0121:            private long flush_timeout = 4000;
0122:
0123:            private final Object poolLock = new Object();
0124:
0125:            private int threadCounter;
0126:
0127:            private volatile boolean flushProtocolInStack = false;
0128:
0129:            public final String getName() {
0130:                return NAME;
0131:            }
0132:
0133:            public int getNumberOfStateRequests() {
0134:                return num_state_reqs;
0135:            }
0136:
0137:            public long getNumberOfStateBytesSent() {
0138:                return num_bytes_sent;
0139:            }
0140:
0141:            public double getAverageStateSize() {
0142:                return avg_state_size;
0143:            }
0144:
0145:            public Vector requiredDownServices() {
0146:                Vector retval = new Vector();
0147:                retval.addElement(new Integer(Event.GET_DIGEST_STATE));
0148:                retval.addElement(new Integer(Event.SET_DIGEST));
0149:                return retval;
0150:            }
0151:
0152:            public void resetStats() {
0153:                super .resetStats();
0154:                num_state_reqs = 0;
0155:                num_bytes_sent = 0;
0156:                avg_state_size = 0;
0157:            }
0158:
0159:            public boolean setProperties(Properties props) {
0160:                super .setProperties(props);
0161:                use_flush = Util.parseBoolean(props, "use_flush", false);
0162:                flush_timeout = Util.parseLong(props, "flush_timeout",
0163:                        flush_timeout);
0164:
0165:                try {
0166:                    bind_addr = Util.parseBindAddress(props, "bind_addr");
0167:                } catch (UnknownHostException e) {
0168:                    log.error("(bind_addr): host " + e.getLocalizedMessage()
0169:                            + " not known");
0170:                    return false;
0171:                }
0172:                bind_port = Util.parseInt(props, "start_port", 0);
0173:                socket_buffer_size = Util.parseInt(props, "socket_buffer_size",
0174:                        8 * 1024); //8K
0175:                max_pool = Util.parseInt(props, "max_pool", 5);
0176:                pool_thread_keep_alive = Util.parseLong(props,
0177:                        "pool_thread_keep_alive", 1000 * 30); //30 sec
0178:                use_reading_thread = Util.parseBoolean(props,
0179:                        "use_reading_thread", false);
0180:                if (props.size() > 0) {
0181:                    log.error("the following properties are not recognized: "
0182:                            + props);
0183:
0184:                    return false;
0185:                }
0186:                return true;
0187:            }
0188:
0189:            public void init() throws Exception {
0190:                map.put("state_transfer", Boolean.TRUE);
0191:                map.put("protocol_class", getClass().getName());
0192:            }
0193:
0194:            public void start() throws Exception {
0195:                passUp(new Event(Event.CONFIG, map));
0196:                if (!flushProtocolInStack && use_flush) {
0197:                    log
0198:                            .warn("use_flush is true, however, FLUSH protocol not found in stack.");
0199:                    use_flush = false;
0200:                }
0201:            }
0202:
0203:            public void stop() {
0204:                super .stop();
0205:                if (spawner != null) {
0206:                    spawner.stop();
0207:                }
0208:            }
0209:
0210:            public void up(Event evt) {
0211:                switch (evt.getType()) {
0212:                case Event.BECOME_SERVER:
0213:                    break;
0214:
0215:                case Event.SET_LOCAL_ADDRESS:
0216:                    local_addr = (Address) evt.getArg();
0217:                    break;
0218:
0219:                case Event.TMP_VIEW:
0220:                case Event.VIEW_CHANGE:
0221:                    handleViewChange((View) evt.getArg());
0222:                    break;
0223:
0224:                case Event.GET_DIGEST_STATE_OK:
0225:                    synchronized (state_requesters) {
0226:                        digest = (Digest) evt.getArg();
0227:                        if (log.isDebugEnabled())
0228:                            log.debug("GET_DIGEST_STATE_OK: digest is "
0229:                                    + digest);
0230:                    }
0231:                    respondToStateRequester();
0232:                    return;
0233:
0234:                case Event.MSG:
0235:                    Message msg = (Message) evt.getArg();
0236:                    StateHeader hdr = (StateHeader) msg.removeHeader(getName());
0237:                    if (hdr != null) {
0238:                        switch (hdr.type) {
0239:                        case StateHeader.STATE_REQ:
0240:                            handleStateReq(hdr);
0241:                            break;
0242:                        case StateHeader.STATE_RSP:
0243:                            handleStateRsp(hdr);
0244:                            break;
0245:                        case StateHeader.STATE_REMOVE_REQUESTER:
0246:                            removeFromStateRequesters(hdr.sender, hdr.state_id);
0247:                            break;
0248:                        default:
0249:                            if (log.isErrorEnabled())
0250:                                log.error("type " + hdr.type
0251:                                        + " not known in StateHeader");
0252:                            break;
0253:                        }
0254:                        return;
0255:                    }
0256:                    break;
0257:                case Event.CONFIG:
0258:                    Map config = (Map) evt.getArg();
0259:                    if (bind_addr == null
0260:                            && (config != null && config
0261:                                    .containsKey("bind_addr"))) {
0262:                        bind_addr = (InetAddress) config.get("bind_addr");
0263:                        if (log.isDebugEnabled())
0264:                            log.debug("using bind_addr from CONFIG event "
0265:                                    + bind_addr);
0266:                    }
0267:                    break;
0268:                }
0269:                passUp(evt);
0270:            }
0271:
0272:            public void down(Event evt) {
0273:                Address target;
0274:                StateTransferInfo info;
0275:
0276:                switch (evt.getType()) {
0277:
0278:                case Event.TMP_VIEW:
0279:                case Event.VIEW_CHANGE:
0280:                    handleViewChange((View) evt.getArg());
0281:                    break;
0282:
0283:                case Event.GET_STATE:
0284:                    info = (StateTransferInfo) evt.getArg();
0285:                    if (info.target == null) {
0286:                        target = determineCoordinator();
0287:                    } else {
0288:                        target = info.target;
0289:                        if (target.equals(local_addr)) {
0290:                            if (log.isErrorEnabled())
0291:                                log
0292:                                        .error("GET_STATE: cannot fetch state from myself !");
0293:                            target = null;
0294:                        }
0295:                    }
0296:                    if (target == null) {
0297:                        if (log.isDebugEnabled())
0298:                            log.debug("GET_STATE: first member (no state)");
0299:                        passUp(new Event(Event.GET_STATE_OK,
0300:                                new StateTransferInfo()));
0301:                    } else {
0302:                        boolean successfulFlush = false;
0303:                        if (use_flush) {
0304:                            successfulFlush = startFlush(flush_timeout, 5);
0305:                        }
0306:                        if (successfulFlush) {
0307:                            log.debug("Successful flush at " + local_addr);
0308:                        }
0309:                        Message state_req = new Message(target, null, null);
0310:                        state_req.putHeader(NAME, new StateHeader(
0311:                                StateHeader.STATE_REQ, local_addr,
0312:                                info.state_id));
0313:                        String stateRequested = info.state_id == null ? "full"
0314:                                : info.state_id;
0315:                        if (log.isDebugEnabled())
0316:                            log.debug("Member " + local_addr + " asking "
0317:                                    + target + " for " + stateRequested
0318:                                    + " state");
0319:
0320:                        // suspend sending and handling of mesage garbage collection gossip messages,
0321:                        // fixes bugs #943480 and #938584). Wake up when state has been received
0322:                        if (log.isTraceEnabled())
0323:                            log.trace("passing down a SUSPEND_STABLE event");
0324:                        passDown(new Event(Event.SUSPEND_STABLE, new Long(
0325:                                info.timeout)));
0326:                        passDown(new Event(Event.MSG, state_req));
0327:                    }
0328:                    return; // don't pass down any further !
0329:
0330:                case Event.STATE_TRANSFER_INPUTSTREAM_CLOSED:
0331:                    if (use_flush) {
0332:                        stopFlush();
0333:                    }
0334:
0335:                    if (log.isTraceEnabled())
0336:                        log.trace("STATE_TRANSFER_INPUTSTREAM_CLOSED received");
0337:                    //resume sending and handling of message garbage collection gossip messages,
0338:                    // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
0339:                    // collection protocol (e.g. STABLE)
0340:                    if (log.isTraceEnabled())
0341:                        log.trace("passing down a RESUME_STABLE event");
0342:                    passDown(new Event(Event.RESUME_STABLE));
0343:                    return;
0344:                case Event.SUSPEND_OK:
0345:                    if (use_flush) {
0346:                        flush_promise.setResult(Boolean.TRUE);
0347:                    }
0348:                    break;
0349:                case Event.SUSPEND_FAILED:
0350:                    if (use_flush) {
0351:                        flush_promise.setResult(Boolean.FALSE);
0352:                    }
0353:                    break;
0354:                case Event.CONFIG:
0355:                    Map config = (Map) evt.getArg();
0356:                    if (config != null && config.containsKey("flush_timeout")) {
0357:                        Long ftimeout = (Long) config.get("flush_timeout");
0358:                        use_flush = true;
0359:                        flush_timeout = ftimeout.longValue();
0360:                    }
0361:                    if ((config != null && !config
0362:                            .containsKey("flush_suported"))) {
0363:                        flushProtocolInStack = true;
0364:                    }
0365:                    break;
0366:
0367:                }
0368:
0369:                passDown(evt); // pass on to the layer below us
0370:            }
0371:
0372:            /* --------------------------- Private Methods -------------------------------- */
0373:
0374:            /**
0375:             * When FLUSH is used we do not need to pass digests between members
0376:             *
0377:             * see JGroups/doc/design/PArtialStateTransfer.txt
0378:             * see JGroups/doc/design/FLUSH.txt
0379:             *
0380:             * @return true if use of digests is required, false otherwise
0381:             */
0382:            private boolean isDigestNeeded() {
0383:                return !use_flush;
0384:            }
0385:
0386:            private void respondToStateRequester() {
0387:
0388:                // setup the plumbing if needed
0389:                if (spawner == null) {
0390:                    ServerSocket serverSocket = Util.createServerSocket(
0391:                            bind_addr, bind_port);
0392:                    spawner = new StateProviderThreadSpawner(setupThreadPool(),
0393:                            serverSocket);
0394:                    new Thread(Util.getGlobalThreadGroup(), spawner,
0395:                            "StateProviderThreadSpawner").start();
0396:                }
0397:
0398:                synchronized (state_requesters) {
0399:                    if (state_requesters.isEmpty()) {
0400:                        if (log.isWarnEnabled())
0401:                            log
0402:                                    .warn("Should be responding to state requester, but there are no requesters !");
0403:                        return;
0404:                    }
0405:
0406:                    if (digest == null && isDigestNeeded()) {
0407:                        if (log.isWarnEnabled()) {
0408:                            log
0409:                                    .warn("Should be responding to state requester, but there is no digest !");
0410:                        }
0411:                    } else if (digest != null && isDigestNeeded()) {
0412:                        digest = digest.copy();
0413:                    }
0414:
0415:                    if (log.isTraceEnabled())
0416:                        log.trace("Iterating state requesters "
0417:                                + state_requesters);
0418:
0419:                    for (Iterator it = state_requesters.keySet().iterator(); it
0420:                            .hasNext();) {
0421:                        String tmp_state_id = (String) it.next();
0422:                        Set requesters = (Set) state_requesters
0423:                                .get(tmp_state_id);
0424:                        for (Iterator iter = requesters.iterator(); iter
0425:                                .hasNext();) {
0426:                            Address requester = (Address) iter.next();
0427:                            Message state_rsp = new Message(requester);
0428:                            StateHeader hdr = new StateHeader(
0429:                                    StateHeader.STATE_RSP, local_addr, spawner
0430:                                            .getServerSocketAddress(),
0431:                                    isDigestNeeded() ? digest : null,
0432:                                    tmp_state_id);
0433:                            state_rsp.putHeader(NAME, hdr);
0434:
0435:                            if (log.isTraceEnabled())
0436:                                log.trace("Responding to state requester "
0437:                                        + requester + " with address "
0438:                                        + spawner.getServerSocketAddress()
0439:                                        + " and digest " + digest);
0440:                            passDown(new Event(Event.MSG, state_rsp));
0441:                            if (stats) {
0442:                                num_state_reqs++;
0443:                            }
0444:                        }
0445:                    }
0446:                }
0447:            }
0448:
0449:            private boolean startFlush(long timeout, int numberOfAttempts) {
0450:                boolean successfulFlush = false;
0451:                flush_promise.reset();
0452:                passUp(new Event(Event.SUSPEND));
0453:                try {
0454:                    Boolean r = (Boolean) flush_promise
0455:                            .getResultWithTimeout(timeout);
0456:                    successfulFlush = r.booleanValue();
0457:                } catch (TimeoutException e) {
0458:                    log.warn("Initiator of flush and state requesting member "
0459:                            + local_addr
0460:                            + " timed out waiting for flush responses after "
0461:                            + flush_timeout + " msec");
0462:                }
0463:
0464:                if (!successfulFlush && numberOfAttempts > 0) {
0465:                    long backOffSleepTime = Util.random(5000);
0466:                    if (log.isInfoEnabled())
0467:                        log.info("Flush in progress detected at " + local_addr
0468:                                + ". Backing off for " + backOffSleepTime
0469:                                + " ms. Attempts left " + numberOfAttempts);
0470:
0471:                    Util.sleepRandom(backOffSleepTime);
0472:                    successfulFlush = startFlush(flush_timeout,
0473:                            --numberOfAttempts);
0474:                }
0475:                return successfulFlush;
0476:            }
0477:
0478:            private void stopFlush() {
0479:                passUp(new Event(Event.RESUME));
0480:            }
0481:
0482:            private PooledExecutor setupThreadPool() {
0483:                PooledExecutor threadPool = new PooledExecutor(max_pool);
0484:                threadPool.waitWhenBlocked();
0485:                threadPool.setMinimumPoolSize(1);
0486:                threadPool.setKeepAliveTime(pool_thread_keep_alive);
0487:                threadPool.setThreadFactory(new ThreadFactory() {
0488:
0489:                    public Thread newThread(final Runnable command) {
0490:                        synchronized (poolLock) {
0491:                            threadCounter++;
0492:                        }
0493:                        return new Thread(Util.getGlobalThreadGroup(),
0494:                                "STREAMING_STATE_TRANSFER.poolid="
0495:                                        + threadCounter) {
0496:                            public void run() {
0497:                                if (log.isTraceEnabled()) {
0498:                                    log.trace(Thread.currentThread()
0499:                                            + " started.");
0500:                                }
0501:                                command.run();
0502:                                if (log.isTraceEnabled()) {
0503:                                    log.trace(Thread.currentThread()
0504:                                            + " stopped.");
0505:                                }
0506:                            }
0507:                        };
0508:                    }
0509:
0510:                });
0511:                return threadPool;
0512:            }
0513:
0514:            private Address determineCoordinator() {
0515:                Address ret = null;
0516:                synchronized (members) {
0517:                    if (!members.isEmpty()) {
0518:                        for (int i = 0; i < members.size(); i++)
0519:                            if (!local_addr.equals(members.elementAt(i)))
0520:                                return (Address) members.elementAt(i);
0521:                    }
0522:                }
0523:                return ret;
0524:            }
0525:
0526:            private void handleViewChange(View v) {
0527:                synchronized (members) {
0528:                    members.clear();
0529:                    members.addAll(v.getMembers());
0530:                }
0531:            }
0532:
0533:            private void handleStateReq(StateHeader hdr) {
0534:                Object sender = hdr.sender;
0535:                if (sender == null) {
0536:                    if (log.isErrorEnabled())
0537:                        log.error("sender is null !");
0538:                    return;
0539:                }
0540:                String id = hdr.state_id;
0541:                synchronized (state_requesters) {
0542:                    boolean empty = state_requesters.isEmpty();
0543:                    Set requesters = (Set) state_requesters.get(id);
0544:                    if (requesters == null) {
0545:                        requesters = new HashSet();
0546:                    }
0547:                    requesters.add(sender);
0548:                    state_requesters.put(id, requesters);
0549:
0550:                    if (!isDigestNeeded()) {
0551:                        respondToStateRequester();
0552:                    } else if (empty) {
0553:                        digest = null;
0554:                        if (log.isTraceEnabled())
0555:                            log.trace("passing down GET_DIGEST_STATE");
0556:                        passDown(new Event(Event.GET_DIGEST_STATE));
0557:                    }
0558:                }
0559:            }
0560:
0561:            void handleStateRsp(StateHeader hdr) {
0562:                Digest tmp_digest = hdr.my_digest;
0563:                if (isDigestNeeded()) {
0564:                    if (tmp_digest == null) {
0565:                        if (log.isWarnEnabled())
0566:                            log.warn("digest received from " + hdr.sender
0567:                                    + " is null, skipping setting digest !");
0568:                    } else {
0569:                        // set the digest (e.g.in NAKACK)
0570:                        passDown(new Event(Event.SET_DIGEST, tmp_digest));
0571:                    }
0572:                }
0573:                connectToStateProvider(hdr);
0574:            }
0575:
0576:            void removeFromStateRequesters(Address address, String state_id) {
0577:                synchronized (state_requesters) {
0578:                    Set requesters = (Set) state_requesters.get(state_id);
0579:                    if (requesters != null && !requesters.isEmpty()) {
0580:                        boolean removed = requesters.remove(address);
0581:                        if (log.isTraceEnabled()) {
0582:                            log
0583:                                    .trace("Attempted to clear " + address
0584:                                            + " from requesters, successful="
0585:                                            + removed);
0586:                        }
0587:                        if (requesters.isEmpty()) {
0588:                            state_requesters.remove(state_id);
0589:                            if (log.isTraceEnabled()) {
0590:                                log.trace("Cleared all requesters for state "
0591:                                        + state_id + ",state_requesters="
0592:                                        + state_requesters);
0593:                            }
0594:                        }
0595:                    }
0596:                }
0597:            }
0598:
0599:            private void connectToStateProvider(StateHeader hdr) {
0600:                IpAddress address = hdr.bind_addr;
0601:                String tmp_state_id = hdr.getStateId();
0602:                StreamingInputStreamWrapper wrapper = null;
0603:                StateTransferInfo sti = null;
0604:                final Socket socket = new Socket();
0605:                try {
0606:                    socket.bind(new InetSocketAddress(bind_addr, 0));
0607:                    int bufferSize = socket.getReceiveBufferSize();
0608:                    socket.setReceiveBufferSize(socket_buffer_size);
0609:                    if (log.isDebugEnabled())
0610:                        log.debug("Connecting to state provider "
0611:                                + address.getIpAddress() + ":"
0612:                                + address.getPort()
0613:                                + ", original buffer size was " + bufferSize
0614:                                + " and was reset to "
0615:                                + socket.getReceiveBufferSize());
0616:                    socket.connect(new InetSocketAddress(
0617:                            address.getIpAddress(), address.getPort()));
0618:                    if (log.isDebugEnabled())
0619:                        log
0620:                                .debug("Connected to state provider, my end of the socket is "
0621:                                        + socket.getLocalAddress()
0622:                                        + ":"
0623:                                        + socket.getLocalPort()
0624:                                        + " passing inputstream up...");
0625:
0626:                    //write out our state_id and address so state provider can clear this request
0627:                    ObjectOutputStream out = new ObjectOutputStream(socket
0628:                            .getOutputStream());
0629:                    out.writeObject(tmp_state_id);
0630:                    out.writeObject(local_addr);
0631:
0632:                    wrapper = new StreamingInputStreamWrapper(socket);
0633:                    sti = new StateTransferInfo(hdr.sender, wrapper,
0634:                            tmp_state_id);
0635:                } catch (IOException e) {
0636:                    if (log.isWarnEnabled()) {
0637:                        log.warn(
0638:                                "State reader socket thread spawned abnormaly",
0639:                                e);
0640:                    }
0641:
0642:                    //pass null stream up so that JChannel.getState() returns false 
0643:                    InputStream is = null;
0644:                    sti = new StateTransferInfo(hdr.sender, is, tmp_state_id);
0645:                } finally {
0646:                    if (!socket.isConnected()) {
0647:                        if (log.isWarnEnabled())
0648:                            log
0649:                                    .warn("Could not connect to state provider. Closing socket...");
0650:                        try {
0651:                            if (wrapper != null) {
0652:                                wrapper.close();
0653:                            } else {
0654:                                socket.close();
0655:                            }
0656:
0657:                        } catch (IOException e) {
0658:                        }
0659:                        //since socket did not connect properly we have to
0660:                        //clear our entry in state providers hashmap "manually"
0661:                        Message m = new Message(hdr.sender);
0662:                        StateHeader mhdr = new StateHeader(
0663:                                StateHeader.STATE_REMOVE_REQUESTER, local_addr,
0664:                                tmp_state_id);
0665:                        m.putHeader(NAME, mhdr);
0666:                        passDown(new Event(Event.MSG, m));
0667:                    }
0668:                    passStreamUp(sti);
0669:                }
0670:            }
0671:
0672:            private void passStreamUp(final StateTransferInfo sti) {
0673:                Runnable readingThread = new Runnable() {
0674:                    public void run() {
0675:                        passUp(new Event(Event.STATE_TRANSFER_INPUTSTREAM, sti));
0676:                    }
0677:                };
0678:                if (use_reading_thread) {
0679:                    new Thread(Util.getGlobalThreadGroup(), readingThread,
0680:                            "STREAMING_STATE_TRANSFER.reader").start();
0681:
0682:                } else {
0683:                    readingThread.run();
0684:                }
0685:            }
0686:
0687:            /*
0688:             * ------------------------ End of Private Methods
0689:             * ------------------------------
0690:             */
0691:
0692:            private class StateProviderThreadSpawner implements  Runnable {
0693:                PooledExecutor pool;
0694:
0695:                ServerSocket serverSocket;
0696:
0697:                IpAddress address;
0698:
0699:                Thread runner;
0700:
0701:                volatile boolean running = true;
0702:
0703:                public StateProviderThreadSpawner(PooledExecutor pool,
0704:                        ServerSocket stateServingSocket) {
0705:                    if (pool == null || stateServingSocket == null) {
0706:                        throw new IllegalArgumentException(
0707:                                "Cannot create thread pool");
0708:                    }
0709:                    this .pool = pool;
0710:                    this .serverSocket = stateServingSocket;
0711:                    this .address = new IpAddress(
0712:                            STREAMING_STATE_TRANSFER.this .bind_addr,
0713:                            serverSocket.getLocalPort());
0714:                }
0715:
0716:                public void run() {
0717:                    runner = Thread.currentThread();
0718:                    for (; running;) {
0719:                        try {
0720:                            if (log.isDebugEnabled())
0721:                                log
0722:                                        .debug("StateProviderThreadSpawner listening at "
0723:                                                + getServerSocketAddress()
0724:                                                + "...");
0725:                            if (log.isTraceEnabled())
0726:                                log.trace("Pool has " + pool.getPoolSize()
0727:                                        + " active threads");
0728:                            final Socket socket = serverSocket.accept();
0729:                            pool.execute(new Runnable() {
0730:                                public void run() {
0731:                                    if (log.isDebugEnabled())
0732:                                        log
0733:                                                .debug("Accepted request for state transfer from "
0734:                                                        + socket
0735:                                                                .getInetAddress()
0736:                                                        + ":"
0737:                                                        + socket.getPort()
0738:                                                        + " handing of to PooledExecutor thread");
0739:                                    new StateProviderHandler().process(socket);
0740:                                }
0741:                            });
0742:
0743:                        } catch (IOException e) {
0744:                            if (log.isWarnEnabled()) {
0745:                                //we get this exception when we close server socket
0746:                                //exclude that case
0747:                                if (!serverSocket.isClosed()) {
0748:                                    log
0749:                                            .warn(
0750:                                                    "Spawning socket from server socket finished abnormaly",
0751:                                                    e);
0752:                                }
0753:                            }
0754:                        } catch (InterruptedException e) {
0755:                            // should not happen
0756:                        }
0757:                    }
0758:                }
0759:
0760:                public IpAddress getServerSocketAddress() {
0761:                    return address;
0762:                }
0763:
0764:                public void stop() {
0765:                    running = false;
0766:                    try {
0767:                        if (!serverSocket.isClosed()) {
0768:                            serverSocket.close();
0769:                        }
0770:                    } catch (IOException e) {
0771:                    } finally {
0772:                        if (log.isDebugEnabled())
0773:                            log
0774:                                    .debug("Waiting for StateProviderThreadSpawner to die ... ");
0775:
0776:                        try {
0777:                            runner.join(3000);
0778:                        } catch (InterruptedException ignored) {
0779:                        }
0780:
0781:                        if (log.isDebugEnabled())
0782:                            log.debug("Shutting the thread pool down... ");
0783:
0784:                        pool.shutdownNow();
0785:                        try {
0786:                            //TODO use some system wide timeout eventually
0787:                            pool.awaitTerminationAfterShutdown(5000);
0788:                        } catch (InterruptedException ignored) {
0789:                        }
0790:                    }
0791:                    if (log.isDebugEnabled())
0792:                        log
0793:                                .debug("Thread pool is shutdown. All pool threads are cleaned up.");
0794:                }
0795:            }
0796:
0797:            private class StateProviderHandler {
0798:                public void process(Socket socket) {
0799:                    StreamingOutputStreamWrapper wrapper = null;
0800:                    ObjectInputStream ois = null;
0801:                    try {
0802:                        int bufferSize = socket.getSendBufferSize();
0803:                        socket.setSendBufferSize(socket_buffer_size);
0804:                        if (log.isDebugEnabled())
0805:                            log
0806:                                    .debug("Running on "
0807:                                            + Thread.currentThread()
0808:                                            + ". Accepted request for state transfer from "
0809:                                            + socket.getInetAddress() + ":"
0810:                                            + socket.getPort()
0811:                                            + ", original buffer size was "
0812:                                            + bufferSize + " and was reset to "
0813:                                            + socket.getSendBufferSize()
0814:                                            + ", passing outputstream up... ");
0815:
0816:                        //read out state requesters state_id and address and clear this request
0817:                        ois = new ObjectInputStream(socket.getInputStream());
0818:                        String state_id = (String) ois.readObject();
0819:                        Address stateRequester = (Address) ois.readObject();
0820:                        removeFromStateRequesters(stateRequester, state_id);
0821:
0822:                        wrapper = new StreamingOutputStreamWrapper(socket);
0823:                        StateTransferInfo sti = new StateTransferInfo(
0824:                                stateRequester, wrapper, state_id);
0825:                        passUp(new Event(Event.STATE_TRANSFER_OUTPUTSTREAM, sti));
0826:                    } catch (IOException e) {
0827:                        if (log.isWarnEnabled()) {
0828:                            log
0829:                                    .warn(
0830:                                            "State writer socket thread spawned abnormaly",
0831:                                            e);
0832:                        }
0833:                    } catch (ClassNotFoundException e) {
0834:                        //thrown by ois.readObject()
0835:                        //should never happen since String/Address are core classes
0836:                    } finally {
0837:                        if (socket != null && !socket.isConnected()) {
0838:                            if (log.isWarnEnabled())
0839:                                log
0840:                                        .warn("Accepted request for state transfer but socket "
0841:                                                + socket
0842:                                                + " not connected properly. Closing it...");
0843:                            try {
0844:                                if (wrapper != null) {
0845:                                    wrapper.close();
0846:                                } else {
0847:                                    socket.close();
0848:                                }
0849:                            } catch (IOException e) {
0850:                            }
0851:                        }
0852:                    }
0853:                }
0854:            }
0855:
0856:            private class StreamingInputStreamWrapper extends InputStream {
0857:
0858:                private Socket inputStreamOwner;
0859:
0860:                private InputStream delegate;
0861:
0862:                private Channel channelOwner;
0863:
0864:                public StreamingInputStreamWrapper(Socket inputStreamOwner)
0865:                        throws IOException {
0866:                    super ();
0867:                    this .inputStreamOwner = inputStreamOwner;
0868:                    this .delegate = new BufferedInputStream(inputStreamOwner
0869:                            .getInputStream());
0870:                    this .channelOwner = stack.getChannel();
0871:                }
0872:
0873:                public int available() throws IOException {
0874:                    return delegate.available();
0875:                }
0876:
0877:                public void close() throws IOException {
0878:                    if (log.isDebugEnabled()) {
0879:                        log.debug("State reader " + inputStreamOwner
0880:                                + " is closing the socket ");
0881:                    }
0882:                    if (channelOwner != null && channelOwner.isConnected()) {
0883:                        channelOwner.down(new Event(
0884:                                Event.STATE_TRANSFER_INPUTSTREAM_CLOSED));
0885:                    }
0886:                    inputStreamOwner.close();
0887:                }
0888:
0889:                public synchronized void mark(int readlimit) {
0890:                    delegate.mark(readlimit);
0891:                }
0892:
0893:                public boolean markSupported() {
0894:                    return delegate.markSupported();
0895:                }
0896:
0897:                public int read() throws IOException {
0898:                    return delegate.read();
0899:                }
0900:
0901:                public int read(byte[] b, int off, int len) throws IOException {
0902:                    return delegate.read(b, off, len);
0903:                }
0904:
0905:                public int read(byte[] b) throws IOException {
0906:                    return delegate.read(b);
0907:                }
0908:
0909:                public synchronized void reset() throws IOException {
0910:                    delegate.reset();
0911:                }
0912:
0913:                public long skip(long n) throws IOException {
0914:                    return delegate.skip(n);
0915:                }
0916:            }
0917:
0918:            private class StreamingOutputStreamWrapper extends OutputStream {
0919:                private Socket outputStreamOwner;
0920:
0921:                private OutputStream delegate;
0922:
0923:                private long bytesWrittenCounter = 0;
0924:
0925:                private Channel channelOwner;
0926:
0927:                public StreamingOutputStreamWrapper(Socket outputStreamOwner)
0928:                        throws IOException {
0929:                    super ();
0930:                    this .outputStreamOwner = outputStreamOwner;
0931:                    this .delegate = new BufferedOutputStream(outputStreamOwner
0932:                            .getOutputStream());
0933:                    this .channelOwner = stack.getChannel();
0934:                }
0935:
0936:                public void close() throws IOException {
0937:                    if (log.isDebugEnabled()) {
0938:                        log.debug("State writer " + outputStreamOwner
0939:                                + " is closing the socket ");
0940:                    }
0941:                    try {
0942:                        if (channelOwner != null && channelOwner.isConnected()) {
0943:                            channelOwner.down(new Event(
0944:                                    Event.STATE_TRANSFER_OUTPUTSTREAM_CLOSED));
0945:                        }
0946:                        outputStreamOwner.close();
0947:                    } catch (IOException e) {
0948:                        throw e;
0949:                    } finally {
0950:                        if (stats) {
0951:                            synchronized (state_requesters) {
0952:                                num_bytes_sent += bytesWrittenCounter;
0953:                                avg_state_size = num_bytes_sent
0954:                                        / (double) num_state_reqs;
0955:                            }
0956:                        }
0957:                    }
0958:                }
0959:
0960:                public void flush() throws IOException {
0961:                    delegate.flush();
0962:                }
0963:
0964:                public void write(byte[] b, int off, int len)
0965:                        throws IOException {
0966:                    delegate.write(b, off, len);
0967:                    bytesWrittenCounter += len;
0968:                }
0969:
0970:                public void write(byte[] b) throws IOException {
0971:                    delegate.write(b);
0972:                    if (b != null) {
0973:                        bytesWrittenCounter += b.length;
0974:                    }
0975:                }
0976:
0977:                public void write(int b) throws IOException {
0978:                    delegate.write(b);
0979:                    bytesWrittenCounter += 1;
0980:                }
0981:            }
0982:
0983:            public static class StateHeader extends Header implements 
0984:                    Streamable {
0985:                public static final byte STATE_REQ = 1;
0986:
0987:                public static final byte STATE_RSP = 2;
0988:
0989:                public static final byte STATE_REMOVE_REQUESTER = 3;
0990:
0991:                long id = 0; // state transfer ID (to separate multiple state transfers at the same time)
0992:
0993:                byte type = 0;
0994:
0995:                Address sender; // sender of state STATE_REQ or STATE_RSP
0996:
0997:                Digest my_digest = null; // digest of sender (if type is STATE_RSP)
0998:
0999:                IpAddress bind_addr = null;
1000:
1001:                String state_id = null; // for partial state transfer
1002:
1003:                public StateHeader() { // for externalization
1004:                }
1005:
1006:                public StateHeader(byte type, Address sender, String state_id) {
1007:                    this .type = type;
1008:                    this .sender = sender;
1009:                    this .state_id = state_id;
1010:                }
1011:
1012:                public StateHeader(byte type, Address sender, long id,
1013:                        Digest digest) {
1014:                    this .type = type;
1015:                    this .sender = sender;
1016:                    this .id = id;
1017:                    this .my_digest = digest;
1018:                }
1019:
1020:                public StateHeader(byte type, Address sender,
1021:                        IpAddress bind_addr, Digest digest, String state_id) {
1022:                    this .type = type;
1023:                    this .sender = sender;
1024:                    this .my_digest = digest;
1025:                    this .bind_addr = bind_addr;
1026:                    this .state_id = state_id;
1027:                }
1028:
1029:                public int getType() {
1030:                    return type;
1031:                }
1032:
1033:                public Digest getDigest() {
1034:                    return my_digest;
1035:                }
1036:
1037:                public String getStateId() {
1038:                    return state_id;
1039:                }
1040:
1041:                public boolean equals(Object o) {
1042:                    StateHeader other;
1043:
1044:                    if (sender != null && o != null) {
1045:                        if (!(o instanceof  StateHeader))
1046:                            return false;
1047:                        other = (StateHeader) o;
1048:                        return sender.equals(other.sender) && id == other.id;
1049:                    }
1050:                    return false;
1051:                }
1052:
1053:                public int hashCode() {
1054:                    if (sender != null)
1055:                        return sender.hashCode() + (int) id;
1056:                    else
1057:                        return (int) id;
1058:                }
1059:
1060:                public String toString() {
1061:                    StringBuffer sb = new StringBuffer();
1062:                    sb.append("type=").append(type2Str(type));
1063:                    if (sender != null)
1064:                        sb.append(", sender=").append(sender).append(" id=")
1065:                                .append(id);
1066:                    if (my_digest != null)
1067:                        sb.append(", digest=").append(my_digest);
1068:                    return sb.toString();
1069:                }
1070:
1071:                static String type2Str(int t) {
1072:                    switch (t) {
1073:                    case STATE_REQ:
1074:                        return "STATE_REQ";
1075:                    case STATE_RSP:
1076:                        return "STATE_RSP";
1077:                    case STATE_REMOVE_REQUESTER:
1078:                        return "STATE_REMOVE_REQUESTER";
1079:                    default:
1080:                        return "<unknown>";
1081:                    }
1082:                }
1083:
1084:                public void writeExternal(ObjectOutput out) throws IOException {
1085:                    out.writeObject(sender);
1086:                    out.writeLong(id);
1087:                    out.writeByte(type);
1088:                    out.writeObject(my_digest);
1089:                    out.writeObject(bind_addr);
1090:                    out.writeUTF(state_id);
1091:                }
1092:
1093:                public void readExternal(ObjectInput in) throws IOException,
1094:                        ClassNotFoundException {
1095:                    sender = (Address) in.readObject();
1096:                    id = in.readLong();
1097:                    type = in.readByte();
1098:                    my_digest = (Digest) in.readObject();
1099:                    bind_addr = (IpAddress) in.readObject();
1100:                    state_id = in.readUTF();
1101:                }
1102:
1103:                public void writeTo(DataOutputStream out) throws IOException {
1104:                    out.writeByte(type);
1105:                    out.writeLong(id);
1106:                    Util.writeAddress(sender, out);
1107:                    Util.writeStreamable(my_digest, out);
1108:                    Util.writeStreamable(bind_addr, out);
1109:                    Util.writeString(state_id, out);
1110:                }
1111:
1112:                public void readFrom(DataInputStream in) throws IOException,
1113:                        IllegalAccessException, InstantiationException {
1114:                    type = in.readByte();
1115:                    id = in.readLong();
1116:                    sender = Util.readAddress(in);
1117:                    my_digest = (Digest) Util.readStreamable(Digest.class, in);
1118:                    bind_addr = (IpAddress) Util.readStreamable(
1119:                            IpAddress.class, in);
1120:                    state_id = Util.readString(in);
1121:                }
1122:
1123:                public long size() {
1124:                    long retval = Global.LONG_SIZE + Global.BYTE_SIZE; // id and type
1125:
1126:                    retval += Util.size(sender);
1127:
1128:                    retval += Global.BYTE_SIZE; // presence byte for my_digest
1129:                    if (my_digest != null)
1130:                        retval += my_digest.serializedSize();
1131:
1132:                    retval += Global.BYTE_SIZE; // presence byte for state_id
1133:                    if (state_id != null)
1134:                        retval += state_id.length() + 2;
1135:                    return retval;
1136:                }
1137:
1138:            }
1139:
1140:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.