summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /java/client/src
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rwxr-xr-xjava/client/src/main/java/client.bnd2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnection.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java141
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java64
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java222
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java8
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/Closeable.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java25
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java7
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java15
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java39
-rw-r--r--java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java62
-rw-r--r--java/client/src/test/java/org/apache/qpid/jndi/JNDITest.properties (renamed from java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDITest.properties)0
-rw-r--r--java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java (renamed from java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java)45
-rw-r--r--java/client/src/test/java/org/apache/qpid/jndi/hello.properties27
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java2
25 files changed, 529 insertions, 308 deletions
diff --git a/java/client/src/main/java/client.bnd b/java/client/src/main/java/client.bnd
index d92d582ec8..495ea6793f 100755
--- a/java/client/src/main/java/client.bnd
+++ b/java/client/src/main/java/client.bnd
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.15.0
+ver: 0.17.0
Bundle-SymbolicName: qpid-client
Bundle-Version: ${ver}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index 771e80c3bc..987404cb80 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -319,18 +319,18 @@ public class AMQBrokerDetails implements BrokerDetails
BrokerDetails bd = (BrokerDetails) o;
- return _host.equalsIgnoreCase(bd.getHost()) &&
+ return _host.toLowerCase().equals(bd.getHost() == null ? null : bd.getHost().toLowerCase()) &&
(_port == bd.getPort()) &&
- _transport.equalsIgnoreCase(bd.getTransport());
+ _transport.toLowerCase().equals(bd.getTransport() == null ? null : bd.getTransport().toLowerCase());
//TODO do we need to compare all the options as well?
}
@Override
public int hashCode()
{
- int result = _host != null ? _host.hashCode() : 0;
+ int result = _host != null ? _host.toLowerCase().hashCode() : 0;
result = 31 * result + _port;
- result = 31 * result + (_transport != null ? _transport.hashCode() : 0);
+ result = 31 * result + (_transport != null ? _transport.toLowerCase().hashCode() : 0);
return result;
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 39ad282422..23b47c8d67 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -308,9 +308,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_delegate = new AMQConnectionDelegate_0_10(this);
}
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Connection:" + connectionURL);
+ _logger.debug("Connection:" + connectionURL);
}
_connectionURL = connectionURL;
@@ -343,7 +343,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_protocolHandler = new AMQProtocolHandler(this);
- _logger.info("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connecting with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ }
// We are not currently connected
setConnected(false);
@@ -435,7 +438,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
throw new AMQConnectionFailureException(message, connectionException);
}
- _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ }
_sessions.setMaxChannelID(_delegate.getMaxChannelID());
_sessions.setMinChannelID(_delegate.getMinChannelID());
@@ -462,7 +468,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
String delegateClassName = String.format
("org.apache.qpid.client.AMQConnectionDelegate_%s_%s",
pe.getMajorVersion(), pe.getMinorVersion());
- _logger.info("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
+ }
Class c = Class.forName(delegateClassName);
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
@@ -569,6 +578,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
{
+ resetClosedFlag();
return _delegate.makeBrokerConnection(brokerDetail);
}
@@ -968,7 +978,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
checkNotClosed();
- return null;
+ throw new JmsNotImplementedException();
+
}
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool,
@@ -976,7 +987,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
checkNotClosed();
- return null;
+ throw new JmsNotImplementedException();
}
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool,
@@ -984,7 +995,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
{
checkNotClosed();
- return null;
+ throw new JmsNotImplementedException();
}
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector,
@@ -993,7 +1004,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
// TODO Auto-generated method stub
checkNotClosed();
- return null;
+ throw new JmsNotImplementedException();
}
public long getMaximumChannelCount() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 56ee56d178..a1a06c5547 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -327,6 +327,8 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
+ _conn.setClosed();
+
ExceptionListener listener = _conn.getExceptionListenerNoCheck();
if (listener == null)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 2313bce474..0c6031ea91 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
+import org.apache.qpid.protocol.AMQConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,12 +54,50 @@ public class AMQQueueBrowser implements QueueBrowser
_session = session;
_queue = queue;
_messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
- // Create Consumer to verify message selector.
- BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
- // Close this consumer as we are not looking to consume only to establish that, at least for now,
- // the QB can be created
- consumer.close();
+
+
+ validateQueue((AMQDestination) queue);
+
+ if(_messageSelector != null)
+ {
+ validateSelector(_messageSelector);
+ }
+ }
+
+ private void validateSelector(String messageSelector) throws InvalidSelectorException
+ {
+ try
+ {
+ new JMSSelectorFilter(messageSelector);
+ }
+ catch (AMQInternalException e)
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ }
+
+ private void validateQueue(AMQDestination queue) throws JMSException
+ {
+ try
+ {
+ // Essentially just test the connection/session is still active
+ _session.sync();
+ // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :(
+ // _session.declareQueuePassive( queue );
+ }
+ catch (AMQException e)
+ {
+ if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+ {
+ throw new InvalidDestinationException(e.getMessage());
+ }
+ else
+ {
+ final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode()));
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+ }
}
public Queue getQueue() throws JMSException
@@ -88,6 +132,10 @@ public class AMQQueueBrowser implements QueueBrowser
public Enumeration getEnumeration() throws JMSException
{
checkState();
+ if(!_session.getAMQConnection().started())
+ {
+ throw new IllegalStateException("Cannot enumerate message on the queue while the Connection is stopped");
+ }
final BasicMessageConsumer consumer =
(BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
@@ -118,12 +166,12 @@ public class AMQQueueBrowser implements QueueBrowser
_consumer = consumer;
prefetchMessage();
}
- _logger.info("QB:created with first element:" + _nextMessage);
+ _logger.debug("QB:created with first element:" + _nextMessage);
}
public boolean hasMoreElements()
{
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ _logger.debug("QB:hasMoreElements:" + (_nextMessage != null));
return (_nextMessage != null);
}
@@ -136,9 +184,9 @@ public class AMQQueueBrowser implements QueueBrowser
}
try
{
- _logger.info("QB:nextElement about to receive");
+ _logger.debug("QB:nextElement about to receive");
prefetchMessage();
- _logger.info("QB:nextElement received:" + _nextMessage);
+ _logger.debug("QB:nextElement received:" + _nextMessage);
}
catch (JMSException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e7e937b689..55d3ccb6e7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -120,18 +120,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
/**
- * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
- * not need to be attached to a queue.
- */
- private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
-
- /**
- * The default value for mandatory flag used by producers created by this session is true. That is, server will not
- * silently drop messages where no queue is connected to the exchange for the message.
- */
- private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
-
- /**
* The period to wait while flow controlled before sending a log message confirming that the session is still
* waiting on flow control being revoked
*/
@@ -542,9 +530,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
// Add creation logging to tie in with the existing close logging
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Created session:" + this);
+ _logger.debug("Created session:" + this);
}
}
@@ -733,9 +721,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void close(long timeout, boolean sendClose) throws JMSException
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Closing session: " + this);
+ _logger.debug("Closing session: " + this);
}
// Ensure we only try and close an open session.
@@ -904,11 +892,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// Flush any pending messages for this consumerTag
if (_dispatcher != null)
{
- _logger.info("Dispatcher is not null");
+ _logger.debug("Dispatcher is not null");
}
else
{
- _logger.info("Dispatcher is null so created stopped dispatcher");
+ _logger.debug("Dispatcher is null so created stopped dispatcher");
startDispatcherIfNecessary(true);
}
@@ -926,9 +914,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// There is a small window where the message is between the two queues in the dispatcher.
if (consumer.isClosed())
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Closing consumer:" + consumer.debugIdentity());
+ _logger.debug("Closing consumer:" + consumer.debugIdentity());
}
deregisterConsumer(consumer);
@@ -1101,6 +1089,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+ if(noLocal)
+ {
+ args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true);
+ }
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
@@ -1198,12 +1190,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public P createProducer(Destination destination) throws JMSException
{
- return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue);
+ return createProducerImpl(destination, null, null);
}
public P createProducer(Destination destination, boolean immediate) throws JMSException
{
- return createProducerImpl(destination, _defaultMandatoryValue, immediate);
+ return createProducerImpl(destination, null, immediate);
}
public P createProducer(Destination destination, boolean mandatory, boolean immediate)
@@ -1692,7 +1684,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
AMQProtocolHandler protocolHandler = getProtocolHandler();
declareExchange(amqd, protocolHandler, false);
- AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
+ AMQShortString queueName = declareQueue(amqd, false);
bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
}
@@ -2378,9 +2370,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_dispatcherThread.setDaemon(DEAMON_DISPATCHER_THREAD);
_dispatcher.setConnectionStopped(initiallyStopped);
_dispatcherThread.start();
- if (_dispatcherLogger.isInfoEnabled())
+ if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " created");
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " created");
}
}
else
@@ -2613,7 +2605,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendConsume(C consumer, AMQShortString queueName,
AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
- private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+ private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
throws JMSException
{
return new FailoverRetrySupport<P, JMSException>(
@@ -2642,8 +2634,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}, _connection).execute();
}
- public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException;
+ public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, final long producerId) throws JMSException;
private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
@@ -2726,6 +2718,12 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
final boolean nowait) throws AMQException, FailoverException;
+
+ void declareQueuePassive(AMQDestination queue) throws AMQException
+ {
+ declareQueue(queue,false,false,true);
+ }
+
/**
* Declares a queue for a JMS destination.
*
@@ -2735,27 +2733,35 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
*
* <p/>Note that this operation automatically retries in the event of fail-over.
*
- * @param amqd The destination to declare as a queue.
- * @param protocolHandler The protocol handler to communicate through.
*
+ * @param amqd The destination to declare as a queue.
* @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
* the client.
*
+ *
+ *
* @throws AMQException If the queue cannot be declared for any reason.
* @todo Verify the destiation is valid or throw an exception.
* @todo Be aware of possible changes to parameter order as versions change.
*/
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal) throws AMQException
{
- return declareQueue(amqd, protocolHandler, noLocal, false);
+ return declareQueue(amqd, noLocal, false);
}
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+ protected AMQShortString declareQueue(final AMQDestination amqd,
final boolean noLocal, final boolean nowait)
+ throws AMQException
+ {
+ return declareQueue(amqd, noLocal, nowait, false);
+ }
+
+ protected AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -2767,7 +2773,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
amqd.setQueueName(protocolHandler.generateQueueName());
}
- sendQueueDeclare(amqd, protocolHandler, nowait);
+ sendQueueDeclare(amqd, protocolHandler, nowait, passive);
return amqd.getAMQQueueName();
}
@@ -2775,7 +2781,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException;
+ final boolean nowait, boolean passive) throws AMQException, FailoverException;
/**
* Undeclares the specified queue.
@@ -2916,7 +2922,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
if (_delareQueues || amqd.isNameRequired())
{
- declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+ declareQueue(amqd, consumer.isNoLocal(), nowait);
}
bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
}
@@ -2939,7 +2945,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
suspendChannel(true);
- _logger.info(
+ _logger.debug(
"Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (AMQException e)
@@ -2951,7 +2957,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
else
{
- _logger.info("Immediately prefetching existing messages to new consumer.");
+ _logger.debug("Immediately prefetching existing messages to new consumer.");
}
try
@@ -2983,18 +2989,18 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
{
Iterator messages = _queue.iterator();
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
+ _logger.debug("Rejecting messages from _queue for Consumer tag(" + consumerTag + ") (PDispatchQ) requeue:"
+ requeue);
if (messages.hasNext())
{
- _logger.info("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
+ _logger.debug("Checking all messages in _queue for Consumer tag(" + consumerTag + ")");
}
else
{
- _logger.info("No messages in _queue to reject");
+ _logger.debug("No messages in _queue to reject");
}
}
while (messages.hasNext())
@@ -3037,7 +3043,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private void resubscribeProducers() throws AMQException
{
ArrayList producers = new ArrayList(_producers.values());
- _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
+ _logger.debug(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
for (Iterator it = producers.iterator(); it.hasNext();)
{
P producer = (P) it.next();
@@ -3127,7 +3133,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
- _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
+ }
}
public void checkFlowControl() throws InterruptedException, JMSException
@@ -3141,7 +3150,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_flowControl.wait(_flowControlWaitPeriod);
- _logger.warn("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
}
if(!_flowControl.getFlowControl())
{
@@ -3200,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void rejectPending(C consumer)
{
- synchronized (_lock)
- {
- boolean stopped = _dispatcher.connectionStopped();
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
- if (!stopped)
- {
- _dispatcher.setConnectionStopped(true);
- }
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
+ // closeConsumer
+ consumer.markClosed();
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- //Let the dispatcher deal with this when it gets to them.
-
- // closeConsumer
- consumer.markClosed();
-
- _dispatcher.setConnectionStopped(stopped);
-
- }
}
public void rollback()
@@ -3294,9 +3293,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void run()
{
- if (_dispatcherLogger.isInfoEnabled())
+ if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " started");
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " started");
}
// Allow disptacher to start stopped
@@ -3318,7 +3317,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
try
{
Dispatchable disp;
- while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))
+ while (((disp = (Dispatchable) _queue.take()) != null) && !_closed.get())
{
disp.dispatch(AMQSession.this);
}
@@ -3328,9 +3327,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// ignored as run will exit immediately
}
- if (_dispatcherLogger.isInfoEnabled())
+ if (_dispatcherLogger.isDebugEnabled())
{
- _dispatcherLogger.info(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
+ _dispatcherLogger.debug(_dispatcherThread.getName() + " thread terminating for channel " + _channelId + ":" + AMQSession.this);
}
}
@@ -3413,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
final C consumer = _consumers.get(message.getConsumerTag());
- if ((consumer == null) || consumer.isClosed())
+ if ((consumer == null) || consumer.isClosed() || consumer.isClosing())
{
if (_dispatcherLogger.isInfoEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 816ad1f222..3902c726f3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -17,9 +17,20 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
@@ -27,7 +38,6 @@ import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
@@ -42,28 +52,14 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
-import org.apache.qpid.util.Serial;
-import org.apache.qpid.util.Strings;
-
import static org.apache.qpid.transport.Option.BATCH;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a 0.10 Session
@@ -654,8 +650,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
/**
* Create an 0_10 message producer
*/
- public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, final long producerId) throws JMSException
+ public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, final long producerId) throws JMSException
{
try
{
@@ -725,7 +721,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait)
+ final boolean nowait, boolean passive)
throws AMQException, FailoverException
{
// do nothing this is only used by 0_8
@@ -735,7 +731,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
* Declare a queue with the given queueName
*/
public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
+ final boolean noLocal, final boolean nowait, boolean passive)
throws AMQException
{
AMQShortString queueName;
@@ -761,7 +757,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
amqd.isDurable() ? Option.DURABLE : Option.NONE,
- amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+ amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE,
+ passive ? Option.PASSIVE : Option.NONE);
}
else
{
@@ -931,11 +928,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return getCurrentException();
}
- protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean noLocal, final boolean nowait)
+ protected AMQShortString declareQueue(final AMQDestination amqd,
+ final boolean noLocal, final boolean nowait, final boolean passive)
throws AMQException
{
- /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+ final AMQProtocolHandler protocolHandler = getProtocolHandler();
+
return new FailoverNoopSupport<AMQShortString, AMQException>(
new FailoverProtectedOperation<AMQShortString, AMQException>()
{
@@ -952,7 +950,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
amqd.setQueueName(new AMQShortString( binddingKey + "@"
+ amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
}
- return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
+ return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
}
}, getAMQConnection()).execute();
}
@@ -1209,7 +1207,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
else if(createNode)
{
setLegacyFiledsForQueueType(dest);
- send0_10QueueDeclare(dest,null,noLocal,noWait);
+ send0_10QueueDeclare(dest,null,noLocal,noWait, false);
sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
null,dest.getExchangeName(),dest, false);
break;
@@ -1315,7 +1313,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
node.setExclusive(true);
node.setAutoDelete(!node.isDurable());
- send0_10QueueDeclare(dest,null,noLocal,true);
+ send0_10QueueDeclare(dest,null,noLocal,true, false);
getQpidSession().exchangeBind(dest.getQueueName(),
dest.getAddressName(),
dest.getSubject(),
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 29f1925cbc..8ab23a240e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -38,7 +38,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
@@ -401,9 +400,17 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
- final boolean nowait) throws AMQException, FailoverException
- {
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+ final boolean nowait, boolean passive) throws AMQException, FailoverException
+ {
+ QueueDeclareBody body =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ passive,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null);
AMQFrame queueDeclare = body.generateFrame(getChannelId());
@@ -441,8 +448,8 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
- public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
- final boolean immediate, long producerId) throws JMSException
+ public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory,
+ final Boolean immediate, long producerId) throws JMSException
{
try
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 8e9b1fb90f..0f8b5717d6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -199,6 +199,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+ if(noLocal)
+ {
+ ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
+ }
_arguments = ft;
@@ -275,7 +279,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
}
- _logger.debug("Message listener set for destination " + _destination);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message listener set for destination " + _destination);
+ }
if (messageListener != null)
{
@@ -553,9 +560,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
public void close(boolean sendClose) throws JMSException
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("Closing consumer:" + debugIdentity());
+ _logger.debug("Closing consumer:" + debugIdentity());
}
if (!setClosed())
@@ -586,7 +593,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
// no point otherwise as the connection will be gone
if (!_session.isClosed() || _session.isClosing())
{
- sendCancel();
+ synchronized(_session.getMessageDeliveryLock())
+ {
+ sendCancel();
+ }
cleanupQueue();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 75f198e1fa..9b3b2ce0e9 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -20,18 +20,8 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-
+import java.io.UnsupportedEncodingException;
+import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -42,77 +32,22 @@ import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
-import java.io.UnsupportedEncodingException;
-import java.util.UUID;
+import javax.jms.Topic;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
- /**
- * If true, messages will not get a timestamp.
- */
- protected boolean isDisableTimestamps()
- {
- return _disableTimestamps;
- }
-
- protected void setDisableTimestamps(boolean disableTimestamps)
- {
- _disableTimestamps = disableTimestamps;
- }
-
- protected void setDestination(AMQDestination destination)
- {
- _destination = destination;
- }
-
- protected AMQProtocolHandler getProtocolHandler()
- {
- return _protocolHandler;
- }
-
- protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
- {
- _protocolHandler = protocolHandler;
- }
-
- protected int getChannelId()
- {
- return _channelId;
- }
-
- protected void setChannelId(int channelId)
- {
- _channelId = channelId;
- }
-
- protected void setSession(AMQSession session)
- {
- _session = session;
- }
-
- protected String getUserID()
- {
- return _userID;
- }
-
- protected void setUserID(String userID)
- {
- _userID = userID;
- }
-
- protected PublishMode getPublishMode()
- {
- return publishMode;
- }
-
- protected void setPublishMode(PublishMode publishMode)
- {
- this.publishMode = publishMode;
- }
-
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
- private final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Logger _logger ;
private AMQConnection _connection;
@@ -166,7 +101,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private final boolean _immediate;
- private final boolean _mandatory;
+ private final Boolean _mandatory;
private boolean _disableMessageId;
@@ -174,14 +109,37 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private String _userID; // ref user id used in the connection.
- private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+
+ /**
+ * The default value for immediate flag used this producer is false. That is, a consumer does
+ * not need to be attached to a queue.
+ */
+ private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+
+ /**
+ * The default value for mandatory flag used by this producer is true. That is, server will not
+ * silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ /**
+ * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server
+ * will silently drop messages where no queue is connected to the exchange for the message.
+ */
+ private final boolean _defaultMandatoryTopicValue =
+ Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic",
+ System.getProperties().containsKey("qpid.default_mandatory")
+ ? System.getProperty("qpid.default_mandatory")
+ : "false"));
private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
- protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+ Boolean immediate, Boolean mandatory) throws AMQException
{
- _connection = connection;
+ _logger = logger;
+ _connection = connection;
_destination = destination;
_transacted = transacted;
_protocolHandler = protocolHandler;
@@ -193,8 +151,14 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
declareDestination(destination);
}
- _immediate = immediate;
- _mandatory = mandatory;
+ _immediate = immediate == null ? _defaultImmediateValue : immediate;
+ _mandatory = mandatory == null
+ ? destination == null ? null
+ : destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : mandatory;
+
_userID = connection.getUsername();
setPublishMode();
}
@@ -215,7 +179,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
- _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode);
+ }
}
void resubscribe() throws AMQException
@@ -381,7 +348,12 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+ sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+ _mandatory == null
+ ? destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : _mandatory,
_immediate);
}
}
@@ -394,7 +366,13 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+ _mandatory == null
+ ? destination instanceof Topic
+ ? _defaultMandatoryTopicValue
+ : _defaultMandatoryValue
+ : _mandatory,
+ _immediate);
}
}
@@ -542,7 +520,10 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_logger.debug("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ }
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
@@ -646,6 +627,69 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
}
}
+ /**
+ * If true, messages will not get a timestamp.
+ */
+ protected boolean isDisableTimestamps()
+ {
+ return _disableTimestamps;
+ }
+
+ protected void setDisableTimestamps(boolean disableTimestamps)
+ {
+ _disableTimestamps = disableTimestamps;
+ }
+
+ protected void setDestination(AMQDestination destination)
+ {
+ _destination = destination;
+ }
+
+ protected AMQProtocolHandler getProtocolHandler()
+ {
+ return _protocolHandler;
+ }
+
+ protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
+ {
+ _protocolHandler = protocolHandler;
+ }
+
+ protected int getChannelId()
+ {
+ return _channelId;
+ }
+
+ protected void setChannelId(int channelId)
+ {
+ _channelId = channelId;
+ }
+
+ protected void setSession(AMQSession session)
+ {
+ _session = session;
+ }
+
+ protected String getUserID()
+ {
+ return _userID;
+ }
+
+ protected void setUserID(String userID)
+ {
+ _userID = userID;
+ }
+
+ protected PublishMode getPublishMode()
+ {
+ return publishMode;
+ }
+
+ protected void setPublishMode(PublishMode publishMode)
+ {
+ this.publishMode = publishMode;
+ }
+
Logger getLogger()
{
return _logger;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 024219cfd6..a3a1e9c28b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -61,9 +61,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
- boolean immediate, boolean mandatory) throws AMQException
+ Boolean immediate, Boolean mandatory) throws AMQException
{
- super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
userIDBytes = Strings.toUTF8(getUserID());
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 3b5e361f97..21ff6c877a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -33,6 +33,9 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.MethodRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
@@ -42,11 +45,12 @@ import java.util.UUID;
public class BasicMessageProducer_0_8 extends BasicMessageProducer
{
+ private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class);
BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+ AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
{
- super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+ super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
}
void declareDestination(AMQDestination destination)
diff --git a/java/client/src/main/java/org/apache/qpid/client/Closeable.java b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
index ba26bfc485..2f7fbad30c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/Closeable.java
+++ b/java/client/src/main/java/org/apache/qpid/client/Closeable.java
@@ -81,7 +81,7 @@ public abstract class Closeable
}
/**
- * Checks if this is closis.
+ * Checks if this is closing.
*
* @return <tt>true</tt> if we are closing, <tt>false</tt> otherwise.
*/
@@ -90,6 +90,11 @@ public abstract class Closeable
return _closing.get();
}
+ public void resetClosedFlag()
+ {
+ _closed.set(false);
+ }
+
protected boolean setClosed()
{
return _closed.getAndSet(true);
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
index 128aa18d30..af9048f1f5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
@@ -308,13 +308,16 @@ public class XAResourceImpl implements XAResource
_xaSession.createSession();
convertExecutionErrorToXAErr( e.getException().getErrorCode());
}
- Xid[] result = new Xid[res.getInDoubt().size()];
- int i = 0;
- for (Object obj : res.getInDoubt())
+ Xid[] result = new Xid[res.getInDoubt() != null ? res.getInDoubt().size() : 0];
+ if(result.length != 0)
{
- org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
- result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
- i++;
+ int i = 0;
+ for (Object obj : res.getInDoubt())
+ {
+ org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
+ result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
+ i++;
+ }
}
return result;
}
@@ -436,6 +439,16 @@ public class XAResourceImpl implements XAResource
}
}
+ /**
+ * Is this resource currently enlisted in a transaction?
+ *
+ * @return true if the resource is associated with a transaction, false otherwise.
+ */
+ public boolean isEnlisted()
+ {
+ return (_xid != null) ;
+ }
+
//------------------------------------------------------------------------
// Private methods
//------------------------------------------------------------------------
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index 85623df8c0..f2efb6e8a5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -6,7 +6,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE 2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -18,6 +18,7 @@
package org.apache.qpid.client;
import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.transport.RangeSet;
import javax.jms.JMSException;
import javax.jms.QueueSession;
@@ -178,4 +179,17 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
{
return (TopicSession) getSession();
}
+
+ @Override
+ protected void acknowledgeImpl()
+ {
+ if (_xaResource.isEnlisted())
+ {
+ acknowledgeMessage(Long.MAX_VALUE, true) ;
+ }
+ else
+ {
+ super.acknowledgeImpl() ;
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 558d93538b..e1a0e18262 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -95,9 +95,9 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
{
- if (_logger.isInfoEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.info("New Method Dispatcher:" + session);
+ _logger.debug("New Method Dispatcher:" + session);
}
DispatcherFactory factory = _dispatcherFactories.get(version);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
index 31a0440b04..bd63cdb5c5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
@@ -48,9 +48,12 @@ public class FieldTableSupport
public static Map<String,Object> convertToMap(FieldTable ft)
{
Map<String,Object> map = new HashMap<String,Object>();
- for (AMQShortString key: ft.keySet() )
+ if(ft != null)
{
- map.put(key.asString(), ft.getObject(key));
+ for (AMQShortString key: ft.keySet() )
+ {
+ map.put(key.asString(), ft.getObject(key));
+ }
}
return map;
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index d380402da7..b314453e31 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -652,7 +652,8 @@ public class AMQProtocolHandler implements ProtocolEngine
}
writeFrame(frame);
- return listener.blockForFrame(timeout);
+ long actualTimeout = timeout == -1 ? DEFAULT_SYNC_TIMEOUT : timeout;
+ return listener.blockForFrame(actualTimeout);
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index ced734f70f..af57fd98fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -108,7 +108,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
_protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
- _logger.info("Using ProtocolVersion for Session:" + _protocolVersion);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
+ }
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
_connection = connection;
@@ -302,7 +305,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void closeSession(AMQSession session)
{
- _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ }
final int channelId = session.getChannelId();
if (channelId <= 0)
{
@@ -393,7 +399,10 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void setProtocolVersion(final ProtocolVersion pv)
{
- _logger.info("Setting ProtocolVersion to :" + pv);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting ProtocolVersion to :" + pv);
+ }
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
diff --git a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 80d171592f..22dc17e53c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -39,7 +39,7 @@ import java.util.concurrent.locks.ReentrantLock;
* differs from a 'rendezvous' in that sense.
*
* <p/>BlockingWaiters are used to coordinate when waiting for an an event that expect a response.
- * They are always used in a 'one-shot' manner, that is, to recieve just one response. Usually the caller has to register
+ * They are always used in a 'one-shot' manner, that is, to receive just one response. Usually the caller has to register
* them as method listeners with an event dispatcher and remember to de-register them (in a finally block) once they
* have been completed.
*
@@ -51,12 +51,12 @@ import java.util.concurrent.locks.ReentrantLock;
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations </td>
* <tr><td> Accept generic objects as events for processing via {@link #process}. <td>
- * <tr><td> Delegate handling and undserstanding of the object to a concrete implementation. <td>
+ * <tr><td> Delegate handling and understanding of the object to a concrete implementation. <td>
* <tr><td> Block until {@link #process} determines that waiting is no longer required <td>
* <tr><td> Propagate the most recent exception to the consumer.<td>
* </table>
*
- * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
+ * @todo Interruption is caught but not handled. This could be allowed to fall through. This might actually be useful
* for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
* when this happens. At the very least, restore the interrupted status flag.
* @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
@@ -84,13 +84,13 @@ public abstract class BlockingWaiter<T>
/** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
private volatile Exception _error;
- /** Holds the incomming Object. */
+ /** Holds the incoming Object. */
private Object _doneObject = null;
private AtomicBoolean _waiting = new AtomicBoolean(false);
private boolean _closed = false;
/**
- * Delegates processing of the incomming object to the handler.
+ * Delegates processing of the incoming object to the handler.
*
* @param object The object to process.
*
@@ -146,6 +146,11 @@ public abstract class BlockingWaiter<T>
*/
public Object block(long timeout) throws AMQException, FailoverException
{
+ if (timeout < 0)
+ {
+ throw new IllegalArgumentException("timeout must be zero or greater");
+ }
+
long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
_lock.lock();
@@ -165,26 +170,18 @@ public abstract class BlockingWaiter<T>
{
try
{
- if (timeout == -1)
- {
- _receivedCondition.await();
- }
- else
- {
- nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);
+ nanoTimeout = _receivedCondition.awaitNanos(nanoTimeout);
- if (nanoTimeout <= 0 && !_ready && _error == null)
- {
- _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
- _ready = true;
- }
+ if (nanoTimeout <= 0 && !_ready && _error == null)
+ {
+ _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
+ _ready = true;
}
}
catch (InterruptedException e)
{
_logger.error(e.getMessage(), e);
- // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
-
+ // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivalent to success
}
}
}
@@ -285,8 +282,8 @@ public abstract class BlockingWaiter<T>
/**
* Close this Waiter so that no more errors are processed.
* This is a preventative method to ensure that a second error thread does not get stuck in the error method after
- * the await has returned. This has not happend but in practise but if two errors occur on the Connection at
- * the same time then it is conceiveably possible for the second to get stuck if the first one is processed by a
+ * the await has returned. This has not happened but in practise but if two errors occur on the Connection at
+ * the same time then it is conceivably possible for the second to get stuck if the first one is processed by a
* waiter.
*
* Once closed any attempt to wait will throw an exception.
diff --git a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
index bc3f89849e..9b202a13ee 100644
--- a/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
+++ b/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
@@ -20,8 +20,26 @@
*/
package org.apache.qpid.jndi;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.naming.ConfigurationException;
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.spi.InitialContextFactory;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQDestination;
@@ -33,23 +51,10 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.naming.ConfigurationException;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
@@ -60,6 +65,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
private String QUEUE_PREFIX = "queue.";
private String TOPIC_PREFIX = "topic.";
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public Context getInitialContext(Hashtable environment) throws NamingException
{
Map data = new ConcurrentHashMap();
@@ -68,6 +74,7 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
{
String file = null;
+
if (environment.containsKey(Context.PROVIDER_URL))
{
file = (String) environment.get(Context.PROVIDER_URL);
@@ -77,13 +84,23 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
file = System.getProperty(Context.PROVIDER_URL);
}
+ // Load the properties specified
if (file != null)
{
_logger.info("Loading Properties from:" + file);
+ BufferedInputStream inputStream = null;
- // Load the properties specified
- BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+ if(file.contains("file:"))
+ {
+ inputStream = new BufferedInputStream(new FileInputStream(new File(new URI(file))));
+ }
+ else
+ {
+ inputStream = new BufferedInputStream(new FileInputStream(file));
+ }
+
Properties p = new Properties();
+
try
{
p.load(inputStream);
@@ -119,6 +136,11 @@ public class PropertiesFileInitialContextFactory implements InitialContextFactor
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" +
"Due to:"+ioe.getMessage());
}
+ catch(URISyntaxException uoe)
+ {
+ _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL) +"\n" +
+ "Due to:"+uoe.getMessage());
+ }
createConnectionFactories(data, environment);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDITest.properties b/java/client/src/test/java/org/apache/qpid/jndi/JNDITest.properties
index 07017a05a6..07017a05a6 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDITest.properties
+++ b/java/client/src/test/java/org/apache/qpid/jndi/JNDITest.properties
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java
index 576ab4fa05..2989970dcd 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
+++ b/java/client/src/test/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactoryTest.java
@@ -14,29 +14,35 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
-package org.apache.qpid.test.unit.jndi;
+package org.apache.qpid.jndi;
-import junit.framework.TestCase;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.framing.AMQShortString;
+import java.util.Properties;
+import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.ConfigurationException;
import javax.naming.Context;
import javax.naming.InitialContext;
-import java.util.Properties;
-public class JNDIPropertyFileTest extends TestCase
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.framing.AMQShortString;
+
+public class PropertiesFileInitialContextFactoryTest extends TestCase
{
+ private static final String FILE_URL_PATH = System.getProperty("user.dir") + "/client/src/test/java/org/apache/qpid/jndi/";
+ private static final String FILE_NAME = "hello.properties";
+
private Context ctx;
-
- public JNDIPropertyFileTest() throws Exception
+
+ protected void setUp() throws Exception
{
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("JNDITest.properties"));
@@ -44,19 +50,20 @@ public class JNDIPropertyFileTest extends TestCase
//Create the initial context
ctx = new InitialContext(properties);
}
-
+
+
public void testQueueNamesWithTrailingSpaces() throws Exception
{
Queue queue = (Queue)ctx.lookup("QueueNameWithSpace");
- assertEquals("QueueNameWithSpace",queue.getQueueName());
+ assertEquals("QueueNameWithSpace",queue.getQueueName());
}
-
+
public void testTopicNamesWithTrailingSpaces() throws Exception
{
Topic topic = (Topic)ctx.lookup("TopicNameWithSpace");
- assertEquals("TopicNameWithSpace",topic.getTopicName());
+ assertEquals("TopicNameWithSpace",topic.getTopicName());
}
-
+
public void testMultipleTopicNamesWithTrailingSpaces() throws Exception
{
Topic topic = (Topic)ctx.lookup("MultipleTopicNamesWithSpace");
@@ -64,16 +71,16 @@ public class JNDIPropertyFileTest extends TestCase
for (AMQShortString bindingKey: ((AMQDestination)topic).getBindingKeys())
{
i++;
- assertEquals("Topic" + i + "WithSpace",bindingKey.asString());
+ assertEquals("Topic" + i + "WithSpace",bindingKey.asString());
}
}
-
+
public void testConfigurationErrors() throws Exception
{
Properties properties = new Properties();
properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
properties.put("destination.my-queue","amq.topic/test;create:always}");
-
+
try
{
ctx = new InitialContext(properties);
@@ -83,6 +90,6 @@ public class JNDIPropertyFileTest extends TestCase
{
assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
}
-
+
}
}
diff --git a/java/client/src/test/java/org/apache/qpid/jndi/hello.properties b/java/client/src/test/java/org/apache/qpid/jndi/hello.properties
new file mode 100644
index 0000000000..d017d137fe
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/jndi/hello.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://10.0.1.46:5672'
+
+# Register an AMQP destination in JNDI
+# destination.[jniName] = [Address Format]
+destination.topicExchange = amq.topic
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index 84d91ee57e..f199961b6f 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -145,7 +145,7 @@ public class TestAMQSession extends AMQSession_0_8
}
public void sendQueueDeclare(AMQDestination amqd, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException, FailoverException
+ boolean nowait, boolean passive) throws AMQException, FailoverException
{
}