diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java | 58 |
1 files changed, 36 insertions, 22 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index e39c005750..3c3902c545 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -34,15 +34,17 @@ import javax.management.openmbean.TabularDataSupport; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; +import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.management.MBeanConstructor; -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.ManagementActor; public class DirectExchange extends AbstractExchange { @@ -114,7 +116,7 @@ public class DirectExchange extends AbstractExchange } Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])}; - CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues); + CompositeData bindingData = new CompositeDataSupport(_bindingDataType, COMPOSITE_ITEM_NAMES, bindingItemValues); _bindingList.put(bindingData); } @@ -129,6 +131,7 @@ public class DirectExchange extends AbstractExchange throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange."); } + CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger())); try { queue.bind(DirectExchange.this, new AMQShortString(binding), null); @@ -137,22 +140,23 @@ public class DirectExchange extends AbstractExchange { throw new MBeanException(ex); } + finally + { + CurrentActor.remove(); + } } }// End of MBean class - protected ExchangeMBean createMBean() throws AMQException + protected ExchangeMBean createMBean() throws JMException { - try - { - return new DirectExchangeMBean(); - } - catch (JMException ex) - { - _logger.error("Exception occured in creating the direct exchange mbean", ex); - throw new AMQException("Exception occured in creating the direct exchange mbean", ex); - } + return new DirectExchangeMBean(); + } + + public Logger getLogger() + { + return _logger; } public AMQShortString getType() @@ -160,24 +164,32 @@ public class DirectExchange extends AbstractExchange return ExchangeDefaults.DIRECT_EXCHANGE_CLASS; } + public void registerQueue(String routingKey, AMQQueue queue, Map<String,Object> args) throws AMQException + { + registerQueue(new AMQShortString(routingKey), queue); + } + public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException { + registerQueue(routingKey, queue); + } + + private void registerQueue(AMQShortString routingKey, AMQQueue queue) throws AMQException + { assert queue != null; assert routingKey != null; if (!_index.add(routingKey, queue)) { if (_logger.isDebugEnabled()) { - _logger.debug("Queue (" + queue.getName() + ")" + queue + " is already registered with routing key " + routingKey); + _logger.debug("Queue (" + queue + ") is already registered with routing key " + routingKey); } } else { if (_logger.isDebugEnabled()) { - _logger.debug("Binding queue(" + queue.getName() + ") " + queue + " with routing key " + routingKey - + (args == null ? "" : " and arguments " + args.toString()) - + " to exchange " + this); + _logger.debug("Binding queue:" + queue + " with routing key '" + routingKey +"' to exchange:" + this); } } } @@ -194,19 +206,21 @@ public class DirectExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) { - final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey(); + final String routingKey = payload.getRoutingKey(); - final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); + + final ArrayList<AMQQueue> queues = (routingKey == null) ? _index.get("") : _index.get(routingKey); if (_logger.isDebugEnabled()) { _logger.debug("Publishing message to queue " + queues); } - payload.enqueue(queues); + return queues; + } |