summaryrefslogtreecommitdiff
path: root/java/java/client/src/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/java/client/src/org/apache/qpid/client/AMQSession.java1291
1 files changed, 0 insertions, 1291 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/AMQSession.java b/java/java/client/src/org/apache/qpid/client/AMQSession.java
deleted file mode 100644
index a847658846..0000000000
--- a/java/java/client/src/org/apache/qpid/client/AMQSession.java
+++ /dev/null
@@ -1,1291 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.client;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.client.failover.FailoverSupport;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
-{
- private static final Logger _logger = Logger.getLogger(AMQSession.class);
-
- public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
- public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
-
- private AMQConnection _connection;
-
- private boolean _transacted;
-
- private int _acknowledgeMode;
-
- private int _channelId;
-
- private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
- private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
-
- /**
- * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
- * feature.
- */
- private int _nextTag = 1;
-
- /**
- * This queue is bounded and is used to store messages before being dispatched to the consumer
- */
- private final FlowControllingBlockingQueue _queue;
-
- private Dispatcher _dispatcher;
-
- private MessageFactoryRegistry _messageFactoryRegistry;
-
- /**
- * Set of all producers created by this session
- */
- private Map _producers = new ConcurrentHashMap();
-
- /**
- * Maps from consumer tag (String) to JMSMessageConsumer instance
- */
- private Map _consumers = new ConcurrentHashMap();
-
- /**
- * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
- * need to be attached to a queue
- */
- protected static final boolean DEFAULT_IMMEDIATE = false;
-
- /**
- * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently
- * drop messages where no queue is connected to the exchange for the message
- */
- protected static final boolean DEFAULT_MANDATORY = true;
-
- /**
- * The counter of the next producer id. This id is generated by the session and used only to allow the
- * producer to identify itself to the session when deregistering itself.
- * <p/>
- * Access to this id does not require to be synchronized since according to the JMS specification only one
- * thread of control is allowed to create producers for any given session instance.
- */
- private long _nextProducerId;
-
- /**
- * Track the 'stopped' state of the dispatcher, a session starts in the stopped state.
- */
- private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
- /**
- * Responsible for decoding a message fragment and passing it to the appropriate message consumer.
- */
- private class Dispatcher extends Thread
- {
- public Dispatcher()
- {
- super("Dispatcher-Channel-" + _channelId);
- }
-
- public void run()
- {
- UnprocessedMessage message;
- _stopped.set(false);
- try
- {
- while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
- {
- dispatchMessage(message);
- }
- }
- catch (InterruptedException e)
- {
- ;
- }
-
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
- }
-
- private void dispatchMessage(UnprocessedMessage message)
- {
- if (message.deliverBody != null)
- {
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
-
- if (consumer == null)
- {
- _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
- }
- else
- {
- consumer.notifyMessage(message, _channelId);
- }
- }
- else
- {
- try
- {
- // Bounced message is processed here, away from the mina thread
- AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
- false,
- message.contentHeader,
- message.bodies);
-
- int errorCode = message.bounceBody.replyCode;
- String reason = message.bounceBody.replyText;
- _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
- //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
- if (errorCode == AMQConstant.NO_CONSUMERS.getCode())
- {
- _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
- }
- else
- {
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
- }
- }
- catch (Exception e)
- {
- _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e);
- }
- }
- }
-
- public void stopDispatcher()
- {
- _stopped.set(true);
- interrupt();
- }
- }
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry)
- {
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
- }
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
- }
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
- {
- _connection = con;
- _transacted = transacted;
- if (transacted)
- {
- _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED;
- }
- else
- {
- _acknowledgeMode = acknowledgeMode;
- }
- _channelId = channelId;
- _messageFactoryRegistry = messageFactoryRegistry;
- _defaultPrefetchHighMark = defaultPrefetchHighMark;
- _defaultPrefetchLowMark = defaultPrefetchLowMark;
-
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
- new FlowControllingBlockingQueue.ThresholdListener()
- {
- public void aboveThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
- suspendChannel();
- }
- }
-
- public void underThreshold(int currentValue)
- {
- if (_acknowledgeMode == NO_ACKNOWLEDGE)
- {
- _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
- unsuspendChannel();
- }
- }
- });
- }
- else
- {
- _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
- }
- }
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry());
- }
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
- }
-
- AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
- {
- this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
- }
-
- AMQConnection getAMQConnection()
- {
- return _connection;
- }
-
- public BytesMessage createBytesMessage() throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
- }
-
- public MapMessage createMapMessage() throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
- }
-
- public javax.jms.Message createMessage() throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
- }
-
- public ObjectMessage createObjectMessage() throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
- }
-
- public ObjectMessage createObjectMessage(Serializable object) throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream");
- msg.setObject(object);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create message: " + e);
- }
- }
- }
-
- public StreamMessage createStreamMessage() throws JMSException
- {
- checkNotClosed();
- throw new UnsupportedOperationException("Stream messages not supported");
- }
-
- public TextMessage createTextMessage() throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
-
- try
- {
- return (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
- }
- }
-
- public TextMessage createTextMessage(String text) throws JMSException
- {
- synchronized(_connection.getFailoverMutex())
- {
- checkNotClosed();
- try
- {
- TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain");
- msg.setText(text);
- return msg;
- }
- catch (AMQException e)
- {
- throw new JMSException("Unable to create text message: " + e);
- }
- }
- }
-
- public boolean getTransacted() throws JMSException
- {
- checkNotClosed();
- return _transacted;
- }
-
- public int getAcknowledgeMode() throws JMSException
- {
- checkNotClosed();
- return _acknowledgeMode;
- }
-
- public void commit() throws JMSException
- {
- checkTransacted();
- try
- {
- // Acknowledge up to message last delivered (if any) for each consumer.
- //need to send ack for messages delivered to consumers so far
- for (Iterator i = _consumers.values().iterator(); i.hasNext();)
- {
- //Sends acknowledgement to server
- ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
- }
-
- // Commits outstanding messages sent and outstanding acknowledgements.
- _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class);
- }
- catch (AMQException e)
- {
- JMSException exception = new JMSException("Failed to commit: " + e.getMessage());
- exception.setLinkedException(e);
- throw exception;
- }
- }
-
- public void rollback() throws JMSException
- {
- checkTransacted();
- try
- {
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class);
- }
- catch (AMQException e)
- {
- throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
- }
- }
-
- public void close() throws JMSException
- {
- // We must close down all producers and consumers in an orderly fashion. This is the only method
- // that can be called from a different thread of control from the one controlling the session
- synchronized(_connection.getFailoverMutex())
- {
- _closed.set(true);
-
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
-
- try
- {
- _connection.getProtocolHandler().closeSession(this);
- final AMQFrame frame = ChannelCloseBody.createAMQFrame(
- getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
- _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
- // When control resumes at this point, a reply will have been received that
- // indicates the broker has closed the channel successfully
-
- }
- catch (AMQException e)
- {
- throw new JMSException("Error closing session: " + e);
- }
- finally
- {
- _connection.deregisterSession(_channelId);
- }
- }
- }
-
- /**
- * Close all producers or consumers. This is called either in the error case or when closing the session normally.
- *
- * @param amqe the exception, may be null to indicate no error has occurred
- */
- private void closeProducersAndConsumers(AMQException amqe)
- {
- try
- {
- closeProducers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
- try
- {
- closeConsumers(amqe);
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
- }
-
- /**
- * Called when the server initiates the closure of the session
- * unilaterally.
- *
- * @param e the exception that caused this session to be closed. Null causes the
- */
- public void closed(Throwable e)
- {
- synchronized(_connection.getFailoverMutex())
- {
- // An AMQException has an error code and message already and will be passed in when closure occurs as a
- // result of a channel close request
- _closed.set(true);
- AMQException amqe;
- if (e instanceof AMQException)
- {
- amqe = (AMQException) e;
- }
- else
- {
- amqe = new AMQException(_logger, "Closing session forcibly", e);
- }
- _connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
- }
- }
-
- /**
- * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
- * failover when the client has veoted resubscription.
- * <p/>
- * The caller of this method must already hold the failover mutex.
- */
- void markClosed()
- {
- _closed.set(true);
- _connection.deregisterSession(_channelId);
- markClosedProducersAndConsumers();
- }
-
- private void markClosedProducersAndConsumers()
- {
- try
- {
- // no need for a markClosed* method in this case since there is no protocol traffic closing a producer
- closeProducers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
- try
- {
- markClosedConsumers();
- }
- catch (JMSException e)
- {
- _logger.error("Error closing session: " + e, e);
- }
- }
-
- /**
- * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is
- * currently no way of propagating errors to message producers (this is a JMS limitation).
- */
- private void closeProducers() throws JMSException
- {
- // we need to clone the list of producers since the close() method updates the _producers collection
- // which would result in a concurrent modification exception
- final ArrayList clonedProducers = new ArrayList(_producers.values());
-
- final Iterator it = clonedProducers.iterator();
- while (it.hasNext())
- {
- final BasicMessageProducer prod = (BasicMessageProducer) it.next();
- prod.close();
- }
- // at this point the _producers map is empty
- }
-
- /**
- * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
- *
- * @param error not null if this is a result of an error occurring at the connection level
- */
- private void closeConsumers(Throwable error) throws JMSException
- {
- if (_dispatcher != null)
- {
- _dispatcher.stopDispatcher();
- }
- // we need to clone the list of consumers since the close() method updates the _consumers collection
- // which would result in a concurrent modification exception
- final ArrayList clonedConsumers = new ArrayList(_consumers.values());
-
- final Iterator it = clonedConsumers.iterator();
- while (it.hasNext())
- {
- final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
- if (error != null)
- {
- con.notifyError(error);
- }
- else
- {
- con.close();
- }
- }
- // at this point the _consumers map will be empty
- }
-
- private void markClosedConsumers() throws JMSException
- {
- if (_dispatcher != null)
- {
- _dispatcher.stopDispatcher();
- }
- // we need to clone the list of consumers since the close() method updates the _consumers collection
- // which would result in a concurrent modification exception
- final ArrayList clonedConsumers = new ArrayList(_consumers.values());
-
- final Iterator it = clonedConsumers.iterator();
- while (it.hasNext())
- {
- final BasicMessageConsumer con = (BasicMessageConsumer) it.next();
- con.markClosed();
- }
- // at this point the _consumers map will be empty
- }
-
- /**
- * Asks the broker to resend all unacknowledged messages for the session.
- *
- * @throws JMSException
- */
- public void recover() throws JMSException
- {
- checkNotClosed();
- checkNotTransacted(); // throws IllegalStateException if a transacted session
-
- _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
- }
-
- public MessageListener getMessageListener() throws JMSException
- {
- checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
- }
-
- public void setMessageListener(MessageListener listener) throws JMSException
- {
- checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
- }
-
- public void run()
- {
- throw new java.lang.UnsupportedOperationException();
- }
-
- public MessageProducer createProducer(Destination destination, boolean mandatory,
- boolean immediate, boolean waitUntilSent)
- throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
- }
-
- public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
- throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate);
- }
-
- public MessageProducer createProducer(Destination destination, boolean immediate)
- throws JMSException
- {
- return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
- }
-
- public MessageProducer createProducer(Destination destination) throws JMSException
- {
- return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
- }
-
- private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory,
- boolean immediate)
- throws JMSException
- {
- return createProducerImpl(destination, mandatory, immediate, false);
- }
-
- private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
- final boolean immediate, final boolean waitUntilSent)
- throws JMSException
- {
- return (org.apache.qpid.jms.MessageProducer) new FailoverSupport()
- {
- public Object operation() throws JMSException
- {
- checkNotClosed();
-
- return new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
- AMQSession.this, _connection.getProtocolHandler(),
- getNextProducerId(), immediate, mandatory, waitUntilSent);
- }
- }.execute(_connection);
- }
-
- /**
- * Creates a QueueReceiver
- * @param destination
- * @return QueueReceiver - a wrapper around our MessageConsumer
- * @throws JMSException
- */
- public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
- {
- AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
- return new QueueReceiverAdaptor(dest, consumer);
- }
-
- /**
- * Creates a QueueReceiver using a message selector
- * @param destination
- * @param messageSelector
- * @return QueueReceiver - a wrapper around our MessageConsumer
- * @throws JMSException
- */
- public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
- {
- AMQQueue dest = (AMQQueue) destination;
- BasicMessageConsumer consumer = (BasicMessageConsumer)
- createConsumer(destination, messageSelector);
- return new QueueReceiverAdaptor(dest, consumer);
- }
-
- public MessageConsumer createConsumer(Destination destination) throws JMSException
- {
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
- }
-
- public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
- {
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
- }
-
- public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
- throws JMSException
- {
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
- }
-
- public MessageConsumer createConsumer(Destination destination,
- int prefetch,
- boolean noLocal,
- boolean exclusive,
- String selector) throws JMSException
- {
- return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
- }
-
-
- public MessageConsumer createConsumer(Destination destination,
- int prefetchHigh,
- int prefetchLow,
- boolean noLocal,
- boolean exclusive,
- String selector) throws JMSException
- {
- return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
- }
-
- public MessageConsumer createConsumer(Destination destination,
- int prefetch,
- boolean noLocal,
- boolean exclusive,
- String selector,
- FieldTable rawSelector) throws JMSException
- {
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector);
- }
-
- public MessageConsumer createConsumer(Destination destination,
- int prefetchHigh,
- int prefetchLow,
- boolean noLocal,
- boolean exclusive,
- String selector,
- FieldTable rawSelector) throws JMSException
- {
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector);
- }
-
- protected MessageConsumer createConsumerImpl(final Destination destination,
- final int prefetchHigh,
- final int prefetchLow,
- final boolean noLocal,
- final boolean exclusive,
- final String selector,
- final FieldTable rawSelector) throws JMSException
- {
- return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
- {
- public Object operation() throws JMSException
- {
- checkNotClosed();
-
- AMQDestination amqd = (AMQDestination) destination;
-
- final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
- // TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = new FieldTable();
- //if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
- if (rawSelector != null)
- {
- ft.putAll(rawSelector);
- }
- BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
- _messageFactoryRegistry, AMQSession.this,
- protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode);
-
- try
- {
- registerConsumer(consumer);
- }
- catch (AMQException e)
- {
- JMSException ex = new JMSException("Error registering consumer: " + e);
- ex.setLinkedException(e);
- throw ex;
- }
-
- return consumer;
- }
- }.execute(_connection);
- }
-
- public void declareExchange(String name, String type)
- {
- declareExchange(name, type, _connection.getProtocolHandler());
- }
-
- public void declareExchangeSynch(String name, String type) throws AMQException
- {
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null);
- _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
- }
-
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler)
- {
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
- }
-
- private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler)
- {
- AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null);
- protocolHandler.writeFrame(exchangeDeclare);
- }
-
- /**
- * Declare the queue.
- *
- * @param amqd
- * @param protocolHandler
- * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
- * @throws AMQException
- */
- private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
- {
- // For queues (but not topics) we generate the name in the client rather than the
- // server. This allows the name to be reused on failover if required. In general,
- // the destination indicates whether it wants a name generated or not.
- if (amqd.isNameRequired())
- {
- amqd.setQueueName(protocolHandler.generateQueueName());
- }
-
- AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(),
- false, amqd.isDurable(), amqd.isExclusive(),
- amqd.isAutoDelete(), true, null);
-
- protocolHandler.writeFrame(queueDeclare);
- return amqd.getQueueName();
- }
-
- private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException
- {
- AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0,
- queueName, amqd.getExchangeName(),
- amqd.getRoutingKey(), true, ft);
-
- protocolHandler.writeFrame(queueBind);
- }
-
- /**
- * Register to consume from the queue.
- *
- * @param queueName
- * @return the consumer tag generated by the broker
- */
- private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
- boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException
- {
- //fixme prefetch values are not used here. Do we need to have them as parametsrs?
- //need to generate a consumer tag on the client so we can exploit the nowait flag
- String tag = Integer.toString(_nextTag++);
-
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag, noLocal,
- acknowledgeMode == Session.NO_ACKNOWLEDGE,
- exclusive, true);
-
- protocolHandler.writeFrame(jmsConsume);
- return tag;
- }
-
- public Queue createQueue(String queueName) throws JMSException
- {
- if (queueName.indexOf('/') == -1)
- {
- return new AMQQueue(queueName);
- }
- else
- {
- try
- {
- return new AMQQueue(new AMQBindingURL(queueName));
- }
- catch (URLSyntaxException urlse)
- {
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
-
- throw jmse;
- }
- }
- }
-
- /**
- * Creates a QueueReceiver wrapping a MessageConsumer
- * @param queue
- * @return QueueReceiver
- * @throws JMSException
- */
- public QueueReceiver createReceiver(Queue queue) throws JMSException
- {
- AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
- return new QueueReceiverAdaptor(dest, consumer);
- }
-
- /**
- * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
- * @param queue
- * @param messageSelector
- * @return QueueReceiver
- * @throws JMSException
- */
- public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
- {
- AMQQueue dest = (AMQQueue) queue;
- BasicMessageConsumer consumer = (BasicMessageConsumer)
- createConsumer(dest, messageSelector);
- return new QueueReceiverAdaptor(dest, consumer);
- }
-
- public QueueSender createSender(Queue queue) throws JMSException
- {
- return (QueueSender) createProducer(queue);
- }
-
- public Topic createTopic(String topicName) throws JMSException
- {
- if (topicName.indexOf('/') == -1)
- {
- return new AMQTopic(topicName);
- }
- else
- {
- try
- {
- return new AMQTopic(new AMQBindingURL(topicName));
- }
- catch (URLSyntaxException urlse)
- {
- JMSException jmse = new JMSException(urlse.getReason());
- jmse.setLinkedException(urlse);
-
- throw jmse;
- }
- }
- }
-
- /**
- * Creates a non-durable subscriber
- * @param topic
- * @return TopicSubscriber - a wrapper round our MessageConsumer
- * @throws JMSException
- */
- public TopicSubscriber createSubscriber(Topic topic) throws JMSException
- {
- AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- }
-
- /**
- * Creates a non-durable subscriber with a message selector
- * @param topic
- * @param messageSelector
- * @param noLocal
- * @return TopicSubscriber - a wrapper round our MessageConsumer
- * @throws JMSException
- */
- public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
- {
- AMQTopic dest = new AMQTopic(topic.getTopicName());
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
- }
-
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- * If a name is reused in creating a new subscriber with a different topic/selecto or no-local
- * flag then the subcriber will receive messages matching the old subscription AND the new one.
- * The spec states that the new one should replace the old one.
- * TODO: fix it.
- */
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
- {
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
- return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- }
-
- /**
- * Note, currently this does not handle reuse of the same name with different topics correctly.
- */
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
- throws JMSException
- {
- AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
- BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
- return new TopicSubscriberAdaptor(dest, consumer);
- }
-
- public TopicPublisher createPublisher(Topic topic) throws JMSException
- {
- return (TopicPublisher) createProducer(topic);
- }
-
- public QueueBrowser createBrowser(Queue queue) throws JMSException
- {
- throw new UnsupportedOperationException("Queue browsing not supported");
- }
-
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
- {
- throw new UnsupportedOperationException("Queue browsing not supported");
- }
-
- public TemporaryQueue createTemporaryQueue() throws JMSException
- {
- return new AMQTemporaryQueue();
- }
-
- public TemporaryTopic createTemporaryTopic() throws JMSException
- {
- return new AMQTemporaryTopic();
- }
-
- public void unsubscribe(String name) throws JMSException
- {
- //send a queue.delete for the subscription
- String queue = _connection.getClientID() + ":" + name;
- AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true);
- _connection.getProtocolHandler().writeFrame(frame);
- }
-
- private void checkTransacted() throws JMSException
- {
- if (!getTransacted())
- {
- throw new IllegalStateException("Session is not transacted");
- }
- }
-
- private void checkNotTransacted() throws JMSException
- {
- if (getTransacted())
- {
- throw new IllegalStateException("Session is transacted");
- }
- }
-
- /**
- * Invoked by the MINA IO thread (indirectly) when a message is received from the transport.
- * Puts the message onto the queue read by the dispatcher.
- *
- * @param message the message that has been received
- */
- public void messageReceived(UnprocessedMessage message)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message received in session with channel id " + _channelId);
- }
-
- _queue.add(message);
- }
-
- /**
- * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
- * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
- * AUTO_ACK or similar.
- *
- * @param deliveryTag the tag of the last message to be acknowledged
- * @param multiple if true will acknowledge all messages up to and including the one specified by the
- * delivery tag
- */
- public void acknowledgeMessage(long deliveryTag, boolean multiple)
- {
- final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
- }
- _connection.getProtocolHandler().writeFrame(ackFrame);
- }
-
- public int getDefaultPrefetch()
- {
- return _defaultPrefetchHighMark;
- }
-
- public int getDefaultPrefetchHigh()
- {
- return _defaultPrefetchHighMark;
- }
-
- public int getDefaultPrefetchLow()
- {
- return _defaultPrefetchLowMark;
- }
-
- public int getChannelId()
- {
- return _channelId;
- }
-
- void start()
- {
- if (_dispatcher != null)
- {
- //then we stopped this and are restarting, so signal server to resume delivery
- unsuspendChannel();
- }
- _dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
- _dispatcher.start();
- }
-
- void stop()
- {
- //stop the server delivering messages to this session
- suspendChannel();
-
-//stop the dispatcher thread
- _stopped.set(true);
- }
-
- boolean isStopped()
- {
- return _stopped.get();
- }
-
- /**
- * Callers must hold the failover mutex before calling this method.
- *
- * @param consumer
- * @throws AMQException
- */
- void registerConsumer(BasicMessageConsumer consumer) throws AMQException
- {
- AMQDestination amqd = consumer.getDestination();
-
- AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
-
- declareExchange(amqd, protocolHandler);
-
- String queueName = declareQueue(amqd, protocolHandler);
-
- bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
-
- String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(),
- consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode());
-
- consumer.setConsumerTag(consumerTag);
- _consumers.put(consumerTag, consumer);
- }
-
- /**
- * Called by the MessageConsumer when closing, to deregister the consumer from the
- * map from consumerTag to consumer instance.
- *
- * @param consumerTag the consumer tag, that was broker-generated
- */
- void deregisterConsumer(String consumerTag)
- {
- _consumers.remove(consumerTag);
- }
-
- private void registerProducer(long producerId, MessageProducer producer)
- {
- _producers.put(new Long(producerId), producer);
- }
-
- void deregisterProducer(long producerId)
- {
- _producers.remove(new Long(producerId));
- }
-
- private long getNextProducerId()
- {
- return ++_nextProducerId;
- }
-
- /**
- * Resubscribes all producers and consumers. This is called when performing failover.
- *
- * @throws AMQException
- */
- void resubscribe() throws AMQException
- {
- resubscribeProducers();
- resubscribeConsumers();
- }
-
- private void resubscribeProducers() throws AMQException
- {
- ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove
- for (Iterator it = producers.iterator(); it.hasNext();)
- {
- BasicMessageProducer producer = (BasicMessageProducer) it.next();
- producer.resubscribe();
- }
- }
-
- private void resubscribeConsumers() throws AMQException
- {
- ArrayList consumers = new ArrayList(_consumers.values());
- _consumers.clear();
-
- for (Iterator it = consumers.iterator(); it.hasNext();)
- {
- BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
- registerConsumer(consumer);
- }
- }
-
- private void suspendChannel()
- {
- _logger.warn("Suspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false);
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
- }
-
- private void unsuspendChannel()
- {
- _logger.warn("Unsuspending channel");
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true);
- _connection.getProtocolHandler().writeFrame(channelFlowFrame);
- }
-}