diff options
Diffstat (limited to 'trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r-- | trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 931 |
1 files changed, 0 insertions, 931 deletions
diff --git a/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java deleted file mode 100644 index 2324d441cc..0000000000 --- a/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ /dev/null @@ -1,931 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.client; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.failover.FailoverNoopSupport; -import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.FiledTableSupport; -import org.apache.qpid.client.message.AMQMessageDelegateFactory; -import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.util.Serial; -import org.apache.qpid.transport.ExecutionException; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.RangeSet; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.ExchangeBoundResult; -import org.apache.qpid.transport.Future; -import org.apache.qpid.transport.Range; -import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.qpid.transport.Option.*; - -import javax.jms.*; -import javax.jms.IllegalStateException; - -import java.lang.ref.WeakReference; - -import java.util.Date; -import java.util.HashMap; -import java.util.UUID; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -/** - * This is a 0.10 Session - */ -public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10> - implements SessionListener -{ - - /** - * This class logger - */ - private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class); - - private static Timer timer = new Timer("ack-flusher", true); - private static class Flusher extends TimerTask - { - - private WeakReference<AMQSession_0_10> session; - public Flusher(AMQSession_0_10 session) - { - this.session = new WeakReference<AMQSession_0_10>(session); - } - - public void run() { - AMQSession_0_10 ssn = session.get(); - if (ssn == null) - { - cancel(); - } - else - { - try - { - ssn.flushAcknowledgments(true); - } - catch (Throwable t) - { - _logger.error("error flushing acks", t); - } - } - } - } - - - /** - * The underlying QpidSession - */ - private Session _qpidSession; - - /** - * The latest qpid Exception that has been reaised. - */ - private Object _currentExceptionLock = new Object(); - private SessionException _currentException; - - // a ref on the qpid connection - protected org.apache.qpid.transport.Connection _qpidConnection; - - private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000); - private TimerTask flushTask = null; - private RangeSet unacked = new RangeSet(); - private int unackedCount = 0; - - /** - * USed to store the range of in tx messages - */ - private RangeSet _txRangeSet = new RangeSet(); - private int _txSize = 0; - //--- constructors - - /** - * Creates a new session on a connection. - * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param messageFactoryRegistry The message factory factory for the session. - * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session. - * @param qpidConnection The qpid connection - */ - AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, - boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, - int defaultPrefetchHighMark, int defaultPrefetchLowMark) - { - - super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, - defaultPrefetchLowMark); - _qpidConnection = qpidConnection; - _qpidSession = _qpidConnection.createSession(1); - _qpidSession.setSessionListener(this); - if (_transacted) - { - _qpidSession.txSelect(); - } - - if (maxAckDelay > 0) - { - flushTask = new Flusher(this); - timer.schedule(flushTask, new Date(), maxAckDelay); - } - } - - /** - * Creates a new session on a connection with the default 0-10 message factory. - * - * @param con The connection on which to create the session. - * @param channelId The unique identifier for the session. - * @param transacted Indicates whether or not the session is transactional. - * @param acknowledgeMode The acknoledgement mode for the session. - * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session. - * @param defaultPrefetchLow The number of prefetched messages at which to resume the session. - * @param qpidConnection The connection - */ - AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, - boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - { - - this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), - defaultPrefetchHigh, defaultPrefetchLow); - } - - private void addUnacked(int id) - { - synchronized (unacked) - { - unacked.add(id); - unackedCount++; - } - } - - private void clearUnacked() - { - synchronized (unacked) - { - unacked.clear(); - unackedCount = 0; - } - } - - //------- overwritten methods of class AMQSession - - void failoverPrep() - { - super.failoverPrep(); - clearUnacked(); - } - - /** - * Acknowledge one or many messages. - * - * @param deliveryTag The tag of the last message to be acknowledged. - * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the - * delivery tag, <tt>false</tt> to just acknowledge that message. - */ - - public void acknowledgeMessage(long deliveryTag, boolean multiple) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId); - } - // acknowledge this message - if (multiple) - { - for (Long messageTag : _unacknowledgedMessageTags) - { - if( messageTag <= deliveryTag ) - { - addUnacked(messageTag.intValue()); - _unacknowledgedMessageTags.remove(messageTag); - } - } - //empty the list of unack messages - - } - else - { - addUnacked((int) deliveryTag); - _unacknowledgedMessageTags.remove(deliveryTag); - } - - long prefetch = getAMQConnection().getMaxPrefetch(); - - if (unackedCount >= prefetch/2 || maxAckDelay <= 0) - { - flushAcknowledgments(); - } - } - - void flushAcknowledgments() - { - flushAcknowledgments(false); - } - - void flushAcknowledgments(boolean setSyncBit) - { - synchronized (unacked) - { - if (unackedCount > 0) - { - messageAcknowledge - (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit); - clearUnacked(); - } - } - } - - void messageAcknowledge(RangeSet ranges, boolean accept) - { - messageAcknowledge(ranges,accept,false); - } - - void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit) - { - Session ssn = getQpidSession(); - for (Range range : ranges) - { - ssn.processed(range); - } - ssn.flushProcessed(accept ? BATCH : NONE); - if (accept) - { - ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE); - } - } - - /** - * Bind a queue with an exchange. - * - * @param queueName Specifies the name of the queue to bind. If the queue name is empty, - * refers to the current - * queue for the session, which is the last declared queue. - * @param exchangeName The exchange name. - * @param routingKey Specifies the routing key for the binding. - * @param arguments 0_8 specific - */ - public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, - final FieldTable arguments, final AMQShortString exchangeName, - final AMQDestination destination, final boolean nowait) - throws AMQException, FailoverException - { - Map args = FiledTableSupport.convertToMap(arguments); - // this is there only becasue the broker may expect a value for x-match - if( ! args.containsKey("x-match") ) - { - args.put("x-match", "any"); - } - - for (AMQShortString rk: destination.getBindingKeys()) - { - _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString()); - getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args); - } - if (!nowait) - { - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - } - - - /** - * Close this session. - * - * @param timeout no used / 0_8 specific - * @throws AMQException - * @throws FailoverException - */ - public void sendClose(long timeout) throws AMQException, FailoverException - { - if (flushTask != null) - { - flushTask.cancel(); - flushTask = null; - } - flushAcknowledgments(); - getQpidSession().sync(); - getQpidSession().close(); - getCurrentException(); - } - - - /** - * Commit the receipt and the delivery of all messages exchanged by this session resources. - */ - public void sendCommit() throws AMQException, FailoverException - { - getQpidSession().setAutoSync(true); - try - { - getQpidSession().txCommit(); - } - finally - { - getQpidSession().setAutoSync(false); - } - // We need to sync so that we get notify of an error. - getCurrentException(); - } - - /** - * Create a queue with a given name. - * - * @param name The queue name - * @param autoDelete If this field is set and the exclusive field is also set, - * then the queue is deleted when the connection closes. - * @param durable If set when creating a new queue, - * the queue will be marked as durable. - * @param exclusive Exclusive queues can only be used from one connection at a time. - * @param arguments Exclusive queues can only be used from one connection at a time. - * @throws AMQException - * @throws FailoverException - */ - public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, - final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException - { - getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE, - autoDelete ? Option.AUTO_DELETE : Option.NONE, - exclusive ? Option.EXCLUSIVE : Option.NONE); - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - - /** - * This method asks the broker to redeliver all unacknowledged messages - * - * @throws AMQException - * @throws FailoverException - */ - public void sendRecover() throws AMQException, FailoverException - { - // release all unack messages - RangeSet ranges = new RangeSet(); - while (true) - { - Long tag = _unacknowledgedMessageTags.poll(); - if (tag == null) - { - break; - } - ranges.add((int) (long) tag); - } - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - - public void releaseForRollback() - { - getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED); - _txRangeSet.clear(); - _txSize = 0; - } - - /** - * Release (0_8 notion of Reject) an acquired message - * - * @param deliveryTag the message ID - * @param requeue always true - */ - public void rejectMessage(long deliveryTag, boolean requeue) - { - // The value of requeue is always true - RangeSet ranges = new RangeSet(); - ranges.add((int) deliveryTag); - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); - //I don't think we need to sync - } - - /** - * Create an 0_10 message consumer - */ - public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, - final boolean exclusive, String messageSelector, - final FieldTable ft, final boolean noConsume, - final boolean autoClose) throws JMSException - { - - final AMQProtocolHandler protocolHandler = getProtocolHandler(); - return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh, - prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); - } - - /** - * Bind a queue with an exchange. - */ - - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) - throws JMSException - { - return isQueueBound(exchangeName,queueName,routingKey,null); - } - - public boolean isQueueBound(final AMQDestination destination) throws JMSException - { - return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys()); - } - - public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys) - throws JMSException - { - String rk = null; - boolean res; - if (bindingKeys != null && bindingKeys.length>0) - { - rk = bindingKeys[0].toString(); - } - else if (routingKey != null) - { - rk = routingKey.toString(); - } - - ExchangeBoundResult bindingQueryResult = - getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get(); - - if (rk == null) - { - res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound()); - } - else - { - res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult - .getQueueNotMatched()); - } - return res; - } - - /** - * This method is invoked when a consumer is creted - * Registers the consumer with the broker - */ - public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, - boolean nowait, String messageSelector, int tag) - throws AMQException, FailoverException - { - boolean preAcquire; - try - { - preAcquire = ( ! consumer.isNoConsume() && - (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) ) - || !(consumer.getDestination() instanceof AMQQueue); - getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), - getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, - preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null, - consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - } - catch (JMSException e) - { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e); - } - - String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); - - if (! prefetch()) - { - getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT); - } - else - { - getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW); - } - getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, - Option.UNRELIABLE); - // We need to sync so that we get notify of an error. - // only if not immediat prefetch - if(prefetch() && (isStarted() || _immediatePrefetch)) - { - // set the flow - getQpidSession().messageFlow(consumerTag, - MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), - Option.UNRELIABLE); - } - - if (!nowait) - { - getQpidSession().sync(); - getCurrentException(); - } - } - - /** - * Create an 0_10 message producer - */ - public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent, - long producerId) - { - return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this, - getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent); - - } - - /** - * creates an exchange if it does not already exist - */ - public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, - final AMQProtocolHandler protocolHandler, final boolean nowait) - throws AMQException, FailoverException - { - getQpidSession().exchangeDeclare(name.toString(), - type.toString(), - null, - null, - name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE); - // We need to sync so that we get notify of an error. - if (!nowait) - { - getQpidSession().sync(); - getCurrentException(); - } - } - - /** - * Declare a queue with the given queueName - */ - public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) - throws AMQException, FailoverException - { - // do nothing this is only used by 0_8 - } - - /** - * Declare a queue with the given queueName - */ - public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) - throws AMQException, FailoverException - { - AMQShortString res; - if (amqd.getAMQQueueName() == null) - { - // generate a name for this queue - res = new AMQShortString("TempQueue" + UUID.randomUUID()); - } - else - { - res = amqd.getAMQQueueName(); - } - Map<String,Object> arguments = null; - if (noLocal) - { - arguments = new HashMap<String,Object>(); - arguments.put("no-local", true); - } - getQpidSession().queueDeclare(res.toString(), null, arguments, - amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, - amqd.isDurable() ? Option.DURABLE : Option.NONE, - !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - // passive --> false - if (!nowait) - { - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - return res; - } - - /** - * deletes a queue - */ - public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException - { - getQpidSession().queueDelete(queueName.toString()); - // ifEmpty --> false - // ifUnused --> false - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - - /** - * Activate/deactivate the message flow for all the consumers of this session. - */ - public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException - { - if (suspend) - { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - } - } - else - { - for (BasicMessageConsumer_0_10 consumer : _consumers.values()) - { - String consumerTag = String.valueOf(consumer.getConsumerTag()); - //only set if msg list is null - try - { - if (! prefetch()) - { - if (consumer.getMessageListener() != null) - { - getQpidSession().messageFlow(consumerTag, - MessageCreditUnit.MESSAGE, 1, - Option.UNRELIABLE); - } - } - else - { - getQpidSession() - .messageFlow(consumerTag, MessageCreditUnit.MESSAGE, - getAMQConnection().getMaxPrefetch(), - Option.UNRELIABLE); - } - getQpidSession() - .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF, - Option.UNRELIABLE); - } - catch (Exception e) - { - throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e); - } - } - } - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - - - public void sendRollback() throws AMQException, FailoverException - { - getQpidSession().txRollback(); - // We need to sync so that we get notify of an error. - getQpidSession().sync(); - getCurrentException(); - } - - //------ Private methods - /** - * Access to the underlying Qpid Session - * - * @return The associated Qpid Session. - */ - protected Session getQpidSession() - { - return _qpidSession; - } - - - /** - * Get the latest thrown exception. - * - * @throws org.apache.qpid.AMQException get the latest thrown error. - */ - public void getCurrentException() throws AMQException - { - synchronized (_currentExceptionLock) - { - if (_currentException != null) - { - SessionException se = _currentException; - _currentException = null; - ExecutionException ee = se.getException(); - int code; - if (ee == null) - { - code = 0; - } - else - { - code = ee.getErrorCode().getValue(); - } - throw new AMQException - (AMQConstant.getConstant(code), se.getMessage(), se); - } - } - } - - public void opened(Session ssn) {} - - public void resumed(Session ssn) - { - _qpidConnection = ssn.getConnection(); - try - { - resubscribe(); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void message(Session ssn, MessageTransfer xfr) - { - messageReceived(new UnprocessedMessage_0_10(xfr)); - } - - public void exception(Session ssn, SessionException exc) - { - synchronized (_currentExceptionLock) - { - _currentException = exc; - } - } - - public void closed(Session ssn) {} - - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) - throws AMQException - { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ - return new FailoverNoopSupport<AMQShortString, AMQException>( - new FailoverProtectedOperation<AMQShortString, AMQException>() - { - public AMQShortString execute() throws AMQException, FailoverException - { - // Generate the queue name if the destination indicates that a client generated name is to be used. - if (amqd.isNameRequired()) - { - String binddingKey = ""; - for(AMQShortString key : amqd.getBindingKeys()) - { - binddingKey = binddingKey + "_" + key.toString(); - } - amqd.setQueueName(new AMQShortString( binddingKey + "@" - + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); - } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); - } - }, _connection).execute(); - } - - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - - checkNotClosed(); - AMQTopic origTopic=checkValidTopic(topic, true); - AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection); - - TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name); - if (subscriber != null) - { - if (subscriber.getTopic().equals(topic)) - { - throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange " - + name); - } - else - { - unsubscribe(name); - } - } - else - { - AMQShortString topicName; - if (topic instanceof AMQTopic) - { - topicName=((AMQTopic) topic).getBindingKeys()[0]; - } - else - { - topicName=new AMQShortString(topic.getTopicName()); - } - - if (_strictAMQP) - { - if (_strictAMQPFATAL) - { - throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP."); - } - else - { - _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' " - + "for creation durableSubscriber. Requesting queue deletion regardless."); - } - - deleteQueue(dest.getAMQQueueName()); - } - else - { - // if the queue is bound to the exchange but NOT for this topic, then the JMS spec - // says we must trash the subscription. - if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) - && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName)) - { - deleteQueue(dest.getAMQQueueName()); - } - } - } - - subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest)); - - _subscriptions.put(name, subscriber); - _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name); - - return subscriber; - } - - protected Long requestQueueDepth(AMQDestination amqd) - { - return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); - } - - - /** - * Store non committed messages for this session - * With 0.10 messages are consumed with window mode, we must send a completion - * before the window size is reached so credits don't dry up. - * @param id - */ - @Override protected void addDeliveredMessage(long id) - { - _txRangeSet.add((int) id); - _txSize++; - // this is a heuristic, we may want to have that configurable - if (_connection.getMaxPrefetch() == 1 || - _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0) - { - // send completed so consumer credits don't dry up - messageAcknowledge(_txRangeSet, false); - } - } - - @Override public void commit() throws JMSException - { - checkTransacted(); - try - { - if( _txSize > 0 ) - { - messageAcknowledge(_txRangeSet, true); - _txRangeSet.clear(); - _txSize = 0; - } - sendCommit(); - } - catch (AMQException e) - { - throw new JMSAMQException("Failed to commit: " + e.getMessage(), e); - } - catch (FailoverException e) - { - throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e); - } - } - - protected final boolean tagLE(long tag1, long tag2) - { - return Serial.le((int) tag1, (int) tag2); - } - - protected final boolean updateRollbackMark(long currentMark, long deliveryTag) - { - return Serial.lt((int) currentMark, (int) deliveryTag); - } - - public AMQMessageDelegateFactory getMessageDelegateFactory() - { - return AMQMessageDelegateFactory.FACTORY_0_10; - } - -} |