summaryrefslogtreecommitdiff
path: root/java/broker-core/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-22 17:16:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-22 17:16:44 +0000
commitd779fd968deef2dd5395a93f39f70ebf677ea1b3 (patch)
tree99d69cf9e9499797d649873c5a7f985965be4459 /java/broker-core/src
parent455c63b69c150017c5fbbda0b9a56b29bc89326d (diff)
downloadqpid-python-d779fd968deef2dd5395a93f39f70ebf677ea1b3.tar.gz
QPID-5504 : initial refactoring to move common code into shared classes, make transports work similarly with respect to message routing
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1560424 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-core/src')
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java88
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java37
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java2
-rwxr-xr-xjava/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java6
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java10
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java5
8 files changed, 87 insertions, 67 deletions
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
index 63bd1e45a0..d19f127b16 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
@@ -47,10 +47,6 @@ public interface AMQMessageHeader
String getReplyTo();
- String getReplyToExchange();
- String getReplyToRoutingKey();
-
-
Object getHeader(String name);
boolean containsHeaders(Set<String> names);
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 8311dbd5ff..e5aa6c1158 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -23,9 +23,11 @@ package org.apache.qpid.server.message;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
+public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
@@ -33,10 +35,13 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
+ private final Object _connectionReference;
- public AbstractServerMessageImpl(StoredMessage<T> handle)
+
+ public AbstractServerMessageImpl(StoredMessage<T> handle, Object connectionReference)
{
_handle = handle;
+ _connectionReference = connectionReference;
}
public StoredMessage<T> getStoredMessage()
@@ -44,16 +49,11 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
return _handle;
}
- public boolean incrementReference()
+ private boolean incrementReference()
{
- return incrementReference(1);
- }
-
- public boolean incrementReference(int count)
- {
- if(_refCountUpdater.addAndGet(this, count) <= 0)
+ if(_refCountUpdater.incrementAndGet(this) <= 0)
{
- _refCountUpdater.addAndGet(this, -count);
+ _refCountUpdater.decrementAndGet(this);
return false;
}
else
@@ -67,7 +67,7 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
* message store.
*
*/
- public void decrementReference()
+ private void decrementReference()
{
int count = _refCountUpdater.decrementAndGet(this);
@@ -104,8 +104,72 @@ public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaDat
return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageNumber() + " Ref:" + getReferenceCount() + ")";
}
- protected int getReferenceCount()
+ private int getReferenceCount()
{
return _referenceCount;
}
+
+ @Override
+ final public MessageReference<X> newReference()
+ {
+ return new Reference();
+ }
+
+ @Override
+ final public boolean isPersistent()
+ {
+ return _handle.getMetaData().isPersistent();
+ }
+
+ @Override
+ final public long getMessageNumber()
+ {
+ return getStoredMessage().getMessageNumber();
+ }
+
+ @Override
+ final public int getContent(ByteBuffer buf, int offset)
+ {
+ return getStoredMessage().getContent(offset, buf);
+ }
+
+ @Override
+ final public ByteBuffer getContent(int offset, int size)
+ {
+ return getStoredMessage().getContent(offset, size);
+ }
+
+ final public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }public String toString()
+ {
+ return "Message[" + debugIdentity() + "]";
+ }
+
+ private final class Reference implements MessageReference<X>
+ {
+
+ private final AtomicBoolean _released = new AtomicBoolean(false);
+
+ private Reference()
+ {
+ incrementReference();
+ }
+
+ public X getMessage()
+ {
+ return (X) AbstractServerMessageImpl.this;
+ }
+
+ public void release()
+ {
+ if(!_released.getAndSet(true))
+ {
+ decrementReference();
+ }
+ }
+
+ }
+
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
index 1b3fdb1870..03f1d6649c 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
@@ -34,4 +34,6 @@ public interface InboundMessage extends Filterable
boolean isRedelivered();
long getSize();
+
+ Object getConnectionReference();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
index 399f8f9327..eda85507f0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageReference.java
@@ -20,39 +20,8 @@
*/
package org.apache.qpid.server.message;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class MessageReference<M extends ServerMessage>
+public interface MessageReference<M extends ServerMessage>
{
-
- private final AtomicBoolean _released = new AtomicBoolean(false);
-
- private volatile M _message;
-
- public MessageReference(M message)
- {
- _message = message;
- onReference(message);
- }
-
- abstract protected void onReference(M message);
-
- abstract protected void onRelease(M message);
-
- public M getMessage()
- {
- return _message;
- }
-
- public void release()
- {
- if(!_released.getAndSet(true))
- {
- if(_message != null)
- {
- onRelease(_message);
- }
- }
- }
-
+ public M getMessage();
+ public void release();
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index e1ad2fd0ca..a975ba1bc8 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -37,8 +37,6 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enquea
long getSize();
- boolean isImmediate();
-
long getExpiration();
MessageReference newReference();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
index df26037eed..c8e2c3a6fb 100755
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -67,4 +67,10 @@ public class InboundMessageAdapter implements InboundMessage
{
return _entry.getSize();
}
+
+ @Override
+ public Object getConnectionReference()
+ {
+ return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null;
+ }
}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 833df34fd8..8919599cba 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -96,16 +96,6 @@ public class HeadersBindingTest extends TestCase
return null;
}
- public String getReplyToExchange()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public String getReplyToRoutingKey()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
public Object getHeader(String name)
{
return _headers.get(name);
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 25f1df7a20..24a507173c 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,11 +130,6 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
return _storedMsg;
}
- @Override
- public boolean isImmediate()
- {
- return false;
- }
@Override
public boolean isPersistent()