diff options
Diffstat (limited to 'trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 931 |
1 files changed, 931 insertions, 0 deletions
diff --git a/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java new file mode 100644 index 0000000000..2324d441cc --- /dev/null +++ b/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -0,0 +1,931 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.failover.FailoverNoopSupport; +import org.apache.qpid.client.failover.FailoverProtectedOperation; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.FiledTableSupport; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.util.Serial; +import org.apache.qpid.transport.ExecutionException; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.ExchangeBoundResult; +import org.apache.qpid.transport.Future; +import org.apache.qpid.transport.Range; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.SessionListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.qpid.transport.Option.*; + +import javax.jms.*; +import javax.jms.IllegalStateException; + +import java.lang.ref.WeakReference; + +import java.util.Date; +import java.util.HashMap; +import java.util.UUID; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +/** + * This is a 0.10 Session + */ +public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> + implements SessionListener +{ + + /** + * This class logger + */ + private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); + + private static Timer timer = new Timer("ack-flusher", true); + private static class Flusher extends TimerTask + { + + private WeakReference<AMQSession_0_10> session; + public Flusher(AMQSession_0_10 session) + { + this.session = new WeakReference<AMQSession_0_10>(session); + } + + public void run() { + AMQSession_0_10 ssn = session.get(); + if (ssn == null) + { + cancel(); + } + else + { + try + { + ssn.flushAcknowledgments(true); + } + catch (Throwable t) + { + _logger.error("error flushing acks", t); + } + } + } + } + + + /** + * The underlying QpidSession + */ + private Session _qpidSession; + + /** + * The latest qpid Exception that has been reaised. + */ + private Object _currentExceptionLock = new Object(); + private SessionException _currentException; + + // a ref on the qpid connection + protected org.apache.qpid.transport.Connection _qpidConnection; + + private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); + private TimerTask flushTask = null; + private RangeSet unacked = new RangeSet(); + private int unackedCount = 0; + + /** + * USed to store the range of in tx messages + */ + private RangeSet _txRangeSet = new RangeSet(); + private int _txSize = 0; + //--- constructors + + /** + * Creates a new session on a connection. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param messageFactoryRegistry The message factory factory for the session. + * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. + * @param qpidConnection The qpid connection + */ + AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, + boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, + int defaultPrefetchHighMark, int defaultPrefetchLowMark) + { + + super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, + defaultPrefetchLowMark); + _qpidConnection = qpidConnection; + _qpidSession = _qpidConnection.createSession(1); + _qpidSession.setSessionListener(this); + if (_transacted) + { + _qpidSession.txSelect(); + } + + if (maxAckDelay > 0) + { + flushTask = new Flusher(this); + timer.schedule(flushTask, new Date(), maxAckDelay); + } + } + + /** + * Creates a new session on a connection with the default 0-10 message factory. + * + * @param con The connection on which to create the session. + * @param channelId The unique identifier for the session. + * @param transacted Indicates whether or not the session is transactional. + * @param acknowledgeMode The acknoledgement mode for the session. + * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. + * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. + * @param qpidConnection The connection + */ + AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, + boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) + { + + this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), + defaultPrefetchHigh, defaultPrefetchLow); + } + + private void addUnacked(int id) + { + synchronized (unacked) + { + unacked.add(id); + unackedCount++; + } + } + + private void clearUnacked() + { + synchronized (unacked) + { + unacked.clear(); + unackedCount = 0; + } + } + + //------- overwritten methods of class AMQSession + + void failoverPrep() + { + super.failoverPrep(); + clearUnacked(); + } + + /** + * Acknowledge one or many messages. + * + * @param deliveryTag The tag of the last message to be acknowledged. + * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the + * delivery tag, <tt>false</tt> to just acknowledge that message. + */ + + public void acknowledgeMessage(long deliveryTag, boolean multiple) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId); + } + // acknowledge this message + if (multiple) + { + for (Long messageTag : _unacknowledgedMessageTags) + { + if( messageTag <= deliveryTag ) + { + addUnacked(messageTag.intValue()); + _unacknowledgedMessageTags.remove(messageTag); + } + } + //empty the list of unack messages + + } + else + { + addUnacked((int) deliveryTag); + _unacknowledgedMessageTags.remove(deliveryTag); + } + + long prefetch = getAMQConnection().getMaxPrefetch(); + + if (unackedCount >= prefetch/2 || maxAckDelay <= 0) + { + flushAcknowledgments(); + } + } + + void flushAcknowledgments() + { + flushAcknowledgments(false); + } + + void flushAcknowledgments(boolean setSyncBit) + { + synchronized (unacked) + { + if (unackedCount > 0) + { + messageAcknowledge + (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); + clearUnacked(); + } + } + } + + void messageAcknowledge(RangeSet ranges, boolean accept) + { + messageAcknowledge(ranges,accept,false); + } + + void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) + { + Session ssn = getQpidSession(); + for (Range range : ranges) + { + ssn.processed(range); + } + ssn.flushProcessed(accept ? BATCH : NONE); + if (accept) + { + ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); + } + } + + /** + * Bind a queue with an exchange. + * + * @param queueName Specifies the name of the queue to bind. If the queue name is empty, + * refers to the current + * queue for the session, which is the last declared queue. + * @param exchangeName The exchange name. + * @param routingKey Specifies the routing key for the binding. + * @param arguments 0_8 specific + */ + public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, + final FieldTable arguments, final AMQShortString exchangeName, + final AMQDestination destination, final boolean nowait) + throws AMQException, FailoverException + { + Map args = FiledTableSupport.convertToMap(arguments); + // this is there only becasue the broker may expect a value for x-match + if( ! args.containsKey("x-match") ) + { + args.put("x-match", "any"); + } + + for (AMQShortString rk: destination.getBindingKeys()) + { + _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); + getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); + } + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + } + + + /** + * Close this session. + * + * @param timeout no used / 0_8 specific + * @throws AMQException + * @throws FailoverException + */ + public void sendClose(long timeout) throws AMQException, FailoverException + { + if (flushTask != null) + { + flushTask.cancel(); + flushTask = null; + } + flushAcknowledgments(); + getQpidSession().sync(); + getQpidSession().close(); + getCurrentException(); + } + + + /** + * Commit the receipt and the delivery of all messages exchanged by this session resources. + */ + public void sendCommit() throws AMQException, FailoverException + { + getQpidSession().setAutoSync(true); + try + { + getQpidSession().txCommit(); + } + finally + { + getQpidSession().setAutoSync(false); + } + // We need to sync so that we get notify of an error. + getCurrentException(); + } + + /** + * Create a queue with a given name. + * + * @param name The queue name + * @param autoDelete If this field is set and the exclusive field is also set, + * then the queue is deleted when the connection closes. + * @param durable If set when creating a new queue, + * the queue will be marked as durable. + * @param exclusive Exclusive queues can only be used from one connection at a time. + * @param arguments Exclusive queues can only be used from one connection at a time. + * @throws AMQException + * @throws FailoverException + */ + public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, + final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException + { + getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE, + autoDelete ? Option.AUTO_DELETE : Option.NONE, + exclusive ? Option.EXCLUSIVE : Option.NONE); + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + + /** + * This method asks the broker to redeliver all unacknowledged messages + * + * @throws AMQException + * @throws FailoverException + */ + public void sendRecover() throws AMQException, FailoverException + { + // release all unack messages + RangeSet ranges = new RangeSet(); + while (true) + { + Long tag = _unacknowledgedMessageTags.poll(); + if (tag == null) + { + break; + } + ranges.add((int) (long) tag); + } + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + + public void releaseForRollback() + { + getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); + _txRangeSet.clear(); + _txSize = 0; + } + + /** + * Release (0_8 notion of Reject) an acquired message + * + * @param deliveryTag the message ID + * @param requeue always true + */ + public void rejectMessage(long deliveryTag, boolean requeue) + { + // The value of requeue is always true + RangeSet ranges = new RangeSet(); + ranges.add((int) deliveryTag); + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + //I don't think we need to sync + } + + /** + * Create an 0_10 message consumer + */ + public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, + final int prefetchLow, final boolean noLocal, + final boolean exclusive, String messageSelector, + final FieldTable ft, final boolean noConsume, + final boolean autoClose) throws JMSException + { + + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal, + _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh, + prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); + } + + /** + * Bind a queue with an exchange. + */ + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) + throws JMSException + { + return isQueueBound(exchangeName,queueName,routingKey,null); + } + + public boolean isQueueBound(final AMQDestination destination) throws JMSException + { + return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); + } + + public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) + throws JMSException + { + String rk = null; + boolean res; + if (bindingKeys != null && bindingKeys.length>0) + { + rk = bindingKeys[0].toString(); + } + else if (routingKey != null) + { + rk = routingKey.toString(); + } + + ExchangeBoundResult bindingQueryResult = + getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get(); + + if (rk == null) + { + res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); + } + else + { + res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult + .getQueueNotMatched()); + } + return res; + } + + /** + * This method is invoked when a consumer is creted + * Registers the consumer with the broker + */ + public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, + boolean nowait, String messageSelector, int tag) + throws AMQException, FailoverException + { + boolean preAcquire; + try + { + preAcquire = ( ! consumer.isNoConsume() && + (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) + || !(consumer.getDestination() instanceof AMQQueue); + getQpidSession().messageSubscribe + (queueName.toString(), String.valueOf(tag), + getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, + preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null, + consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + } + catch (JMSException e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); + } + + String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + + if (! prefetch()) + { + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); + } + else + { + getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); + } + getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); + // We need to sync so that we get notify of an error. + // only if not immediat prefetch + if(prefetch() && (isStarted() || _immediatePrefetch)) + { + // set the flow + getQpidSession().messageFlow(consumerTag, + MessageCreditUnit.MESSAGE, + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); + } + } + + /** + * Create an 0_10 message producer + */ + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, + final boolean immediate, final boolean waitUntilSent, + long producerId) + { + return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this, + getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); + + } + + /** + * creates an exchange if it does not already exist + */ + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, + final AMQProtocolHandler protocolHandler, final boolean nowait) + throws AMQException, FailoverException + { + getQpidSession().exchangeDeclare(name.toString(), + type.toString(), + null, + null, + name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); + // We need to sync so that we get notify of an error. + if (!nowait) + { + getQpidSession().sync(); + getCurrentException(); + } + } + + /** + * Declare a queue with the given queueName + */ + public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean nowait) + throws AMQException, FailoverException + { + // do nothing this is only used by 0_8 + } + + /** + * Declare a queue with the given queueName + */ + public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal, final boolean nowait) + throws AMQException, FailoverException + { + AMQShortString res; + if (amqd.getAMQQueueName() == null) + { + // generate a name for this queue + res = new AMQShortString("TempQueue" + UUID.randomUUID()); + } + else + { + res = amqd.getAMQQueueName(); + } + Map<String,Object> arguments = null; + if (noLocal) + { + arguments = new HashMap<String,Object>(); + arguments.put("no-local", true); + } + getQpidSession().queueDeclare(res.toString(), null, arguments, + amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, + amqd.isDurable() ? Option.DURABLE : Option.NONE, + !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + // passive --> false + if (!nowait) + { + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + return res; + } + + /** + * deletes a queue + */ + public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException + { + getQpidSession().queueDelete(queueName.toString()); + // ifEmpty --> false + // ifUnused --> false + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + + /** + * Activate/deactivate the message flow for all the consumers of this session. + */ + public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException + { + if (suspend) + { + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + } + } + else + { + for (BasicMessageConsumer_0_10 consumer : _consumers.values()) + { + String consumerTag = String.valueOf(consumer.getConsumerTag()); + //only set if msg list is null + try + { + if (! prefetch()) + { + if (consumer.getMessageListener() != null) + { + getQpidSession().messageFlow(consumerTag, + MessageCreditUnit.MESSAGE, 1, + Option.UNRELIABLE); + } + } + else + { + getQpidSession() + .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, + getAMQConnection().getMaxPrefetch(), + Option.UNRELIABLE); + } + getQpidSession() + .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, + Option.UNRELIABLE); + } + catch (Exception e) + { + throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e); + } + } + } + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + + + public void sendRollback() throws AMQException, FailoverException + { + getQpidSession().txRollback(); + // We need to sync so that we get notify of an error. + getQpidSession().sync(); + getCurrentException(); + } + + //------ Private methods + /** + * Access to the underlying Qpid Session + * + * @return The associated Qpid Session. + */ + protected Session getQpidSession() + { + return _qpidSession; + } + + + /** + * Get the latest thrown exception. + * + * @throws org.apache.qpid.AMQException get the latest thrown error. + */ + public void getCurrentException() throws AMQException + { + synchronized (_currentExceptionLock) + { + if (_currentException != null) + { + SessionException se = _currentException; + _currentException = null; + ExecutionException ee = se.getException(); + int code; + if (ee == null) + { + code = 0; + } + else + { + code = ee.getErrorCode().getValue(); + } + throw new AMQException + (AMQConstant.getConstant(code), se.getMessage(), se); + } + } + } + + public void opened(Session ssn) {} + + public void resumed(Session ssn) + { + _qpidConnection = ssn.getConnection(); + try + { + resubscribe(); + } + catch (AMQException e) + { + throw new RuntimeException(e); + } + } + + public void message(Session ssn, MessageTransfer xfr) + { + messageReceived(new UnprocessedMessage_0_10(xfr)); + } + + public void exception(Session ssn, SessionException exc) + { + synchronized (_currentExceptionLock) + { + _currentException = exc; + } + } + + public void closed(Session ssn) {} + + protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + final boolean noLocal, final boolean nowait) + throws AMQException + { + /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + return new FailoverNoopSupport<AMQShortString, AMQException>( + new FailoverProtectedOperation<AMQShortString, AMQException>() + { + public AMQShortString execute() throws AMQException, FailoverException + { + // Generate the queue name if the destination indicates that a client generated name is to be used. + if (amqd.isNameRequired()) + { + String binddingKey = ""; + for(AMQShortString key : amqd.getBindingKeys()) + { + binddingKey = binddingKey + "_" + key.toString(); + } + amqd.setQueueName(new AMQShortString( binddingKey + "@" + + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); + } + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); + } + }, _connection).execute(); + } + + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException + { + + checkNotClosed(); + AMQTopic origTopic=checkValidTopic(topic, true); + AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); + + TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); + if (subscriber != null) + { + if (subscriber.getTopic().equals(topic)) + { + throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " + + name); + } + else + { + unsubscribe(name); + } + } + else + { + AMQShortString topicName; + if (topic instanceof AMQTopic) + { + topicName=((AMQTopic) topic).getBindingKeys()[0]; + } + else + { + topicName=new AMQShortString(topic.getTopicName()); + } + + if (_strictAMQP) + { + if (_strictAMQPFATAL) + { + throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); + } + else + { + _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " + + "for creation durableSubscriber. Requesting queue deletion regardless."); + } + + deleteQueue(dest.getAMQQueueName()); + } + else + { + // if the queue is bound to the exchange but NOT for this topic, then the JMS spec + // says we must trash the subscription. + if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) + && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) + { + deleteQueue(dest.getAMQQueueName()); + } + } + } + + subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); + + _subscriptions.put(name, subscriber); + _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); + + return subscriber; + } + + protected Long requestQueueDepth(AMQDestination amqd) + { + return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); + } + + + /** + * Store non committed messages for this session + * With 0.10 messages are consumed with window mode, we must send a completion + * before the window size is reached so credits don't dry up. + * @param id + */ + @Override protected void addDeliveredMessage(long id) + { + _txRangeSet.add((int) id); + _txSize++; + // this is a heuristic, we may want to have that configurable + if (_connection.getMaxPrefetch() == 1 || + _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) + { + // send completed so consumer credits don't dry up + messageAcknowledge(_txRangeSet, false); + } + } + + @Override public void commit() throws JMSException + { + checkTransacted(); + try + { + if( _txSize > 0 ) + { + messageAcknowledge(_txRangeSet, true); + _txRangeSet.clear(); + _txSize = 0; + } + sendCommit(); + } + catch (AMQException e) + { + throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); + } + catch (FailoverException e) + { + throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); + } + } + + protected final boolean tagLE(long tag1, long tag2) + { + return Serial.le((int) tag1, (int) tag2); + } + + protected final boolean updateRollbackMark(long currentMark, long deliveryTag) + { + return Serial.lt((int) currentMark, (int) deliveryTag); + } + + public AMQMessageDelegateFactory getMessageDelegateFactory() + { + return AMQMessageDelegateFactory.FACTORY_0_10; + } + +} |