diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:22:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-11 15:22:03 +0000 |
commit | 531f130c20f73c08ffc963c1e15c7b98cea05d8f (patch) | |
tree | d30c93bde8eb6515c076568102a008702c98567a /java/client | |
parent | 1a7004fd9dab8306ec31b96d9d5e2d5a44256d98 (diff) | |
download | qpid-python-531f130c20f73c08ffc963c1e15c7b98cea05d8f.tar.gz |
Updates on the refactoring work
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
4 files changed, 43 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4b8143cfb5..60f57aaf0e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -730,6 +730,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } + + private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException, FailoverException { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index bba39403a5..82970e69e1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1110,11 +1110,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException { + createQueue(name, autoDelete, durable, exclusive, null); + } + + public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive, final FieldTable arguments) throws AMQException + { + new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() { public Object execute() throws AMQException, FailoverException { - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null); + QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,arguments); AMQFrame queueDeclare = body.generateFrame(_channelId); @@ -2357,6 +2364,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi + + public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException + { + new FailoverRetrySupport<Object, AMQException>( + new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false); + + // todo send low water mark when protocol allows. + // todo Be aware of possible changes to parameter order as versions change. + getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + + return null; + } + }, _connection).execute(); + } + + + /** * Declares the named exchange and type of exchange. * diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 7ae2ddf66c..5907bd90af 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -174,7 +174,7 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) + private synchronized static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException { int port = details.getPort(); @@ -197,7 +197,7 @@ public class TransportConnection return new VmPipeTransportConnection(port); } - public static void createVMBroker(int port) throws AMQVMBrokerCreationException + public static synchronized void createVMBroker(int port) throws AMQVMBrokerCreationException { if (_acceptor == null) { @@ -275,7 +275,7 @@ public class TransportConnection } } - private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException + private static synchronized IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException { String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER); _logger.info("Creating Qpid protocol provider: " + protocolProviderClass); @@ -314,7 +314,7 @@ public class TransportConnection return provider; } - public static void killAllVMBrokers() + public synchronized static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); if (_acceptor != null) @@ -330,7 +330,7 @@ public class TransportConnection _currentVMPort = -1; } - public static void killVMBroker(int port) + public synchronized static void killVMBroker(int port) { synchronized (_inVmPipeAddress) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java index 39730ef3ac..241af2c861 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java @@ -24,6 +24,7 @@ import javax.jms.*; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Message; +import javax.jms.MessageProducer; import junit.framework.TestCase; @@ -32,12 +33,16 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.BasicMessageProducer; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQUndefinedDestination; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.*; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; /** @author Apache Software Foundation */ @@ -144,7 +149,7 @@ public class TopicSessionTest extends TestCase while(true) { publisher.publish(session1.createTextMessage("hello")); - Thread.sleep(THREADS); + Thread.sleep(20); } } catch(Exception e) |