001: /*
002: * Copyright 2004-2006 the original author or authors.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.compass.gps.device.hibernate.embedded;
018:
019: import java.util.Iterator;
020: import java.util.Map;
021: import java.util.Properties;
022: import java.util.WeakHashMap;
023: import java.util.concurrent.ConcurrentHashMap;
024: import javax.transaction.Status;
025: import javax.transaction.Synchronization;
026:
027: import org.apache.commons.logging.Log;
028: import org.apache.commons.logging.LogFactory;
029: import org.compass.core.Compass;
030: import org.compass.core.CompassException;
031: import org.compass.core.CompassSession;
032: import org.compass.core.CompassTransaction;
033: import org.compass.core.config.CompassConfiguration;
034: import org.compass.core.config.CompassConfigurationFactory;
035: import org.compass.core.config.CompassEnvironment;
036: import org.compass.core.config.CompassSettings;
037: import org.compass.core.mapping.CascadeMapping;
038: import org.compass.core.mapping.ResourceMapping;
039: import org.compass.core.spi.InternalCompass;
040: import org.compass.core.transaction.LocalTransactionFactory;
041: import org.compass.core.util.ClassUtils;
042: import org.compass.gps.device.hibernate.lifecycle.HibernateMirrorFilter;
043: import org.hibernate.Transaction;
044: import org.hibernate.cfg.Configuration;
045: import org.hibernate.event.EventSource;
046: import org.hibernate.event.Initializable;
047: import org.hibernate.event.PostDeleteEvent;
048: import org.hibernate.event.PostDeleteEventListener;
049: import org.hibernate.event.PostInsertEvent;
050: import org.hibernate.event.PostInsertEventListener;
051: import org.hibernate.event.PostUpdateEvent;
052: import org.hibernate.event.PostUpdateEventListener;
053: import org.hibernate.mapping.PersistentClass;
054:
055: /**
* A Hibernate event listener that allows Compass to run embedded within Hibernate. In embedded mode,
* Compass is automatically configured (with minimal configuration) to mirror changes made through
* Hibernate to the search engine, and it can also index the whole database content.
059: *
060: * <p>Configuration of this listener is simple:
* <pre>
* &lt;hibernate-configuration&gt;
*   &lt;session-factory&gt;
*
*     &lt;event type="post-update"&gt;
*       &lt;listener class="org.compass.gps.device.hibernate.embedded.CompassEventListener"/&gt;
*     &lt;/event&gt;
*     &lt;event type="post-insert"&gt;
*       &lt;listener class="org.compass.gps.device.hibernate.embedded.CompassEventListener"/&gt;
*     &lt;/event&gt;
*     &lt;event type="post-delete"&gt;
*       &lt;listener class="org.compass.gps.device.hibernate.embedded.CompassEventListener"/&gt;
*     &lt;/event&gt;
*
*   &lt;/session-factory&gt;
* &lt;/hibernate-configuration&gt;
* </pre>
078: *
* <p>When using Hibernate annotations or entity manager, Compass also contains the Hibernate Search event class,
* so it will be detected automatically. In such a case, there is no need for the event listener configuration.
081: *
* <p>Once the above configuration is set, Compass is "installed" within Hibernate. In order to enable
* Compass, the search engine connection url must be set using Hibernate properties configuration. For example:
* <code>&lt;property name="compass.engine.connection"&gt;testindex&lt;/property&gt;</code>.
085: *
086: * <p>Compass will automatically go over the mapped classes in Hibernate and will check if they have Compass
087: * mappings. If they do, they will be added to the searchable entities. If no such searchable classes are found,
088: * this listener will perform no operations.
089: *
* <p>Additional Compass configuration can be set using typical Hibernate properties configuration with the
* <code>compass.</code> prefix. If using an external Compass configuration file is preferred, the
* <code>compass.hibernate.config</code> property can be set to point to the location of a Compass configuration
* file.
094: *
* <p>Embedded Compass also allows using {@link org.compass.gps.CompassGps#index()} in order to completely reindex
* the database. See {@link org.compass.gps.device.hibernate.embedded.HibernateHelper} for more information. In
* order to configure the Compass instance that will be used to index the database, the <code>gps.index.</code>
* prefix can be used.
099: *
* <p>Transaction management is automatically bound to Hibernate by using Compass local transactions. If other
* transaction strategies are used (such as JTA Sync or XA), the Compass transaction will be bound to them
* and not to the Hibernate transaction.
103: *
104: * <p>A user defined {@link org.compass.gps.device.hibernate.lifecycle.HibernateMirrorFilter} can be used to
105: * filter out mirror operations. In order to configure one, the <code>compass.hibernate.mirrorFilter</code>
106: * can be used with the implementation class FQN.
107: *
* <p>In order to get the {@link Compass} instance bound to this Hibernate configuration, the
* {@link HibernateHelper} can be used. This is mainly used in order to perform search operations on the
* index and get a Compass Gps in order to reindex the database.
111: *
112: * @author kimchy
113: */
114: public class CompassEventListener implements PostDeleteEventListener,
115: PostInsertEventListener, PostUpdateEventListener, Initializable {
116:
117: private static Log log = LogFactory
118: .getLog(CompassEventListener.class);
119:
120: private static final String COMPASS_PREFIX = "compass";
121:
122: private static final String COMPASS_GPS_INDEX_PREFIX = "gps.index.";
123:
124: public static final String COMPASS_CONFIG_LOCATION = "compass.hibernate.config";
125:
126: public static final String COMPASS_MIRROR_FILTER = "compass.hibernate.mirrorFilter";
127:
128: private static ThreadLocal<WeakHashMap<Configuration, CompassHolder>> contexts = new ThreadLocal<WeakHashMap<Configuration, CompassHolder>>();
129:
130: private CompassHolder compassHolder;
131:
132: public void initialize(Configuration cfg) {
133: compassHolder = getCompassHolder(cfg);
134: }
135:
136: public Compass getCompass() {
137: return this .compassHolder.compass;
138: }
139:
140: public Properties getIndexSettings() {
141: return this .compassHolder.indexSettings;
142: }
143:
144: public void onPostDelete(PostDeleteEvent event) {
145: if (compassHolder == null) {
146: return;
147: }
148: Object entity = event.getEntity();
149: if (!hasMappingForEntity(entity.getClass(),
150: CascadeMapping.Cascade.DELETE)) {
151: return;
152: }
153: if (compassHolder.mirrorFilter != null) {
154: if (compassHolder.mirrorFilter.shouldFilterDelete(event)) {
155: return;
156: }
157: }
158: TransactionSyncHolder holder = getOrCreateHolder(event
159: .getSession());
160: if (log.isTraceEnabled()) {
161: log.trace("Deleting [" + entity + "]");
162: }
163: holder.session.delete(entity);
164: afterOperation(holder);
165: }
166:
167: public void onPostInsert(PostInsertEvent event) {
168: if (compassHolder == null) {
169: return;
170: }
171: Object entity = event.getEntity();
172: if (!hasMappingForEntity(entity.getClass(),
173: CascadeMapping.Cascade.CREATE)) {
174: return;
175: }
176: if (compassHolder.mirrorFilter != null) {
177: if (compassHolder.mirrorFilter.shouldFilterInsert(event)) {
178: return;
179: }
180: }
181: TransactionSyncHolder holder = getOrCreateHolder(event
182: .getSession());
183: if (log.isTraceEnabled()) {
184: log.trace("Creating [" + entity + "]");
185: }
186: holder.session.create(entity);
187: afterOperation(holder);
188: }
189:
190: public void onPostUpdate(PostUpdateEvent event) {
191: if (compassHolder == null) {
192: return;
193: }
194: Object entity = event.getEntity();
195: if (!hasMappingForEntity(entity.getClass(),
196: CascadeMapping.Cascade.SAVE)) {
197: return;
198: }
199: if (compassHolder.mirrorFilter != null) {
200: if (compassHolder.mirrorFilter.shouldFilterUpdate(event)) {
201: return;
202: }
203: }
204: TransactionSyncHolder holder = getOrCreateHolder(event
205: .getSession());
206: if (log.isTraceEnabled()) {
207: log.trace("Updating [" + entity + "]");
208: }
209: holder.session.save(entity);
210: afterOperation(holder);
211: }
212:
213: private TransactionSyncHolder getOrCreateHolder(EventSource session) {
214: if (session.isTransactionInProgress()) {
215: Transaction transaction = session.getTransaction();
216: TransactionSyncHolder holder = compassHolder.syncHolderPerTx
217: .get(transaction);
218: if (holder == null) {
219: holder = new TransactionSyncHolder();
220: holder.session = compassHolder.compass.openSession();
221: holder.tr = holder.session.beginTransaction();
222: holder.transacted = true;
223: transaction
224: .registerSynchronization(new CompassEmbeddedSyncronization(
225: holder, transaction));
226: compassHolder.syncHolderPerTx.put(transaction, holder);
227: }
228: return holder;
229: } else {
230: TransactionSyncHolder holder = new TransactionSyncHolder();
231: holder.session = compassHolder.compass.openSession();
232: holder.tr = holder.session.beginTransaction();
233: holder.transacted = false;
234: return holder;
235: }
236: }
237:
238: private void afterOperation(TransactionSyncHolder holder) {
239: if (holder.transacted) {
240: return;
241: }
242: holder.tr.commit();
243: holder.session.close();
244: }
245:
246: private CompassHolder getCompassHolder(Configuration cfg) {
247: WeakHashMap<Configuration, CompassHolder> contextMap = contexts
248: .get();
249: if (contextMap == null) {
250: contextMap = new WeakHashMap<Configuration, CompassHolder>(
251: 2);
252: contexts.set(contextMap);
253: }
254: CompassHolder compassHolder = contextMap.get(cfg);
255: if (compassHolder == null) {
256: compassHolder = initCompassHolder(cfg);
257: if (compassHolder != null) {
258: if (log.isDebugEnabled()) {
259: log.debug("Regsitering new Compass Holder ["
260: + compassHolder + "]");
261: }
262: contextMap.put(cfg, compassHolder);
263: }
264: }
265: return compassHolder;
266: }
267:
268: private CompassHolder initCompassHolder(Configuration cfg) {
269: Properties compassProperties = new Properties();
270: //noinspection unchecked
271: Properties props = cfg.getProperties();
272: for (Map.Entry entry : props.entrySet()) {
273: String key = (String) entry.getKey();
274: if (key.startsWith(COMPASS_PREFIX)) {
275: compassProperties.put(entry.getKey(), entry.getValue());
276: }
277: if (key.startsWith(COMPASS_GPS_INDEX_PREFIX)) {
278: compassProperties.put(entry.getKey(), entry.getValue());
279: }
280: }
281: if (compassProperties.isEmpty()) {
282: if (log.isDebugEnabled()) {
283: log
284: .debug("No Compass properties defined, disabling Compass");
285: }
286: return null;
287: }
288:
289: CompassConfiguration compassConfiguration = CompassConfigurationFactory
290: .newConfiguration();
291: CompassSettings settings = compassConfiguration.getSettings();
292: settings.addSettings(compassProperties);
293:
294: String configLocation = (String) compassProperties
295: .get(COMPASS_CONFIG_LOCATION);
296: if (configLocation != null) {
297: compassConfiguration.configure(configLocation);
298: }
299:
300: boolean atleastOneClassAdded = false;
301: for (Iterator it = cfg.getClassMappings(); it.hasNext();) {
302: PersistentClass clazz = (PersistentClass) it.next();
303: Class<?> mappedClass = clazz.getMappedClass();
304: atleastOneClassAdded |= compassConfiguration
305: .tryAddClass(mappedClass);
306: }
307: if (!atleastOneClassAdded) {
308: if (log.isDebugEnabled()) {
309: log
310: .debug("No searchable class mappings found in Hibernate class mappings, disabling Compass");
311: }
312: return null;
313: }
314:
315: CompassHolder compassHolder = new CompassHolder();
316: compassHolder.compassProperties = compassProperties;
317:
318: compassHolder.commitBeforeCompletion = settings
319: .getSettingAsBoolean(
320: CompassEnvironment.Transaction.COMMIT_BEFORE_COMPLETION,
321: false);
322:
323: String transactionFactory = (String) compassProperties
324: .get(CompassEnvironment.Transaction.FACTORY);
325: if (transactionFactory == null
326: || LocalTransactionFactory.class.getName().equals(
327: transactionFactory)) {
328: compassHolder.hibernateControlledTransaction = true;
329: // if the settings is configured to use local transaciton, disable thread bound setting since
330: // we are using Hibernate to managed transaction scope (using the transaction to holder map) and not thread locals
331: if (settings
332: .getSetting(CompassEnvironment.Transaction.DISABLE_THREAD_BOUND_LOCAL_TRANSATION) == null) {
333: settings
334: .setBooleanSetting(
335: CompassEnvironment.Transaction.DISABLE_THREAD_BOUND_LOCAL_TRANSATION,
336: true);
337: }
338: } else {
339: // Hibernate is not controlling the transaction (using JTA Sync or XA), don't commit/rollback
340: // with Hibernate transaction listeners
341: compassHolder.hibernateControlledTransaction = false;
342: }
343:
344: compassHolder.indexSettings = new Properties();
345: for (Map.Entry entry : compassProperties.entrySet()) {
346: String key = (String) entry.getKey();
347: if (key.startsWith(COMPASS_GPS_INDEX_PREFIX)) {
348: compassHolder.indexSettings.put(key
349: .substring(COMPASS_GPS_INDEX_PREFIX.length()),
350: entry.getValue());
351: }
352: }
353:
354: String mirrorFilterClass = compassHolder.compassProperties
355: .getProperty(COMPASS_MIRROR_FILTER);
356: if (mirrorFilterClass != null) {
357: try {
358: compassHolder.mirrorFilter = (HibernateMirrorFilter) ClassUtils
359: .forName(
360: mirrorFilterClass,
361: compassHolder.compass.getSettings()
362: .getClassLoader())
363: .newInstance();
364: } catch (Exception e) {
365: throw new CompassException(
366: "Failed to create mirror filter ["
367: + mirrorFilterClass + "]", e);
368: }
369: }
370:
371: compassHolder.compass = compassConfiguration.buildCompass();
372:
373: return compassHolder;
374: }
375:
376: private boolean hasMappingForEntity(Class clazz,
377: CascadeMapping.Cascade cascade) {
378: ResourceMapping resourceMapping = ((InternalCompass) compassHolder.compass)
379: .getMapping().getRootMappingByClass(clazz);
380: if (resourceMapping != null) {
381: return true;
382: }
383: resourceMapping = ((InternalCompass) compassHolder.compass)
384: .getMapping().getNonRootMappingByClass(clazz);
385: if (resourceMapping == null) {
386: return false;
387: }
388: return resourceMapping.operationAllowed(cascade);
389: }
390:
391: private class CompassHolder {
392:
393: ConcurrentHashMap<Transaction, TransactionSyncHolder> syncHolderPerTx = new ConcurrentHashMap<Transaction, TransactionSyncHolder>();
394:
395: Properties compassProperties;
396:
397: Properties indexSettings;
398:
399: boolean commitBeforeCompletion;
400:
401: boolean hibernateControlledTransaction;
402:
403: HibernateMirrorFilter mirrorFilter;
404:
405: Compass compass;
406: }
407:
408: private class TransactionSyncHolder {
409:
410: public CompassSession session;
411:
412: public CompassTransaction tr;
413:
414: public boolean transacted;
415: }
416:
417: private class CompassEmbeddedSyncronization implements
418: Synchronization {
419:
420: private Transaction transaction;
421:
422: private TransactionSyncHolder holder;
423:
424: private CompassEmbeddedSyncronization(
425: TransactionSyncHolder holder, Transaction transaction) {
426: this .holder = holder;
427: this .transaction = transaction;
428: }
429:
430: public void beforeCompletion() {
431: if (!compassHolder.commitBeforeCompletion) {
432: return;
433: }
434: if (holder.session.isClosed()) {
435: return;
436: }
437: if (compassHolder.hibernateControlledTransaction) {
438: if (log.isTraceEnabled()) {
439: log
440: .trace("Committing compass transaction using Hibernate synchronization beforeCompletion on thread ["
441: + Thread.currentThread().getName()
442: + "]");
443: }
444: holder.tr.commit();
445: }
446: }
447:
448: public void afterCompletion(int status) {
449: try {
450: if (holder.session.isClosed()) {
451: return;
452: }
453: if (!compassHolder.commitBeforeCompletion) {
454: if (compassHolder.hibernateControlledTransaction) {
455: try {
456: if (status == Status.STATUS_COMMITTED) {
457: if (log.isTraceEnabled()) {
458: log
459: .trace("Committing compass transaction using Hibernate synchronization afterCompletion on thread ["
460: + Thread
461: .currentThread()
462: .getName()
463: + "]");
464: }
465: holder.tr.commit();
466: } else {
467: if (log.isTraceEnabled()) {
468: log
469: .trace("Rolling back compass transaction using Hibernate synchronization afterCompletion on thread ["
470: + Thread
471: .currentThread()
472: .getName()
473: + "]");
474: }
475: holder.tr.rollback();
476: }
477: } finally {
478: holder.session.close();
479: }
480: }
481: }
482: } catch (Exception e) {
483: // TODO swallow??????
484: log.error(
485: "Exception occured when sync with transaction",
486: e);
487: } finally {
488: compassHolder.syncHolderPerTx.remove(transaction);
489: }
490: }
491: }
492: }
|