summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-29 16:37:13 +0000
committerRobert Greig <rgreig@apache.org>2007-01-29 16:37:13 +0000
commitf7ca6dd49e6f5de6e445549ee2f1a942a6456f0c (patch)
treea2ad4540e58b580a912ecc84bce8b8689779c8ef
parentca41f94401b2be8c1dba161a68b0fc1271d39f6c (diff)
downloadqpid-python-f7ca6dd49e6f5de6e445549ee2f1a942a6456f0c.tar.gz
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501096 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java2
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java2
-rw-r--r--qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java36
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java13
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java20
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java16
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java4
23 files changed, 195 insertions, 63 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index c45d1ad2c2..55009bbf49 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -324,7 +324,7 @@ public class Main implements ProtocolVersionList
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
if (connectorConfig.enableNonSSL)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 5f9fcbdc85..460d5126ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -23,10 +23,13 @@ package org.apache.qpid.server.protocol;
import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -121,6 +124,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_codecFactory = codecFactory;
+ try
+ {
+ IoServiceConfig config = session.getServiceConfig();
+ ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ // throw e;
+
+ }
+
+
+
+
// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
index 477a679b90..66fa9de92c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
@@ -127,10 +127,17 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
- clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
- clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
- clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ try
+ {
+ clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID());
+ clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName());
+ clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion());
+ clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
@@ -141,6 +148,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener
new AMQShortString(selectedLocale), // locale
new AMQShortString(mechanism), // mechanism
saslResponse)); // response
+
}
catch (UnsupportedEncodingException e)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index ed1b5ae6f9..9015cabf43 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -24,12 +24,14 @@ import org.apache.log4j.Logger;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -131,6 +133,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter
session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
}
+
+ try
+ {
+
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+ threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+ }
+ catch (RuntimeException e)
+ {
+ e.printStackTrace();
+ }
+
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index d6364f45b0..5e6244d7cc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -71,7 +71,7 @@ public class SocketTransportConnection implements ITransportConnection
boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
if (readWriteThreading)
{
- cfg.setThreadModel(new ReadWriteThreadModel());
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
}
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index e9e23aefdb..99a4c5f30d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -70,7 +70,7 @@ public class TransportConnection
IoServiceConfig config = _acceptor.getDefaultConfig();
- config.setThreadModel(new ReadWriteThreadModel());
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
}
public static ITransportConnection getInstance() throws AMQTransportConnectionException
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
index de3d558f7c..f0ac0e6902 100644
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
+++ b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
@@ -82,7 +82,7 @@ public class AcceptorTest extends TestCase
sc.setSendBufferSize(32768);
sc.setReceiveBufferSize(32768);
- config.setThreadModel(new ReadWriteThreadModel());
+ config.setThreadModel(ReadWriteThreadModel.getInstance());
acceptor.bind(new InetSocketAddress(PORT),
new TestHandler());
diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
index 3eeddd7b4e..5746a32c26 100644
--- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
+++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java
@@ -83,7 +83,7 @@ public class Main extends org.apache.qpid.server.Main
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
}
String host = InetAddress.getLocalHost().getHostName();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index b29c23c2a2..e7175e7973 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -51,4 +51,9 @@ public class AMQTypedValue
AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index f2d1a70cdc..697a0f4249 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
protocolMajor + "." + protocolMinor + " not found in protocol version list.");
}
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer(new String(header));
+ buffer.append(Integer.toHexString(protocolClass));
+ buffer.append(Integer.toHexString(protocolInstance));
+ buffer.append(Integer.toHexString(protocolMajor));
+ buffer.append(Integer.toHexString(protocolMinor));
+ return buffer.toString();
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index b9673cd48f..9b3bcfa008 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* Holds events for a session that will be processed asynchronously by
* the thread pool in PoolingFilter.
*/
-class Job implements Runnable
+public class Job implements Runnable
{
private final int _maxEvents;
private final IoSession _session;
private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
private final AtomicBoolean _active = new AtomicBoolean();
- private final AtomicInteger _refCount = new AtomicInteger();
+ //private final AtomicInteger _refCount = new AtomicInteger();
private final JobCompletionHandler _completionHandler;
Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
@@ -45,21 +45,21 @@ class Job implements Runnable
_completionHandler = completionHandler;
_maxEvents = maxEvents;
}
-
- void acquire()
- {
- _refCount.incrementAndGet();
- }
-
- void release()
- {
- _refCount.decrementAndGet();
- }
-
- boolean isReferenced()
- {
- return _refCount.get() > 0;
- }
+//
+// void acquire()
+// {
+// _refCount.incrementAndGet();
+// }
+//
+// void release()
+// {
+// _refCount.decrementAndGet();
+// }
+//
+// boolean isReferenced()
+// {
+// return _refCount.get() > 0;
+// }
void add(Event evt)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
index c0026c1f36..073f1cdfa3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
@@ -48,7 +48,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
void fireAsynchEvent(IoSession session, Event event)
{
Job job = getJobForSession(session);
- job.acquire(); //prevents this job being removed from _jobs
+ // job.acquire(); //prevents this job being removed from _jobs
job.add(event);
//Additional checks on pool to check that it hasn't shutdown.
@@ -60,10 +60,25 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
}
+ public void createNewJobForSession(IoSession session)
+ {
+ Job job = new Job(session, this, _maxEvents);
+ session.setAttribute(_name, job);
+ }
+
private Job getJobForSession(IoSession session)
{
- Job job = _jobs.get(session);
- return job == null ? createJobForSession(session) : job;
+ return (Job) session.getAttribute(_name);
+
+/* if(job == null)
+ {
+ System.err.println("Error in " + _name);
+ Thread.dumpStack();
+ }
+
+
+ job = _jobs.get(session);
+ return job == null ? createJobForSession(session) : job;*/
}
private Job createJobForSession(IoSession session)
@@ -81,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH
//Job.JobCompletionHandler
public void completed(IoSession session, Job job)
{
- if (job.isComplete())
- {
- job.release();
- if (!job.isReferenced())
- {
- _jobs.remove(session);
- }
- }
- else
+// if (job.isComplete())
+// {
+// job.release();
+// if (!job.isReferenced())
+// {
+// _jobs.remove(session);
+// }
+// }
+// else
+ if(!job.isComplete())
{
// ritchiem : 2006-12-13 Do we need to perform the additional checks here?
// Can the pool be shutdown at this point?
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
index 84b72bb0dc..c2f7f7ac48 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
@@ -26,12 +26,38 @@ import org.apache.mina.common.ThreadModel;
public class ReadWriteThreadModel implements ThreadModel
{
+
+ private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
+
+ private final PoolingFilter _asynchronousReadFilter;
+ private final PoolingFilter _asynchronousWriteFilter;
+
+ private ReadWriteThreadModel()
+ {
+ final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
+ _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
+ _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
+ }
+
+ public PoolingFilter getAsynchronousReadFilter()
+ {
+ return _asynchronousReadFilter;
+ }
+
+ public PoolingFilter getAsynchronousWriteFilter()
+ {
+ return _asynchronousWriteFilter;
+ }
+
public void buildFilterChain(IoFilterChain chain) throws Exception
{
- ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
- PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite));
+
+ chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
+ chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
+ }
+
+ public static ReadWriteThreadModel getInstance()
+ {
+ return _instance;
}
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
index 9a5208662b..6383d52298 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
@@ -36,25 +36,32 @@ public class PoolingFilterTest extends TestCase
public void setUp()
{
+
//Create Pool
_executorService = ReferenceCountingExecutorService.getInstance();
_executorService.acquireExecutorService();
- _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
+ _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
"AsynchronousWriteFilter");
}
public void testRejectedExecution() throws Exception
{
- _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message"));
+
+ TestSession testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
+ _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
//Shutdown the pool
_executorService.getPool().shutdownNow();
try
{
+
+ testSession = new TestSession();
+ _pool.createNewJobForSession(testSession);
//prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, new TestSession(), null);
+ _pool.filterWrite(null, testSession, null);
}
catch (RejectedExecutionException rje)
{
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
index f10d55e9d0..aafc91b03b 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java
@@ -24,9 +24,13 @@ import org.apache.mina.common.*;
import java.net.SocketAddress;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
public class TestSession implements IoSession
{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
public TestSession()
{
}
@@ -68,42 +72,42 @@ public class TestSession implements IoSession
public Object getAttachment()
{
- return null; //TODO
+ return getAttribute("");
}
public Object setAttachment(Object attachment)
{
- return null; //TODO
+ return setAttribute("",attachment);
}
public Object getAttribute(String key)
{
- return null; //TODO
+ return attributes.get(key);
}
public Object setAttribute(String key, Object value)
{
- return null; //TODO
+ return attributes.put(key,value);
}
public Object setAttribute(String key)
{
- return null; //TODO
+ return attributes.put(key, Boolean.TRUE);
}
public Object removeAttribute(String key)
{
- return null; //TODO
+ return attributes.remove(key);
}
public boolean containsAttribute(String key)
{
- return false; //TODO
+ return attributes.containsKey(key);
}
public Set getAttributeKeys()
{
- return null; //TODO
+ return attributes.keySet();
}
public TransportType getTransportType()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
index 9d3c588fc8..10f5cd5667 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -106,6 +106,8 @@ public class TxAckTest extends TestCase
// TODO: fix hardcoded protocol version data
TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
false,
false,
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 84506f4f48..a9d7299bec 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -153,7 +153,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0);
+ BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
+ null,false,false,new AMQShortString(id),0);
return request;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index ba60105824..84dde9dd6f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -169,6 +169,8 @@ public class AMQQueueMBeanTest extends TestCase
// TODO: Establish some way to determine the version for the test.
BasicPublishBody publish = new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
immediate,
false,
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 2ec4eab74e..ccc7752fd3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -99,6 +99,8 @@ public class AckTest extends TestCase
// TODO: Establish some way to determine the version for the test.
BasicPublishBody publishBody = new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
new AMQShortString("someExchange"),
false,
false,
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
index 6c48bb2bf4..cf5baa77bd 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
@@ -63,6 +63,8 @@ class MessageTestHelper extends TestCase
// TODO: Establish some way to determine the version for the test.
BasicPublishBody publish = new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
immediate,
false,
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index 0cfa4eddce..0dd7744d1f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.VersionSpecificRegistry;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -168,4 +169,9 @@ public class MockProtocolSession implements AMQProtocolSession
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
+
+ public VersionSpecificRegistry getRegistry()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
index bf422742b5..42dd1a4b74 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
@@ -52,7 +52,7 @@ public class SkeletonMessageStore implements MessageStore
{
}
- public void removeMessage(StoreContext s, long messageId)
+ public void removeMessage(StoreContext s, Long messageId)
{
}
@@ -82,27 +82,27 @@ public class SkeletonMessageStore implements MessageStore
return null;
}
- public long getNewMessageId()
+ public Long getNewMessageId()
{
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
{
}
- public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException
+ public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException
{
}
- public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
{
return null;
}
- public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+ public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
{
return null;
}
@@ -112,12 +112,12 @@ public class SkeletonMessageStore implements MessageStore
}
- public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
{
}
- public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
{
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
index e2500d9865..6eacd5168f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -53,6 +53,8 @@ public class TestReferenceCounting extends TestCase
// TODO: fix hardcoded protocol version data
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
false,
false,
@@ -82,6 +84,8 @@ public class TestReferenceCounting extends TestCase
// TODO: fix hardcoded protocol version data
AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
(byte)0,
+ BasicPublishBody.getClazz((byte)8,(byte)0),
+ BasicPublishBody.getMethod((byte)8,(byte)0),
null,
false,
false,