diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
tree | 1391da89470593209466df68c0b40b89c14963b1 /java/client/src | |
parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
25 files changed, 529 insertions, 308 deletions
diff --git a/java/client/src/main/java/client.bnd b/java/client/src/main/java/client.bnd index d92d582ec8..495ea6793f 100755 --- a/java/client/src/main/java/client.bnd +++ b/java/client/src/main/java/client.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.15.0 +ver: 0.17.0 Bundle-SymbolicName: qpid-client Bundle-Version: ${ver} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index 771e80c3bc..987404cb80 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -319,18 +319,18 @@ public class AMQBrokerDetails implements BrokerDetails BrokerDetails bd = (BrokerDetails) o; - return _host.equalsIgnoreCase(bd.getHost()) && + return _host.toLowerCase().equals(bd.getHost() == null ? null : bd.getHost().toLowerCase()) && (_port == bd.getPort()) && - _transport.equalsIgnoreCase(bd.getTransport()); + _transport.toLowerCase().equals(bd.getTransport() == null ? null : bd.getTransport().toLowerCase()); //TODO do we need to compare all the options as well? } @Override public int hashCode() { - int result = _host != null ? _host.hashCode() : 0; + int result = _host != null ? _host.toLowerCase().hashCode() : 0; result = 31 * result + _port; - result = 31 * result + (_transport != null ? _transport.hashCode() : 0); + result = 31 * result + (_transport != null ? _transport.toLowerCase().hashCode() : 0); return result; } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 39ad282422..23b47c8d67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -308,9 +308,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _delegate = new AMQConnectionDelegate_0_10(this); } - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Connection:" + connectionURL); + _logger.debug("Connection:" + connectionURL); } _connectionURL = connectionURL; @@ -343,7 +343,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _protocolHandler = new AMQProtocolHandler(this); - _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } // We are not currently connected setConnected(false); @@ -435,7 +438,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect throw new AMQConnectionFailureException(message, connectionException); } - _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion()); + } _sessions.setMaxChannelID(_delegate.getMaxChannelID()); _sessions.setMinChannelID(_delegate.getMinChannelID()); @@ -462,7 +468,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect String delegateClassName = String.format ("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", pe.getMajorVersion(), pe.getMinorVersion()); - _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + if (_logger.isDebugEnabled()) + { + _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe); + } Class c = Class.forName(delegateClassName); Class partypes[] = new Class[1]; partypes[0] = AMQConnection.class; @@ -569,6 +578,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException { + resetClosedFlag(); return _delegate.makeBrokerConnection(brokerDetail); } @@ -968,7 +978,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); + } public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, @@ -976,7 +987,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, @@ -984,7 +995,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, @@ -993,7 +1004,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // TODO Auto-generated method stub checkNotClosed(); - return null; + throw new JmsNotImplementedException(); } public long getMaximumChannelCount() throws JMSException diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 56ee56d178..a1a06c5547 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -327,6 +327,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + _conn.setClosed(); + ExceptionListener listener = _conn.getExceptionListenerNoCheck(); if (listener == null) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java index 2313bce474..0c6031ea91 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java @@ -20,6 +20,12 @@ */ package org.apache.qpid.client; +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.filter.JMSSelectorFilter; +import org.apache.qpid.protocol.AMQConstant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,12 +54,50 @@ public class AMQQueueBrowser implements QueueBrowser _session = session; _queue = queue; _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector; - // Create Consumer to verify message selector. - BasicMessageConsumer consumer = - (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); - // Close this consumer as we are not looking to consume only to establish that, at least for now, - // the QB can be created - consumer.close(); + + + validateQueue((AMQDestination) queue); + + if(_messageSelector != null) + { + validateSelector(_messageSelector); + } + } + + private void validateSelector(String messageSelector) throws InvalidSelectorException + { + try + { + new JMSSelectorFilter(messageSelector); + } + catch (AMQInternalException e) + { + throw new InvalidSelectorException(e.getMessage()); + } + } + + private void validateQueue(AMQDestination queue) throws JMSException + { + try + { + // Essentially just test the connection/session is still active + _session.sync(); + // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :( + // _session.declareQueuePassive( queue ); + } + catch (AMQException e) + { + if(e.getErrorCode() == AMQConstant.NOT_FOUND) + { + throw new InvalidDestinationException(e.getMessage()); + } + else + { + final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode())); + jmsException.setLinkedException(e); + throw jmsException; + } + } } public Queue getQueue() throws JMSException @@ -88,6 +132,10 @@ public class AMQQueueBrowser implements QueueBrowser public Enumeration getEnumeration() throws JMSException { checkState(); + if(!_session.getAMQConnection().started()) + { + throw new IllegalStateException("Cannot enumerate message on the queue while the Connection is stopped"); + } final BasicMessageConsumer consumer = (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false); @@ -118,12 +166,12 @@ public class AMQQueueBrowser implements QueueBrowser _consumer = consumer; prefetchMessage(); } - _logger.info("QB:created with first element:" + _nextMessage); + _logger.debug("QB:created with first element:" + _nextMessage); } public boolean hasMoreElements() { - _logger.info("QB:hasMoreElements:" + (_nextMessage != null)); + _logger.debug("QB:hasMoreElements:" + (_nextMessage != null)); return (_nextMessage != null); } @@ -136,9 +184,9 @@ public class AMQQueueBrowser implements QueueBrowser } try { - _logger.info("QB:nextElement about to receive"); + _logger.debug("QB:nextElement about to receive"); prefetchMessage(); - _logger.info("QB:nextElement received:" + _nextMessage); + _logger.debug("QB:nextElement received:" + _nextMessage); } catch (JMSException e) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e7e937b689..55d3ccb6e7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -120,18 +120,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; /** - * The default value for immediate flag used by producers created by this session is false. That is, a consumer does - * not need to be attached to a queue. - */ - private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); - - /** - * The default value for mandatory flag used by producers created by this session is true. That is, server will not - * silently drop messages where no queue is connected to the exchange for the message. - */ - private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); - - /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ @@ -542,9 +530,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } // Add creation logging to tie in with the existing close logging - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Created session:" + this); + _logger.debug("Created session:" + this); } } @@ -733,9 +721,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void close(long timeout, boolean sendClose) throws JMSException { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Closing session: " + this); + _logger.debug("Closing session: " + this); } // Ensure we only try and close an open session. @@ -904,11 +892,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // Flush any pending messages for this consumerTag if (_dispatcher != null) { - _logger.info("Dispatcher is not null"); + _logger.debug("Dispatcher is not null"); } else { - _logger.info("Dispatcher is null so created stopped dispatcher"); + _logger.debug("Dispatcher is null so created stopped dispatcher"); startDispatcherIfNecessary(true); } @@ -926,9 +914,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // There is a small window where the message is between the two queues in the dispatcher. if (consumer.isClosed()) { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Closing consumer:" + consumer.debugIdentity()); + _logger.debug("Closing consumer:" + consumer.debugIdentity()); } deregisterConsumer(consumer); @@ -1101,6 +1089,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true); + } // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. @@ -1198,12 +1190,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public P createProducer(Destination destination) throws JMSException { - return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue); + return createProducerImpl(destination, null, null); } public P createProducer(Destination destination, boolean immediate) throws JMSException { - return createProducerImpl(destination, _defaultMandatoryValue, immediate); + return createProducerImpl(destination, null, immediate); } public P createProducer(Destination destination, boolean mandatory, boolean immediate) @@ -1692,7 +1684,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { AMQProtocolHandler protocolHandler = getProtocolHandler(); declareExchange(amqd, protocolHandler, false); - AMQShortString queueName = declareQueue(amqd, protocolHandler, false); + AMQShortString queueName = declareQueue(amqd, false); bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd); } @@ -2378,9 +2370,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD); _dispatcher.setConnectionStopped(initiallyStopped); _dispatcherThread.start(); - if (_dispatcherLogger.isInfoEnabled()) + if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " created"); + _dispatcherLogger.debug(_dispatcherThread.getName() + " created"); } } else @@ -2613,7 +2605,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void sendConsume(C consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException; - private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate) + private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate) throws JMSException { return new FailoverRetrySupport<P, JMSException>( @@ -2642,8 +2634,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic }, _connection).execute(); } - public abstract P createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final long producerId) throws JMSException; + public abstract P createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, final long producerId) throws JMSException; private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException { @@ -2726,6 +2718,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException, FailoverException; + + void declareQueuePassive(AMQDestination queue) throws AMQException + { + declareQueue(queue,false,false,true); + } + /** * Declares a queue for a JMS destination. * @@ -2735,27 +2733,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * * <p/>Note that this operation automatically retries in the event of fail-over. * - * @param amqd The destination to declare as a queue. - * @param protocolHandler The protocol handler to communicate through. * + * @param amqd The destination to declare as a queue. * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of * the client. * + * + * * @throws AMQException If the queue cannot be declared for any reason. * @todo Verify the destiation is valid or throw an exception. * @todo Be aware of possible changes to parameter order as versions change. */ - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal) throws AMQException { - return declareQueue(amqd, protocolHandler, noLocal, false); + return declareQueue(amqd, noLocal, false); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, + protected AMQShortString declareQueue(final AMQDestination amqd, final boolean noLocal, final boolean nowait) + throws AMQException + { + return declareQueue(amqd, noLocal, nowait, false); + } + + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -2767,7 +2773,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic amqd.setQueueName(protocolHandler.generateQueueName()); } - sendQueueDeclare(amqd, protocolHandler, nowait); + sendQueueDeclare(amqd, protocolHandler, nowait, passive); return amqd.getAMQQueueName(); } @@ -2775,7 +2781,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException; + final boolean nowait, boolean passive) throws AMQException, FailoverException; /** * Undeclares the specified queue. @@ -2916,7 +2922,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (_delareQueues || amqd.isNameRequired()) { - declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait); + declareQueue(amqd, consumer.isNoLocal(), nowait); } bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait); } @@ -2939,7 +2945,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { suspendChannel(true); - _logger.info( + _logger.debug( "Prefetching delayed existing messages will not flow until requested via receive*() or setML()."); } catch (AMQException e) @@ -2951,7 +2957,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } else { - _logger.info("Immediately prefetching existing messages to new consumer."); + _logger.debug("Immediately prefetching existing messages to new consumer."); } try @@ -2983,18 +2989,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers) { Iterator messages = _queue.iterator(); - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:" + requeue); if (messages.hasNext()) { - _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); + _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")"); } else { - _logger.info("No messages in _queue to reject"); + _logger.debug("No messages in _queue to reject"); } } while (messages.hasNext()) @@ -3037,7 +3043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private void resubscribeProducers() throws AMQException { ArrayList producers = new ArrayList(_producers.values()); - _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey + _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey for (Iterator it = producers.iterator(); it.hasNext();) { P producer = (P) it.next(); @@ -3127,7 +3133,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); - _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + if (_logger.isInfoEnabled()) + { + _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + } } public void checkFlowControl() throws InterruptedException, JMSException @@ -3141,7 +3150,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _flowControl.wait(_flowControlWaitPeriod); - _logger.warn("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } } if(!_flowControl.getFlowControl()) { @@ -3200,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void rejectPending(C consumer) { - synchronized (_lock) - { - boolean stopped = _dispatcher.connectionStopped(); + // Reject messages on pre-receive queue + consumer.rollbackPendingMessages(); - if (!stopped) - { - _dispatcher.setConnectionStopped(true); - } + // Reject messages on pre-dispatch queue + rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); + // closeConsumer + consumer.markClosed(); - // Reject messages on pre-dispatch queue - rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false); - //Let the dispatcher deal with this when it gets to them. - - // closeConsumer - consumer.markClosed(); - - _dispatcher.setConnectionStopped(stopped); - - } } public void rollback() @@ -3294,9 +3293,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void run() { - if (_dispatcherLogger.isInfoEnabled()) + if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " started"); + _dispatcherLogger.debug(_dispatcherThread.getName() + " started"); } // Allow disptacher to start stopped @@ -3318,7 +3317,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic try { Dispatchable disp; - while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null)) + while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get()) { disp.dispatch(AMQSession.this); } @@ -3328,9 +3327,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // ignored as run will exit immediately } - if (_dispatcherLogger.isInfoEnabled()) + if (_dispatcherLogger.isDebugEnabled()) { - _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this); + _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this); } } @@ -3413,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { final C consumer = _consumers.get(message.getConsumerTag()); - if ((consumer == null) || consumer.isClosed()) + if ((consumer == null) || consumer.isClosed() || consumer.isClosing()) { if (_dispatcherLogger.isInfoEnabled()) { diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 816ad1f222..3902c726f3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -17,9 +17,20 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import javax.jms.Destination; +import javax.jms.JMSException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; @@ -27,7 +38,6 @@ import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverNoopSupport; import org.apache.qpid.client.failover.FailoverProtectedOperation; -import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.FieldTableSupport; import org.apache.qpid.client.message.MessageFactoryRegistry; @@ -42,28 +52,14 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.*; -import org.apache.qpid.util.Serial; -import org.apache.qpid.util.Strings; - import static org.apache.qpid.transport.Option.BATCH; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; import static org.apache.qpid.transport.Option.UNRELIABLE; - -import javax.jms.Destination; -import javax.jms.JMSException; -import java.lang.ref.WeakReference; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.qpid.util.Serial; +import org.apache.qpid.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a 0.10 Session @@ -654,8 +650,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic /** * Create an 0_10 message producer */ - public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, final long producerId) throws JMSException + public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, final long producerId) throws JMSException { try { @@ -725,7 +721,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) + final boolean nowait, boolean passive) throws AMQException, FailoverException { // do nothing this is only used by 0_8 @@ -735,7 +731,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic * Declare a queue with the given queueName */ public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) + final boolean noLocal, final boolean nowait, boolean passive) throws AMQException { AMQShortString queueName; @@ -761,7 +757,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, amqd.isDurable() ? Option.DURABLE : Option.NONE, - amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE); + amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE, + passive ? Option.PASSIVE : Option.NONE); } else { @@ -931,11 +928,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return getCurrentException(); } - protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean noLocal, final boolean nowait) + protected AMQShortString declareQueue(final AMQDestination amqd, + final boolean noLocal, final boolean nowait, final boolean passive) throws AMQException { - /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/ + final AMQProtocolHandler protocolHandler = getProtocolHandler(); + return new FailoverNoopSupport<AMQShortString, AMQException>( new FailoverProtectedOperation<AMQShortString, AMQException>() { @@ -952,7 +950,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic amqd.setQueueName(new AMQShortString( binddingKey + "@" + amqd.getExchangeName().toString() + "_" + UUID.randomUUID())); } - return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait); + return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive); } }, getAMQConnection()).execute(); } @@ -1209,7 +1207,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else if(createNode) { setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,noLocal,noWait); + send0_10QueueDeclare(dest,null,noLocal,noWait, false); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); break; @@ -1315,7 +1313,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } node.setExclusive(true); node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,noLocal,true); + send0_10QueueDeclare(dest,null,noLocal,true, false); getQpidSession().exchangeBind(dest.getQueueName(), dest.getAddressName(), dest.getSubject(), diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 29f1925cbc..8ab23a240e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -38,7 +38,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; -import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; @@ -401,9 +400,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler, - final boolean nowait) throws AMQException, FailoverException - { - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null); + final boolean nowait, boolean passive) throws AMQException, FailoverException + { + QueueDeclareBody body = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + passive, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null); AMQFrame queueDeclare = body.generateFrame(getChannelId()); @@ -441,8 +448,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } - public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory, - final boolean immediate, long producerId) throws JMSException + public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory, + final Boolean immediate, long producerId) throws JMSException { try { diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 8e9b1fb90f..0f8b5717d6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -199,6 +199,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); + } _arguments = ft; @@ -275,7 +279,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started."); } - _logger.debug("Message listener set for destination " + _destination); + if (_logger.isDebugEnabled()) + { + _logger.debug("Message listener set for destination " + _destination); + } if (messageListener != null) { @@ -553,9 +560,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa public void close(boolean sendClose) throws JMSException { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("Closing consumer:" + debugIdentity()); + _logger.debug("Closing consumer:" + debugIdentity()); } if (!setClosed()) @@ -586,7 +593,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa // no point otherwise as the connection will be gone if (!_session.isClosed() || _session.isClosing()) { - sendCancel(); + synchronized(_session.getMessageDeliveryLock()) + { + sendCancel(); + } cleanupQueue(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 75f198e1fa..9b3b2ce0e9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -20,18 +20,8 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.message.MessageConverter; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; - +import java.io.UnsupportedEncodingException; +import java.util.UUID; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -42,77 +32,22 @@ import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; -import java.io.UnsupportedEncodingException; -import java.util.UUID; +import javax.jms.Topic; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageConverter; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.UUIDGen; +import org.apache.qpid.util.UUIDs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { - /** - * If true, messages will not get a timestamp. - */ - protected boolean isDisableTimestamps() - { - return _disableTimestamps; - } - - protected void setDisableTimestamps(boolean disableTimestamps) - { - _disableTimestamps = disableTimestamps; - } - - protected void setDestination(AMQDestination destination) - { - _destination = destination; - } - - protected AMQProtocolHandler getProtocolHandler() - { - return _protocolHandler; - } - - protected void setProtocolHandler(AMQProtocolHandler protocolHandler) - { - _protocolHandler = protocolHandler; - } - - protected int getChannelId() - { - return _channelId; - } - - protected void setChannelId(int channelId) - { - _channelId = channelId; - } - - protected void setSession(AMQSession session) - { - _session = session; - } - - protected String getUserID() - { - return _userID; - } - - protected void setUserID(String userID) - { - _userID = userID; - } - - protected PublishMode getPublishMode() - { - return publishMode; - } - - protected void setPublishMode(PublishMode publishMode) - { - this.publishMode = publishMode; - } - enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }; - private final Logger _logger = LoggerFactory.getLogger(getClass()); + private final Logger _logger ; private AMQConnection _connection; @@ -166,7 +101,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private final boolean _immediate; - private final boolean _mandatory; + private final Boolean _mandatory; private boolean _disableMessageId; @@ -174,14 +109,37 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac private String _userID; // ref user id used in the connection. - private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + + /** + * The default value for immediate flag used this producer is false. That is, a consumer does + * not need to be attached to a queue. + */ + private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + + /** + * The default value for mandatory flag used by this producer is true. That is, server will not + * silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + /** + * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server + * will silently drop messages where no queue is connected to the exchange for the message. + */ + private final boolean _defaultMandatoryTopicValue = + Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic", + System.getProperties().containsKey("qpid.default_mandatory") + ? System.getProperty("qpid.default_mandatory") + : "false")); private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL; - protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, + Boolean immediate, Boolean mandatory) throws AMQException { - _connection = connection; + _logger = logger; + _connection = connection; _destination = destination; _transacted = transacted; _protocolHandler = protocolHandler; @@ -193,8 +151,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac declareDestination(destination); } - _immediate = immediate; - _mandatory = mandatory; + _immediate = immediate == null ? _defaultImmediateValue : immediate; + _mandatory = mandatory == null + ? destination == null ? null + : destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : mandatory; + _userID = connection.getUsername(); setPublishMode(); } @@ -215,7 +179,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac publishMode = PublishMode.SYNC_PUBLISH_ALL; } - _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode); + if (_logger.isDebugEnabled()) + { + _logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode); + } } void resubscribe() throws AMQException @@ -381,7 +348,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, + sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, + _mandatory == null + ? destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : _mandatory, _immediate); } } @@ -394,7 +366,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac synchronized (_connection.getFailoverMutex()) { validateDestination(destination); - sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate); + sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, + _mandatory == null + ? destination instanceof Topic + ? _defaultMandatoryTopicValue + : _defaultMandatoryValue + : _mandatory, + _immediate); } } @@ -542,7 +520,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac _logger.debug("Updating original message"); origMessage.setJMSPriority(message.getJMSPriority()); origMessage.setJMSTimestamp(message.getJMSTimestamp()); - _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration()); + } origMessage.setJMSExpiration(message.getJMSExpiration()); origMessage.setJMSMessageID(message.getJMSMessageID()); } @@ -646,6 +627,69 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac } } + /** + * If true, messages will not get a timestamp. + */ + protected boolean isDisableTimestamps() + { + return _disableTimestamps; + } + + protected void setDisableTimestamps(boolean disableTimestamps) + { + _disableTimestamps = disableTimestamps; + } + + protected void setDestination(AMQDestination destination) + { + _destination = destination; + } + + protected AMQProtocolHandler getProtocolHandler() + { + return _protocolHandler; + } + + protected void setProtocolHandler(AMQProtocolHandler protocolHandler) + { + _protocolHandler = protocolHandler; + } + + protected int getChannelId() + { + return _channelId; + } + + protected void setChannelId(int channelId) + { + _channelId = channelId; + } + + protected void setSession(AMQSession session) + { + _session = session; + } + + protected String getUserID() + { + return _userID; + } + + protected void setUserID(String userID) + { + _userID = userID; + } + + protected PublishMode getPublishMode() + { + return publishMode; + } + + protected void setPublishMode(PublishMode publishMode) + { + this.publishMode = publishMode; + } + Logger getLogger() { return _logger; diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 024219cfd6..a3a1e9c28b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -61,9 +61,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, long producerId, - boolean immediate, boolean mandatory) throws AMQException + Boolean immediate, Boolean mandatory) throws AMQException { - super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); + super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory); userIDBytes = Strings.toUTF8(getUserID()); } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 3b5e361f97..21ff6c877a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -33,6 +33,9 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.MethodRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; @@ -42,11 +45,12 @@ import java.util.UUID; public class BasicMessageProducer_0_8 extends BasicMessageProducer { + private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class); BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, - AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException + AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException { - super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); + super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory); } void declareDestination(AMQDestination destination) diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java index ba26bfc485..2f7fbad30c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java +++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java @@ -81,7 +81,7 @@ public abstract class Closeable } /** - * Checks if this is closis. + * Checks if this is closing. * * @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise. */ @@ -90,6 +90,11 @@ public abstract class Closeable return _closing.get(); } + public void resetClosedFlag() + { + _closed.set(false); + } + protected boolean setClosed() { return _closed.getAndSet(true); diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java index 128aa18d30..af9048f1f5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java @@ -308,13 +308,16 @@ public class XAResourceImpl implements XAResource _xaSession.createSession(); convertExecutionErrorToXAErr( e.getException().getErrorCode()); } - Xid[] result = new Xid[res.getInDoubt().size()]; - int i = 0; - for (Object obj : res.getInDoubt()) + Xid[] result = new Xid[res.getInDoubt() != null ? res.getInDoubt().size() : 0]; + if(result.length != 0) { - org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; - result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); - i++; + int i = 0; + for (Object obj : res.getInDoubt()) + { + org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj; + result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId()); + i++; + } } return result; } @@ -436,6 +439,16 @@ public class XAResourceImpl implements XAResource } } + /** + * Is this resource currently enlisted in a transaction? + * + * @return true if the resource is associated with a transaction, false otherwise. + */ + public boolean isEnlisted() + { + return (_xid != null) ; + } + //------------------------------------------------------------------------ // Private methods //------------------------------------------------------------------------ diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index 85623df8c0..f2efb6e8a5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -6,7 +6,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE 2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,6 +18,7 @@ package org.apache.qpid.client; import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.transport.RangeSet; import javax.jms.JMSException; import javax.jms.QueueSession; @@ -178,4 +179,17 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic { return (TopicSession) getSession(); } + + @Override + protected void acknowledgeImpl() + { + if (_xaResource.isEnlisted()) + { + acknowledgeMessage(Long.MAX_VALUE, true) ; + } + else + { + super.acknowledgeImpl() ; + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 558d93538b..e1a0e18262 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -95,9 +95,9 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session) { - if (_logger.isInfoEnabled()) + if (_logger.isDebugEnabled()) { - _logger.info("New Method Dispatcher:" + session); + _logger.debug("New Method Dispatcher:" + session); } DispatcherFactory factory = _dispatcherFactories.get(version); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java index 31a0440b04..bd63cdb5c5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java @@ -48,9 +48,12 @@ public class FieldTableSupport public static Map<String,Object> convertToMap(FieldTable ft) { Map<String,Object> map = new HashMap<String,Object>(); - for (AMQShortString key: ft.keySet() ) + if(ft != null) { - map.put(key.asString(), ft.getObject(key)); + for (AMQShortString key: ft.keySet() ) + { + map.put(key.asString(), ft.getObject(key)); + } } return map; diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index d380402da7..b314453e31 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -652,7 +652,8 @@ public class AMQProtocolHandler implements ProtocolEngine } writeFrame(frame); - return listener.blockForFrame(timeout); + long actualTimeout = timeout == -1 ? DEFAULT_SYNC_TIMEOUT : timeout; + return listener.blockForFrame(actualTimeout); // When control resumes before this line, a reply will have been received // that matches the criteria defined in the blocking listener } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index ced734f70f..af57fd98fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -108,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession { _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); - _logger.info("Using ProtocolVersion for Session:" + _protocolVersion); + if (_logger.isDebugEnabled()) + { + _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion); + } _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); _connection = connection; @@ -302,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession */ public void closeSession(AMQSession session) { - _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + if (_logger.isDebugEnabled()) + { + _logger.debug("closeSession called on protocol session for session " + session.getChannelId()); + } final int channelId = session.getChannelId(); if (channelId <= 0) { @@ -393,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession public void setProtocolVersion(final ProtocolVersion pv) { - _logger.info("Setting ProtocolVersion to :" + pv); + if (_logger.isDebugEnabled()) + { + _logger.debug("Setting ProtocolVersion to :" + pv); + } _protocolVersion = pv; _methodRegistry = MethodRegistry.getMethodRegistry(pv); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this); diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 80d171592f..22dc17e53c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock; * differs from a 'rendezvous' in that sense. * * <p/>BlockingWaiters are used to coordinate when waiting for an an event that expect a response. - * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register + * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register * them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they * have been completed. * @@ -51,12 +51,12 @@ import java.util.concurrent.locks.ReentrantLock; * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations </td> * <tr><td> Accept generic objects as events for processing via {@link #process}. <td> - * <tr><td> Delegate handling and undserstanding of the object to a concrete implementation. <td> + * <tr><td> Delegate handling and understanding of the object to a concrete implementation. <td> * <tr><td> Block until {@link #process} determines that waiting is no longer required <td> * <tr><td> Propagate the most recent exception to the consumer.<td> * </table> * - * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull + * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry * when this happens. At the very least, restore the interrupted status flag. * @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to @@ -84,13 +84,13 @@ public abstract class BlockingWaiter<T> /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */ private volatile Exception _error; - /** Holds the incomming Object. */ + /** Holds the incoming Object. */ private Object _doneObject = null; private AtomicBoolean _waiting = new AtomicBoolean(false); private boolean _closed = false; /** - * Delegates processing of the incomming object to the handler. + * Delegates processing of the incoming object to the handler. * * @param object The object to process. * @@ -146,6 +146,11 @@ public abstract class BlockingWaiter<T> */ public Object block(long timeout) throws AMQException, FailoverException { + if (timeout < 0) + { + throw new IllegalArgumentException("timeout must be zero or greater"); + } + long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout); _lock.lock(); @@ -165,26 +170,18 @@ public abstract class BlockingWaiter<T> { try { - if (timeout == -1) - { - _receivedCondition.await(); - } - else - { - nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); + nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout); - if (nanoTimeout <= 0 && !_ready && _error == null) - { - _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); - _ready = true; - } + if (nanoTimeout <= 0 && !_ready && _error == null) + { + _error = new AMQTimeoutException("Server did not respond in a timely fashion", null); + _ready = true; } } catch (InterruptedException e) { _logger.error(e.getMessage(), e); - // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess - + // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivalent to success } } } @@ -285,8 +282,8 @@ public abstract class BlockingWaiter<T> /** * Close this Waiter so that no more errors are processed. * This is a preventative method to ensure that a second error thread does not get stuck in the error method after - * the await has returned. This has not happend but in practise but if two errors occur on the Connection at - * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a + * the await has returned. This has not happened but in practise but if two errors occur on the Connection at + * the same time then it is conceivably possible for the second to get stuck if the first one is processed by a * waiter. * * Once closed any attempt to wait will throw an exception. diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java index bc3f89849e..9b202a13ee 100644 --- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java +++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java @@ -20,8 +20,26 @@ */ package org.apache.qpid.jndi; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.Topic; +import javax.naming.ConfigurationException; +import javax.naming.Context; +import javax.naming.NamingException; +import javax.naming.spi.InitialContextFactory; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQDestination; @@ -33,23 +51,10 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLSyntaxException; import org.apache.qpid.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Queue; -import javax.jms.Topic; -import javax.naming.ConfigurationException; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; public class PropertiesFileInitialContextFactory implements InitialContextFactory { @@ -60,6 +65,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor private String QUEUE_PREFIX = "queue."; private String TOPIC_PREFIX = "topic."; + @SuppressWarnings({ "rawtypes", "unchecked" }) public Context getInitialContext(Hashtable environment) throws NamingException { Map data = new ConcurrentHashMap(); @@ -68,6 +74,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor { String file = null; + if (environment.containsKey(Context.PROVIDER_URL)) { file = (String) environment.get(Context.PROVIDER_URL); @@ -77,13 +84,23 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor file = System.getProperty(Context.PROVIDER_URL); } + // Load the properties specified if (file != null) { _logger.info("Loading Properties from:" + file); + BufferedInputStream inputStream = null; - // Load the properties specified - BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(file)); + if(file.contains("file:")) + { + inputStream = new BufferedInputStream(new FileInputStream(new File(new URI(file)))); + } + else + { + inputStream = new BufferedInputStream(new FileInputStream(file)); + } + Properties p = new Properties(); + try { p.load(inputStream); @@ -119,6 +136,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" + "Due to:"+ioe.getMessage()); } + catch(URISyntaxException uoe) + { + _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" + + "Due to:"+uoe.getMessage()); + } createConnectionFactories(data, environment); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDITest.properties b/java/client/src/test/java/org/apache/qpid/jndi/JNDITest.properties index 07017a05a6..07017a05a6 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDITest.properties +++ b/java/client/src/test/java/org/apache/qpid/jndi/JNDITest.properties diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java index 576ab4fa05..2989970dcd 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java +++ b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java @@ -14,29 +14,35 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ -package org.apache.qpid.test.unit.jndi; +package org.apache.qpid.jndi; -import junit.framework.TestCase; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.framing.AMQShortString; +import java.util.Properties; +import javax.jms.Destination; import javax.jms.Queue; import javax.jms.Topic; import javax.naming.ConfigurationException; import javax.naming.Context; import javax.naming.InitialContext; -import java.util.Properties; -public class JNDIPropertyFileTest extends TestCase +import junit.framework.TestCase; + +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.framing.AMQShortString; + +public class PropertiesFileInitialContextFactoryTest extends TestCase { + private static final String FILE_URL_PATH = System.getProperty("user.dir") + "/client/src/test/java/org/apache/qpid/jndi/"; + private static final String FILE_NAME = "hello.properties"; + private Context ctx; - - public JNDIPropertyFileTest() throws Exception + + protected void setUp() throws Exception { Properties properties = new Properties(); properties.load(this.getClass().getResourceAsStream("JNDITest.properties")); @@ -44,19 +50,20 @@ public class JNDIPropertyFileTest extends TestCase //Create the initial context ctx = new InitialContext(properties); } - + + public void testQueueNamesWithTrailingSpaces() throws Exception { Queue queue = (Queue)ctx.lookup("QueueNameWithSpace"); - assertEquals("QueueNameWithSpace",queue.getQueueName()); + assertEquals("QueueNameWithSpace",queue.getQueueName()); } - + public void testTopicNamesWithTrailingSpaces() throws Exception { Topic topic = (Topic)ctx.lookup("TopicNameWithSpace"); - assertEquals("TopicNameWithSpace",topic.getTopicName()); + assertEquals("TopicNameWithSpace",topic.getTopicName()); } - + public void testMultipleTopicNamesWithTrailingSpaces() throws Exception { Topic topic = (Topic)ctx.lookup("MultipleTopicNamesWithSpace"); @@ -64,16 +71,16 @@ public class JNDIPropertyFileTest extends TestCase for (AMQShortString bindingKey: ((AMQDestination)topic).getBindingKeys()) { i++; - assertEquals("Topic" + i + "WithSpace",bindingKey.asString()); + assertEquals("Topic" + i + "WithSpace",bindingKey.asString()); } } - + public void testConfigurationErrors() throws Exception { Properties properties = new Properties(); properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); properties.put("destination.my-queue","amq.topic/test;create:always}"); - + try { ctx = new InitialContext(properties); @@ -83,6 +90,6 @@ public class JNDIPropertyFileTest extends TestCase { assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}")); } - + } } diff --git a/java/client/src/test/java/org/apache/qpid/jndi/hello.properties b/java/client/src/test/java/org/apache/qpid/jndi/hello.properties new file mode 100644 index 0000000000..d017d137fe --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/jndi/hello.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://10.0.1.46:5672' + +# Register an AMQP destination in JNDI +# destination.[jniName] = [Address Format] +destination.topicExchange = amq.topic diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index 84d91ee57e..f199961b6f 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -145,7 +145,7 @@ public class TestAMQSession extends AMQSession_0_8 } public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler, - boolean nowait) throws AMQException, FailoverException + boolean nowait, boolean passive) throws AMQException, FailoverException { } |