diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java new file mode 100644 index 0000000000..0e7459498a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.queue.IncomingMessage; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class DefaultExchangeRegistry implements ExchangeRegistry +{ + private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class); + + /** + * Maps from exchange name to exchange instance + */ + private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>(); + private ConcurrentMap<String, Exchange> _exchangeMapStr = new ConcurrentHashMap<String, Exchange>(); + + private Exchange _defaultExchange; + private VirtualHost _host; + + public DefaultExchangeRegistry(VirtualHost host) + { + //create 'standard' exchanges: + _host = host; + + } + + public void initialise() throws AMQException + { + new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this, getDurableConfigurationStore()); + } + + + + public DurableConfigurationStore getDurableConfigurationStore() + { + return _host.getDurableConfigurationStore(); + } + + public void registerExchange(Exchange exchange) throws AMQException + { + _exchangeMap.put(exchange.getNameShortString(), exchange); + _exchangeMapStr.put(exchange.getNameShortString().toString(), exchange); + } + + public void setDefaultExchange(Exchange exchange) + { + _defaultExchange = exchange; + } + + public Exchange getDefaultExchange() + { + return _defaultExchange; + } + + public Collection<AMQShortString> getExchangeNames() + { + return _exchangeMap.keySet(); + } + + public void unregisterExchange(AMQShortString name, boolean inUse) throws AMQException + { + // Check access + if (!_host.getSecurityManager().authoriseDelete(_exchangeMap.get(name))) + { + throw new AMQSecurityException(); + } + + // TODO: check inUse argument + + Exchange e = _exchangeMap.remove(name); + _exchangeMapStr.remove(name.toString()); + if (e != null) + { + if (e.isDurable()) + { + getDurableConfigurationStore().removeExchange(e); + } + e.close(); + } + else + { + throw new AMQException("Unknown exchange " + name); + } + } + + public void unregisterExchange(String name, boolean inUse) throws AMQException + { + unregisterExchange(new AMQShortString(name), inUse); + } + + public Exchange getExchange(AMQShortString name) + { + if ((name == null) || name.length() == 0) + { + return getDefaultExchange(); + } + else + { + return _exchangeMap.get(name); + } + + } + + public Exchange getExchange(String name) + { + if ((name == null) || name.length() == 0) + { + return getDefaultExchange(); + } + else + { + return _exchangeMapStr.get(name); + } + } + + + /** + * Routes content through exchanges, delivering it to 1 or more queues. + * @param payload + * @throws AMQException if something goes wrong delivering data + */ + public void routeContent(IncomingMessage payload) throws AMQException + { + final AMQShortString exchange = payload.getExchange(); + final Exchange exch = getExchange(exchange); + // there is a small window of opportunity for the exchange to be deleted in between + // the BasicPublish being received (where the exchange is validated) and the final + // content body being received (which triggers this method) + // TODO: check where the exchange is validated + if (exch == null) + { + throw new AMQException("Exchange '" + exchange + "' does not exist"); + } + payload.enqueue(exch.route(payload)); + } +} |