001: /*
002: * Helma License Notice
003: *
004: * The contents of this file are subject to the Helma License
005: * Version 2.0 (the "License"). You may not use this file except in
006: * compliance with the License. A copy of the License is available at
007: * http://adele.helma.org/download/helma/license.txt
008: *
009: * Copyright 1998-2003 Helma Software. All Rights Reserved.
010: *
011: * $RCSfile$
012: * $Author: root $
013: * $Revision: 8604 $
014: * $Date: 2007-09-28 15:16:38 +0200 (Fre, 28 Sep 2007) $
015: */
016:
017: package helma.objectmodel.db;
018:
019: import java.rmi.Naming;
020: import java.util.Vector;
021: import java.util.List;
022:
023: /**
024: * This class replicates the updates of transactions to other applications via RMI
025: */
026: public class Replicator implements Runnable, NodeChangeListener {
027: Vector urls;
028: Vector add;
029: Vector delete;
030: Vector currentAdd;
031: Vector currentDelete;
032: Thread runner;
033: NodeManager nmgr;
034:
035: /**
036: * Creates a new Replicator object.
037: *
038: * @param nmgr ...
039: */
040: public Replicator(NodeManager nmgr) {
041: urls = new Vector();
042: add = new Vector();
043: delete = new Vector();
044: this .nmgr = nmgr;
045: runner = new Thread(this );
046: runner.start();
047: }
048:
049: /**
050: *
051: *
052: * @param url ...
053: */
054: public void addUrl(String url) {
055: urls.addElement(url);
056:
057: if (nmgr.logReplication) {
058: nmgr.app.logEvent("Adding replication listener: " + url);
059: }
060: }
061:
062: /**
063: *
064: */
065: public void run() {
066: while (Thread.currentThread() == runner) {
067: if (prepareReplication()) {
068: for (int i = 0; i < urls.size(); i++) {
069: try {
070: String url = (String) urls.elementAt(i);
071: IReplicationListener listener = (IReplicationListener) Naming
072: .lookup(url);
073:
074: if (listener == null) {
075: throw new NullPointerException(
076: "Replication listener not bound for URL "
077: + url);
078: }
079:
080: listener.replicateCache(currentAdd,
081: currentDelete);
082:
083: if (nmgr.logReplication) {
084: nmgr.app
085: .logEvent("Sent cache replication event: "
086: + currentAdd.size()
087: + " added, "
088: + currentDelete.size()
089: + " deleted");
090: }
091: } catch (Exception x) {
092: nmgr.app
093: .logEvent("Error sending cache replication event: "
094: + x);
095: if (nmgr.app.debug()) {
096: x.printStackTrace();
097: }
098: }
099: }
100: }
101:
102: try {
103: if (runner != null) {
104: Thread.sleep(1000L);
105: }
106: } catch (InterruptedException ir) {
107: runner = null;
108: }
109: }
110: }
111:
112: /**
113: * Called when a transaction is committed that has created, modified,
114: * deleted or changed the child collection one or more nodes.
115: */
116: public synchronized void nodesChanged(List inserted, List updated,
117: List deleted, List parents) {
118: add.addAll(inserted);
119: add.addAll(updated);
120: delete.addAll(deleted);
121: }
122:
123: private synchronized boolean prepareReplication() {
124: if ((add.size() == 0) && (delete.size() == 0)) {
125: return false;
126: }
127:
128: currentAdd = add;
129: currentDelete = delete;
130: add = new Vector();
131: delete = new Vector();
132:
133: return true;
134: }
135: }
|