diff options
6 files changed, 94 insertions, 12 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java index 05071bd2a8..c59eba60b8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueSessionAdaptor.java @@ -28,7 +28,7 @@ import java.io.Serializable; * Need this adaptor class to conform to JMS spec and throw IllegalStateException * from createDurableSubscriber, unsubscribe, createTopic & createTemporaryTopic */ -public class AMQQueueSessionAdaptor implements QueueSession +public class AMQQueueSessionAdaptor implements QueueSession, AMQSessionAdapter { //holds a session for delegation protected final AMQSession _session; @@ -176,4 +176,8 @@ public class AMQQueueSessionAdaptor implements QueueSession throw new IllegalStateException("Cannot call unsubscribe from QueueSession"); } + public AMQSession getSession() + { + return _session; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 35530b39c9..f33c507d51 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -28,6 +28,7 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.client.failover.FailoverSupport; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.protocol.BlockingMethodFrameListener; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.exchange.ExchangeDefaults; @@ -64,6 +65,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _channelId; + private int _ticket; + private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; @@ -145,6 +148,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private boolean _hasMessageListeners; + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -1114,7 +1118,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // internal false, // nowait false, // passive - 0, // ticket + getTicket(), // ticket type); // type getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); } @@ -1136,7 +1140,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // internal true, // nowait false, // passive - 0, // ticket + getTicket(), // ticket type); // type protocolHandler.writeFrame(exchangeDeclare); } @@ -1169,7 +1173,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi true, // nowait false, // passive amqd.getAMQQueueName(), // queue - 0); // ticket + getTicket()); // ticket protocolHandler.writeFrame(queueDeclare); return amqd.getAMQQueueName(); @@ -1185,7 +1189,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi true, // nowait queueName, // queue amqd.getRoutingKey(), // routingKey - 0); // ticket + getTicket()); // ticket protocolHandler.writeFrame(queueBind); } @@ -1233,7 +1237,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.isNoLocal(), // noLocal nowait, // nowait queueName, // queue - 0); // ticket + getTicket()); // ticket if (nowait) { protocolHandler.writeFrame(jmsConsume); @@ -1426,7 +1430,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi false, // ifUnused true, // nowait queueName, // queue - 0); // ticket + getTicket()); // ticket getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class); } catch (AMQException e) @@ -1824,4 +1828,48 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + + + public int getTicket() + { + return _ticket; + } + + public void setTicket(int ticket) + { + _ticket = ticket; + } + + + public void requestAccess(AMQShortString realm, boolean exclusive, boolean passive, boolean active, boolean write, boolean read) throws AMQException + { + getProtocolHandler().writeCommandFrameAndWaitForReply(AccessRequestBody.createAMQFrame(getChannelId(), + getProtocolMajorVersion(), + getProtocolMinorVersion(), + active, + exclusive, + passive, + read, + realm, + write), + new BlockingMethodFrameListener(_channelId) + { + + public boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException + { + if(frame instanceof AccessRequestOkBody) + { + setTicket(((AccessRequestOkBody)frame).getTicket()); + return true; + } + else + { + return false; + } + } + }); + + } + + } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java new file mode 100644 index 0000000000..93f10761e2 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSessionAdapter.java @@ -0,0 +1,26 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+public interface AMQSessionAdapter
+{
+ public AMQSession getSession();
+}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java index 0f50c330fb..34ef23747c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java @@ -24,7 +24,7 @@ import javax.jms.*; import javax.jms.IllegalStateException; import java.io.Serializable; -public class AMQTopicSessionAdaptor implements TopicSession +public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter { protected final AMQSession _session; @@ -199,4 +199,8 @@ public class AMQTopicSessionAdaptor implements TopicSession throw new IllegalStateException("Cannot call createTemporaryQueue from TopicSession"); } + public AMQSession getSession() + { + return _session; + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index f4fda40be5..fc3450c385 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -144,7 +144,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j false, // internal true, // nowait false, // passive - 0, // ticket + _session.getTicket(), // ticket destination.getExchangeClass()); // type _protocolHandler.writeFrame(declare); } @@ -465,7 +465,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j immediate, // immediate mandatory, // mandatory destination.getRoutingKey(), // routingKey - 0); // ticket + _session.getTicket()); // ticket diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 9015cabf43..1f83ce67c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -442,7 +442,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) throws AMQException { @@ -457,7 +457,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter * @param frame * @param listener the blocking listener. Note the calling thread will block. */ - private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, + public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, long timeout) throws AMQException { |