summaryrefslogtreecommitdiff
path: root/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java')
-rw-r--r--trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java931
1 files changed, 0 insertions, 931 deletions
diff --git a/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
deleted file mode 100644
index 2324d441cc..0000000000
--- a/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ /dev/null
@@ -1,931 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.client;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverNoopSupport;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.util.Serial;
-import org.apache.qpid.transport.ExecutionException;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ExchangeBoundResult;
-import org.apache.qpid.transport.Future;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.qpid.transport.Option.*;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-
-import java.lang.ref.WeakReference;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * This is a 0.10 Session
- */
-public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
- implements SessionListener
-{
-
- /**
- * This class logger
- */
- private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_10.class);
-
- private static Timer timer = new Timer("ack-flusher", true);
- private static class Flusher extends TimerTask
- {
-
- private WeakReference<AMQSession_0_10> session;
- public Flusher(AMQSession_0_10 session)
- {
- this.session = new WeakReference<AMQSession_0_10>(session);
- }
-
- public void run() {
- AMQSession_0_10 ssn = session.get();
- if (ssn == null)
- {
- cancel();
- }
- else
- {
- try
- {
- ssn.flushAcknowledgments(true);
- }
- catch (Throwable t)
- {
- _logger.error("error flushing acks", t);
- }
- }
- }
- }
-
-
- /**
- * The underlying QpidSession
- */
- private Session _qpidSession;
-
- /**
- * The latest qpid Exception that has been reaised.
- */
- private Object _currentExceptionLock = new Object();
- private SessionException _currentException;
-
- // a ref on the qpid connection
- protected org.apache.qpid.transport.Connection _qpidConnection;
-
- private long maxAckDelay = Long.getLong("qpid.session.max_ack_delay", 1000);
- private TimerTask flushTask = null;
- private RangeSet unacked = new RangeSet();
- private int unackedCount = 0;
-
- /**
- * USed to store the range of in tx messages
- */
- private RangeSet _txRangeSet = new RangeSet();
- private int _txSize = 0;
- //--- constructors
-
- /**
- * Creates a new session on a connection.
- *
- * @param con The connection on which to create the session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
- * @param messageFactoryRegistry The message factory factory for the session.
- * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
- * @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
- * @param qpidConnection The qpid connection
- */
- AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry,
- int defaultPrefetchHighMark, int defaultPrefetchLowMark)
- {
-
- super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark,
- defaultPrefetchLowMark);
- _qpidConnection = qpidConnection;
- _qpidSession = _qpidConnection.createSession(1);
- _qpidSession.setSessionListener(this);
- if (_transacted)
- {
- _qpidSession.txSelect();
- }
-
- if (maxAckDelay > 0)
- {
- flushTask = new Flusher(this);
- timer.schedule(flushTask, new Date(), maxAckDelay);
- }
- }
-
- /**
- * Creates a new session on a connection with the default 0-10 message factory.
- *
- * @param con The connection on which to create the session.
- * @param channelId The unique identifier for the session.
- * @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
- * @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
- * @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
- * @param qpidConnection The connection
- */
- AMQSession_0_10(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
- boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
- {
-
- this(qpidConnection, con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(),
- defaultPrefetchHigh, defaultPrefetchLow);
- }
-
- private void addUnacked(int id)
- {
- synchronized (unacked)
- {
- unacked.add(id);
- unackedCount++;
- }
- }
-
- private void clearUnacked()
- {
- synchronized (unacked)
- {
- unacked.clear();
- unackedCount = 0;
- }
- }
-
- //------- overwritten methods of class AMQSession
-
- void failoverPrep()
- {
- super.failoverPrep();
- clearUnacked();
- }
-
- /**
- * Acknowledge one or many messages.
- *
- * @param deliveryTag The tag of the last message to be acknowledged.
- * @param multiple <tt>true</tt> to acknowledge all messages up to and including the one specified by the
- * delivery tag, <tt>false</tt> to just acknowledge that message.
- */
-
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on session " + _channelId);
- }
- // acknowledge this message
- if (multiple)
- {
- for (Long messageTag : _unacknowledgedMessageTags)
- {
- if( messageTag <= deliveryTag )
- {
- addUnacked(messageTag.intValue());
- _unacknowledgedMessageTags.remove(messageTag);
- }
- }
- //empty the list of unack messages
-
- }
- else
- {
- addUnacked((int) deliveryTag);
- _unacknowledgedMessageTags.remove(deliveryTag);
- }
-
- long prefetch = getAMQConnection().getMaxPrefetch();
-
- if (unackedCount >= prefetch/2 || maxAckDelay <= 0)
- {
- flushAcknowledgments();
- }
- }
-
- void flushAcknowledgments()
- {
- flushAcknowledgments(false);
- }
-
- void flushAcknowledgments(boolean setSyncBit)
- {
- synchronized (unacked)
- {
- if (unackedCount > 0)
- {
- messageAcknowledge
- (unacked, _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE,setSyncBit);
- clearUnacked();
- }
- }
- }
-
- void messageAcknowledge(RangeSet ranges, boolean accept)
- {
- messageAcknowledge(ranges,accept,false);
- }
-
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
- {
- Session ssn = getQpidSession();
- for (Range range : ranges)
- {
- ssn.processed(range);
- }
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
- {
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
- }
- }
-
- /**
- * Bind a queue with an exchange.
- *
- * @param queueName Specifies the name of the queue to bind. If the queue name is empty,
- * refers to the current
- * queue for the session, which is the last declared queue.
- * @param exchangeName The exchange name.
- * @param routingKey Specifies the routing key for the binding.
- * @param arguments 0_8 specific
- */
- public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey,
- final FieldTable arguments, final AMQShortString exchangeName,
- final AMQDestination destination, final boolean nowait)
- throws AMQException, FailoverException
- {
- Map args = FiledTableSupport.convertToMap(arguments);
- // this is there only becasue the broker may expect a value for x-match
- if( ! args.containsKey("x-match") )
- {
- args.put("x-match", "any");
- }
-
- for (AMQShortString rk: destination.getBindingKeys())
- {
- _logger.debug("Binding queue : " + queueName.toString() + " exchange: " + exchangeName.toString() + " using binding key " + rk.asString());
- getQpidSession().exchangeBind(queueName.toString(), exchangeName.toString(), rk.toString(), args);
- }
- if (!nowait)
- {
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
- }
-
-
- /**
- * Close this session.
- *
- * @param timeout no used / 0_8 specific
- * @throws AMQException
- * @throws FailoverException
- */
- public void sendClose(long timeout) throws AMQException, FailoverException
- {
- if (flushTask != null)
- {
- flushTask.cancel();
- flushTask = null;
- }
- flushAcknowledgments();
- getQpidSession().sync();
- getQpidSession().close();
- getCurrentException();
- }
-
-
- /**
- * Commit the receipt and the delivery of all messages exchanged by this session resources.
- */
- public void sendCommit() throws AMQException, FailoverException
- {
- getQpidSession().setAutoSync(true);
- try
- {
- getQpidSession().txCommit();
- }
- finally
- {
- getQpidSession().setAutoSync(false);
- }
- // We need to sync so that we get notify of an error.
- getCurrentException();
- }
-
- /**
- * Create a queue with a given name.
- *
- * @param name The queue name
- * @param autoDelete If this field is set and the exclusive field is also set,
- * then the queue is deleted when the connection closes.
- * @param durable If set when creating a new queue,
- * the queue will be marked as durable.
- * @param exclusive Exclusive queues can only be used from one connection at a time.
- * @param arguments Exclusive queues can only be used from one connection at a time.
- * @throws AMQException
- * @throws FailoverException
- */
- public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
- final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
- {
- getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
- autoDelete ? Option.AUTO_DELETE : Option.NONE,
- exclusive ? Option.EXCLUSIVE : Option.NONE);
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
-
- /**
- * This method asks the broker to redeliver all unacknowledged messages
- *
- * @throws AMQException
- * @throws FailoverException
- */
- public void sendRecover() throws AMQException, FailoverException
- {
- // release all unack messages
- RangeSet ranges = new RangeSet();
- while (true)
- {
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
- ranges.add((int) (long) tag);
- }
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
-
- public void releaseForRollback()
- {
- getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
- _txRangeSet.clear();
- _txSize = 0;
- }
-
- /**
- * Release (0_8 notion of Reject) an acquired message
- *
- * @param deliveryTag the message ID
- * @param requeue always true
- */
- public void rejectMessage(long deliveryTag, boolean requeue)
- {
- // The value of requeue is always true
- RangeSet ranges = new RangeSet();
- ranges.add((int) deliveryTag);
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- //I don't think we need to sync
- }
-
- /**
- * Create an 0_10 message consumer
- */
- public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal,
- final boolean exclusive, String messageSelector,
- final FieldTable ft, final boolean noConsume,
- final boolean autoClose) throws JMSException
- {
-
- final AMQProtocolHandler protocolHandler = getProtocolHandler();
- return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
- prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
- }
-
- /**
- * Bind a queue with an exchange.
- */
-
- public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
- throws JMSException
- {
- return isQueueBound(exchangeName,queueName,routingKey,null);
- }
-
- public boolean isQueueBound(final AMQDestination destination) throws JMSException
- {
- return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getRoutingKey(),destination.getBindingKeys());
- }
-
- public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey,AMQShortString[] bindingKeys)
- throws JMSException
- {
- String rk = null;
- boolean res;
- if (bindingKeys != null && bindingKeys.length>0)
- {
- rk = bindingKeys[0].toString();
- }
- else if (routingKey != null)
- {
- rk = routingKey.toString();
- }
-
- ExchangeBoundResult bindingQueryResult =
- getQpidSession().exchangeBound(exchangeName.toString(),queueName.toString(), rk, null).get();
-
- if (rk == null)
- {
- res = !(bindingQueryResult.getExchangeNotFound() || bindingQueryResult.getQueueNotFound());
- }
- else
- {
- res = !(bindingQueryResult.getKeyNotMatched() || bindingQueryResult.getQueueNotFound() || bindingQueryResult
- .getQueueNotMatched());
- }
- return res;
- }
-
- /**
- * This method is invoked when a consumer is creted
- * Registers the consumer with the broker
- */
- public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, int tag)
- throws AMQException, FailoverException
- {
- boolean preAcquire;
- try
- {
- preAcquire = ( ! consumer.isNoConsume() &&
- (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
- || !(consumer.getDestination() instanceof AMQQueue);
- getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
- getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
- preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, null,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- }
- catch (JMSException e)
- {
- throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
- }
-
- String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
-
- if (! prefetch())
- {
- getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
- }
- else
- {
- getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
- }
- getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
- Option.UNRELIABLE);
- // We need to sync so that we get notify of an error.
- // only if not immediat prefetch
- if(prefetch() && (isStarted() || _immediatePrefetch))
- {
- // set the flow
- getQpidSession().messageFlow(consumerTag,
- MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch(),
- Option.UNRELIABLE);
- }
-
- if (!nowait)
- {
- getQpidSession().sync();
- getCurrentException();
- }
- }
-
- /**
- * Create an 0_10 message producer
- */
- public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent,
- long producerId)
- {
- return new BasicMessageProducer_0_10(_connection, (AMQDestination) destination, _transacted, _channelId, this,
- getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
-
- }
-
- /**
- * creates an exchange if it does not already exist
- */
- public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type,
- final AMQProtocolHandler protocolHandler, final boolean nowait)
- throws AMQException, FailoverException
- {
- getQpidSession().exchangeDeclare(name.toString(),
- type.toString(),
- null,
- null,
- name.toString().startsWith("amq.")? Option.PASSIVE:Option.NONE);
- // We need to sync so that we get notify of an error.
- if (!nowait)
- {
- getQpidSession().sync();
- getCurrentException();
- }
- }
-
- /**
- * Declare a queue with the given queueName
- */
- public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait)
- throws AMQException, FailoverException
- {
- // do nothing this is only used by 0_8
- }
-
- /**
- * Declare a queue with the given queueName
- */
- public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
- throws AMQException, FailoverException
- {
- AMQShortString res;
- if (amqd.getAMQQueueName() == null)
- {
- // generate a name for this queue
- res = new AMQShortString("TempQueue" + UUID.randomUUID());
- }
- else
- {
- res = amqd.getAMQQueueName();
- }
- Map<String,Object> arguments = null;
- if (noLocal)
- {
- arguments = new HashMap<String,Object>();
- arguments.put("no-local", true);
- }
- getQpidSession().queueDeclare(res.toString(), null, arguments,
- amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
- amqd.isDurable() ? Option.DURABLE : Option.NONE,
- !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- // passive --> false
- if (!nowait)
- {
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
- return res;
- }
-
- /**
- * deletes a queue
- */
- public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
- {
- getQpidSession().queueDelete(queueName.toString());
- // ifEmpty --> false
- // ifUnused --> false
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
-
- /**
- * Activate/deactivate the message flow for all the consumers of this session.
- */
- public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
- {
- if (suspend)
- {
- for (BasicMessageConsumer consumer : _consumers.values())
- {
- getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
- Option.UNRELIABLE);
- }
- }
- else
- {
- for (BasicMessageConsumer_0_10 consumer : _consumers.values())
- {
- String consumerTag = String.valueOf(consumer.getConsumerTag());
- //only set if msg list is null
- try
- {
- if (! prefetch())
- {
- if (consumer.getMessageListener() != null)
- {
- getQpidSession().messageFlow(consumerTag,
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
- }
- }
- else
- {
- getQpidSession()
- .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
- getAMQConnection().getMaxPrefetch(),
- Option.UNRELIABLE);
- }
- getQpidSession()
- .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF,
- Option.UNRELIABLE);
- }
- catch (Exception e)
- {
- throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error while trying to get the listener", e);
- }
- }
- }
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
-
-
- public void sendRollback() throws AMQException, FailoverException
- {
- getQpidSession().txRollback();
- // We need to sync so that we get notify of an error.
- getQpidSession().sync();
- getCurrentException();
- }
-
- //------ Private methods
- /**
- * Access to the underlying Qpid Session
- *
- * @return The associated Qpid Session.
- */
- protected Session getQpidSession()
- {
- return _qpidSession;
- }
-
-
- /**
- * Get the latest thrown exception.
- *
- * @throws org.apache.qpid.AMQException get the latest thrown error.
- */
- public void getCurrentException() throws AMQException
- {
- synchronized (_currentExceptionLock)
- {
- if (_currentException != null)
- {
- SessionException se = _currentException;
- _currentException = null;
- ExecutionException ee = se.getException();
- int code;
- if (ee == null)
- {
- code = 0;
- }
- else
- {
- code = ee.getErrorCode().getValue();
- }
- throw new AMQException
- (AMQConstant.getConstant(code), se.getMessage(), se);
- }
- }
- }
-
- public void opened(Session ssn) {}
-
- public void resumed(Session ssn)
- {
- _qpidConnection = ssn.getConnection();
- try
- {
- resubscribe();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void message(Session ssn, MessageTransfer xfr)
- {
- messageReceived(new UnprocessedMessage_0_10(xfr));
- }
-
- public void exception(Session ssn, SessionException exc)
- {
- synchronized (_currentExceptionLock)
- {
- _currentException = exc;
- }
- }
-
- public void closed(Session ssn) {}
-
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
- throws AMQException
- {
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
- return new FailoverNoopSupport<AMQShortString, AMQException>(
- new FailoverProtectedOperation<AMQShortString, AMQException>()
- {
- public AMQShortString execute() throws AMQException, FailoverException
- {
- // Generate the queue name if the destination indicates that a client generated name is to be used.
- if (amqd.isNameRequired())
- {
- String binddingKey = "";
- for(AMQShortString key : amqd.getBindingKeys())
- {
- binddingKey = binddingKey + "_" + key.toString();
- }
- amqd.setQueueName(new AMQShortString( binddingKey + "@"
- + amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
- }
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
- }
- }, _connection).execute();
- }
-
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic=checkValidTopic(topic, true);
- AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
-
- TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName=((AMQTopic) topic).getBindingKeys()[0];
- }
- else
- {
- topicName=new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
-
- protected Long requestQueueDepth(AMQDestination amqd)
- {
- return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
- }
-
-
- /**
- * Store non committed messages for this session
- * With 0.10 messages are consumed with window mode, we must send a completion
- * before the window size is reached so credits don't dry up.
- * @param id
- */
- @Override protected void addDeliveredMessage(long id)
- {
- _txRangeSet.add((int) id);
- _txSize++;
- // this is a heuristic, we may want to have that configurable
- if (_connection.getMaxPrefetch() == 1 ||
- _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
- {
- // send completed so consumer credits don't dry up
- messageAcknowledge(_txRangeSet, false);
- }
- }
-
- @Override public void commit() throws JMSException
- {
- checkTransacted();
- try
- {
- if( _txSize > 0 )
- {
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
- }
- sendCommit();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
- }
- }
-
- protected final boolean tagLE(long tag1, long tag2)
- {
- return Serial.le((int) tag1, (int) tag2);
- }
-
- protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
- {
- return Serial.lt((int) currentMark, (int) deliveryTag);
- }
-
- public AMQMessageDelegateFactory getMessageDelegateFactory()
- {
- return AMQMessageDelegateFactory.FACTORY_0_10;
- }
-
-}