diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 17:16:44 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 17:16:44 +0000 |
commit | d779fd968deef2dd5395a93f39f70ebf677ea1b3 (patch) | |
tree | 99d69cf9e9499797d649873c5a7f985965be4459 | |
parent | 455c63b69c150017c5fbbda0b9a56b29bc89326d (diff) | |
download | qpid-python-d779fd968deef2dd5395a93f39f70ebf677ea1b3.tar.gz |
QPID-5504 : initial refactoring to move common code into shared classes, make transports work similarly with respect to message routing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1560424 13f79535-47bb-0310-9956-ffa450edef68
52 files changed, 353 insertions, 1494 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java index 4c9eae6f3e..9d52241c4c 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java @@ -663,9 +663,8 @@ public class UpgradeFrom4To5 extends AbstractStoreUpgrade { final MessagePublishInfo publishBody = readMessagePublishInfo(input); final ContentHeaderBody contentHeaderBody = readContentHeaderBody(input); - final int contentChunkCount = input.readInt(); - return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount); + return new MessageMetaData(publishBody, contentHeaderBody); } catch (Exception e) { diff --git a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 76b990038d..3d6821ad47 100644 --- a/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -94,7 +94,7 @@ public class BDBMessageStoreTest extends MessageStoreTest ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); @@ -150,7 +150,7 @@ public class BDBMessageStoreTest extends MessageStoreTest assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); - BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties(); + BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); @@ -392,7 +392,7 @@ public class BDBMessageStoreTest extends MessageStoreTest ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8); storedMessage_0_8.addContent(0, chunk1); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java index 63bd1e45a0..d19f127b16 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java @@ -47,10 +47,6 @@ public interface AMQMessageHeader String getReplyTo(); - String getReplyToExchange(); - String getReplyToRoutingKey(); - - Object getHeader(String name); boolean containsHeaders(Set<String> names); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 8311dbd5ff..e5aa6c1158 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -23,9 +23,11 @@ package org.apache.qpid.server.message; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T> +public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> { private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = @@ -33,10 +35,13 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; + private final Object _connectionReference; - public AbstractServerMessageImpl(StoredMessage<T> handle) + + public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference) { _handle = handle; + _connectionReference = connectionReference; } public StoredMessage<T> getStoredMessage() @@ -44,16 +49,11 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat return _handle; } - public boolean incrementReference() + private boolean incrementReference() { - return incrementReference(1); - } - - public boolean incrementReference(int count) - { - if(_refCountUpdater.addAndGet(this, count) <= 0) + if(_refCountUpdater.incrementAndGet(this) <= 0) { - _refCountUpdater.addAndGet(this, -count); + _refCountUpdater.decrementAndGet(this); return false; } else @@ -67,7 +67,7 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat * message store. * */ - public void decrementReference() + private void decrementReference() { int count = _refCountUpdater.decrementAndGet(this); @@ -104,8 +104,72 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")"; } - protected int getReferenceCount() + private int getReferenceCount() { return _referenceCount; } + + @Override + final public MessageReference<X> newReference() + { + return new Reference(); + } + + @Override + final public boolean isPersistent() + { + return _handle.getMetaData().isPersistent(); + } + + @Override + final public long getMessageNumber() + { + return getStoredMessage().getMessageNumber(); + } + + @Override + final public int getContent(ByteBuffer buf, int offset) + { + return getStoredMessage().getContent(offset, buf); + } + + @Override + final public ByteBuffer getContent(int offset, int size) + { + return getStoredMessage().getContent(offset, size); + } + + final public Object getConnectionReference() + { + return _connectionReference; + }public String toString() + { + return "Message[" + debugIdentity() + "]"; + } + + private final class Reference implements MessageReference<X> + { + + private final AtomicBoolean _released = new AtomicBoolean(false); + + private Reference() + { + incrementReference(); + } + + public X getMessage() + { + return (X) AbstractServerMessageImpl.this; + } + + public void release() + { + if(!_released.getAndSet(true)) + { + decrementReference(); + } + } + + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java index 1b3fdb1870..03f1d6649c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java @@ -34,4 +34,6 @@ public interface InboundMessage extends Filterable boolean isRedelivered(); long getSize(); + + Object getConnectionReference(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java index 399f8f9327..eda85507f0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java @@ -20,39 +20,8 @@ */ package org.apache.qpid.server.message; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class MessageReference<M extends ServerMessage> +public interface MessageReference<M extends ServerMessage> { - - private final AtomicBoolean _released = new AtomicBoolean(false); - - private volatile M _message; - - public MessageReference(M message) - { - _message = message; - onReference(message); - } - - abstract protected void onReference(M message); - - abstract protected void onRelease(M message); - - public M getMessage() - { - return _message; - } - - public void release() - { - if(!_released.getAndSet(true)) - { - if(_message != null) - { - onRelease(_message); - } - } - } - + public M getMessage(); + public void release(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index e1ad2fd0ca..a975ba1bc8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -37,8 +37,6 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enquea long getSize(); - boolean isImmediate(); - long getExpiration(); MessageReference newReference(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java index df26037eed..c8e2c3a6fb 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java @@ -67,4 +67,10 @@ public class InboundMessageAdapter implements InboundMessage { return _entry.getSize(); } + + @Override + public Object getConnectionReference() + { + return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null; + } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 833df34fd8..8919599cba 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -96,16 +96,6 @@ public class HeadersBindingTest extends TestCase return null; } - public String getReplyToExchange() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public String getReplyToRoutingKey() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - public Object getHeader(String name) { return _headers.get(name); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 25f1df7a20..24a507173c 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -130,11 +130,6 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM return _storedMsg; } - @Override - public boolean isImmediate() - { - return false; - } @Override public boolean isPersistent() diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index c6ae0c6e47..5244a7f51b 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -113,7 +113,6 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M deliveryProps.setExpiration(serverMsg.getExpiration()); - deliveryProps.setImmediate(serverMsg.isImmediate()); deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index 2ca680869f..a471e53fc6 100755 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -37,7 +37,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage +public class MessageMetaData_0_10 implements StorableMessageMetaData { private Header _header; private DeliveryProperties _deliveryProps; @@ -53,8 +53,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10(); private volatile ByteBuffer _encoded; - private Object _connectionReference; - public MessageMetaData_0_10(MessageTransfer xfr) { @@ -202,12 +200,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes return _deliveryProps == null ? 0L : _deliveryProps.getExpiration(); } - public boolean isRedelivered() - { - // The *Message* is never redelivered, only queue entries are... - return false; - } - public long getArrivalTime() { return _arrivalTime; @@ -218,16 +210,6 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes return _header; } - public void setConnectionReference(Object connectionReference) - { - _connectionReference = connectionReference; - } - - public Object getConnectionReference() - { - return _connectionReference; - } - private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> { public MessageMetaData_0_10 createMetaData(ByteBuffer buf) diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index 34cf6998ab..e01fb474ac 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -30,15 +30,12 @@ import org.apache.qpid.transport.Header; import java.nio.ByteBuffer; -public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage +public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage { - private Object _connectionRef; - public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef) { - super(storeMessage); - _connectionRef = connectionRef; + super(storeMessage, connectionRef); } private MessageMetaData_0_10 getMetaData() @@ -56,12 +53,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet return getMetaData().getMessageHeader(); } - public boolean isPersistent() - { - return getMetaData().isPersistent(); - } - - public boolean isRedelivered() { // The *Message* is never redelivered, only queue entries are... this is here so that filters @@ -71,7 +62,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet public long getSize() { - return getMetaData().getSize(); } @@ -85,32 +75,11 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet return getMetaData().getExpiration(); } - public MessageReference newReference() - { - return new TransferMessageReference(this); - } - - public long getMessageNumber() - { - return getStoredMessage().getMessageNumber(); - } - public long getArrivalTime() { return getMetaData().getArrivalTime(); } - public int getContent(ByteBuffer buf, int offset) - { - return getStoredMessage().getContent(offset, buf); - } - - - public ByteBuffer getContent(int offset, int size) - { - return getStoredMessage().getContent(offset,size); - } - public Header getHeader() { return getMetaData().getHeader(); @@ -118,13 +87,6 @@ public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMet public ByteBuffer getBody() { - return getContent(0, (int)getSize()); } - - public Object getConnectionReference() - { - return _connectionRef; - } - } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index abe784cefa..f98eb09b43 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -184,7 +184,7 @@ public class ServerSession extends Session return isCommandsFull(id); } - public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) + public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) @@ -768,10 +768,7 @@ public class ServerSession extends Session public boolean onSameConnection(InboundMessage inbound) { - return ((inbound instanceof MessageTransferMessage) - && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference()) - || ((inbound instanceof MessageMetaData_0_10) - && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference()); + return inbound.getConnectionReference() == getConnection().getReference(); } @@ -852,31 +849,25 @@ public class ServerSession extends Session private class PostEnqueueAction implements ServerTransaction.Action { - private List<? extends BaseQueue> _queues; - private ServerMessage _message; + private final MessageReference<MessageTransferMessage> _reference; + private final List<? extends BaseQueue> _queues; private final boolean _transactional; - public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional) + public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional) { + _reference = message.newReference(); _transactional = transactional; - setState(queues, message); - } - - public void setState(List<? extends BaseQueue> queues, ServerMessage message) - { - _message = message; _queues = queues; } public void postCommit() { - MessageReference<?> ref = _message.newReference(); for(int i = 0; i < _queues.size(); i++) { try { BaseQueue queue = _queues.get(i); - queue.enqueue(_message, _transactional, null); + queue.enqueue(_reference.getMessage(), _transactional, null); if(queue instanceof AMQQueue) { ((AMQQueue)queue).checkCapacity(ServerSession.this); @@ -889,12 +880,13 @@ public class ServerSession extends Session throw new RuntimeException(e); } } - ref.release(); + _reference.release(); } public void onRollback() { // NO-OP + _reference.release(); } } diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index a0b60ae640..182e71c957 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; +import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -297,7 +298,6 @@ public class ServerSessionDelegate extends SessionDelegate } final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); - messageMetaData.setConnectionReference(((ServerSession)ssn).getReference()); if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) { @@ -309,11 +309,16 @@ public class ServerSessionDelegate extends SessionDelegate } final Exchange exchangeInUse; - List<? extends BaseQueue> queues = exchange.route(messageMetaData); + final MessageStore store = getVirtualHost(ssn).getMessageStore(); + final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); + final ServerSession serverSession = (ServerSession) ssn; + MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); + MessageReference<MessageTransferMessage> reference = message.newReference(); + List<? extends BaseQueue> queues = exchange.route(message); if(queues.isEmpty() && exchange.getAlternateExchange() != null) { final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(messageMetaData); + queues = alternateExchange.route(message); if (!queues.isEmpty()) { exchangeInUse = alternateExchange; @@ -328,12 +333,8 @@ public class ServerSessionDelegate extends SessionDelegate exchangeInUse = exchange; } - final ServerSession serverSession = (ServerSession) ssn; if(!queues.isEmpty()) { - final MessageStore store = getVirtualHost(ssn).getMessageStore(); - final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); - MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); serverSession.enqueue(message, queues); storeMessage.flushToStore(); } @@ -352,7 +353,6 @@ public class ServerSessionDelegate extends SessionDelegate } } - if(serverSession.isTransactional()) { serverSession.processed(xfr); @@ -361,6 +361,7 @@ public class ServerSessionDelegate extends SessionDelegate { serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr)); } + reference.release(); } private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr, diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java deleted file mode 100644 index 0c04f22232..0000000000 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/TransferMessageReference.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_10; - -import org.apache.qpid.server.message.MessageReference; - -public class TransferMessageReference extends MessageReference<MessageTransferMessage> -{ - public TransferMessageReference(MessageTransferMessage message) - { - super(message); - } - - protected void onReference(MessageTransferMessage message) - { - message.incrementReference(); - } - - protected void onRelease(MessageTransferMessage message) - { - message.decrementReference(); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index a603807f87..c6e0dfc3e2 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -49,7 +49,6 @@ import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; @@ -271,7 +270,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { throw new AMQSecurityException("Permission denied: " + e.getName()); } - _currentMessage = new IncomingMessage(info, getProtocolSession().getReference()); + _currentMessage = new IncomingMessage(info); _currentMessage.setExchange(e); } @@ -291,12 +290,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _currentMessage.setContentHeaderBody(contentHeaderBody); - _currentMessage.setExpiration(); - - _currentMessage.headersReceived(getProtocolSession().getLastReceivedTime()); - - _currentMessage.route(); - deliverCurrentMessageIfComplete(); } } @@ -309,56 +302,62 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - final List<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues(); - if(!checkMessageUserId(_currentMessage.getContentHeader())) - { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", _currentMessage)); - } - else + final MessageMetaData messageMetaData = + new MessageMetaData(_currentMessage.getMessagePublishInfo(), + _currentMessage.getContentHeader(), + getProtocolSession().getLastReceivedTime()); + + final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData); + final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle); + MessageReference reference = amqMessage.newReference(); + try { - if(destinationQueues == null || destinationQueues.isEmpty()) + int bodyCount = _currentMessage.getBodyCount(); + if(bodyCount > 0) { - handleUnroutableMessage(); + long bodyLengthReceived = 0; + for(int i = 0 ; i < bodyCount ; i++) + { + ContentBody contentChunk = _currentMessage.getContentChunk(i); + handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getPayload())); + bodyLengthReceived += contentChunk.getSize(); + } + } + + if(!checkMessageUserId(_currentMessage.getContentHeader())) + { + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", amqMessage)); } else { - final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(_currentMessage.getMessageMetaData()); - _currentMessage.setStoredMessage(handle); - int bodyCount = _currentMessage.getBodyCount(); - if(bodyCount > 0) + final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage); + + if(destinationQueues == null || destinationQueues.isEmpty()) { - long bodyLengthReceived = 0; - for(int i = 0 ; i < bodyCount ; i++) - { - ContentChunk contentChunk = _currentMessage.getContentChunk(i); - handle.addContent((int)bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData())); - bodyLengthReceived += contentChunk.getSize(); - } + handleUnroutableMessage(amqMessage); } - - _transaction.addPostTransactionAction(new ServerTransaction.Action() + else { - public void postCommit() - { - } - - public void onRollback() - { - handle.remove(); - } - }); - - _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues)); - incrementOutstandingTxnsIfNecessary(); - _currentMessage.getStoredMessage().flushToStore(); + _transaction.enqueue(destinationQueues, + amqMessage, + new MessageDeliveryAction(amqMessage, destinationQueues)); + incrementOutstandingTxnsIfNecessary(); + handle.flushToStore(); + + } } } + finally + { + reference.release(); + } + } finally { long bodySize = _currentMessage.getSize(); - long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp(); + long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp(); _session.registerMessageReceived(bodySize, timestamp); _currentMessage = null; } @@ -374,9 +373,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * @throws AMQConnectionException if the message is mandatoryclose-on-no-route * @see AMQProtocolSession#isCloseWhenNoRoute() */ - private void handleUnroutableMessage() throws AMQConnectionException + private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException { - boolean mandatory = _currentMessage.isMandatory(); + boolean mandatory = message.isMandatory(); String description = currentMessageDescription(); boolean closeOnNoRoute = _session.isCloseWhenNoRoute(); @@ -398,13 +397,18 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F (Throwable) null); } - if (mandatory || _currentMessage.isImmediate()) + if (mandatory || message.isImmediate()) { - _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), _currentMessage)); + _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message)); } else { - _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey())); + _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchangeName().asString(), + _currentMessage.getMessagePublishInfo().getRoutingKey() == null + ? null + : _currentMessage.getMessagePublishInfo() + .getRoutingKey() + .toString())); } } @@ -417,15 +421,17 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return String.format( "[Exchange: %s, Routing key: %s]", - _currentMessage.getExchange(), - _currentMessage.getRoutingKey()); + _currentMessage.getExchangeName(), + _currentMessage.getMessagePublishInfo().getRoutingKey() == null + ? null + : _currentMessage.getMessagePublishInfo().getRoutingKey().toString()); } public void publishContentBody(ContentBody contentBody) throws AMQException { if (_currentMessage == null) { - throw new AMQException("Received content body without previously receiving a JmsPublishBody"); + throw new AMQException("Received content body without previously receiving a Content Header"); } if (_logger.isDebugEnabled()) @@ -435,10 +441,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F try { - final ContentChunk contentChunk = - _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody); - - _currentMessage.addContentBodyFrame(contentChunk); + _currentMessage.addContentBodyFrame(contentBody); deliverCurrentMessageIfComplete(); } @@ -1157,24 +1160,23 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - private AMQMessage createAMQMessage(IncomingMessage incomingMessage) + private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle) throws AMQException { - AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage()); + AMQMessage message = new AMQMessage(handle, _session.getReference()); - message.setExpiration(incomingMessage.getExpiration()); - message.setConnectionIdentifier(_session.getReference()); + final BasicContentHeaderProperties properties = + incomingMessage.getContentHeader().getProperties(); + + long expiration = properties.getExpiration(); + message.setExpiration(expiration); return message; } private boolean checkMessageUserId(ContentHeaderBody header) { - AMQShortString userID = - header.getProperties() instanceof BasicContentHeaderProperties - ? ((BasicContentHeaderProperties) header.getProperties()).getUserId() - : null; - + AMQShortString userID = header.getProperties().getUserId(); return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString())); } @@ -1208,13 +1210,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private class MessageDeliveryAction implements ServerTransaction.Action { - private IncomingMessage _incommingMessage; + private final MessageReference<AMQMessage> _reference; private List<? extends BaseQueue> _destinationQueues; - public MessageDeliveryAction(IncomingMessage currentMessage, + public MessageDeliveryAction(AMQMessage currentMessage, List<? extends BaseQueue> destinationQueues) { - _incommingMessage = currentMessage; + _reference = currentMessage.newReference(); _destinationQueues = destinationQueues; } @@ -1222,10 +1224,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { try { - final boolean immediate = _incommingMessage.isImmediate(); - - final AMQMessage amqMessage = createAMQMessage(_incommingMessage); - MessageReference ref = amqMessage.newReference(); + AMQMessage message = _reference.getMessage(); + final boolean immediate = message.isImmediate(); for(int i = 0; i < _destinationQueues.size(); i++) { @@ -1242,7 +1242,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F action = null; } - queue.enqueue(amqMessage, isTransactional(), action); + queue.enqueue(message, isTransactional(), action); if(queue instanceof AMQQueue) { @@ -1251,8 +1251,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - _incommingMessage.getStoredMessage().flushToStore(); - ref.release(); + message.getStoredMessage().flushToStore(); + _reference.release(); } catch (AMQException e) { @@ -1265,6 +1265,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { // Maybe keep track of entries that were created and then delete them here in case of failure // to in memory enqueue + _reference.release(); } private class ImmediateAction implements BaseQueue.PostEnqueueAction @@ -1375,28 +1376,30 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private class WriteReturnAction implements ServerTransaction.Action { private final AMQConstant _errorCode; - private final IncomingMessage _message; private final String _description; + private final MessageReference<AMQMessage> _reference; public WriteReturnAction(AMQConstant errorCode, String description, - IncomingMessage message) + AMQMessage message) { _errorCode = errorCode; - _message = message; _description = description; + _reference = message.newReference(); } public void postCommit() { try { - _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(), - _message.getContentHeader(), - _message, + AMQMessage message = _reference.getMessage(); + _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, _channelId, _errorCode.getCode(), AMQShortString.validValueOf(_description)); + _reference.release(); } catch (AMQException e) { @@ -1408,6 +1411,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void onRollback() { + _reference.release(); } } @@ -1470,12 +1474,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public boolean onSameConnection(InboundMessage inbound) { - if(inbound instanceof IncomingMessage) - { - IncomingMessage incoming = (IncomingMessage) inbound; - return getProtocolSession().getReference() == incoming.getConnectionReference(); - } - return false; + return getProtocolSession().getReference() == inbound.getConnectionReference(); } public int getUnacknowledgedMessageCount() diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index 416a4da183..b73b6bc0aa 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -28,66 +28,41 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; -import java.lang.ref.WeakReference; import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> +public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); /** Flag to indicate that this message requires 'immediate' delivery. */ - private static final byte IMMEDIATE = 0x01; - - /** - * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality - * for messages published with the 'immediate' flag. - */ - - private static final byte DELIVERED_TO_CONSUMER = 0x02; - - private byte _flags = 0; - private long _expiration; private final long _size; - private Object _connectionIdentifier; - private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); - public AMQMessage(StoredMessage<MessageMetaData> handle) { this(handle, null); } - public AMQMessage(StoredMessage<MessageMetaData> handle, WeakReference<AMQChannel> channelRef) + public AMQMessage(StoredMessage<MessageMetaData> handle, Object connectionReference) { - super(handle); - - - final MessageMetaData metaData = handle.getMetaData(); - _size = metaData.getContentSize(); - final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo(); - - if(messagePublishInfo.isImmediate()) - { - _flags |= IMMEDIATE; - } + super(handle, connectionReference); + _size = handle.getMetaData().getContentSize(); } public void setExpiration(final long expiration) { - _expiration = expiration; - } public MessageMetaData getMessageMetaData() @@ -100,21 +75,6 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> return getMessageMetaData().getContentHeaderBody(); } - public Long getMessageId() - { - return getStoredMessage().getMessageNumber(); - } - - /** - * Called selectors to determin if the message has already been sent - * - * @return _deliveredToConsumer - */ - public boolean getDeliveredToConsumer() - { - return (_flags & DELIVERED_TO_CONSUMER) != 0; - } - public String getRoutingKey() { MessageMetaData messageMetaData = getMessageMetaData(); @@ -134,22 +94,10 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> return getMessageMetaData().getMessageHeader(); } - public boolean isPersistent() - { - return getMessageMetaData().isPersistent(); - } - - /** - * Called to enforce the 'immediate' flag. - * - * @returns true if the message is marked for immediate delivery but has not been marked as delivered - * to a consumer - */ - public boolean immediateAndNotDelivered() + @Override + public boolean isRedelivered() { - - return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE; - + return false; } public MessagePublishInfo getMessagePublishInfo() @@ -162,90 +110,27 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> return getMessageMetaData().getArrivalTime(); } - /** - * Checks to see if the message has expired. If it has the message is dequeued. - * - * @param queue The queue to check the expiration against. (Currently not used) - * - * @return true if the message has expire - * - * @throws AMQException - */ - public boolean expired(AMQQueue queue) throws AMQException - { - - if (_expiration != 0L) - { - long now = System.currentTimeMillis(); - - return (now > _expiration); - } - - return false; - } - - /** - * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). - * And for selector efficiency. - */ - public void setDeliveredToConsumer() - { - _flags |= DELIVERED_TO_CONSUMER; - } - public long getSize() { return _size; - } public boolean isImmediate() { - return (_flags & IMMEDIATE) == IMMEDIATE; - } - - public long getExpiration() - { - return _expiration; - } - - public MessageReference newReference() - { - return new AMQMessageReference(this); + return getMessagePublishInfo().isImmediate(); } - public long getMessageNumber() - { - return getStoredMessage().getMessageNumber(); - } - - - public Object getConnectionIdentifier() - { - return _connectionIdentifier; - - } - public void setConnectionIdentifier(final Object connectionIdentifier) + public boolean isMandatory() { - _connectionIdentifier = connectionIdentifier; + return getMessagePublishInfo().isMandatory(); } - public String toString() - { - return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref count: " + getReferenceCount(); - } - - public int getContent(ByteBuffer buf, int offset) + public long getExpiration() { - return getStoredMessage().getContent(offset, buf); + return _expiration; } - public ByteBuffer getContent(int offset, int size) - { - return getStoredMessage().getContent(offset, size); - } - } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java deleted file mode 100644 index 3adc9f70cd..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessageReference.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import org.apache.qpid.server.message.MessageReference; - -public class AMQMessageReference extends MessageReference<AMQMessage> -{ - - - public AMQMessageReference(AMQMessage message) - { - super(message); - } - - protected void onReference(AMQMessage message) - { - message.incrementReference(); - } - - protected void onRelease(AMQMessage message) - { - message.decrementReference(); - } -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java deleted file mode 100644 index f5c43003a4..0000000000 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ContentHeaderBodyAdapter.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.util.Collection; -import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; - -import java.util.Set; -import org.apache.qpid.server.message.AMQMessageHeader; - -public class ContentHeaderBodyAdapter implements AMQMessageHeader -{ - private final ContentHeaderBody _contentHeaderBody; - - public ContentHeaderBodyAdapter(ContentHeaderBody contentHeaderBody) - { - _contentHeaderBody = contentHeaderBody; - } - - private BasicContentHeaderProperties getProperties() - { - return (BasicContentHeaderProperties) _contentHeaderBody.getProperties(); - } - - public String getCorrelationId() - { - return getProperties().getCorrelationIdAsString(); - } - - public long getExpiration() - { - return getProperties().getExpiration(); - } - - public String getUserId() - { - return getProperties().getUserIdAsString(); - } - - public String getAppId() - { - return getProperties().getAppIdAsString(); - } - - public String getMessageId() - { - return getProperties().getMessageIdAsString(); - } - - public String getMimeType() - { - return getProperties().getContentTypeAsString(); - } - - public String getEncoding() - { - return getProperties().getEncodingAsString(); - } - - public byte getPriority() - { - return getProperties().getPriority(); - } - - public long getTimestamp() - { - return getProperties().getTimestamp(); - } - - public String getType() - { - return getProperties().getTypeAsString(); - } - - public String getReplyTo() - { - return getProperties().getReplyToAsString(); - } - - public String getReplyToExchange() - { - // TODO - return getReplyTo(); - } - - public String getReplyToRoutingKey() - { - // TODO - return getReplyTo(); - - } - - public Object getHeader(String name) - { - FieldTable ft = getProperties().getHeaders(); - return ft.get(name); - } - - public boolean containsHeaders(Set<String> names) - { - FieldTable ft = getProperties().getHeaders(); - for(String name : names) - { - if(!ft.containsKey(name)) - { - return false; - } - } - return true; - } - - @Override - public Collection<String> getHeaderNames() - { - FieldTable ft = getProperties().getHeaders(); - return ft.keys(); - } - - public boolean containsHeader(String name) - { - FieldTable ft = getProperties().getHeaders(); - return ft.containsKey(name); - } - - -} diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java index 5267651a66..5a9a51ff59 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java @@ -24,96 +24,44 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.EnqueableMessage; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.Filterable; import org.apache.qpid.server.store.StoredMessage; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource +public class IncomingMessage { - /** Used for debugging purposes. */ - private static final Logger _logger = Logger.getLogger(IncomingMessage.class); - private final MessagePublishInfo _messagePublishInfo; private ContentHeaderBody _contentHeaderBody; - + private Exchange _exchange; /** * Keeps a track of how many bytes we have received in body frames */ private long _bodyLengthReceived = 0; + private List<ContentBody> _contentChunks = new ArrayList<ContentBody>(); - /** - * This is stored during routing, to know the queues to which this message should immediately be - * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done - * by the message handle. - */ - private List<? extends BaseQueue> _destinationQueues; - - private long _expiration; - - private Exchange _exchange; - - private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>(); - - // we keep both the original meta data object and the store reference to it just in case the - // store would otherwise flow it to disk - - private MessageMetaData _messageMetaData; - - private StoredMessage<MessageMetaData> _storedMessageHandle; - private Object _connectionReference; - - - public IncomingMessage( - final MessagePublishInfo info - ) - { - this(info, null); - } - - public IncomingMessage(MessagePublishInfo info, Object reference) + public IncomingMessage(MessagePublishInfo info) { _messagePublishInfo = info; - _connectionReference = reference; } - public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException + public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) { _contentHeaderBody = contentHeaderBody; } - public void setExpiration() - { - _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration(); - } - - public MessageMetaData headersReceived(long currentTime) - { - _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime); - return _messageMetaData; - } - - - public List<? extends BaseQueue> getDestinationQueues() + public MessagePublishInfo getMessagePublishInfo() { - return _destinationQueues; + return _messagePublishInfo; } - public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException + public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException { _bodyLengthReceived += contentChunk.getSize(); _contentChunks.add(contentChunk); @@ -124,31 +72,14 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return (_bodyLengthReceived == getContentHeader().getBodySize()); } - public AMQShortString getExchange() + public AMQShortString getExchangeName() { return _messagePublishInfo.getExchange(); } - public String getRoutingKey() - { - return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); - } - - public String getBinding() - { - return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString(); - } - - - public boolean isMandatory() - { - return _messagePublishInfo.isMandatory(); - } - - - public boolean isImmediate() + public Exchange getExchange() { - return _messagePublishInfo.isImmediate(); + return _exchange; } public ContentHeaderBody getContentHeader() @@ -156,129 +87,24 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes return _contentHeaderBody; } - - public AMQMessageHeader getMessageHeader() - { - return _messageMetaData.getMessageHeader(); - } - - public boolean isPersistent() - { - return getContentHeader().getProperties() instanceof BasicContentHeaderProperties && - ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() == - BasicContentHeaderProperties.PERSISTENT; - } - - public boolean isRedelivered() - { - return false; - } - - public long getSize() { return getContentHeader().getBodySize(); } - public long getMessageNumber() - { - return _storedMessageHandle.getMessageNumber(); - } - public void setExchange(final Exchange e) { _exchange = e; } - public void route() - { - enqueue(_exchange.route(this)); - - } - - public void enqueue(final List<? extends BaseQueue> queues) - { - _destinationQueues = queues; - } - - public MessagePublishInfo getMessagePublishInfo() - { - return _messagePublishInfo; - } - - public long getExpiration() - { - return _expiration; - } - public int getBodyCount() throws AMQException { return _contentChunks.size(); } - public ContentChunk getContentChunk(int index) + public ContentBody getContentChunk(int index) { return _contentChunks.get(index); } - - public int getContent(ByteBuffer buf, int offset) - { - int pos = 0; - int written = 0; - for(ContentChunk cb : _contentChunks) - { - ByteBuffer data = ByteBuffer.wrap(cb.getData()); - if(offset+written >= pos && offset < pos + data.limit()) - { - ByteBuffer src = data.duplicate(); - src.position(offset+written - pos); - src = src.slice(); - - if(buf.remaining() < src.limit()) - { - src.limit(buf.remaining()); - } - int count = src.limit(); - buf.put(src); - written += count; - if(buf.remaining() == 0) - { - break; - } - } - pos+=data.limit(); - } - return written; - - } - - - public ByteBuffer getContent(int offset, int size) - { - ByteBuffer buf = ByteBuffer.allocate(size); - getContent(buf,offset); - buf.flip(); - return buf; - } - - public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle) - { - _storedMessageHandle = storedMessageHandle; - } - - public StoredMessage<MessageMetaData> getStoredMessage() - { - return _storedMessageHandle; - } - - public Object getConnectionReference() - { - return _connectionReference; - } - - public MessageMetaData getMessageMetaData() - { - return _messageMetaData; - } } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java index 4cc590d8cc..ead28c6e26 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java @@ -46,11 +46,10 @@ import java.util.Set; */ public class MessageMetaData implements StorableMessageMetaData { - private MessagePublishInfo _messagePublishInfo; + private final MessagePublishInfo _messagePublishInfo; - private ContentHeaderBody _contentHeaderBody; + private final ContentHeaderBody _contentHeaderBody; - private int _contentChunkCount; private long _arrivalTime; private static final byte MANDATORY_FLAG = 1; @@ -58,59 +57,36 @@ public class MessageMetaData implements StorableMessageMetaData public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory(); private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8(); - public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) + public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody) { - this(publishBody,contentHeaderBody, contentChunkCount, System.currentTimeMillis()); + this(publishBody,contentHeaderBody, System.currentTimeMillis()); } - public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount, long arrivalTime) + public MessageMetaData(MessagePublishInfo publishBody, + ContentHeaderBody contentHeaderBody, + long arrivalTime) { _contentHeaderBody = contentHeaderBody; _messagePublishInfo = publishBody; - _contentChunkCount = contentChunkCount; _arrivalTime = arrivalTime; } - public int getContentChunkCount() - { - return _contentChunkCount; - } - - public void setContentChunkCount(int contentChunkCount) - { - _contentChunkCount = contentChunkCount; - } public ContentHeaderBody getContentHeaderBody() { return _contentHeaderBody; } - public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) - { - _contentHeaderBody = contentHeaderBody; - } - public MessagePublishInfo getMessagePublishInfo() { return _messagePublishInfo; } - public void setMessagePublishInfo(MessagePublishInfo messagePublishInfo) - { - _messagePublishInfo = messagePublishInfo; - } - public long getArrivalTime() { return _arrivalTime; } - public void setArrivalTime(long arrivalTime) - { - _arrivalTime = arrivalTime; - } - public MessageMetaDataType getType() { return TYPE; @@ -169,8 +145,7 @@ public class MessageMetaData implements StorableMessageMetaData public boolean isPersistent() { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties()); - return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; + return _contentHeaderBody.getProperties().getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT; } private static class MetaDataFactory implements MessageMetaDataType.Factory @@ -219,7 +194,7 @@ public class MessageMetaData implements StorableMessageMetaData return routingKey; } }; - return new MessageMetaData(publishBody, chb, 0, arrivalTime); + return new MessageMetaData(publishBody, chb, arrivalTime); } catch (AMQException e) { @@ -242,7 +217,7 @@ public class MessageMetaData implements StorableMessageMetaData { private BasicContentHeaderProperties getProperties() { - return (BasicContentHeaderProperties) getContentHeaderBody().getProperties(); + return getContentHeaderBody().getProperties(); } public String getUserId() @@ -300,18 +275,6 @@ public class MessageMetaData implements StorableMessageMetaData return getProperties().getReplyToAsString(); } - public String getReplyToExchange() - { - // TODO - return getReplyTo(); - } - - public String getReplyToRoutingKey() - { - // TODO - return getReplyTo(); - } - public Object getHeader(String name) { FieldTable ft = getProperties().getHeaders(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index d1d86fe478..f069042db3 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -488,7 +488,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage { AMQMessage message = (AMQMessage) entry.getMessage(); - final Object publisherReference = message.getConnectionIdentifier(); + final Object publisherReference = message.getConnectionReference(); // We don't want local messages so check to see if message is one we sent Object localReference = getProtocolSession().getReference(); diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 4ab64ca100..176d1858f1 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -119,8 +119,6 @@ public class AckTest extends QpidTestCase return new AMQShortString("rk"); } }; - final IncomingMessage msg = new IncomingMessage(publishBody); - //IncomingMessage msg2 = null; BasicContentHeaderProperties b = new BasicContentHeaderProperties(); ContentHeaderBody cb = new ContentHeaderBody(); cb.setProperties(b); @@ -131,42 +129,35 @@ public class AckTest extends QpidTestCase b.setDeliveryMode((byte) 2); } - msg.setContentHeaderBody(cb); - // we increment the reference here since we are not delivering the messaging to any queues, which is where // the reference is normally incremented. The test is easier to construct if we have direct access to the // subscription ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); qs.add(_queue); - msg.enqueue(qs); - MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis()); + MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); final StoredMessage storedMessage = _messageStore.addMessage(mmd); - msg.setStoredMessage(storedMessage); final AMQMessage message = new AMQMessage(storedMessage); - if(msg.allContentReceived()) - { - ServerTransaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, message, new ServerTransaction.Action() { - public void postCommit() + ServerTransaction txn = new AutoCommitTransaction(_messageStore); + txn.enqueue(_queue, message, new ServerTransaction.Action() { + public void postCommit() + { + try { - try - { - - _queue.enqueue(message); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - public void onRollback() + _queue.enqueue(message); + } + catch (AMQException e) { - //To change body of implemented methods use File | Settings | File Templates. + throw new RuntimeException(e); } - }); + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); - } // we manually send the message to the subscription //_subscription.send(new QueueEntry(_queue,msg), _queue); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java index 15573a871f..50145e5c6d 100755 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MockStoredMessage.java @@ -57,9 +57,9 @@ public class MockStoredMessage implements StoredMessage<MessageMetaData> { FieldTable headers = new FieldTable(); headers.setString(headerName, headerValue == null? null :String.valueOf(headerValue)); - ((BasicContentHeaderProperties)chb.getProperties()).setHeaders(headers); + ( chb.getProperties()).setHeaders(headers); } - _metaData = new MessageMetaData(info, chb, 0); + _metaData = new MessageMetaData(info, chb); _content = ByteBuffer.allocate(_metaData.getContentSize()); } diff --git a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java index 87fbcfa9b3..227e9794da 100644 --- a/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java +++ b/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java @@ -81,7 +81,7 @@ public class ReferenceCountingTest extends QpidTestCase - MessageMetaData mmd = new MessageMetaData(info, chb, 0); + MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); @@ -139,7 +139,7 @@ public class ReferenceCountingTest extends QpidTestCase final ContentHeaderBody chb = createPersistentContentHeader(); - MessageMetaData mmd = new MessageMetaData(info, chb, 0); + MessageMetaData mmd = new MessageMetaData(info, chb); StoredMessage storedMessage = _store.addMessage(mmd); AMQMessage message = new AMQMessage(storedMessage); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 4cb9767514..4082f22e9c 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -51,6 +51,8 @@ public class Connection_1_0 implements ConnectionEventListener private final ConnectionEndpoint _conn; private final long _connectionId; private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); + private final Object _reference = new Object(); + public static interface Task @@ -79,6 +81,11 @@ public class Connection_1_0 implements ConnectionEventListener } + public Object getReference() + { + return _reference; + } + public void remoteSessionCreation(SessionEndpoint endpoint) { Session_1_0 session = new Session_1_0(_vhost, this); diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 2cef27267b..836eb69350 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.txn.ServerTransaction; @@ -53,34 +54,48 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina public Outcome send(final Message_1_0 message, ServerTransaction txn) { - final List<? extends BaseQueue> queues = _exchange.route(message); + List<? extends BaseQueue> queues = _exchange.route(message); - txn.enqueue(queues,message, new ServerTransaction.Action() + if(queues == null || queues.isEmpty()) { + Exchange altExchange = _exchange.getAlternateExchange(); + if(altExchange != null) + { + queues = altExchange.route(message); + } + } - BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); + if(queues != null && !queues.isEmpty()) + { + final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); - public void postCommit() + txn.enqueue(queues,message, new ServerTransaction.Action() { - for(int i = 0; i < _queues.length; i++) + MessageReference _reference = message.newReference(); + + public void postCommit() { - try - { - _queues[i].enqueue(message); - } - catch (AMQException e) + for(int i = 0; i < baseQueues.length; i++) { - // TODO - throw new RuntimeException(e); + try + { + baseQueues[i].enqueue(message); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } } + _reference.release(); } - } - public void onRollback() - { - // NO-OP - } - }); + public void onRollback() + { + _reference.release(); + } + }); + } return ACCEPTED; } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 5925bf0c32..78ca9ff2a6 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -272,7 +272,7 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement @Override public void remove() { - serverMessage.getStoredMessage().remove(); + throw new UnsupportedOperationException(); } }; } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 29a8a0c723..8db6c86fc4 100755 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -504,16 +504,6 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData } } - public String getReplyToExchange() - { - return null; //TODO - } - - public String getReplyToRoutingKey() - { - return null; //TODO - } - public String getAppId() { //TODO diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index 9b11c0f48d..e367c83c8a 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -21,35 +21,23 @@ package org.apache.qpid.server.protocol.v1_0; -import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.StoredMessage; -public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage +public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage { - - private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater = - AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount"); - - private volatile int _referenceCount = 0; - - private final StoredMessage<MessageMetaData_1_0> _storedMessage; private List<ByteBuffer> _fragments; - private WeakReference<Session_1_0> _session; private long _arrivalTime; public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage) { - _storedMessage = storedMessage; - _session = null; + super(storedMessage, null); _fragments = restoreFragments(storedMessage); } @@ -75,11 +63,10 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final List<ByteBuffer> fragments, - final Session_1_0 session) + final Object connectionReference) { - _storedMessage = storedMessage; + super(storedMessage, connectionReference); _fragments = fragments; - _session = new WeakReference<Session_1_0>(session); _arrivalTime = System.currentTimeMillis(); } @@ -98,7 +85,7 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM private MessageMetaData_1_0 getMessageMetaData() { - return _storedMessage.getMetaData(); + return getStoredMessage().getMetaData(); } public MessageMetaData_1_0.MessageHeader_1_0 getMessageHeader() @@ -106,16 +93,6 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM return getMessageMetaData().getMessageHeader(); } - public StoredMessage getStoredMessage() - { - return _storedMessage; - } - - public boolean isPersistent() - { - return getMessageMetaData().isPersistent(); - } - public boolean isRedelivered() { // TODO @@ -136,121 +113,19 @@ public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundM return size; } - public boolean isImmediate() - { - return false; - } - public long getExpiration() { return getMessageHeader().getExpiration(); } - public MessageReference<Message_1_0> newReference() - { - return new Reference(this); - } - - public long getMessageNumber() - { - return _storedMessage.getMessageNumber(); - } - public long getArrivalTime() { return _arrivalTime; } - public int getContent(final ByteBuffer buf, final int offset) - { - return _storedMessage.getContent(offset, buf); - } - - public ByteBuffer getContent(int offset, int size) - { - ByteBuffer buf = ByteBuffer.allocate(size); - buf.limit(getContent(buf, offset)); - - return buf; - } - public List<ByteBuffer> getFragments() { return _fragments; } - public Session_1_0 getSession() - { - return _session == null ? null : _session.get(); - } - - - public boolean incrementReference() - { - if(_refCountUpdater.incrementAndGet(this) <= 0) - { - _refCountUpdater.decrementAndGet(this); - return false; - } - else - { - return true; - } - } - - /** - * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the - * message store. - */ - - public void decrementReference() - { - int count = _refCountUpdater.decrementAndGet(this); - - // note that the operation of decrementing the reference count and then removing the message does not - // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after - // the message has been passed to all queues. i.e. we are - // not relying on the all the increments having taken place before the delivery manager decrements. - if (count == 0) - { - // set the reference count way below 0 so that we can detect that the message has been deleted - // this is to guard against the message being spontaneously recreated (from the mgmt console) - // by copying from other queues at the same time as it is being removed. - _refCountUpdater.set(this,Integer.MIN_VALUE/2); - - // must check if the handle is null since there may be cases where we decide to throw away a message - // and the handle has not yet been constructed - if (_storedMessage != null) - { - _storedMessage.remove(); - } - } - else - { - if (count < 0) - { - throw new RuntimeException("Reference count for message id " + getMessageNumber() - + " has gone below 0."); - } - } - } - - public static class Reference extends MessageReference<Message_1_0> - { - public Reference(Message_1_0 message) - { - super(message); - } - - protected void onReference(Message_1_0 message) - { - message.incrementReference(); - } - - protected void onRelease(Message_1_0 message) - { - message.decrementReference(); - } - - } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index e971672767..927972c8b2 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -41,6 +41,7 @@ import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -160,8 +161,8 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv storedMessage.flushToStore(); - Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession()); - + Message_1_0 message = new Message_1_0(storedMessage, fragments, getSession().getConnection().getReference()); + MessageReference<Message_1_0> reference = message.newReference(); Binary transactionId = null; org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState(); @@ -231,6 +232,8 @@ public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, Deliv } }); } + + reference.release(); } } diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 80d7595e01..ad05bd8a1b 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -541,8 +541,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu @Override public boolean onSameConnection(InboundMessage inbound) { - // TODO - return false; + return inbound.getConnectionReference() == getConnection().getReference(); } @Override diff --git a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index b9695ba87a..ce653766ff 100644 --- a/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -149,7 +149,7 @@ class Subscription_1_0 implements Subscription { if(entry.getMessage() instanceof Message_1_0) { - if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession()) + if(_noLocal && ((Message_1_0)entry.getMessage()).getConnectionReference() == getSession().getConnection().getReference()) { return false; } diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java index 92bccf871f..f1843de8ac 100644 --- a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java +++ b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -225,7 +225,6 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra { return new MessageMetaData(convertPublishBody(message), convertContentHeaderBody(message, vhost), - 1, message.getArrivalTime()); } diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java index e1e8fbd9d3..544099f1f2 100644 --- a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java +++ b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -121,7 +121,7 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag body.flip(); BasicContentHeaderProperties properties = - (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); + message_0_8.getContentHeaderBody().getProperties(); final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); if(exchange != null) diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java index 0d9d59ff56..bbea177260 100644 --- a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java +++ b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java @@ -56,7 +56,7 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess header.setDurable(serverMessage.isPersistent()); BasicContentHeaderProperties contentHeader = - (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); + serverMessage.getContentHeaderBody().getProperties(); header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); final long expiration = serverMessage.getExpiration(); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index fe9f9f4d00..dbfbb743ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -30,7 +30,6 @@ import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; -import org.apache.qpid.framing.ContentHeaderProperties; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; @@ -73,7 +72,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate private static final boolean STRICT_AMQP_COMPLIANCE = Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT)); - private ContentHeaderProperties _contentHeaderProperties; + private BasicContentHeaderProperties _contentHeaderProperties; // The base set of items that needs to be set. private AMQMessageDelegate_0_8(BasicContentHeaderProperties properties, long deliveryTag) @@ -81,7 +80,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate super(deliveryTag); _contentHeaderProperties = properties; _readableProperties = (_contentHeaderProperties != null); - _headerAdapter = new JMSHeaderAdapter(_readableProperties ? ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders() + _headerAdapter = new JMSHeaderAdapter(_readableProperties ? _contentHeaderProperties.getHeaders() : (new BasicContentHeaderProperties()).getHeaders() ); } @@ -90,7 +89,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { this(new BasicContentHeaderProperties(), -1); _readableProperties = false; - _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders()); + _headerAdapter = new JMSHeaderAdapter(_contentHeaderProperties.getHeaders()); } @@ -337,7 +336,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate public BasicContentHeaderProperties getContentHeaderProperties() { - return (BasicContentHeaderProperties) _contentHeaderProperties; + return _contentHeaderProperties; } @@ -443,7 +442,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below. if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString())) { - return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString(); + return _contentHeaderProperties.getUserIdAsString(); } else { diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java index 608567674a..e52ff9acb2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java @@ -101,7 +101,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory } AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr, - (BasicContentHeaderProperties) contentHeader.getProperties(), + contentHeader.getProperties(), exchange, routingKey, queueDestinationCache, topicDestinationCache); return createMessage(delegate, data); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index 4154003b23..7e1ce20238 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -110,7 +110,7 @@ public class MessageFactoryRegistry AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache) throws AMQException, JMSException { - BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties(); + BasicContentHeaderProperties properties = contentHeader.getProperties(); // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over // AMQP. When the type is null, it can only be assumed that the message is a byte message. diff --git a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index 366c8231a1..60dac24cfc 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -27,7 +27,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -public class BasicContentHeaderProperties implements CommonContentHeaderProperties +public class BasicContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode public static final int NON_PERSISTENT = 1; diff --git a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java deleted file mode 100644 index 7162c37062..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/CommonContentHeaderProperties.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing; - -public interface CommonContentHeaderProperties extends ContentHeaderProperties -{ - AMQShortString getContentType(); - - void setContentType(AMQShortString contentType); - - FieldTable getHeaders(); - - void setHeaders(FieldTable headers); - - byte getDeliveryMode(); - - void setDeliveryMode(byte deliveryMode); - - byte getPriority(); - - void setPriority(byte priority); - - AMQShortString getCorrelationId(); - - void setCorrelationId(AMQShortString correlationId); - - AMQShortString getReplyTo(); - - void setReplyTo(AMQShortString replyTo); - - long getExpiration(); - - void setExpiration(long expiration); - - AMQShortString getMessageId(); - - void setMessageId(AMQShortString messageId); - - long getTimestamp(); - - void setTimestamp(long timestamp); - - AMQShortString getType(); - - void setType(AMQShortString type); - - AMQShortString getUserId(); - - void setUserId(AMQShortString userId); - - AMQShortString getAppId(); - - void setAppId(AMQShortString appId); - - AMQShortString getClusterId(); - - void setClusterId(AMQShortString clusterId); - - AMQShortString getEncoding(); - - void setEncoding(AMQShortString encoding); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index f6fa89a91c..131d796af4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -39,7 +39,7 @@ public class ContentHeaderBody implements AMQBody private long bodySize; /** must never be null */ - private ContentHeaderProperties properties; + private BasicContentHeaderProperties properties; public ContentHeaderBody() { @@ -57,13 +57,13 @@ public class ContentHeaderBody implements AMQBody } - public ContentHeaderBody(ContentHeaderProperties props, int classId) + public ContentHeaderBody(BasicContentHeaderProperties props, int classId) { properties = props; this.classId = classId; } - public ContentHeaderBody(int classId, int weight, ContentHeaderProperties props, long bodySize) + public ContentHeaderBody(int classId, int weight, BasicContentHeaderProperties props, long bodySize) { this(props, classId); this.weight = weight; @@ -121,12 +121,12 @@ public class ContentHeaderBody implements AMQBody return new AMQFrame(channelId, body); } - public ContentHeaderProperties getProperties() + public BasicContentHeaderProperties getProperties() { return properties; } - public void setProperties(ContentHeaderProperties props) + public void setProperties(BasicContentHeaderProperties props) { properties = props; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java deleted file mode 100644 index 2e1b988aa3..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - - -/** - * There will be an implementation of this interface for each content type. All content types have associated - * header properties and this provides a way to encode and decode them. - */ -public interface ContentHeaderProperties -{ - /** - * Writes the property list to the buffer, in a suitably encoded form. - * @param buffer The buffer to write to - */ - void writePropertyListPayload(DataOutput buffer) throws IOException; - - /** - * Populates the properties from buffer. - * @param buffer The buffer to read from. - * @param propertyFlags he property flags. - * @throws AMQFrameDecodingException when the buffer does not contain valid data - */ - void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) - throws AMQFrameDecodingException, IOException; - - /** - * @return the size of the encoded property list in bytes. - */ - int getPropertyListSize(); - - /** - * Gets the property flags. Property flags indicate which properties are set in the list. The - * position and meaning of each flag is defined in the protocol specification for the particular - * content type with which these properties are associated. - * @return flags - */ - int getPropertyFlags(); - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java index ff97c0b28f..55961db06b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java @@ -38,11 +38,11 @@ public class ContentHeaderPropertiesFactory { } - public ContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, + public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags, DataInput buffer, int size) throws AMQFrameDecodingException, IOException { - ContentHeaderProperties properties; + BasicContentHeaderProperties properties; // AMQP version change: "Hardwired" version to major=8, minor=0 // TODO: Change so that the actual version is obtained from // the ProtocolInitiation object for this session. diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java deleted file mode 100644 index 470b7b05e3..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing.abstraction; - -public interface ContentChunk -{ - int getSize(); - byte[] getData(); - - void reduceToFit(); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java deleted file mode 100644 index 01d1a8a17b..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing.abstraction; - -import org.apache.qpid.framing.AMQMethodBody; - - -public interface MessagePublishInfoConverter -{ - public MessagePublishInfo convertToInfo(AMQMethodBody body); - public AMQMethodBody convertToBody(MessagePublishInfo info); - -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java index d1e53d6907..b3897771c5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java @@ -21,14 +21,10 @@ package org.apache.qpid.framing.abstraction; -import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQMethodBody; -public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter +public interface ProtocolVersionMethodConverter { - AMQBody convertToBody(ContentChunk contentBody); - ContentChunk convertToContentChunk(AMQBody body); - - void configure(); - - AMQBody convertToBody(byte[] input); + public MessagePublishInfo convertToInfo(AMQMethodBody body); + public AMQMethodBody convertToBody(MessagePublishInfo info); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index b3eb1211a5..6456eacab1 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -21,13 +21,10 @@ package org.apache.qpid.framing.amqp_0_9; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -35,48 +32,12 @@ import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { - private int _basicPublishClassId; - private int _basicPublishMethodId; public MethodConverter_0_9() { super((byte)0,(byte)9); - - - } - - public AMQBody convertToBody(ContentChunk contentChunk) - { - if(contentChunk instanceof ContentChunk_0_9) - { - return ((ContentChunk_0_9)contentChunk).toBody(); - } - else - { - return new ContentBody(contentChunk.getData()); - } - } - - public ContentChunk convertToContentChunk(AMQBody body) - { - final ContentBody contentBodyChunk = (ContentBody) body; - - return new ContentChunk_0_9(contentBodyChunk); - - } - - public void configure() - { - - _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID; - _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; - } - public AMQBody convertToBody(byte[] data) - { - return new ContentBody(data); - } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) { @@ -103,33 +64,4 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot } - private static class ContentChunk_0_9 implements ContentChunk - { - private final ContentBody _contentBodyChunk; - - public ContentChunk_0_9(final ContentBody contentBodyChunk) - { - _contentBodyChunk = contentBodyChunk; - } - - public int getSize() - { - return _contentBodyChunk.getSize(); - } - - public byte[] getData() - { - return _contentBodyChunk.getPayload(); - } - - public void reduceToFit() - { - _contentBodyChunk.reduceBufferToFit(); - } - - public AMQBody toBody() - { - return _contentBodyChunk; - } - } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java index d33749d795..e25dc8a022 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java @@ -21,61 +21,22 @@ package org.apache.qpid.framing.amqp_0_91; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { - private int _basicPublishClassId; - private int _basicPublishMethodId; public MethodConverter_0_91() { super((byte)0,(byte)9); - - - } - - public AMQBody convertToBody(ContentChunk contentChunk) - { - if(contentChunk instanceof ContentChunk_0_9) - { - return ((ContentChunk_0_9)contentChunk).toBody(); - } - else - { - return new ContentBody(contentChunk.getData()); - } - } - - public ContentChunk convertToContentChunk(AMQBody body) - { - final ContentBody contentBodyChunk = (ContentBody) body; - - return new ContentChunk_0_9(contentBodyChunk); - - } - - public void configure() - { - - _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; - _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; - } - public AMQBody convertToBody(byte[] data) - { - return new ContentBody(data); - } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) { @@ -102,33 +63,4 @@ public class MethodConverter_0_91 extends AbstractMethodConverter implements Pro } - private static class ContentChunk_0_9 implements ContentChunk - { - private final ContentBody _contentBodyChunk; - - public ContentChunk_0_9(final ContentBody contentBodyChunk) - { - _contentBodyChunk = contentBodyChunk; - } - - public int getSize() - { - return _contentBodyChunk.getSize(); - } - - public byte[] getData() - { - return _contentBodyChunk.getPayload(); - } - - public void reduceToFit() - { - _contentBodyChunk.reduceBufferToFit(); - } - - public AMQBody toBody() - { - return _contentBodyChunk; - } - } }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index 575816db4f..5e50c2b3fb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -21,71 +21,21 @@ package org.apache.qpid.framing.amqp_8_0; -import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.AbstractMethodConverter; -import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter { - private int _basicPublishClassId; - private int _basicPublishMethodId; - public MethodConverter_8_0() { super((byte)8,(byte)0); - - - } - - public AMQBody convertToBody(ContentChunk contentChunk) - { - return new ContentBody(contentChunk.getData()); } - public ContentChunk convertToContentChunk(AMQBody body) - { - final ContentBody contentBodyChunk = (ContentBody) body; - - return new ContentChunk() - { - - public int getSize() - { - return contentBodyChunk.getSize(); - } - - public byte[] getData() - { - return contentBodyChunk.getPayload(); - } - - public void reduceToFit() - { - contentBodyChunk.reduceBufferToFit(); - } - }; - - } - - public void configure() - { - - _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID; - _basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID; - - } - - public AMQBody convertToBody(byte[] data) - { - return new ContentBody(data); - } public MessagePublishInfo convertToInfo(AMQMethodBody methodBody) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 07f0d0c369..19dc1a5a02 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -50,8 +50,6 @@ import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.ConflationQueue; -import org.apache.qpid.server.protocol.v0_8.IncomingMessage; -import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.SimpleAMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -617,61 +615,41 @@ public class MessageStoreTest extends QpidTestCase MessagePublishInfo messageInfo = new TestMessagePublishInfo(exchange, false, false, routingKey); - final IncomingMessage currentMessage; - - - currentMessage = new IncomingMessage(messageInfo); - - currentMessage.setExchange(exchange); - ContentHeaderBody headerBody = new ContentHeaderBody(BasicConsumeBodyImpl.CLASS_ID,0,properties,0l); - try - { - currentMessage.setContentHeaderBody(headerBody); - } - catch (AMQException e) - { - fail(e.getMessage()); - } + MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis()); - currentMessage.setExpiration(); + final StoredMessage<MessageMetaData> storedMessage = getVirtualHost().getMessageStore().addMessage(mmd); + storedMessage.flushToStore(); + final AMQMessage currentMessage = new AMQMessage(storedMessage); - MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis()); - currentMessage.setStoredMessage(getVirtualHost().getMessageStore().addMessage(mmd)); - currentMessage.getStoredMessage().flushToStore(); - currentMessage.route(); + final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage); - // check and deliver if header says body length is zero - if (currentMessage.allContentReceived()) - { - ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - final List<? extends BaseQueue> destinationQueues = currentMessage.getDestinationQueues(); - trans.enqueue(currentMessage.getDestinationQueues(), currentMessage, new ServerTransaction.Action() { - public void postCommit() - { - try - { - AMQMessage message = new AMQMessage(currentMessage.getStoredMessage()); + ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - for(BaseQueue queue : destinationQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) + trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() { + public void postCommit() + { + try + { + for(BaseQueue queue : destinationQueues) { - _logger.error("Problem enqueing message", e); + queue.enqueue(currentMessage); } } - - public void onRollback() + catch (AMQException e) { - //To change body of implemented methods use File | Settings | File Templates. + _logger.error("Problem enqueing message", e); } - }); - } + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); + } private void createAllQueues() |