summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-22 17:16:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-22 17:16:44 +0000
commitd779fd968deef2dd5395a93f39f70ebf677ea1b3 (patch)
tree99d69cf9e9499797d649873c5a7f985965be4459
parent455c63b69c150017c5fbbda0b9a56b29bc89326d (diff)
downloadqpid-python-d779fd968deef2dd5395a93f39f70ebf677ea1b3.tar.gz
QPID-5504 : initial refactoring to move common code into shared classes, make transports work similarly with respect to message routing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1560424 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java3
-rw-r--r--java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java6
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java88
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java37
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java6
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java10
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java5
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java1
-rwxr-xr-xjava/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java20
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java42
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java26
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java17
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java41
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java171
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java141
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java43
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java146
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java200
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java57
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java2
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java43
-rwxr-xr-xjava/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java4
-rw-r--r--java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java4
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java7
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java51
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java2
-rwxr-xr-xjava/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java10
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java137
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java7
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java3
-rw-r--r--java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java2
-rw-r--r--java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java1
-rw-r--r--java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java2
-rw-r--r--java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java81
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java10
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java62
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java30
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java32
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java12
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java68
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java68
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java50
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java68
52 files changed, 353 insertions, 1494 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
index 4c9eae6f3e..9d52241c4c 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
@@ -663,9 +663,8 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade
{
final MessagePublishInfo publishBody = readMessagePublishInfo(input);
final ContentHeaderBody contentHeaderBody = readContentHeaderBody(input);
- final int contentChunkCount = input.readInt();
- return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+ return new MessageMetaData(publishBody, contentHeaderBody);
}
catch (Exception e)
{
diff --git a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 76b990038d..3d6821ad47 100644
--- a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -94,7 +94,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
@@ -150,7 +150,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight());
assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize());
- BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+ BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties();
assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
@@ -392,7 +392,7 @@ public class BDBMessageStoreTest extends MessageStoreTest
ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
- MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8);
StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
storedMessage_0_8.addContent(0, chunk1);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
index 63bd1e45a0..d19f127b16 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
@@ -47,10 +47,6 @@ public interface AMQMessageHeader
String getReplyTo();
- String getReplyToExchange();
- String getReplyToRoutingKey();
-
-
Object getHeader(String name);
boolean containsHeaders(Set<String> names);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 8311dbd5ff..e5aa6c1158 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -23,9 +23,11 @@ package org.apache.qpid.server.message;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
+public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
@@ -33,10 +35,13 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
+ private final Object _connectionReference;
- public AbstractServerMessageImpl(StoredMessage<T> handle)
+
+ public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
{
_handle = handle;
+ _connectionReference = connectionReference;
}
public StoredMessage<T> getStoredMessage()
@@ -44,16 +49,11 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
return _handle;
}
- public boolean incrementReference()
+ private boolean incrementReference()
{
- return incrementReference(1);
- }
-
- public boolean incrementReference(int count)
- {
- if(_refCountUpdater.addAndGet(this, count) <= 0)
+ if(_refCountUpdater.incrementAndGet(this) <= 0)
{
- _refCountUpdater.addAndGet(this, -count);
+ _refCountUpdater.decrementAndGet(this);
return false;
}
else
@@ -67,7 +67,7 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
* message store.
*
*/
- public void decrementReference()
+ private void decrementReference()
{
int count = _refCountUpdater.decrementAndGet(this);
@@ -104,8 +104,72 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
}
- protected int getReferenceCount()
+ private int getReferenceCount()
{
return _referenceCount;
}
+
+ @Override
+ final public MessageReference<X> newReference()
+ {
+ return new Reference();
+ }
+
+ @Override
+ final public boolean isPersistent()
+ {
+ return _handle.getMetaData().isPersistent();
+ }
+
+ @Override
+ final public long getMessageNumber()
+ {
+ return getStoredMessage().getMessageNumber();
+ }
+
+ @Override
+ final public int getContent(ByteBuffer buf, int offset)
+ {
+ return getStoredMessage().getContent(offset, buf);
+ }
+
+ @Override
+ final public ByteBuffer getContent(int offset, int size)
+ {
+ return getStoredMessage().getContent(offset, size);
+ }
+
+ final public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }public String toString()
+ {
+ return "Message[" + debugIdentity() + "]";
+ }
+
+ private final class Reference implements MessageReference<X>
+ {
+
+ private final AtomicBoolean _released = new AtomicBoolean(false);
+
+ private Reference()
+ {
+ incrementReference();
+ }
+
+ public X getMessage()
+ {
+ return (X) AbstractServerMessageImpl.this;
+ }
+
+ public void release()
+ {
+ if(!_released.getAndSet(true))
+ {
+ decrementReference();
+ }
+ }
+
+ }
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
index 1b3fdb1870..03f1d6649c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
@@ -34,4 +34,6 @@ public interface InboundMessage extends Filterable
boolean isRedelivered();
long getSize();
+
+ Object getConnectionReference();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
index 399f8f9327..eda85507f0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,39 +20,8 @@
*/
package org.apache.qpid.server.message;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage>
{
-
- private final AtomicBoolean _released = new AtomicBoolean(false);
-
- private volatile M _message;
-
- public MessageReference(M message)
- {
- _message = message;
- onReference(message);
- }
-
- abstract protected void onReference(M message);
-
- abstract protected void onRelease(M message);
-
- public M getMessage()
- {
- return _message;
- }
-
- public void release()
- {
- if(!_released.getAndSet(true))
- {
- if(_message != null)
- {
- onRelease(_message);
- }
- }
- }
-
+ public M getMessage();
+ public void release();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index e1ad2fd0ca..a975ba1bc8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -37,8 +37,6 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enquea
long getSize();
- boolean isImmediate();
-
long getExpiration();
MessageReference newReference();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
index df26037eed..c8e2c3a6fb 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -67,4 +67,10 @@ public class InboundMessageAdapter implements InboundMessage
{
return _entry.getSize();
}
+
+ @Override
+ public Object getConnectionReference()
+ {
+ return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null;
+ }
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 833df34fd8..8919599cba 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -96,16 +96,6 @@ public class HeadersBindingTest extends TestCase
return null;
}
- public String getReplyToExchange()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public String getReplyToRoutingKey()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public Object getHeader(String name)
{
return _headers.get(name);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 25f1df7a20..24a507173c 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,11 +130,6 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
return _storedMsg;
}
- @Override
- public boolean isImmediate()
- {
- return false;
- }
@Override
public boolean isPersistent()
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index c6ae0c6e47..5244a7f51b 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -113,7 +113,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
deliveryProps.setExpiration(serverMsg.getExpiration());
- deliveryProps.setImmediate(serverMsg.isImmediate());
deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 2ca680869f..a471e53fc6 100755
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
+public class MessageMetaData_0_10 implements StorableMessageMetaData
{
private Header _header;
private DeliveryProperties _deliveryProps;
@@ -53,8 +53,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
private volatile ByteBuffer _encoded;
- private Object _connectionReference;
-
public MessageMetaData_0_10(MessageTransfer xfr)
{
@@ -202,12 +200,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
}
- public boolean isRedelivered()
- {
- // The *Message* is never redelivered, only queue entries are...
- return false;
- }
-
public long getArrivalTime()
{
return _arrivalTime;
@@ -218,16 +210,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
return _header;
}
- public void setConnectionReference(Object connectionReference)
- {
- _connectionReference = connectionReference;
- }
-
- public Object getConnectionReference()
- {
- return _connectionReference;
- }
-
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
{
public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index 34cf6998ab..e01fb474ac 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -30,15 +30,12 @@ import org.apache.qpid.transport.Header;
import java.nio.ByteBuffer;
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage
{
- private Object _connectionRef;
-
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
{
- super(storeMessage);
- _connectionRef = connectionRef;
+ super(storeMessage, connectionRef);
}
private MessageMetaData_0_10 getMetaData()
@@ -56,12 +53,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet
return getMetaData().getMessageHeader();
}
- public boolean isPersistent()
- {
- return getMetaData().isPersistent();
- }
-
-
public boolean isRedelivered()
{
// The *Message* is never redelivered, only queue entries are... this is here so that filters
@@ -71,7 +62,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet
public long getSize()
{
-
return getMetaData().getSize();
}
@@ -85,32 +75,11 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet
return getMetaData().getExpiration();
}
- public MessageReference newReference()
- {
- return new TransferMessageReference(this);
- }
-
- public long getMessageNumber()
- {
- return getStoredMessage().getMessageNumber();
- }
-
public long getArrivalTime()
{
return getMetaData().getArrivalTime();
}
- public int getContent(ByteBuffer buf, int offset)
- {
- return getStoredMessage().getContent(offset, buf);
- }
-
-
- public ByteBuffer getContent(int offset, int size)
- {
- return getStoredMessage().getContent(offset,size);
- }
-
public Header getHeader()
{
return getMetaData().getHeader();
@@ -118,13 +87,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet
public ByteBuffer getBody()
{
-
return getContent(0, (int)getSize());
}
-
- public Object getConnectionReference()
- {
- return _connectionRef;
- }
-
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index abe784cefa..f98eb09b43 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -184,7 +184,7 @@ public class ServerSession extends Session
return isCommandsFull(id);
}
- public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
+ public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -768,10 +768,7 @@ public class ServerSession extends Session
public boolean onSameConnection(InboundMessage inbound)
{
- return ((inbound instanceof MessageTransferMessage)
- && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference())
- || ((inbound instanceof MessageMetaData_0_10)
- && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference());
+ return inbound.getConnectionReference() == getConnection().getReference();
}
@@ -852,31 +849,25 @@ public class ServerSession extends Session
private class PostEnqueueAction implements ServerTransaction.Action
{
- private List<? extends BaseQueue> _queues;
- private ServerMessage _message;
+ private final MessageReference<MessageTransferMessage> _reference;
+ private final List<? extends BaseQueue> _queues;
private final boolean _transactional;
- public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional)
+ public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional)
{
+ _reference = message.newReference();
_transactional = transactional;
- setState(queues, message);
- }
-
- public void setState(List<? extends BaseQueue> queues, ServerMessage message)
- {
- _message = message;
_queues = queues;
}
public void postCommit()
{
- MessageReference<?> ref = _message.newReference();
for(int i = 0; i < _queues.size(); i++)
{
try
{
BaseQueue queue = _queues.get(i);
- queue.enqueue(_message, _transactional, null);
+ queue.enqueue(_reference.getMessage(), _transactional, null);
if(queue instanceof AMQQueue)
{
((AMQQueue)queue).checkCapacity(ServerSession.this);
@@ -889,12 +880,13 @@ public class ServerSession extends Session
throw new RuntimeException(e);
}
}
- ref.release();
+ _reference.release();
}
public void onRollback()
{
// NO-OP
+ _reference.release();
}
}
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index a0b60ae640..182e71c957 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -297,7 +298,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
- messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
@@ -309,11 +309,16 @@ public class ServerSessionDelegate extends SessionDelegate
}
final Exchange exchangeInUse;
- List<? extends BaseQueue> queues = exchange.route(messageMetaData);
+ final MessageStore store = getVirtualHost(ssn).getMessageStore();
+ final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
+ final ServerSession serverSession = (ServerSession) ssn;
+ MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+ MessageReference<MessageTransferMessage> reference = message.newReference();
+ List<? extends BaseQueue> queues = exchange.route(message);
if(queues.isEmpty() && exchange.getAlternateExchange() != null)
{
final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(messageMetaData);
+ queues = alternateExchange.route(message);
if (!queues.isEmpty())
{
exchangeInUse = alternateExchange;
@@ -328,12 +333,8 @@ public class ServerSessionDelegate extends SessionDelegate
exchangeInUse = exchange;
}
- final ServerSession serverSession = (ServerSession) ssn;
if(!queues.isEmpty())
{
- final MessageStore store = getVirtualHost(ssn).getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
@@ -352,7 +353,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
-
if(serverSession.isTransactional())
{
serverSession.processed(xfr);
@@ -361,6 +361,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
}
+ reference.release();
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java
deleted file mode 100644
index 0c04f22232..0000000000
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.apache.qpid.server.message.MessageReference;
-
-public class TransferMessageReference extends MessageReference<MessageTransferMessage>
-{
- public TransferMessageReference(MessageTransferMessage message)
- {
- super(message);
- }
-
- protected void onReference(MessageTransferMessage message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(MessageTransferMessage message)
- {
- message.decrementReference();
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index a603807f87..c6e0dfc3e2 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -49,7 +49,6 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
@@ -271,7 +270,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
throw new AMQSecurityException("Permission denied: " + e.getName());
}
- _currentMessage = new IncomingMessage(info, getProtocolSession().getReference());
+ _currentMessage = new IncomingMessage(info);
_currentMessage.setExchange(e);
}
@@ -291,12 +290,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_currentMessage.setContentHeaderBody(contentHeaderBody);
- _currentMessage.setExpiration();
-
- _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime());
-
- _currentMessage.route();
-
deliverCurrentMessageIfComplete();
}
}
@@ -309,56 +302,62 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
- if(!checkMessageUserId(_currentMessage.getContentHeader()))
- {
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage));
- }
- else
+ final MessageMetaData messageMetaData =
+ new MessageMetaData(_currentMessage.getMessagePublishInfo(),
+ _currentMessage.getContentHeader(),
+ getProtocolSession().getLastReceivedTime());
+
+ final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
+ final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
+ MessageReference reference = amqMessage.newReference();
+ try
{
- if(destinationQueues == null || destinationQueues.isEmpty())
+ int bodyCount = _currentMessage.getBodyCount();
+ if(bodyCount > 0)
{
- handleUnroutableMessage();
+ long bodyLengthReceived = 0;
+ for(int i = 0 ; i < bodyCount ; i++)
+ {
+ ContentBody contentChunk = _currentMessage.getContentChunk(i);
+ handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload()));
+ bodyLengthReceived += contentChunk.getSize();
+ }
+ }
+
+ if(!checkMessageUserId(_currentMessage.getContentHeader()))
+ {
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage));
}
else
{
- final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData());
- _currentMessage.setStoredMessage(handle);
- int bodyCount = _currentMessage.getBodyCount();
- if(bodyCount > 0)
+ final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage);
+
+ if(destinationQueues == null || destinationQueues.isEmpty())
{
- long bodyLengthReceived = 0;
- for(int i = 0 ; i < bodyCount ; i++)
- {
- ContentChunk contentChunk = _currentMessage.getContentChunk(i);
- handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
- bodyLengthReceived += contentChunk.getSize();
- }
+ handleUnroutableMessage(amqMessage);
}
-
- _transaction.addPostTransactionAction(new ServerTransaction.Action()
+ else
{
- public void postCommit()
- {
- }
-
- public void onRollback()
- {
- handle.remove();
- }
- });
-
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
- incrementOutstandingTxnsIfNecessary();
- _currentMessage.getStoredMessage().flushToStore();
+ _transaction.enqueue(destinationQueues,
+ amqMessage,
+ new MessageDeliveryAction(amqMessage, destinationQueues));
+ incrementOutstandingTxnsIfNecessary();
+ handle.flushToStore();
+
+ }
}
}
+ finally
+ {
+ reference.release();
+ }
+
}
finally
{
long bodySize = _currentMessage.getSize();
- long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp();
+ long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
_session.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
@@ -374,9 +373,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* @throws AMQConnectionException if the message is mandatoryclose-on-no-route
* @see AMQProtocolSession#isCloseWhenNoRoute()
*/
- private void handleUnroutableMessage() throws AMQConnectionException
+ private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
{
- boolean mandatory = _currentMessage.isMandatory();
+ boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
@@ -398,13 +397,18 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
(Throwable) null);
}
- if (mandatory || _currentMessage.isImmediate())
+ if (mandatory || message.isImmediate())
{
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage));
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
}
else
{
- _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
+ _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(),
+ _currentMessage.getMessagePublishInfo().getRoutingKey() == null
+ ? null
+ : _currentMessage.getMessagePublishInfo()
+ .getRoutingKey()
+ .toString()));
}
}
@@ -417,15 +421,17 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return String.format(
"[Exchange: %s, Routing key: %s]",
- _currentMessage.getExchange(),
- _currentMessage.getRoutingKey());
+ _currentMessage.getExchangeName(),
+ _currentMessage.getMessagePublishInfo().getRoutingKey() == null
+ ? null
+ : _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
}
public void publishContentBody(ContentBody contentBody) throws AMQException
{
if (_currentMessage == null)
{
- throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+ throw new AMQException("Received content body without previously receiving a Content Header");
}
if (_logger.isDebugEnabled())
@@ -435,10 +441,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
try
{
- final ContentChunk contentChunk =
- _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
-
- _currentMessage.addContentBodyFrame(contentChunk);
+ _currentMessage.addContentBodyFrame(contentBody);
deliverCurrentMessageIfComplete();
}
@@ -1157,24 +1160,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+ private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
throws AMQException
{
- AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
+ AMQMessage message = new AMQMessage(handle, _session.getReference());
- message.setExpiration(incomingMessage.getExpiration());
- message.setConnectionIdentifier(_session.getReference());
+ final BasicContentHeaderProperties properties =
+ incomingMessage.getContentHeader().getProperties();
+
+ long expiration = properties.getExpiration();
+ message.setExpiration(expiration);
return message;
}
private boolean checkMessageUserId(ContentHeaderBody header)
{
- AMQShortString userID =
- header.getProperties() instanceof BasicContentHeaderProperties
- ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
- : null;
-
+ AMQShortString userID = header.getProperties().getUserId();
return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
@@ -1208,13 +1210,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private class MessageDeliveryAction implements ServerTransaction.Action
{
- private IncomingMessage _incommingMessage;
+ private final MessageReference<AMQMessage> _reference;
private List<? extends BaseQueue> _destinationQueues;
- public MessageDeliveryAction(IncomingMessage currentMessage,
+ public MessageDeliveryAction(AMQMessage currentMessage,
List<? extends BaseQueue> destinationQueues)
{
- _incommingMessage = currentMessage;
+ _reference = currentMessage.newReference();
_destinationQueues = destinationQueues;
}
@@ -1222,10 +1224,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
try
{
- final boolean immediate = _incommingMessage.isImmediate();
-
- final AMQMessage amqMessage = createAMQMessage(_incommingMessage);
- MessageReference ref = amqMessage.newReference();
+ AMQMessage message = _reference.getMessage();
+ final boolean immediate = message.isImmediate();
for(int i = 0; i < _destinationQueues.size(); i++)
{
@@ -1242,7 +1242,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
action = null;
}
- queue.enqueue(amqMessage, isTransactional(), action);
+ queue.enqueue(message, isTransactional(), action);
if(queue instanceof AMQQueue)
{
@@ -1251,8 +1251,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- _incommingMessage.getStoredMessage().flushToStore();
- ref.release();
+ message.getStoredMessage().flushToStore();
+ _reference.release();
}
catch (AMQException e)
{
@@ -1265,6 +1265,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
// Maybe keep track of entries that were created and then delete them here in case of failure
// to in memory enqueue
+ _reference.release();
}
private class ImmediateAction implements BaseQueue.PostEnqueueAction
@@ -1375,28 +1376,30 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private class WriteReturnAction implements ServerTransaction.Action
{
private final AMQConstant _errorCode;
- private final IncomingMessage _message;
private final String _description;
+ private final MessageReference<AMQMessage> _reference;
public WriteReturnAction(AMQConstant errorCode,
String description,
- IncomingMessage message)
+ AMQMessage message)
{
_errorCode = errorCode;
- _message = message;
_description = description;
+ _reference = message.newReference();
}
public void postCommit()
{
try
{
- _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
- _message.getContentHeader(),
- _message,
+ AMQMessage message = _reference.getMessage();
+ _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
_channelId,
_errorCode.getCode(),
AMQShortString.validValueOf(_description));
+ _reference.release();
}
catch (AMQException e)
{
@@ -1408,6 +1411,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void onRollback()
{
+ _reference.release();
}
}
@@ -1470,12 +1474,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public boolean onSameConnection(InboundMessage inbound)
{
- if(inbound instanceof IncomingMessage)
- {
- IncomingMessage incoming = (IncomingMessage) inbound;
- return getProtocolSession().getReference() == incoming.getConnectionReference();
- }
- return false;
+ return getProtocolSession().getReference() == inbound.getConnectionReference();
}
public int getUnacknowledgedMessageCount()
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 416a4da183..b73b6bc0aa 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -28,66 +28,41 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
-import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
+public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
/** Flag to indicate that this message requires 'immediate' delivery. */
- private static final byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- private static final byte DELIVERED_TO_CONSUMER = 0x02;
-
- private byte _flags = 0;
-
private long _expiration;
private final long _size;
- private Object _connectionIdentifier;
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
-
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
this(handle, null);
}
- public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef)
+ public AMQMessage(StoredMessage<MessageMetaData> handle, Object connectionReference)
{
- super(handle);
-
-
- final MessageMetaData metaData = handle.getMetaData();
- _size = metaData.getContentSize();
- final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
-
- if(messagePublishInfo.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
+ super(handle, connectionReference);
+ _size = handle.getMetaData().getContentSize();
}
public void setExpiration(final long expiration)
{
-
_expiration = expiration;
-
}
public MessageMetaData getMessageMetaData()
@@ -100,21 +75,6 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
return getMessageMetaData().getContentHeaderBody();
}
- public Long getMessageId()
- {
- return getStoredMessage().getMessageNumber();
- }
-
- /**
- * Called selectors to determin if the message has already been sent
- *
- * @return _deliveredToConsumer
- */
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
public String getRoutingKey()
{
MessageMetaData messageMetaData = getMessageMetaData();
@@ -134,22 +94,10 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
return getMessageMetaData().getMessageHeader();
}
- public boolean isPersistent()
- {
- return getMessageMetaData().isPersistent();
- }
-
- /**
- * Called to enforce the 'immediate' flag.
- *
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
- */
- public boolean immediateAndNotDelivered()
+ @Override
+ public boolean isRedelivered()
{
-
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
+ return false;
}
public MessagePublishInfo getMessagePublishInfo()
@@ -162,90 +110,27 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
return getMessageMetaData().getArrivalTime();
}
- /**
- * Checks to see if the message has expired. If it has the message is dequeued.
- *
- * @param queue The queue to check the expiration against. (Currently not used)
- *
- * @return true if the message has expire
- *
- * @throws AMQException
- */
- public boolean expired(AMQQueue queue) throws AMQException
- {
-
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
-
- return false;
- }
-
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- */
- public void setDeliveredToConsumer()
- {
- _flags |= DELIVERED_TO_CONSUMER;
- }
-
public long getSize()
{
return _size;
-
}
public boolean isImmediate()
{
- return (_flags & IMMEDIATE) == IMMEDIATE;
- }
-
- public long getExpiration()
- {
- return _expiration;
- }
-
- public MessageReference newReference()
- {
- return new AMQMessageReference(this);
+ return getMessagePublishInfo().isImmediate();
}
- public long getMessageNumber()
- {
- return getStoredMessage().getMessageNumber();
- }
-
-
- public Object getConnectionIdentifier()
- {
- return _connectionIdentifier;
-
- }
- public void setConnectionIdentifier(final Object connectionIdentifier)
+ public boolean isMandatory()
{
- _connectionIdentifier = connectionIdentifier;
+ return getMessagePublishInfo().isMandatory();
}
- public String toString()
- {
- return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount();
- }
-
- public int getContent(ByteBuffer buf, int offset)
+ public long getExpiration()
{
- return getStoredMessage().getContent(offset, buf);
+ return _expiration;
}
- public ByteBuffer getContent(int offset, int size)
- {
- return getStoredMessage().getContent(offset, size);
- }
-
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java
deleted file mode 100644
index 3adc9f70cd..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import org.apache.qpid.server.message.MessageReference;
-
-public class AMQMessageReference extends MessageReference<AMQMessage>
-{
-
-
- public AMQMessageReference(AMQMessage message)
- {
- super(message);
- }
-
- protected void onReference(AMQMessage message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(AMQMessage message)
- {
- message.decrementReference();
- }
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java
deleted file mode 100644
index f5c43003a4..0000000000
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8;
-
-import java.util.Collection;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-
-import java.util.Set;
-import org.apache.qpid.server.message.AMQMessageHeader;
-
-public class ContentHeaderBodyAdapter implements AMQMessageHeader
-{
- private final ContentHeaderBody _contentHeaderBody;
-
- public ContentHeaderBodyAdapter(ContentHeaderBody contentHeaderBody)
- {
- _contentHeaderBody = contentHeaderBody;
- }
-
- private BasicContentHeaderProperties getProperties()
- {
- return (BasicContentHeaderProperties) _contentHeaderBody.getProperties();
- }
-
- public String getCorrelationId()
- {
- return getProperties().getCorrelationIdAsString();
- }
-
- public long getExpiration()
- {
- return getProperties().getExpiration();
- }
-
- public String getUserId()
- {
- return getProperties().getUserIdAsString();
- }
-
- public String getAppId()
- {
- return getProperties().getAppIdAsString();
- }
-
- public String getMessageId()
- {
- return getProperties().getMessageIdAsString();
- }
-
- public String getMimeType()
- {
- return getProperties().getContentTypeAsString();
- }
-
- public String getEncoding()
- {
- return getProperties().getEncodingAsString();
- }
-
- public byte getPriority()
- {
- return getProperties().getPriority();
- }
-
- public long getTimestamp()
- {
- return getProperties().getTimestamp();
- }
-
- public String getType()
- {
- return getProperties().getTypeAsString();
- }
-
- public String getReplyTo()
- {
- return getProperties().getReplyToAsString();
- }
-
- public String getReplyToExchange()
- {
- // TODO
- return getReplyTo();
- }
-
- public String getReplyToRoutingKey()
- {
- // TODO
- return getReplyTo();
-
- }
-
- public Object getHeader(String name)
- {
- FieldTable ft = getProperties().getHeaders();
- return ft.get(name);
- }
-
- public boolean containsHeaders(Set<String> names)
- {
- FieldTable ft = getProperties().getHeaders();
- for(String name : names)
- {
- if(!ft.containsKey(name))
- {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Collection<String> getHeaderNames()
- {
- FieldTable ft = getProperties().getHeaders();
- return ft.keys();
- }
-
- public boolean containsHeader(String name)
- {
- FieldTable ft = getProperties().getHeaders();
- return ft.containsKey(name);
- }
-
-
-}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
index 5267651a66..5a9a51ff59 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
@@ -24,96 +24,44 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.Filterable;
import org.apache.qpid.server.store.StoredMessage;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
+public class IncomingMessage
{
- /** Used for debugging purposes. */
- private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
-
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
-
+ private Exchange _exchange;
/**
* Keeps a track of how many bytes we have received in body frames
*/
private long _bodyLengthReceived = 0;
+ private List<ContentBody> _contentChunks = new ArrayList<ContentBody>();
- /**
- * This is stored during routing, to know the queues to which this message should immediately be
- * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
- * by the message handle.
- */
- private List<? extends BaseQueue> _destinationQueues;
-
- private long _expiration;
-
- private Exchange _exchange;
-
- private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
-
- // we keep both the original meta data object and the store reference to it just in case the
- // store would otherwise flow it to disk
-
- private MessageMetaData _messageMetaData;
-
- private StoredMessage<MessageMetaData> _storedMessageHandle;
- private Object _connectionReference;
-
-
- public IncomingMessage(
- final MessagePublishInfo info
- )
- {
- this(info, null);
- }
-
- public IncomingMessage(MessagePublishInfo info, Object reference)
+ public IncomingMessage(MessagePublishInfo info)
{
_messagePublishInfo = info;
- _connectionReference = reference;
}
- public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
+ public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody)
{
_contentHeaderBody = contentHeaderBody;
}
- public void setExpiration()
- {
- _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
- }
-
- public MessageMetaData headersReceived(long currentTime)
- {
- _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
- return _messageMetaData;
- }
-
-
- public List<? extends BaseQueue> getDestinationQueues()
+ public MessagePublishInfo getMessagePublishInfo()
{
- return _destinationQueues;
+ return _messagePublishInfo;
}
- public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
+ public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
{
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -124,31 +72,14 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return (_bodyLengthReceived == getContentHeader().getBodySize());
}
- public AMQShortString getExchange()
+ public AMQShortString getExchangeName()
{
return _messagePublishInfo.getExchange();
}
- public String getRoutingKey()
- {
- return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
- }
-
- public String getBinding()
- {
- return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
- }
-
-
- public boolean isMandatory()
- {
- return _messagePublishInfo.isMandatory();
- }
-
-
- public boolean isImmediate()
+ public Exchange getExchange()
{
- return _messagePublishInfo.isImmediate();
+ return _exchange;
}
public ContentHeaderBody getContentHeader()
@@ -156,129 +87,24 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
return _contentHeaderBody;
}
-
- public AMQMessageHeader getMessageHeader()
- {
- return _messageMetaData.getMessageHeader();
- }
-
- public boolean isPersistent()
- {
- return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
- BasicContentHeaderProperties.PERSISTENT;
- }
-
- public boolean isRedelivered()
- {
- return false;
- }
-
-
public long getSize()
{
return getContentHeader().getBodySize();
}
- public long getMessageNumber()
- {
- return _storedMessageHandle.getMessageNumber();
- }
-
public void setExchange(final Exchange e)
{
_exchange = e;
}
- public void route()
- {
- enqueue(_exchange.route(this));
-
- }
-
- public void enqueue(final List<? extends BaseQueue> queues)
- {
- _destinationQueues = queues;
- }
-
- public MessagePublishInfo getMessagePublishInfo()
- {
- return _messagePublishInfo;
- }
-
- public long getExpiration()
- {
- return _expiration;
- }
-
public int getBodyCount() throws AMQException
{
return _contentChunks.size();
}
- public ContentChunk getContentChunk(int index)
+ public ContentBody getContentChunk(int index)
{
return _contentChunks.get(index);
}
-
- public int getContent(ByteBuffer buf, int offset)
- {
- int pos = 0;
- int written = 0;
- for(ContentChunk cb : _contentChunks)
- {
- ByteBuffer data = ByteBuffer.wrap(cb.getData());
- if(offset+written >= pos && offset < pos + data.limit())
- {
- ByteBuffer src = data.duplicate();
- src.position(offset+written - pos);
- src = src.slice();
-
- if(buf.remaining() < src.limit())
- {
- src.limit(buf.remaining());
- }
- int count = src.limit();
- buf.put(src);
- written += count;
- if(buf.remaining() == 0)
- {
- break;
- }
- }
- pos+=data.limit();
- }
- return written;
-
- }
-
-
- public ByteBuffer getContent(int offset, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- getContent(buf,offset);
- buf.flip();
- return buf;
- }
-
- public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
- {
- _storedMessageHandle = storedMessageHandle;
- }
-
- public StoredMessage<MessageMetaData> getStoredMessage()
- {
- return _storedMessageHandle;
- }
-
- public Object getConnectionReference()
- {
- return _connectionReference;
- }
-
- public MessageMetaData getMessageMetaData()
- {
- return _messageMetaData;
- }
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 4cc590d8cc..ead28c6e26 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -46,11 +46,10 @@ import java.util.Set;
*/
public class MessageMetaData implements StorableMessageMetaData
{
- private MessagePublishInfo _messagePublishInfo;
+ private final MessagePublishInfo _messagePublishInfo;
- private ContentHeaderBody _contentHeaderBody;
+ private final ContentHeaderBody _contentHeaderBody;
- private int _contentChunkCount;
private long _arrivalTime;
private static final byte MANDATORY_FLAG = 1;
@@ -58,59 +57,36 @@ public class MessageMetaData implements StorableMessageMetaData
public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8();
- public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody)
{
- this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis());
+ this(publishBody,contentHeaderBody, System.currentTimeMillis());
}
- public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime)
+ public MessageMetaData(MessagePublishInfo publishBody,
+ ContentHeaderBody contentHeaderBody,
+ long arrivalTime)
{
_contentHeaderBody = contentHeaderBody;
_messagePublishInfo = publishBody;
- _contentChunkCount = contentChunkCount;
_arrivalTime = arrivalTime;
}
- public int getContentChunkCount()
- {
- return _contentChunkCount;
- }
-
- public void setContentChunkCount(int contentChunkCount)
- {
- _contentChunkCount = contentChunkCount;
- }
public ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- {
- _contentHeaderBody = contentHeaderBody;
- }
-
public MessagePublishInfo getMessagePublishInfo()
{
return _messagePublishInfo;
}
- public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo)
- {
- _messagePublishInfo = messagePublishInfo;
- }
-
public long getArrivalTime()
{
return _arrivalTime;
}
- public void setArrivalTime(long arrivalTime)
- {
- _arrivalTime = arrivalTime;
- }
-
public MessageMetaDataType getType()
{
return TYPE;
@@ -169,8 +145,7 @@ public class MessageMetaData implements StorableMessageMetaData
public boolean isPersistent()
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
- return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
+ return _contentHeaderBody.getProperties().getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
}
private static class MetaDataFactory implements MessageMetaDataType.Factory
@@ -219,7 +194,7 @@ public class MessageMetaData implements StorableMessageMetaData
return routingKey;
}
};
- return new MessageMetaData(publishBody, chb, 0, arrivalTime);
+ return new MessageMetaData(publishBody, chb, arrivalTime);
}
catch (AMQException e)
{
@@ -242,7 +217,7 @@ public class MessageMetaData implements StorableMessageMetaData
{
private BasicContentHeaderProperties getProperties()
{
- return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
+ return getContentHeaderBody().getProperties();
}
public String getUserId()
@@ -300,18 +275,6 @@ public class MessageMetaData implements StorableMessageMetaData
return getProperties().getReplyToAsString();
}
- public String getReplyToExchange()
- {
- // TODO
- return getReplyTo();
- }
-
- public String getReplyToRoutingKey()
- {
- // TODO
- return getReplyTo();
- }
-
public Object getHeader(String name)
{
FieldTable ft = getProperties().getHeaders();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index d1d86fe478..f069042db3 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -488,7 +488,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
{
AMQMessage message = (AMQMessage) entry.getMessage();
- final Object publisherReference = message.getConnectionIdentifier();
+ final Object publisherReference = message.getConnectionReference();
// We don't want local messages so check to see if message is one we sent
Object localReference = getProtocolSession().getReference();
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 4ab64ca100..176d1858f1 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -119,8 +119,6 @@ public class AckTest extends QpidTestCase
return new AMQShortString("rk");
}
};
- final IncomingMessage msg = new IncomingMessage(publishBody);
- //IncomingMessage msg2 = null;
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
ContentHeaderBody cb = new ContentHeaderBody();
cb.setProperties(b);
@@ -131,42 +129,35 @@ public class AckTest extends QpidTestCase
b.setDeliveryMode((byte) 2);
}
- msg.setContentHeaderBody(cb);
-
// we increment the reference here since we are not delivering the messaging to any queues, which is where
// the reference is normally incremented. The test is easier to construct if we have direct access to the
// subscription
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
- msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
+ MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis());
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
- msg.setStoredMessage(storedMessage);
final AMQMessage message = new AMQMessage(storedMessage);
- if(msg.allContentReceived())
- {
- ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, message, new ServerTransaction.Action() {
- public void postCommit()
+ ServerTransaction txn = new AutoCommitTransaction(_messageStore);
+ txn.enqueue(_queue, message, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
{
- try
- {
-
- _queue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
- public void onRollback()
+ _queue.enqueue(message);
+ }
+ catch (AMQException e)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ throw new RuntimeException(e);
}
- });
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
- }
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
index 15573a871f..50145e5c6d 100755
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java
@@ -57,9 +57,9 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData>
{
FieldTable headers = new FieldTable();
headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue));
- ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers);
+ ( chb.getProperties()).setHeaders(headers);
}
- _metaData = new MessageMetaData(info, chb, 0);
+ _metaData = new MessageMetaData(info, chb);
_content = ByteBuffer.allocate(_metaData.getContentSize());
}
diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
index 87fbcfa9b3..227e9794da 100644
--- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
+++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
@@ -81,7 +81,7 @@ public class ReferenceCountingTest extends QpidTestCase
- MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
@@ -139,7 +139,7 @@ public class ReferenceCountingTest extends QpidTestCase
final ContentHeaderBody chb = createPersistentContentHeader();
- MessageMetaData mmd = new MessageMetaData(info, chb, 0);
+ MessageMetaData mmd = new MessageMetaData(info, chb);
StoredMessage storedMessage = _store.addMessage(mmd);
AMQMessage message = new AMQMessage(storedMessage);
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 4cb9767514..4082f22e9c 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -51,6 +51,8 @@ public class Connection_1_0 implements ConnectionEventListener
private final ConnectionEndpoint _conn;
private final long _connectionId;
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+ private final Object _reference = new Object();
+
public static interface Task
@@ -79,6 +81,11 @@ public class Connection_1_0 implements ConnectionEventListener
}
+ public Object getReference()
+ {
+ return _reference;
+ }
+
public void remoteSessionCreation(SessionEndpoint endpoint)
{
Session_1_0 session = new Session_1_0(_vhost, this);
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 2cef27267b..836eb69350 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -53,34 +54,48 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- final List<? extends BaseQueue> queues = _exchange.route(message);
+ List<? extends BaseQueue> queues = _exchange.route(message);
- txn.enqueue(queues,message, new ServerTransaction.Action()
+ if(queues == null || queues.isEmpty())
{
+ Exchange altExchange = _exchange.getAlternateExchange();
+ if(altExchange != null)
+ {
+ queues = altExchange.route(message);
+ }
+ }
- BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
+ if(queues != null && !queues.isEmpty())
+ {
+ final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
- public void postCommit()
+ txn.enqueue(queues,message, new ServerTransaction.Action()
{
- for(int i = 0; i < _queues.length; i++)
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
{
- try
- {
- _queues[i].enqueue(message);
- }
- catch (AMQException e)
+ for(int i = 0; i < baseQueues.length; i++)
{
- // TODO
- throw new RuntimeException(e);
+ try
+ {
+ baseQueues[i].enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
}
+ _reference.release();
}
- }
- public void onRollback()
- {
- // NO-OP
- }
- });
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ }
return ACCEPTED;
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 5925bf0c32..78ca9ff2a6 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -272,7 +272,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
@Override
public void remove()
{
- serverMessage.getStoredMessage().remove();
+ throw new UnsupportedOperationException();
}
};
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 29a8a0c723..8db6c86fc4 100755
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -504,16 +504,6 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
}
}
- public String getReplyToExchange()
- {
- return null; //TODO
- }
-
- public String getReplyToRoutingKey()
- {
- return null; //TODO
- }
-
public String getAppId()
{
//TODO
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index 9b11c0f48d..e367c83c8a 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -21,35 +21,23 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
{
-
- private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
- AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
-
- private volatile int _referenceCount = 0;
-
- private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
- private WeakReference<Session_1_0> _session;
private long _arrivalTime;
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
- _storedMessage = storedMessage;
- _session = null;
+ super(storedMessage, null);
_fragments = restoreFragments(storedMessage);
}
@@ -75,11 +63,10 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
final List<ByteBuffer> fragments,
- final Session_1_0 session)
+ final Object connectionReference)
{
- _storedMessage = storedMessage;
+ super(storedMessage, connectionReference);
_fragments = fragments;
- _session = new WeakReference<Session_1_0>(session);
_arrivalTime = System.currentTimeMillis();
}
@@ -98,7 +85,7 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM
private MessageMetaData_1_0 getMessageMetaData()
{
- return _storedMessage.getMetaData();
+ return getStoredMessage().getMetaData();
}
public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader()
@@ -106,16 +93,6 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM
return getMessageMetaData().getMessageHeader();
}
- public StoredMessage getStoredMessage()
- {
- return _storedMessage;
- }
-
- public boolean isPersistent()
- {
- return getMessageMetaData().isPersistent();
- }
-
public boolean isRedelivered()
{
// TODO
@@ -136,121 +113,19 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM
return size;
}
- public boolean isImmediate()
- {
- return false;
- }
-
public long getExpiration()
{
return getMessageHeader().getExpiration();
}
- public MessageReference<Message_1_0> newReference()
- {
- return new Reference(this);
- }
-
- public long getMessageNumber()
- {
- return _storedMessage.getMessageNumber();
- }
-
public long getArrivalTime()
{
return _arrivalTime;
}
- public int getContent(final ByteBuffer buf, final int offset)
- {
- return _storedMessage.getContent(offset, buf);
- }
-
- public ByteBuffer getContent(int offset, int size)
- {
- ByteBuffer buf = ByteBuffer.allocate(size);
- buf.limit(getContent(buf, offset));
-
- return buf;
- }
-
public List<ByteBuffer> getFragments()
{
return _fragments;
}
- public Session_1_0 getSession()
- {
- return _session == null ? null : _session.get();
- }
-
-
- public boolean incrementReference()
- {
- if(_refCountUpdater.incrementAndGet(this) <= 0)
- {
- _refCountUpdater.decrementAndGet(this);
- return false;
- }
- else
- {
- return true;
- }
- }
-
- /**
- * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
- * message store.
- */
-
- public void decrementReference()
- {
- int count = _refCountUpdater.decrementAndGet(this);
-
- // note that the operation of decrementing the reference count and then removing the message does not
- // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
- // the message has been passed to all queues. i.e. we are
- // not relying on the all the increments having taken place before the delivery manager decrements.
- if (count == 0)
- {
- // set the reference count way below 0 so that we can detect that the message has been deleted
- // this is to guard against the message being spontaneously recreated (from the mgmt console)
- // by copying from other queues at the same time as it is being removed.
- _refCountUpdater.set(this,Integer.MIN_VALUE/2);
-
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_storedMessage != null)
- {
- _storedMessage.remove();
- }
- }
- else
- {
- if (count < 0)
- {
- throw new RuntimeException("Reference count for message id " + getMessageNumber()
- + " has gone below 0.");
- }
- }
- }
-
- public static class Reference extends MessageReference<Message_1_0>
- {
- public Reference(Message_1_0 message)
- {
- super(message);
- }
-
- protected void onReference(Message_1_0 message)
- {
- message.incrementReference();
- }
-
- protected void onRelease(Message_1_0 message)
- {
- message.decrementReference();
- }
-
- }
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index e971672767..927972c8b2 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -160,8 +161,8 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
storedMessage.flushToStore();
- Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession());
-
+ Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference());
+ MessageReference<Message_1_0> reference = message.newReference();
Binary transactionId = null;
org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
@@ -231,6 +232,8 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv
}
});
}
+
+ reference.release();
}
}
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 80d7595e01..ad05bd8a1b 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -541,8 +541,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
@Override
public boolean onSameConnection(InboundMessage inbound)
{
- // TODO
- return false;
+ return inbound.getConnectionReference() == getConnection().getReference();
}
@Override
diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index b9695ba87a..ce653766ff 100644
--- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -149,7 +149,7 @@ class Subscription_1_0 implements Subscription
{
if(entry.getMessage() instanceof Message_1_0)
{
- if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+ if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference())
{
return false;
}
diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 92bccf871f..f1843de8ac 100644
--- a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -225,7 +225,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
{
return new MessageMetaData(convertPublishBody(message),
convertContentHeaderBody(message, vhost),
- 1,
message.getArrivalTime());
}
diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index e1e8fbd9d3..544099f1f2 100644
--- a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -121,7 +121,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
body.flip();
BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+ message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
if(exchange != null)
diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
index 0d9d59ff56..bbea177260 100644
--- a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
+++ b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
@@ -56,7 +56,7 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess
header.setDurable(serverMessage.isPersistent());
BasicContentHeaderProperties contentHeader =
- (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+ serverMessage.getContentHeaderBody().getProperties();
header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
final long expiration = serverMessage.getExpiration();
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index fe9f9f4d00..dbfbb743ec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -30,7 +30,6 @@ import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
@@ -73,7 +72,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
private static final boolean STRICT_AMQP_COMPLIANCE =
Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
- private ContentHeaderProperties _contentHeaderProperties;
+ private BasicContentHeaderProperties _contentHeaderProperties;
// The base set of items that needs to be set.
private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag)
@@ -81,7 +80,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
super(deliveryTag);
_contentHeaderProperties = properties;
_readableProperties = (_contentHeaderProperties != null);
- _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()
+ _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders()
: (new BasicContentHeaderProperties()).getHeaders() );
}
@@ -90,7 +89,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
this(new BasicContentHeaderProperties(), -1);
_readableProperties = false;
- _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+ _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders());
}
@@ -337,7 +336,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
public BasicContentHeaderProperties getContentHeaderProperties()
{
- return (BasicContentHeaderProperties) _contentHeaderProperties;
+ return _contentHeaderProperties;
}
@@ -443,7 +442,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
//NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
{
- return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ return _contentHeaderProperties.getUserIdAsString();
}
else
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index 608567674a..e52ff9acb2 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -101,7 +101,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
- (BasicContentHeaderProperties) contentHeader.getProperties(),
+ contentHeader.getProperties(),
exchange, routingKey, queueDestinationCache, topicDestinationCache);
return createMessage(delegate, data);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 4154003b23..7e1ce20238 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -110,7 +110,7 @@ public class MessageFactoryRegistry
AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache)
throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
+ BasicContentHeaderProperties properties = contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 366c8231a1..60dac24cfc 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -27,7 +27,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-public class BasicContentHeaderProperties implements CommonContentHeaderProperties
+public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
public static final int NON_PERSISTENT = 1;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
deleted file mode 100644
index 7162c37062..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.framing;
-
-public interface CommonContentHeaderProperties extends ContentHeaderProperties
-{
- AMQShortString getContentType();
-
- void setContentType(AMQShortString contentType);
-
- FieldTable getHeaders();
-
- void setHeaders(FieldTable headers);
-
- byte getDeliveryMode();
-
- void setDeliveryMode(byte deliveryMode);
-
- byte getPriority();
-
- void setPriority(byte priority);
-
- AMQShortString getCorrelationId();
-
- void setCorrelationId(AMQShortString correlationId);
-
- AMQShortString getReplyTo();
-
- void setReplyTo(AMQShortString replyTo);
-
- long getExpiration();
-
- void setExpiration(long expiration);
-
- AMQShortString getMessageId();
-
- void setMessageId(AMQShortString messageId);
-
- long getTimestamp();
-
- void setTimestamp(long timestamp);
-
- AMQShortString getType();
-
- void setType(AMQShortString type);
-
- AMQShortString getUserId();
-
- void setUserId(AMQShortString userId);
-
- AMQShortString getAppId();
-
- void setAppId(AMQShortString appId);
-
- AMQShortString getClusterId();
-
- void setClusterId(AMQShortString clusterId);
-
- AMQShortString getEncoding();
-
- void setEncoding(AMQShortString encoding);
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index f6fa89a91c..131d796af4 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -39,7 +39,7 @@ public class ContentHeaderBody implements AMQBody
private long bodySize;
/** must never be null */
- private ContentHeaderProperties properties;
+ private BasicContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -57,13 +57,13 @@ public class ContentHeaderBody implements AMQBody
}
- public ContentHeaderBody(ContentHeaderProperties props, int classId)
+ public ContentHeaderBody(BasicContentHeaderProperties props, int classId)
{
properties = props;
this.classId = classId;
}
- public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize)
+ public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize)
{
this(props, classId);
this.weight = weight;
@@ -121,12 +121,12 @@ public class ContentHeaderBody implements AMQBody
return new AMQFrame(channelId, body);
}
- public ContentHeaderProperties getProperties()
+ public BasicContentHeaderProperties getProperties()
{
return properties;
}
- public void setProperties(ContentHeaderProperties props)
+ public void setProperties(BasicContentHeaderProperties props)
{
properties = props;
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
deleted file mode 100644
index 2e1b988aa3..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-
-/**
- * There will be an implementation of this interface for each content type. All content types have associated
- * header properties and this provides a way to encode and decode them.
- */
-public interface ContentHeaderProperties
-{
- /**
- * Writes the property list to the buffer, in a suitably encoded form.
- * @param buffer The buffer to write to
- */
- void writePropertyListPayload(DataOutput buffer) throws IOException;
-
- /**
- * Populates the properties from buffer.
- * @param buffer The buffer to read from.
- * @param propertyFlags he property flags.
- * @throws AMQFrameDecodingException when the buffer does not contain valid data
- */
- void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size)
- throws AMQFrameDecodingException, IOException;
-
- /**
- * @return the size of the encoded property list in bytes.
- */
- int getPropertyListSize();
-
- /**
- * Gets the property flags. Property flags indicate which properties are set in the list. The
- * position and meaning of each flag is defined in the protocol specification for the particular
- * content type with which these properties are associated.
- * @return flags
- */
- int getPropertyFlags();
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index ff97c0b28f..55961db06b 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -38,11 +38,11 @@ public class ContentHeaderPropertiesFactory
{
}
- public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
+ public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
DataInput buffer, int size)
throws AMQFrameDecodingException, IOException
{
- ContentHeaderProperties properties;
+ BasicContentHeaderProperties properties;
// AMQP version change: "Hardwired" version to major=8, minor=0
// TODO: Change so that the actual version is obtained from
// the ProtocolInitiation object for this session.
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
deleted file mode 100644
index 470b7b05e3..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.framing.abstraction;
-
-public interface ContentChunk
-{
- int getSize();
- byte[] getData();
-
- void reduceToFit();
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
deleted file mode 100644
index 01d1a8a17b..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.framing.abstraction;
-
-import org.apache.qpid.framing.AMQMethodBody;
-
-
-public interface MessagePublishInfoConverter
-{
- public MessagePublishInfo convertToInfo(AMQMethodBody body);
- public AMQMethodBody convertToBody(MessagePublishInfo info);
-
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
index d1e53d6907..b3897771c5 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
@@ -21,14 +21,10 @@
package org.apache.qpid.framing.abstraction;
-import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQMethodBody;
-public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+public interface ProtocolVersionMethodConverter
{
- AMQBody convertToBody(ContentChunk contentBody);
- ContentChunk convertToContentChunk(AMQBody body);
-
- void configure();
-
- AMQBody convertToBody(byte[] input);
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
index b3eb1211a5..6456eacab1 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
@@ -21,13 +21,10 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -35,48 +32,12 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
public MethodConverter_0_9()
{
super((byte)0,(byte)9);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- if(contentChunk instanceof ContentChunk_0_9)
- {
- return ((ContentChunk_0_9)contentChunk).toBody();
- }
- else
- {
- return new ContentBody(contentChunk.getData());
- }
- }
-
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk_0_9(contentBodyChunk);
-
- }
-
- public void configure()
- {
-
- _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
}
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
@@ -103,33 +64,4 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot
}
- private static class ContentChunk_0_9 implements ContentChunk
- {
- private final ContentBody _contentBodyChunk;
-
- public ContentChunk_0_9(final ContentBody contentBodyChunk)
- {
- _contentBodyChunk = contentBodyChunk;
- }
-
- public int getSize()
- {
- return _contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return _contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- _contentBodyChunk.reduceBufferToFit();
- }
-
- public AMQBody toBody()
- {
- return _contentBodyChunk;
- }
- }
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
index d33749d795..e25dc8a022 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
@@ -21,61 +21,22 @@
package org.apache.qpid.framing.amqp_0_91;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
public MethodConverter_0_91()
{
super((byte)0,(byte)9);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- if(contentChunk instanceof ContentChunk_0_9)
- {
- return ((ContentChunk_0_9)contentChunk).toBody();
- }
- else
- {
- return new ContentBody(contentChunk.getData());
- }
- }
-
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk_0_9(contentBodyChunk);
-
- }
-
- public void configure()
- {
-
- _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
}
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
@@ -102,33 +63,4 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro
}
- private static class ContentChunk_0_9 implements ContentChunk
- {
- private final ContentBody _contentBodyChunk;
-
- public ContentChunk_0_9(final ContentBody contentBodyChunk)
- {
- _contentBodyChunk = contentBodyChunk;
- }
-
- public int getSize()
- {
- return _contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return _contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- _contentBodyChunk.reduceBufferToFit();
- }
-
- public AMQBody toBody()
- {
- return _contentBodyChunk;
- }
- }
} \ No newline at end of file
diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
index 575816db4f..5e50c2b3fb 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
@@ -21,71 +21,21 @@
package org.apache.qpid.framing.amqp_8_0;
-import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
- private int _basicPublishClassId;
- private int _basicPublishMethodId;
-
public MethodConverter_8_0()
{
super((byte)8,(byte)0);
-
-
- }
-
- public AMQBody convertToBody(ContentChunk contentChunk)
- {
- return new ContentBody(contentChunk.getData());
}
- public ContentChunk convertToContentChunk(AMQBody body)
- {
- final ContentBody contentBodyChunk = (ContentBody) body;
-
- return new ContentChunk()
- {
-
- public int getSize()
- {
- return contentBodyChunk.getSize();
- }
-
- public byte[] getData()
- {
- return contentBodyChunk.getPayload();
- }
-
- public void reduceToFit()
- {
- contentBodyChunk.reduceBufferToFit();
- }
- };
-
- }
-
- public void configure()
- {
-
- _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
- _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
-
- }
-
- public AMQBody convertToBody(byte[] data)
- {
- return new ContentBody(data);
- }
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 07f0d0c369..19dc1a5a02 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -50,8 +50,6 @@ import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.ConflationQueue;
-import org.apache.qpid.server.protocol.v0_8.IncomingMessage;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -617,61 +615,41 @@ public class MessageStoreTest extends QpidTestCase
MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey);
- final IncomingMessage currentMessage;
-
-
- currentMessage = new IncomingMessage(messageInfo);
-
- currentMessage.setExchange(exchange);
-
ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l);
- try
- {
- currentMessage.setContentHeaderBody(headerBody);
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
+ MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
- currentMessage.setExpiration();
+ final StoredMessage<MessageMetaData> storedMessage = getVirtualHost().getMessageStore().addMessage(mmd);
+ storedMessage.flushToStore();
+ final AMQMessage currentMessage = new AMQMessage(storedMessage);
- MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
- currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd));
- currentMessage.getStoredMessage().flushToStore();
- currentMessage.route();
+ final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage);
- // check and deliver if header says body length is zero
- if (currentMessage.allContentReceived())
- {
- ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues();
- trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
- AMQMessage message = new AMQMessage(currentMessage.getStoredMessage());
+ ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
- for(BaseQueue queue : destinationQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
+ trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ for(BaseQueue queue : destinationQueues)
{
- _logger.error("Problem enqueing message", e);
+ queue.enqueue(currentMessage);
}
}
-
- public void onRollback()
+ catch (AMQException e)
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _logger.error("Problem enqueing message", e);
}
- });
- }
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
+
}
private void createAllQueues()