Source Code Cross Referenced for WebdavReplicatorLocation.java in  » Content-Management-System » hippo » nl » hippo » slide » replication » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Content Management System » hippo » nl.hippo.slide.replication 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /*
002:         * Copyright 2005 Hippo Webworks.
003:         * 
004:         * Licensed under the Apache License, Version 2.0 (the "License");
005:         * you may not use this file except in compliance with the License.
006:         * You may obtain a copy of the License at
007:         * 
008:         *      http://www.apache.org/licenses/LICENSE-2.0
009:         * 
010:         * Unless required by applicable law or agreed to in writing, software
011:         * distributed under the License is distributed on an "AS IS" BASIS,
012:         * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013:         * See the License for the specific language governing permissions and
014:         * limitations under the License.
015:         */
016:        package nl.hippo.slide.replication;
017:
018:        import java.io.BufferedInputStream;
019:        import java.io.BufferedOutputStream;
020:        import java.io.ByteArrayInputStream;
021:        import java.io.File;
022:        import java.io.FileInputStream;
023:        import java.io.FileNotFoundException;
024:        import java.io.FileOutputStream;
025:        import java.io.IOException;
026:        import java.io.InputStream;
027:        import java.io.StringWriter;
028:        import java.util.Enumeration;
029:        import java.util.HashSet;
030:        import java.util.LinkedList;
031:        import java.util.Vector;
032:
033:        import javax.xml.transform.Result;
034:        import javax.xml.transform.Source;
035:        import javax.xml.transform.Transformer;
036:        import javax.xml.transform.TransformerException;
037:        import javax.xml.transform.TransformerFactory;
038:        import javax.xml.transform.dom.DOMSource;
039:        import javax.xml.transform.stream.StreamResult;
040:
041:        import org.apache.avalon.framework.configuration.Configuration;
042:        import org.apache.avalon.framework.configuration.ConfigurationException;
043:        import org.apache.avalon.framework.logger.AbstractLogEnabled;
044:        import org.apache.commons.httpclient.HttpClient;
045:        import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
046:        import org.apache.commons.httpclient.UsernamePasswordCredentials;
047:        import org.apache.commons.httpclient.methods.PutMethod;
048:        import org.apache.commons.httpclient.util.URIUtil;
049:        import org.apache.slide.content.NodeProperty;
050:        import org.apache.slide.content.NodeRevisionContent;
051:        import org.apache.slide.content.NodeRevisionDescriptor;
052:        import org.apache.webdav.lib.Property;
053:        import org.apache.webdav.lib.PropertyName;
054:        import org.apache.webdav.lib.methods.DeleteMethod;
055:        import org.apache.webdav.lib.methods.MkcolMethod;
056:        import org.apache.webdav.lib.methods.PropFindMethod;
057:        import org.apache.webdav.lib.methods.PropPatchMethod;
058:        import org.apache.webdav.lib.properties.ResourceTypeProperty;
059:
060:        public class WebdavReplicatorLocation extends AbstractLogEnabled
061:                implements  ReplicatorLocation {
062:
063:            private String host;
064:            private String port;
065:            private String user;
066:            private String password;
067:            private String baseUri;
068:            private String rootpath;
069:            private boolean createParents;
070:
071:            private HttpClient client;
072:
073:            // the queue in which the replication jobs are stored
074:            private LinkedList queue;
075:
076:            // the worker thread that does the actual replication
077:            Thread worker;
078:
079:            private boolean replicateContent = true;
080:            private HashSet propertiesToReplicate = null;
081:
082:            // number of retries
083:            private static final int DEFAULT_RETRY_COUNT = 3;
084:            // timeout to wait for connection
085:            private static final int DEFAULT_CONNECTION_TIMEOUT_SECONDS = 30;
086:            // timeout to wait for response
087:            private static final int DEFAULT_SOCKET_TIMEOUT_SECONDS = 180;
088:
089:            private static final int MILI_TO_SECS = 1000;
090:
091:            private boolean retryEnabled = false;
092:            private int retryCount;
093:            private int connectionTimeoutSeconds;
094:            private int socketTimeoutSeconds;
095:
096:            private static final PropertyName RESOURCETYPE = new PropertyName(
097:                    "DAV:", "resourcetype");
098:
099:            public void configure(Configuration config)
100:                    throws ConfigurationException {
101:                host = config.getChild("host").getValue();
102:                port = config.getChild("port").getValue();
103:                user = config.getChild("user").getValue();
104:                password = config.getChild("password").getValue();
105:                rootpath = config.getChild("rootpath").getValue();
106:                baseUri = config.getChild("path").getValue();
107:                createParents = config.getChild("createparents")
108:                        .getValueAsBoolean(false);
109:                replicateContent = config.getChild("replicate-content")
110:                        .getAttributeAsBoolean("value", replicateContent);
111:
112:                retryCount = config.getChild("retrycount").getValueAsInteger(
113:                        DEFAULT_RETRY_COUNT);
114:                connectionTimeoutSeconds = config.getChild(
115:                        "connecttimeoutseconds").getValueAsInteger(
116:                        DEFAULT_CONNECTION_TIMEOUT_SECONDS);
117:                socketTimeoutSeconds = config.getChild("sockettimeoutseconds")
118:                        .getValueAsInteger(DEFAULT_SOCKET_TIMEOUT_SECONDS);
119:
120:                /* some sanity checks */
121:                if (retryCount > 1) {
122:                    retryEnabled = true;
123:                } else if (retryCount < 1) {
124:                    retryCount = DEFAULT_RETRY_COUNT;
125:                }
126:                if (connectionTimeoutSeconds < 1) {
127:                    connectionTimeoutSeconds = DEFAULT_CONNECTION_TIMEOUT_SECONDS;
128:                }
129:                if (socketTimeoutSeconds < 1) {
130:                    socketTimeoutSeconds = DEFAULT_SOCKET_TIMEOUT_SECONDS;
131:                }
132:                if (!rootpath.startsWith("/")) {
133:                    rootpath = "/" + rootpath;
134:                }
135:                if (!baseUri.startsWith("/")) {
136:                    baseUri = "/" + baseUri;
137:                }
138:                if (!baseUri.endsWith("/")) {
139:                    baseUri = baseUri + "/";
140:                }
141:                if (getLogger().isDebugEnabled()) {
142:                    getLogger().debug("Replicator config:");
143:                    getLogger().debug("host          : " + host + ":" + port);
144:                    getLogger().debug("user          : " + user);
145:                    getLogger().debug(
146:                            "timeout (secs): " + connectionTimeoutSeconds);
147:                    getLogger().debug("retry enabled : " + retryEnabled);
148:                    getLogger().debug("retry count   : " + retryCount);
149:                    getLogger().debug("baseuri       : " + baseUri);
150:                    getLogger().debug("createparents : " + createParents);
151:                    getLogger().debug("repl. content : " + replicateContent);
152:                }
153:                baseUri = "http://" + host + ":" + port + rootpath + baseUri;
154:                getLogger().info("Configured replicator to: " + baseUri);
155:
156:                // allow for restricting of properties which have to be replicated
157:                Configuration[] propertyConfigs = config.getChild(
158:                        "replicate-properties").getChildren("property");
159:                if (propertyConfigs.length != 0) {
160:                    propertiesToReplicate = new HashSet();
161:                    for (int i = 0; i < propertyConfigs.length; i++) {
162:                        NodeProperty prop = new NodeProperty(propertyConfigs[i]
163:                                .getAttribute("name"), null, propertyConfigs[i]
164:                                .getAttribute("namespace"));
165:                        propertiesToReplicate.add(prop);
166:
167:                        if (getLogger().isDebugEnabled()) {
168:                            getLogger().debug(
169:                                    "Replicating prop " + prop + " to "
170:                                            + baseUri);
171:                        }
172:                    }
173:                }
174:
175:                // create queue for jobs
176:                queue = new LinkedList();
177:                client = new HttpClient(
178:                        new MultiThreadedHttpConnectionManager());
179:                client.getState().setCredentials(null, host,
180:                        new UsernamePasswordCredentials(user, password));
181:                client.getState().setAuthenticationPreemptive(true);
182:                client.setConnectionTimeout(connectionTimeoutSeconds
183:                        * MILI_TO_SECS);
184:                client.setTimeout(socketTimeoutSeconds * MILI_TO_SECS);
185:            }
186:
187:            //---------------------------------------< interface impl >------------------------------------//
188:            /**
189:             * Add put job to the queue
190:             */
191:            public void put(String uri, NodeRevisionDescriptor descriptor,
192:                    NodeRevisionContent content) {
193:                PutJob job = new PutJob(uri, descriptor, content);
194:                synchronized (queue) {
195:                    queue.add(job);
196:                    if (worker == null) {
197:                        worker = new ReplicatorWorker();
198:                        worker.start();
199:                    }
200:                }
201:            }
202:
203:            /**
204:             * Add mkcol job to the queue
205:             */
206:            public void mkcol(String uri, NodeRevisionDescriptor descriptor) {
207:                MkcolJob job = new MkcolJob(uri, descriptor);
208:                synchronized (queue) {
209:                    queue.add(job);
210:                    if (worker == null) {
211:                        worker = new ReplicatorWorker();
212:                        worker.start();
213:                    }
214:                }
215:            }
216:
217:            /**
218:             * Add delete job to the queue
219:             */
220:            public void delete(String uri) {
221:                DeleteJob job = new DeleteJob(uri);
222:                synchronized (queue) {
223:                    queue.add(job);
224:                    if (worker == null) {
225:                        worker = new ReplicatorWorker();
226:                        worker.start();
227:                    }
228:                }
229:            }
230:
231:            //----------------------------------< job execution methods >-------------------//
232:            /**
233:             * precond: uri is already relative to base uri
234:             */
235:            public void doPut(PutJob job) {
236:                job.addTry();
237:                String uri = job.getUri();
238:                InputStream content = job.getContentStream();
239:
240:                if (!uri.startsWith(baseUri)) {
241:                    if (uri.startsWith("/"))
242:                        uri = uri.substring("/".length());
243:                    uri = baseUri + uri;
244:                }
245:
246:                // content InputStream is null when only properties have changed
247:                if (replicateContent && content != null) {
248:                    if (createParents) {
249:                        mkparents(uri.substring(0, uri.lastIndexOf('/')));
250:                    }
251:                    final PutMethod put = new PutMethod(uri);
252:                    try {
253:                        put.setRequestBody(content);
254:                        int status = client.executeMethod(put);
255:                        if (getLogger().isDebugEnabled()) {
256:                            getLogger().debug(
257:                                    "PUT executed. Server responded: (uri="
258:                                            + uri
259:                                            + ") "
260:                                            + status
261:                                            + " ("
262:                                            + URIUtil.decode(put
263:                                                    .getStatusText()) + ")");
264:                        }
265:                        // 201 => created, 204 => overwrite
266:                        if (status != 201 && status != 204) {
267:                            getLogger().error(
268:                                    "Error executing PUT. Server responded: (uri="
269:                                            + uri
270:                                            + ") "
271:                                            + status
272:                                            + " ("
273:                                            + URIUtil.decode(put
274:                                                    .getStatusText()) + ")");
275:                        } else {
276:                            status = setProperties(uri, job.getProperties(),
277:                                    job.getRemovedProperties());
278:                            if (status == 207) {
279:                                job.setDone(true);
280:                            }
281:                        }
282:                    } catch (IOException e) {
283:                        getLogger().error("Error putting resource: " + uri, e);
284:                    } finally {
285:                        put.releaseConnection();
286:                    }
287:                }
288:            }
289:
290:            public void doMkcol(MkcolJob job) {
291:                job.addTry();
292:                String uri = job.getUri();
293:
294:                if (!uri.startsWith(baseUri)) {
295:                    if (uri.startsWith("/"))
296:                        uri = uri.substring("/".length());
297:                    uri = baseUri + uri;
298:                }
299:
300:                if (replicateContent) {
301:                    if (createParents) {
302:                        mkparents(uri.substring(0, uri.lastIndexOf('/')));
303:                    }
304:                    final MkcolMethod mkcol = new MkcolMethod(uri);
305:                    try {
306:                        int status = client.executeMethod(mkcol);
307:                        if (getLogger().isDebugEnabled()) {
308:                            getLogger().debug(
309:                                    "MKCOL executed. Server responded: (uri="
310:                                            + uri
311:                                            + ") "
312:                                            + status
313:                                            + " ("
314:                                            + URIUtil.decode(mkcol
315:                                                    .getStatusText()) + ")");
316:                        }
317:                        // 201 => created, 405 => folder already exists
318:                        if (status != 201 && status != 405) {
319:                            getLogger().error(
320:                                    "Error executing MKCOL. Server responded: (uri="
321:                                            + uri
322:                                            + ") "
323:                                            + status
324:                                            + " ("
325:                                            + URIUtil.decode(mkcol
326:                                                    .getStatusText()) + ")");
327:                        } else {
328:                            status = setProperties(uri, job.getProperties(),
329:                                    job.getRemovedProperties());
330:                            if (status == 207) {
331:                                job.setDone(true);
332:                            }
333:                        }
334:                    } catch (IOException e) {
335:                        getLogger().error("Error executing mkcol: " + uri, e);
336:                    } finally {
337:                        mkcol.releaseConnection();
338:                    }
339:                }
340:
341:            }
342:
343:            public void doDelete(DeleteJob job) {
344:                job.addTry();
345:                String uri = job.getUri();
346:
347:                if (!uri.startsWith(baseUri)) {
348:                    if (uri.startsWith("/"))
349:                        uri = uri.substring("/".length());
350:                    uri = baseUri + uri;
351:                }
352:
353:                if (replicateContent && ensureNoChildren(uri)) {
354:                    final DeleteMethod delete = new DeleteMethod(uri);
355:                    try {
356:                        int status = client.executeMethod(delete);
357:                        if (getLogger().isDebugEnabled()) {
358:                            getLogger().debug(
359:                                    "DELETE executed. Server responded: (uri="
360:                                            + uri
361:                                            + ") "
362:                                            + status
363:                                            + " ("
364:                                            + URIUtil.decode(delete
365:                                                    .getStatusText()) + ") ");
366:                        }
367:                        // 204 => deleted, 404 => not found
368:                        if (status != 204 && status != 404) {
369:                            getLogger().error(
370:                                    "Error executing DELETE. Server responded: (uri="
371:                                            + uri
372:                                            + ") "
373:                                            + status
374:                                            + " ("
375:                                            + URIUtil.decode(delete
376:                                                    .getStatusText()) + ")");
377:                        } else {
378:                            job.setDone(true);
379:                        }
380:                    } catch (IOException e) {
381:                        getLogger().error("Error executing delete: " + uri, e);
382:                    } finally {
383:                        delete.releaseConnection();
384:                    }
385:                } else {
386:                    if (replicateContent && getLogger().isWarnEnabled()) {
387:                        getLogger().warn(
388:                                "Tried to delete non-empty collection! " + uri);
389:                    }
390:                }
391:            }
392:
393:            //-------------------------------< webdav helper methods >-------------------//
394:
395:            private void mkparents(String uri) {
396:                /* if the current uri is a collection, we're done */
397:                if (isCollection(uri)) {
398:                    return;
399:                }
400:
401:                /* check if the parent of the uri exists */
402:                String parentUri = uri.substring(0, uri.lastIndexOf('/'));
403:
404:                /* don't try to create baseUri, compensate for trailing slash */
405:                if (parentUri.length() < (baseUri.length() - 1)) {
406:                    getLogger().error(
407:                            "Replication baseUri doesn't exists : " + baseUri);
408:                    return;
409:                }
410:
411:                /* check parent and create if necessary */
412:                mkparents(parentUri);
413:
414:                /* parent uri is collection. try to create collection */
415:                final MkcolMethod mkcol = new MkcolMethod(uri);
416:
417:                try {
418:                    int status = client.executeMethod(mkcol);
419:                    if (getLogger().isDebugEnabled()) {
420:                        getLogger().debug(
421:                                "MKCOL executed for parent. Server responded: (uri="
422:                                        + uri + ") " + status + " ("
423:                                        + URIUtil.decode(mkcol.getStatusText())
424:                                        + ")");
425:                    }
426:                } catch (IOException e) {
427:                    getLogger().error("Error executing mkcol: " + uri, e);
428:                } finally {
429:                    mkcol.releaseConnection();
430:                }
431:            }
432:
433:            private boolean isCollection(String uri) {
434:                boolean iscollection = false;
435:                boolean isresourcetype = false;
436:
437:                // some webdav servers require a slash at the end for collections
438:                if (!uri.endsWith("/")) {
439:                    uri = uri + "/";
440:                }
441:
442:                final PropFindMethod propfind = new PropFindMethod(uri, 0,
443:                        PropFindMethod.BY_NAME);
444:
445:                Vector props = new Vector(1);
446:                props.add(RESOURCETYPE);
447:                propfind.setPropertyNames(props.elements());
448:                propfind.setFollowRedirects(true);
449:
450:                try {
451:                    final int status = client.executeMethod(propfind);
452:
453:                    if (getLogger().isDebugEnabled()) {
454:                        getLogger().debug(
455:                                "PROPFIND executed. Server responded: (uri="
456:                                        + uri
457:                                        + ") "
458:                                        + status
459:                                        + " ("
460:                                        + URIUtil.decode(propfind
461:                                                .getStatusText()) + ") ");
462:                    }
463:                    if (status == 207) {
464:                        Enumeration responseURLEnum = propfind
465:                                .getAllResponseURLs();
466:                        while (responseURLEnum.hasMoreElements()) {
467:                            String href = (String) responseURLEnum
468:                                    .nextElement();
469:                            Enumeration propertyEnum = propfind
470:                                    .getResponseProperties(href);
471:                            while (propertyEnum.hasMoreElements()) {
472:                                Property property = (Property) propertyEnum
473:                                        .nextElement();
474:                                if (property instanceof  ResourceTypeProperty) {
475:                                    isresourcetype = true;
476:                                    iscollection = ((ResourceTypeProperty) property)
477:                                            .isCollection();
478:                                }
479:                            }
480:                        }
481:                        if (!isresourcetype) {
482:                            getLogger().warn(
483:                                    "PROPFFIND did not return resourcetype");
484:                        }
485:                    }
486:                } catch (Exception e) {
487:                    getLogger().warn("Error checking if collection: " + uri, e);
488:                } finally {
489:                    propfind.releaseConnection();
490:                }
491:                if (getLogger().isDebugEnabled()) {
492:                    getLogger().debug(
493:                            "IsCollection = " + iscollection + " for uri: "
494:                                    + uri);
495:                }
496:                return iscollection;
497:            }
498:
499:            private boolean ensureNoChildren(String uri) {
500:                Vector names = new Vector(1);
501:                names.add(new PropertyName("DAV", "displayname"));
502:
503:                final PropFindMethod propfind = new PropFindMethod(uri);
504:                propfind.setDepth(1);
505:                propfind.setPropertyNames(names.elements());
506:
507:                try {
508:                    final int status = client.executeMethod(propfind);
509:                    // not found => no childeren
510:                    if (status == 404) {
511:                        return true;
512:                    }
513:                    if (status >= 400) {
514:                        return false;
515:                    }
516:                    Enumeration urls = propfind.getAllResponseURLs();
517:                    int counter = 0;
518:                    while (urls.hasMoreElements()) {
519:                        urls.nextElement();
520:                        counter++;
521:                    }
522:                    return counter == 1; // propfind also finds the uri it acts on
523:                } catch (Exception e) {
524:                    if (getLogger().isWarnEnabled()) {
525:                        getLogger().warn("Error checking for children: " + uri,
526:                                e);
527:                    }
528:                    return false;
529:                } finally {
530:                    propfind.releaseConnection();
531:                }
532:            }
533:
534:            private String wrapCDATA(String nonxml) {
535:                StringBuffer buf = new StringBuffer();
536:                buf.append("<![CDATA[").append(nonxml).append("]]>");
537:                return buf.toString();
538:            }
539:
540:            private int setProperties(String uri, Enumeration properties,
541:                    Enumeration removedproperties) {
542:                final PropPatchMethod proppatch = new PropPatchMethod(uri);
543:                boolean doPropPatch = false;
544:                int status = 0;
545:
546:                while (properties.hasMoreElements()) {
547:                    NodeProperty prop = (NodeProperty) properties.nextElement();
548:
549:                    // only patch DAV:displayname and all non-DAV properties (and only those which are meant to be replicated)
550:                    if ((propertiesToReplicate == null || propertiesToReplicate
551:                            .contains(prop))
552:                            && (!"DAV:".equals(prop.getNamespace()) || "displayname"
553:                                    .equals(prop.getName()))) {
554:                        proppatch.addPropertyToSet(prop.getName(),
555:                                wrapCDATA(prop.getValue().toString()),
556:                                "property", prop.getNamespace());
557:
558:                        doPropPatch = true;
559:
560:                        if (getLogger().isDebugEnabled()) {
561:                            getLogger().debug(
562:                                    "PROPPATCH: setting " + prop.getNamespace()
563:                                            + ":" + prop.getName() + " to "
564:                                            + prop.getValue());
565:                        }
566:                    } else {
567:                        if (getLogger().isDebugEnabled()) {
568:                            getLogger().debug(
569:                                    "PROPPATCH: NOT setting "
570:                                            + prop.getNamespace() + ":"
571:                                            + prop.getName() + " to "
572:                                            + prop.getValue());
573:                        }
574:                    }
575:                }
576:
577:                while (removedproperties.hasMoreElements()) {
578:                    NodeProperty prop = (NodeProperty) removedproperties
579:                            .nextElement();
580:
581:                    // only remove non-DAV properties (and only those which are meant to be replicated)
582:                    if ((propertiesToReplicate == null || propertiesToReplicate
583:                            .contains(prop))
584:                            && !"DAV:".equals(prop.getNamespace())) {
585:                        proppatch.addPropertyToRemove(prop.getName(),
586:                                "property", prop.getNamespace());
587:
588:                        doPropPatch = true;
589:
590:                        if (getLogger().isDebugEnabled()) {
591:                            getLogger().debug(
592:                                    "PROPPATCH: removing "
593:                                            + prop.getNamespace() + ":"
594:                                            + prop.getName());
595:                        }
596:                    } else {
597:                        if (getLogger().isDebugEnabled()) {
598:                            getLogger().debug(
599:                                    "PROPPATCH: NOT removing "
600:                                            + prop.getNamespace() + ":"
601:                                            + prop.getName());
602:                        }
603:                    }
604:                }
605:
606:                if (doPropPatch) {
607:                    try {
608:                        status = client.executeMethod(proppatch);
609:                        if (getLogger().isDebugEnabled()) {
610:                            getLogger().debug(
611:                                    "PROPPATCH executed. Server responded: (uri="
612:                                            + uri
613:                                            + ") "
614:                                            + status
615:                                            + " ("
616:                                            + URIUtil.decode(proppatch
617:                                                    .getStatusText()) + ") ");
618:                        }
619:                        if (status != 207) {
620:                            getLogger().error(
621:                                    "Error setting properties, got status ["
622:                                            + status + "] for uri: " + uri);
623:                        }
624:                    } catch (IOException e) {
625:                        getLogger()
626:                                .error("Error setting properties: " + uri, e);
627:                    } finally {
628:                        proppatch.releaseConnection();
629:                    }
630:                } else {
631:                    if (getLogger().isDebugEnabled()) {
632:                        getLogger().debug(
633:                                "Did not send proppatch, no props to patch!");
634:                    }
635:                }
636:                return status;
637:            }
638:
639:            //-----------------------------------< Worker & worker jobs >------------------------//
640:
641:            /**
642:             * Worker class for background replication
643:             */
644:            public class ReplicatorWorker extends Thread {
645:
646:                private boolean jobsLeft = true;
647:
648:                public void run() {
649:                    if (getLogger().isDebugEnabled()) {
650:                        getLogger()
651:                                .debug(
652:                                        "Replicator thread " + getName()
653:                                                + " starting.");
654:                    }
655:
656:                    // the main loop that fetches the jobs and executes them
657:                    do {
658:                        Job queuedJob = null;
659:                        synchronized (queue) {
660:                            if (queue.isEmpty()) {
661:                                jobsLeft = false;
662:                                worker = null;
663:                                if (getLogger().isDebugEnabled()) {
664:                                    getLogger().debug(
665:                                            "Replicator thread " + getName()
666:                                                    + " nothing left todo.");
667:                                }
668:                                // the queue is done, bail out
669:                                return;
670:                            } else {
671:                                // get the next job from the queue
672:                                queuedJob = (Job) queue.removeFirst();
673:                                // execute the job
674:                                if (queuedJob instanceof  PutJob) {
675:                                    if (getLogger().isDebugEnabled()) {
676:                                        getLogger()
677:                                                .debug(
678:                                                        "Replicator thread "
679:                                                                + getName()
680:                                                                + " executing PutJob : "
681:                                                                + queuedJob
682:                                                                        .getUri());
683:                                    }
684:                                    doPut((PutJob) queuedJob);
685:                                } else if (queuedJob instanceof  MkcolJob) {
686:                                    if (getLogger().isDebugEnabled()) {
687:                                        getLogger()
688:                                                .debug(
689:                                                        "Replicator thread "
690:                                                                + getName()
691:                                                                + " executing MkcolJob : "
692:                                                                + queuedJob
693:                                                                        .getUri());
694:                                    }
695:                                    doMkcol((MkcolJob) queuedJob);
696:                                } else if (queuedJob instanceof  DeleteJob) {
697:                                    if (getLogger().isDebugEnabled()) {
698:                                        getLogger()
699:                                                .debug(
700:                                                        "Replicator thread "
701:                                                                + getName()
702:                                                                + " executing DeleteJob : "
703:                                                                + queuedJob
704:                                                                        .getUri());
705:                                    }
706:                                    doDelete((DeleteJob) queuedJob);
707:                                } else {
708:                                    getLogger().warn(
709:                                            "Replicator thread "
710:                                                    + getName()
711:                                                    + " unknow job: "
712:                                                    + queuedJob.getClass()
713:                                                            .getName());
714:                                }
715:
716:                                if (!queuedJob.isDone()) {
717:                                    if (retryEnabled
718:                                            && queuedJob.getTries() < retryCount) {
719:                                        queue.add(queuedJob);
720:                                        if (getLogger().isDebugEnabled()) {
721:                                            getLogger()
722:                                                    .debug(
723:                                                            "Replicator thread "
724:                                                                    + getName()
725:                                                                    + " queuing job for retry ["
726:                                                                    + queuedJob
727:                                                                            .getTries()
728:                                                                    + "] : "
729:                                                                    + queuedJob
730:                                                                            .getClass()
731:                                                                            .getName());
732:                                        }
733:                                    } else {
734:                                        getLogger().warn(
735:                                                "Replicator thread "
736:                                                        + getName()
737:                                                        + " job failed: "
738:                                                        + queuedJob.getClass()
739:                                                                .getName());
740:                                    }
741:                                } else {
742:                                    // Job is done, cleanup
743:                                    if (queuedJob instanceof  PutJob) {
744:                                        ((PutJob) queuedJob).cleanupTempFile();
745:                                    }
746:                                }
747:                            }
748:                        }
749:                    } while (jobsLeft);
750:                }
751:            }
752:
753:            /**
754:             * Replication job
755:             */
756:            public abstract class Job {
757:                private int tries = 0;
758:                private boolean done = false;
759:                private String uri;
760:                NodeRevisionDescriptor descriptor;
761:
762:                public String getUri() {
763:                    return uri;
764:                }
765:
766:                protected void setUri(String uri) {
767:                    this .uri = uri;
768:                }
769:
770:                public Enumeration getProperties() {
771:                    return descriptor.enumerateProperties();
772:                }
773:
774:                protected void setDescriptor(NodeRevisionDescriptor descriptor) {
775:                    this .descriptor = descriptor;
776:                }
777:
778:                public Enumeration getRemovedProperties() {
779:                    return descriptor.enumerateRemovedProperties();
780:                }
781:
782:                public void addTry() {
783:                    this .tries++;
784:                }
785:
786:                public int getTries() {
787:                    return this .tries;
788:                }
789:
790:                public void setDone(boolean done) {
791:                    this .done = done;
792:                }
793:
794:                public boolean isDone() {
795:                    return this .done;
796:                }
797:            }
798:
799:            /**
800:             * Put Job, uses temporary files for storing the content before replicating
801:             */
802:            private class PutJob extends Job {
803:                File tempFile;
804:
805:                public PutJob(String uri, NodeRevisionDescriptor descriptor,
806:                        NodeRevisionContent content) {
807:                    setContent(content);
808:                    setUri(uri);
809:                    setDescriptor(descriptor);
810:                }
811:
812:                /**
813:                 * Get the input stream for the content of the temporary file
814:                 * @return the inputstream
815:                 */
816:                public InputStream getContentStream() {
817:                    if (tempFile == null) {
818:                        // contentStream is null when only properties have changed
819:                        if (getLogger().isDebugEnabled()) {
820:                            getLogger()
821:                                    .debug(
822:                                            "Only replicating properties for "
823:                                                    + getUri()
824:                                                    + ": (tempfile is null)");
825:                        }
826:                        return null;
827:                    }
828:                    if (!tempFile.exists()) {
829:                        getLogger().error(
830:                                "Unable to replicate " + getUri()
831:                                        + ": Tempfile was deleted?");
832:                        return null;
833:                    }
834:                    try {
835:                        FileInputStream is = new FileInputStream(tempFile);
836:                        return new BufferedInputStream(is);
837:                    } catch (FileNotFoundException e) {
838:                        throw new IllegalStateException("Unable to replicate "
839:                                + getUri() + ": " + e.getMessage());
840:                    }
841:                }
842:
843:                /**
844:                 * Store the contentstream in a temporary file
845:                 * @param contentStream
846:                 */
847:                private void setContent(NodeRevisionContent content) {
848:                    if (content == null) {
849:                        // content is null when only properties have changed
850:                        return;
851:                    }
852:
853:                    InputStream contentStream = new ByteArrayInputStream(
854:                            content.getContentBytes());
855:                    try {
856:                        tempFile = File.createTempFile("repl", ".data");
857:                    } catch (IOException e) {
858:                        getLogger()
859:                                .error(
860:                                        "Unablte to create tempfile: "
861:                                                + e.getMessage());
862:                        return;
863:                    }
864:
865:                    try {
866:                        FileOutputStream fos = new FileOutputStream(tempFile);
867:                        try {
868:                            BufferedOutputStream bos = new BufferedOutputStream(
869:                                    fos);
870:                            try {
871:                                byte[] buffer = new byte[16384];
872:                                int bytesRead;
873:                                while ((bytesRead = contentStream.read(buffer)) >= 0) {
874:                                    fos.write(buffer, 0, bytesRead);
875:                                }
876:                            } finally {
877:                                bos.close();
878:                            }
879:                        } finally {
880:                            fos.close();
881:                        }
882:                    } catch (IOException e) {
883:
884:                        tempFile.delete();
885:                        getLogger().error(
886:                                "Unablte to write to tempfile: "
887:                                        + e.getMessage());
888:                        return;
889:                    }
890:                }
891:
892:                /**
893:                 * Cleanup the temporary file if it still exists
894:                 *
895:                 */
896:                public void cleanupTempFile() {
897:                    if (tempFile != null && tempFile.exists()) {
898:                        tempFile.delete();
899:                    }
900:                }
901:            }
902:
903:            /**
904:             * Delete replication job
905:             */
906:            private class DeleteJob extends Job {
907:                public DeleteJob(String uri) {
908:                    setUri(uri);
909:                }
910:            }
911:
912:            /**
913:             * Create collection job
914:             */
915:            private class MkcolJob extends Job {
916:                public MkcolJob(String uri, NodeRevisionDescriptor descriptor) {
917:                    setUri(uri);
918:                    setDescriptor(descriptor);
919:                }
920:            }
921:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.