diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 17:16:44 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 17:16:44 +0000 |
| commit | d779fd968deef2dd5395a93f39f70ebf677ea1b3 (patch) | |
| tree | 99d69cf9e9499797d649873c5a7f985965be4459 /java/broker-core/src | |
| parent | 455c63b69c150017c5fbbda0b9a56b29bc89326d (diff) | |
| download | qpid-python-d779fd968deef2dd5395a93f39f70ebf677ea1b3.tar.gz | |
QPID-5504 : initial refactoring to move common code into shared classes, make transports work similarly with respect to message routing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1560424 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
8 files changed, 87 insertions, 67 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java index 63bd1e45a0..d19f127b16 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java @@ -47,10 +47,6 @@ public interface AMQMessageHeader String getReplyTo(); - String getReplyToExchange(); - String getReplyToRoutingKey(); - - Object getHeader(String name); boolean containsHeaders(Set<String> names); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 8311dbd5ff..e5aa6c1158 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -23,9 +23,11 @@ package org.apache.qpid.server.message; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T> +public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> { private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = @@ -33,10 +35,13 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; + private final Object _connectionReference; - public AbstractServerMessageImpl(StoredMessage<T> handle) + + public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference) { _handle = handle; + _connectionReference = connectionReference; } public StoredMessage<T> getStoredMessage() @@ -44,16 +49,11 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat return _handle; } - public boolean incrementReference() + private boolean incrementReference() { - return incrementReference(1); - } - - public boolean incrementReference(int count) - { - if(_refCountUpdater.addAndGet(this, count) <= 0) + if(_refCountUpdater.incrementAndGet(this) <= 0) { - _refCountUpdater.addAndGet(this, -count); + _refCountUpdater.decrementAndGet(this); return false; } else @@ -67,7 +67,7 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat * message store. * */ - public void decrementReference() + private void decrementReference() { int count = _refCountUpdater.decrementAndGet(this); @@ -104,8 +104,72 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")"; } - protected int getReferenceCount() + private int getReferenceCount() { return _referenceCount; } + + @Override + final public MessageReference<X> newReference() + { + return new Reference(); + } + + @Override + final public boolean isPersistent() + { + return _handle.getMetaData().isPersistent(); + } + + @Override + final public long getMessageNumber() + { + return getStoredMessage().getMessageNumber(); + } + + @Override + final public int getContent(ByteBuffer buf, int offset) + { + return getStoredMessage().getContent(offset, buf); + } + + @Override + final public ByteBuffer getContent(int offset, int size) + { + return getStoredMessage().getContent(offset, size); + } + + final public Object getConnectionReference() + { + return _connectionReference; + }public String toString() + { + return "Message[" + debugIdentity() + "]"; + } + + private final class Reference implements MessageReference<X> + { + + private final AtomicBoolean _released = new AtomicBoolean(false); + + private Reference() + { + incrementReference(); + } + + public X getMessage() + { + return (X) AbstractServerMessageImpl.this; + } + + public void release() + { + if(!_released.getAndSet(true)) + { + decrementReference(); + } + } + + } + } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java index 1b3fdb1870..03f1d6649c 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java @@ -34,4 +34,6 @@ public interface InboundMessage extends Filterable boolean isRedelivered(); long getSize(); + + Object getConnectionReference(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java index 399f8f9327..eda85507f0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java @@ -20,39 +20,8 @@ */ package org.apache.qpid.server.message; -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class MessageReference<M extends ServerMessage> +public interface MessageReference<M extends ServerMessage> { - - private final AtomicBoolean _released = new AtomicBoolean(false); - - private volatile M _message; - - public MessageReference(M message) - { - _message = message; - onReference(message); - } - - abstract protected void onReference(M message); - - abstract protected void onRelease(M message); - - public M getMessage() - { - return _message; - } - - public void release() - { - if(!_released.getAndSet(true)) - { - if(_message != null) - { - onRelease(_message); - } - } - } - + public M getMessage(); + public void release(); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index e1ad2fd0ca..a975ba1bc8 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -37,8 +37,6 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enquea long getSize(); - boolean isImmediate(); - long getExpiration(); MessageReference newReference(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java index df26037eed..c8e2c3a6fb 100755 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java @@ -67,4 +67,10 @@ public class InboundMessageAdapter implements InboundMessage { return _entry.getSize(); } + + @Override + public Object getConnectionReference() + { + return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null; + } } diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 833df34fd8..8919599cba 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -96,16 +96,6 @@ public class HeadersBindingTest extends TestCase return null; } - public String getReplyToExchange() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public String getReplyToRoutingKey() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - public Object getHeader(String name) { return _headers.get(name); diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 25f1df7a20..24a507173c 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -130,11 +130,6 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM return _storedMsg; } - @Override - public boolean isImmediate() - { - return false; - } @Override public boolean isPersistent() |
