summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java60
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java26
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java4
6 files changed, 94 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
index 05071bd2a8..c59eba60b8 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
* Need this adaptor class to conform to JMS spec and throw IllegalStateException
* from createDurableSubscriber, unsubscribe, createTopic & createTemporaryTopic
*/
-public class AMQQueueSessionAdaptor implements QueueSession
+public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter
{
//holds a session for delegation
protected final AMQSession _session;
@@ -176,4 +176,8 @@ public class AMQQueueSessionAdaptor implements QueueSession
throw new IllegalStateException("Cannot call unsubscribe from QueueSession");
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 35530b39c9..f33c507d51 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -28,6 +28,7 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -64,6 +65,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _channelId;
+ private int _ticket;
+
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
@@ -145,6 +148,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private boolean _hasMessageListeners;
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -1114,7 +1118,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // internal
false, // nowait
false, // passive
- 0, // ticket
+ getTicket(), // ticket
type); // type
getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
}
@@ -1136,7 +1140,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // internal
true, // nowait
false, // passive
- 0, // ticket
+ getTicket(), // ticket
type); // type
protocolHandler.writeFrame(exchangeDeclare);
}
@@ -1169,7 +1173,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
true, // nowait
false, // passive
amqd.getAMQQueueName(), // queue
- 0); // ticket
+ getTicket()); // ticket
protocolHandler.writeFrame(queueDeclare);
return amqd.getAMQQueueName();
@@ -1185,7 +1189,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
true, // nowait
queueName, // queue
amqd.getRoutingKey(), // routingKey
- 0); // ticket
+ getTicket()); // ticket
protocolHandler.writeFrame(queueBind);
}
@@ -1233,7 +1237,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.isNoLocal(), // noLocal
nowait, // nowait
queueName, // queue
- 0); // ticket
+ getTicket()); // ticket
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1426,7 +1430,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
false, // ifUnused
true, // nowait
queueName, // queue
- 0); // ticket
+ getTicket()); // ticket
getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
}
catch (AMQException e)
@@ -1824,4 +1828,48 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
+
+
+ public int getTicket()
+ {
+ return _ticket;
+ }
+
+ public void setTicket(int ticket)
+ {
+ _ticket = ticket;
+ }
+
+
+ public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException
+ {
+ getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(),
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ active,
+ exclusive,
+ passive,
+ read,
+ realm,
+ write),
+ new BlockingMethodFrameListener(_channelId)
+ {
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException
+ {
+ if(frame instanceof AccessRequestOkBody)
+ {
+ setTicket(((AccessRequestOkBody)frame).getTicket());
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ });
+
+ }
+
+
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
new file mode 100644
index 0000000000..93f10761e2
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+public interface AMQSessionAdapter
+{
+ public AMQSession getSession();
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
index 0f50c330fb..34ef23747c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
@@ -24,7 +24,7 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
-public class AMQTopicSessionAdaptor implements TopicSession
+public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter
{
protected final AMQSession _session;
@@ -199,4 +199,8 @@ public class AMQTopicSessionAdaptor implements TopicSession
throw new IllegalStateException("Cannot call createTemporaryQueue from TopicSession");
}
+ public AMQSession getSession()
+ {
+ return _session;
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index f4fda40be5..fc3450c385 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -144,7 +144,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
false, // internal
true, // nowait
false, // passive
- 0, // ticket
+ _session.getTicket(), // ticket
destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -465,7 +465,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
immediate, // immediate
mandatory, // mandatory
destination.getRoutingKey(), // routingKey
- 0); // ticket
+ _session.getTicket()); // ticket
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 9015cabf43..1f83ce67c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -442,7 +442,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
BlockingMethodFrameListener listener)
throws AMQException
{
@@ -457,7 +457,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* @param frame
* @param listener the blocking listener. Note the calling thread will block.
*/
- private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+ public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
BlockingMethodFrameListener listener, long timeout)
throws AMQException
{