diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 19:05:05 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2009-07-20 19:05:05 +0000 |
commit | ce5c014723b7ca37b05ef11ac1c37da65dd5aaa1 (patch) | |
tree | 2746b9fc9089b1cb8fdb6f72315660ecaab8a454 | |
parent | 94a235124bceb2b54d16bba52c9eb8fb32932b27 (diff) | |
download | qpid-python-ce5c014723b7ca37b05ef11ac1c37da65dd5aaa1.tar.gz |
Java Broker 0-10 Exploratory work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@795958 13f79535-47bb-0310-9956-ffa450edef68
85 files changed, 1699 insertions, 648 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java index aa64fab6cd..fe648fc899 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java @@ -20,10 +20,7 @@ */ package org.apache.qpid.extras.exchanges.diagnostic; -import java.util.List; -import java.util.Map; import java.util.ArrayList; -import java.util.Collection; import javax.management.JMException; import javax.management.openmbean.OpenDataException; @@ -34,8 +31,8 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.junit.extensions.util.SizeOf; import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor; @@ -193,20 +190,20 @@ public class DiagnosticExchange extends AbstractExchange return false; } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException { Long value = new Long(SizeOf.getUsedMemory()); AMQShortString key = new AMQShortString("memory"); - FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders(); + FieldTable headers = ((BasicContentHeaderProperties)payload.getMessageHeader().properties).getHeaders(); headers.put(key, value); - ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers); + ((BasicContentHeaderProperties)payload.getMessageHeader().properties).setHeaders(headers); AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue")); ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); queues.add(q); - payload.enqueue(queues); + return queues; } diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java index e43bd2ddc0..b64983ccb0 100644 --- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java +++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java @@ -23,14 +23,15 @@ package org.apache.qpid.extras.exchanges.example; import java.util.List; import java.util.Map; +import java.util.ArrayList; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; public class TestExchange implements Exchange { @@ -63,6 +64,16 @@ public class TestExchange implements Exchange return false; } + public boolean isBound(String bindingKey, AMQQueue queue) + { + return false; + } + + public boolean isBound(String bindingKey) + { + return false; + } + public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete) throws AMQException { @@ -102,8 +113,9 @@ public class TestExchange implements Exchange { } - public void route(IncomingMessage message) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage message) throws AMQException { + return new ArrayList<AMQQueue>(); } public int getTicket() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index aa390d6c26..c6e370206e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -57,6 +57,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.message.ServerMessage; public class AMQChannel { @@ -157,27 +158,35 @@ public class AMQChannel public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException { - if (_currentMessage == null) - { - throw new AMQException("Received content header without previously receiving a BasicPublish frame"); - } - else + StoreContext.setCurrentContext(_storeContext); + try { - if (_log.isDebugEnabled()) + if (_currentMessage == null) { - _log.debug("Content header received on channel " + _channelId); + throw new AMQException("Received content header without previously receiving a BasicPublish frame"); } + else + { + if (_log.isDebugEnabled()) + { + _log.debug("Content header received on channel " + _channelId); + } - _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setContentHeaderBody(contentHeaderBody); - _currentMessage.setExpiration(); + _currentMessage.setExpiration(); - routeCurrentMessage(); + routeCurrentMessage(); - _currentMessage.routingComplete(_messageStore, _messageHandleFactory); + _currentMessage.routingComplete(_messageStore, _messageHandleFactory); - deliverCurrentMessageIfComplete(); + deliverCurrentMessageIfComplete(); + } + } + finally + { + StoreContext.clearCurrentContext(); } } @@ -212,6 +221,7 @@ public class AMQChannel public void publishContentBody(ContentBody contentBody) throws AMQException { + StoreContext.setCurrentContext(_storeContext); if (_currentMessage == null) { throw new AMQException("Received content body without previously receiving a JmsPublishBody"); @@ -231,6 +241,7 @@ public class AMQChannel _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk( contentBody)); + deliverCurrentMessageIfComplete(); } catch (AMQException e) @@ -240,6 +251,10 @@ public class AMQChannel _currentMessage = null; throw e; } + finally + { + StoreContext.clearCurrentContext(); + } } protected void routeCurrentMessage() throws AMQException @@ -425,7 +440,7 @@ public class AMQChannel { if (entry.getQueue() == null) { - _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity()); + _log.debug("Adding unacked message with a null queue:" + entry); } else { @@ -487,7 +502,7 @@ public class AMQChannel if (!unacked.isQueueDeleted()) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery unacked.release(); @@ -518,7 +533,7 @@ public class AMQChannel if (unacked != null) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) @@ -551,7 +566,7 @@ public class AMQChannel } else { - _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); unacked.discard(_storeContext); @@ -612,7 +627,7 @@ public class AMQChannel - AMQMessage msg = message.getMessage(); + ServerMessage msg = message.getMessage(); AMQQueue queue = message.getQueue(); // Our Java Client will always suspend the channel when resending! @@ -631,7 +646,7 @@ public class AMQChannel // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + message.setRedelivered(true); Subscription sub = message.getDeliveredSubscription(); @@ -829,14 +844,25 @@ public class AMQChannel { if (!_returnMessages.isEmpty()) { + StoreContext sc =StoreContext.setCurrentContext(_storeContext); for (RequiredDeliveryException bouncedMessage : _returnMessages) { - AMQMessage message = bouncedMessage.getAMQMessage(); - _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + ServerMessage serverMessage = bouncedMessage.getAMQMessage(); + if(serverMessage instanceof AMQMessage) + { + AMQMessage message = (AMQMessage) serverMessage; + _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); - message.decrementReference(_storeContext); + } + else + { + // TODO AMQP 0-10 Message + throw new RuntimeException("not yet implemented conversion of 0-10 messages"); + } + bouncedMessage.release(); } + StoreContext.setCurrentContext(sc); _returnMessages.clear(); } @@ -884,8 +910,18 @@ public class AMQChannel public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + ServerMessage msg = entry.getMessage(); + if(msg instanceof AMQMessage) + { + getProtocolSession().getProtocolOutputConverter().writeDeliver((AMQMessage)msg, getChannelId(), + deliveryTag, sub.getConsumerTag()); + } + else + { + //TODO - Convert 0-10 Message into 0-8/9 message + } } + }; public ClientDeliveryMethod getClientDeliveryMethod() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 29494c4118..29c37a0ec8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; @@ -56,9 +57,8 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException { - - AMQMessage msg = message.getMessage(); - msg.setRedelivered(true); + + message.setRedelivered(true); final Subscription subscription = message.getDeliveredSubscription(); if (subscription != null) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index e402dd6956..27c1705da6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -45,6 +45,9 @@ import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.NewThreadExecutor; import org.apache.qpid.AMQException; +import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.*; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -56,6 +59,9 @@ import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; import org.apache.qpid.server.protocol.AMQPProtocolProvider; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.transport.ServerConnection; /** * Main entry point for AMQPD. @@ -314,6 +320,29 @@ public class Main } bind(port, serverConfig); + + + IApplicationRegistry appRegistry = ApplicationRegistry.getInstance(); + + final ConnectionDelegate delegate = + new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, "localhost"); + + + ConnectionBinding cb = new ConnectionBinding() + { + public Connection connection() + { + ServerConnection conn = new ServerConnection(); + conn.setConnectionDelegate(delegate); + return conn; + } + }; + + int port_0_10 = port + 1; + + org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor + ("0.0.0.0", port_0_10, cb); + ioa.start(); } /** diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java index 3f1947d65a..be6232ed6b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java @@ -23,6 +23,9 @@ package org.apache.qpid.server; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.AMQMessageReference; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; /** * Signals that a required delivery could not be made. This could be bacuse of the immediate flag being set and the @@ -39,9 +42,9 @@ import org.apache.qpid.server.queue.AMQMessage; */ public abstract class RequiredDeliveryException extends AMQException { - private AMQMessage _amqMessage; + private MessageReference _amqMessage; - public RequiredDeliveryException(String message, AMQMessage payload) + public RequiredDeliveryException(String message, ServerMessage payload) { super(message); @@ -54,20 +57,20 @@ public abstract class RequiredDeliveryException extends AMQException super(message); } - public void setMessage(final AMQMessage payload) + public void setMessage(final ServerMessage payload) { // Increment the reference as this message is in the routing phase // and so will have the ref decremented as routing fails. // we need to keep this message around so we can return it in the // handler. So increment here. - _amqMessage = payload.takeReference(); + _amqMessage = payload.newReference(); } - public AMQMessage getAMQMessage() + public ServerMessage getAMQMessage() { - return _amqMessage; + return _amqMessage.getMessage(); } public AMQConstant getErrorCode() @@ -76,4 +79,9 @@ public abstract class RequiredDeliveryException extends AMQException } public abstract AMQConstant getReplyCode(); + + public void release() + { + _amqMessage.release(); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index db3a05eb52..7322a4add6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -130,7 +130,7 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (QueueEntry msg : _unacked.values()) { - msg.getMessage().takeReference(); + // TODO - should requeue, whole thing is messed up } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 247558bb34..f1f87eb1d2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -204,4 +205,15 @@ public abstract class AbstractExchange implements Exchange, Managable { return getVirtualHost().getQueueRegistry(); } + + public boolean isBound(String bindingKey, AMQQueue queue) + { + return isBound(new AMQShortString(bindingKey), queue); + } + + public boolean isBound(String bindingKey) + { + return isBound(new AMQShortString(bindingKey)); + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 0ab8208d88..25ced58060 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.protocol.ExchangeInitialiser; -import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -57,6 +56,11 @@ public class DefaultExchangeRegistry implements ExchangeRegistry new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this); } + public Exchange getExchange(String exchangeName) + { + return getExchange(new AMQShortString(exchangeName)); + } + public MessageStore getMessageStore() { return _host.getMessageStore(); @@ -134,6 +138,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry { throw new AMQException("Exchange '" + exchange + "' does not exist"); } - exch.route(payload); + payload.enqueue(exch.route(payload)); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4b609f592b..02e83f3dd3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -40,9 +40,9 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; public class DirectExchange extends AbstractExchange { @@ -192,10 +192,10 @@ public class DirectExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException { - final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey(); + final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey()); final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey); @@ -204,7 +204,8 @@ public class DirectExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - payload.enqueue(queues); + return queues; + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 06209c5458..7ff5e4ef96 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -24,12 +24,11 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; -import java.util.List; -import java.util.Map; +import java.util.ArrayList; public interface Exchange { @@ -54,7 +53,7 @@ public interface Exchange void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - void route(IncomingMessage message) throws AMQException; + ArrayList<AMQQueue> route(InboundMessage message) throws AMQException; /** @@ -93,6 +92,8 @@ public interface Exchange */ boolean hasBindings(); - + boolean isBound(String bindingKey, AMQQueue queue); + + boolean isBound(String bindingKey); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index fe3b19e74e..6b42187f9c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -48,4 +48,6 @@ public interface ExchangeRegistry extends MessageRouter Collection<AMQShortString> getExchangeNames(); void initialise() throws AMQException; + + Exchange getExchange(String exchangeName); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 23c716a0db..2a037782eb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -28,9 +28,9 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; import javax.management.JMException; import javax.management.MBeanException; @@ -182,7 +182,7 @@ public class FanoutExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException { @@ -191,7 +191,7 @@ public class FanoutExchange extends AbstractExchange _logger.debug("Publishing message to queue " + _queues); } - payload.enqueue(new ArrayList(_queues)); + return new ArrayList(_queues); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 2b7df4361a..fde4cfd6a2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.log4j.Logger; import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.message.AMQMessageHeader; /** * Defines binding and matching based on a set of headers. @@ -139,7 +140,7 @@ class HeadersBinding * @return true if the headers define any required keys and match any required * values */ - public boolean matches(FieldTable headers) + public boolean matches(AMQMessageHeader headers) { if(headers == null) { @@ -151,13 +152,13 @@ class HeadersBinding } } - private boolean and(FieldTable headers) + private boolean and(AMQMessageHeader headers) { - if(headers.keys().containsAll(required)) + if(headers.containsHeaders(required)) { for(Map.Entry<String, Object> e : matches.entrySet()) { - if(!e.getValue().equals(headers.getObject(e.getKey()))) + if(!e.getValue().equals(headers.getHeader(e.getKey()))) { return false; } @@ -171,11 +172,11 @@ class HeadersBinding } - private boolean or(final FieldTable headers) + private boolean or(final AMQMessageHeader headers) { - if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor())) + if(required.isEmpty() || passesRequiredOr(headers)) { - return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor())) + return ((!matches.isEmpty()) && passesMatchesOr(headers)) || (required.isEmpty() && matches.isEmpty()); } else @@ -184,6 +185,32 @@ class HeadersBinding } } + private boolean passesMatchesOr(AMQMessageHeader headers) + { + for(Map.Entry<String,Object> entry : matches.entrySet()) + { + if(!headers.containsHeader(entry.getKey()) + || !((entry.getValue() == null && headers.getHeader(entry.getKey()) == null) + || (entry.getValue().equals(headers.getHeader(entry.getKey()))))) + { + return false; + } + } + return true; + } + + private boolean passesRequiredOr(AMQMessageHeader headers) + { + for(String name : required) + { + if(headers.containsHeader(name)) + { + return true; + } + } + return false; + } + private void processSpecial(String key, Object value) { if("X-match".equalsIgnoreCase(key)) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index fc667db17b..02e129621c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -31,9 +31,10 @@ import org.apache.qpid.framing.AMQTypedValue; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.AMQMessageHeader; import javax.management.JMException; import javax.management.openmbean.ArrayType; @@ -50,7 +51,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Collection; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -237,31 +237,31 @@ public class HeadersExchange extends AbstractExchange } } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException { - FieldTable headers = getHeaders(payload.getContentHeaderBody()); + AMQMessageHeader header = payload.getMessageHeader(); if (_logger.isDebugEnabled()) { - _logger.debug("Exchange " + getName() + ": routing message with headers " + headers); + _logger.debug("Exchange " + getName() + ": routing message with headers " + header); } boolean routed = false; ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>(); for (Registration e : _bindings) { - if (e.binding.matches(headers)) + if (e.binding.matches(header)) { if (_logger.isDebugEnabled()) { _logger.debug("Exchange " + getName() + ": delivering message with headers " + - headers + " to " + e.queue.getName()); + header + " to " + e.queue.getName()); } queues.add(e.queue); routed = true; } } - payload.enqueue(queues); + return queues; } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index be7a1dc196..71481ec730 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -30,13 +30,13 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortStringTokenizer; -import org.apache.qpid.server.queue.IncomingMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.message.InboundMessage; import javax.management.JMException; import javax.management.MBeanException; @@ -109,7 +109,7 @@ public class TopicExchange extends AbstractExchange private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>(); - private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>(); + private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>(); public static class Binding { @@ -160,7 +160,7 @@ public class TopicExchange extends AbstractExchange private final class TopicExchangeResult implements TopicMatcherResult { private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>(); - private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>(); + private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>(); public void addUnfilteredQueue(AMQQueue queue) { @@ -190,12 +190,12 @@ public class TopicExchange extends AbstractExchange } - public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + public void addFilteredQueue(AMQQueue queue, MessageFilter filter) { - Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); if(filters == null) { - filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(); + filters = new ConcurrentHashMap<MessageFilter,Integer>(); _filteredQueues.put(queue, filters); } Integer instances = filters.get(filter); @@ -210,9 +210,9 @@ public class TopicExchange extends AbstractExchange } - public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter) + public void removeFilteredQueue(AMQQueue queue, MessageFilter filter) { - Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); if(filters != null) { Integer instances = filters.get(filter); @@ -237,11 +237,11 @@ public class TopicExchange extends AbstractExchange } public void replaceQueueFilter(AMQQueue queue, - MessageFilter<RuntimeException> oldFilter, - MessageFilter<RuntimeException> newFilter) + MessageFilter oldFilter, + MessageFilter newFilter) { - Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue); - Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters); + Map<MessageFilter,Integer> filters = _filteredQueues.get(queue); + Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters); Integer oldFilterInstances = filters.get(oldFilter); if(oldFilterInstances == 1) { @@ -263,7 +263,7 @@ public class TopicExchange extends AbstractExchange _filteredQueues.put(queue,newFilters); } - public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues) + public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues) { if(queues == null) { @@ -284,11 +284,11 @@ public class TopicExchange extends AbstractExchange queues.addAll(_unfilteredQueues.keySet()); if(!_filteredQueues.isEmpty()) { - for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet()) + for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet()) { if(!queues.contains(entry.getKey())) { - for(MessageFilter<RuntimeException> filter : entry.getValue().keySet()) + for(MessageFilter filter : entry.getValue().keySet()) { if(filter.matches(msg)) { @@ -456,18 +456,18 @@ public class TopicExchange extends AbstractExchange } - private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args) + private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQException { final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); - WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString); + WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); JMSSelectorFilter selector = null; if(selectorRef == null || (selector = selectorRef.get())==null) { - selector = new JMSSelectorFilter<RuntimeException>(selectorString); - _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector)); + selector = new JMSSelectorFilter(selectorString); + _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector)); } return selector; } @@ -528,10 +528,12 @@ public class TopicExchange extends AbstractExchange return normalizedString; } - public void route(IncomingMessage payload) throws AMQException + public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException { - final AMQShortString routingKey = payload.getRoutingKey(); + final AMQShortString routingKey = payload.getRoutingKey() == null + ? AMQShortString.EMPTY_STRING + : new AMQShortString(payload.getRoutingKey()); // The copy here is unfortunate, but not too bad relevant to the amount of // things created and copied in getMatchedQueues @@ -543,7 +545,7 @@ public class TopicExchange extends AbstractExchange _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); } - payload.enqueue(queues); + return queues; } @@ -646,7 +648,7 @@ public class TopicExchange extends AbstractExchange } } - private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey) + private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey) { Collection<TopicMatcherResult> results = _parser.parse(routingKey); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java index a964bce306..2ead9e57af 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java @@ -26,7 +26,7 @@ import org.apache.qpid.server.queue.Filterable; /** * An expression which performs an operation on two expression values */ -public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E> +public abstract class ArithmeticExpression extends BinaryExpression { protected static final int INTEGER = 1; @@ -248,7 +248,7 @@ public abstract class ArithmeticExpression<E extends Exception> extends BinaryEx } } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Object lvalue = left.evaluate(message); if (lvalue == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java index 7308de80d6..024257bea9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java @@ -23,23 +23,23 @@ package org.apache.qpid.server.filter; /** * An expression which performs an operation on two expression values. */ -public abstract class BinaryExpression<E extends Exception> implements Expression<E> +public abstract class BinaryExpression implements Expression { - protected Expression<E> left; - protected Expression<E> right; + protected Expression left; + protected Expression right; - public BinaryExpression(Expression<E> left, Expression<E> right) + public BinaryExpression(Expression left, Expression right) { this.left = left; this.right = right; } - public Expression<E> getLeft() + public Expression getLeft() { return left; } - public Expression<E> getRight() + public Expression getRight() { return right; } @@ -90,7 +90,7 @@ public abstract class BinaryExpression<E extends Exception> implements Expressio /** * @param expression */ - public void setRight(Expression<E> expression) + public void setRight(Expression expression) { right = expression; } @@ -98,7 +98,7 @@ public abstract class BinaryExpression<E extends Exception> implements Expressio /** * @param expression */ - public void setLeft(Expression<E> expression) + public void setLeft(Expression expression) { left = expression; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java index 9beb9798d0..da9737e780 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java @@ -28,14 +28,13 @@ import org.apache.qpid.server.queue.Filterable; * A BooleanExpression is an expression that always * produces a Boolean result. */ -public interface BooleanExpression<E extends Exception> extends Expression<E> +public interface BooleanExpression extends Expression { /** * @param message * @return true if the expression evaluates to Boolean.TRUE. - * @throws E */ - public boolean matches(Filterable<E> message) throws E; + public boolean matches(Filterable message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java index 921005c462..e1cfda6b21 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java @@ -34,15 +34,15 @@ import org.apache.qpid.server.queue.Filterable; /** * A filter performing a comparison of two objects */ -public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E> +public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression { - public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression left, Expression<E> right) + public static BooleanExpression createBetween(Expression value, Expression left, Expression right) { return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right)); } - public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right) + public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) { return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right)); } @@ -73,7 +73,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx REGEXP_CONTROL_CHARS.add(new Character('!')); } - static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E> + static class LikeExpression extends UnaryExpression implements BooleanExpression { Pattern likePattern; @@ -81,7 +81,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx /** * @param right */ - public LikeExpression(Expression<E> right, String like, int escape) + public LikeExpression(Expression right, String like, int escape) { super(right); @@ -138,7 +138,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx /** * org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext) */ - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Object rv = this.getRight().evaluate(message); @@ -158,7 +158,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE; } - public boolean matches(Filterable<E> message) throws E + public boolean matches(Filterable message) { Object object = evaluate(message); @@ -236,7 +236,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx return doCreateEqual(left, right); } - private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right) + private static BooleanExpression doCreateEqual(Expression left, Expression right) { return new EqualExpression(left, right); } @@ -388,7 +388,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx super(left, right); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Comparable lv = (Comparable) left.evaluate(message); if (lv == null) @@ -550,21 +550,21 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx protected abstract boolean asBoolean(int answer); - public boolean matches(Filterable<E> message) throws E + public boolean matches(Filterable message) { Object object = evaluate(message); return (object != null) && (object == Boolean.TRUE); } - private static class EqualExpression<E extends Exception> extends ComparisonExpression<E> + private static class EqualExpression extends ComparisonExpression { - public EqualExpression(final Expression<E> left, final Expression<E> right) + public EqualExpression(final Expression left, final Expression right) { super(left, right); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Object lv = left.evaluate(message); Object rv = right.evaluate(message); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java index 3ed2286f2e..5cb0cde9fb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java @@ -32,17 +32,17 @@ import org.apache.qpid.server.queue.Filterable; /** * Represents a constant expression */ -public class ConstantExpression<E extends Exception> implements Expression<E> +public class ConstantExpression implements Expression { - static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E> + static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression { public BooleanConstantExpression(Object value) { super(value); } - public boolean matches(Filterable<E> message) throws E + public boolean matches(Filterable message) { Object object = evaluate(message); @@ -121,7 +121,7 @@ public class ConstantExpression<E extends Exception> implements Expression<E> this.value = value; } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { return value; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java index f2ebe41d26..ae9eaaefd3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java @@ -27,12 +27,12 @@ import org.apache.qpid.server.queue.Filterable; /** * Represents an expression */ -public interface Expression<E extends Exception> +public interface Expression { /** * @return the value of this expression */ - public Object evaluate(Filterable<E> message) throws E; + public Object evaluate(Filterable message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java index dd3c126ee5..a7a1eab3c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -27,13 +27,13 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.Filterable; import org.apache.qpid.AMQException; -public interface FilterManager<E extends Exception> +public interface FilterManager { - void add(MessageFilter<E> filter); + void add(MessageFilter filter); - void remove(MessageFilter<E> filter); + void remove(MessageFilter filter); - boolean allAllow(Filterable<E> msg); + boolean allAllow(Filterable msg); boolean hasFilters(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 96c9353872..f86f5dab62 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -26,12 +26,12 @@ import org.apache.qpid.server.filter.jms.selector.SelectorParser; import org.apache.qpid.server.queue.Filterable; -public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E> +public class JMSSelectorFilter implements MessageFilter { private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class); private String _selector; - private BooleanExpression<E> _matcher; + private BooleanExpression _matcher; public JMSSelectorFilter(String selector) throws AMQException { @@ -39,7 +39,7 @@ public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E> _matcher = new SelectorParser().parse(selector); } - public boolean matches(Filterable<E> message) throws E + public boolean matches(Filterable message) { boolean match = _matcher.matches(message); if(_logger.isDebugEnabled()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java index 094363ed9a..2d8e01a49a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java @@ -27,15 +27,15 @@ import org.apache.qpid.server.queue.Filterable; /** * A filter performing a comparison of two objects */ -public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E> +public abstract class LogicExpression extends BinaryExpression implements BooleanExpression { - public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue) + public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue) { return new OrExpression(lvalue, rvalue); } - public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue) + public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue) { return new AndExpression(lvalue, rvalue); } @@ -49,23 +49,23 @@ public abstract class LogicExpression<E extends Exception> extends BinaryExpress super(left, right); } - public abstract Object evaluate(Filterable<E> message) throws E; + public abstract Object evaluate(Filterable message); - public boolean matches(Filterable<E> message) throws E + public boolean matches(Filterable message) { Object object = evaluate(message); return (object != null) && (object == Boolean.TRUE); } - private static class OrExpression<E extends Exception> extends LogicExpression<E> + private static class OrExpression extends LogicExpression { - public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue) + public OrExpression(final BooleanExpression lvalue, final BooleanExpression rvalue) { super(lvalue, rvalue); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Boolean lv = (Boolean) left.evaluate(message); @@ -86,14 +86,14 @@ public abstract class LogicExpression<E extends Exception> extends BinaryExpress } } - private static class AndExpression<E extends Exception> extends LogicExpression<E> + private static class AndExpression extends LogicExpression { - public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue) + public AndExpression(final BooleanExpression lvalue, final BooleanExpression rvalue) { super(lvalue, rvalue); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Boolean lv = (Boolean) left.evaluate(message); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java index 58fc55f8e6..febe68ece9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -24,7 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.Filterable; -public interface MessageFilter<E extends Exception> +public interface MessageFilter { - boolean matches(Filterable<E> message) throws E; + boolean matches(Filterable message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index 946274f936..11fdeae2b1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -27,7 +27,6 @@ import java.util.HashMap; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.server.queue.Filterable; @@ -35,7 +34,7 @@ import org.apache.qpid.server.queue.Filterable; /** * Represents a property expression */ -public class PropertyExpression<E extends Exception> implements Expression<E> +public class PropertyExpression implements Expression { // Constants - defined the same as JMS private static final int NON_PERSISTENT = 1; @@ -44,12 +43,12 @@ public class PropertyExpression<E extends Exception> implements Expression<E> private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class); - private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>(); + private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>(); { - JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>() + JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression() { - public Object evaluate(Filterable<E> message) + public Object evaluate(Filterable message) { //TODO return null; @@ -73,9 +72,9 @@ public class PropertyExpression<E extends Exception> implements Expression<E> JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression()); - JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>() + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() { - public Object evaluate(Filterable message) throws E + public Object evaluate(Filterable message) { return message.isRedelivered(); } @@ -83,7 +82,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> } private final String name; - private final Expression<E> jmsPropertyExpression; + private final Expression jmsPropertyExpression; public boolean outerTest() { @@ -96,10 +95,10 @@ public class PropertyExpression<E extends Exception> implements Expression<E> - jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name); + jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { if (jmsPropertyExpression != null) @@ -108,17 +107,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E> } else { - - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) message.getContentHeaderBody().properties; - - if (_logger.isDebugEnabled()) - { - _logger.debug("Looking up property:" + name); - _logger.debug("Properties are:" + _properties.getHeaders().keySet()); - } - - return _properties.getHeaders().getObject(name); + return message.getMessageHeader().getHeader(name); } } @@ -158,39 +147,30 @@ public class PropertyExpression<E extends Exception> implements Expression<E> } - private static class ReplyToExpression<E extends Exception> implements Expression<E> + private static class ReplyToExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - AMQShortString replyTo = _properties.getReplyTo(); - - return (replyTo == null) ? null : replyTo.toString(); - + String replyTo = message.getMessageHeader().getReplyTo(); + return replyTo; } } - private static class TypeExpression<E extends Exception> implements Expression<E> + private static class TypeExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - AMQShortString type = _properties.getType(); - return (type == null) ? null : type.toString(); + String type = message.getMessageHeader().getType(); + return type; } } - private static class DeliveryModeExpression<E extends Exception> implements Expression<E> + private static class DeliveryModeExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT; if (_logger.isDebugEnabled()) @@ -202,68 +182,53 @@ public class PropertyExpression<E extends Exception> implements Expression<E> } } - private static class PriorityExpression<E extends Exception> implements Expression<E> + private static class PriorityExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - - return (int) _properties.getPriority(); + byte priority = message.getMessageHeader().getPriority(); + return (int) priority; } } - private static class MessageIDExpression<E extends Exception> implements Expression<E> + private static class MessageIDExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - AMQShortString messageId = _properties.getMessageId(); + String messageId = message.getMessageHeader().getMessageId(); - return (messageId == null) ? null : messageId; + return messageId; } } - private static class TimestampExpression<E extends Exception> implements Expression<E> + private static class TimestampExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - - return _properties.getTimestamp(); + long timestamp = message.getMessageHeader().getTimestamp(); + return timestamp; } } - private static class CorrelationIdExpression<E extends Exception> implements Expression<E> + private static class CorrelationIdExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - AMQShortString correlationId = _properties.getCorrelationId(); - return (correlationId == null) ? null : correlationId.toString(); + String correlationId = message.getMessageHeader().getCorrelationId(); + + return correlationId; } } - private static class ExpirationExpression<E extends Exception> implements Expression<E> + private static class ExpirationExpression implements Expression { - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { - - CommonContentHeaderProperties _properties = - (CommonContentHeaderProperties) - message.getContentHeaderBody().properties; - - return _properties.getExpiration(); + long expiration = message.getMessageHeader().getExpiration(); + return expiration; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java index cb738e1489..83f537b632 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java @@ -27,43 +27,34 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.Filterable; -public class SimpleFilterManager implements FilterManager<AMQException> +public class SimpleFilterManager implements FilterManager { private final Logger _logger = Logger.getLogger(SimpleFilterManager.class); - private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters; + private final ConcurrentLinkedQueue<MessageFilter> _filters; public SimpleFilterManager() { _logger.debug("Creating SimpleFilterManager"); - _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>(); + _filters = new ConcurrentLinkedQueue<MessageFilter>(); } - public void add(MessageFilter<AMQException> filter) + public void add(MessageFilter filter) { _filters.add(filter); } - public void remove(MessageFilter<AMQException> filter) + public void remove(MessageFilter filter) { _filters.remove(filter); } - public boolean allAllow(Filterable<AMQException> msg) + public boolean allAllow(Filterable msg) { - for (MessageFilter<AMQException> filter : _filters) + for (MessageFilter filter : _filters) { - try + if (!filter.matches(msg)) { - if (!filter.matches(msg)) - { - return false; - } - } - catch (AMQException e) - { - //fixme - e.printStackTrace(); return false; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java index 799a38af5a..9e03ecd8bd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java @@ -35,18 +35,18 @@ import org.apache.qpid.server.queue.Filterable; /** * An expression which performs an operation on two expression values */ -public abstract class UnaryExpression<E extends Exception> implements Expression<E> +public abstract class UnaryExpression implements Expression { private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE); - protected Expression<E> right; + protected Expression right; - public static<E extends Exception> Expression<E> createNegate(Expression<E> left) + public static Expression createNegate(Expression left) { return new NegativeExpression(left); } - public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not) + public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not) { // Use a HashSet if there are many elements. @@ -69,14 +69,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression return new InExpression(right, inList, not); } - abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E> + abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression { - public BooleanUnaryExpression(Expression<E> left) + public BooleanUnaryExpression(Expression left) { super(left); } - public boolean matches(Filterable<E> message) throws E + public boolean matches(Filterable message) { Object object = evaluate(message); @@ -85,7 +85,7 @@ public abstract class UnaryExpression<E extends Exception> implements Expression } ; - public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left) + public static<E extends Exception> BooleanExpression createNOT(BooleanExpression left) { return new NotExpression(left); } @@ -100,7 +100,7 @@ public abstract class UnaryExpression<E extends Exception> implements Expression return new XQueryExpression(xpath); } - public static<E extends Exception> BooleanExpression createBooleanCast(Expression<E> left) + public static<E extends Exception> BooleanExpression createBooleanCast(Expression left) { return new BooleanCastExpression(left); } @@ -151,7 +151,7 @@ public abstract class UnaryExpression<E extends Exception> implements Expression this.right = left; } - public Expression<E> getRight() + public Expression getRight() { return right; } @@ -204,14 +204,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression */ public abstract String getExpressionSymbol(); - private static class NegativeExpression<E extends Exception> extends UnaryExpression<E> + private static class NegativeExpression extends UnaryExpression { - public NegativeExpression(final Expression<E> left) + public NegativeExpression(final Expression left) { super(left); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Object rvalue = right.evaluate(message); if (rvalue == null) @@ -233,19 +233,19 @@ public abstract class UnaryExpression<E extends Exception> implements Expression } } - private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E> + private static class InExpression extends BooleanUnaryExpression { private final Collection _inList; private final boolean _not; - public InExpression(final PropertyExpression<E> right, final Collection inList, final boolean not) + public InExpression(final PropertyExpression right, final Collection inList, final boolean not) { super(right); _inList = inList; _not = not; } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Object rvalue = right.evaluate(message); @@ -309,14 +309,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression } } - private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E> + private static class NotExpression extends BooleanUnaryExpression { - public NotExpression(final BooleanExpression<E> left) + public NotExpression(final BooleanExpression left) { super(left); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Boolean lvalue = (Boolean) right.evaluate(message); if (lvalue == null) @@ -333,14 +333,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression } } - private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E> + private static class BooleanCastExpression extends BooleanUnaryExpression { - public BooleanCastExpression(final Expression<E> left) + public BooleanCastExpression(final Expression left) { super(left); } - public Object evaluate(Filterable<E> message) throws E + public Object evaluate(Filterable message) { Object rvalue = right.evaluate(message); if (rvalue == null) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java index 1311178fb1..75010a79cf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java @@ -71,7 +71,7 @@ public final class XPathExpression implements BooleanExpression { private final XPathEvaluator evaluator; static public interface XPathEvaluator { - public boolean evaluate(Filterable message) throws AMQException; + public boolean evaluate(Filterable message); } XPathExpression(String xpath) { @@ -93,7 +93,7 @@ public final class XPathExpression implements BooleanExpression { } } - public Object evaluate(Filterable message) throws AMQException { + public Object evaluate(Filterable message) { // try { //FIXME this is flow to disk work // if( message.isDropped() ) @@ -118,7 +118,7 @@ public final class XPathExpression implements BooleanExpression { * @return true if the expression evaluates to Boolean.TRUE. * @throws AMQException */ - public boolean matches(Filterable message) throws AMQException + public boolean matches(Filterable message) { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java index c13f81cd08..55954c7578 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java @@ -36,7 +36,7 @@ public final class XQueryExpression implements BooleanExpression { this.xpath = xpath; } - public Object evaluate(Filterable message) throws AMQException { + public Object evaluate(Filterable message) { return Boolean.FALSE; } @@ -49,7 +49,7 @@ public final class XQueryExpression implements BooleanExpression { * @return true if the expression evaluates to Boolean.TRUE. * @throws AMQException */ - public boolean matches(Filterable message) throws AMQException + public boolean matches(Filterable message) { Object object = evaluate(message); return object!=null && object==Boolean.TRUE; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java index cc67776682..a423370b11 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java @@ -43,7 +43,7 @@ public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator { this.xpath = xpath; } - public boolean evaluate(Filterable m) throws AMQException + public boolean evaluate(Filterable m) { // TODO - we would have to check the content type and then evaluate the content // here... is this really a feature we wish to implement? - RobG diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java index 96a1071135..bfed4f4c60 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java @@ -1,11 +1,8 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.Set; -import java.util.HashSet; /* * @@ -52,7 +49,7 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager return _bytesCredit.get() > 0L; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(ServerMessage msg) { final long msgSize = msg.getSize(); if(hasCredit()) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index a249a6e63a..089a34dde3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -1,6 +1,7 @@ package org.apache.qpid.server.flow; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -40,5 +41,5 @@ public interface FlowCreditManager public boolean hasCredit(); - public boolean useCreditForMessage(AMQMessage msg); + public boolean useCreditForMessage(ServerMessage msg); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java index d63431c3eb..f3f36700d8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -37,7 +37,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements return true; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(ServerMessage msg) { return true; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java index 9c377481de..706bdcae61 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java @@ -1,8 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; - -import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -54,7 +52,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl return (_messageCredit > 0L) && ( _bytesCredit > 0L ); } - public synchronized boolean useCreditForMessage(AMQMessage msg) + public synchronized boolean useCreditForMessage(ServerMessage msg) { if(_messageCredit == 0L) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java index c1b3a09006..abbc91a1ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java @@ -1,6 +1,6 @@ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicLong; @@ -50,7 +50,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen return _messageCredit.get() > 0L; } - public boolean useCreditForMessage(AMQMessage msg) + public boolean useCreditForMessage(ServerMessage msg) { if(hasCredit()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java index be0300f2c1..17710df3ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.flow; -import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.message.ServerMessage; public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager { @@ -123,7 +123,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F && (_messageCreditLimit == 0L || _messageCredit > 0); } - public synchronized boolean useCreditForMessage(final AMQMessage msg) + public synchronized boolean useCreditForMessage(final ServerMessage msg) { if(_messageCreditLimit != 0L) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 001b7858ec..d0b4ca6855 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -130,8 +131,16 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB throws AMQException
{
singleMessageCredit.useCreditForMessage(entry.getMessage());
- session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
- deliveryTag, queue.getMessageCount());
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(),
+ deliveryTag, queue.getMessageCount());
+ }
+ else
+ {
+ //TODO Convert AMQP 0-10 message
+ throw new RuntimeException("Not implemented conversion of 0-10 message");
+ }
}
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index f3cab10ed7..2b011ae5b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -87,7 +87,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR return; } - if (!message.getMessage().isReferenced()) + if (message.getMessage() == null) { _logger.warn("Message as already been purged, unable to Reject."); return; @@ -96,7 +96,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java index 9b23d88838..da4687ac56 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.TxCommitBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; @@ -61,7 +62,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { throw body.getChannelNotFoundException(channelId); } - + StoreContext.setCurrentContext(channel.getStoreContext()); channel.commit(); MethodRegistry methodRegistry = session.getMethodRegistry(); @@ -74,5 +75,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody> { throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage()); } + finally + { + StoreContext.clearCurrentContext(); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index 2b55d294b5..34eef881a6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -36,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 65184fe744..9f2f182138 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -1,25 +1,25 @@ package org.apache.qpid.server.output.amqp0_9;
-/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.mina.common.ByteBuffer;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index b987dae16d..da682ed11e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.message.*; import java.util.Iterator; @@ -41,12 +42,12 @@ import java.util.concurrent.atomic.AtomicInteger; /** * A deliverable message. */ -public class AMQMessage implements Filterable<AMQException> +public class AMQMessage implements Filterable, ServerMessage { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); - private final AtomicInteger _referenceCount = new AtomicInteger(1); + private final AtomicInteger _referenceCount = new AtomicInteger(0); private final AMQMessageHandle _messageHandle; @@ -72,7 +73,7 @@ public class AMQMessage implements Filterable<AMQException> private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - + private final AMQMessageHeader _messageHeader; /** @@ -202,6 +203,7 @@ public class AMQMessage implements Filterable<AMQException> _messageHandle = factory.createMessageHandle(messageId, store, true); _storeContext = txnConext.getStoreContext(); _size = _messageHandle.getBodySize(txnConext.getStoreContext()); + _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(txnConext.getStoreContext())); } /** @@ -221,6 +223,7 @@ public class AMQMessage implements Filterable<AMQException> { _messageHandle = messageHandle; _storeContext = storeConext; + _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(storeConext)); if(info.isImmediate()) { @@ -234,6 +237,7 @@ public class AMQMessage implements Filterable<AMQException> protected AMQMessage(AMQMessage msg) throws AMQException { _messageHandle = msg._messageHandle; + _messageHeader = msg._messageHeader; _storeContext = msg._storeContext; _flags = msg._flags; _size = msg._size; @@ -315,12 +319,11 @@ public class AMQMessage implements Filterable<AMQException> * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. * - * @param storeContext * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed */ - public void decrementReference(StoreContext storeContext) throws MessageCleanupException + public void decrementReference() throws MessageCleanupException { int count = _referenceCount.decrementAndGet(); @@ -342,13 +345,12 @@ public class AMQMessage implements Filterable<AMQException> // and the handle has not yet been constructed if (_messageHandle != null) { - _messageHandle.removeMessage(storeContext); + _messageHandle.removeMessage(StoreContext.getCurrentContext()); } } catch (AMQException e) { - // to maintain consistency, we revert the count - incrementReference(); + throw new MessageCleanupException(getMessageId(), e); } } @@ -373,7 +375,18 @@ public class AMQMessage implements Filterable<AMQException> return (_flags & DELIVERED_TO_CONSUMER) != 0; } - public boolean isPersistent() throws AMQException + public String getRoutingKey() + { + // TODO + return null; + } + + public AMQMessageHeader getMessageHeader() + { + return _messageHeader; + } + + public boolean isPersistent() { return _messageHandle.isPersistent(); } @@ -455,6 +468,26 @@ public class AMQMessage implements Filterable<AMQException> } + public boolean isImmediate() + { + return (_flags & IMMEDIATE) == IMMEDIATE; + } + + public long getExpiration() + { + return _expiration; + } + + public MessageReference newReference() + { + return new AMQMessageReference(this); + } + + public Long getMessageNumber() + { + return getMessageId(); + } + public Object getPublisherClientInstance() { //todo store sessionIdentifier/client id with message in store diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 014b348822..8b69aac7c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -20,15 +20,13 @@ */ package org.apache.qpid.server.queue; -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; @@ -88,7 +86,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue> int delete() throws AMQException; - QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException; + QueueEntry enqueue(ServerMessage message) throws AMQException; void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 785b668687..8bb958ed3f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -36,6 +36,7 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.message.ServerMessage; import javax.management.JMException; import javax.management.MBeanException; @@ -246,7 +247,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) throws AMQException + public void checkForNotification(ServerMessage msg) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); @@ -333,48 +334,60 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName); } - AMQMessage msg = entry.getMessage(); - // get message content - Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); - List<Byte> msgContent = new ArrayList<Byte>(); - while (cBodies.hasNext()) + ServerMessage serverMsg = entry.getMessage(); + + if(serverMsg instanceof AMQMessage) { - ContentChunk body = cBodies.next(); - if (body.getSize() != 0) + AMQMessage msg = (AMQMessage) serverMsg; + // get message content + Iterator<ContentChunk> cBodies = msg.getContentBodyIterator(); + List<Byte> msgContent = new ArrayList<Byte>(); + while (cBodies.hasNext()) { + ContentChunk body = cBodies.next(); if (body.getSize() != 0) { - ByteBuffer slice = body.getData().slice(); - for (int j = 0; j < slice.limit(); j++) + if (body.getSize() != 0) { - msgContent.add(slice.get()); + ByteBuffer slice = body.getData().slice(); + for (int j = 0; j < slice.limit(); j++) + { + msgContent.add(slice.get()); + } } } } - } - try - { - // Create header attributes list - CommonContentHeaderProperties headerProperties = - (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; - String mimeType = null, encoding = null; - if (headerProperties != null) + + try { - AMQShortString mimeTypeShortSting = headerProperties.getContentType(); - mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); - encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); - } + // Create header attributes list + CommonContentHeaderProperties headerProperties = + (CommonContentHeaderProperties) msg.getContentHeaderBody().properties; + String mimeType = null, encoding = null; + if (headerProperties != null) + { + AMQShortString mimeTypeShortSting = headerProperties.getContentType(); + mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString(); + encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString(); + } - Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) }; + + return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); + } + catch (AMQException e) + { + JMException jme = new JMException("Error creating header attributes list: " + e); + jme.initCause(e); + throw jme; + } - return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues); } - catch (AMQException e) + else { - JMException jme = new JMException("Error creating header attributes list: " + e); - jme.initCause(e); - throw jme; + // TODO 0-10 Messages for MBean + return null; } } @@ -398,13 +411,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { long position = i; - AMQMessage msg = list.get(i - 1).getMessage(); - ContentHeaderBody headerBody = msg.getContentHeaderBody(); - // Create header attributes list - String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position}; - CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); - _messageList.put(messageData); + ServerMessage serverMsg = list.get(i - 1).getMessage(); + if(serverMsg instanceof AMQMessage) + { + AMQMessage msg = (AMQMessage) serverMsg; + ContentHeaderBody headerBody = msg.getContentHeaderBody(); + // Create header attributes list + String[] headerAttributes = getMessageHeaderProperties(headerBody); + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position }; + CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues); + _messageList.put(messageData); + } + else + { + // TODO 0-10 Message + } } } catch (AMQException e) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index cbe9246f09..0f33f5d8ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -68,4 +68,9 @@ public class DefaultQueueRegistry implements QueueRegistry { return _queueMap.values(); } + + public AMQQueue getQueue(String queue) + { + return getQueue(new AMQShortString(queue)); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java index d38932bb61..eaa3992e98 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java @@ -22,12 +22,13 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.AMQMessageHeader; -public interface Filterable<E extends Exception> +public interface Filterable { - ContentHeaderBody getContentHeaderBody() throws E; + AMQMessageHeader getMessageHeader(); - boolean isPersistent() throws E; + boolean isPersistent(); boolean isRedelivered(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index d5e0b4d187..5babc3e3d4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -28,16 +28,20 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.ContentHeaderBodyAdapter; +import org.apache.qpid.server.message.AMQMessageReference; import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Collection; -public class IncomingMessage implements Filterable<RuntimeException> +public class IncomingMessage implements Filterable, InboundMessage { /** Used for debugging purposes. */ @@ -73,6 +77,7 @@ public class IncomingMessage implements Filterable<RuntimeException> private long _expiration; private Exchange _exchange; + private AMQMessageHeader _messageHeader; public IncomingMessage(final Long messageId, @@ -90,6 +95,7 @@ public class IncomingMessage implements Filterable<RuntimeException> public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException { _contentHeaderBody = contentHeaderBody; + _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody); } public void setExpiration() @@ -158,17 +164,19 @@ public class IncomingMessage implements Filterable<RuntimeException> } AMQMessage message = null; + AMQMessageReference ref = null; try { // first we allow the handle to know that the message has been fully received. This is useful if it is // maintaining any calculated values based on content chunks _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(), - _messagePublishInfo, getContentHeaderBody()); + _messagePublishInfo, getContentHeader()); message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo); + ref = (AMQMessageReference) message.newReference(); message.setExpiration(_expiration); message.setClientIdentifier(_publisher.getSessionIdentifier()); @@ -177,8 +185,8 @@ public class IncomingMessage implements Filterable<RuntimeException> // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ? - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null; + AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ? + ((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null; if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString())) { @@ -202,7 +210,7 @@ public class IncomingMessage implements Filterable<RuntimeException> { int offset; final int queueCount = _destinationQueues.size(); - message.incrementReference(queueCount); + if(queueCount == 1) { offset = 0; @@ -233,7 +241,8 @@ public class IncomingMessage implements Filterable<RuntimeException> finally { // Remove refence for routing process . Reference count should now == delivered queue count - if(message != null) message.decrementReference(_txnContext.getStoreContext()); + + if(ref != null) ref.release(); } } @@ -250,40 +259,51 @@ public class IncomingMessage implements Filterable<RuntimeException> public boolean allContentReceived() { - return (_bodyLengthReceived == getContentHeaderBody().bodySize); + return (_bodyLengthReceived == getContentHeader().bodySize); } - public AMQShortString getExchange() throws AMQException + public AMQShortString getExchange() { return _messagePublishInfo.getExchange(); } - public AMQShortString getRoutingKey() throws AMQException + public String getRoutingKey() { - return _messagePublishInfo.getRoutingKey(); + return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); } - public boolean isMandatory() throws AMQException + public String getBinding() + { + return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); + } + + + public boolean isMandatory() { return _messagePublishInfo.isMandatory(); } - public boolean isImmediate() throws AMQException + public boolean isImmediate() { return _messagePublishInfo.isImmediate(); } - public ContentHeaderBody getContentHeaderBody() + public ContentHeaderBody getContentHeader() { return _contentHeaderBody; } + public AMQMessageHeader getMessageHeader() + { + return _messageHeader; + } + public boolean isPersistent() { - return getContentHeaderBody().properties instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() == + return getContentHeader().properties instanceof BasicContentHeaderProperties && + ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } @@ -292,6 +312,11 @@ public class IncomingMessage implements Filterable<RuntimeException> return false; } + public long getSize() + { + return getContentHeader().bodySize; + } + public void setMessageStore(final MessageStore messageStore) { _messageStore = messageStore; @@ -309,7 +334,8 @@ public class IncomingMessage implements Filterable<RuntimeException> public void route() throws AMQException { - _exchange.route(this); + enqueue(_exchange.route(this)); + } public void enqueue(final ArrayList<AMQQueue> queues) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java index d6fd1eec89..97601048ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.message.ServerMessage; /** * NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate @@ -35,7 +36,7 @@ import org.apache.qpid.server.RequiredDeliveryException; */ public class NoConsumersException extends RequiredDeliveryException { - public NoConsumersException(AMQMessage message) + public NoConsumersException(ServerMessage message) { super("Immediate delivery is not possible.", message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6f9efd3200..d1fb0f3fe6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -21,13 +21,14 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,26 +42,19 @@ public enum NotificationCheck },
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
+ messageSize = (msg == null) ? 0 : msg.getSize();
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
return true;
}
}
@@ -70,7 +64,7 @@ public enum NotificationCheck },
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -91,7 +85,7 @@ public enum NotificationCheck },
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -133,6 +127,6 @@ public enum NotificationCheck return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java index fd46a8a5ff..0c6b84d2b6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.CommonContentHeaderProperties; import org.apache.qpid.AMQException; +import org.apache.qpid.server.message.ServerMessage; public class PriorityQueueList implements QueueEntryList { @@ -52,26 +53,18 @@ public class PriorityQueueList implements QueueEntryList return _queue; } - public QueueEntry add(AMQMessage message) + public QueueEntry add(ServerMessage message) { - try + int index = message.getMessageHeader().getPriority() - _priorityOffset; + if(index >= _priorities) { - int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset; - if(index >= _priorities) - { - index = _priorities-1; - } - else if(index < 0) - { - index = 0; - } - return _priorityLists[index].add(message); + index = _priorities-1; } - catch (AMQException e) + else if(index < 0) { - // TODO - fix AMQ Exception - throw new RuntimeException(e); + index = 0; } + return _priorityLists[index].add(message); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2657c459a9..1deb465127 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -3,6 +3,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -133,7 +134,7 @@ public interface QueueEntry extends Comparable<QueueEntry> AMQQueue getQueue(); - AMQMessage getMessage(); + ServerMessage getMessage(); long getSize(); @@ -155,8 +156,6 @@ public interface QueueEntry extends Comparable<QueueEntry> void release(); - String debugIdentity(); - boolean immediateAndNotDelivered(); void setRedelivered(boolean b); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..4cb07c3006 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.log4j.Logger; import java.util.Set; @@ -42,7 +44,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; + private MessageReference _message; private Set<Subscription> _rejectedBy = null; @@ -75,6 +77,8 @@ public class QueueEntryImpl implements QueueEntry private volatile long _entryId; volatile QueueEntryImpl _next; + private boolean _deliveredToConsumer; + private boolean _redelivered; QueueEntryImpl(SimpleQueueEntryList queueEntryList) @@ -84,18 +88,18 @@ public class QueueEntryImpl implements QueueEntry } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId) { _queueEntryList = queueEntryList; - _message = message; + _message = message == null ? null : message.newReference(); _entryIdUpdater.set(this, entryId); } - public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) + public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message) { _queueEntryList = queueEntryList; - _message = message; + _message = message == null ? null : message.newReference(); } protected void setEntryId(long entryId) @@ -113,9 +117,9 @@ public class QueueEntryImpl implements QueueEntry return _queueEntryList.getQueue(); } - public AMQMessage getMessage() + public ServerMessage getMessage() { - return _message; + return _message == null ? null : _message.getMessage(); } public long getSize() @@ -125,12 +129,21 @@ public class QueueEntryImpl implements QueueEntry public boolean getDeliveredToConsumer() { - return getMessage().getDeliveredToConsumer(); + return _deliveredToConsumer; } public boolean expired() throws AMQException { - return getMessage().expired(getQueue()); + long expiration = getMessage().getExpiration(); + if (expiration != 0L) + { + long now = System.currentTimeMillis(); + + return (now > expiration); + } + + return false; + } public boolean isAcquired() @@ -167,7 +180,7 @@ public class QueueEntryImpl implements QueueEntry public void setDeliveredToSubscription() { - getMessage().setDeliveredToConsumer(); + _deliveredToConsumer = true; } public void release() @@ -175,20 +188,15 @@ public class QueueEntryImpl implements QueueEntry _stateUpdater.set(this,AVAILABLE_STATE); } - public String debugIdentity() - { - return getMessage().debugIdentity(); - } - public boolean immediateAndNotDelivered() { - return _message.immediateAndNotDelivered(); + return getMessage().isImmediate() && !_deliveredToConsumer; } public void setRedelivered(boolean b) { - getMessage().setRedelivered(b); + _redelivered = b; } public Subscription getDeliveredSubscription() @@ -223,7 +231,7 @@ public class QueueEntryImpl implements QueueEntry } else { - _log.warn("Requesting rejection by null subscriber:" + debugIdentity()); + _log.warn("Requesting rejection by null subscriber:" + this); } } @@ -284,7 +292,9 @@ public class QueueEntryImpl implements QueueEntry { if(delete()) { - getMessage().decrementReference(storeContext); + StoreContext sc = StoreContext.setCurrentContext(storeContext); + _message.release(); + StoreContext.setCurrentContext(sc); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 313e076f61..b4042ce02c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.ServerMessage; + public interface QueueEntryList { AMQQueue getQueue(); - QueueEntry add(AMQMessage message); + QueueEntry add(ServerMessage message); QueueEntry next(QueueEntry node); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 1210f0e97c..b47182e750 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -40,4 +40,5 @@ public interface QueueRegistry Collection<AMQQueue> getQueues(); + AMQQueue getQueue(String queue); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 08daf715ef..a96a6d624a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -29,6 +29,8 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; /* * @@ -319,7 +321,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // ------ Enqueue / Dequeue - public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException + public QueueEntry enqueue(ServerMessage message) throws AMQException { incrementQueueCount(); @@ -406,8 +408,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + if (entry.immediateAndNotDelivered()) { + StoreContext storeContext = StoreContext.getCurrentContext(); dequeue(storeContext, entry); entry.dispose(storeContext); } @@ -462,7 +466,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Simple Queues don't :-) } - private void incrementQueueSize(final AMQMessage message) + private void incrementQueueSize(final ServerMessage message) { getAtomicQueueSize().addAndGet(message.getSize()); } @@ -573,10 +577,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - AMQMessage msg = entry.getMessage(); + ServerMessage msg = entry.getMessage(); if (msg.isPersistent()) { - _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId()); + _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageNumber()); } //entry.dispose(storeContext); @@ -767,7 +771,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessage().getMessageNumber(); return messageId >= fromMessageId && messageId <= toMessageId; } @@ -786,7 +790,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - _complete = entry.getMessage().getMessageId() == messageId; + _complete = entry.getMessage().getMessageNumber() == messageId; return _complete; } @@ -828,7 +832,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessage().getMessageNumber(); return (messageId >= fromMessageId) && (messageId <= toMessageId) && entry.acquire(); @@ -847,11 +851,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in on the message store. for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); + ServerMessage message = entry.getMessage(); if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageId()); + store.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); } // dequeue does not decrement the refence count entry.dequeue(storeContext); @@ -882,9 +886,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { + StoreContext.setCurrentContext(storeContext); + for (QueueEntry entry : entries) { - toQueue.enqueue(storeContext, entry.getMessage()); + toQueue.enqueue(entry.getMessage()); entry.delete(); } } @@ -896,6 +902,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { throw new RuntimeException(e); } + finally + { + StoreContext.clearCurrentContext(); + + } } @@ -912,17 +923,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); - if ((messageId >= fromMessageId) - && (messageId <= toMessageId)) - { - if (!entry.isDeleted()) - { - return entry.getMessage().incrementReference(); - } - } - - return false; + final long messageId = entry.getMessage().getMessageNumber(); + return ((messageId >= fromMessageId) + && (messageId <= toMessageId)); } public boolean filterComplete() @@ -938,11 +941,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in on the message store. for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); + ServerMessage message = entry.getMessage(); - if (message.isReferenced() && message.isPersistent() && toQueue.isDurable()) + if (message.isPersistent() && toQueue.isDurable()) { - store.enqueueMessage(storeContext, toQueue, message.getMessageId()); + + StoreContext sc = StoreContext.setCurrentContext(storeContext); + store.enqueueMessage(storeContext, toQueue, message.getMessageNumber()); + StoreContext.setCurrentContext(sc); + } } @@ -973,9 +980,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { for (QueueEntry entry : entries) { - if (entry.getMessage().isReferenced()) + + ServerMessage message = entry.getMessage(); + if (message != null) { - toQueue.enqueue(storeContext, entry.getMessage()); + toQueue.enqueue(entry.getMessage()); } } } @@ -1001,7 +1010,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { QueueEntry node = queueListIterator.getNode(); - final long messageId = node.getMessage().getMessageId(); + final long messageId = node.getMessage().getMessageNumber(); if ((messageId >= fromMessageId) && (messageId <= toMessageId) @@ -1418,7 +1427,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - @Override + public void checkMessageStatus() throws AMQException { @@ -1581,7 +1590,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener for (int i = 0; i < num && !it.atTail(); i++) { it.advance(); - ids.add(it.getNode().getMessage().getMessageId()); + ids.add(it.getNode().getMessage().getMessageNumber()); } return ids; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index a46c5ae2e8..c5a2972720 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -1,5 +1,8 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; + import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /* @@ -74,7 +77,7 @@ public class SimpleQueueEntryList implements QueueEntryList } - public QueueEntry add(AMQMessage message) + public QueueEntry add(ServerMessage message) { QueueEntryImpl node = new QueueEntryImpl(this, message); for (;;) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java index 18269fa5e8..34e35171e5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java @@ -1352,7 +1352,10 @@ public class DerbyMessageStore implements MessageStore public void process() throws AMQException { - _queue.enqueue(_context, _message); + StoreContext.setCurrentContext(_context); + _queue.enqueue(_message); + StoreContext.clearCurrentContext(); + } @@ -1414,7 +1417,7 @@ public class DerbyMessageStore implements MessageStore if(message != null) { - message.incrementReference(); +// message.incrementReference(); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index fdb56a1a55..1e75ca04f9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -32,9 +32,12 @@ public class StoreContext { private static final Logger _logger = Logger.getLogger(StoreContext.class); + private static final ThreadLocal<StoreContext> _threadLocalContext = new ThreadLocal<StoreContext>(); + private String _name; private Object _payload; + public StoreContext() { _name = "StoreContext"; @@ -68,4 +71,24 @@ public class StoreContext { return "<_name = " + _name + ", _payload = " + _payload + ">"; } + + + public static StoreContext setCurrentContext(StoreContext context) + { + StoreContext sc = getCurrentContext(); + _threadLocalContext.set(context); + return sc; + } + + public static StoreContext getCurrentContext() + { + return _threadLocalContext.get(); + } + + public static void clearCurrentContext() + { + _threadLocalContext.set(null); + } + + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 7aa9d1e3af..dc69e21731 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -32,8 +32,10 @@ import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.filter.FilterManager; @@ -377,16 +379,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { if (_logger.isDebugEnabled()) { - _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity()); + _logger.debug("Subscription:" + this + " rejected message:" + entry); } // return false; } if (_noLocal) { - //todo - client id should be recoreded so we don't have to handle + + AMQMessage message = (AMQMessage) entry.getMessage(); + + //todo - client id should be recorded so we don't have to handle // the case where this is null. - final Object publisherId = entry.getMessage().getPublisherClientInstance(); + final Object publisherId = message.getPublisherClientInstance(); // We don't want local messages so check to see if message is one we sent Object localInstance; @@ -404,8 +409,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage localInstance = getProtocolSession().getClientIdentifier(); - //todo - client id should be recoreded so we don't have to do the null check - if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) + //todo - client id should be recorded so we don't have to do the null check + if (localInstance != null && localInstance.equals(message.getPublisherIdentifier())) { return false; } @@ -417,7 +422,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage if (_logger.isDebugEnabled()) { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); + _logger.debug("(" + this + ") checking filters for message (" + entry); } return checkFilters(entry); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java new file mode 100644 index 0000000000..3a61099386 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Method; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public class ServerConnection extends Connection +{ + @Override + protected void invoke(Method method) + { + super.invoke(method); + } + + @Override + protected void setState(State state) + { + super.setState(state); + } + + @Override + public ServerConnectionDelegate getConnectionDelegate() + { + return (ServerConnectionDelegate) super.getConnectionDelegate(); + } + + public void setConnectionDelegate(ServerConnectionDelegate delegate) + { + super.setConnectionDelegate(delegate); + } + + private VirtualHost _virtualHost; + + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public void setVirtualHost(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java new file mode 100644 index 0000000000..0f65fc10ce --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.transport.*; + +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslException; +import java.util.*; + + +public class ServerConnectionDelegate extends ServerDelegate +{ + + private String _localFQDN; + private final IApplicationRegistry _appRegistry; + + + public ServerConnectionDelegate(IApplicationRegistry appRegistry, + String localFQDN) + { + this(Collections.EMPTY_MAP, Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); + } + + + public ServerConnectionDelegate(Map<String, Object> properties, + List<Object> locales, + IApplicationRegistry appRegistry, + String localFQDN) + { + super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales); + _appRegistry = appRegistry; + _localFQDN = localFQDN; + } + + private static List<Object> parseToList(String mechanisms) + { + List<Object> list = new ArrayList<Object>(); + StringTokenizer tokenizer = new StringTokenizer(mechanisms, " "); + while(tokenizer.hasMoreTokens()) + { + list.add(tokenizer.nextToken()); + } + return list; + } + + @Override public ServerSession getSession(Connection conn, SessionAttach atc) + { + + SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry); + + ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0); + //ssn.setSessionListener(new Echo()); + return ssn; + } + + + + + @Override + protected SaslServer createSaslServer(String mechanism) throws SaslException + { + return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN); + + } + + + @Override public void connectionOpen(Connection conn, ConnectionOpen open) + { + ServerConnection sconn = (ServerConnection) conn; + + VirtualHost vhost; + String vhostName; + if(open.hasVirtualHost()) + { + vhostName = open.getVirtualHost(); + } + else + { + vhostName = ""; + } + vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName); + + if(vhost != null) + { + sconn.setVirtualHost(vhost); + + sconn.invoke(new ConnectionOpenOk(Collections.EMPTY_LIST)); + + sconn.setState(Connection.State.OPEN); + } + else + { + sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown vistrulhost '"+vhostName+"'")); + sconn.setState(Connection.State.CLOSING); + } + + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java new file mode 100644 index 0000000000..0271949101 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.transport.*; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.AMQException; + +import java.util.ArrayList; + +public class ServerSession extends Session +{ + ServerSession(Connection connection, Binary name, long expiry) + { + super(connection, name, expiry); + } + + ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) + { + super(connection, delegate, name, expiry); + } + + public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues) + { + // TODO Txn + + try + { + for(AMQQueue q : queues) + { + q.enqueue(message); + } + } + catch (AMQException e) + { + // TODO + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java new file mode 100644 index 0000000000..e9cf1b6474 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -0,0 +1,402 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import org.apache.qpid.transport.*; +import org.apache.qpid.server.registry.IApplicationRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.AMQException; + +import java.util.ArrayList; + +public class ServerSessionDelegate extends SessionDelegate +{ + private final IApplicationRegistry _appRegistry; + + public ServerSessionDelegate(IApplicationRegistry appRegistry) + { + _appRegistry = appRegistry; + } + + @Override + public void messageAccept(Session session, MessageAccept method) + { + super.messageAccept(session, method); + } + + @Override + public void messageReject(Session session, MessageReject method) + { + super.messageReject(session, method); + } + + @Override + public void messageRelease(Session session, MessageRelease method) + { + super.messageRelease(session, method); + } + + @Override + public void messageAcquire(Session session, MessageAcquire method) + { + super.messageAcquire(session, method); + } + + @Override + public void messageResume(Session session, MessageResume method) + { + super.messageResume(session, method); + } + + @Override + public void messageSubscribe(Session session, MessageSubscribe method) + { + super.messageSubscribe(session, method); + } + + + @Override + public void messageTransfer(Session ssn, MessageTransfer xfr) + { + ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn); + Exchange exchange; + if(xfr.hasDestination()) + { + exchange = exchangeRegistry.getExchange(xfr.getDestination()); + } + else + { + exchange = exchangeRegistry.getDefaultExchange(); + } + + MessageTransferMessage message = new MessageTransferMessage(xfr); + try + { + ArrayList<AMQQueue> queues = exchange.route(message); + + ((ServerSession) ssn).enqueue(message, queues); + + + System.out.println(queues); + + ssn.processed(xfr); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + + + super.messageTransfer(ssn, xfr); //To change body of overridden methods use File | Settings | File Templates. + } + + @Override + public void messageCancel(Session session, MessageCancel method) + { + super.messageCancel(session, method); + } + + @Override + public void messageFlush(Session session, MessageFlush method) + { + super.messageFlush(session, method); + } + + @Override + public void txSelect(Session session, TxSelect method) + { + super.txSelect(session, method); + } + + @Override + public void txCommit(Session session, TxCommit method) + { + super.txCommit(session, method); + } + + @Override + public void txRollback(Session session, TxRollback method) + { + super.txRollback(session, method); + } + + @Override + public void dtxSelect(Session session, DtxSelect method) + { + super.dtxSelect(session, method); + } + + @Override + public void dtxStart(Session session, DtxStart method) + { + super.dtxStart(session, method); + } + + @Override + public void dtxEnd(Session session, DtxEnd method) + { + super.dtxEnd(session, method); + } + + @Override + public void dtxCommit(Session session, DtxCommit method) + { + super.dtxCommit(session, method); + } + + @Override + public void dtxForget(Session session, DtxForget method) + { + super.dtxForget(session, method); + } + + @Override + public void dtxGetTimeout(Session session, DtxGetTimeout method) + { + super.dtxGetTimeout(session, method); + } + + @Override + public void dtxPrepare(Session session, DtxPrepare method) + { + super.dtxPrepare(session, method); + } + + @Override + public void dtxRecover(Session session, DtxRecover method) + { + super.dtxRecover(session, method); + } + + @Override + public void dtxRollback(Session session, DtxRollback method) + { + super.dtxRollback(session, method); + } + + @Override + public void dtxSetTimeout(Session session, DtxSetTimeout method) + { + super.dtxSetTimeout(session, method); + } + + @Override + public void exchangeDeclare(Session session, ExchangeDeclare method) + { + String exchangeName = method.getExchange(); + + Exchange exchange = getExchange(session, exchangeName); + + if(method.getPassive()) + { + if(exchange == null) + { + ExecutionException ex = new ExecutionException(); + ex.setErrorCode(ExecutionErrorCode.NOT_FOUND); + ex.setCommandId(method.getId()); + + ex.setDescription("not-found: exchange-name '"+exchangeName+"'"); + + session.invoke(ex); + session.close(); + } + + } + else + { + // TODO + } + super.exchangeDeclare(session, method); + } + + private Exchange getExchange(Session session, String exchangeName) + { + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); + return exchangeRegistry.getExchange(exchangeName); + } + + private ExchangeRegistry getExchangeRegistry(Session session) + { + VirtualHost virtualHost = getVirtualHost(session); + return virtualHost.getExchangeRegistry(); + + } + + private VirtualHost getVirtualHost(Session session) + { + ServerConnection conn = getServerConnection(session); + VirtualHost vhost = conn.getVirtualHost(); + return vhost; + } + + private ServerConnection getServerConnection(Session session) + { + ServerConnection conn = (ServerConnection) session.getConnection(); + return conn; + } + + @Override + public void exchangeDelete(Session session, ExchangeDelete method) + { + super.exchangeDelete(session, method); + } + + @Override + public void exchangeQuery(Session session, ExchangeQuery method) + { + super.exchangeQuery(session, method); + + } + + @Override + public void exchangeBind(Session session, ExchangeBind method) + { + super.exchangeBind(session, method); + } + + @Override + public void exchangeUnbind(Session session, ExchangeUnbind method) + { + super.exchangeUnbind(session, method); + } + + @Override + public void exchangeBound(Session session, ExchangeBound method) + { + + + ExchangeBoundResult result = new ExchangeBoundResult(); + if(method.hasExchange()) + { + Exchange exchange = getExchange(session, method.getExchange()); + + if(exchange == null) + { + result.setExchangeNotFound(true); + } + + if(method.hasQueue()) + { + + AMQQueue queue = getQueue(session, method.getQueue()); + if(queue == null) + { + result.setQueueNotFound(true); + } + + if(exchange != null && queue != null) + { + + if(method.hasBindingKey()) + { + + if(method.hasArguments()) + { + // TODO + } + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue)); + + } + + result.setQueueNotMatched(!exchange.isBound(queue)); + + } + } + else if(exchange != null && method.hasBindingKey()) + { + if(method.hasArguments()) + { + // TODO + } + result.setKeyNotMatched(!exchange.isBound(method.getBindingKey())); + + } + + } + else if(method.hasQueue()) + { + AMQQueue queue = getQueue(session, method.getQueue()); + if(queue == null) + { + result.setQueueNotFound(true); + } + else + { + if(method.hasBindingKey()) + { + if(method.hasArguments()) + { + // TODO + } + + // TODO + } + } + + } + + + session.executionResult((int) method.getId(), result); + super.exchangeBound(session, method); + } + + private AMQQueue getQueue(Session session, String queue) + { + QueueRegistry queueRegistry = getQueueRegistry(session); + return queueRegistry.getQueue(queue); + } + + private QueueRegistry getQueueRegistry(Session session) + { + return getVirtualHost(session).getQueueRegistry(); + } + + @Override + public void queueDeclare(Session session, QueueDeclare method) + { + super.queueDeclare(session, method); + } + + @Override + public void queueDelete(Session session, QueueDelete method) + { + super.queueDelete(session, method); + } + + @Override + public void queuePurge(Session session, QueuePurge method) + { + super.queuePurge(session, method); + } + + @Override + public void queueQuery(Session session, QueueQuery method) + { + super.queueQuery(session, method); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java index 3c71282c57..ebec6af9f0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java @@ -25,6 +25,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.ack.TxAck; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -93,10 +94,11 @@ public class LocalTransactionalContext implements TransactionalContext public void process() throws AMQException { - _message.incrementReference(); + MessageReference ref = _message.newReference(); try { - QueueEntry entry = _queue.enqueue(getStoreContext(),_message); + StoreContext.setCurrentContext(getStoreContext()); + QueueEntry entry = _queue.enqueue(_message); if(entry.immediateAndNotDelivered()) { @@ -105,7 +107,8 @@ public class LocalTransactionalContext implements TransactionalContext } finally { - _message.decrementReference(getStoreContext()); + ref.release(); + StoreContext.clearCurrentContext(); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 28af36e3db..76cb7a397d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -90,8 +90,13 @@ public class NonTransactionalContext implements TransactionalContext public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException { - QueueEntry entry = queue.enqueue(_storeContext, message); - + StoreContext.setCurrentContext(getStoreContext()); + + QueueEntry entry = queue.enqueue(message); + + StoreContext.clearCurrentContext(); + + //following check implements the functionality //required by the 'immediate' flag: if(entry.immediateAndNotDelivered()) @@ -128,7 +133,7 @@ public class NonTransactionalContext implements TransactionalContext { if (debug) { - _log.debug("Discarding message: " + message.getMessage().getMessageId()); + _log.debug("Discarding message: " + message.getMessage().getMessageNumber()); } if(message.getMessage().isPersistent()) { @@ -171,7 +176,7 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + msg.getMessage().getMessageId()); + _log.debug("Discarding message: " + msg.getMessage().getMessageNumber()); } if(msg.getMessage().isPersistent()) { @@ -187,7 +192,7 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + - msg.getMessage().getMessageId()); + msg.getMessage().getMessageNumber()); } } if(_inTran) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java index 731f6140f9..9e83b93129 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -100,7 +101,7 @@ public class Dump extends Show for (QueueEntry entry : messages) { - AMQMessage msg = entry.getMessage(); + ServerMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; @@ -112,7 +113,7 @@ public class Dump extends Show // Show general message information hex.add(Show.Columns.ID.name()); - ascii.add(msg.getMessageId().toString()); + ascii.add(msg.getMessageNumber().toString()); hex.add(Console.ROW_DIVIDER); ascii.add(Console.ROW_DIVIDER); @@ -136,110 +137,114 @@ public class Dump extends Show hex.add(Console.ROW_DIVIDER); ascii.add(Console.ROW_DIVIDER); - Iterator bodies = msg.getContentBodyIterator(); - if (bodies.hasNext()) + if(msg instanceof AMQMessage) { - hex.add("Hex"); - hex.add(Console.ROW_DIVIDER); + Iterator bodies = ((AMQMessage)msg).getContentBodyIterator(); + if (bodies.hasNext()) + { + hex.add("Hex"); + hex.add(Console.ROW_DIVIDER); - ascii.add("ASCII"); - ascii.add(Console.ROW_DIVIDER); - while (bodies.hasNext()) - { - ContentChunk chunk = (ContentChunk) bodies.next(); + ascii.add("ASCII"); + ascii.add(Console.ROW_DIVIDER); - //Duplicate so we don't destroy original data :) - ByteBuffer hexBuffer = chunk.getData().duplicate(); + while (bodies.hasNext()) + { + ContentChunk chunk = (ContentChunk) bodies.next(); - ByteBuffer charBuffer = hexBuffer.duplicate(); + //Duplicate so we don't destroy original data :) + ByteBuffer hexBuffer = chunk.getData().duplicate(); - Hex hexencoder = new Hex(); + ByteBuffer charBuffer = hexBuffer.duplicate(); - while (hexBuffer.hasRemaining()) - { - byte[] line = new byte[LINE_SIZE]; + Hex hexencoder = new Hex(); - int bufsize = hexBuffer.remaining(); - if (bufsize < LINE_SIZE) - { - hexBuffer.get(line, 0, bufsize); - } - else + while (hexBuffer.hasRemaining()) { - bufsize = line.length; - hexBuffer.get(line); - } + byte[] line = new byte[LINE_SIZE]; - byte[] encoded = hexencoder.encode(line); + int bufsize = hexBuffer.remaining(); + if (bufsize < LINE_SIZE) + { + hexBuffer.get(line, 0, bufsize); + } + else + { + bufsize = line.length; + hexBuffer.get(line); + } - try - { - String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING); - String hexLine = ""; + byte[] encoded = hexencoder.encode(line); - int strKength = encStr.length(); - for (int c = 0; c < strKength; c++) + try { - hexLine += encStr.charAt(c); + String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING); + String hexLine = ""; - if (c % 2 == 1 && SPACE_BYTES) + int strKength = encStr.length(); + for (int c = 0; c < strKength; c++) { - hexLine += BYTE_SPACER; + hexLine += encStr.charAt(c); + + if (c % 2 == 1 && SPACE_BYTES) + { + hexLine += BYTE_SPACER; + } } - } - hex.add(hexLine); - } - catch (UnsupportedEncodingException e) - { - _console.println(e.getMessage()); - return null; + hex.add(hexLine); + } + catch (UnsupportedEncodingException e) + { + _console.println(e.getMessage()); + return null; + } } - } - while (charBuffer.hasRemaining()) - { - String asciiLine = ""; - - for (int pos = 0; pos < LINE_SIZE; pos++) + while (charBuffer.hasRemaining()) { - if (charBuffer.hasRemaining()) - { - byte ch = charBuffer.get(); + String asciiLine = ""; - if (isPrintable(ch)) + for (int pos = 0; pos < LINE_SIZE; pos++) + { + if (charBuffer.hasRemaining()) { - asciiLine += (char) ch; + byte ch = charBuffer.get(); + + if (isPrintable(ch)) + { + asciiLine += (char) ch; + } + else + { + asciiLine += NON_PRINTING_ASCII_CHAR; + } + + if (SPACE_BYTES) + { + asciiLine += BYTE_SPACER; + } } else { - asciiLine += NON_PRINTING_ASCII_CHAR; - } - - if (SPACE_BYTES) - { - asciiLine += BYTE_SPACER; + break; } } - else - { - break; - } - } - ascii.add(asciiLine); + ascii.add(asciiLine); + } } } - } - else - { - List<String> result = new LinkedList<String>(); + else + { + List<String> result = new LinkedList<String>(); - display.add(result); - result.add("No ContentBodies"); + display.add(result); + result.add("No ContentBodies"); + } } } @@ -252,7 +257,7 @@ public class Dump extends Show return display; } - private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg, + private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg, String title, boolean routing, boolean headers, boolean messageHeaders) { List<QueueEntry> single = new LinkedList<QueueEntry>(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java index a8dd58ca83..b3ff1d43e1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -172,7 +172,7 @@ public class Move extends AbstractCommand { for (QueueEntry msg : messages) { - ids.add(msg.getMessage().getMessageId()); + ids.add(msg.getMessage().getMessageNumber()); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 2fa017fc64..d846d9976b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -26,9 +26,9 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.QueueEntryImpl; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.tools.messagestore.MessageStoreTool; import org.apache.qpid.tools.utils.Console; @@ -337,30 +337,24 @@ public class Show extends AbstractCommand //Add create the table of data for (QueueEntry entry : messages) { - AMQMessage msg = entry.getMessage(); + ServerMessage msg = entry.getMessage(); if (!includeMsg(msg, msgids)) { continue; } - id.add(msg.getMessageId().toString()); + id.add(msg.getMessageNumber().toString()); size.add("" + msg.getSize()); arrival.add("" + msg.getArrivalTime()); - try - { - ispersitent.add(msg.isPersistent() ? "true" : "false"); - } - catch (AMQException e) - { - ispersitent.add("n/a"); - } + ispersitent.add(msg.isPersistent() ? "true" : "false"); + isredelivered.add(msg.isRedelivered() ? "true" : "false"); - isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); + isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false"); // msg.getMessageHandle(); @@ -368,7 +362,10 @@ public class Show extends AbstractCommand try { - headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties); + if(msg instanceof AMQMessage) + { + headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties); + } } catch (AMQException e) { @@ -417,7 +414,11 @@ public class Show extends AbstractCommand MessagePublishInfo info = null; try { - info = msg.getMessagePublishInfo(); + if(msg instanceof AMQMessage) + { + info = ((AMQMessage)msg).getMessagePublishInfo(); + } + } catch (AMQException e) { @@ -457,14 +458,14 @@ public class Show extends AbstractCommand return data; } - protected boolean includeMsg(AMQMessage msg, List<Long> msgids) + protected boolean includeMsg(ServerMessage msg, List<Long> msgids) { if (msgids == null) { return true; } - Long msgid = msg.getMessageId(); + Long msgid = msg.getMessageNumber(); boolean found = false; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 5fbf9484f7..ad39094d2c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -89,7 +89,7 @@ public class ExtractResendAndRequeueTest extends TestCase while(queueEntries.advance()) { QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry); + _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry); // Store the entry for future inspection _referenceList.add(entry); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java index dfbbd56d6f..0b48feddcd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java @@ -38,7 +38,6 @@ import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TestMemoryMessageStore; import org.apache.qpid.server.store.StoreContext; -import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; @@ -163,7 +162,9 @@ public class TxAckTest extends TestCase }; TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); - _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); + StoreContext sc = StoreContext.setCurrentContext(new StoreContext()); + _map.add(deliveryTag, _queue.enqueue(message)); + StoreContext.setCurrentContext(sc); } _acked = acked; _unacked = unacked; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6dcb187a37..6634eb3e60 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -33,6 +33,8 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; import org.apache.qpid.server.RequiredDeliveryException; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.log4j.Logger; @@ -255,9 +257,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase * @throws AMQException */ @Override - public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException + public QueueEntry enqueue(ServerMessage msg) throws AMQException { - messages.add( new HeadersExchangeTest.Message(msg)); + messages.add( new HeadersExchangeTest.Message((AMQMessage) msg)); return new QueueEntry() { @@ -326,11 +328,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public String debugIdentity() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - public boolean immediateAndNotDelivered() { return false; //To change body of implemented methods use File | Settings | File Templates. @@ -438,7 +435,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase } - public ContentHeaderBody getContentHeaderBody() + public ContentHeaderBody getContentHeader() { try { @@ -522,7 +519,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase void route(Exchange exchange) throws AMQException { - exchange.route(_incoming); + _incoming.enqueue(exchange.route(_incoming)); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index aa25e207a9..a26f7f16d7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -76,7 +76,7 @@ public class DestWildExchangeTest extends TestCase IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession); - _exchange.route(message); + message.enqueue(_exchange.route(message)); Assert.assertEquals(0, queue.getMessageCount()); } @@ -100,7 +100,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -140,7 +140,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -159,7 +159,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -198,7 +198,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -217,7 +217,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -236,7 +236,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -254,7 +254,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -294,7 +294,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -312,7 +312,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -352,7 +352,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -384,7 +384,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -425,7 +425,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -464,7 +464,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -495,7 +495,7 @@ public class DestWildExchangeTest extends TestCase private void routeMessage(final IncomingMessage message) throws AMQException { - _exchange.route(message); + message.enqueue(_exchange.route(message)); message.routingComplete(_store, new MessageHandleFactory()); message.deliverToQueues(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 86ba96bf5d..7c027880b4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -22,16 +22,85 @@ package org.apache.qpid.server.exchange; import java.util.Map; import java.util.HashMap; +import java.util.Set; import junit.framework.TestCase; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.message.AMQMessageHeader; /** */ public class HeadersBindingTest extends TestCase { + + private class MockHeader implements AMQMessageHeader + { + + private final Map<String, Object> _headers = new HashMap<String, Object>(); + + public String getCorrelationId() + { + return null; + } + + public long getExpiration() + { + return 0; + } + + public String getMessageId() + { + return null; + } + + public byte getPriority() + { + return 0; + } + + public long getTimestamp() + { + return 0; + } + + public String getType() + { + return null; + } + + public String getReplyTo() + { + return null; + } + + public Object getHeader(String name) + { + return _headers.get(name); + } + + public boolean containsHeaders(Set<String> names) + { + return _headers.keySet().containsAll(names); + } + + public boolean containsHeader(String name) + { + return _headers.containsKey(name); + } + + public void setString(String key, String value) + { + setObject(key,value); + } + + public void setObject(String key, Object value) + { + _headers.put(key,value); + } + } + private FieldTable bindHeaders = new FieldTable(); - private FieldTable matchHeaders = new FieldTable(); + private MockHeader matchHeaders = new MockHeader(); public void testDefault_1() { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index aff7af6952..768024498a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -42,19 +42,19 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest { // Enqueue messages in order - _queue.enqueue(null, createMessage(1L, (byte) 10)); - _queue.enqueue(null, createMessage(2L, (byte) 4)); - _queue.enqueue(null, createMessage(3L, (byte) 0)); + _queue.enqueue(createMessage(1L, (byte) 10)); + _queue.enqueue(createMessage(2L, (byte) 4)); + _queue.enqueue(createMessage(3L, (byte) 0)); // Enqueue messages in reverse order - _queue.enqueue(null, createMessage(4L, (byte) 0)); - _queue.enqueue(null, createMessage(5L, (byte) 4)); - _queue.enqueue(null, createMessage(6L, (byte) 10)); + _queue.enqueue(createMessage(4L, (byte) 0)); + _queue.enqueue(createMessage(5L, (byte) 4)); + _queue.enqueue(createMessage(6L, (byte) 10)); // Enqueue messages out of order - _queue.enqueue(null, createMessage(7L, (byte) 4)); - _queue.enqueue(null, createMessage(8L, (byte) 10)); - _queue.enqueue(null, createMessage(9L, (byte) 0)); + _queue.enqueue(createMessage(7L, (byte) 4)); + _queue.enqueue(createMessage(8L, (byte) 10)); + _queue.enqueue(createMessage(9L, (byte) 0)); // Register subscriber _queue.registerSubscription(_subscription, false); @@ -63,17 +63,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest ArrayList<QueueEntry> msgs = _subscription.getMessages(); try { - assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId()); + assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber()); + assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber()); + assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber()); - assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId()); + assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber()); + assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber()); + assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber()); - assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber()); + assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber()); + assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber()); } catch (AssertionFailedError afe) { @@ -81,7 +81,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest int index = 1; for (QueueEntry qe : msgs) { - System.err.println(index + ":" + qe.getMessage().getMessageId()); + System.err.println(index + ":" + qe.getMessage().getMessageNumber()); index++; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 20503bf15c..966fb63186 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.AMQException; import org.apache.commons.configuration.Configuration; @@ -160,7 +161,7 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException + public QueueEntry enqueue(ServerMessage message) throws AMQException { return null; //To change body of implemented methods use File | Settings | File Templates. } @@ -280,7 +281,7 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override + public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. @@ -321,7 +322,7 @@ public class MockAMQQueue implements AMQQueue return 0; //To change body of implemented methods use File | Settings | File Templates. } - @Override + public void setMinimumAlertRepeatGap(long value) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 37f91e7464..850e241cbd 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -49,11 +49,6 @@ public class MockQueueEntry implements QueueEntry } - public String debugIdentity() - { - return null; - } - public boolean delete() { return false; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 3084dc7fa1..72bbd7fe0c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -185,7 +185,7 @@ public class SimpleAMQQueueTest extends TestCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); // Check removing the subscription removes it's information from the queue @@ -196,7 +196,7 @@ public class SimpleAMQQueueTest extends TestCase 1 == _queue.getActiveConsumerCount()); AMQMessage messageB = createMessage(new Long (25)); - _queue.enqueue(null, messageB); + _queue.enqueue(messageB); QueueEntry entry = _subscription.getLastSeenEntry(); assertNull(entry); } @@ -204,7 +204,7 @@ public class SimpleAMQQueueTest extends TestCase public void testQueueNoSubscriber() throws AMQException, InterruptedException { AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); _queue.registerSubscription(_subscription, false); Thread.sleep(150); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); @@ -223,7 +223,7 @@ public class SimpleAMQQueueTest extends TestCase // Check sending a message ends up with the subscriber AMQMessage messageA = createMessage(new Long(24)); - _queue.enqueue(null, messageA); + _queue.enqueue(messageA); assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); // Check we cannot add a second subscriber to the queue @@ -261,7 +261,7 @@ public class SimpleAMQQueueTest extends TestCase _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); - _queue.enqueue(null, message); + _queue.enqueue(message); _queue.unregisterSubscription(_subscription); assertTrue("Queue was not deleted when subscription was removed", _queue.isDeleted()); @@ -272,7 +272,7 @@ public class SimpleAMQQueueTest extends TestCase _queue.registerSubscription(_subscription, false); Long id = new Long(26); AMQMessage message = createMessage(id); - _queue.enqueue(null, message); + _queue.enqueue(message); QueueEntry entry = _subscription.getLastSeenEntry(); entry.setRedelivered(true); _queue.resend(entry, _subscription); @@ -286,7 +286,7 @@ public class SimpleAMQQueueTest extends TestCase AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); // Get message id Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); @@ -302,7 +302,7 @@ public class SimpleAMQQueueTest extends TestCase Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5); @@ -323,7 +323,7 @@ public class SimpleAMQQueueTest extends TestCase Long messageId = new Long(i); AMQMessage message = createMessage(messageId); // Put message on queue - _queue.enqueue(null, message); + _queue.enqueue(message); } // Get message ids List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java index 2346660d25..d88d39002a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -44,6 +44,7 @@ public class TestReferenceCounting extends TestCase { super.setUp(); _store = new TestMemoryMessageStore(); + StoreContext.setCurrentContext(_storeContext); } /** @@ -96,7 +97,7 @@ public class TestReferenceCounting extends TestCase assertEquals(1, _store.getMessageMetaDataMap().size()); - message.decrementReference(_storeContext); + message.decrementReference(); assertEquals(1, _store.getMessageMetaDataMap().size()); } @@ -158,7 +159,7 @@ public class TestReferenceCounting extends TestCase assertEquals(1, _store.getMessageMetaDataMap().size()); message = message.takeReference(); - message.decrementReference(_storeContext); + message.decrementReference(); assertEquals(1, _store.getMessageMetaDataMap().size()); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index ed01c91804..e9034d25d3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -904,10 +904,13 @@ public class FieldTable } } + public Object get(String key) + { + return get(new AMQShortString(key)); + } public Object get(AMQShortString key) { - return getObject(key); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 1cdd1da72b..43445232d6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -55,7 +55,7 @@ public class Connection extends ConnectionInvoker private static final Logger log = Logger.get(Connection.class); - enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } class DefaultConnectionListener implements ConnectionListener { @@ -118,7 +118,7 @@ public class Connection extends ConnectionInvoker sender.setIdleTimeout(idleTimeout); } - void setState(State state) + protected void setState(State state) { synchronized (lock) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index 2833565afc..8caf29ecb5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -52,13 +52,28 @@ public class ServerDelegate extends ConnectionDelegate { private SaslServer saslServer; + private List<Object> _locales; + private List<Object> _mechanisms; + private Map<String, Object> _clientProperties; + + + public ServerDelegate() + { + this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8")); + } + + protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales) + { + _clientProperties = clientProperties; + _mechanisms = mechanisms; + _locales = locales; + } public void init(Connection conn, ProtocolHeader hdr) { conn.send(new ProtocolHeader(1, 0, 10)); - List<Object> utf8 = new ArrayList<Object>(); - utf8.add("utf8"); - conn.connectionStart(null, Collections.EMPTY_LIST, utf8); + + conn.connectionStart(_clientProperties, _mechanisms, _locales); } @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok) @@ -77,8 +92,8 @@ public class ServerDelegate extends ConnectionDelegate try { - SaslServer ss = Sasl.createSaslServer - (mechanism, "AMQP", "localhost", null, null); + + SaslServer ss = createSaslServer(mechanism); if (ss == null) { conn.connectionClose @@ -95,6 +110,14 @@ public class ServerDelegate extends ConnectionDelegate } } + protected SaslServer createSaslServer(String mechanism) + throws SaslException + { + SaslServer ss = Sasl.createSaslServer + (mechanism, "AMQP", "localhost", null, null); + return ss; + } + private void secure(Connection conn, byte[] response) { SaslServer ss = conn.getSaslServer(); @@ -133,9 +156,16 @@ public class ServerDelegate extends ConnectionDelegate @Override public void connectionOpen(Connection conn, ConnectionOpen open) { conn.connectionOpenOk(Collections.EMPTY_LIST); + conn.setState(OPEN); } + protected Session getSession(Connection conn, SessionDelegate delegate, SessionAttach atc) + { + return new Session(conn, delegate, new Binary(atc.getName()), 0); + } + + public Session getSession(Connection conn, SessionAttach atc) { return new Session(conn, new Binary(atc.getName()), 0); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 74150e043d..a8757bcb3c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -82,7 +82,7 @@ public class Session extends SessionInvoker private Binary name; private long expiry; private int channel; - private SessionDelegate delegate = new SessionDelegate(); + private SessionDelegate delegate; private SessionListener listener = new DefaultSessionListener(); private long timeout = 60000; private boolean autoSync = false; @@ -112,9 +112,15 @@ public class Session extends SessionInvoker private Thread resumer = null; - Session(Connection connection, Binary name, long expiry) + protected Session(Connection connection, Binary name, long expiry) + { + this(connection, new SessionDelegate(), name, expiry); + } + + protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this.connection = connection; + this.delegate = delegate; this.name = name; this.expiry = expiry; initReceiver(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java new file mode 100644 index 0000000000..0c432eba6f --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.*; + +public class SimpleConnectionTest extends TestCase +{ + public void testConnection() + { + try + { + AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test"); + QueueSession s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + QueueSender p = s.createSender(new AMQQueue("amq.direct", "queue")); + p.send(s.createTextMessage("test")); + + QueueReceiver r = s.createReceiver(new AMQQueue("amq.direct", "queue")); + conn.start(); + Message m = r.receive(); + + Thread.sleep(60000L); + conn.close(); + } + catch (AMQException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (URLSyntaxException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + catch (JMSException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } +} |