001: /*
002: * Helma License Notice
003: *
004: * The contents of this file are subject to the Helma License
005: * Version 2.0 (the "License"). You may not use this file except in
006: * compliance with the License. A copy of the License is available at
007: * http://adele.helma.org/download/helma/license.txt
008: *
009: * Copyright 1998-2003 Helma Software. All Rights Reserved.
010: *
011: * $RCSfile$
012: * $Author: hannes $
013: * $Revision: 8644 $
014: * $Date: 2007-11-20 14:02:31 +0100 (Die, 20 Nov 2007) $
015: */
016:
017: package helma.objectmodel.db;
018:
019: import helma.objectmodel.DatabaseException;
020: import helma.objectmodel.ITransaction;
021:
022: import java.sql.Connection;
023: import java.sql.Statement;
024: import java.sql.SQLException;
025: import java.util.*;
026:
027: /**
028: * A subclass of thread that keeps track of changed nodes and triggers
029: * changes in the database when a transaction is commited.
030: */
031: public class Transactor extends Thread {
032:
033: // The associated node manager
034: NodeManager nmgr;
035:
036: // List of nodes to be updated
037: private HashMap dirtyNodes;
038:
039: // List of visited clean nodes
040: private HashMap cleanNodes;
041:
042: // List of nodes whose child index has been modified
043: private HashSet parentNodes;
044:
045: // Is a transaction in progress?
046: private volatile boolean active;
047: private volatile boolean killed;
048:
049: // Transaction for the embedded database
050: protected ITransaction txn;
051:
052: // Transactions for SQL data sources
053: private HashMap sqlConnections;
054:
055: // Set of SQL connections that already have been verified
056: private HashSet testedConnections;
057:
058: // when did the current transaction start?
059: private long tstart;
060:
061: // a name to log the transaction. For HTTP transactions this is the rerquest path
062: private String tname;
063:
064: /**
065: * Creates a new Transactor object.
066: *
067: * @param runnable ...
068: * @param group ...
069: * @param nmgr ...
070: */
071: public Transactor(Runnable runnable, ThreadGroup group,
072: NodeManager nmgr) {
073: super (group, runnable, group.getName());
074: this .nmgr = nmgr;
075:
076: dirtyNodes = new HashMap();
077: cleanNodes = new HashMap();
078: parentNodes = new HashSet();
079:
080: sqlConnections = new HashMap();
081: testedConnections = new HashSet();
082: active = false;
083: killed = false;
084: }
085:
086: /**
087: * Mark a Node as modified/created/deleted during this transaction
088: *
089: * @param node ...
090: */
091: public void visitDirtyNode(Node node) {
092: if (node != null) {
093: Key key = node.getKey();
094:
095: if (!dirtyNodes.containsKey(key)) {
096: dirtyNodes.put(key, node);
097: }
098: }
099: }
100:
101: /**
102: * Unmark a Node that has previously been marked as modified during the transaction
103: *
104: * @param node ...
105: */
106: public void dropDirtyNode(Node node) {
107: if (node != null) {
108: Key key = node.getKey();
109:
110: dirtyNodes.remove(key);
111: }
112: }
113:
114: /**
115: * Get a dirty Node from this transaction.
116: * @param key the key
117: * @return the dirty node associated with the key, or null
118: */
119: public Node getDirtyNode(Key key) {
120: return (Node) dirtyNodes.get(key);
121: }
122:
123: /**
124: * Keep a reference to an unmodified Node local to this transaction
125: *
126: * @param node the node to register
127: */
128: public void visitCleanNode(Node node) {
129: if (node != null) {
130: Key key = node.getKey();
131:
132: if (!cleanNodes.containsKey(key)) {
133: cleanNodes.put(key, node);
134: }
135: }
136: }
137:
138: /**
139: * Keep a reference to an unmodified Node local to this transaction
140: *
141: * @param key the key to register with
142: * @param node the node to register
143: */
144: public void visitCleanNode(Key key, Node node) {
145: if (node != null) {
146: if (!cleanNodes.containsKey(key)) {
147: cleanNodes.put(key, node);
148: }
149: }
150: }
151:
152: /**
153: * Drop a reference to an unmodified Node previously registered with visitCleanNode().
154: * @param key the key
155: */
156: public void dropCleanNode(Key key) {
157: cleanNodes.remove(key);
158: }
159:
160: /**
161: * Get a reference to an unmodified Node local to this transaction
162: *
163: * @param key ...
164: *
165: * @return ...
166: */
167: public Node getCleanNode(Object key) {
168: return (key == null) ? null : (Node) cleanNodes.get(key);
169: }
170:
171: /**
172: *
173: *
174: * @param node ...
175: */
176: public void visitParentNode(Node node) {
177: parentNodes.add(node);
178: }
179:
180: /**
181: * Returns true if a transaction is currently active.
182: * @return true if currently a transaction is active
183: */
184: public boolean isActive() {
185: return active;
186: }
187:
188: /**
189: * Register a db connection with this transactor thread.
190: * @param src the db source
191: * @param con the connection
192: */
193: public void registerConnection(DbSource src, Connection con) {
194: sqlConnections.put(src, con);
195: // we assume a freshly created connection is ok.
196: testedConnections.add(src);
197: }
198:
199: /**
200: * Get a db connection that was previously registered with this transactor thread.
201: * @param src the db source
202: * @return the connection
203: */
204: public Connection getConnection(DbSource src) {
205: Connection con = (Connection) sqlConnections.get(src);
206: if (con != null && !testedConnections.contains(src)) {
207: // Check if the connection is still alive by executing a simple statement.
208: try {
209: Statement stmt = con.createStatement();
210: stmt.execute("SELECT 1");
211: stmt.close();
212: testedConnections.add(src);
213: } catch (SQLException sx) {
214: try {
215: con.close();
216: } catch (SQLException ignore) {/* nothing to do */
217: }
218: return null;
219: }
220: }
221: return con;
222: }
223:
224: /**
225: * Start a new transaction with the given name.
226: *
227: * @param name The name of the transaction. This is usually the request
228: * path for the underlying HTTP request.
229: *
230: * @throws Exception ...
231: */
232: public synchronized void begin(String name) throws Exception {
233: if (killed) {
234: throw new DatabaseException(
235: "Transaction started on killed thread");
236: } else if (active) {
237: abort();
238: }
239:
240: dirtyNodes.clear();
241: cleanNodes.clear();
242: parentNodes.clear();
243: testedConnections.clear();
244: txn = nmgr.db.beginTransaction();
245: active = true;
246: tstart = System.currentTimeMillis();
247: tname = name;
248: }
249:
250: /**
251: * Commit the current transaction, persisting all changes to DB.
252: *
253: * @throws Exception ...
254: */
255: public synchronized void commit() throws Exception {
256: if (killed) {
257: throw new DatabaseException(
258: "commit() called on killed transactor thread");
259: } else if (!active) {
260: return;
261: }
262: int inserted = 0;
263: int updated = 0;
264: int deleted = 0;
265:
266: ArrayList insertedNodes = null;
267: ArrayList updatedNodes = null;
268: ArrayList deletedNodes = null;
269: ArrayList modifiedParentNodes = null;
270: // if nodemanager has listeners collect dirty nodes
271: boolean hasListeners = nmgr.hasNodeChangeListeners();
272:
273: if (hasListeners) {
274: insertedNodes = new ArrayList();
275: updatedNodes = new ArrayList();
276: deletedNodes = new ArrayList();
277: modifiedParentNodes = new ArrayList();
278: }
279:
280: if (!dirtyNodes.isEmpty()) {
281: Object[] dirty = dirtyNodes.values().toArray();
282:
283: // the set to collect DbMappings to be marked as changed
284: HashSet dirtyDbMappings = new HashSet();
285:
286: for (int i = 0; i < dirty.length; i++) {
287: Node node = (Node) dirty[i];
288:
289: // update nodes in db
290: int nstate = node.getState();
291:
292: if (nstate == Node.NEW) {
293: nmgr.insertNode(nmgr.db, txn, node);
294: dirtyDbMappings.add(node.getDbMapping());
295: node.setState(Node.CLEAN);
296:
297: // register node with nodemanager cache
298: nmgr.registerNode(node);
299:
300: if (hasListeners) {
301: insertedNodes.add(node);
302: }
303:
304: inserted++;
305: nmgr.app.logEvent("inserted: Node "
306: + node.getPrototype() + "/" + node.getID());
307: } else if (nstate == Node.MODIFIED) {
308: // only mark DbMapping as dirty if updateNode returns true
309: if (nmgr.updateNode(nmgr.db, txn, node)) {
310: dirtyDbMappings.add(node.getDbMapping());
311: }
312: node.setState(Node.CLEAN);
313:
314: // update node with nodemanager cache
315: nmgr.registerNode(node);
316:
317: if (hasListeners) {
318: updatedNodes.add(node);
319: }
320:
321: updated++;
322: nmgr.app.logEvent("updated: Node "
323: + node.getPrototype() + "/" + node.getID());
324: } else if (nstate == Node.DELETED) {
325: nmgr.deleteNode(nmgr.db, txn, node);
326: dirtyDbMappings.add(node.getDbMapping());
327:
328: // remove node from nodemanager cache
329: nmgr.evictNode(node);
330:
331: if (hasListeners) {
332: deletedNodes.add(node);
333: }
334:
335: deleted++;
336: }
337:
338: node.clearWriteLock();
339: }
340:
341: // set last data change times in db-mappings
342: // long now = System.currentTimeMillis();
343: for (Iterator i = dirtyDbMappings.iterator(); i.hasNext();) {
344: DbMapping dbm = (DbMapping) i.next();
345: if (dbm != null) {
346: dbm.setLastDataChange();
347: }
348: }
349: }
350:
351: long now = System.currentTimeMillis();
352:
353: if (!parentNodes.isEmpty()) {
354: // set last subnode change times in parent nodes
355: for (Iterator i = parentNodes.iterator(); i.hasNext();) {
356: Node node = (Node) i.next();
357: node.markSubnodesChanged();
358: if (hasListeners) {
359: modifiedParentNodes.add(node);
360: }
361: }
362: }
363:
364: if (hasListeners) {
365: nmgr.fireNodeChangeEvent(insertedNodes, updatedNodes,
366: deletedNodes, modifiedParentNodes);
367: }
368:
369: // clear the node collections
370: recycle();
371:
372: if (active) {
373: active = false;
374: nmgr.db.commitTransaction(txn);
375: txn = null;
376: }
377:
378: nmgr.app.logAccess(tname + " " + inserted + " inserted, "
379: + updated + " updated, " + deleted + " deleted in "
380: + (now - tstart) + " millis");
381:
382: // unset transaction name
383: tname = null;
384: }
385:
386: /**
387: * Abort the current transaction, rolling back all changes made.
388: */
389: public synchronized void abort() {
390: Object[] dirty = dirtyNodes.values().toArray();
391:
392: // evict dirty nodes from cache
393: for (int i = 0; i < dirty.length; i++) {
394: Node node = (Node) dirty[i];
395:
396: // Declare node as invalid, so it won't be used by other threads
397: // that want to write on it and remove it from cache
398: nmgr.evictNode(node);
399: node.clearWriteLock();
400: }
401:
402: long now = System.currentTimeMillis();
403:
404: // set last subnode change times in parent nodes
405: for (Iterator i = parentNodes.iterator(); i.hasNext();) {
406: Node node = (Node) i.next();
407: node.markSubnodesChanged();
408: }
409:
410: // clear the node collections
411: recycle();
412: // close any JDBC connections associated with this transactor thread
413: closeConnections();
414:
415: if (active) {
416: active = false;
417:
418: if (txn != null) {
419: nmgr.db.abortTransaction(txn);
420: txn = null;
421: }
422:
423: nmgr.app
424: .logAccess(tname + " aborted after "
425: + (System.currentTimeMillis() - tstart)
426: + " millis");
427: }
428:
429: // unset transaction name
430: tname = null;
431: }
432:
433: /**
434: * Kill this transaction thread. Used as last measure only.
435: */
436: public synchronized void kill() {
437: killed = true;
438: interrupt();
439:
440: // Interrupt the thread if it has not noticed the flag (e.g. because it is busy
441: // reading from a network socket).
442: if (isAlive()) {
443: interrupt();
444: try {
445: join(1000);
446: } catch (InterruptedException ir) {
447: // interrupted by other thread
448: }
449: }
450:
451: if (isAlive()
452: && "true".equals(nmgr.app
453: .getProperty("requestTimeoutStop"))) {
454: // still running - check if we ought to stop() it
455: try {
456: Thread.sleep(2000);
457: if (isAlive()) {
458: // thread is still running, pull emergency break
459: nmgr.app.logEvent("Stopping Thread for Transactor "
460: + this );
461: stop();
462: }
463: } catch (InterruptedException ir) {
464: // interrupted by other thread
465: }
466: }
467: }
468:
469: /**
470: * Closes all open JDBC connections
471: */
472: public void closeConnections() {
473: if (sqlConnections != null) {
474: for (Iterator i = sqlConnections.values().iterator(); i
475: .hasNext();) {
476: try {
477: Connection con = (Connection) i.next();
478:
479: con.close();
480: nmgr.app.logEvent("Closing DB connection: " + con);
481: } catch (Exception ignore) {
482: // exception closing db connection, ignore
483: }
484: }
485:
486: sqlConnections.clear();
487: }
488: }
489:
490: /**
491: * Clear collections and throw them away. They may have grown large,
492: * so the benefit of keeping them (less GC) needs to be weighted against
493: * the potential increas in memory usage.
494: */
495: private synchronized void recycle() {
496: // clear the node collections to ease garbage collection
497: dirtyNodes.clear();
498: cleanNodes.clear();
499: parentNodes.clear();
500: testedConnections.clear();
501: }
502:
503: /**
504: * Return the name of the current transaction. This is usually the request
505: * path for the underlying HTTP request.
506: */
507: public String getTransactionName() {
508: return tname;
509: }
510:
511: /**
512: * Return a string representation of this Transactor thread
513: *
514: * @return ...
515: */
516: public String toString() {
517: return "Transactor[" + tname + "]";
518: }
519: }
|