summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-11 15:22:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-11 15:22:03 +0000
commit531f130c20f73c08ffc963c1e15c7b98cea05d8f (patch)
treed30c93bde8eb6515c076568102a008702c98567a /java/client
parent1a7004fd9dab8306ec31b96d9d5e2d5a44256d98 (diff)
downloadqpid-python-531f130c20f73c08ffc963c1e15c7b98cea05d8f.tar.gz
Updates on the refactoring work
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@655323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java31
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java10
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java7
4 files changed, 43 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 4b8143cfb5..60f57aaf0e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -730,6 +730,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
}
+
+
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException, FailoverException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index bba39403a5..82970e69e1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1110,11 +1110,18 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
final boolean exclusive) throws AMQException
{
+ createQueue(name, autoDelete, durable, exclusive, null);
+ }
+
+ public void createQueue(final AMQShortString name, final boolean autoDelete, final boolean durable,
+ final boolean exclusive, final FieldTable arguments) throws AMQException
+ {
+
new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
{
public Object execute() throws AMQException, FailoverException
{
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+ QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,arguments);
AMQFrame queueDeclare = body.generateFrame(_channelId);
@@ -2357,6 +2364,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
+
+ public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+ {
+ new FailoverRetrySupport<Object, AMQException>(
+ new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+ // todo send low water mark when protocol allows.
+ // todo Be aware of possible changes to parameter order as versions change.
+ getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+
+ return null;
+ }
+ }, _connection).execute();
+ }
+
+
+
/**
* Declares the named exchange and type of exchange.
*
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 7ae2ddf66c..5907bd90af 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -174,7 +174,7 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
+ private synchronized static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate)
throws AMQVMBrokerCreationException
{
int port = details.getPort();
@@ -197,7 +197,7 @@ public class TransportConnection
return new VmPipeTransportConnection(port);
}
- public static void createVMBroker(int port) throws AMQVMBrokerCreationException
+ public static synchronized void createVMBroker(int port) throws AMQVMBrokerCreationException
{
if (_acceptor == null)
{
@@ -275,7 +275,7 @@ public class TransportConnection
}
}
- private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
+ private static synchronized IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
{
String protocolProviderClass = System.getProperty("amqj.protocolprovider.class", DEFAULT_QPID_SERVER);
_logger.info("Creating Qpid protocol provider: " + protocolProviderClass);
@@ -314,7 +314,7 @@ public class TransportConnection
return provider;
}
- public static void killAllVMBrokers()
+ public synchronized static void killAllVMBrokers()
{
_logger.info("Killing all VM Brokers");
if (_acceptor != null)
@@ -330,7 +330,7 @@ public class TransportConnection
_currentVMPort = -1;
}
- public static void killVMBroker(int port)
+ public synchronized static void killVMBroker(int port)
{
synchronized (_inVmPipeAddress)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index 39730ef3ac..241af2c861 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -24,6 +24,7 @@ import javax.jms.*;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Message;
+import javax.jms.MessageProducer;
import junit.framework.TestCase;
@@ -32,12 +33,16 @@ import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQUndefinedDestination;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.*;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
/** @author Apache Software Foundation */
@@ -144,7 +149,7 @@ public class TopicSessionTest extends TestCase
while(true)
{
publisher.publish(session1.createTextMessage("hello"));
- Thread.sleep(THREADS);
+ Thread.sleep(20);
}
}
catch(Exception e)