diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 16:37:13 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 16:37:13 +0000 |
commit | f7ca6dd49e6f5de6e445549ee2f1a942a6456f0c (patch) | |
tree | a2ad4540e58b580a912ecc84bce8b8689779c8ef | |
parent | ca41f94401b2be8c1dba161a68b0fc1271d39f6c (diff) | |
download | qpid-python-f7ca6dd49e6f5de6e445549ee2f1a942a6456f0c.tar.gz |
QPID-327 : Patch supplied by Rob Godfrey - [race condition] PoolingFilter : Possible race condition when completing a Job
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@501096 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 195 insertions, 63 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index c45d1ad2c2..55009bbf49 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -324,7 +324,7 @@ public class Main implements ProtocolVersionList // implementation provided by MINA if (connectorConfig.enableExecutorPool) { - sconfig.setThreadModel(new ReadWriteThreadModel()); + sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } if (connectorConfig.enableNonSSL) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 5f9fcbdc85..460d5126ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -23,10 +23,13 @@ package org.apache.qpid.server.protocol; import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoSessionConfig; +import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.common.ClientProperties; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; @@ -121,6 +124,23 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _codecFactory = codecFactory; + try + { + IoServiceConfig config = session.getServiceConfig(); + ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel(); + threadModel.getAsynchronousReadFilter().createNewJobForSession(session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); + } + catch (RuntimeException e) + { + e.printStackTrace(); + // throw e; + + } + + + + // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java index 477a679b90..66fa9de92c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java @@ -127,10 +127,17 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); FieldTable clientProperties = FieldTableFactory.newFieldTable(); - clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID()); - clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName()); - clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion()); - clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); + try + { + clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()), protocolSession.getClientID()); + clientProperties.setString(new AMQShortString(ClientProperties.product.toString()), QpidProperties.getProductName()); + clientProperties.setString(new AMQShortString(ClientProperties.version.toString()), QpidProperties.getReleaseVersion()); + clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo()); + } + catch (Exception e) + { + e.printStackTrace(); + } // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. @@ -141,6 +148,7 @@ public class ConnectionStartMethodHandler implements StateAwareMethodListener new AMQShortString(selectedLocale), // locale new AMQShortString(mechanism), // mechanism saslResponse)); // response + } catch (UnsupportedEncodingException e) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ed1b5ae6f9..9015cabf43 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -24,12 +24,14 @@ import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoServiceConfig; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.qpid.AMQConnectionClosedException; import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQTimeoutException; +import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -131,6 +133,19 @@ public class AMQProtocolHandler extends IoHandlerAdapter session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter); } + + try + { + + ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance(); + threadModel.getAsynchronousReadFilter().createNewJobForSession(session); + threadModel.getAsynchronousWriteFilter().createNewJobForSession(session); + } + catch (RuntimeException e) + { + e.printStackTrace(); + } + _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager()); _protocolSession.init(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index d6364f45b0..5e6244d7cc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -71,7 +71,7 @@ public class SocketTransportConnection implements ITransportConnection boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); if (readWriteThreading) { - cfg.setThreadModel(new ReadWriteThreadModel()); + cfg.setThreadModel(ReadWriteThreadModel.getInstance()); } SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index e9e23aefdb..99a4c5f30d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -70,7 +70,7 @@ public class TransportConnection IoServiceConfig config = _acceptor.getDefaultConfig(); - config.setThreadModel(new ReadWriteThreadModel()); + config.setThreadModel(ReadWriteThreadModel.getInstance()); } public static ITransportConnection getInstance() throws AMQTransportConnectionException diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java index de3d558f7c..f0ac0e6902 100644 --- a/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java +++ b/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java @@ -82,7 +82,7 @@ public class AcceptorTest extends TestCase sc.setSendBufferSize(32768); sc.setReceiveBufferSize(32768); - config.setThreadModel(new ReadWriteThreadModel()); + config.setThreadModel(ReadWriteThreadModel.getInstance()); acceptor.bind(new InetSocketAddress(PORT), new TestHandler()); diff --git a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java index 3eeddd7b4e..5746a32c26 100644 --- a/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java +++ b/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/Main.java @@ -83,7 +83,7 @@ public class Main extends org.apache.qpid.server.Main // implementation provided by MINA if (connectorConfig.enableExecutorPool) { - sconfig.setThreadModel(new ReadWriteThreadModel()); + sconfig.setThreadModel(ReadWriteThreadModel.getInstance()); } String host = InetAddress.getLocalHost().getHostName(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index b29c23c2a2..e7175e7973 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -51,4 +51,9 @@ public class AMQTypedValue AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
+
+ public String toString()
+ {
+ return "["+getType()+": "+getValue()+"]";
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java index f2d1a70cdc..697a0f4249 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java @@ -164,4 +164,14 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData protocolMajor + "." + protocolMinor + " not found in protocol version list."); } } + + public String toString() + { + StringBuffer buffer = new StringBuffer(new String(header)); + buffer.append(Integer.toHexString(protocolClass)); + buffer.append(Integer.toHexString(protocolInstance)); + buffer.append(Integer.toHexString(protocolMajor)); + buffer.append(Integer.toHexString(protocolMinor)); + return buffer.toString(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java index b9673cd48f..9b3bcfa008 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger; * Holds events for a session that will be processed asynchronously by * the thread pool in PoolingFilter. */ -class Job implements Runnable +public class Job implements Runnable { private final int _maxEvents; private final IoSession _session; private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); private final AtomicBoolean _active = new AtomicBoolean(); - private final AtomicInteger _refCount = new AtomicInteger(); + //private final AtomicInteger _refCount = new AtomicInteger(); private final JobCompletionHandler _completionHandler; Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents) @@ -45,21 +45,21 @@ class Job implements Runnable _completionHandler = completionHandler; _maxEvents = maxEvents; } - - void acquire() - { - _refCount.incrementAndGet(); - } - - void release() - { - _refCount.decrementAndGet(); - } - - boolean isReferenced() - { - return _refCount.get() > 0; - } +// +// void acquire() +// { +// _refCount.incrementAndGet(); +// } +// +// void release() +// { +// _refCount.decrementAndGet(); +// } +// +// boolean isReferenced() +// { +// return _refCount.get() > 0; +// } void add(Event evt) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index c0026c1f36..073f1cdfa3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -48,7 +48,7 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH void fireAsynchEvent(IoSession session, Event event) { Job job = getJobForSession(session); - job.acquire(); //prevents this job being removed from _jobs + // job.acquire(); //prevents this job being removed from _jobs job.add(event); //Additional checks on pool to check that it hasn't shutdown. @@ -60,10 +60,25 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH } + public void createNewJobForSession(IoSession session) + { + Job job = new Job(session, this, _maxEvents); + session.setAttribute(_name, job); + } + private Job getJobForSession(IoSession session) { - Job job = _jobs.get(session); - return job == null ? createJobForSession(session) : job; + return (Job) session.getAttribute(_name); + +/* if(job == null) + { + System.err.println("Error in " + _name); + Thread.dumpStack(); + } + + + job = _jobs.get(session); + return job == null ? createJobForSession(session) : job;*/ } private Job createJobForSession(IoSession session) @@ -81,15 +96,16 @@ public class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionH //Job.JobCompletionHandler public void completed(IoSession session, Job job) { - if (job.isComplete()) - { - job.release(); - if (!job.isReferenced()) - { - _jobs.remove(session); - } - } - else +// if (job.isComplete()) +// { +// job.release(); +// if (!job.isReferenced()) +// { +// _jobs.remove(session); +// } +// } +// else + if(!job.isComplete()) { // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java index 84b72bb0dc..c2f7f7ac48 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java @@ -26,12 +26,38 @@ import org.apache.mina.common.ThreadModel; public class ReadWriteThreadModel implements ThreadModel { + + private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel(); + + private final PoolingFilter _asynchronousReadFilter; + private final PoolingFilter _asynchronousWriteFilter; + + private ReadWriteThreadModel() + { + final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); + _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); + _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); + } + + public PoolingFilter getAsynchronousReadFilter() + { + return _asynchronousReadFilter; + } + + public PoolingFilter getAsynchronousWriteFilter() + { + return _asynchronousWriteFilter; + } + public void buildFilterChain(IoFilterChain chain) throws Exception { - ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance(); - PoolingFilter asyncRead = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter"); - PoolingFilter asyncWrite = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter"); - chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(asyncRead)); - chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(asyncWrite)); + + chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter)); + chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter)); + } + + public static ReadWriteThreadModel getInstance() + { + return _instance; } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java index 9a5208662b..6383d52298 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java @@ -36,25 +36,32 @@ public class PoolingFilterTest extends TestCase public void setUp() { + //Create Pool _executorService = ReferenceCountingExecutorService.getInstance(); _executorService.acquireExecutorService(); - _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, + _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService, "AsynchronousWriteFilter"); } public void testRejectedExecution() throws Exception { - _pool.filterWrite(new NoOpFilter(), new TestSession(), new IoFilter.WriteRequest("Message")); + + TestSession testSession = new TestSession(); + _pool.createNewJobForSession(testSession); + _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message")); //Shutdown the pool _executorService.getPool().shutdownNow(); try { + + testSession = new TestSession(); + _pool.createNewJobForSession(testSession); //prior to fix for QPID-172 this would throw RejectedExecutionException - _pool.filterWrite(null, new TestSession(), null); + _pool.filterWrite(null, testSession, null); } catch (RejectedExecutionException rje) { diff --git a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java index f10d55e9d0..aafc91b03b 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/session/TestSession.java @@ -24,9 +24,13 @@ import org.apache.mina.common.*; import java.net.SocketAddress; import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; public class TestSession implements IoSession { + private final ConcurrentMap attributes = new ConcurrentHashMap(); + public TestSession() { } @@ -68,42 +72,42 @@ public class TestSession implements IoSession public Object getAttachment() { - return null; //TODO + return getAttribute(""); } public Object setAttachment(Object attachment) { - return null; //TODO + return setAttribute("",attachment); } public Object getAttribute(String key) { - return null; //TODO + return attributes.get(key); } public Object setAttribute(String key, Object value) { - return null; //TODO + return attributes.put(key,value); } public Object setAttribute(String key) { - return null; //TODO + return attributes.put(key, Boolean.TRUE); } public Object removeAttribute(String key) { - return null; //TODO + return attributes.remove(key); } public boolean containsAttribute(String key) { - return false; //TODO + return attributes.containsKey(key); } public Set getAttributeKeys() { - return null; //TODO + return attributes.keySet(); } public TransportType getTransportType() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java index 9d3c588fc8..10f5cd5667 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java @@ -106,6 +106,8 @@ public class TxAckTest extends TestCase // TODO: fix hardcoded protocol version data TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), null, false, false, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 84506f4f48..a9d7299bec 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -153,7 +153,10 @@ public class AbstractHeadersExchangeTestBase extends TestCase { // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Establish some way to determine the version for the test. - BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,null,false,false,new AMQShortString(id),0); + BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), + null,false,false,new AMQShortString(id),0); return request; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index ba60105824..84dde9dd6f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -169,6 +169,8 @@ public class AMQQueueMBeanTest extends TestCase // TODO: Establish some way to determine the version for the test. BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), null, immediate, false, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 2ec4eab74e..ccc7752fd3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -99,6 +99,8 @@ public class AckTest extends TestCase // TODO: Establish some way to determine the version for the test. BasicPublishBody publishBody = new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), new AMQShortString("someExchange"), false, false, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java index 6c48bb2bf4..cf5baa77bd 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java @@ -63,6 +63,8 @@ class MessageTestHelper extends TestCase // TODO: Establish some way to determine the version for the test. BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), null, immediate, false, diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index 0cfa4eddce..0dd7744d1f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.VersionSpecificRegistry; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -168,4 +169,9 @@ public class MockProtocolSession implements AMQProtocolSession { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + public VersionSpecificRegistry getRegistry() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java index bf422742b5..42dd1a4b74 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java @@ -52,7 +52,7 @@ public class SkeletonMessageStore implements MessageStore { } - public void removeMessage(StoreContext s, long messageId) + public void removeMessage(StoreContext s, Long messageId) { } @@ -82,27 +82,27 @@ public class SkeletonMessageStore implements MessageStore return null; } - public long getNewMessageId() + public Long getNewMessageId() { return _messageId.getAndIncrement(); } - public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException + public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException { } - public void storeMessageMetaData(StoreContext sc, long messageId, MessageMetaData messageMetaData) throws AMQException + public void storeMessageMetaData(StoreContext sc, Long messageId, MessageMetaData messageMetaData) throws AMQException { } - public MessageMetaData getMessageMetaData(long messageId) throws AMQException + public MessageMetaData getMessageMetaData(Long messageId) throws AMQException { return null; } - public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException + public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException { return null; } @@ -112,12 +112,12 @@ public class SkeletonMessageStore implements MessageStore } - public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException + public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException { } - public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException + public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException { } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java index e2500d9865..6eacd5168f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java @@ -53,6 +53,8 @@ public class TestReferenceCounting extends TestCase // TODO: fix hardcoded protocol version data AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), null, false, false, @@ -82,6 +84,8 @@ public class TestReferenceCounting extends TestCase // TODO: fix hardcoded protocol version data AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8, (byte)0, + BasicPublishBody.getClazz((byte)8,(byte)0), + BasicPublishBody.getMethod((byte)8,(byte)0), null, false, false, |