001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.jbi.nmr;
018:
019: import javax.jbi.messaging.MessageExchange;
020: import javax.xml.namespace.QName;
021:
022: import junit.framework.TestCase;
023:
024: import org.apache.commons.logging.Log;
025: import org.apache.commons.logging.LogFactory;
026: import org.apache.servicemix.jbi.container.ActivationSpec;
027: import org.apache.servicemix.jbi.container.JBIContainer;
028: import org.apache.servicemix.jbi.container.SubscriptionSpec;
029: import org.apache.servicemix.jbi.resolver.SubscriptionFilter;
030: import org.apache.servicemix.tck.ReceiverComponent;
031: import org.apache.servicemix.tck.SenderComponent;
032:
033: public class PubSubTest extends TestCase {
034:
035: private static final Log LOG = LogFactory.getLog(PubSubTest.class);
036:
037: private SenderComponent sender;
038:
039: private JBIContainer container;
040:
041: protected void setUp() throws Exception {
042: container = new JBIContainer();
043: container.setEmbedded(true);
044: container.setFlowName("seda");
045: container.init();
046: container.start();
047:
048: sender = new SenderComponent();
049: ActivationSpec as = new ActivationSpec("source", sender);
050: as.setService(new QName("http://www.test.com", "source"));
051: as.setFailIfNoDestinationEndpoint(false);
052: container.activateComponent(as);
053: }
054:
055: protected void tearDown() throws Exception {
056: container.shutDown();
057: }
058:
059: public void testPubSub() throws Exception {
060: ReceiverComponent recListener = new ReceiverComponent();
061: container.activateComponent(createReceiverAS("receiver",
062: recListener));
063: sender.sendMessages(1);
064: recListener.getMessageList().assertMessagesReceived(1);
065: }
066:
067: public void testPubSubFiltered() throws Exception {
068: ReceiverComponent recListener = new ReceiverComponent();
069: container.activateComponent(createReceiverASFiltered(
070: "receiver", recListener));
071: sender.sendMessages(1, false);
072: recListener.getMessageList().assertMessagesReceived(1);
073: }
074:
075: private ActivationSpec createReceiverAS(String id, Object component) {
076: ActivationSpec as = new ActivationSpec(id, component);
077: SubscriptionSpec ss = new SubscriptionSpec();
078: ss.setService(new QName("http://www.test.com", "source"));
079: as.setSubscriptions(new SubscriptionSpec[] { ss });
080: as.setFailIfNoDestinationEndpoint(false);
081: return as;
082: }
083:
084: private ActivationSpec createReceiverASFiltered(String id,
085: Object component) {
086: ActivationSpec as = new ActivationSpec(id, component);
087: SubscriptionSpec ss = new SubscriptionSpec();
088: ss.setService(new QName("http://www.test.com", "source"));
089: ss.setFilter(new Filter());
090: as.setSubscriptions(new SubscriptionSpec[] { ss });
091: as.setFailIfNoDestinationEndpoint(false);
092: return as;
093: }
094:
095: public static class Filter implements SubscriptionFilter {
096:
097: public boolean matches(MessageExchange arg0) {
098: LOG.info("Matches");
099: return true;
100: }
101:
102: }
103: }
|