diff options
Diffstat (limited to 'java/java/client/src/org/apache/qpid/client/AMQSession.java')
-rw-r--r-- | java/java/client/src/org/apache/qpid/client/AMQSession.java | 1291 |
1 files changed, 0 insertions, 1291 deletions
diff --git a/java/java/client/src/org/apache/qpid/client/AMQSession.java b/java/java/client/src/org/apache/qpid/client/AMQSession.java deleted file mode 100644 index a847658846..0000000000 --- a/java/java/client/src/org/apache/qpid/client/AMQSession.java +++ /dev/null @@ -1,1291 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUndeliveredException; -import org.apache.qpid.client.failover.FailoverSupport; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageFactoryRegistry; -import org.apache.qpid.client.message.UnprocessedMessage; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.client.util.FlowControllingBlockingQueue; -import org.apache.qpid.framing.*; -import org.apache.qpid.jms.Session; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.url.AMQBindingURL; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.*; -import javax.jms.IllegalStateException; -import java.io.Serializable; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -public class AMQSession extends Closeable implements Session, QueueSession, TopicSession -{ - private static final Logger _logger = Logger.getLogger(AMQSession.class); - - public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000; - public static final int DEFAULT_PREFETCH_LOW_MARK = 2500; - - private AMQConnection _connection; - - private boolean _transacted; - - private int _acknowledgeMode; - - private int _channelId; - - private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; - private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; - - /** - * Used in the consume method. We generate the consume tag on the client so that we can use the nowait - * feature. - */ - private int _nextTag = 1; - - /** - * This queue is bounded and is used to store messages before being dispatched to the consumer - */ - private final FlowControllingBlockingQueue _queue; - - private Dispatcher _dispatcher; - - private MessageFactoryRegistry _messageFactoryRegistry; - - /** - * Set of all producers created by this session - */ - private Map _producers = new ConcurrentHashMap(); - - /** - * Maps from consumer tag (String) to JMSMessageConsumer instance - */ - private Map _consumers = new ConcurrentHashMap(); - - /** - * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not - * need to be attached to a queue - */ - protected static final boolean DEFAULT_IMMEDIATE = false; - - /** - * Default value for mandatory flag used by producers created by this sessio is true, i.e. server will not silently - * drop messages where no queue is connected to the exchange for the message - */ - protected static final boolean DEFAULT_MANDATORY = true; - - /** - * The counter of the next producer id. This id is generated by the session and used only to allow the - * producer to identify itself to the session when deregistering itself. - * <p/> - * Access to this id does not require to be synchronized since according to the JMS specification only one - * thread of control is allowed to create producers for any given session instance. - */ - private long _nextProducerId; - - /** - * Track the 'stopped' state of the dispatcher, a session starts in the stopped state. - */ - private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - - /** - * Responsible for decoding a message fragment and passing it to the appropriate message consumer. - */ - private class Dispatcher extends Thread - { - public Dispatcher() - { - super("Dispatcher-Channel-" + _channelId); - } - - public void run() - { - UnprocessedMessage message; - _stopped.set(false); - try - { - while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) - { - dispatchMessage(message); - } - } - catch (InterruptedException e) - { - ; - } - - _logger.info("Dispatcher thread terminating for channel " + _channelId); - } - - private void dispatchMessage(UnprocessedMessage message) - { - if (message.deliverBody != null) - { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag); - - if (consumer == null) - { - _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring..."); - } - else - { - consumer.notifyMessage(message, _channelId); - } - } - else - { - try - { - // Bounced message is processed here, away from the mina thread - AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, - false, - message.contentHeader, - message.bodies); - - int errorCode = message.bounceBody.replyCode; - String reason = message.bounceBody.replyText; - _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); - - //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. - if (errorCode == AMQConstant.NO_CONSUMERS.getCode()) - { - _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage)); - } - else - { - if (errorCode == AMQConstant.NO_ROUTE.getCode()) - { - _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage)); - } - else - { - _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); - } - } - } - catch (Exception e) - { - _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); - } - } - } - - public void stopDispatcher() - { - _stopped.set(true); - interrupt(); - } - } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry) - { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK); - } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch) - { - this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch); - } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, - MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) - { - _connection = con; - _transacted = transacted; - if (transacted) - { - _acknowledgeMode = javax.jms.Session.SESSION_TRANSACTED; - } - else - { - _acknowledgeMode = acknowledgeMode; - } - _channelId = channelId; - _messageFactoryRegistry = messageFactoryRegistry; - _defaultPrefetchHighMark = defaultPrefetchHighMark; - _defaultPrefetchLowMark = defaultPrefetchLowMark; - - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark, - new FlowControllingBlockingQueue.ThresholdListener() - { - public void aboveThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue); - suspendChannel(); - } - } - - public void underThreshold(int currentValue) - { - if (_acknowledgeMode == NO_ACKNOWLEDGE) - { - _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue); - unsuspendChannel(); - } - } - }); - } - else - { - _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null); - } - } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry()); - } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetch) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch); - } - - AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) - { - this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow); - } - - AMQConnection getAMQConnection() - { - return _connection; - } - - public BytesMessage createBytesMessage() throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } - } - - public MapMessage createMapMessage() throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - return (MapMessage) _messageFactoryRegistry.createMessage("jms/map-message"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } - } - - public javax.jms.Message createMessage() throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - return (BytesMessage) _messageFactoryRegistry.createMessage("application/octet-stream"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } - } - - public ObjectMessage createObjectMessage() throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - return (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } - } - - public ObjectMessage createObjectMessage(Serializable object) throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - ObjectMessage msg = (ObjectMessage) _messageFactoryRegistry.createMessage("application/java-object-stream"); - msg.setObject(object); - return msg; - } - catch (AMQException e) - { - throw new JMSException("Unable to create message: " + e); - } - } - } - - public StreamMessage createStreamMessage() throws JMSException - { - checkNotClosed(); - throw new UnsupportedOperationException("Stream messages not supported"); - } - - public TextMessage createTextMessage() throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - - try - { - return (TextMessage) _messageFactoryRegistry.createMessage("text/plain"); - } - catch (AMQException e) - { - throw new JMSException("Unable to create text message: " + e); - } - } - } - - public TextMessage createTextMessage(String text) throws JMSException - { - synchronized(_connection.getFailoverMutex()) - { - checkNotClosed(); - try - { - TextMessage msg = (TextMessage) _messageFactoryRegistry.createMessage("text/plain"); - msg.setText(text); - return msg; - } - catch (AMQException e) - { - throw new JMSException("Unable to create text message: " + e); - } - } - } - - public boolean getTransacted() throws JMSException - { - checkNotClosed(); - return _transacted; - } - - public int getAcknowledgeMode() throws JMSException - { - checkNotClosed(); - return _acknowledgeMode; - } - - public void commit() throws JMSException - { - checkTransacted(); - try - { - // Acknowledge up to message last delivered (if any) for each consumer. - //need to send ack for messages delivered to consumers so far - for (Iterator i = _consumers.values().iterator(); i.hasNext();) - { - //Sends acknowledgement to server - ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered(); - } - - // Commits outstanding messages sent and outstanding acknowledgements. - _connection.getProtocolHandler().syncWrite(TxCommitBody.createAMQFrame(_channelId), TxCommitOkBody.class); - } - catch (AMQException e) - { - JMSException exception = new JMSException("Failed to commit: " + e.getMessage()); - exception.setLinkedException(e); - throw exception; - } - } - - public void rollback() throws JMSException - { - checkTransacted(); - try - { - _connection.getProtocolHandler().syncWrite( - TxRollbackBody.createAMQFrame(_channelId), TxRollbackOkBody.class); - } - catch (AMQException e) - { - throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e)); - } - } - - public void close() throws JMSException - { - // We must close down all producers and consumers in an orderly fashion. This is the only method - // that can be called from a different thread of control from the one controlling the session - synchronized(_connection.getFailoverMutex()) - { - _closed.set(true); - - // we pass null since this is not an error case - closeProducersAndConsumers(null); - - try - { - _connection.getProtocolHandler().closeSession(this); - final AMQFrame frame = ChannelCloseBody.createAMQFrame( - getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0); - _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class); - // When control resumes at this point, a reply will have been received that - // indicates the broker has closed the channel successfully - - } - catch (AMQException e) - { - throw new JMSException("Error closing session: " + e); - } - finally - { - _connection.deregisterSession(_channelId); - } - } - } - - /** - * Close all producers or consumers. This is called either in the error case or when closing the session normally. - * - * @param amqe the exception, may be null to indicate no error has occurred - */ - private void closeProducersAndConsumers(AMQException amqe) - { - try - { - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - try - { - closeConsumers(amqe); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - } - - /** - * Called when the server initiates the closure of the session - * unilaterally. - * - * @param e the exception that caused this session to be closed. Null causes the - */ - public void closed(Throwable e) - { - synchronized(_connection.getFailoverMutex()) - { - // An AMQException has an error code and message already and will be passed in when closure occurs as a - // result of a channel close request - _closed.set(true); - AMQException amqe; - if (e instanceof AMQException) - { - amqe = (AMQException) e; - } - else - { - amqe = new AMQException(_logger, "Closing session forcibly", e); - } - _connection.deregisterSession(_channelId); - closeProducersAndConsumers(amqe); - } - } - - /** - * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after - * failover when the client has veoted resubscription. - * <p/> - * The caller of this method must already hold the failover mutex. - */ - void markClosed() - { - _closed.set(true); - _connection.deregisterSession(_channelId); - markClosedProducersAndConsumers(); - } - - private void markClosedProducersAndConsumers() - { - try - { - // no need for a markClosed* method in this case since there is no protocol traffic closing a producer - closeProducers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - try - { - markClosedConsumers(); - } - catch (JMSException e) - { - _logger.error("Error closing session: " + e, e); - } - } - - /** - * Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is - * currently no way of propagating errors to message producers (this is a JMS limitation). - */ - private void closeProducers() throws JMSException - { - // we need to clone the list of producers since the close() method updates the _producers collection - // which would result in a concurrent modification exception - final ArrayList clonedProducers = new ArrayList(_producers.values()); - - final Iterator it = clonedProducers.iterator(); - while (it.hasNext()) - { - final BasicMessageProducer prod = (BasicMessageProducer) it.next(); - prod.close(); - } - // at this point the _producers map is empty - } - - /** - * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. - * - * @param error not null if this is a result of an error occurring at the connection level - */ - private void closeConsumers(Throwable error) throws JMSException - { - if (_dispatcher != null) - { - _dispatcher.stopDispatcher(); - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); - - final Iterator it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final BasicMessageConsumer con = (BasicMessageConsumer) it.next(); - if (error != null) - { - con.notifyError(error); - } - else - { - con.close(); - } - } - // at this point the _consumers map will be empty - } - - private void markClosedConsumers() throws JMSException - { - if (_dispatcher != null) - { - _dispatcher.stopDispatcher(); - } - // we need to clone the list of consumers since the close() method updates the _consumers collection - // which would result in a concurrent modification exception - final ArrayList clonedConsumers = new ArrayList(_consumers.values()); - - final Iterator it = clonedConsumers.iterator(); - while (it.hasNext()) - { - final BasicMessageConsumer con = (BasicMessageConsumer) it.next(); - con.markClosed(); - } - // at this point the _consumers map will be empty - } - - /** - * Asks the broker to resend all unacknowledged messages for the session. - * - * @throws JMSException - */ - public void recover() throws JMSException - { - checkNotClosed(); - checkNotTransacted(); // throws IllegalStateException if a transacted session - - _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); - } - - public MessageListener getMessageListener() throws JMSException - { - checkNotClosed(); - throw new java.lang.UnsupportedOperationException("MessageListener interface not supported"); - } - - public void setMessageListener(MessageListener listener) throws JMSException - { - checkNotClosed(); - throw new java.lang.UnsupportedOperationException("MessageListener interface not supported"); - } - - public void run() - { - throw new java.lang.UnsupportedOperationException(); - } - - public MessageProducer createProducer(Destination destination, boolean mandatory, - boolean immediate, boolean waitUntilSent) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, waitUntilSent); - } - - public MessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate); - } - - public MessageProducer createProducer(Destination destination, boolean immediate) - throws JMSException - { - return createProducerImpl(destination, DEFAULT_MANDATORY, immediate); - } - - public MessageProducer createProducer(Destination destination) throws JMSException - { - return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE); - } - - private org.apache.qpid.jms.MessageProducer createProducerImpl(Destination destination, boolean mandatory, - boolean immediate) - throws JMSException - { - return createProducerImpl(destination, mandatory, immediate, false); - } - - private org.apache.qpid.jms.MessageProducer createProducerImpl(final Destination destination, final boolean mandatory, - final boolean immediate, final boolean waitUntilSent) - throws JMSException - { - return (org.apache.qpid.jms.MessageProducer) new FailoverSupport() - { - public Object operation() throws JMSException - { - checkNotClosed(); - - return new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId, - AMQSession.this, _connection.getProtocolHandler(), - getNextProducerId(), immediate, mandatory, waitUntilSent); - } - }.execute(_connection); - } - - /** - * Creates a QueueReceiver - * @param destination - * @return QueueReceiver - a wrapper around our MessageConsumer - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination) throws JMSException - { - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination); - return new QueueReceiverAdaptor(dest, consumer); - } - - /** - * Creates a QueueReceiver using a message selector - * @param destination - * @param messageSelector - * @return QueueReceiver - a wrapper around our MessageConsumer - * @throws JMSException - */ - public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException - { - AMQQueue dest = (AMQQueue) destination; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(destination, messageSelector); - return new QueueReceiverAdaptor(dest, consumer); - } - - public MessageConsumer createConsumer(Destination destination) throws JMSException - { - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); - } - - public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException - { - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); - } - - public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) - throws JMSException - { - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); - } - - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException - { - return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); - } - - - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector) throws JMSException - { - return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); - } - - public MessageConsumer createConsumer(Destination destination, - int prefetch, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException - { - return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, - selector, rawSelector); - } - - public MessageConsumer createConsumer(Destination destination, - int prefetchHigh, - int prefetchLow, - boolean noLocal, - boolean exclusive, - String selector, - FieldTable rawSelector) throws JMSException - { - return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, - selector, rawSelector); - } - - protected MessageConsumer createConsumerImpl(final Destination destination, - final int prefetchHigh, - final int prefetchLow, - final boolean noLocal, - final boolean exclusive, - final String selector, - final FieldTable rawSelector) throws JMSException - { - return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() - { - public Object operation() throws JMSException - { - checkNotClosed(); - - AMQDestination amqd = (AMQDestination) destination; - - final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); - // TODO: construct the rawSelector from the selector string if rawSelector == null - final FieldTable ft = new FieldTable(); - //if (rawSelector != null) - // ft.put("headers", rawSelector.getDataAsBytes()); - if (rawSelector != null) - { - ft.putAll(rawSelector); - } - BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal, - _messageFactoryRegistry, AMQSession.this, - protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, - _acknowledgeMode); - - try - { - registerConsumer(consumer); - } - catch (AMQException e) - { - JMSException ex = new JMSException("Error registering consumer: " + e); - ex.setLinkedException(e); - throw ex; - } - - return consumer; - } - }.execute(_connection); - } - - public void declareExchange(String name, String type) - { - declareExchange(name, type, _connection.getProtocolHandler()); - } - - public void declareExchangeSynch(String name, String type) throws AMQException - { - AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, false, null); - _connection.getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class); - } - - private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) - { - declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler); - } - - private void declareExchange(String name, String type, AMQProtocolHandler protocolHandler) - { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, 0, name, type, false, false, false, false, true, null); - protocolHandler.writeFrame(exchangeDeclare); - } - - /** - * Declare the queue. - * - * @param amqd - * @param protocolHandler - * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client. - * @throws AMQException - */ - private String declareQueue(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException - { - // For queues (but not topics) we generate the name in the client rather than the - // server. This allows the name to be reused on failover if required. In general, - // the destination indicates whether it wants a name generated or not. - if (amqd.isNameRequired()) - { - amqd.setQueueName(protocolHandler.generateQueueName()); - } - - AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, 0, amqd.getQueueName(), - false, amqd.isDurable(), amqd.isExclusive(), - amqd.isAutoDelete(), true, null); - - protocolHandler.writeFrame(queueDeclare); - return amqd.getQueueName(); - } - - private void bindQueue(AMQDestination amqd, String queueName, AMQProtocolHandler protocolHandler, FieldTable ft) throws AMQException - { - AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, 0, - queueName, amqd.getExchangeName(), - amqd.getRoutingKey(), true, ft); - - protocolHandler.writeFrame(queueBind); - } - - /** - * Register to consume from the queue. - * - * @param queueName - * @return the consumer tag generated by the broker - */ - private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow, - boolean noLocal, boolean exclusive, int acknowledgeMode) throws AMQException - { - //fixme prefetch values are not used here. Do we need to have them as parametsrs? - //need to generate a consumer tag on the client so we can exploit the nowait flag - String tag = Integer.toString(_nextTag++); - - AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, - queueName, tag, noLocal, - acknowledgeMode == Session.NO_ACKNOWLEDGE, - exclusive, true); - - protocolHandler.writeFrame(jmsConsume); - return tag; - } - - public Queue createQueue(String queueName) throws JMSException - { - if (queueName.indexOf('/') == -1) - { - return new AMQQueue(queueName); - } - else - { - try - { - return new AMQQueue(new AMQBindingURL(queueName)); - } - catch (URLSyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - - throw jmse; - } - } - } - - /** - * Creates a QueueReceiver wrapping a MessageConsumer - * @param queue - * @return QueueReceiver - * @throws JMSException - */ - public QueueReceiver createReceiver(Queue queue) throws JMSException - { - AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest); - return new QueueReceiverAdaptor(dest, consumer); - } - - /** - * Creates a QueueReceiver wrapping a MessageConsumer using a message selector - * @param queue - * @param messageSelector - * @return QueueReceiver - * @throws JMSException - */ - public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException - { - AMQQueue dest = (AMQQueue) queue; - BasicMessageConsumer consumer = (BasicMessageConsumer) - createConsumer(dest, messageSelector); - return new QueueReceiverAdaptor(dest, consumer); - } - - public QueueSender createSender(Queue queue) throws JMSException - { - return (QueueSender) createProducer(queue); - } - - public Topic createTopic(String topicName) throws JMSException - { - if (topicName.indexOf('/') == -1) - { - return new AMQTopic(topicName); - } - else - { - try - { - return new AMQTopic(new AMQBindingURL(topicName)); - } - catch (URLSyntaxException urlse) - { - JMSException jmse = new JMSException(urlse.getReason()); - jmse.setLinkedException(urlse); - - throw jmse; - } - } - } - - /** - * Creates a non-durable subscriber - * @param topic - * @return TopicSubscriber - a wrapper round our MessageConsumer - * @throws JMSException - */ - public TopicSubscriber createSubscriber(Topic topic) throws JMSException - { - AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - } - - /** - * Creates a non-durable subscriber with a message selector - * @param topic - * @param messageSelector - * @param noLocal - * @return TopicSubscriber - a wrapper round our MessageConsumer - * @throws JMSException - */ - public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException - { - AMQTopic dest = new AMQTopic(topic.getTopicName()); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal)); - } - - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - * If a name is reused in creating a new subscriber with a different topic/selecto or no-local - * flag then the subcriber will receive messages matching the old subscription AND the new one. - * The spec states that the new one should replace the old one. - * TODO: fix it. - */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException - { - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); - return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest)); - } - - /** - * Note, currently this does not handle reuse of the same name with different topics correctly. - */ - public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) - throws JMSException - { - AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name); - BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal); - return new TopicSubscriberAdaptor(dest, consumer); - } - - public TopicPublisher createPublisher(Topic topic) throws JMSException - { - return (TopicPublisher) createProducer(topic); - } - - public QueueBrowser createBrowser(Queue queue) throws JMSException - { - throw new UnsupportedOperationException("Queue browsing not supported"); - } - - public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException - { - throw new UnsupportedOperationException("Queue browsing not supported"); - } - - public TemporaryQueue createTemporaryQueue() throws JMSException - { - return new AMQTemporaryQueue(); - } - - public TemporaryTopic createTemporaryTopic() throws JMSException - { - return new AMQTemporaryTopic(); - } - - public void unsubscribe(String name) throws JMSException - { - //send a queue.delete for the subscription - String queue = _connection.getClientID() + ":" + name; - AMQFrame frame = QueueDeleteBody.createAMQFrame(_channelId, 0, queue, false, false, true); - _connection.getProtocolHandler().writeFrame(frame); - } - - private void checkTransacted() throws JMSException - { - if (!getTransacted()) - { - throw new IllegalStateException("Session is not transacted"); - } - } - - private void checkNotTransacted() throws JMSException - { - if (getTransacted()) - { - throw new IllegalStateException("Session is transacted"); - } - } - - /** - * Invoked by the MINA IO thread (indirectly) when a message is received from the transport. - * Puts the message onto the queue read by the dispatcher. - * - * @param message the message that has been received - */ - public void messageReceived(UnprocessedMessage message) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Message received in session with channel id " + _channelId); - } - - _queue.add(message); - } - - /** - * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from - * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is - * AUTO_ACK or similar. - * - * @param deliveryTag the tag of the last message to be acknowledged - * @param multiple if true will acknowledge all messages up to and including the one specified by the - * delivery tag - */ - public void acknowledgeMessage(long deliveryTag, boolean multiple) - { - final AMQFrame ackFrame = BasicAckBody.createAMQFrame(_channelId, deliveryTag, multiple); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); - } - _connection.getProtocolHandler().writeFrame(ackFrame); - } - - public int getDefaultPrefetch() - { - return _defaultPrefetchHighMark; - } - - public int getDefaultPrefetchHigh() - { - return _defaultPrefetchHighMark; - } - - public int getDefaultPrefetchLow() - { - return _defaultPrefetchLowMark; - } - - public int getChannelId() - { - return _channelId; - } - - void start() - { - if (_dispatcher != null) - { - //then we stopped this and are restarting, so signal server to resume delivery - unsuspendChannel(); - } - _dispatcher = new Dispatcher(); - _dispatcher.setDaemon(true); - _dispatcher.start(); - } - - void stop() - { - //stop the server delivering messages to this session - suspendChannel(); - -//stop the dispatcher thread - _stopped.set(true); - } - - boolean isStopped() - { - return _stopped.get(); - } - - /** - * Callers must hold the failover mutex before calling this method. - * - * @param consumer - * @throws AMQException - */ - void registerConsumer(BasicMessageConsumer consumer) throws AMQException - { - AMQDestination amqd = consumer.getDestination(); - - AMQProtocolHandler protocolHandler = _connection.getProtocolHandler(); - - declareExchange(amqd, protocolHandler); - - String queueName = declareQueue(amqd, protocolHandler); - - bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - - String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(), consumer.getPrefetchLow(), - consumer.isNoLocal(), consumer.isExclusive(), consumer.getAcknowledgeMode()); - - consumer.setConsumerTag(consumerTag); - _consumers.put(consumerTag, consumer); - } - - /** - * Called by the MessageConsumer when closing, to deregister the consumer from the - * map from consumerTag to consumer instance. - * - * @param consumerTag the consumer tag, that was broker-generated - */ - void deregisterConsumer(String consumerTag) - { - _consumers.remove(consumerTag); - } - - private void registerProducer(long producerId, MessageProducer producer) - { - _producers.put(new Long(producerId), producer); - } - - void deregisterProducer(long producerId) - { - _producers.remove(new Long(producerId)); - } - - private long getNextProducerId() - { - return ++_nextProducerId; - } - - /** - * Resubscribes all producers and consumers. This is called when performing failover. - * - * @throws AMQException - */ - void resubscribe() throws AMQException - { - resubscribeProducers(); - resubscribeConsumers(); - } - - private void resubscribeProducers() throws AMQException - { - ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: remove - for (Iterator it = producers.iterator(); it.hasNext();) - { - BasicMessageProducer producer = (BasicMessageProducer) it.next(); - producer.resubscribe(); - } - } - - private void resubscribeConsumers() throws AMQException - { - ArrayList consumers = new ArrayList(_consumers.values()); - _consumers.clear(); - - for (Iterator it = consumers.iterator(); it.hasNext();) - { - BasicMessageConsumer consumer = (BasicMessageConsumer) it.next(); - registerConsumer(consumer); - } - } - - private void suspendChannel() - { - _logger.warn("Suspending channel"); - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, false); - _connection.getProtocolHandler().writeFrame(channelFlowFrame); - } - - private void unsuspendChannel() - { - _logger.warn("Unsuspending channel"); - AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, true); - _connection.getProtocolHandler().writeFrame(channelFlowFrame); - } -} |