001: /*
002: * Copyright 2005 Hippo Webworks.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016: package nl.hippo.slide.replication;
017:
018: import java.io.BufferedInputStream;
019: import java.io.BufferedOutputStream;
020: import java.io.ByteArrayInputStream;
021: import java.io.File;
022: import java.io.FileInputStream;
023: import java.io.FileNotFoundException;
024: import java.io.FileOutputStream;
025: import java.io.IOException;
026: import java.io.InputStream;
027: import java.io.StringWriter;
028: import java.util.Enumeration;
029: import java.util.HashSet;
030: import java.util.LinkedList;
031: import java.util.Vector;
032:
033: import javax.xml.transform.Result;
034: import javax.xml.transform.Source;
035: import javax.xml.transform.Transformer;
036: import javax.xml.transform.TransformerException;
037: import javax.xml.transform.TransformerFactory;
038: import javax.xml.transform.dom.DOMSource;
039: import javax.xml.transform.stream.StreamResult;
040:
041: import org.apache.avalon.framework.configuration.Configuration;
042: import org.apache.avalon.framework.configuration.ConfigurationException;
043: import org.apache.avalon.framework.logger.AbstractLogEnabled;
044: import org.apache.commons.httpclient.HttpClient;
045: import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
046: import org.apache.commons.httpclient.UsernamePasswordCredentials;
047: import org.apache.commons.httpclient.methods.PutMethod;
048: import org.apache.commons.httpclient.util.URIUtil;
049: import org.apache.slide.content.NodeProperty;
050: import org.apache.slide.content.NodeRevisionContent;
051: import org.apache.slide.content.NodeRevisionDescriptor;
052: import org.apache.webdav.lib.Property;
053: import org.apache.webdav.lib.PropertyName;
054: import org.apache.webdav.lib.methods.DeleteMethod;
055: import org.apache.webdav.lib.methods.MkcolMethod;
056: import org.apache.webdav.lib.methods.PropFindMethod;
057: import org.apache.webdav.lib.methods.PropPatchMethod;
058: import org.apache.webdav.lib.properties.ResourceTypeProperty;
059:
public class WebdavReplicatorLocation extends AbstractLogEnabled
        implements ReplicatorLocation {

    // target WebDAV server; port is kept as a String because it is only
    // ever used to compose the base url
    private String host;
    private String port;
    // credentials, used for preemptive authentication (see configure())
    private String user;
    private String password;
    // absolute http url of the replication target, composed in configure()
    private String baseUri;
    // webdav root path on the target server, prepended to baseUri
    private String rootpath;
    // when true, missing parent collections are created before PUT/MKCOL
    private boolean createParents;

    private HttpClient client;

    // the queue in which the replication jobs are stored
    private LinkedList queue;

    // the worker thread that does the actual replication; null when no
    // worker is running (access is guarded by synchronizing on 'queue')
    Thread worker;

    // when false, content (PUT/MKCOL/DELETE) is not replicated
    private boolean replicateContent = true;
    // restriction set of properties to replicate; null means "replicate all"
    private HashSet propertiesToReplicate = null;

    // number of retries
    private static final int DEFAULT_RETRY_COUNT = 3;
    // timeout to wait for connection
    private static final int DEFAULT_CONNECTION_TIMEOUT_SECONDS = 30;
    // timeout to wait for response
    private static final int DEFAULT_SOCKET_TIMEOUT_SECONDS = 180;

    // multiplier to convert the configured seconds into milliseconds
    private static final int MILI_TO_SECS = 1000;

    private boolean retryEnabled = false;
    private int retryCount;
    private int connectionTimeoutSeconds;
    private int socketTimeoutSeconds;

    // DAV:resourcetype property name, used to detect remote collections
    private static final PropertyName RESOURCETYPE = new PropertyName(
            "DAV:", "resourcetype");
098:
099: public void configure(Configuration config)
100: throws ConfigurationException {
101: host = config.getChild("host").getValue();
102: port = config.getChild("port").getValue();
103: user = config.getChild("user").getValue();
104: password = config.getChild("password").getValue();
105: rootpath = config.getChild("rootpath").getValue();
106: baseUri = config.getChild("path").getValue();
107: createParents = config.getChild("createparents")
108: .getValueAsBoolean(false);
109: replicateContent = config.getChild("replicate-content")
110: .getAttributeAsBoolean("value", replicateContent);
111:
112: retryCount = config.getChild("retrycount").getValueAsInteger(
113: DEFAULT_RETRY_COUNT);
114: connectionTimeoutSeconds = config.getChild(
115: "connecttimeoutseconds").getValueAsInteger(
116: DEFAULT_CONNECTION_TIMEOUT_SECONDS);
117: socketTimeoutSeconds = config.getChild("sockettimeoutseconds")
118: .getValueAsInteger(DEFAULT_SOCKET_TIMEOUT_SECONDS);
119:
120: /* some sanity checks */
121: if (retryCount > 1) {
122: retryEnabled = true;
123: } else if (retryCount < 1) {
124: retryCount = DEFAULT_RETRY_COUNT;
125: }
126: if (connectionTimeoutSeconds < 1) {
127: connectionTimeoutSeconds = DEFAULT_CONNECTION_TIMEOUT_SECONDS;
128: }
129: if (socketTimeoutSeconds < 1) {
130: socketTimeoutSeconds = DEFAULT_SOCKET_TIMEOUT_SECONDS;
131: }
132: if (!rootpath.startsWith("/")) {
133: rootpath = "/" + rootpath;
134: }
135: if (!baseUri.startsWith("/")) {
136: baseUri = "/" + baseUri;
137: }
138: if (!baseUri.endsWith("/")) {
139: baseUri = baseUri + "/";
140: }
141: if (getLogger().isDebugEnabled()) {
142: getLogger().debug("Replicator config:");
143: getLogger().debug("host : " + host + ":" + port);
144: getLogger().debug("user : " + user);
145: getLogger().debug(
146: "timeout (secs): " + connectionTimeoutSeconds);
147: getLogger().debug("retry enabled : " + retryEnabled);
148: getLogger().debug("retry count : " + retryCount);
149: getLogger().debug("baseuri : " + baseUri);
150: getLogger().debug("createparents : " + createParents);
151: getLogger().debug("repl. content : " + replicateContent);
152: }
153: baseUri = "http://" + host + ":" + port + rootpath + baseUri;
154: getLogger().info("Configured replicator to: " + baseUri);
155:
156: // allow for restricting of properties which have to be replicated
157: Configuration[] propertyConfigs = config.getChild(
158: "replicate-properties").getChildren("property");
159: if (propertyConfigs.length != 0) {
160: propertiesToReplicate = new HashSet();
161: for (int i = 0; i < propertyConfigs.length; i++) {
162: NodeProperty prop = new NodeProperty(propertyConfigs[i]
163: .getAttribute("name"), null, propertyConfigs[i]
164: .getAttribute("namespace"));
165: propertiesToReplicate.add(prop);
166:
167: if (getLogger().isDebugEnabled()) {
168: getLogger().debug(
169: "Replicating prop " + prop + " to "
170: + baseUri);
171: }
172: }
173: }
174:
175: // create queue for jobs
176: queue = new LinkedList();
177: client = new HttpClient(
178: new MultiThreadedHttpConnectionManager());
179: client.getState().setCredentials(null, host,
180: new UsernamePasswordCredentials(user, password));
181: client.getState().setAuthenticationPreemptive(true);
182: client.setConnectionTimeout(connectionTimeoutSeconds
183: * MILI_TO_SECS);
184: client.setTimeout(socketTimeoutSeconds * MILI_TO_SECS);
185: }
186:
187: //---------------------------------------< interface impl >------------------------------------//
188: /**
189: * Add put job to the queue
190: */
191: public void put(String uri, NodeRevisionDescriptor descriptor,
192: NodeRevisionContent content) {
193: PutJob job = new PutJob(uri, descriptor, content);
194: synchronized (queue) {
195: queue.add(job);
196: if (worker == null) {
197: worker = new ReplicatorWorker();
198: worker.start();
199: }
200: }
201: }
202:
203: /**
204: * Add mkcol job to the queue
205: */
206: public void mkcol(String uri, NodeRevisionDescriptor descriptor) {
207: MkcolJob job = new MkcolJob(uri, descriptor);
208: synchronized (queue) {
209: queue.add(job);
210: if (worker == null) {
211: worker = new ReplicatorWorker();
212: worker.start();
213: }
214: }
215: }
216:
217: /**
218: * Add delete job to the queue
219: */
220: public void delete(String uri) {
221: DeleteJob job = new DeleteJob(uri);
222: synchronized (queue) {
223: queue.add(job);
224: if (worker == null) {
225: worker = new ReplicatorWorker();
226: worker.start();
227: }
228: }
229: }
230:
231: //----------------------------------< job execution methods >-------------------//
232: /**
233: * precond: uri is already relative to base uri
234: */
235: public void doPut(PutJob job) {
236: job.addTry();
237: String uri = job.getUri();
238: InputStream content = job.getContentStream();
239:
240: if (!uri.startsWith(baseUri)) {
241: if (uri.startsWith("/"))
242: uri = uri.substring("/".length());
243: uri = baseUri + uri;
244: }
245:
246: // content InputStream is null when only properties have changed
247: if (replicateContent && content != null) {
248: if (createParents) {
249: mkparents(uri.substring(0, uri.lastIndexOf('/')));
250: }
251: final PutMethod put = new PutMethod(uri);
252: try {
253: put.setRequestBody(content);
254: int status = client.executeMethod(put);
255: if (getLogger().isDebugEnabled()) {
256: getLogger().debug(
257: "PUT executed. Server responded: (uri="
258: + uri
259: + ") "
260: + status
261: + " ("
262: + URIUtil.decode(put
263: .getStatusText()) + ")");
264: }
265: // 201 => created, 204 => overwrite
266: if (status != 201 && status != 204) {
267: getLogger().error(
268: "Error executing PUT. Server responded: (uri="
269: + uri
270: + ") "
271: + status
272: + " ("
273: + URIUtil.decode(put
274: .getStatusText()) + ")");
275: } else {
276: status = setProperties(uri, job.getProperties(),
277: job.getRemovedProperties());
278: if (status == 207) {
279: job.setDone(true);
280: }
281: }
282: } catch (IOException e) {
283: getLogger().error("Error putting resource: " + uri, e);
284: } finally {
285: put.releaseConnection();
286: }
287: }
288: }
289:
290: public void doMkcol(MkcolJob job) {
291: job.addTry();
292: String uri = job.getUri();
293:
294: if (!uri.startsWith(baseUri)) {
295: if (uri.startsWith("/"))
296: uri = uri.substring("/".length());
297: uri = baseUri + uri;
298: }
299:
300: if (replicateContent) {
301: if (createParents) {
302: mkparents(uri.substring(0, uri.lastIndexOf('/')));
303: }
304: final MkcolMethod mkcol = new MkcolMethod(uri);
305: try {
306: int status = client.executeMethod(mkcol);
307: if (getLogger().isDebugEnabled()) {
308: getLogger().debug(
309: "MKCOL executed. Server responded: (uri="
310: + uri
311: + ") "
312: + status
313: + " ("
314: + URIUtil.decode(mkcol
315: .getStatusText()) + ")");
316: }
317: // 201 => created, 405 => folder already exists
318: if (status != 201 && status != 405) {
319: getLogger().error(
320: "Error executing MKCOL. Server responded: (uri="
321: + uri
322: + ") "
323: + status
324: + " ("
325: + URIUtil.decode(mkcol
326: .getStatusText()) + ")");
327: } else {
328: status = setProperties(uri, job.getProperties(),
329: job.getRemovedProperties());
330: if (status == 207) {
331: job.setDone(true);
332: }
333: }
334: } catch (IOException e) {
335: getLogger().error("Error executing mkcol: " + uri, e);
336: } finally {
337: mkcol.releaseConnection();
338: }
339: }
340:
341: }
342:
343: public void doDelete(DeleteJob job) {
344: job.addTry();
345: String uri = job.getUri();
346:
347: if (!uri.startsWith(baseUri)) {
348: if (uri.startsWith("/"))
349: uri = uri.substring("/".length());
350: uri = baseUri + uri;
351: }
352:
353: if (replicateContent && ensureNoChildren(uri)) {
354: final DeleteMethod delete = new DeleteMethod(uri);
355: try {
356: int status = client.executeMethod(delete);
357: if (getLogger().isDebugEnabled()) {
358: getLogger().debug(
359: "DELETE executed. Server responded: (uri="
360: + uri
361: + ") "
362: + status
363: + " ("
364: + URIUtil.decode(delete
365: .getStatusText()) + ") ");
366: }
367: // 204 => deleted, 404 => not found
368: if (status != 204 && status != 404) {
369: getLogger().error(
370: "Error executing DELETE. Server responded: (uri="
371: + uri
372: + ") "
373: + status
374: + " ("
375: + URIUtil.decode(delete
376: .getStatusText()) + ")");
377: } else {
378: job.setDone(true);
379: }
380: } catch (IOException e) {
381: getLogger().error("Error executing delete: " + uri, e);
382: } finally {
383: delete.releaseConnection();
384: }
385: } else {
386: if (replicateContent && getLogger().isWarnEnabled()) {
387: getLogger().warn(
388: "Tried to delete non-empty collection! " + uri);
389: }
390: }
391: }
392:
393: //-------------------------------< webdav helper methods >-------------------//
394:
395: private void mkparents(String uri) {
396: /* if the current uri is a collection, we're done */
397: if (isCollection(uri)) {
398: return;
399: }
400:
401: /* check if the parent of the uri exists */
402: String parentUri = uri.substring(0, uri.lastIndexOf('/'));
403:
404: /* don't try to create baseUri, compensate for trailing slash */
405: if (parentUri.length() < (baseUri.length() - 1)) {
406: getLogger().error(
407: "Replication baseUri doesn't exists : " + baseUri);
408: return;
409: }
410:
411: /* check parent and create if necessary */
412: mkparents(parentUri);
413:
414: /* parent uri is collection. try to create collection */
415: final MkcolMethod mkcol = new MkcolMethod(uri);
416:
417: try {
418: int status = client.executeMethod(mkcol);
419: if (getLogger().isDebugEnabled()) {
420: getLogger().debug(
421: "MKCOL executed for parent. Server responded: (uri="
422: + uri + ") " + status + " ("
423: + URIUtil.decode(mkcol.getStatusText())
424: + ")");
425: }
426: } catch (IOException e) {
427: getLogger().error("Error executing mkcol: " + uri, e);
428: } finally {
429: mkcol.releaseConnection();
430: }
431: }
432:
433: private boolean isCollection(String uri) {
434: boolean iscollection = false;
435: boolean isresourcetype = false;
436:
437: // some webdav servers require a slash at the end for collections
438: if (!uri.endsWith("/")) {
439: uri = uri + "/";
440: }
441:
442: final PropFindMethod propfind = new PropFindMethod(uri, 0,
443: PropFindMethod.BY_NAME);
444:
445: Vector props = new Vector(1);
446: props.add(RESOURCETYPE);
447: propfind.setPropertyNames(props.elements());
448: propfind.setFollowRedirects(true);
449:
450: try {
451: final int status = client.executeMethod(propfind);
452:
453: if (getLogger().isDebugEnabled()) {
454: getLogger().debug(
455: "PROPFIND executed. Server responded: (uri="
456: + uri
457: + ") "
458: + status
459: + " ("
460: + URIUtil.decode(propfind
461: .getStatusText()) + ") ");
462: }
463: if (status == 207) {
464: Enumeration responseURLEnum = propfind
465: .getAllResponseURLs();
466: while (responseURLEnum.hasMoreElements()) {
467: String href = (String) responseURLEnum
468: .nextElement();
469: Enumeration propertyEnum = propfind
470: .getResponseProperties(href);
471: while (propertyEnum.hasMoreElements()) {
472: Property property = (Property) propertyEnum
473: .nextElement();
474: if (property instanceof ResourceTypeProperty) {
475: isresourcetype = true;
476: iscollection = ((ResourceTypeProperty) property)
477: .isCollection();
478: }
479: }
480: }
481: if (!isresourcetype) {
482: getLogger().warn(
483: "PROPFFIND did not return resourcetype");
484: }
485: }
486: } catch (Exception e) {
487: getLogger().warn("Error checking if collection: " + uri, e);
488: } finally {
489: propfind.releaseConnection();
490: }
491: if (getLogger().isDebugEnabled()) {
492: getLogger().debug(
493: "IsCollection = " + iscollection + " for uri: "
494: + uri);
495: }
496: return iscollection;
497: }
498:
499: private boolean ensureNoChildren(String uri) {
500: Vector names = new Vector(1);
501: names.add(new PropertyName("DAV", "displayname"));
502:
503: final PropFindMethod propfind = new PropFindMethod(uri);
504: propfind.setDepth(1);
505: propfind.setPropertyNames(names.elements());
506:
507: try {
508: final int status = client.executeMethod(propfind);
509: // not found => no childeren
510: if (status == 404) {
511: return true;
512: }
513: if (status >= 400) {
514: return false;
515: }
516: Enumeration urls = propfind.getAllResponseURLs();
517: int counter = 0;
518: while (urls.hasMoreElements()) {
519: urls.nextElement();
520: counter++;
521: }
522: return counter == 1; // propfind also finds the uri it acts on
523: } catch (Exception e) {
524: if (getLogger().isWarnEnabled()) {
525: getLogger().warn("Error checking for children: " + uri,
526: e);
527: }
528: return false;
529: } finally {
530: propfind.releaseConnection();
531: }
532: }
533:
534: private String wrapCDATA(String nonxml) {
535: StringBuffer buf = new StringBuffer();
536: buf.append("<![CDATA[").append(nonxml).append("]]>");
537: return buf.toString();
538: }
539:
540: private int setProperties(String uri, Enumeration properties,
541: Enumeration removedproperties) {
542: final PropPatchMethod proppatch = new PropPatchMethod(uri);
543: boolean doPropPatch = false;
544: int status = 0;
545:
546: while (properties.hasMoreElements()) {
547: NodeProperty prop = (NodeProperty) properties.nextElement();
548:
549: // only patch DAV:displayname and all non-DAV properties (and only those which are meant to be replicated)
550: if ((propertiesToReplicate == null || propertiesToReplicate
551: .contains(prop))
552: && (!"DAV:".equals(prop.getNamespace()) || "displayname"
553: .equals(prop.getName()))) {
554: proppatch.addPropertyToSet(prop.getName(),
555: wrapCDATA(prop.getValue().toString()),
556: "property", prop.getNamespace());
557:
558: doPropPatch = true;
559:
560: if (getLogger().isDebugEnabled()) {
561: getLogger().debug(
562: "PROPPATCH: setting " + prop.getNamespace()
563: + ":" + prop.getName() + " to "
564: + prop.getValue());
565: }
566: } else {
567: if (getLogger().isDebugEnabled()) {
568: getLogger().debug(
569: "PROPPATCH: NOT setting "
570: + prop.getNamespace() + ":"
571: + prop.getName() + " to "
572: + prop.getValue());
573: }
574: }
575: }
576:
577: while (removedproperties.hasMoreElements()) {
578: NodeProperty prop = (NodeProperty) removedproperties
579: .nextElement();
580:
581: // only remove non-DAV properties (and only those which are meant to be replicated)
582: if ((propertiesToReplicate == null || propertiesToReplicate
583: .contains(prop))
584: && !"DAV:".equals(prop.getNamespace())) {
585: proppatch.addPropertyToRemove(prop.getName(),
586: "property", prop.getNamespace());
587:
588: doPropPatch = true;
589:
590: if (getLogger().isDebugEnabled()) {
591: getLogger().debug(
592: "PROPPATCH: removing "
593: + prop.getNamespace() + ":"
594: + prop.getName());
595: }
596: } else {
597: if (getLogger().isDebugEnabled()) {
598: getLogger().debug(
599: "PROPPATCH: NOT removing "
600: + prop.getNamespace() + ":"
601: + prop.getName());
602: }
603: }
604: }
605:
606: if (doPropPatch) {
607: try {
608: status = client.executeMethod(proppatch);
609: if (getLogger().isDebugEnabled()) {
610: getLogger().debug(
611: "PROPPATCH executed. Server responded: (uri="
612: + uri
613: + ") "
614: + status
615: + " ("
616: + URIUtil.decode(proppatch
617: .getStatusText()) + ") ");
618: }
619: if (status != 207) {
620: getLogger().error(
621: "Error setting properties, got status ["
622: + status + "] for uri: " + uri);
623: }
624: } catch (IOException e) {
625: getLogger()
626: .error("Error setting properties: " + uri, e);
627: } finally {
628: proppatch.releaseConnection();
629: }
630: } else {
631: if (getLogger().isDebugEnabled()) {
632: getLogger().debug(
633: "Did not send proppatch, no props to patch!");
634: }
635: }
636: return status;
637: }
638:
639: //-----------------------------------< Worker & worker jobs >------------------------//
640:
641: /**
642: * Worker class for background replication
643: */
644: public class ReplicatorWorker extends Thread {
645:
646: private boolean jobsLeft = true;
647:
648: public void run() {
649: if (getLogger().isDebugEnabled()) {
650: getLogger()
651: .debug(
652: "Replicator thread " + getName()
653: + " starting.");
654: }
655:
656: // the main loop that fetches the jobs and executes them
657: do {
658: Job queuedJob = null;
659: synchronized (queue) {
660: if (queue.isEmpty()) {
661: jobsLeft = false;
662: worker = null;
663: if (getLogger().isDebugEnabled()) {
664: getLogger().debug(
665: "Replicator thread " + getName()
666: + " nothing left todo.");
667: }
668: // the queue is done, bail out
669: return;
670: } else {
671: // get the next job from the queue
672: queuedJob = (Job) queue.removeFirst();
673: // execute the job
674: if (queuedJob instanceof PutJob) {
675: if (getLogger().isDebugEnabled()) {
676: getLogger()
677: .debug(
678: "Replicator thread "
679: + getName()
680: + " executing PutJob : "
681: + queuedJob
682: .getUri());
683: }
684: doPut((PutJob) queuedJob);
685: } else if (queuedJob instanceof MkcolJob) {
686: if (getLogger().isDebugEnabled()) {
687: getLogger()
688: .debug(
689: "Replicator thread "
690: + getName()
691: + " executing MkcolJob : "
692: + queuedJob
693: .getUri());
694: }
695: doMkcol((MkcolJob) queuedJob);
696: } else if (queuedJob instanceof DeleteJob) {
697: if (getLogger().isDebugEnabled()) {
698: getLogger()
699: .debug(
700: "Replicator thread "
701: + getName()
702: + " executing DeleteJob : "
703: + queuedJob
704: .getUri());
705: }
706: doDelete((DeleteJob) queuedJob);
707: } else {
708: getLogger().warn(
709: "Replicator thread "
710: + getName()
711: + " unknow job: "
712: + queuedJob.getClass()
713: .getName());
714: }
715:
716: if (!queuedJob.isDone()) {
717: if (retryEnabled
718: && queuedJob.getTries() < retryCount) {
719: queue.add(queuedJob);
720: if (getLogger().isDebugEnabled()) {
721: getLogger()
722: .debug(
723: "Replicator thread "
724: + getName()
725: + " queuing job for retry ["
726: + queuedJob
727: .getTries()
728: + "] : "
729: + queuedJob
730: .getClass()
731: .getName());
732: }
733: } else {
734: getLogger().warn(
735: "Replicator thread "
736: + getName()
737: + " job failed: "
738: + queuedJob.getClass()
739: .getName());
740: }
741: } else {
742: // Job is done, cleanup
743: if (queuedJob instanceof PutJob) {
744: ((PutJob) queuedJob).cleanupTempFile();
745: }
746: }
747: }
748: }
749: } while (jobsLeft);
750: }
751: }
752:
753: /**
754: * Replication job
755: */
756: public abstract class Job {
757: private int tries = 0;
758: private boolean done = false;
759: private String uri;
760: NodeRevisionDescriptor descriptor;
761:
762: public String getUri() {
763: return uri;
764: }
765:
766: protected void setUri(String uri) {
767: this .uri = uri;
768: }
769:
770: public Enumeration getProperties() {
771: return descriptor.enumerateProperties();
772: }
773:
774: protected void setDescriptor(NodeRevisionDescriptor descriptor) {
775: this .descriptor = descriptor;
776: }
777:
778: public Enumeration getRemovedProperties() {
779: return descriptor.enumerateRemovedProperties();
780: }
781:
782: public void addTry() {
783: this .tries++;
784: }
785:
786: public int getTries() {
787: return this .tries;
788: }
789:
790: public void setDone(boolean done) {
791: this .done = done;
792: }
793:
794: public boolean isDone() {
795: return this .done;
796: }
797: }
798:
799: /**
800: * Put Job, uses temporary files for storing the content before replicating
801: */
802: private class PutJob extends Job {
803: File tempFile;
804:
805: public PutJob(String uri, NodeRevisionDescriptor descriptor,
806: NodeRevisionContent content) {
807: setContent(content);
808: setUri(uri);
809: setDescriptor(descriptor);
810: }
811:
812: /**
813: * Get the input stream for the content of the temporary file
814: * @return the inputstream
815: */
816: public InputStream getContentStream() {
817: if (tempFile == null) {
818: // contentStream is null when only properties have changed
819: if (getLogger().isDebugEnabled()) {
820: getLogger()
821: .debug(
822: "Only replicating properties for "
823: + getUri()
824: + ": (tempfile is null)");
825: }
826: return null;
827: }
828: if (!tempFile.exists()) {
829: getLogger().error(
830: "Unable to replicate " + getUri()
831: + ": Tempfile was deleted?");
832: return null;
833: }
834: try {
835: FileInputStream is = new FileInputStream(tempFile);
836: return new BufferedInputStream(is);
837: } catch (FileNotFoundException e) {
838: throw new IllegalStateException("Unable to replicate "
839: + getUri() + ": " + e.getMessage());
840: }
841: }
842:
843: /**
844: * Store the contentstream in a temporary file
845: * @param contentStream
846: */
847: private void setContent(NodeRevisionContent content) {
848: if (content == null) {
849: // content is null when only properties have changed
850: return;
851: }
852:
853: InputStream contentStream = new ByteArrayInputStream(
854: content.getContentBytes());
855: try {
856: tempFile = File.createTempFile("repl", ".data");
857: } catch (IOException e) {
858: getLogger()
859: .error(
860: "Unablte to create tempfile: "
861: + e.getMessage());
862: return;
863: }
864:
865: try {
866: FileOutputStream fos = new FileOutputStream(tempFile);
867: try {
868: BufferedOutputStream bos = new BufferedOutputStream(
869: fos);
870: try {
871: byte[] buffer = new byte[16384];
872: int bytesRead;
873: while ((bytesRead = contentStream.read(buffer)) >= 0) {
874: fos.write(buffer, 0, bytesRead);
875: }
876: } finally {
877: bos.close();
878: }
879: } finally {
880: fos.close();
881: }
882: } catch (IOException e) {
883:
884: tempFile.delete();
885: getLogger().error(
886: "Unablte to write to tempfile: "
887: + e.getMessage());
888: return;
889: }
890: }
891:
892: /**
893: * Cleanup the temporary file if it still exists
894: *
895: */
896: public void cleanupTempFile() {
897: if (tempFile != null && tempFile.exists()) {
898: tempFile.delete();
899: }
900: }
901: }
902:
    /**
     * Delete replication job
     */
    private class DeleteJob extends Job {
        // no descriptor needed: a delete replicates no properties
        public DeleteJob(String uri) {
            setUri(uri);
        }
    }
911:
    /**
     * Create collection job
     */
    private class MkcolJob extends Job {
        // carries the descriptor so the new collection's properties can
        // be replicated after the MKCOL succeeds
        public MkcolJob(String uri, NodeRevisionDescriptor descriptor) {
            setUri(uri);
            setDescriptor(descriptor);
        }
    }
}
|