summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-04-01 20:03:16 +0000
committerAlan Conway <aconway@apache.org>2011-04-01 20:03:16 +0000
commit6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea (patch)
tree6246817e66f906b63d44218ad1745c349049324d
parent1b5c9ca18694061838f5e57cf7b8ca836cb35a46 (diff)
downloadqpid-python-6d7a0d5ae684f3521ebb2b505d79549f8e1f9eea.tar.gz
Merge branch 'trunk' into qpid-2920
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1087871 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/examples/old_api/Makefile.am2
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp37
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rw-r--r--qpid/doc/book/src/Security.xml20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java79
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java55
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java125
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java230
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java12
-rwxr-xr-xqpid/tools/src/py/qpid-config4
38 files changed, 585 insertions, 108 deletions
diff --git a/qpid/cpp/examples/old_api/Makefile.am b/qpid/cpp/examples/old_api/Makefile.am
index 466eee22e1..04216ffa97 100644
--- a/qpid/cpp/examples/old_api/Makefile.am
+++ b/qpid/cpp/examples/old_api/Makefile.am
@@ -36,7 +36,7 @@ $(MAKEDIST): Makefile
examplesdir=$(pkgdatadir)/examples/old_api
dist_examples_DATA = $(MAKEDIST)
-EXTRA_DIST = README.verify verify verify_all
+EXTRA_DIST = README.verify verify verify_all CMakeLists.txt
# For older versions of automake
abs_top_srcdir = @abs_top_srcdir@
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 820d97439c..8ede09fa79 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -744,7 +744,7 @@ libqpidclient_la_SOURCES = \
QPIDCLIENT_VERSION_INFO = 2:0:0
libqpidclient_la_LDFLAGS = -version-info $(QPIDCLIENT_VERSION_INFO)
-libqpidtypes_la_libadd=-luuid
+libqpidtypes_la_LIBADD= -luuid
libqpidtypes_la_SOURCES= \
qpid/types/Exception.cpp \
qpid/types/Uuid.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index bd061ac214..48ba320e39 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -188,7 +188,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
msg->addToSyncList(shared_from_this(), store);
}
- msg->enqueueComplete(); // mark the message as enqueued
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -215,7 +214,6 @@ void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
- msg.payload->enqueueComplete(); // mark the message as enqueued
messages->reinsert(msg);
listeners.populate(copy);
@@ -666,7 +664,9 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
}
if ((msg->isPersistent() || msg->checkContentReleasable()) && store) {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ // mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
+ // when it considers the message stored.
+ msg->enqueueAsync(shared_from_this(), store);
boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index b39ea3614b..a93a6332fd 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -246,7 +246,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
// If the message is bigger than the queue size, give up
- if (m->contentSize() > getMaxSize()) {
+ if (getMaxSize() && m->contentSize() > getMaxSize()) {
QPID_LOG(debug, "Message too large for ring queue " << name
<< " [" << *this << "] "
<< ": message size = " << m->contentSize() << " bytes"
diff --git a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
index 454ce62495..3d868da64b 100644
--- a/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
+++ b/qpid/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
@@ -106,7 +106,7 @@ size_t CyrusSecurityLayer::encode(const char* buffer, size_t size)
bool CyrusSecurityLayer::canEncode()
{
- return encrypted || codec->canEncode();
+ return codec && (encrypted || codec->canEncode());
}
void CyrusSecurityLayer::init(qpid::sys::Codec* c)
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 7b906f33e8..3449a753e3 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,9 +45,9 @@ namespace sys {
namespace {
std::string getName(int fd, bool local, bool includeService = false)
{
- ::sockaddr_storage name; // big enough for any socket address
+ ::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
-
+
int result = -1;
if (local) {
result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -60,8 +60,8 @@ std::string getName(int fd, bool local, bool includeService = false)
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
if (includeService) {
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
- servName, sizeof(servName),
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, dispName, sizeof(dispName),
+ servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
return std::string(dispName) + ":" + std::string(servName);
@@ -75,9 +75,9 @@ std::string getName(int fd, bool local, bool includeService = false)
std::string getService(int fd, bool local)
{
- ::sockaddr_storage name; // big enough for any socket address
+ ::sockaddr_storage name; // big enough for any socket address
::socklen_t namelen = sizeof(name);
-
+
int result = -1;
if (local) {
result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
@@ -88,8 +88,8 @@ std::string getService(int fd, bool local)
QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
- if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
- servName, sizeof(servName),
+ if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0,
+ servName, sizeof(servName),
NI_NUMERICHOST | NI_NUMERICSERV) != 0)
throw QPID_POSIX_ERROR(rc);
return servName;
@@ -172,6 +172,23 @@ void Socket::connect(const SocketAddress& addr) const
(errno != EINPROGRESS)) {
throw Exception(QPID_MSG(strError(errno) << ": " << connectname));
}
+ // When connecting to a port on the same host which no longer has
+ // a process associated with it, the OS occasionally chooses the
+ // remote port (which is unoccupied) as the port to bind the local
+ // end of the socket, resulting in a "circular" connection.
+ //
+ // This seems like something the OS should prevent but I have
+ // confirmed that sporadic hangs in
+ // cluster_tests.LongTests.test_failover on RHEL5 are caused by
+ // such a circular connection.
+ //
+ // Raise an error if we see such a connection, since we know there is
+ // no listener on the peer address.
+ //
+ if (getLocalAddress() == getPeerAddress()) {
+ close();
+ throw Exception(QPID_MSG("Connection refused: " << connectname));
+ }
}
void
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 39eec466f2..3b2253d7fc 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -509,7 +509,7 @@ class BrokerTest(TestCase):
r.close()
self.assertEqual(expect_contents, actual_contents)
-def join(thread, timeout=1):
+def join(thread, timeout=10):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
diff --git a/qpid/doc/book/src/Security.xml b/qpid/doc/book/src/Security.xml
index 77d10abf8c..49abfbebca 100644
--- a/qpid/doc/book/src/Security.xml
+++ b/qpid/doc/book/src/Security.xml
@@ -1,5 +1,25 @@
<?xml version='1.0' encoding='utf-8' ?>
<!DOCTYPE chapter PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<section id="chap-Messaging_User_Guide-Security">
<title>Security</title>
<para>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index dd3046cd01..08eb05680c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -346,7 +346,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
finally
{
long bodySize = _currentMessage.getSize();
- long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().properties).getTimestamp();
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp();
_session.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
@@ -1079,8 +1079,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
private boolean checkMessageUserId(ContentHeaderBody header)
{
AMQShortString userID =
- header.properties instanceof BasicContentHeaderProperties
- ? ((BasicContentHeaderProperties) header.properties).getUserId()
+ header.getProperties() instanceof BasicContentHeaderProperties
+ ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
: null;
return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString()));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
index 194835ac02..84a1642578 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
@@ -37,7 +37,7 @@ public class ContentHeaderBodyAdapter implements AMQMessageHeader
private BasicContentHeaderProperties getProperties()
{
- return (BasicContentHeaderProperties) _contentHeaderBody.properties;
+ return (BasicContentHeaderProperties) _contentHeaderBody.getProperties();
}
public String getCorrelationId()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
index 30bea7b6e6..66cb7ed83b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
@@ -161,7 +161,7 @@ public class MessageMetaData implements StorableMessageMetaData
public boolean isPersistent()
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
return properties.getDeliveryMode() == BasicContentHeaderProperties.PERSISTENT;
}
@@ -229,7 +229,7 @@ public class MessageMetaData implements StorableMessageMetaData
{
private BasicContentHeaderProperties getProperties()
{
- return (BasicContentHeaderProperties) getContentHeaderBody().properties;
+ return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
}
public String getCorrelationId()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 77c4e8fc23..c8eb118b11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -507,7 +507,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 2d2fb3a214..3e3288404f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -96,9 +96,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
if (SYNCHED_CLOCKS)
{
@@ -193,8 +193,8 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public boolean isPersistent()
{
- return getContentHeader().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
+ return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
new file mode 100644
index 0000000000..44b7c95535
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.queue.QueueRunner;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+
+
+public class QueueRunner implements ReadWriteRunnable
+{
+ private static final Logger _logger = Logger.getLogger(QueueRunner.class);
+
+ private String _name;
+ private SimpleAMQQueue _queue;
+
+ public QueueRunner(SimpleAMQQueue queue, long count)
+ {
+ _queue = queue;
+ _name = "QueueRunner-" + count + "-" + queue.getLogActor();
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_queue.getLogActor());
+
+ _queue.processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 0e3f7b2625..4890c00047 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1585,7 +1585,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1604,52 +1604,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_asyncDelivery.execute(flusher);
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1834,7 +1788,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
* @param runner the Runner to schedule
* @throws AMQException
*/
- private void processQueue(Runnable runner) throws AMQException
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
@@ -2289,4 +2243,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
index 67d20136bf..17d123eb0d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServerFactory.java
@@ -45,9 +45,10 @@ public class AmqPlainSaslServerFactory implements SaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE))
+ if (props != null &&
+ (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
index 6032255870..8a5ff7df2d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
@@ -47,10 +47,11 @@ public class AnonymousSaslServerFactory implements SaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE) ||
- props.containsKey(Sasl.POLICY_NOANONYMOUS))
+ if (props != null &&
+ (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE) ||
+ props.containsKey(Sasl.POLICY_NOANONYMOUS)))
{
// returned array must be non null according to interface documentation
return new String[0];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
index 8020d97364..139818735f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
@@ -70,7 +70,7 @@ public class CRAMMD5HexInitialiser extends UsernamePasswordInitialiser
for (char c : password)
{
//toHexString does not prepend 0 so we have to
- if (((byte) c > -1) && (byte) c < 10)
+ if (((byte) c > -1) && (byte) c < 0x10 )
{
sb.append(0);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
index f0dd9eeb6d..3144bfbce6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServerFactory.java
@@ -45,9 +45,10 @@ public class PlainSaslServerFactory implements SaslServerFactory
public String[] getMechanismNames(Map props)
{
- if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
- props.containsKey(Sasl.POLICY_NODICTIONARY) ||
- props.containsKey(Sasl.POLICY_NOACTIVE))
+ if (props != null &&
+ (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+ props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+ props.containsKey(Sasl.POLICY_NOACTIVE)))
{
// returned array must be non null according to interface documentation
return new String[0];
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index a20436f029..68e47fd86a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -441,7 +441,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
Struct[] headers = new Struct[] { deliveryProps, messageProps };
BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().properties;
+ (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
if(exchange != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index 4fd4999b19..806e161bbc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -364,7 +364,7 @@ public class Show extends AbstractCommand
{
if(msg instanceof AMQMessage)
{
- headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
+ headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().getProperties());
}
}
catch (AMQException e)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 7b58966a4c..9e831b2a8e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -276,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
static ContentHeaderBody getContentHeader(FieldTable headers)
{
ContentHeaderBody header = new ContentHeaderBody();
- header.properties = getProperties(headers);
+ header.setProperties(getProperties(headers));
return header;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index f72961c03c..403a290a0f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -396,7 +396,7 @@ public class TopicExchangeTest extends InternalBrokerBaseCase
IncomingMessage message = new IncomingMessage(info);
final ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- chb.properties = props;
+ chb.setProperties(props);
message.setContentHeaderBody(chb);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index d52f4c03f3..3961b3b355 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -96,7 +96,7 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
AMQMessage msg = super.createMessage(id);
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setPriority(i);
- msg.getContentHeaderBody().properties = props;
+ msg.getContentHeaderBody().setProperties(props);
return msg;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 0707cab3d5..a8bddcf6bf 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -277,7 +277,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- contentHeaderBody.properties = props;
+ contentHeaderBody.setProperties(props);
contentHeaderBody.bodySize = size; // in bytes
IncomingMessage message = new IncomingMessage(publish);
message.setContentHeaderBody(contentHeaderBody);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 5b72cfac40..365353e734 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -402,8 +402,8 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
IncomingMessage msg = new IncomingMessage(publish);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 04608275a3..0f5374b3e5 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -126,7 +126,7 @@ public class AckTest extends InternalBrokerBaseCase
//IncomingMessage msg2 = null;
BasicContentHeaderProperties b = new BasicContentHeaderProperties();
ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
+ cb.setProperties(b);
if (persistent)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 67d093d00a..41ca751684 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -660,8 +660,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Create IncomingMessage and nondurable queue
final IncomingMessage msg = new IncomingMessage(info);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
final ArrayList<BaseQueue> qs = new ArrayList<BaseQueue>();
@@ -707,6 +707,111 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
+ /**
+ * processQueue() is used when asynchronously delivering messages to
+ * subscriptions which could not be delivered immediately during the
+ * enqueue() operation.
+ *
+ * A defect within the method would mean that delivery of these messages may
+ * not occur should the Runner stop before all messages have been processed.
+ * Such a defect was discovered when Selectors were used such that one and
+ * only one subscription can/will accept any given messages, but multiple
+ * subscriptions are present, and one of the earlier subscriptions receives
+ * more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to
+ * correctly deliver all of the messages present for asynchronous delivery
+ * to subscriptions in such a scenario.
+ */
+ public void testProcessQueueWithUniqueSelectors() throws Exception
+ {
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false,
+ false, _virtualHost, factory, null)
+ {
+ @Override
+ public void deliverAsync(Subscription sub)
+ {
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new subscriptions
+ }
+ };
+
+ // retrieve the QueueEntryList the queue creates and insert the test
+ // messages, thus avoiding straight-through delivery attempts during
+ //enqueue() process.
+ QueueEntryList list = factory.getQueueEntryList();
+ assertNotNull("QueueEntryList should have been created", list);
+
+ QueueEntry msg1 = list.add(createMessage(1L));
+ QueueEntry msg2 = list.add(createMessage(2L));
+ QueueEntry msg3 = list.add(createMessage(3L));
+ QueueEntry msg4 = list.add(createMessage(4L));
+ QueueEntry msg5 = list.add(createMessage(5L));
+
+ // Create lists of the entries each subscription should be interested
+ // in.Bias over 50% of the messages to the first subscription so that
+ // the later subscriptions reject them and report being done before
+ // the first subscription as the processQueue method proceeds.
+ List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+ List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+ MockSubscription sub1 = new MockSubscription(msgListSub1);
+ MockSubscription sub2 = new MockSubscription(msgListSub2);
+ MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+ // register the subscriptions
+ testQueue.registerSubscription(sub1, false);
+ testQueue.registerSubscription(sub2, false);
+ testQueue.registerSubscription(sub3, false);
+
+ //check that no messages have been delivered to the
+ //subscriptions during registration
+ assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
+
+ // call processQueue to deliver the messages
+ testQueue.processQueue(new QueueRunner(testQueue, 1)
+ {
+ @Override
+ public void run()
+ {
+ // we dont actually want/need this runner to do any work
+ // because we we are already doing it!
+ }
+ });
+
+ // check expected messages delivered to correct consumers
+ verifyRecievedMessages(msgListSub1, sub1.getMessages());
+ verifyRecievedMessages(msgListSub2, sub2.getMessages());
+ verifyRecievedMessages(msgListSub3, sub3.getMessages());
+ }
+
+ private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ {
+ ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ for (QueueEntry entry : entries)
+ {
+ entriesList.add(entry);
+ }
+ return entriesList;
+ }
+
+ private void verifyRecievedMessages(List<QueueEntry> expected,
+ List<QueueEntry> delivered)
+ {
+ assertEquals("Consumer did not receive the expected number of messages",
+ expected.size(), delivered.size());
+
+ for (QueueEntry msg : expected)
+ {
+ assertTrue("Consumer did not recieve msg: "
+ + msg.getMessage().getMessageNumber(), delivered.contains(msg));
+ }
+ }
+
public class TestMessage extends AMQMessage
{
private final long _tag;
@@ -747,4 +852,20 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
AMQMessage messageA = new TestMessage(id, id, info);
return messageA;
}
+
+ class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+ {
+ QueueEntryList _list;
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ _list = new SimpleQueueEntryList(queue);
+ return _list;
+ }
+
+ public QueueEntryList getQueueEntryList()
+ {
+ return _list;
+ }
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java
new file mode 100644
index 0000000000..8b3f9c0622
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/auth/sasl/CRAMMD5HexServerTest.java
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.security.auth.sasl;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.Principal;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.security.auth.login.AccountNotFoundException;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.qpid.server.security.auth.database.Base64MD5PasswordFilePrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexInitialiser;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexSaslServer;
+import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HexServerFactory;
+
+/**
+ * Test for the CRAM-MD5-HEX SASL mechanism.
+ *
+ * This test case focuses on testing {@link CRAMMD5HexSaslServer} but also exercises
+ * collaborators {@link CRAMMD5HexInitialiser} and {@link Base64MD5PasswordFilePrincipalDatabase}
+ */
+public class CRAMMD5HexServerTest extends TestCase
+{
+
+ private SaslServer _saslServer; // Class under test
+ private CRAMMD5HexServerFactory _saslFactory;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ CRAMMD5HexInitialiser _initializer = new CRAMMD5HexInitialiser();
+
+ //Use properties to create a PrincipalDatabase
+ Base64MD5PasswordFilePrincipalDatabase db = createTestPrincipalDatabase();
+ assertEquals("Unexpected number of test users in the db", 2, db.getUsers().size());
+
+ _initializer.initialise(db);
+
+ _saslFactory = new CRAMMD5HexServerFactory();
+
+ _saslServer = _saslFactory.createSaslServer(CRAMMD5HexSaslServer.MECHANISM,
+ "AMQP",
+ "localhost",
+ _initializer.getProperties(),
+ _initializer.getCallbackHandler());
+ assertNotNull("Unable to create saslServer with mechanism type " + CRAMMD5HexSaslServer.MECHANISM, _saslServer);
+
+ }
+
+ public void testSuccessfulAuth() throws Exception
+ {
+
+ final byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]);
+
+ // Generate client response
+ final byte[] clientResponse = generateClientResponse("knownuser", "guest", serverChallenge);
+
+
+ byte[] nextServerChallenge = _saslServer.evaluateResponse(clientResponse);
+ assertTrue("Exchange must be flagged as complete after successful authentication", _saslServer.isComplete());
+ assertNull("Next server challenge must be null after successful authentication", nextServerChallenge);
+
+ }
+
+ public void testKnownUserPresentsWrongPassword() throws Exception
+ {
+ byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]);
+
+
+ final byte[] clientResponse = generateClientResponse("knownuser", "wrong!", serverChallenge);
+ try
+ {
+ _saslServer.evaluateResponse(clientResponse);
+ fail("Exception not thrown");
+ }
+ catch (SaslException se)
+ {
+ // PASS
+ }
+ assertFalse("Exchange must not be flagged as complete after unsuccessful authentication", _saslServer.isComplete());
+ }
+
+ public void testUnknownUser() throws Exception
+ {
+ final byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]);
+
+
+ final byte[] clientResponse = generateClientResponse("unknownuser", "guest", serverChallenge);
+
+ try
+ {
+ _saslServer.evaluateResponse(clientResponse);
+ fail("Exception not thrown");
+ }
+ catch (SaslException se)
+ {
+ assertExceptionHasUnderlyingAsCause(AccountNotFoundException.class, se);
+ // PASS
+ }
+ assertFalse("Exchange must not be flagged as complete after unsuccessful authentication", _saslServer.isComplete());
+ }
+
+ /**
+ *
+ * Demonstrates QPID-3158. A defect meant that users with some valid password were failing to
+ * authenticate when using the .NET 0-8 client (uses this SASL mechanism).
+ * It so happens that password "guest2" was one of the affected passwords.
+ *
+ * @throws Exception
+ */
+ public void testSuccessfulAuthReproducingQpid3158() throws Exception
+ {
+ byte[] serverChallenge = _saslServer.evaluateResponse(new byte[0]);
+
+ // Generate client response
+ byte[] resp = generateClientResponse("qpid3158user", "guest2", serverChallenge);
+
+ byte[] nextServerChallenge = _saslServer.evaluateResponse(resp);
+ assertTrue("Exchange must be flagged as complete after successful authentication", _saslServer.isComplete());
+ assertNull("Next server challenge must be null after successful authentication", nextServerChallenge);
+ }
+
+ /**
+ * Since we don't have a CRAM-MD5-HEX implementation client implementation in Java, this method
+ * provides the implementation for first principals.
+ *
+ * @param userId user id
+ * @param clearTextPassword clear text password
+ * @param serverChallenge challenge from server
+ *
+ * @return challenge response
+ */
+ private byte[] generateClientResponse(final String userId, final String clearTextPassword, final byte[] serverChallenge) throws Exception
+ {
+ byte[] digestedPasswordBytes = MessageDigest.getInstance("MD5").digest(clearTextPassword.getBytes());
+ char[] hexEncodedDigestedPassword = Hex.encodeHex(digestedPasswordBytes);
+ byte[] hexEncodedDigestedPasswordBytes = new String(hexEncodedDigestedPassword).getBytes();
+
+
+ Mac hmacMd5 = Mac.getInstance("HmacMD5");
+ hmacMd5.init(new SecretKeySpec(hexEncodedDigestedPasswordBytes, "HmacMD5"));
+ final byte[] messageAuthenticationCode = hmacMd5.doFinal(serverChallenge);
+
+ // Build client response
+ String responseAsString = userId + " " + new String(Hex.encodeHex(messageAuthenticationCode));
+ byte[] resp = responseAsString.getBytes();
+ return resp;
+ }
+
+ /**
+ * Creates a test principal database.
+ *
+ * @return
+ * @throws IOException
+ */
+ private Base64MD5PasswordFilePrincipalDatabase createTestPrincipalDatabase() throws IOException
+ {
+ Base64MD5PasswordFilePrincipalDatabase db = new Base64MD5PasswordFilePrincipalDatabase();
+ File file = File.createTempFile("passwd", "db");
+ file.deleteOnExit();
+ db.setPasswordFile(file.getCanonicalPath());
+ db.createPrincipal( createTestPrincipal("knownuser"), "guest".toCharArray());
+ db.createPrincipal( createTestPrincipal("qpid3158user"), "guest2".toCharArray());
+ return db;
+ }
+
+ private Principal createTestPrincipal(final String name)
+ {
+ return new Principal()
+ {
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+ };
+ }
+
+ private void assertExceptionHasUnderlyingAsCause(final Class<? extends Throwable> expectedUnderlying, Throwable e)
+ {
+ assertNotNull(e);
+ int infiniteLoopGuard = 0; // Guard against loops in the cause chain
+ boolean foundExpectedUnderlying = false;
+ while (e.getCause() != null && infiniteLoopGuard++ < 10)
+ {
+ if (expectedUnderlying.equals(e.getCause().getClass()))
+ {
+ foundExpectedUnderlying = true;
+ break;
+ }
+ e = e.getCause();
+ }
+
+ if (!foundExpectedUnderlying)
+ {
+ fail("Not found expected underlying exception " + expectedUnderlying + " as underlying cause of " + e.getClass());
+ }
+ }
+
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 3ebe631f62..62ceb68208 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -589,7 +589,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
headerBody.bodySize = 0;
- headerBody.properties = properties;
+ headerBody.setProperties(properties);
try
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
index a75cbe8662..2d41eb9899 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
@@ -102,7 +102,7 @@ public class ReferenceCountingTest extends QpidTestCase
ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
bchp.setDeliveryMode((byte)2);
- chb.properties = bchp;
+ chb.setProperties(bchp);
return chb;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 1ec134e90e..6fbc627d8c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.subscription;
*/
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,6 +46,7 @@ public class MockSubscription implements Subscription
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private List<QueueEntry> _acceptEntries = null;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
@@ -54,6 +56,15 @@ public class MockSubscription implements Subscription
// Create a simple ID that increments for ever new Subscription
private final long _subscriptionID = idGenerator.getAndIncrement();
+ public MockSubscription()
+ {
+ }
+
+ public MockSubscription(List<QueueEntry> acceptEntries)
+ {
+ _acceptEntries = acceptEntries;
+ }
+
public void close()
{
_closed = true;
@@ -119,8 +130,15 @@ public class MockSubscription implements Subscription
_stateChangeLock.lock();
}
- public boolean hasInterest(QueueEntry msg)
+ public boolean hasInterest(QueueEntry entry)
{
+ if(_acceptEntries != null)
+ {
+ //simulate selector behaviour, only signal
+ //interest in the dictated queue entries
+ return _acceptEntries.contains(entry);
+ }
+
return true;
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 925b161118..ff94942457 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -243,7 +243,7 @@ public class InternalBrokerBaseCase extends QpidTestCase
//Make Message Persistent
properties.setDeliveryMode((byte) 2);
- _headerBody.properties = properties;
+ _headerBody.setProperties(properties);
channel.publishContentHeader(_headerBody);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
index 2b7e3d44da..2fdb35de49 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client;
import java.util.ArrayList;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index e719c9a4b2..40c1df0c5d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -99,7 +99,7 @@ public abstract class AbstractJMSMessageFactory implements MessageFactory
}
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
- (BasicContentHeaderProperties) contentHeader.properties,
+ (BasicContentHeaderProperties) contentHeader.getProperties(),
exchange, routingKey);
return createMessage(delegate, data);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
index 4e4061cf4d..cdb75fc9a9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
@@ -104,7 +104,7 @@ public class MessageFactoryRegistry
AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 83e5a7e341..30db3b8be7 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -36,7 +36,7 @@ public class ContentHeaderBody implements AMQBody
public long bodySize;
/** must never be null */
- public ContentHeaderProperties properties;
+ private ContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -128,4 +128,14 @@ public class ContentHeaderBody implements AMQBody
{
return new AMQFrame(channelId, body);
}
+
+ public ContentHeaderProperties getProperties()
+ {
+ return properties;
+ }
+
+ public void setProperties(ContentHeaderProperties props)
+ {
+ properties = props;
+ }
}
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index da490e831a..1c66d535b2 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -173,8 +173,8 @@ def OptionsAndArguments(argv):
group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node")
group3.add_option("--file-count", action="store", type="int", default=8, metavar="<n>", help="Number of files in queue's persistence journal")
group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64Kib/page)")
- group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
- group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
+ group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
+ group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")