summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-07-20 19:05:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-07-20 19:05:05 +0000
commitce5c014723b7ca37b05ef11ac1c37da65dd5aaa1 (patch)
tree2746b9fc9089b1cb8fdb6f72315660ecaab8a454
parent94a235124bceb2b54d16bba52c9eb8fb32932b27 (diff)
downloadqpid-python-ce5c014723b7ca37b05ef11ac1c37da65dd5aaa1.tar.gz
Java Broker 0-10 Exploratory work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@795958 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java13
-rw-r--r--qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java82
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java26
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java117
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java51
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java93
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java60
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java48
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java67
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java65
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java122
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java60
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java402
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java153
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java30
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java71
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java38
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java18
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java10
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java64
85 files changed, 1699 insertions, 648 deletions
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
index aa64fab6cd..fe648fc899 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.extras.exchanges.diagnostic;
-import java.util.List;
-import java.util.Map;
import java.util.ArrayList;
-import java.util.Collection;
import javax.management.JMException;
import javax.management.openmbean.OpenDataException;
@@ -34,8 +31,8 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.junit.extensions.util.SizeOf;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
@@ -193,20 +190,20 @@ public class DiagnosticExchange extends AbstractExchange
return false;
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
Long value = new Long(SizeOf.getUsedMemory());
AMQShortString key = new AMQShortString("memory");
- FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
+ FieldTable headers = ((BasicContentHeaderProperties)payload.getMessageHeader().properties).getHeaders();
headers.put(key, value);
- ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
+ ((BasicContentHeaderProperties)payload.getMessageHeader().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
queues.add(q);
- payload.enqueue(queues);
+ return queues;
}
diff --git a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
index e43bd2ddc0..b64983ccb0 100644
--- a/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
+++ b/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
@@ -23,14 +23,15 @@ package org.apache.qpid.extras.exchanges.example;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
public class TestExchange implements Exchange
{
@@ -63,6 +64,16 @@ public class TestExchange implements Exchange
return false;
}
+ public boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ return false;
+ }
+
+ public boolean isBound(String bindingKey)
+ {
+ return false;
+ }
+
public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
throws AMQException
{
@@ -102,8 +113,9 @@ public class TestExchange implements Exchange
{
}
- public void route(IncomingMessage message) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage message) throws AMQException
{
+ return new ArrayList<AMQQueue>();
}
public int getTicket()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index aa390d6c26..c6e370206e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -57,6 +57,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.LocalTransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.ServerMessage;
public class AMQChannel
{
@@ -157,27 +158,35 @@ public class AMQChannel
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
throws AMQException
{
- if (_currentMessage == null)
- {
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
- }
- else
+ StoreContext.setCurrentContext(_storeContext);
+ try
{
- if (_log.isDebugEnabled())
+ if (_currentMessage == null)
{
- _log.debug("Content header received on channel " + _channelId);
+ throw new AMQException("Received content header without previously receiving a BasicPublish frame");
}
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Content header received on channel " + _channelId);
+ }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
- _currentMessage.setExpiration();
+ _currentMessage.setExpiration();
- routeCurrentMessage();
+ routeCurrentMessage();
- _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+ _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
- deliverCurrentMessageIfComplete();
+ deliverCurrentMessageIfComplete();
+ }
+ }
+ finally
+ {
+ StoreContext.clearCurrentContext();
}
}
@@ -212,6 +221,7 @@ public class AMQChannel
public void publishContentBody(ContentBody contentBody) throws AMQException
{
+ StoreContext.setCurrentContext(_storeContext);
if (_currentMessage == null)
{
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
@@ -231,6 +241,7 @@ public class AMQChannel
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
contentBody));
+
deliverCurrentMessageIfComplete();
}
catch (AMQException e)
@@ -240,6 +251,10 @@ public class AMQChannel
_currentMessage = null;
throw e;
}
+ finally
+ {
+ StoreContext.clearCurrentContext();
+ }
}
protected void routeCurrentMessage() throws AMQException
@@ -425,7 +440,7 @@ public class AMQChannel
{
if (entry.getQueue() == null)
{
- _log.debug("Adding unacked message with a null queue:" + entry.debugIdentity());
+ _log.debug("Adding unacked message with a null queue:" + entry);
}
else
{
@@ -487,7 +502,7 @@ public class AMQChannel
if (!unacked.isQueueDeleted())
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
unacked.release();
@@ -518,7 +533,7 @@ public class AMQChannel
if (unacked != null)
{
// Mark message redelivered
- unacked.getMessage().setRedelivered(true);
+ unacked.setRedelivered(true);
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
@@ -551,7 +566,7 @@ public class AMQChannel
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
unacked.discard(_storeContext);
@@ -612,7 +627,7 @@ public class AMQChannel
- AMQMessage msg = message.getMessage();
+ ServerMessage msg = message.getMessage();
AMQQueue queue = message.getQueue();
// Our Java Client will always suspend the channel when resending!
@@ -631,7 +646,7 @@ public class AMQChannel
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
- msg.setRedelivered(true);
+ message.setRedelivered(true);
Subscription sub = message.getDeliveredSubscription();
@@ -829,14 +844,25 @@ public class AMQChannel
{
if (!_returnMessages.isEmpty())
{
+ StoreContext sc =StoreContext.setCurrentContext(_storeContext);
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
- AMQMessage message = bouncedMessage.getAMQMessage();
- _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ ServerMessage serverMessage = bouncedMessage.getAMQMessage();
+ if(serverMessage instanceof AMQMessage)
+ {
+ AMQMessage message = (AMQMessage) serverMessage;
+ _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
- message.decrementReference(_storeContext);
+ }
+ else
+ {
+ // TODO AMQP 0-10 Message
+ throw new RuntimeException("not yet implemented conversion of 0-10 messages");
+ }
+ bouncedMessage.release();
}
+ StoreContext.setCurrentContext(sc);
_returnMessages.clear();
}
@@ -884,8 +910,18 @@ public class AMQChannel
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag());
+ ServerMessage msg = entry.getMessage();
+ if(msg instanceof AMQMessage)
+ {
+ getProtocolSession().getProtocolOutputConverter().writeDeliver((AMQMessage)msg, getChannelId(),
+ deliveryTag, sub.getConsumerTag());
+ }
+ else
+ {
+ //TODO - Convert 0-10 Message into 0-8/9 message
+ }
}
+
};
public ClientDeliveryMethod getClientDeliveryMethod()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
index 29494c4118..29c37a0ec8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -56,9 +57,8 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
-
- AMQMessage msg = message.getMessage();
- msg.setRedelivered(true);
+
+ message.setRedelivered(true);
final Subscription subscription = message.getDeliveredSubscription();
if (subscription != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index e402dd6956..27c1705da6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -45,6 +45,9 @@ import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.*;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -56,6 +59,9 @@ import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
import org.apache.qpid.server.protocol.AMQPProtocolProvider;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.transport.ServerConnection;
/**
* Main entry point for AMQPD.
@@ -314,6 +320,29 @@ public class Main
}
bind(port, serverConfig);
+
+
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+
+ final ConnectionDelegate delegate =
+ new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, "localhost");
+
+
+ ConnectionBinding cb = new ConnectionBinding()
+ {
+ public Connection connection()
+ {
+ ServerConnection conn = new ServerConnection();
+ conn.setConnectionDelegate(delegate);
+ return conn;
+ }
+ };
+
+ int port_0_10 = port + 1;
+
+ org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor
+ ("0.0.0.0", port_0_10, cb);
+ ioa.start();
}
/**
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
index 3f1947d65a..be6232ed6b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
@@ -23,6 +23,9 @@ package org.apache.qpid.server;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageReference;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
/**
* Signals that a required delivery could not be made. This could be bacuse of the immediate flag being set and the
@@ -39,9 +42,9 @@ import org.apache.qpid.server.queue.AMQMessage;
*/
public abstract class RequiredDeliveryException extends AMQException
{
- private AMQMessage _amqMessage;
+ private MessageReference _amqMessage;
- public RequiredDeliveryException(String message, AMQMessage payload)
+ public RequiredDeliveryException(String message, ServerMessage payload)
{
super(message);
@@ -54,20 +57,20 @@ public abstract class RequiredDeliveryException extends AMQException
super(message);
}
- public void setMessage(final AMQMessage payload)
+ public void setMessage(final ServerMessage payload)
{
// Increment the reference as this message is in the routing phase
// and so will have the ref decremented as routing fails.
// we need to keep this message around so we can return it in the
// handler. So increment here.
- _amqMessage = payload.takeReference();
+ _amqMessage = payload.newReference();
}
- public AMQMessage getAMQMessage()
+ public ServerMessage getAMQMessage()
{
- return _amqMessage;
+ return _amqMessage.getMessage();
}
public AMQConstant getErrorCode()
@@ -76,4 +79,9 @@ public abstract class RequiredDeliveryException extends AMQException
}
public abstract AMQConstant getReplyCode();
+
+ public void release()
+ {
+ _amqMessage.release();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index db3a05eb52..7322a4add6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -130,7 +130,7 @@ public class TxAck implements TxnOp
//in memory (persistent changes will be rolled back by store)
for (QueueEntry msg : _unacked.values())
{
- msg.getMessage().takeReference();
+ // TODO - should requeue, whole thing is messed up
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 247558bb34..f1f87eb1d2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -204,4 +205,15 @@ public abstract class AbstractExchange implements Exchange, Managable
{
return getVirtualHost().getQueueRegistry();
}
+
+ public boolean isBound(String bindingKey, AMQQueue queue)
+ {
+ return isBound(new AMQShortString(bindingKey), queue);
+ }
+
+ public boolean isBound(String bindingKey)
+ {
+ return isBound(new AMQShortString(bindingKey));
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 0ab8208d88..25ced58060 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -24,7 +24,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -57,6 +56,11 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
}
+ public Exchange getExchange(String exchangeName)
+ {
+ return getExchange(new AMQShortString(exchangeName));
+ }
+
public MessageStore getMessageStore()
{
return _host.getMessageStore();
@@ -134,6 +138,6 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
}
- exch.route(payload);
+ payload.enqueue(exch.route(payload));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 4b609f592b..02e83f3dd3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -40,9 +40,9 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
public class DirectExchange extends AbstractExchange
{
@@ -192,10 +192,10 @@ public class DirectExchange extends AbstractExchange
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : payload.getRoutingKey();
+ final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey());
final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
@@ -204,7 +204,8 @@ public class DirectExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + queues);
}
- payload.enqueue(queues);
+ return queues;
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 06209c5458..7ff5e4ef96 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -24,12 +24,11 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
public interface Exchange
{
@@ -54,7 +53,7 @@ public interface Exchange
void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- void route(IncomingMessage message) throws AMQException;
+ ArrayList<AMQQueue> route(InboundMessage message) throws AMQException;
/**
@@ -93,6 +92,8 @@ public interface Exchange
*/
boolean hasBindings();
-
+ boolean isBound(String bindingKey, AMQQueue queue);
+
+ boolean isBound(String bindingKey);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index fe3b19e74e..6b42187f9c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -48,4 +48,6 @@ public interface ExchangeRegistry extends MessageRouter
Collection<AMQShortString> getExchangeNames();
void initialise() throws AMQException;
+
+ Exchange getExchange(String exchangeName);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 23c716a0db..2a037782eb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -28,9 +28,9 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -182,7 +182,7 @@ public class FanoutExchange extends AbstractExchange
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
@@ -191,7 +191,7 @@ public class FanoutExchange extends AbstractExchange
_logger.debug("Publishing message to queue " + _queues);
}
- payload.enqueue(new ArrayList(_queues));
+ return new ArrayList(_queues);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 2b7df4361a..fde4cfd6a2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
/**
* Defines binding and matching based on a set of headers.
@@ -139,7 +140,7 @@ class HeadersBinding
* @return true if the headers define any required keys and match any required
* values
*/
- public boolean matches(FieldTable headers)
+ public boolean matches(AMQMessageHeader headers)
{
if(headers == null)
{
@@ -151,13 +152,13 @@ class HeadersBinding
}
}
- private boolean and(FieldTable headers)
+ private boolean and(AMQMessageHeader headers)
{
- if(headers.keys().containsAll(required))
+ if(headers.containsHeaders(required))
{
for(Map.Entry<String, Object> e : matches.entrySet())
{
- if(!e.getValue().equals(headers.getObject(e.getKey())))
+ if(!e.getValue().equals(headers.getHeader(e.getKey())))
{
return false;
}
@@ -171,11 +172,11 @@ class HeadersBinding
}
- private boolean or(final FieldTable headers)
+ private boolean or(final AMQMessageHeader headers)
{
- if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+ if(required.isEmpty() || passesRequiredOr(headers))
{
- return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+ return ((!matches.isEmpty()) && passesMatchesOr(headers))
|| (required.isEmpty() && matches.isEmpty());
}
else
@@ -184,6 +185,32 @@ class HeadersBinding
}
}
+ private boolean passesMatchesOr(AMQMessageHeader headers)
+ {
+ for(Map.Entry<String,Object> entry : matches.entrySet())
+ {
+ if(!headers.containsHeader(entry.getKey())
+ || !((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
+ || (entry.getValue().equals(headers.getHeader(entry.getKey())))))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean passesRequiredOr(AMQMessageHeader headers)
+ {
+ for(String name : required)
+ {
+ if(headers.containsHeader(name))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void processSpecial(String key, Object value)
{
if("X-match".equalsIgnoreCase(key))
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index fc667db17b..02e129621c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -31,9 +31,10 @@ import org.apache.qpid.framing.AMQTypedValue;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
import javax.management.JMException;
import javax.management.openmbean.ArrayType;
@@ -50,7 +51,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -237,31 +237,31 @@ public class HeadersExchange extends AbstractExchange
}
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- FieldTable headers = getHeaders(payload.getContentHeaderBody());
+ AMQMessageHeader header = payload.getMessageHeader();
if (_logger.isDebugEnabled())
{
- _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
+ _logger.debug("Exchange " + getName() + ": routing message with headers " + header);
}
boolean routed = false;
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
- if (e.binding.matches(headers))
+ if (e.binding.matches(header))
{
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": delivering message with headers " +
- headers + " to " + e.queue.getName());
+ header + " to " + e.queue.getName());
}
queues.add(e.queue);
routed = true;
}
}
- payload.enqueue(queues);
+ return queues;
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index be7a1dc196..71481ec730 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -30,13 +30,13 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortStringTokenizer;
-import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.message.InboundMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -109,7 +109,7 @@ public class TopicExchange extends AbstractExchange
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
- private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
+ private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
public static class Binding
{
@@ -160,7 +160,7 @@ public class TopicExchange extends AbstractExchange
private final class TopicExchangeResult implements TopicMatcherResult
{
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
- private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
+ private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
public void addUnfilteredQueue(AMQQueue queue)
{
@@ -190,12 +190,12 @@ public class TopicExchange extends AbstractExchange
}
- public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ public void addFilteredQueue(AMQQueue queue, MessageFilter filter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
{
- filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
+ filters = new ConcurrentHashMap<MessageFilter,Integer>();
_filteredQueues.put(queue, filters);
}
Integer instances = filters.get(filter);
@@ -210,9 +210,9 @@ public class TopicExchange extends AbstractExchange
}
- public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
+ public void removeFilteredQueue(AMQQueue queue, MessageFilter filter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
{
Integer instances = filters.get(filter);
@@ -237,11 +237,11 @@ public class TopicExchange extends AbstractExchange
}
public void replaceQueueFilter(AMQQueue queue,
- MessageFilter<RuntimeException> oldFilter,
- MessageFilter<RuntimeException> newFilter)
+ MessageFilter oldFilter,
+ MessageFilter newFilter)
{
- Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
- Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
+ Map<MessageFilter,Integer> filters = _filteredQueues.get(queue);
+ Map<MessageFilter,Integer> newFilters = new ConcurrentHashMap<MessageFilter,Integer>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
{
@@ -263,7 +263,7 @@ public class TopicExchange extends AbstractExchange
_filteredQueues.put(queue,newFilters);
}
- public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
@@ -284,11 +284,11 @@ public class TopicExchange extends AbstractExchange
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
- for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
+ for(Map.Entry<AMQQueue, Map<MessageFilter, Integer>> entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
- for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
+ for(MessageFilter filter : entry.getValue().keySet())
{
if(filter.matches(msg))
{
@@ -456,18 +456,18 @@ public class TopicExchange extends AbstractExchange
}
- private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
+ private JMSSelectorFilter createSelectorFilter(final FieldTable args)
throws AMQException
{
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
- WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
+ WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
if(selectorRef == null || (selector = selectorRef.get())==null)
{
- selector = new JMSSelectorFilter<RuntimeException>(selectorString);
- _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
+ selector = new JMSSelectorFilter(selectorString);
+ _selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
}
return selector;
}
@@ -528,10 +528,12 @@ public class TopicExchange extends AbstractExchange
return normalizedString;
}
- public void route(IncomingMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
{
- final AMQShortString routingKey = payload.getRoutingKey();
+ final AMQShortString routingKey = payload.getRoutingKey() == null
+ ? AMQShortString.EMPTY_STRING
+ : new AMQShortString(payload.getRoutingKey());
// The copy here is unfortunate, but not too bad relevant to the amount of
// things created and copied in getMatchedQueues
@@ -543,7 +545,7 @@ public class TopicExchange extends AbstractExchange
_logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
- payload.enqueue(queues);
+ return queues;
}
@@ -646,7 +648,7 @@ public class TopicExchange extends AbstractExchange
}
}
- private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
+ private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
index a964bce306..2ead9e57af 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ArithmeticExpression.java
@@ -26,7 +26,7 @@ import org.apache.qpid.server.queue.Filterable;
/**
* An expression which performs an operation on two expression values
*/
-public abstract class ArithmeticExpression<E extends Exception> extends BinaryExpression<E>
+public abstract class ArithmeticExpression extends BinaryExpression
{
protected static final int INTEGER = 1;
@@ -248,7 +248,7 @@ public abstract class ArithmeticExpression<E extends Exception> extends BinaryEx
}
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object lvalue = left.evaluate(message);
if (lvalue == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
index 7308de80d6..024257bea9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BinaryExpression.java
@@ -23,23 +23,23 @@ package org.apache.qpid.server.filter;
/**
* An expression which performs an operation on two expression values.
*/
-public abstract class BinaryExpression<E extends Exception> implements Expression<E>
+public abstract class BinaryExpression implements Expression
{
- protected Expression<E> left;
- protected Expression<E> right;
+ protected Expression left;
+ protected Expression right;
- public BinaryExpression(Expression<E> left, Expression<E> right)
+ public BinaryExpression(Expression left, Expression right)
{
this.left = left;
this.right = right;
}
- public Expression<E> getLeft()
+ public Expression getLeft()
{
return left;
}
- public Expression<E> getRight()
+ public Expression getRight()
{
return right;
}
@@ -90,7 +90,7 @@ public abstract class BinaryExpression<E extends Exception> implements Expressio
/**
* @param expression
*/
- public void setRight(Expression<E> expression)
+ public void setRight(Expression expression)
{
right = expression;
}
@@ -98,7 +98,7 @@ public abstract class BinaryExpression<E extends Exception> implements Expressio
/**
* @param expression
*/
- public void setLeft(Expression<E> expression)
+ public void setLeft(Expression expression)
{
left = expression;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
index 9beb9798d0..da9737e780 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
@@ -28,14 +28,13 @@ import org.apache.qpid.server.queue.Filterable;
* A BooleanExpression is an expression that always
* produces a Boolean result.
*/
-public interface BooleanExpression<E extends Exception> extends Expression<E>
+public interface BooleanExpression extends Expression
{
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
- * @throws E
*/
- public boolean matches(Filterable<E> message) throws E;
+ public boolean matches(Filterable message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
index 921005c462..e1cfda6b21 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
@@ -34,15 +34,15 @@ import org.apache.qpid.server.queue.Filterable;
/**
* A filter performing a comparison of two objects
*/
-public abstract class ComparisonExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression
{
- public static<E extends Exception> BooleanExpression<E> createBetween(Expression<E> value, Expression left, Expression<E> right)
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right)
{
return LogicExpression.createAND(createGreaterThanEqual(value, left), createLessThanEqual(value, right));
}
- public static<E extends Exception> BooleanExpression<E> createNotBetween(Expression<E> value, Expression<E> left, Expression<E> right)
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right)
{
return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
}
@@ -73,7 +73,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
REGEXP_CONTROL_CHARS.add(new Character('!'));
}
- static class LikeExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
+ static class LikeExpression extends UnaryExpression implements BooleanExpression
{
Pattern likePattern;
@@ -81,7 +81,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
/**
* @param right
*/
- public LikeExpression(Expression<E> right, String like, int escape)
+ public LikeExpression(Expression right, String like, int escape)
{
super(right);
@@ -138,7 +138,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
/**
* org.apache.activemq.filter.Expression#evaluate(MessageEvaluationContext)
*/
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rv = this.getRight().evaluate(message);
@@ -158,7 +158,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
return likePattern.matcher((String) rv).matches() ? Boolean.TRUE : Boolean.FALSE;
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
@@ -236,7 +236,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
return doCreateEqual(left, right);
}
- private static<E extends Exception> BooleanExpression<E> doCreateEqual(Expression<E> left, Expression<E> right)
+ private static BooleanExpression doCreateEqual(Expression left, Expression right)
{
return new EqualExpression(left, right);
}
@@ -388,7 +388,7 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
super(left, right);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Comparable lv = (Comparable) left.evaluate(message);
if (lv == null)
@@ -550,21 +550,21 @@ public abstract class ComparisonExpression<E extends Exception> extends BinaryEx
protected abstract boolean asBoolean(int answer);
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return (object != null) && (object == Boolean.TRUE);
}
- private static class EqualExpression<E extends Exception> extends ComparisonExpression<E>
+ private static class EqualExpression extends ComparisonExpression
{
- public EqualExpression(final Expression<E> left, final Expression<E> right)
+ public EqualExpression(final Expression left, final Expression right)
{
super(left, right);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object lv = left.evaluate(message);
Object rv = right.evaluate(message);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
index 3ed2286f2e..5cb0cde9fb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
@@ -32,17 +32,17 @@ import org.apache.qpid.server.queue.Filterable;
/**
* Represents a constant expression
*/
-public class ConstantExpression<E extends Exception> implements Expression<E>
+public class ConstantExpression implements Expression
{
- static class BooleanConstantExpression<E extends Exception> extends ConstantExpression<E> implements BooleanExpression<E>
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression
{
public BooleanConstantExpression(Object value)
{
super(value);
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
@@ -121,7 +121,7 @@ public class ConstantExpression<E extends Exception> implements Expression<E>
this.value = value;
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
return value;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
index f2ebe41d26..ae9eaaefd3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
@@ -27,12 +27,12 @@ import org.apache.qpid.server.queue.Filterable;
/**
* Represents an expression
*/
-public interface Expression<E extends Exception>
+public interface Expression
{
/**
* @return the value of this expression
*/
- public Object evaluate(Filterable<E> message) throws E;
+ public Object evaluate(Filterable message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
index dd3c126ee5..a7a1eab3c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -27,13 +27,13 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
import org.apache.qpid.AMQException;
-public interface FilterManager<E extends Exception>
+public interface FilterManager
{
- void add(MessageFilter<E> filter);
+ void add(MessageFilter filter);
- void remove(MessageFilter<E> filter);
+ void remove(MessageFilter filter);
- boolean allAllow(Filterable<E> msg);
+ boolean allAllow(Filterable msg);
boolean hasFilters();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 96c9353872..f86f5dab62 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -26,12 +26,12 @@ import org.apache.qpid.server.filter.jms.selector.SelectorParser;
import org.apache.qpid.server.queue.Filterable;
-public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
+public class JMSSelectorFilter implements MessageFilter
{
private final static Logger _logger = org.apache.log4j.Logger.getLogger(JMSSelectorFilter.class);
private String _selector;
- private BooleanExpression<E> _matcher;
+ private BooleanExpression _matcher;
public JMSSelectorFilter(String selector) throws AMQException
{
@@ -39,7 +39,7 @@ public class JMSSelectorFilter<E extends Exception> implements MessageFilter<E>
_matcher = new SelectorParser().parse(selector);
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
boolean match = _matcher.matches(message);
if(_logger.isDebugEnabled())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
index 094363ed9a..2d8e01a49a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
@@ -27,15 +27,15 @@ import org.apache.qpid.server.queue.Filterable;
/**
* A filter performing a comparison of two objects
*/
-public abstract class LogicExpression<E extends Exception> extends BinaryExpression<E> implements BooleanExpression<E>
+public abstract class LogicExpression extends BinaryExpression implements BooleanExpression
{
- public static<E extends Exception> BooleanExpression createOR(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
+ public static BooleanExpression createOR(BooleanExpression lvalue, BooleanExpression rvalue)
{
return new OrExpression(lvalue, rvalue);
}
- public static<E extends Exception> BooleanExpression createAND(BooleanExpression<E> lvalue, BooleanExpression<E> rvalue)
+ public static BooleanExpression createAND(BooleanExpression lvalue, BooleanExpression rvalue)
{
return new AndExpression(lvalue, rvalue);
}
@@ -49,23 +49,23 @@ public abstract class LogicExpression<E extends Exception> extends BinaryExpress
super(left, right);
}
- public abstract Object evaluate(Filterable<E> message) throws E;
+ public abstract Object evaluate(Filterable message);
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return (object != null) && (object == Boolean.TRUE);
}
- private static class OrExpression<E extends Exception> extends LogicExpression<E>
+ private static class OrExpression extends LogicExpression
{
- public OrExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+ public OrExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
{
super(lvalue, rvalue);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Boolean lv = (Boolean) left.evaluate(message);
@@ -86,14 +86,14 @@ public abstract class LogicExpression<E extends Exception> extends BinaryExpress
}
}
- private static class AndExpression<E extends Exception> extends LogicExpression<E>
+ private static class AndExpression extends LogicExpression
{
- public AndExpression(final BooleanExpression<E> lvalue, final BooleanExpression<E> rvalue)
+ public AndExpression(final BooleanExpression lvalue, final BooleanExpression rvalue)
{
super(lvalue, rvalue);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Boolean lv = (Boolean) left.evaluate(message);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
index 58fc55f8e6..febe68ece9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -24,7 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
-public interface MessageFilter<E extends Exception>
+public interface MessageFilter
{
- boolean matches(Filterable<E> message) throws E;
+ boolean matches(Filterable message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
index 946274f936..11fdeae2b1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.server.queue.Filterable;
@@ -35,7 +34,7 @@ import org.apache.qpid.server.queue.Filterable;
/**
* Represents a property expression
*/
-public class PropertyExpression<E extends Exception> implements Expression<E>
+public class PropertyExpression implements Expression
{
// Constants - defined the same as JMS
private static final int NON_PERSISTENT = 1;
@@ -44,12 +43,12 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
- private static final HashMap<String, Expression<? extends Exception>> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression<? extends Exception>>();
+ private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
{
- JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression<E>()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new Expression()
{
- public Object evaluate(Filterable<E> message)
+ public Object evaluate(Filterable message)
{
//TODO
return null;
@@ -73,9 +72,9 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression());
- JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression<E>()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression()
{
- public Object evaluate(Filterable message) throws E
+ public Object evaluate(Filterable message)
{
return message.isRedelivered();
}
@@ -83,7 +82,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
}
private final String name;
- private final Expression<E> jmsPropertyExpression;
+ private final Expression jmsPropertyExpression;
public boolean outerTest()
{
@@ -96,10 +95,10 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
- jmsPropertyExpression = (Expression<E>) JMS_PROPERTY_EXPRESSIONS.get(name);
+ jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
if (jmsPropertyExpression != null)
@@ -108,17 +107,7 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
}
else
{
-
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Looking up property:" + name);
- _logger.debug("Properties are:" + _properties.getHeaders().keySet());
- }
-
- return _properties.getHeaders().getObject(name);
+ return message.getMessageHeader().getHeader(name);
}
}
@@ -158,39 +147,30 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
}
- private static class ReplyToExpression<E extends Exception> implements Expression<E>
+ private static class ReplyToExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
-
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString replyTo = _properties.getReplyTo();
-
- return (replyTo == null) ? null : replyTo.toString();
-
+ String replyTo = message.getMessageHeader().getReplyTo();
+ return replyTo;
}
}
- private static class TypeExpression<E extends Exception> implements Expression<E>
+ private static class TypeExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString type = _properties.getType();
- return (type == null) ? null : type.toString();
+ String type = message.getMessageHeader().getType();
+ return type;
}
}
- private static class DeliveryModeExpression<E extends Exception> implements Expression<E>
+ private static class DeliveryModeExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
if (_logger.isDebugEnabled())
@@ -202,68 +182,53 @@ public class PropertyExpression<E extends Exception> implements Expression<E>
}
}
- private static class PriorityExpression<E extends Exception> implements Expression<E>
+ private static class PriorityExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return (int) _properties.getPriority();
+ byte priority = message.getMessageHeader().getPriority();
+ return (int) priority;
}
}
- private static class MessageIDExpression<E extends Exception> implements Expression<E>
+ private static class MessageIDExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString messageId = _properties.getMessageId();
+ String messageId = message.getMessageHeader().getMessageId();
- return (messageId == null) ? null : messageId;
+ return messageId;
}
}
- private static class TimestampExpression<E extends Exception> implements Expression<E>
+ private static class TimestampExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return _properties.getTimestamp();
+ long timestamp = message.getMessageHeader().getTimestamp();
+ return timestamp;
}
}
- private static class CorrelationIdExpression<E extends Exception> implements Expression<E>
+ private static class CorrelationIdExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
- AMQShortString correlationId = _properties.getCorrelationId();
- return (correlationId == null) ? null : correlationId.toString();
+ String correlationId = message.getMessageHeader().getCorrelationId();
+
+ return correlationId;
}
}
- private static class ExpirationExpression<E extends Exception> implements Expression<E>
+ private static class ExpirationExpression implements Expression
{
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
-
- CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
-
- return _properties.getExpiration();
+ long expiration = message.getMessageHeader().getExpiration();
+ return expiration;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
index cb738e1489..83f537b632 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -27,43 +27,34 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
-public class SimpleFilterManager implements FilterManager<AMQException>
+public class SimpleFilterManager implements FilterManager
{
private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
- private final ConcurrentLinkedQueue<MessageFilter<AMQException>> _filters;
+ private final ConcurrentLinkedQueue<MessageFilter> _filters;
public SimpleFilterManager()
{
_logger.debug("Creating SimpleFilterManager");
- _filters = new ConcurrentLinkedQueue<MessageFilter<AMQException>>();
+ _filters = new ConcurrentLinkedQueue<MessageFilter>();
}
- public void add(MessageFilter<AMQException> filter)
+ public void add(MessageFilter filter)
{
_filters.add(filter);
}
- public void remove(MessageFilter<AMQException> filter)
+ public void remove(MessageFilter filter)
{
_filters.remove(filter);
}
- public boolean allAllow(Filterable<AMQException> msg)
+ public boolean allAllow(Filterable msg)
{
- for (MessageFilter<AMQException> filter : _filters)
+ for (MessageFilter filter : _filters)
{
- try
+ if (!filter.matches(msg))
{
- if (!filter.matches(msg))
- {
- return false;
- }
- }
- catch (AMQException e)
- {
- //fixme
- e.printStackTrace();
return false;
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
index 799a38af5a..9e03ecd8bd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
@@ -35,18 +35,18 @@ import org.apache.qpid.server.queue.Filterable;
/**
* An expression which performs an operation on two expression values
*/
-public abstract class UnaryExpression<E extends Exception> implements Expression<E>
+public abstract class UnaryExpression implements Expression
{
private static final BigDecimal BD_LONG_MIN_VALUE = BigDecimal.valueOf(Long.MIN_VALUE);
- protected Expression<E> right;
+ protected Expression right;
- public static<E extends Exception> Expression<E> createNegate(Expression<E> left)
+ public static Expression createNegate(Expression left)
{
return new NegativeExpression(left);
}
- public static<E extends Exception> BooleanExpression createInExpression(PropertyExpression<E> right, List elements, final boolean not)
+ public static BooleanExpression createInExpression(PropertyExpression right, List elements, final boolean not)
{
// Use a HashSet if there are many elements.
@@ -69,14 +69,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
return new InExpression(right, inList, not);
}
- abstract static class BooleanUnaryExpression<E extends Exception> extends UnaryExpression<E> implements BooleanExpression<E>
+ abstract static class BooleanUnaryExpression extends UnaryExpression implements BooleanExpression
{
- public BooleanUnaryExpression(Expression<E> left)
+ public BooleanUnaryExpression(Expression left)
{
super(left);
}
- public boolean matches(Filterable<E> message) throws E
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
@@ -85,7 +85,7 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
}
;
- public static<E extends Exception> BooleanExpression<E> createNOT(BooleanExpression<E> left)
+ public static<E extends Exception> BooleanExpression createNOT(BooleanExpression left)
{
return new NotExpression(left);
}
@@ -100,7 +100,7 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
return new XQueryExpression(xpath);
}
- public static<E extends Exception> BooleanExpression createBooleanCast(Expression<E> left)
+ public static<E extends Exception> BooleanExpression createBooleanCast(Expression left)
{
return new BooleanCastExpression(left);
}
@@ -151,7 +151,7 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
this.right = left;
}
- public Expression<E> getRight()
+ public Expression getRight()
{
return right;
}
@@ -204,14 +204,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
*/
public abstract String getExpressionSymbol();
- private static class NegativeExpression<E extends Exception> extends UnaryExpression<E>
+ private static class NegativeExpression extends UnaryExpression
{
- public NegativeExpression(final Expression<E> left)
+ public NegativeExpression(final Expression left)
{
super(left);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rvalue = right.evaluate(message);
if (rvalue == null)
@@ -233,19 +233,19 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
}
}
- private static class InExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ private static class InExpression extends BooleanUnaryExpression
{
private final Collection _inList;
private final boolean _not;
- public InExpression(final PropertyExpression<E> right, final Collection inList, final boolean not)
+ public InExpression(final PropertyExpression right, final Collection inList, final boolean not)
{
super(right);
_inList = inList;
_not = not;
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rvalue = right.evaluate(message);
@@ -309,14 +309,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
}
}
- private static class NotExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ private static class NotExpression extends BooleanUnaryExpression
{
- public NotExpression(final BooleanExpression<E> left)
+ public NotExpression(final BooleanExpression left)
{
super(left);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Boolean lvalue = (Boolean) right.evaluate(message);
if (lvalue == null)
@@ -333,14 +333,14 @@ public abstract class UnaryExpression<E extends Exception> implements Expression
}
}
- private static class BooleanCastExpression<E extends Exception> extends BooleanUnaryExpression<E>
+ private static class BooleanCastExpression extends BooleanUnaryExpression
{
- public BooleanCastExpression(final Expression<E> left)
+ public BooleanCastExpression(final Expression left)
{
super(left);
}
- public Object evaluate(Filterable<E> message) throws E
+ public Object evaluate(Filterable message)
{
Object rvalue = right.evaluate(message);
if (rvalue == null)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
index 1311178fb1..75010a79cf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
@@ -71,7 +71,7 @@ public final class XPathExpression implements BooleanExpression {
private final XPathEvaluator evaluator;
static public interface XPathEvaluator {
- public boolean evaluate(Filterable message) throws AMQException;
+ public boolean evaluate(Filterable message);
}
XPathExpression(String xpath) {
@@ -93,7 +93,7 @@ public final class XPathExpression implements BooleanExpression {
}
}
- public Object evaluate(Filterable message) throws AMQException {
+ public Object evaluate(Filterable message) {
// try {
//FIXME this is flow to disk work
// if( message.isDropped() )
@@ -118,7 +118,7 @@ public final class XPathExpression implements BooleanExpression {
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(Filterable message) throws AMQException
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
index c13f81cd08..55954c7578 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
@@ -36,7 +36,7 @@ public final class XQueryExpression implements BooleanExpression {
this.xpath = xpath;
}
- public Object evaluate(Filterable message) throws AMQException {
+ public Object evaluate(Filterable message) {
return Boolean.FALSE;
}
@@ -49,7 +49,7 @@ public final class XQueryExpression implements BooleanExpression {
* @return true if the expression evaluates to Boolean.TRUE.
* @throws AMQException
*/
- public boolean matches(Filterable message) throws AMQException
+ public boolean matches(Filterable message)
{
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
index cc67776682..a423370b11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
@@ -43,7 +43,7 @@ public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
this.xpath = xpath;
}
- public boolean evaluate(Filterable m) throws AMQException
+ public boolean evaluate(Filterable m)
{
// TODO - we would have to check the content type and then evaluate the content
// here... is this really a feature we wish to implement? - RobG
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
index 96a1071135..bfed4f4c60 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
@@ -1,11 +1,8 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.Set;
-import java.util.HashSet;
/*
*
@@ -52,7 +49,7 @@ public class BytesOnlyCreditManager extends AbstractFlowCreditManager
return _bytesCredit.get() > 0L;
}
- public boolean useCreditForMessage(AMQMessage msg)
+ public boolean useCreditForMessage(ServerMessage msg)
{
final long msgSize = msg.getSize();
if(hasCredit())
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
index a249a6e63a..089a34dde3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
@@ -1,6 +1,7 @@
package org.apache.qpid.server.flow;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -40,5 +41,5 @@ public interface FlowCreditManager
public boolean hasCredit();
- public boolean useCreditForMessage(AMQMessage msg);
+ public boolean useCreditForMessage(ServerMessage msg);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
index d63431c3eb..f3f36700d8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
@@ -1,6 +1,6 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -37,7 +37,7 @@ public class LimitlessCreditManager extends AbstractFlowCreditManager implements
return true;
}
- public boolean useCreditForMessage(AMQMessage msg)
+ public boolean useCreditForMessage(ServerMessage msg)
{
return true;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
index 9c377481de..706bdcae61 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
@@ -1,8 +1,6 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
-
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -54,7 +52,7 @@ public class MessageAndBytesCreditManager extends AbstractFlowCreditManager impl
return (_messageCredit > 0L) && ( _bytesCredit > 0L );
}
- public synchronized boolean useCreditForMessage(AMQMessage msg)
+ public synchronized boolean useCreditForMessage(ServerMessage msg)
{
if(_messageCredit == 0L)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
index c1b3a09006..abbc91a1ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
@@ -1,6 +1,6 @@
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +50,7 @@ public class MessageOnlyCreditManager extends AbstractFlowCreditManager implemen
return _messageCredit.get() > 0L;
}
- public boolean useCreditForMessage(AMQMessage msg)
+ public boolean useCreditForMessage(ServerMessage msg)
{
if(hasCredit())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
index be0300f2c1..17710df3ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.flow;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
{
@@ -123,7 +123,7 @@ public class Pre0_10CreditManager extends AbstractFlowCreditManager implements F
&& (_messageCreditLimit == 0L || _messageCredit > 0);
}
- public synchronized boolean useCreditForMessage(final AMQMessage msg)
+ public synchronized boolean useCreditForMessage(final ServerMessage msg)
{
if(_messageCreditLimit != 0L)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
index 001b7858ec..d0b4ca6855 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -130,8 +131,16 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
throws AMQException
{
singleMessageCredit.useCreditForMessage(entry.getMessage());
- session.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(),
- deliveryTag, queue.getMessageCount());
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(),
+ deliveryTag, queue.getMessageCount());
+ }
+ else
+ {
+ //TODO Convert AMQP 0-10 message
+ throw new RuntimeException("Not implemented conversion of 0-10 message");
+ }
}
};
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index f3cab10ed7..2b011ae5b6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -87,7 +87,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
return;
}
- if (!message.getMessage().isReferenced())
+ if (message.getMessage() == null)
{
_logger.warn("Message as already been purged, unable to Reject.");
return;
@@ -96,7 +96,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
index 9b23d88838..da4687ac56 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.TxCommitBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -61,7 +62,7 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
throw body.getChannelNotFoundException(channelId);
}
-
+ StoreContext.setCurrentContext(channel.getStoreContext());
channel.commit();
MethodRegistry methodRegistry = session.getMethodRegistry();
@@ -74,5 +75,9 @@ public class TxCommitHandler implements StateAwareMethodListener<TxCommitBody>
{
throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
+ finally
+ {
+ StoreContext.clearCurrentContext();
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index 2b55d294b5..34eef881a6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -36,8 +36,6 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.AMQException;
-import org.apache.mina.common.ByteBuffer;
-
import java.util.Iterator;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 65184fe744..9f2f182138 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -1,25 +1,25 @@
package org.apache.qpid.server.output.amqp0_9;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.mina.common.ByteBuffer;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index b987dae16d..da682ed11e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.message.*;
import java.util.Iterator;
@@ -41,12 +42,12 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* A deliverable message.
*/
-public class AMQMessage implements Filterable<AMQException>
+public class AMQMessage implements Filterable, ServerMessage
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- private final AtomicInteger _referenceCount = new AtomicInteger(1);
+ private final AtomicInteger _referenceCount = new AtomicInteger(0);
private final AMQMessageHandle _messageHandle;
@@ -72,7 +73,7 @@ public class AMQMessage implements Filterable<AMQException>
private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
+ private final AMQMessageHeader _messageHeader;
/**
@@ -202,6 +203,7 @@ public class AMQMessage implements Filterable<AMQException>
_messageHandle = factory.createMessageHandle(messageId, store, true);
_storeContext = txnConext.getStoreContext();
_size = _messageHandle.getBodySize(txnConext.getStoreContext());
+ _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(txnConext.getStoreContext()));
}
/**
@@ -221,6 +223,7 @@ public class AMQMessage implements Filterable<AMQException>
{
_messageHandle = messageHandle;
_storeContext = storeConext;
+ _messageHeader = new ContentHeaderBodyAdapter(_messageHandle.getContentHeaderBody(storeConext));
if(info.isImmediate())
{
@@ -234,6 +237,7 @@ public class AMQMessage implements Filterable<AMQException>
protected AMQMessage(AMQMessage msg) throws AMQException
{
_messageHandle = msg._messageHandle;
+ _messageHeader = msg._messageHeader;
_storeContext = msg._storeContext;
_flags = msg._flags;
_size = msg._size;
@@ -315,12 +319,11 @@ public class AMQMessage implements Filterable<AMQException>
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
- * @param storeContext
*
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
*/
- public void decrementReference(StoreContext storeContext) throws MessageCleanupException
+ public void decrementReference() throws MessageCleanupException
{
int count = _referenceCount.decrementAndGet();
@@ -342,13 +345,12 @@ public class AMQMessage implements Filterable<AMQException>
// and the handle has not yet been constructed
if (_messageHandle != null)
{
- _messageHandle.removeMessage(storeContext);
+ _messageHandle.removeMessage(StoreContext.getCurrentContext());
}
}
catch (AMQException e)
{
- // to maintain consistency, we revert the count
- incrementReference();
+
throw new MessageCleanupException(getMessageId(), e);
}
}
@@ -373,7 +375,18 @@ public class AMQMessage implements Filterable<AMQException>
return (_flags & DELIVERED_TO_CONSUMER) != 0;
}
- public boolean isPersistent() throws AMQException
+ public String getRoutingKey()
+ {
+ // TODO
+ return null;
+ }
+
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
+ public boolean isPersistent()
{
return _messageHandle.isPersistent();
}
@@ -455,6 +468,26 @@ public class AMQMessage implements Filterable<AMQException>
}
+ public boolean isImmediate()
+ {
+ return (_flags & IMMEDIATE) == IMMEDIATE;
+ }
+
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
+ public MessageReference newReference()
+ {
+ return new AMQMessageReference(this);
+ }
+
+ public Long getMessageNumber()
+ {
+ return getMessageId();
+ }
+
public Object getPublisherClientInstance()
{
//todo store sessionIdentifier/client id with message in store
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 014b348822..8b69aac7c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -88,7 +86,7 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>
int delete() throws AMQException;
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+ QueueEntry enqueue(ServerMessage message) throws AMQException;
void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 785b668687..8bb958ed3f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -36,6 +36,7 @@ import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -246,7 +247,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg) throws AMQException
+ public void checkForNotification(ServerMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -333,48 +334,60 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
- AMQMessage msg = entry.getMessage();
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
- List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
+ ServerMessage serverMsg = entry.getMessage();
+
+ if(serverMsg instanceof AMQMessage)
{
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
+ AMQMessage msg = (AMQMessage) serverMsg;
+ // get message content
+ Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+ List<Byte> msgContent = new ArrayList<Byte>();
+ while (cBodies.hasNext())
{
+ ContentChunk body = cBodies.next();
if (body.getSize() != 0)
{
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
+ if (body.getSize() != 0)
{
- msgContent.add(slice.get());
+ ByteBuffer slice = body.getData().slice();
+ for (int j = 0; j < slice.limit(); j++)
+ {
+ msgContent.add(slice.get());
+ }
}
}
}
- }
- try
- {
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
+
+ try
{
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
- }
+ // Create header attributes list
+ CommonContentHeaderProperties headerProperties =
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ String mimeType = null, encoding = null;
+ if (headerProperties != null)
+ {
+ AMQShortString mimeTypeShortSting = headerProperties.getContentType();
+ mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
+ encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+ return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+ }
+ catch (AMQException e)
+ {
+ JMException jme = new JMException("Error creating header attributes list: " + e);
+ jme.initCause(e);
+ throw jme;
+ }
- return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
}
- catch (AMQException e)
+ else
{
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
+ // TODO 0-10 Messages for MBean
+ return null;
}
}
@@ -398,13 +411,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
{
long position = i;
- AMQMessage msg = list.get(i - 1).getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
- _messageList.put(messageData);
+ ServerMessage serverMsg = list.get(i - 1).getMessage();
+ if(serverMsg instanceof AMQMessage)
+ {
+ AMQMessage msg = (AMQMessage) serverMsg;
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position };
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+ }
+ else
+ {
+ // TODO 0-10 Message
+ }
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index cbe9246f09..0f33f5d8ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -68,4 +68,9 @@ public class DefaultQueueRegistry implements QueueRegistry
{
return _queueMap.values();
}
+
+ public AMQQueue getQueue(String queue)
+ {
+ return getQueue(new AMQShortString(queue));
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
index d38932bb61..eaa3992e98 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
@@ -22,12 +22,13 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessageHeader;
-public interface Filterable<E extends Exception>
+public interface Filterable
{
- ContentHeaderBody getContentHeaderBody() throws E;
+ AMQMessageHeader getMessageHeader();
- boolean isPersistent() throws E;
+ boolean isPersistent();
boolean isRedelivered();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index d5e0b4d187..5babc3e3d4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -28,16 +28,20 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ContentHeaderBodyAdapter;
+import org.apache.qpid.server.message.AMQMessageReference;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements Filterable, InboundMessage
{
/** Used for debugging purposes. */
@@ -73,6 +77,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
private long _expiration;
private Exchange _exchange;
+ private AMQMessageHeader _messageHeader;
public IncomingMessage(final Long messageId,
@@ -90,6 +95,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _messageHeader = new ContentHeaderBodyAdapter(contentHeaderBody);
}
public void setExpiration()
@@ -158,17 +164,19 @@ public class IncomingMessage implements Filterable<RuntimeException>
}
AMQMessage message = null;
+ AMQMessageReference ref = null;
try
{
// first we allow the handle to know that the message has been fully received. This is useful if it is
// maintaining any calculated values based on content chunks
_messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
- _messagePublishInfo, getContentHeaderBody());
+ _messagePublishInfo, getContentHeader());
message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
+ ref = (AMQMessageReference) message.newReference();
message.setExpiration(_expiration);
message.setClientIdentifier(_publisher.getSessionIdentifier());
@@ -177,8 +185,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
+ AMQShortString userID = getContentHeader().properties instanceof BasicContentHeaderProperties ?
+ ((BasicContentHeaderProperties) getContentHeader().properties).getUserId() : null;
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
@@ -202,7 +210,7 @@ public class IncomingMessage implements Filterable<RuntimeException>
{
int offset;
final int queueCount = _destinationQueues.size();
- message.incrementReference(queueCount);
+
if(queueCount == 1)
{
offset = 0;
@@ -233,7 +241,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
finally
{
// Remove refence for routing process . Reference count should now == delivered queue count
- if(message != null) message.decrementReference(_txnContext.getStoreContext());
+
+ if(ref != null) ref.release();
}
}
@@ -250,40 +259,51 @@ public class IncomingMessage implements Filterable<RuntimeException>
public boolean allContentReceived()
{
- return (_bodyLengthReceived == getContentHeaderBody().bodySize);
+ return (_bodyLengthReceived == getContentHeader().bodySize);
}
- public AMQShortString getExchange() throws AMQException
+ public AMQShortString getExchange()
{
return _messagePublishInfo.getExchange();
}
- public AMQShortString getRoutingKey() throws AMQException
+ public String getRoutingKey()
{
- return _messagePublishInfo.getRoutingKey();
+ return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
}
- public boolean isMandatory() throws AMQException
+ public String getBinding()
+ {
+ return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
+ }
+
+
+ public boolean isMandatory()
{
return _messagePublishInfo.isMandatory();
}
- public boolean isImmediate() throws AMQException
+ public boolean isImmediate()
{
return _messagePublishInfo.isImmediate();
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeader()
{
return _contentHeaderBody;
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _messageHeader;
+ }
+
public boolean isPersistent()
{
- return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() ==
+ return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
@@ -292,6 +312,11 @@ public class IncomingMessage implements Filterable<RuntimeException>
return false;
}
+ public long getSize()
+ {
+ return getContentHeader().bodySize;
+ }
+
public void setMessageStore(final MessageStore messageStore)
{
_messageStore = messageStore;
@@ -309,7 +334,8 @@ public class IncomingMessage implements Filterable<RuntimeException>
public void route() throws AMQException
{
- _exchange.route(this);
+ enqueue(_exchange.route(this));
+
}
public void enqueue(final ArrayList<AMQQueue> queues)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
index d6fd1eec89..97601048ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.ServerMessage;
/**
* NoConsumersException is a {@link RequiredDeliveryException} that represents the failure case where an immediate
@@ -35,7 +36,7 @@ import org.apache.qpid.server.RequiredDeliveryException;
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(AMQMessage message)
+ public NoConsumersException(ServerMessage message)
{
super("Immediate delivery is not possible.", message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index 6f9efd3200..d1fb0f3fe6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -21,13 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,26 +42,19 @@ public enum NotificationCheck
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
+ messageSize = (msg == null) ? 0 : msg.getSize();
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
return true;
}
}
@@ -70,7 +64,7 @@ public enum NotificationCheck
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -91,7 +85,7 @@ public enum NotificationCheck
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -133,6 +127,6 @@ public enum NotificationCheck
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
index fd46a8a5ff..0c6b84d2b6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public class PriorityQueueList implements QueueEntryList
{
@@ -52,26 +53,18 @@ public class PriorityQueueList implements QueueEntryList
return _queue;
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(ServerMessage message)
{
- try
+ int index = message.getMessageHeader().getPriority() - _priorityOffset;
+ if(index >= _priorities)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
- {
- index = _priorities-1;
- }
- else if(index < 0)
- {
- index = 0;
- }
- return _priorityLists[index].add(message);
+ index = _priorities-1;
}
- catch (AMQException e)
+ else if(index < 0)
{
- // TODO - fix AMQ Exception
- throw new RuntimeException(e);
+ index = 0;
}
+ return _priorityLists[index].add(message);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 2657c459a9..1deb465127 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -3,6 +3,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -133,7 +134,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
AMQQueue getQueue();
- AMQMessage getMessage();
+ ServerMessage getMessage();
long getSize();
@@ -155,8 +156,6 @@ public interface QueueEntry extends Comparable<QueueEntry>
void release();
- String debugIdentity();
-
boolean immediateAndNotDelivered();
void setRedelivered(boolean b);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index dbad5438dc..4cb07c3006 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.log4j.Logger;
import java.util.Set;
@@ -42,7 +44,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
+ private MessageReference _message;
private Set<Subscription> _rejectedBy = null;
@@ -75,6 +77,8 @@ public class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
volatile QueueEntryImpl _next;
+ private boolean _deliveredToConsumer;
+ private boolean _redelivered;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
@@ -84,18 +88,18 @@ public class QueueEntryImpl implements QueueEntry
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
}
protected void setEntryId(long entryId)
@@ -113,9 +117,9 @@ public class QueueEntryImpl implements QueueEntry
return _queueEntryList.getQueue();
}
- public AMQMessage getMessage()
+ public ServerMessage getMessage()
{
- return _message;
+ return _message == null ? null : _message.getMessage();
}
public long getSize()
@@ -125,12 +129,21 @@ public class QueueEntryImpl implements QueueEntry
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return _deliveredToConsumer;
}
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ long expiration = getMessage().getExpiration();
+ if (expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > expiration);
+ }
+
+ return false;
+
}
public boolean isAcquired()
@@ -167,7 +180,7 @@ public class QueueEntryImpl implements QueueEntry
public void setDeliveredToSubscription()
{
- getMessage().setDeliveredToConsumer();
+ _deliveredToConsumer = true;
}
public void release()
@@ -175,20 +188,15 @@ public class QueueEntryImpl implements QueueEntry
_stateUpdater.set(this,AVAILABLE_STATE);
}
- public String debugIdentity()
- {
- return getMessage().debugIdentity();
- }
-
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().isImmediate() && !_deliveredToConsumer;
}
public void setRedelivered(boolean b)
{
- getMessage().setRedelivered(b);
+ _redelivered = b;
}
public Subscription getDeliveredSubscription()
@@ -223,7 +231,7 @@ public class QueueEntryImpl implements QueueEntry
}
else
{
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ _log.warn("Requesting rejection by null subscriber:" + this);
}
}
@@ -284,7 +292,9 @@ public class QueueEntryImpl implements QueueEntry
{
if(delete())
{
- getMessage().decrementReference(storeContext);
+ StoreContext sc = StoreContext.setCurrentContext(storeContext);
+ _message.release();
+ StoreContext.setCurrentContext(sc);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 313e076f61..b4042ce02c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.ServerMessage;
+
public interface QueueEntryList
{
AMQQueue getQueue();
- QueueEntry add(AMQMessage message);
+ QueueEntry add(ServerMessage message);
QueueEntry next(QueueEntry node);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index 1210f0e97c..b47182e750 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -40,4 +40,5 @@ public interface QueueRegistry
Collection<AMQQueue> getQueues();
+ AMQQueue getQueue(String queue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 08daf715ef..a96a6d624a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -319,7 +321,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// ------ Enqueue / Dequeue
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ public QueueEntry enqueue(ServerMessage message) throws AMQException
{
incrementQueueCount();
@@ -406,8 +408,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+
if (entry.immediateAndNotDelivered())
{
+ StoreContext storeContext = StoreContext.getCurrentContext();
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
@@ -462,7 +466,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Simple Queues don't :-)
}
- private void incrementQueueSize(final AMQMessage message)
+ private void incrementQueueSize(final ServerMessage message)
{
getAtomicQueueSize().addAndGet(message.getSize());
}
@@ -573,10 +577,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- AMQMessage msg = entry.getMessage();
+ ServerMessage msg = entry.getMessage();
if (msg.isPersistent())
{
- _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
+ _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageNumber());
}
//entry.dispose(storeContext);
@@ -767,7 +771,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessage().getMessageNumber();
return messageId >= fromMessageId && messageId <= toMessageId;
}
@@ -786,7 +790,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- _complete = entry.getMessage().getMessageId() == messageId;
+ _complete = entry.getMessage().getMessageNumber() == messageId;
return _complete;
}
@@ -828,7 +832,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessage().getMessageNumber();
return (messageId >= fromMessageId)
&& (messageId <= toMessageId)
&& entry.acquire();
@@ -847,11 +851,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in on the message store.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
+ ServerMessage message = entry.getMessage();
if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
}
// dequeue does not decrement the refence count
entry.dequeue(storeContext);
@@ -882,9 +886,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
+ StoreContext.setCurrentContext(storeContext);
+
for (QueueEntry entry : entries)
{
- toQueue.enqueue(storeContext, entry.getMessage());
+ toQueue.enqueue(entry.getMessage());
entry.delete();
}
}
@@ -896,6 +902,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
throw new RuntimeException(e);
}
+ finally
+ {
+ StoreContext.clearCurrentContext();
+
+ }
}
@@ -912,17 +923,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId))
- {
- if (!entry.isDeleted())
- {
- return entry.getMessage().incrementReference();
- }
- }
-
- return false;
+ final long messageId = entry.getMessage().getMessageNumber();
+ return ((messageId >= fromMessageId)
+ && (messageId <= toMessageId));
}
public boolean filterComplete()
@@ -938,11 +941,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in on the message store.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
+ ServerMessage message = entry.getMessage();
- if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ if (message.isPersistent() && toQueue.isDurable())
{
- store.enqueueMessage(storeContext, toQueue, message.getMessageId());
+
+ StoreContext sc = StoreContext.setCurrentContext(storeContext);
+ store.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
+ StoreContext.setCurrentContext(sc);
+
}
}
@@ -973,9 +980,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
for (QueueEntry entry : entries)
{
- if (entry.getMessage().isReferenced())
+
+ ServerMessage message = entry.getMessage();
+ if (message != null)
{
- toQueue.enqueue(storeContext, entry.getMessage());
+ toQueue.enqueue(entry.getMessage());
}
}
}
@@ -1001,7 +1010,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
QueueEntry node = queueListIterator.getNode();
- final long messageId = node.getMessage().getMessageId();
+ final long messageId = node.getMessage().getMessageNumber();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
@@ -1418,7 +1427,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
@@ -1581,7 +1590,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
for (int i = 0; i < num && !it.atTail(); i++)
{
it.advance();
- ids.add(it.getNode().getMessage().getMessageId());
+ ids.add(it.getNode().getMessage().getMessageNumber());
}
return ids;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index a46c5ae2e8..c5a2972720 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,5 +1,8 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/*
@@ -74,7 +77,7 @@ public class SimpleQueueEntryList implements QueueEntryList
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(ServerMessage message)
{
QueueEntryImpl node = new QueueEntryImpl(this, message);
for (;;)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
index 18269fa5e8..34e35171e5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
@@ -1352,7 +1352,10 @@ public class DerbyMessageStore implements MessageStore
public void process() throws AMQException
{
- _queue.enqueue(_context, _message);
+ StoreContext.setCurrentContext(_context);
+ _queue.enqueue(_message);
+ StoreContext.clearCurrentContext();
+
}
@@ -1414,7 +1417,7 @@ public class DerbyMessageStore implements MessageStore
if(message != null)
{
- message.incrementReference();
+// message.incrementReference();
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index fdb56a1a55..1e75ca04f9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -32,9 +32,12 @@ public class StoreContext
{
private static final Logger _logger = Logger.getLogger(StoreContext.class);
+ private static final ThreadLocal<StoreContext> _threadLocalContext = new ThreadLocal<StoreContext>();
+
private String _name;
private Object _payload;
+
public StoreContext()
{
_name = "StoreContext";
@@ -68,4 +71,24 @@ public class StoreContext
{
return "<_name = " + _name + ", _payload = " + _payload + ">";
}
+
+
+ public static StoreContext setCurrentContext(StoreContext context)
+ {
+ StoreContext sc = getCurrentContext();
+ _threadLocalContext.set(context);
+ return sc;
+ }
+
+ public static StoreContext getCurrentContext()
+ {
+ return _threadLocalContext.get();
+ }
+
+ public static void clearCurrentContext()
+ {
+ _threadLocalContext.set(null);
+ }
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 7aa9d1e3af..dc69e21731 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -32,8 +32,10 @@ import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.filter.FilterManager;
@@ -377,16 +379,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
+ _logger.debug("Subscription:" + this + " rejected message:" + entry);
}
// return false;
}
if (_noLocal)
{
- //todo - client id should be recoreded so we don't have to handle
+
+ AMQMessage message = (AMQMessage) entry.getMessage();
+
+ //todo - client id should be recorded so we don't have to handle
// the case where this is null.
- final Object publisherId = entry.getMessage().getPublisherClientInstance();
+ final Object publisherId = message.getPublisherClientInstance();
// We don't want local messages so check to see if message is one we sent
Object localInstance;
@@ -404,8 +409,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
localInstance = getProtocolSession().getClientIdentifier();
- //todo - client id should be recoreded so we don't have to do the null check
- if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
+ //todo - client id should be recorded so we don't have to do the null check
+ if (localInstance != null && localInstance.equals(message.getPublisherIdentifier()))
{
return false;
}
@@ -417,7 +422,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
if (_logger.isDebugEnabled())
{
- _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+ _logger.debug("(" + this + ") checking filters for message (" + entry);
}
return checkFilters(entry);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
new file mode 100644
index 0000000000..3a61099386
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public class ServerConnection extends Connection
+{
+ @Override
+ protected void invoke(Method method)
+ {
+ super.invoke(method);
+ }
+
+ @Override
+ protected void setState(State state)
+ {
+ super.setState(state);
+ }
+
+ @Override
+ public ServerConnectionDelegate getConnectionDelegate()
+ {
+ return (ServerConnectionDelegate) super.getConnectionDelegate();
+ }
+
+ public void setConnectionDelegate(ServerConnectionDelegate delegate)
+ {
+ super.setConnectionDelegate(delegate);
+ }
+
+ private VirtualHost _virtualHost;
+
+
+ public VirtualHost getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public void setVirtualHost(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
new file mode 100644
index 0000000000..0f65fc10ce
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslException;
+import java.util.*;
+
+
+public class ServerConnectionDelegate extends ServerDelegate
+{
+
+ private String _localFQDN;
+ private final IApplicationRegistry _appRegistry;
+
+
+ public ServerConnectionDelegate(IApplicationRegistry appRegistry,
+ String localFQDN)
+ {
+ this(Collections.EMPTY_MAP, Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ }
+
+
+ public ServerConnectionDelegate(Map<String, Object> properties,
+ List<Object> locales,
+ IApplicationRegistry appRegistry,
+ String localFQDN)
+ {
+ super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
+ _appRegistry = appRegistry;
+ _localFQDN = localFQDN;
+ }
+
+ private static List<Object> parseToList(String mechanisms)
+ {
+ List<Object> list = new ArrayList<Object>();
+ StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
+ while(tokenizer.hasMoreTokens())
+ {
+ list.add(tokenizer.nextToken());
+ }
+ return list;
+ }
+
+ @Override public ServerSession getSession(Connection conn, SessionAttach atc)
+ {
+
+ SessionDelegate serverSessionDelegate = new ServerSessionDelegate(_appRegistry);
+
+ ServerSession ssn = new ServerSession(conn, serverSessionDelegate, new Binary(atc.getName()), 0);
+ //ssn.setSessionListener(new Echo());
+ return ssn;
+ }
+
+
+
+
+ @Override
+ protected SaslServer createSaslServer(String mechanism) throws SaslException
+ {
+ return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+
+ }
+
+
+ @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ {
+ ServerConnection sconn = (ServerConnection) conn;
+
+ VirtualHost vhost;
+ String vhostName;
+ if(open.hasVirtualHost())
+ {
+ vhostName = open.getVirtualHost();
+ }
+ else
+ {
+ vhostName = "";
+ }
+ vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+
+ if(vhost != null)
+ {
+ sconn.setVirtualHost(vhost);
+
+ sconn.invoke(new ConnectionOpenOk(Collections.EMPTY_LIST));
+
+ sconn.setState(Connection.State.OPEN);
+ }
+ else
+ {
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown vistrulhost '"+vhostName+"'"));
+ sconn.setState(Connection.State.CLOSING);
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
new file mode 100644
index 0000000000..0271949101
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+public class ServerSession extends Session
+{
+ ServerSession(Connection connection, Binary name, long expiry)
+ {
+ super(connection, name, expiry);
+ }
+
+ ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
+ {
+ super(connection, delegate, name, expiry);
+ }
+
+ public void enqueue(ServerMessage message, ArrayList<AMQQueue> queues)
+ {
+ // TODO Txn
+
+ try
+ {
+ for(AMQQueue q : queues)
+ {
+ q.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
new file mode 100644
index 0000000000..e9cf1b6474
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -0,0 +1,402 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.*;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.AMQException;
+
+import java.util.ArrayList;
+
+public class ServerSessionDelegate extends SessionDelegate
+{
+ private final IApplicationRegistry _appRegistry;
+
+ public ServerSessionDelegate(IApplicationRegistry appRegistry)
+ {
+ _appRegistry = appRegistry;
+ }
+
+ @Override
+ public void messageAccept(Session session, MessageAccept method)
+ {
+ super.messageAccept(session, method);
+ }
+
+ @Override
+ public void messageReject(Session session, MessageReject method)
+ {
+ super.messageReject(session, method);
+ }
+
+ @Override
+ public void messageRelease(Session session, MessageRelease method)
+ {
+ super.messageRelease(session, method);
+ }
+
+ @Override
+ public void messageAcquire(Session session, MessageAcquire method)
+ {
+ super.messageAcquire(session, method);
+ }
+
+ @Override
+ public void messageResume(Session session, MessageResume method)
+ {
+ super.messageResume(session, method);
+ }
+
+ @Override
+ public void messageSubscribe(Session session, MessageSubscribe method)
+ {
+ super.messageSubscribe(session, method);
+ }
+
+
+ @Override
+ public void messageTransfer(Session ssn, MessageTransfer xfr)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(ssn);
+ Exchange exchange;
+ if(xfr.hasDestination())
+ {
+ exchange = exchangeRegistry.getExchange(xfr.getDestination());
+ }
+ else
+ {
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
+
+ MessageTransferMessage message = new MessageTransferMessage(xfr);
+ try
+ {
+ ArrayList<AMQQueue> queues = exchange.route(message);
+
+ ((ServerSession) ssn).enqueue(message, queues);
+
+
+ System.out.println(queues);
+
+ ssn.processed(xfr);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+
+ super.messageTransfer(ssn, xfr); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void messageCancel(Session session, MessageCancel method)
+ {
+ super.messageCancel(session, method);
+ }
+
+ @Override
+ public void messageFlush(Session session, MessageFlush method)
+ {
+ super.messageFlush(session, method);
+ }
+
+ @Override
+ public void txSelect(Session session, TxSelect method)
+ {
+ super.txSelect(session, method);
+ }
+
+ @Override
+ public void txCommit(Session session, TxCommit method)
+ {
+ super.txCommit(session, method);
+ }
+
+ @Override
+ public void txRollback(Session session, TxRollback method)
+ {
+ super.txRollback(session, method);
+ }
+
+ @Override
+ public void dtxSelect(Session session, DtxSelect method)
+ {
+ super.dtxSelect(session, method);
+ }
+
+ @Override
+ public void dtxStart(Session session, DtxStart method)
+ {
+ super.dtxStart(session, method);
+ }
+
+ @Override
+ public void dtxEnd(Session session, DtxEnd method)
+ {
+ super.dtxEnd(session, method);
+ }
+
+ @Override
+ public void dtxCommit(Session session, DtxCommit method)
+ {
+ super.dtxCommit(session, method);
+ }
+
+ @Override
+ public void dtxForget(Session session, DtxForget method)
+ {
+ super.dtxForget(session, method);
+ }
+
+ @Override
+ public void dtxGetTimeout(Session session, DtxGetTimeout method)
+ {
+ super.dtxGetTimeout(session, method);
+ }
+
+ @Override
+ public void dtxPrepare(Session session, DtxPrepare method)
+ {
+ super.dtxPrepare(session, method);
+ }
+
+ @Override
+ public void dtxRecover(Session session, DtxRecover method)
+ {
+ super.dtxRecover(session, method);
+ }
+
+ @Override
+ public void dtxRollback(Session session, DtxRollback method)
+ {
+ super.dtxRollback(session, method);
+ }
+
+ @Override
+ public void dtxSetTimeout(Session session, DtxSetTimeout method)
+ {
+ super.dtxSetTimeout(session, method);
+ }
+
+ @Override
+ public void exchangeDeclare(Session session, ExchangeDeclare method)
+ {
+ String exchangeName = method.getExchange();
+
+ Exchange exchange = getExchange(session, exchangeName);
+
+ if(method.getPassive())
+ {
+ if(exchange == null)
+ {
+ ExecutionException ex = new ExecutionException();
+ ex.setErrorCode(ExecutionErrorCode.NOT_FOUND);
+ ex.setCommandId(method.getId());
+
+ ex.setDescription("not-found: exchange-name '"+exchangeName+"'");
+
+ session.invoke(ex);
+ session.close();
+ }
+
+ }
+ else
+ {
+ // TODO
+ }
+ super.exchangeDeclare(session, method);
+ }
+
+ private Exchange getExchange(Session session, String exchangeName)
+ {
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+ return exchangeRegistry.getExchange(exchangeName);
+ }
+
+ private ExchangeRegistry getExchangeRegistry(Session session)
+ {
+ VirtualHost virtualHost = getVirtualHost(session);
+ return virtualHost.getExchangeRegistry();
+
+ }
+
+ private VirtualHost getVirtualHost(Session session)
+ {
+ ServerConnection conn = getServerConnection(session);
+ VirtualHost vhost = conn.getVirtualHost();
+ return vhost;
+ }
+
+ private ServerConnection getServerConnection(Session session)
+ {
+ ServerConnection conn = (ServerConnection) session.getConnection();
+ return conn;
+ }
+
+ @Override
+ public void exchangeDelete(Session session, ExchangeDelete method)
+ {
+ super.exchangeDelete(session, method);
+ }
+
+ @Override
+ public void exchangeQuery(Session session, ExchangeQuery method)
+ {
+ super.exchangeQuery(session, method);
+
+ }
+
+ @Override
+ public void exchangeBind(Session session, ExchangeBind method)
+ {
+ super.exchangeBind(session, method);
+ }
+
+ @Override
+ public void exchangeUnbind(Session session, ExchangeUnbind method)
+ {
+ super.exchangeUnbind(session, method);
+ }
+
+ @Override
+ public void exchangeBound(Session session, ExchangeBound method)
+ {
+
+
+ ExchangeBoundResult result = new ExchangeBoundResult();
+ if(method.hasExchange())
+ {
+ Exchange exchange = getExchange(session, method.getExchange());
+
+ if(exchange == null)
+ {
+ result.setExchangeNotFound(true);
+ }
+
+ if(method.hasQueue())
+ {
+
+ AMQQueue queue = getQueue(session, method.getQueue());
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+
+ if(exchange != null && queue != null)
+ {
+
+ if(method.hasBindingKey())
+ {
+
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
+
+ }
+
+ result.setQueueNotMatched(!exchange.isBound(queue));
+
+ }
+ }
+ else if(exchange != null && method.hasBindingKey())
+ {
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+ result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
+ }
+
+ }
+ else if(method.hasQueue())
+ {
+ AMQQueue queue = getQueue(session, method.getQueue());
+ if(queue == null)
+ {
+ result.setQueueNotFound(true);
+ }
+ else
+ {
+ if(method.hasBindingKey())
+ {
+ if(method.hasArguments())
+ {
+ // TODO
+ }
+
+ // TODO
+ }
+ }
+
+ }
+
+
+ session.executionResult((int) method.getId(), result);
+ super.exchangeBound(session, method);
+ }
+
+ private AMQQueue getQueue(Session session, String queue)
+ {
+ QueueRegistry queueRegistry = getQueueRegistry(session);
+ return queueRegistry.getQueue(queue);
+ }
+
+ private QueueRegistry getQueueRegistry(Session session)
+ {
+ return getVirtualHost(session).getQueueRegistry();
+ }
+
+ @Override
+ public void queueDeclare(Session session, QueueDeclare method)
+ {
+ super.queueDeclare(session, method);
+ }
+
+ @Override
+ public void queueDelete(Session session, QueueDelete method)
+ {
+ super.queueDelete(session, method);
+ }
+
+ @Override
+ public void queuePurge(Session session, QueuePurge method)
+ {
+ super.queuePurge(session, method);
+ }
+
+ @Override
+ public void queueQuery(Session session, QueueQuery method)
+ {
+ super.queueQuery(session, method);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 3c71282c57..ebec6af9f0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -25,6 +25,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -93,10 +94,11 @@ public class LocalTransactionalContext implements TransactionalContext
public void process() throws AMQException
{
- _message.incrementReference();
+ MessageReference ref = _message.newReference();
try
{
- QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+ StoreContext.setCurrentContext(getStoreContext());
+ QueueEntry entry = _queue.enqueue(_message);
if(entry.immediateAndNotDelivered())
{
@@ -105,7 +107,8 @@ public class LocalTransactionalContext implements TransactionalContext
}
finally
{
- _message.decrementReference(getStoreContext());
+ ref.release();
+ StoreContext.clearCurrentContext();
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 28af36e3db..76cb7a397d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -90,8 +90,13 @@ public class NonTransactionalContext implements TransactionalContext
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
- QueueEntry entry = queue.enqueue(_storeContext, message);
-
+ StoreContext.setCurrentContext(getStoreContext());
+
+ QueueEntry entry = queue.enqueue(message);
+
+ StoreContext.clearCurrentContext();
+
+
//following check implements the functionality
//required by the 'immediate' flag:
if(entry.immediateAndNotDelivered())
@@ -128,7 +133,7 @@ public class NonTransactionalContext implements TransactionalContext
{
if (debug)
{
- _log.debug("Discarding message: " + message.getMessage().getMessageId());
+ _log.debug("Discarding message: " + message.getMessage().getMessageNumber());
}
if(message.getMessage().isPersistent())
{
@@ -171,7 +176,7 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
+ _log.debug("Discarding message: " + msg.getMessage().getMessageNumber());
}
if(msg.getMessage().isPersistent())
{
@@ -187,7 +192,7 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
- msg.getMessage().getMessageId());
+ msg.getMessage().getMessageNumber());
}
}
if(_inTran)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
index 731f6140f9..9e83b93129 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -100,7 +101,7 @@ public class Dump extends Show
for (QueueEntry entry : messages)
{
- AMQMessage msg = entry.getMessage();
+ ServerMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
@@ -112,7 +113,7 @@ public class Dump extends Show
// Show general message information
hex.add(Show.Columns.ID.name());
- ascii.add(msg.getMessageId().toString());
+ ascii.add(msg.getMessageNumber().toString());
hex.add(Console.ROW_DIVIDER);
ascii.add(Console.ROW_DIVIDER);
@@ -136,110 +137,114 @@ public class Dump extends Show
hex.add(Console.ROW_DIVIDER);
ascii.add(Console.ROW_DIVIDER);
- Iterator bodies = msg.getContentBodyIterator();
- if (bodies.hasNext())
+ if(msg instanceof AMQMessage)
{
- hex.add("Hex");
- hex.add(Console.ROW_DIVIDER);
+ Iterator bodies = ((AMQMessage)msg).getContentBodyIterator();
+ if (bodies.hasNext())
+ {
+ hex.add("Hex");
+ hex.add(Console.ROW_DIVIDER);
- ascii.add("ASCII");
- ascii.add(Console.ROW_DIVIDER);
- while (bodies.hasNext())
- {
- ContentChunk chunk = (ContentChunk) bodies.next();
+ ascii.add("ASCII");
+ ascii.add(Console.ROW_DIVIDER);
- //Duplicate so we don't destroy original data :)
- ByteBuffer hexBuffer = chunk.getData().duplicate();
+ while (bodies.hasNext())
+ {
+ ContentChunk chunk = (ContentChunk) bodies.next();
- ByteBuffer charBuffer = hexBuffer.duplicate();
+ //Duplicate so we don't destroy original data :)
+ ByteBuffer hexBuffer = chunk.getData().duplicate();
- Hex hexencoder = new Hex();
+ ByteBuffer charBuffer = hexBuffer.duplicate();
- while (hexBuffer.hasRemaining())
- {
- byte[] line = new byte[LINE_SIZE];
+ Hex hexencoder = new Hex();
- int bufsize = hexBuffer.remaining();
- if (bufsize < LINE_SIZE)
- {
- hexBuffer.get(line, 0, bufsize);
- }
- else
+ while (hexBuffer.hasRemaining())
{
- bufsize = line.length;
- hexBuffer.get(line);
- }
+ byte[] line = new byte[LINE_SIZE];
- byte[] encoded = hexencoder.encode(line);
+ int bufsize = hexBuffer.remaining();
+ if (bufsize < LINE_SIZE)
+ {
+ hexBuffer.get(line, 0, bufsize);
+ }
+ else
+ {
+ bufsize = line.length;
+ hexBuffer.get(line);
+ }
- try
- {
- String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
- String hexLine = "";
+ byte[] encoded = hexencoder.encode(line);
- int strKength = encStr.length();
- for (int c = 0; c < strKength; c++)
+ try
{
- hexLine += encStr.charAt(c);
+ String encStr = new String(encoded, 0, bufsize * 2, DEFAULT_ENCODING);
+ String hexLine = "";
- if (c % 2 == 1 && SPACE_BYTES)
+ int strKength = encStr.length();
+ for (int c = 0; c < strKength; c++)
{
- hexLine += BYTE_SPACER;
+ hexLine += encStr.charAt(c);
+
+ if (c % 2 == 1 && SPACE_BYTES)
+ {
+ hexLine += BYTE_SPACER;
+ }
}
- }
- hex.add(hexLine);
- }
- catch (UnsupportedEncodingException e)
- {
- _console.println(e.getMessage());
- return null;
+ hex.add(hexLine);
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ _console.println(e.getMessage());
+ return null;
+ }
}
- }
- while (charBuffer.hasRemaining())
- {
- String asciiLine = "";
-
- for (int pos = 0; pos < LINE_SIZE; pos++)
+ while (charBuffer.hasRemaining())
{
- if (charBuffer.hasRemaining())
- {
- byte ch = charBuffer.get();
+ String asciiLine = "";
- if (isPrintable(ch))
+ for (int pos = 0; pos < LINE_SIZE; pos++)
+ {
+ if (charBuffer.hasRemaining())
{
- asciiLine += (char) ch;
+ byte ch = charBuffer.get();
+
+ if (isPrintable(ch))
+ {
+ asciiLine += (char) ch;
+ }
+ else
+ {
+ asciiLine += NON_PRINTING_ASCII_CHAR;
+ }
+
+ if (SPACE_BYTES)
+ {
+ asciiLine += BYTE_SPACER;
+ }
}
else
{
- asciiLine += NON_PRINTING_ASCII_CHAR;
- }
-
- if (SPACE_BYTES)
- {
- asciiLine += BYTE_SPACER;
+ break;
}
}
- else
- {
- break;
- }
- }
- ascii.add(asciiLine);
+ ascii.add(asciiLine);
+ }
}
}
- }
- else
- {
- List<String> result = new LinkedList<String>();
+ else
+ {
+ List<String> result = new LinkedList<String>();
- display.add(result);
- result.add("No ContentBodies");
+ display.add(result);
+ result.add("No ContentBodies");
+ }
}
}
@@ -252,7 +257,7 @@ public class Dump extends Show
return display;
}
- private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
+ private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg,
String title, boolean routing, boolean headers, boolean messageHeaders)
{
List<QueueEntry> single = new LinkedList<QueueEntry>();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
index a8dd58ca83..b3ff1d43e1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
@@ -172,7 +172,7 @@ public class Move extends AbstractCommand
{
for (QueueEntry msg : messages)
{
- ids.add(msg.getMessage().getMessageId());
+ ids.add(msg.getMessage().getMessageNumber());
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 2fa017fc64..d846d9976b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -26,9 +26,9 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.tools.messagestore.MessageStoreTool;
import org.apache.qpid.tools.utils.Console;
@@ -337,30 +337,24 @@ public class Show extends AbstractCommand
//Add create the table of data
for (QueueEntry entry : messages)
{
- AMQMessage msg = entry.getMessage();
+ ServerMessage msg = entry.getMessage();
if (!includeMsg(msg, msgids))
{
continue;
}
- id.add(msg.getMessageId().toString());
+ id.add(msg.getMessageNumber().toString());
size.add("" + msg.getSize());
arrival.add("" + msg.getArrivalTime());
- try
- {
- ispersitent.add(msg.isPersistent() ? "true" : "false");
- }
- catch (AMQException e)
- {
- ispersitent.add("n/a");
- }
+ ispersitent.add(msg.isPersistent() ? "true" : "false");
+
isredelivered.add(msg.isRedelivered() ? "true" : "false");
- isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
+ isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
// msg.getMessageHandle();
@@ -368,7 +362,10 @@ public class Show extends AbstractCommand
try
{
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+ if(msg instanceof AMQMessage)
+ {
+ headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
+ }
}
catch (AMQException e)
{
@@ -417,7 +414,11 @@ public class Show extends AbstractCommand
MessagePublishInfo info = null;
try
{
- info = msg.getMessagePublishInfo();
+ if(msg instanceof AMQMessage)
+ {
+ info = ((AMQMessage)msg).getMessagePublishInfo();
+ }
+
}
catch (AMQException e)
{
@@ -457,14 +458,14 @@ public class Show extends AbstractCommand
return data;
}
- protected boolean includeMsg(AMQMessage msg, List<Long> msgids)
+ protected boolean includeMsg(ServerMessage msg, List<Long> msgids)
{
if (msgids == null)
{
return true;
}
- Long msgid = msg.getMessageId();
+ Long msgid = msg.getMessageNumber();
boolean found = false;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 5fbf9484f7..ad39094d2c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -89,7 +89,7 @@ public class ExtractResendAndRequeueTest extends TestCase
while(queueEntries.advance())
{
QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry);
+ _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
// Store the entry for future inspection
_referenceList.add(entry);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index dfbbd56d6f..0b48feddcd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -163,7 +162,9 @@ public class TxAckTest extends TestCase
};
TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
- _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
+ StoreContext sc = StoreContext.setCurrentContext(new StoreContext());
+ _map.add(deliveryTag, _queue.enqueue(message));
+ StoreContext.setCurrentContext(sc);
}
_acked = acked;
_unacked = unacked;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 6dcb187a37..6634eb3e60 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -33,6 +33,8 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.log4j.Logger;
@@ -255,9 +257,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
* @throws AMQException
*/
@Override
- public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+ public QueueEntry enqueue(ServerMessage msg) throws AMQException
{
- messages.add( new HeadersExchangeTest.Message(msg));
+ messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
return new QueueEntry()
{
@@ -326,11 +328,6 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public String debugIdentity()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public boolean immediateAndNotDelivered()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -438,7 +435,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeader()
{
try
{
@@ -522,7 +519,7 @@ public class AbstractHeadersExchangeTestBase extends TestCase
void route(Exchange exchange) throws AMQException
{
- exchange.route(_incoming);
+ _incoming.enqueue(exchange.route(_incoming));
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index aa25e207a9..a26f7f16d7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -76,7 +76,7 @@ public class DestWildExchangeTest extends TestCase
IncomingMessage message = new IncomingMessage(0L, info, null, _protocolSession);
- _exchange.route(message);
+ message.enqueue(_exchange.route(message));
Assert.assertEquals(0, queue.getMessageCount());
}
@@ -100,7 +100,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -140,7 +140,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -159,7 +159,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -198,7 +198,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -217,7 +217,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -236,7 +236,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -254,7 +254,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -294,7 +294,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -312,7 +312,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -352,7 +352,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -384,7 +384,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -425,7 +425,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -464,7 +464,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -495,7 +495,7 @@ public class DestWildExchangeTest extends TestCase
private void routeMessage(final IncomingMessage message)
throws AMQException
{
- _exchange.route(message);
+ message.enqueue(_exchange.route(message));
message.routingComplete(_store, new MessageHandleFactory());
message.deliverToQueues();
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 86ba96bf5d..7c027880b4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -22,16 +22,85 @@ package org.apache.qpid.server.exchange;
import java.util.Map;
import java.util.HashMap;
+import java.util.Set;
import junit.framework.TestCase;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
/**
*/
public class HeadersBindingTest extends TestCase
{
+
+ private class MockHeader implements AMQMessageHeader
+ {
+
+ private final Map<String, Object> _headers = new HashMap<String, Object>();
+
+ public String getCorrelationId()
+ {
+ return null;
+ }
+
+ public long getExpiration()
+ {
+ return 0;
+ }
+
+ public String getMessageId()
+ {
+ return null;
+ }
+
+ public byte getPriority()
+ {
+ return 0;
+ }
+
+ public long getTimestamp()
+ {
+ return 0;
+ }
+
+ public String getType()
+ {
+ return null;
+ }
+
+ public String getReplyTo()
+ {
+ return null;
+ }
+
+ public Object getHeader(String name)
+ {
+ return _headers.get(name);
+ }
+
+ public boolean containsHeaders(Set<String> names)
+ {
+ return _headers.keySet().containsAll(names);
+ }
+
+ public boolean containsHeader(String name)
+ {
+ return _headers.containsKey(name);
+ }
+
+ public void setString(String key, String value)
+ {
+ setObject(key,value);
+ }
+
+ public void setObject(String key, Object value)
+ {
+ _headers.put(key,value);
+ }
+ }
+
private FieldTable bindHeaders = new FieldTable();
- private FieldTable matchHeaders = new FieldTable();
+ private MockHeader matchHeaders = new MockHeader();
public void testDefault_1()
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index aff7af6952..768024498a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -42,19 +42,19 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
// Enqueue messages in order
- _queue.enqueue(null, createMessage(1L, (byte) 10));
- _queue.enqueue(null, createMessage(2L, (byte) 4));
- _queue.enqueue(null, createMessage(3L, (byte) 0));
+ _queue.enqueue(createMessage(1L, (byte) 10));
+ _queue.enqueue(createMessage(2L, (byte) 4));
+ _queue.enqueue(createMessage(3L, (byte) 0));
// Enqueue messages in reverse order
- _queue.enqueue(null, createMessage(4L, (byte) 0));
- _queue.enqueue(null, createMessage(5L, (byte) 4));
- _queue.enqueue(null, createMessage(6L, (byte) 10));
+ _queue.enqueue(createMessage(4L, (byte) 0));
+ _queue.enqueue(createMessage(5L, (byte) 4));
+ _queue.enqueue(createMessage(6L, (byte) 10));
// Enqueue messages out of order
- _queue.enqueue(null, createMessage(7L, (byte) 4));
- _queue.enqueue(null, createMessage(8L, (byte) 10));
- _queue.enqueue(null, createMessage(9L, (byte) 0));
+ _queue.enqueue(createMessage(7L, (byte) 4));
+ _queue.enqueue(createMessage(8L, (byte) 10));
+ _queue.enqueue(createMessage(9L, (byte) 0));
// Register subscriber
_queue.registerSubscription(_subscription, false);
@@ -63,17 +63,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
ArrayList<QueueEntry> msgs = _subscription.getMessages();
try
{
- assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
+ assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
+ assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
+ assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
- assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
+ assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
+ assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
+ assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
- assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
+ assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
+ assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
}
catch (AssertionFailedError afe)
{
@@ -81,7 +81,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
int index = 1;
for (QueueEntry qe : msgs)
{
- System.err.println(index + ":" + qe.getMessage().getMessageId());
+ System.err.println(index + ":" + qe.getMessage().getMessageNumber());
index++;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 20503bf15c..966fb63186 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -160,7 +161,7 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+ public QueueEntry enqueue(ServerMessage message) throws AMQException
{
return null; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -280,7 +281,7 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
+
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -321,7 +322,7 @@ public class MockAMQQueue implements AMQQueue
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
+
public void setMinimumAlertRepeatGap(long value)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 37f91e7464..850e241cbd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -49,11 +49,6 @@ public class MockQueueEntry implements QueueEntry
}
- public String debugIdentity()
- {
- return null;
- }
-
public boolean delete()
{
return false;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 3084dc7fa1..72bbd7fe0c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -185,7 +185,7 @@ public class SimpleAMQQueueTest extends TestCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
// Check removing the subscription removes it's information from the queue
@@ -196,7 +196,7 @@ public class SimpleAMQQueueTest extends TestCase
1 == _queue.getActiveConsumerCount());
AMQMessage messageB = createMessage(new Long (25));
- _queue.enqueue(null, messageB);
+ _queue.enqueue(messageB);
QueueEntry entry = _subscription.getLastSeenEntry();
assertNull(entry);
}
@@ -204,7 +204,7 @@ public class SimpleAMQQueueTest extends TestCase
public void testQueueNoSubscriber() throws AMQException, InterruptedException
{
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
@@ -223,7 +223,7 @@ public class SimpleAMQQueueTest extends TestCase
// Check sending a message ends up with the subscriber
AMQMessage messageA = createMessage(new Long(24));
- _queue.enqueue(null, messageA);
+ _queue.enqueue(messageA);
assertEquals(messageA, _subscription.getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
@@ -261,7 +261,7 @@ public class SimpleAMQQueueTest extends TestCase
_queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
_queue.unregisterSubscription(_subscription);
assertTrue("Queue was not deleted when subscription was removed",
_queue.isDeleted());
@@ -272,7 +272,7 @@ public class SimpleAMQQueueTest extends TestCase
_queue.registerSubscription(_subscription, false);
Long id = new Long(26);
AMQMessage message = createMessage(id);
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
QueueEntry entry = _subscription.getLastSeenEntry();
entry.setRedelivered(true);
_queue.resend(entry, _subscription);
@@ -286,7 +286,7 @@ public class SimpleAMQQueueTest extends TestCase
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
@@ -302,7 +302,7 @@ public class SimpleAMQQueueTest extends TestCase
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -323,7 +323,7 @@ public class SimpleAMQQueueTest extends TestCase
Long messageId = new Long(i);
AMQMessage message = createMessage(messageId);
// Put message on queue
- _queue.enqueue(null, message);
+ _queue.enqueue(message);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 2346660d25..d88d39002a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -44,6 +44,7 @@ public class TestReferenceCounting extends TestCase
{
super.setUp();
_store = new TestMemoryMessageStore();
+ StoreContext.setCurrentContext(_storeContext);
}
/**
@@ -96,7 +97,7 @@ public class TestReferenceCounting extends TestCase
assertEquals(1, _store.getMessageMetaDataMap().size());
- message.decrementReference(_storeContext);
+ message.decrementReference();
assertEquals(1, _store.getMessageMetaDataMap().size());
}
@@ -158,7 +159,7 @@ public class TestReferenceCounting extends TestCase
assertEquals(1, _store.getMessageMetaDataMap().size());
message = message.takeReference();
- message.decrementReference(_storeContext);
+ message.decrementReference();
assertEquals(1, _store.getMessageMetaDataMap().size());
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index ed01c91804..e9034d25d3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -904,10 +904,13 @@ public class FieldTable
}
}
+ public Object get(String key)
+ {
+ return get(new AMQShortString(key));
+ }
public Object get(AMQShortString key)
{
-
return getObject(key);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 1cdd1da72b..43445232d6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -55,7 +55,7 @@ public class Connection extends ConnectionInvoker
private static final Logger log = Logger.get(Connection.class);
- enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+ public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
class DefaultConnectionListener implements ConnectionListener
{
@@ -118,7 +118,7 @@ public class Connection extends ConnectionInvoker
sender.setIdleTimeout(idleTimeout);
}
- void setState(State state)
+ protected void setState(State state)
{
synchronized (lock)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index 2833565afc..8caf29ecb5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -52,13 +52,28 @@ public class ServerDelegate extends ConnectionDelegate
{
private SaslServer saslServer;
+ private List<Object> _locales;
+ private List<Object> _mechanisms;
+ private Map<String, Object> _clientProperties;
+
+
+ public ServerDelegate()
+ {
+ this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8"));
+ }
+
+ protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales)
+ {
+ _clientProperties = clientProperties;
+ _mechanisms = mechanisms;
+ _locales = locales;
+ }
public void init(Connection conn, ProtocolHeader hdr)
{
conn.send(new ProtocolHeader(1, 0, 10));
- List<Object> utf8 = new ArrayList<Object>();
- utf8.add("utf8");
- conn.connectionStart(null, Collections.EMPTY_LIST, utf8);
+
+ conn.connectionStart(_clientProperties, _mechanisms, _locales);
}
@Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
@@ -77,8 +92,8 @@ public class ServerDelegate extends ConnectionDelegate
try
{
- SaslServer ss = Sasl.createSaslServer
- (mechanism, "AMQP", "localhost", null, null);
+
+ SaslServer ss = createSaslServer(mechanism);
if (ss == null)
{
conn.connectionClose
@@ -95,6 +110,14 @@ public class ServerDelegate extends ConnectionDelegate
}
}
+ protected SaslServer createSaslServer(String mechanism)
+ throws SaslException
+ {
+ SaslServer ss = Sasl.createSaslServer
+ (mechanism, "AMQP", "localhost", null, null);
+ return ss;
+ }
+
private void secure(Connection conn, byte[] response)
{
SaslServer ss = conn.getSaslServer();
@@ -133,9 +156,16 @@ public class ServerDelegate extends ConnectionDelegate
@Override public void connectionOpen(Connection conn, ConnectionOpen open)
{
conn.connectionOpenOk(Collections.EMPTY_LIST);
+
conn.setState(OPEN);
}
+ protected Session getSession(Connection conn, SessionDelegate delegate, SessionAttach atc)
+ {
+ return new Session(conn, delegate, new Binary(atc.getName()), 0);
+ }
+
+
public Session getSession(Connection conn, SessionAttach atc)
{
return new Session(conn, new Binary(atc.getName()), 0);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 74150e043d..a8757bcb3c 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -82,7 +82,7 @@ public class Session extends SessionInvoker
private Binary name;
private long expiry;
private int channel;
- private SessionDelegate delegate = new SessionDelegate();
+ private SessionDelegate delegate;
private SessionListener listener = new DefaultSessionListener();
private long timeout = 60000;
private boolean autoSync = false;
@@ -112,9 +112,15 @@ public class Session extends SessionInvoker
private Thread resumer = null;
- Session(Connection connection, Binary name, long expiry)
+ protected Session(Connection connection, Binary name, long expiry)
+ {
+ this(connection, new SessionDelegate(), name, expiry);
+ }
+
+ protected Session(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this.connection = connection;
+ this.delegate = delegate;
this.name = name;
this.expiry = expiry;
initReceiver();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
new file mode 100644
index 0000000000..0c432eba6f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.url.URLSyntaxException;
+
+import javax.jms.*;
+
+public class SimpleConnectionTest extends TestCase
+{
+ public void testConnection()
+ {
+ try
+ {
+ AMQConnection conn = new AMQConnection("127.0.0.1", 5673, "guest", "guest", "test", "/test");
+ QueueSession s = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSender p = s.createSender(new AMQQueue("amq.direct", "queue"));
+ p.send(s.createTextMessage("test"));
+
+ QueueReceiver r = s.createReceiver(new AMQQueue("amq.direct", "queue"));
+ conn.start();
+ Message m = r.receive();
+
+ Thread.sleep(60000L);
+ conn.close();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (URLSyntaxException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+}