From 1db5a8a2329ec064d1683294ee1a3d8d233de42d Mon Sep 17 00:00:00 2001 From: Stephen Vinoski Date: Sat, 18 Nov 2006 02:12:32 +0000 Subject: directory moves required for maven merge git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476414 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/exchange/DefaultExchangeRegistry.java | 95 ++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java new file mode 100644 index 0000000000..eb9d1acb59 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.exchange; + +import org.apache.qpid.AMQException; +import org.apache.qpid.server.protocol.ExchangeInitialiser; +import org.apache.qpid.server.queue.AMQMessage; +import org.apache.log4j.Logger; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class DefaultExchangeRegistry implements ExchangeRegistry +{ + private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class); + + /** + * Maps from exchange name to exchange instance + */ + private ConcurrentMap _exchangeMap = new ConcurrentHashMap(); + + public DefaultExchangeRegistry(ExchangeFactory exchangeFactory) + { + //create 'standard' exchanges: + try + { + new ExchangeInitialiser().initialise(exchangeFactory, this); + } + catch(AMQException e) + { + _log.error("Failed to initialise exchanges: ", e); + } + } + + public void registerExchange(Exchange exchange) + { + _exchangeMap.put(exchange.getName(), exchange); + } + + public void unregisterExchange(String name, boolean inUse) throws AMQException + { + // TODO: check inUse argument + Exchange e = _exchangeMap.remove(name); + if (e != null) + { + e.close(); + } + else + { + throw new AMQException("Unknown exchange " + name); + } + } + + public Exchange getExchange(String name) + { + return _exchangeMap.get(name); + } + + /** + * Routes content through exchanges, delivering it to 1 or more queues. + * @param payload + * @throws AMQException if something goes wrong delivering data + */ + public void routeContent(AMQMessage payload) throws AMQException + { + final String exchange = payload.getPublishBody().exchange; + final Exchange exch = _exchangeMap.get(exchange); + // there is a small window of opportunity for the exchange to be deleted in between + // the JmsPublish being received (where the exchange is validated) and the final + // content body being received (which triggers this method) + if (exch == null) + { + throw new AMQException("Exchange '" + exchange + "' does not exist"); + } + exch.route(payload); + } +} -- cgit v1.2.1