summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-17 14:08:14 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-17 14:08:14 +0000
commit16a49ba6ef283a5093780d28efaaa8483fc9010d (patch)
tree427908f0242a05033385a1dcd1a0af908bca44ea
parentfca7b2ac23c76c402457ab605639ba4b16a5e3f1 (diff)
downloadqpid-python-16a49ba6ef283a5093780d28efaaa8483fc9010d.tar.gz
QPID-2935: sync with latest trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071615 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/bin/release.sh2
-rw-r--r--qpid/cpp/INSTALL7
-rw-r--r--qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am4
-rw-r--r--qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp64
-rwxr-xr-xqpid/cpp/bindings/qmf2/examples/python/agent.py73
-rw-r--r--qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb2
-rw-r--r--qpid/cpp/bindings/qmf2/python/qmf2.py12
-rw-r--r--qpid/cpp/bindings/qmf2/ruby/qmf2.rb8
-rw-r--r--qpid/cpp/bindings/qpid/Makefile.am3
-rw-r--r--qpid/cpp/bindings/qpid/dotnet/Makefile.am125
-rw-r--r--qpid/cpp/build-aux/.gitignore1
-rw-r--r--qpid/cpp/configure.ac3
-rw-r--r--qpid/cpp/src/CMakeLists.txt17
-rw-r--r--qpid/cpp/src/Makefile.am23
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp35
-rw-r--r--qpid/cpp/src/qmf/AgentSession.cpp8
-rw-r--r--qpid/cpp/src/qmf/ConsoleSession.cpp7
-rw-r--r--qpid/cpp/src/qmf/ConsoleSessionImpl.h1
-rw-r--r--qpid/cpp/src/qpid/RefCountedBuffer.h6
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.cpp156
-rw-r--r--qpid/cpp/src/qpid/broker/Fairshare.h61
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp116
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h59
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h5
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.cpp140
-rw-r--r--qpid/cpp/src/qpid/broker/MessageDeque.h62
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.cpp166
-rw-r--r--qpid/cpp/src/qpid/broker/MessageMap.h72
-rw-r--r--qpid/cpp/src/qpid/broker/Messages.h117
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.cpp212
-rw-r--r--qpid/cpp/src/qpid/broker/PriorityQueue.h78
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp465
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h42
-rw-r--r--qpid/cpp/src/qpid/broker/QueueEvents.cpp25
-rw-r--r--qpid/cpp/src/qpid/broker/QueueEvents.h1
-rw-r--r--qpid/cpp/src/qpid/broker/QueueObserver.h42
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h8
-rw-r--r--qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp139
-rw-r--r--qpid/cpp/src/qpid/broker/ThresholdAlerts.h73
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp28
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp34
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h17
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterTimer.cpp21
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp5
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp51
-rw-r--r--qpid/cpp/src/tests/cluster.mk2
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp36
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark17
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp1
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp25
-rw-r--r--qpid/cpp/xml/cluster.xml7
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py21
-rw-r--r--qpid/extras/sasl/m4/ac_pkg_swig.m48
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java2
-rw-r--r--qpid/java/common.xml7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java41
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java17
-rw-r--r--qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.certbin756 -> 0 bytes
-rw-r--r--qpid/java/module.xml13
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java184
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java266
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java49
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java258
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java183
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java79
-rw-r--r--qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java216
-rwxr-xr-xqpid/java/tools/bin/perf_report.sh53
-rw-r--r--qpid/java/tools/etc/jndi.properties35
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java42
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java58
-rw-r--r--qpid/specs/management-schema.xml3
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py4
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py37
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py75
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/priority.py237
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py62
-rw-r--r--qpid/tools/src/py/.gitignore26
89 files changed, 2925 insertions, 1825 deletions
diff --git a/qpid/bin/release.sh b/qpid/bin/release.sh
index 913bbc7334..31c12e630c 100755
--- a/qpid/bin/release.sh
+++ b/qpid/bin/release.sh
@@ -208,7 +208,7 @@ fi
if [ "JAVA" == "$JAVA" ] ; then
pushd qpid-${VER}/java
- ant build release release-bin release-mvn -Dsvnversion.output=${REV}
+ ant build release release-bin release-mvn -Dsvnversion.output=${REV} -Dmaven.snapshot=false
popd
cp qpid-${VER}/java/release/*.tar.gz artifacts/qpid-java-${VER}.tar.gz
diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL
index 6628552ce5..6483d7de4e 100644
--- a/qpid/cpp/INSTALL
+++ b/qpid/cpp/INSTALL
@@ -67,6 +67,10 @@ Optional SSL support requires:
* nss <http://www.mozilla.org/projects/security/pki/nss/>
* nspr <http://www.mozilla.org/projects/nspr/>
+Optional binding support for ruby requires:
+* ruby and ruby devel <http://www.ruby-lang.org/en/>
+* swig <http://www.swig.org/>
+
Qpid has been built using the GNU C++ compiler:
* gcc <http://gcc.gnu.org/> (3.4.6)
@@ -124,6 +128,9 @@ For the XML Exchange, include:
# yum install xqilla-devel xerces-c-devel
+Optional ruby binding support include:
+ # yum install ruby ruby-devel swig
+
Follow the manual installation instruction below for any packages not
available through your distributions packaging tool.
diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
index 9c3bd615d6..84207d43c4 100644
--- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
+++ b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
@@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include
AM_CPPFLAGS = $(INCLUDE)
-noinst_PROGRAMS=agent list_agents
+noinst_PROGRAMS=agent list_agents print_events
agent_SOURCES=agent.cpp
agent_LDADD=$(top_builddir)/src/libqmf2.la
@@ -29,3 +29,5 @@ agent_LDADD=$(top_builddir)/src/libqmf2.la
list_agents_SOURCES=list_agents.cpp
list_agents_LDADD=$(top_builddir)/src/libqmf2.la
+print_events_SOURCES=print_events.cpp
+print_events_LDADD=$(top_builddir)/src/libqmf2.la
diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp b/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp
new file mode 100644
index 0000000000..9883a19962
--- /dev/null
+++ b/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Duration.h>
+#include <qmf/ConsoleSession.h>
+#include <qmf/ConsoleEvent.h>
+#include <qmf/Data.h>
+#include <qpid/types/Variant.h>
+#include <string>
+#include <iostream>
+
+using namespace std;
+using namespace qmf;
+using qpid::types::Variant;
+using qpid::messaging::Duration;
+
+int main(int argc, char** argv)
+{
+ string url("localhost");
+ string connectionOptions;
+ string sessionOptions;
+
+ if (argc > 1)
+ url = argv[1];
+ if (argc > 2)
+ connectionOptions = argv[2];
+ if (argc > 3)
+ sessionOptions = argv[3];
+
+ qpid::messaging::Connection connection(url, connectionOptions);
+ connection.open();
+
+ ConsoleSession session(connection, sessionOptions);
+ session.open();
+
+ while (true) {
+ ConsoleEvent event;
+ if (session.nextEvent(event)) {
+ if (event.getType() == CONSOLE_EVENT) {
+ const Data& data(event.getData(0));
+ cout << "Event: timestamp=" << event.getTimestamp() << " severity=" <<
+ event.getSeverity() << " content=" << data.getProperties() << endl;
+ }
+ }
+ }
+}
+
diff --git a/qpid/cpp/bindings/qmf2/examples/python/agent.py b/qpid/cpp/bindings/qmf2/examples/python/agent.py
index 66b7dbdc58..b24890f531 100755
--- a/qpid/cpp/bindings/qmf2/examples/python/agent.py
+++ b/qpid/cpp/bindings/qmf2/examples/python/agent.py
@@ -69,32 +69,41 @@ class ExampleAgent(AgentHandler):
if addr == self.controlAddr:
self.control.methodCount += 1
- if methodName == "stop":
- self.session.methodSuccess(handle)
- self.cancel()
-
- elif methodName == "echo":
- handle.addReturnArgument("sequence", args["sequence"])
- handle.addReturnArgument("map", args["map"])
- self.session.methodSuccess(handle)
-
- elif methodName == "fail":
- if args['useString']:
- self.session.raiseException(handle, args['stringVal'])
- else:
- ex = Data(self.sch_exception)
- ex.whatHappened = "It Failed"
- ex.howBad = 75
- ex.details = args['details']
- self.session.raiseException(handle, ex)
-
- elif methodName == "create_child":
- name = args['name']
- child = Data(self.sch_child)
- child.name = name
- addr = self.session.addData(child, name)
- handle.addReturnArgument("childAddr", addr.asMap())
- self.session.methodSuccess(handle)
+ try:
+ if methodName == "stop":
+ self.session.methodSuccess(handle)
+ self.cancel()
+
+ elif methodName == "echo":
+ handle.addReturnArgument("sequence", args["sequence"])
+ handle.addReturnArgument("map", args["map"])
+ self.session.methodSuccess(handle)
+
+ elif methodName == "event":
+ ev = Data(self.sch_event)
+ ev.text = args['text']
+ self.session.raiseEvent(ev, args['severity'])
+ self.session.methodSuccess(handle)
+
+ elif methodName == "fail":
+ if args['useString']:
+ self.session.raiseException(handle, args['stringVal'])
+ else:
+ ex = Data(self.sch_exception)
+ ex.whatHappened = "It Failed"
+ ex.howBad = 75
+ ex.details = args['details']
+ self.session.raiseException(handle, ex)
+
+ elif methodName == "create_child":
+ name = args['name']
+ child = Data(self.sch_child)
+ child.name = name
+ addr = self.session.addData(child, name)
+ handle.addReturnArgument("childAddr", addr.asMap())
+ self.session.methodSuccess(handle)
+ except BaseException, e:
+ self.session.raiseException(handle, "%r" % e)
def setupSchema(self):
@@ -128,6 +137,11 @@ class ExampleAgent(AgentHandler):
echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT))
self.sch_control.addMethod(echoMethod)
+ eventMethod = SchemaMethod("event", desc="Raise an Event")
+ eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN))
+ eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN))
+ self.sch_control.addMethod(eventMethod)
+
failMethod = SchemaMethod("fail", desc="Expected to Fail")
failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN))
failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN))
@@ -146,11 +160,18 @@ class ExampleAgent(AgentHandler):
self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING))
##
+ ## Declare the event class
+ ##
+ self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event")
+ self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING))
+
+ ##
## Register our schemata with the agent session.
##
self.session.registerSchema(self.sch_exception)
self.session.registerSchema(self.sch_control)
self.session.registerSchema(self.sch_child)
+ self.session.registerSchema(self.sch_event)
def populateData(self):
diff --git a/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb b/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
index 712e5007be..41de7e5abe 100644
--- a/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
+++ b/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb
@@ -27,7 +27,7 @@ class FindAgents < Qmf2::ConsoleHandler
end
def agent_added(agent)
- puts "Agent Added: #{agent.to_s}"
+ puts "Agent Added: #{agent.name}"
end
def agent_deleted(agent, reason)
diff --git a/qpid/cpp/bindings/qmf2/python/qmf2.py b/qpid/cpp/bindings/qmf2/python/qmf2.py
index 61a5453f8e..9f2d8556f4 100644
--- a/qpid/cpp/bindings/qmf2/python/qmf2.py
+++ b/qpid/cpp/bindings/qmf2/python/qmf2.py
@@ -165,7 +165,7 @@ class ConsoleHandler(Thread):
reason = 'filter'
if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED:
reason = 'aged'
- self.agentDeleted(Agent(event.getAgent(), reason))
+ self.agentDeleted(Agent(event.getAgent()), reason)
elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART:
self.agentRestarted(Agent(event.getAgent()))
@@ -373,6 +373,16 @@ class AgentSession(object):
else:
self._impl.raiseException(handle, data)
+ def raiseEvent(self, data, severity=None):
+ """
+ """
+ if not severity:
+ self._impl.raiseEvent(data._impl)
+ else:
+ if (severity.__class__ != int and severity.__class__ != long) or severity < 0 or severity > 7:
+ raise Exception("Severity must be an int between 0..7")
+ self._impl.raiseEvent(data._impl, severity);
+
#===================================================================================================
# AGENT PROXY
diff --git a/qpid/cpp/bindings/qmf2/ruby/qmf2.rb b/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
index 6d1741ebc0..c14ecba4e1 100644
--- a/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
+++ b/qpid/cpp/bindings/qmf2/ruby/qmf2.rb
@@ -433,6 +433,14 @@ module Qmf2
def del_data(addr)
@impl.del_data(addr.impl)
end
+
+ def raise_event(data, severity=nil)
+ if !severity
+ @impl.raiseEvent(data.impl)
+ else
+ @impl.raiseEvent(data.impl, severity)
+ end
+ end
end
##==============================================================================
diff --git a/qpid/cpp/bindings/qpid/Makefile.am b/qpid/cpp/bindings/qpid/Makefile.am
index 07b51e6c64..ca9eda0c73 100644
--- a/qpid/cpp/bindings/qpid/Makefile.am
+++ b/qpid/cpp/bindings/qpid/Makefile.am
@@ -17,10 +17,11 @@
# under the License.
#
+SUBDIRS = dotnet
+
if HAVE_SWIG
EXTRA_DIST = qpid.i
-SUBDIRS =
if HAVE_RUBY_DEVEL
SUBDIRS += ruby
diff --git a/qpid/cpp/bindings/qpid/dotnet/Makefile.am b/qpid/cpp/bindings/qpid/dotnet/Makefile.am
new file mode 100644
index 0000000000..f2b106bcb2
--- /dev/null
+++ b/qpid/cpp/bindings/qpid/dotnet/Makefile.am
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+EXTRA_DIST = winsdk_sources/winsdk_dotnet_examples.sln \
+ winsdk_sources/examples/csharp.direct.receiver/csharp.direct.receiver.csproj \
+ winsdk_sources/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj \
+ winsdk_sources/examples/csharp.example.helloworld/csharp.example.helloworld.csproj \
+ winsdk_sources/examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj \
+ winsdk_sources/examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj \
+ winsdk_sources/examples/csharp.example.server/csharp.example.server.csproj \
+ winsdk_sources/examples/csharp.example.spout/csharp.example.spout.csproj \
+ winsdk_sources/examples/csharp.example.drain/csharp.example.drain.csproj \
+ winsdk_sources/examples/csharp.map.sender/csharp.map.sender.csproj \
+ winsdk_sources/examples/csharp.map.receiver/csharp.map.receiver.csproj \
+ winsdk_sources/examples/csharp.example.client/csharp.example.client.csproj \
+ winsdk_sources/examples/csharp.direct.sender/csharp.direct.sender.csproj \
+ examples/csharp.direct.receiver/csharp.direct.receiver.cs \
+ examples/csharp.direct.receiver/csharp.direct.receiver.csproj \
+ examples/csharp.direct.receiver/Properties/AssemblyInfo.cs \
+ examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs \
+ examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj \
+ examples/csharp.map.callback.receiver/Properties/AssemblyInfo.cs \
+ examples/powershell.example.helloworld/powershell.example.helloworld.ps1 \
+ examples/csharp.example.helloworld/csharp.example.helloworld.cs \
+ examples/csharp.example.helloworld/csharp.example.helloworld.csproj \
+ examples/csharp.example.helloworld/Properties/AssemblyInfo.cs \
+ examples/csharp.example.declare_queues/csharp.example.declare_queues.cs \
+ examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj \
+ examples/csharp.example.declare_queues/Properties/AssemblyInfo.cs \
+ examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj \
+ examples/csharp.map.callback.sender/Properties/AssemblyInfo.cs \
+ examples/csharp.map.callback.sender/csharp.map.callback.sender.cs \
+ examples/csharp.example.server/csharp.example.server.csproj \
+ examples/csharp.example.server/csharp.example.server.cs \
+ examples/csharp.example.server/Properties/AssemblyInfo.cs \
+ examples/csharp.example.spout/csharp.example.spout.csproj \
+ examples/csharp.example.spout/Options.cs \
+ examples/csharp.example.spout/csharp.example.spout.cs \
+ examples/csharp.example.spout/Properties/AssemblyInfo.cs \
+ examples/csharp.example.drain/csharp.example.drain.cs \
+ examples/csharp.example.drain/csharp.example.drain.csproj \
+ examples/csharp.example.drain/Options.cs \
+ examples/csharp.example.drain/Properties/AssemblyInfo.cs \
+ examples/csharp.map.sender/csharp.map.sender.csproj \
+ examples/csharp.map.sender/csharp.map.sender.cs \
+ examples/csharp.map.sender/Properties/AssemblyInfo.cs \
+ examples/visualbasic.example.client/visualbasic.example.client.vbproj \
+ examples/visualbasic.example.client/MyProject/Resources.resx \
+ examples/visualbasic.example.client/MyProject/Application.myapp \
+ examples/visualbasic.example.client/MyProject/Settings.settings \
+ examples/visualbasic.example.client/MyProject/Settings.Designer.vb \
+ examples/visualbasic.example.client/MyProject/AssemblyInfo.vb \
+ examples/visualbasic.example.client/MyProject/Application.Designer.vb \
+ examples/visualbasic.example.client/MyProject/Resources.Designer.vb \
+ examples/visualbasic.example.client/visualbasic.example.client.vb \
+ examples/csharp.map.receiver/csharp.map.receiver.csproj \
+ examples/csharp.map.receiver/csharp.map.receiver.cs \
+ examples/csharp.map.receiver/Properties/AssemblyInfo.cs \
+ examples/csharp.example.client/csharp.example.client.cs \
+ examples/csharp.example.client/Properties/AssemblyInfo.cs \
+ examples/csharp.example.client/csharp.example.client.csproj \
+ examples/csharp.direct.sender/csharp.direct.sender.csproj \
+ examples/csharp.direct.sender/csharp.direct.sender.cs \
+ examples/csharp.direct.sender/Properties/AssemblyInfo.cs \
+ configure-windows.ps1 \
+ ReadMe.txt \
+ org.apache.qpid.messaging.sln \
+ test/messaging.test/messaging.test.address.cs \
+ test/messaging.test/messaging.test.duration.cs \
+ test/messaging.test/messaging.test.cs \
+ test/messaging.test/messaging.test.message.cs \
+ test/messaging.test/messaging.test.csproj \
+ test/messaging.test/Properties/AssemblyInfo.cs \
+ test/messaging.test/messaging.test.connection.cs \
+ src/org.apache.qpid.messaging.vcproj \
+ src/Message.cpp \
+ src/Connection.cpp \
+ src/TypeTranslator.h \
+ src/AssemblyInfo.cpp \
+ src/FailoverUpdates.h \
+ src/sessionreceiver/sessionreceiver.cs \
+ src/sessionreceiver/Properties/sessionreceiver-AssemblyInfo-template.cs \
+ src/sessionreceiver/qpid.snk \
+ src/sessionreceiver/org.apache.qpid.messaging.sessionreceiver.csproj \
+ src/Sender.h \
+ src/TypeTranslator.cpp \
+ src/Receiver.h \
+ src/Address.h \
+ src/Sender.cpp \
+ src/QpidTypeCheck.h \
+ src/resource1.h \
+ src/Duration.h \
+ src/Session.h \
+ src/Message.h \
+ src/ReadMe.txt \
+ src/Receiver.cpp \
+ src/Address.cpp \
+ src/app.rc \
+ src/Session.cpp \
+ src/org.apache.qpid.messaging.template.rc \
+ src/qpid.snk \
+ src/Connection.h \
+ src/QpidException.h \
+ src/QpidMarshal.h \
+ src/FailoverUpdates.cpp \
+ src/Duration.cpp \
+ ../../../src/windows/resources/qpid-icon.ico \
+ ../../../src/windows/resources/template-resource.rc \
+ ../../../src/windows/resources/version-resource.h
diff --git a/qpid/cpp/build-aux/.gitignore b/qpid/cpp/build-aux/.gitignore
new file mode 100644
index 0000000000..42725ceff3
--- /dev/null
+++ b/qpid/cpp/build-aux/.gitignore
@@ -0,0 +1 @@
+/py-compile
diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac
index ee1bade1c9..ea1a1b49ea 100644
--- a/qpid/cpp/configure.ac
+++ b/qpid/cpp/configure.ac
@@ -16,7 +16,7 @@ AC_INIT([qpidc],
[dev@qpid.apache.org])
AC_CONFIG_AUX_DIR([build-aux])
-AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects])
+AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects tar-ustar])
# Minimum Autoconf version required.
AC_PREREQ(2.59)
@@ -538,6 +538,7 @@ AC_CONFIG_FILES([
bindings/qpid/ruby/Makefile
bindings/qpid/python/Makefile
bindings/qpid/perl/Makefile
+ bindings/qpid/dotnet/Makefile
bindings/qmf/Makefile
bindings/qmf/ruby/Makefile
bindings/qmf/python/Makefile
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index eb4bb72aad..4b5a1b1c2c 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -313,6 +313,10 @@ if (NOT Boost_FILESYSTEM_LIBRARY)
set(Boost_FILESYSTEM_LIBRARY boost_filesystem)
endif (NOT Boost_FILESYSTEM_LIBRARY)
+if (NOT Boost_SYSTEM_LIBRARY)
+ set(Boost_SYSTEM_LIBRARY boost_system)
+endif (NOT Boost_SYSTEM_LIBRARY)
+
if (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY)
set(Boost_UNIT_TEST_FRAMEWORK_LIBRARY boost_unit_test_framework)
endif (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY)
@@ -487,6 +491,7 @@ endif (BUILD_SASL)
CHECK_LIBRARY_EXISTS (xerces-c _init "" HAVE_XERCES)
CHECK_INCLUDE_FILE_CXX (xercesc/framework/MemBufInputSource.hpp HAVE_XERCES_H)
CHECK_INCLUDE_FILE_CXX (xqilla/xqilla-simple.hpp HAVE_XQILLA_H)
+CHECK_INCLUDE_FILE_CXX (xqilla/ast/XQEffectiveBooleanValue.hpp HAVE_XQ_EBV)
set (xml_default ${xml_force})
if (CMAKE_SYSTEM_NAME STREQUAL Windows)
@@ -510,6 +515,10 @@ if (BUILD_XML)
message(FATAL_ERROR "XML Exchange support requested but XQilla headers not found")
endif (NOT HAVE_XQILLA_H)
+ if (HAVE_XQ_EBV)
+ add_definitions(-DXQ_EFFECTIVE_BOOLEAN_VALUE_HPP)
+ endif (HAVE_XQ_EBV)
+
add_library (xml MODULE
qpid/xml/XmlExchange.cpp
qpid/xml/XmlExchange.h
@@ -956,6 +965,11 @@ set (qpidbroker_SOURCES
qpid/broker/Broker.cpp
qpid/broker/Exchange.cpp
qpid/broker/ExpiryPolicy.cpp
+ qpid/broker/Fairshare.cpp
+ qpid/broker/LegacyLVQ.cpp
+ qpid/broker/MessageDeque.cpp
+ qpid/broker/MessageMap.cpp
+ qpid/broker/PriorityQueue.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
@@ -1006,6 +1020,7 @@ set (qpidbroker_SOURCES
qpid/broker/SessionHandler.h
qpid/broker/SessionHandler.cpp
qpid/broker/System.cpp
+ qpid/broker/ThresholdAlerts.cpp
qpid/broker/TopicExchange.cpp
qpid/broker/TxAccept.cpp
qpid/broker/TxBuffer.cpp
@@ -1207,6 +1222,8 @@ install (TARGETS replication_exchange
# file whereas older builds only have config.h on autoconf-generated builds.
add_definitions(-DHAVE_CONFIG_H)
+add_definitions(-DBOOST_FILESYSTEM_VERSION=2)
+
# Now create the config file from all the info learned above.
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake
${CMAKE_CURRENT_BINARY_DIR}/config.h)
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 8f00cefb33..6fafff7d54 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -24,6 +24,7 @@ SUBDIRS = . tests
# using Visual Studio solutions/projects.
windows_dist = \
qpid/client/windows/SaslFactory.cpp \
+ qpid/client/windows/SslConnector.cpp \
qpid/log/windows/SinkOptions.cpp \
qpid/log/windows/SinkOptions.h \
../include/qpid/sys/windows/check.h \
@@ -42,6 +43,8 @@ windows_dist = \
qpid/sys/windows/Shlib.cpp \
qpid/sys/windows/SocketAddress.cpp \
qpid/sys/windows/Socket.cpp \
+ qpid/sys/windows/SslAsynchIO.cpp \
+ qpid/sys/windows/SslAsynchIO.h \
qpid/sys/windows/StrError.cpp \
qpid/sys/windows/SystemInfo.cpp \
qpid/sys/windows/Thread.cpp \
@@ -51,7 +54,9 @@ windows_dist = \
qpid/sys/windows/uuid.h \
windows/QpiddBroker.cpp \
qpid/broker/windows/BrokerDefaults.cpp \
- qpid/broker/windows/SaslAuthenticator.cpp
+ qpid/broker/windows/SaslAuthenticator.cpp \
+ qpid/broker/windows/SslProtocolFactory.cpp \
+ qpid/messaging/HandleInstantiator.cpp
EXTRA_DIST= $(platform_dist) $(rgen_srcs) $(windows_dist)
@@ -122,6 +127,8 @@ qpidtest_SCRIPTS =
tmoduledir = $(libdir)/qpid/tests
tmodule_LTLIBRARIES=
+AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2
+
## Automake macros to build libraries and executables.
qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\"
libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
@@ -543,6 +550,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/ExchangeRegistry.h \
qpid/broker/ExpiryPolicy.cpp \
qpid/broker/ExpiryPolicy.h \
+ qpid/broker/Fairshare.h \
+ qpid/broker/Fairshare.cpp \
qpid/broker/FanOutExchange.cpp \
qpid/broker/FanOutExchange.h \
qpid/broker/FedOps.h \
@@ -550,6 +559,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/HeadersExchange.cpp \
qpid/broker/HeadersExchange.h \
qpid/broker/AsyncCompletion.h \
+ qpid/broker/LegacyLVQ.h \
+ qpid/broker/LegacyLVQ.cpp \
qpid/broker/Link.cpp \
qpid/broker/Link.h \
qpid/broker/LinkRegistry.cpp \
@@ -560,9 +571,16 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageBuilder.h \
+ qpid/broker/MessageDeque.h \
+ qpid/broker/MessageDeque.cpp \
+ qpid/broker/MessageMap.h \
+ qpid/broker/MessageMap.cpp \
+ qpid/broker/Messages.h \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/MessageStoreModule.h \
+ qpid/broker/PriorityQueue.h \
+ qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
@@ -584,6 +602,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/QueueEvents.h \
qpid/broker/QueueListeners.cpp \
qpid/broker/QueueListeners.h \
+ qpid/broker/QueueObserver.h \
qpid/broker/QueuePolicy.cpp \
qpid/broker/QueuePolicy.h \
qpid/broker/QueueRegistry.cpp \
@@ -632,6 +651,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/SignalHandler.h \
qpid/broker/System.cpp \
qpid/broker/System.h \
+ qpid/broker/ThresholdAlerts.cpp \
+ qpid/broker/ThresholdAlerts.h \
qpid/broker/TopicExchange.cpp \
qpid/broker/TopicExchange.h \
qpid/broker/TransactionalStore.h \
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
index 176cadf0c1..915f2a1c88 100644
--- a/qpid/cpp/src/qmf/Agent.cpp
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -339,7 +339,7 @@ void AgentImpl::handleMethodResponse(const Variant::Map& response, const Message
uint32_t correlator;
boost::shared_ptr<SyncContext> context;
- QPID_LOG(trace, "RCVD MethodResponse map=" << response);
+ QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response);
aIter = response.find("_arguments");
if (aIter != response.end())
@@ -556,13 +556,14 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator)
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
msg.setSubject(directSubject);
- if (!session.authUser.empty())
- msg.setUserId(session.authUser);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
encode(QueryImplAccess::get(query).asMap(), msg);
- if (sender.isValid())
+ if (sender.isValid()) {
sender.send(msg);
-
- QPID_LOG(trace, "SENT QueryRequest to=" << name);
+ QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator);
+ }
}
@@ -583,13 +584,14 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
msg.setSubject(directSubject);
- if (!session.authUser.empty())
- msg.setUserId(session.authUser);
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
encode(map, msg);
- if (sender.isValid())
+ if (sender.isValid()) {
sender.send(msg);
-
- QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
+ QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator);
+ }
}
void AgentImpl::sendSchemaRequest(const SchemaId& id)
@@ -626,12 +628,13 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id)
msg.setReplyTo(session.replyAddress);
msg.setContent(content);
msg.setSubject(directSubject);
- if (!session.authUser.empty())
- msg.setUserId(session.authUser);
- if (sender.isValid())
+ string userId(session.connection.getAuthenticatedUsername());
+ if (!userId.empty())
+ msg.setUserId(userId);
+ if (sender.isValid()) {
sender.send(msg);
-
- QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
+ QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject);
+ }
}
diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
index 30176a8c01..4c5a72a467 100644
--- a/qpid/cpp/src/qmf/AgentSession.cpp
+++ b/qpid/cpp/src/qmf/AgentSession.cpp
@@ -571,7 +571,7 @@ void AgentSessionImpl::raiseEvent(const Data& data, int severity)
encode(list, msg);
topicSender.send(msg);
- QPID_LOG(trace, "SENT EventIndication to=" << subject);
+ QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject);
}
@@ -625,7 +625,7 @@ void AgentSessionImpl::setAgentName()
void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
{
- QPID_LOG(trace, "RCVD AgentLocateRequest");
+ QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo());
if (!predicate.empty()) {
Query agentQuery(QUERY_OBJECT);
@@ -659,7 +659,7 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const
void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg)
{
- QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo());
+ QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
//
// Construct an AgentEvent to be sent to the application.
@@ -719,7 +719,7 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me
void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg)
{
- QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo());
+ QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId());
//
// Construct an AgentEvent to be sent to the application or directly handled by the agent.
diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
index bb4458a0b9..e12c1152f6 100644
--- a/qpid/cpp/src/qmf/ConsoleSession.cpp
+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
@@ -65,7 +65,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5),
+ connection(c), domain("default"), maxAgentAgeMinutes(5),
opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
@@ -394,6 +394,7 @@ void ConsoleSessionImpl::sendAgentLocate()
{
Message msg;
Variant::Map& headers(msg.getProperties());
+ static const string subject("console.request.agent_locate");
headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST;
headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST;
@@ -401,12 +402,12 @@ void ConsoleSessionImpl::sendAgentLocate()
msg.setReplyTo(replyAddress);
msg.setCorrelationId("agent-locate");
- msg.setSubject("console.request.agent_locate");
+ msg.setSubject(subject);
encode(agentQuery.getPredicate(), msg);
topicSender.send(msg);
- QPID_LOG(trace, "SENT AgentLocate to topic");
+ QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject);
}
diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
index e495c1c1e8..675c8bcfb5 100644
--- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
@@ -72,7 +72,6 @@ namespace qmf {
qpid::messaging::Sender directSender;
qpid::messaging::Sender topicSender;
std::string domain;
- std::string authUser;
uint32_t maxAgentAgeMinutes;
bool listenOnDirect;
bool strictSecurity;
diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.h b/qpid/cpp/src/qpid/RefCountedBuffer.h
index c332325378..75a23862be 100644
--- a/qpid/cpp/src/qpid/RefCountedBuffer.h
+++ b/qpid/cpp/src/qpid/RefCountedBuffer.h
@@ -27,7 +27,7 @@
#include <boost/intrusive_ptr.hpp>
namespace qpid {
-// FIXME aconway 2008-09-06: easy to add alignment
+
/**
* Reference-counted byte buffer.
* No alignment guarantees.
@@ -51,7 +51,7 @@ public:
pointer(const pointer&);
~pointer();
pointer& operator=(const pointer&);
-
+
char* get() { return cp(); }
operator char*() { return cp(); }
char& operator*() { return *cp(); }
@@ -62,7 +62,7 @@ public:
const char& operator*() const { return *cp(); }
const char& operator[](size_t i) const { return cp()[i]; }
};
-
+
/** Create a reference counted buffer of size n */
static pointer create(size_t n);
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 2ef6933612..ebccdbe38f 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -103,7 +103,8 @@ Broker::Options::Options(const std::string& name) :
requireEncrypted(false),
maxSessionRate(0),
asyncQueueEvents(false), // Must be false in a cluster.
- qmf2Support(false),
+ qmf2Support(true),
+ qmf1Support(true),
queueFlowStopRatio(80),
queueFlowResumeRatio(70)
{
@@ -125,7 +126,8 @@ Broker::Options::Options(const std::string& name) :
("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
- ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
+ ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
+ ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -153,7 +155,7 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+ managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
conf.qmf2Support)
: 0),
store(new NullMessageStore),
@@ -225,7 +227,6 @@ Broker::Broker(const Broker::Options& conf) :
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
- queues.setQueueEvents(&queueEvents);
// Early-Initialize plugins
Plugin::earlyInitAll(*this);
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 55823bc45a..9ed9ca3995 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -115,6 +115,7 @@ public:
uint32_t maxSessionRate;
bool asyncQueueEvents;
bool qmf2Support;
+ bool qmf1Support;
uint queueFlowStopRatio; // producer flow control: on
uint queueFlowResumeRatio; // producer flow control: off
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp
new file mode 100644
index 0000000000..e6bbf86691
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Fairshare.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+
+Fairshare::Fairshare(size_t levels, uint limit) :
+ PriorityQueue(levels),
+ limits(levels, limit), priority(levels-1), count(0) {}
+
+
+void Fairshare::setLimit(size_t level, uint limit)
+{
+ limits[level] = limit;
+}
+
+bool Fairshare::limitReached()
+{
+ uint l = limits[priority];
+ return l && ++count > l;
+}
+
+uint Fairshare::currentLevel()
+{
+ if (limitReached()) {
+ return nextLevel();
+ } else {
+ return priority;
+ }
+}
+
+uint Fairshare::nextLevel()
+{
+ count = 1;
+ if (priority) --priority;
+ else priority = levels-1;
+ return priority;
+}
+
+bool Fairshare::isNull()
+{
+ for (int i = 0; i < levels; i++) if (limits[i]) return false;
+ return true;
+}
+
+bool Fairshare::getState(uint& p, uint& c) const
+{
+ p = priority;
+ c = count;
+ return true;
+}
+
+bool Fairshare::setState(uint p, uint c)
+{
+ priority = p;
+ count = c;
+ return true;
+}
+
+bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages)
+{
+ const uint start = p = currentLevel();
+ do {
+ if (!messages[p].empty()) return true;
+ } while ((p = nextLevel()) != start);
+ return false;
+}
+
+
+
+bool Fairshare::getState(const Messages& m, uint& priority, uint& count)
+{
+ const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m);
+ return fairshare && fairshare->getState(priority, count);
+}
+
+bool Fairshare::setState(Messages& m, uint priority, uint count)
+{
+ Fairshare* fairshare = dynamic_cast<Fairshare*>(&m);
+ return fairshare && fairshare->setState(priority, count);
+}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
+int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue)
+{
+ return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue));
+}
+
+std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings)
+{
+ std::auto_ptr<Messages> result;
+ size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100);
+ if (levels) {
+ uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare");
+ std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit));
+ for (uint i = 0; i < levels; i++) {
+ std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str();
+ if(settings.isSet(key)) {
+ fairshare->setLimit(i, getIntegerSetting(settings, key));
+ }
+ }
+
+ if (fairshare->isNull()) {
+ result = std::auto_ptr<Messages>(new PriorityQueue(levels));
+ } else {
+ result = fairshare;
+ }
+ }
+ return result;
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h
new file mode 100644
index 0000000000..6c4b87f857
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Fairshare.h
@@ -0,0 +1,61 @@
+#ifndef QPID_BROKER_FAIRSHARE_H
+#define QPID_BROKER_FAIRSHARE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace broker {
+
+/**
+ * Modifies a basic prioirty queue by limiting the number of messages
+ * from each priority level that are dispatched before allowing
+ * dispatch from the next level.
+ */
+class Fairshare : public PriorityQueue
+{
+ public:
+ Fairshare(size_t levels, uint limit);
+ bool getState(uint& priority, uint& count) const;
+ bool setState(uint priority, uint count);
+ void setLimit(size_t level, uint limit);
+ static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings);
+ static bool getState(const Messages&, uint& priority, uint& count);
+ static bool setState(Messages&, uint priority, uint count);
+ private:
+ std::vector<uint> limits;
+
+ uint priority;
+ uint count;
+
+ uint currentLevel();
+ uint nextLevel();
+ bool isNull();
+ bool limitReached();
+ bool findFrontLevel(uint& p, PriorityLevels&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_FAIRSHARE_H*/
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
new file mode 100644
index 0000000000..a811a86492
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {}
+
+void LegacyLVQ::setNoBrowse(bool b)
+{
+ noBrowse = b;
+}
+
+bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(position);
+ if (i != messages.end() && i->second.payload == message.payload) {
+ message = i->second;
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ if (MessageMap::next(position, message)) {
+ if (!noBrowse) index.erase(getKey(message));
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed)
+{
+ //Hack to disable LVQ behaviour on cluster update:
+ if (broker && broker->isClusterUpdatee()) {
+ messages[added.position] = added;
+ return false;
+ } else {
+ return MessageMap::push(added, removed);
+ }
+}
+
+const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update)
+{
+ //add the new message into the original position of the replaced message
+ Ordering::iterator i = messages.find(original.position);
+ i->second = update;
+ i->second.position = original.position;
+ return i->second;
+}
+
+void LegacyLVQ::removeIf(Predicate p)
+{
+ //Note: This method is currently called periodically on the timer
+ //thread to expire messages. In a clustered broker this means that
+ //the purging does not occur on the cluster event dispatch thread
+ //and consequently that is not totally ordered w.r.t other events
+ //(including publication of messages). The cluster does ensure
+ //that the actual expiration of messages (as distinct from the
+ //removing of those expired messages from the queue) *is*
+ //consistently ordered w.r.t. cluster events. This means that
+ //delivery of messages is in general consistent across the cluster
+ //inspite of any non-determinism in the triggering of a
+ //purge. However at present purging a last value queue (of the
+ //legacy sort) could potentially cause inconsistencies in the
+ //cluster (as the order w.r.t publications can affect the order in
+ //which messages appear in the queue). Consequently periodic
+ //purging of an LVQ is not enabled if the broker is clustered
+ //(expired messages will be removed on delivery and consolidated
+ //by key as part of normal LVQ operation).
+
+ //TODO: Is there a neater way to check whether broker is
+ //clustered? Here we assume that if the clustered timer is the
+ //same as the regular timer, we are not clustered:
+ if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer()))
+ MessageMap::removeIf(p);
+}
+
+std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current,
+ const std::string& key, bool noBrowse, Broker* broker)
+{
+ LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get());
+ if (lvq) {
+ lvq->setNoBrowse(noBrowse);
+ return current;
+ } else {
+ return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker));
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
new file mode 100644
index 0000000000..dd0fd7aaec
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
@@ -0,0 +1,59 @@
+#ifndef QPID_BROKER_LEGACYLVQ_H
+#define QPID_BROKER_LEGACYLVQ_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageMap.h"
+#include <memory>
+
+namespace qpid {
+namespace broker {
+class Broker;
+
+/**
+ * This class encapsulates the behaviour of the old style LVQ where a
+ * message replacing another messages for the given key will use the
+ * position in the queue of the previous message. This however causes
+ * problems for browsing. Either browsers stop the coalescing of
+ * messages by key (default) or they may mis updates (if the no-browse
+ * option is specified).
+ */
+class LegacyLVQ : public MessageMap
+{
+ public:
+ LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0);
+ bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const framing::SequenceNumber&, QueuedMessage&);
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+ void removeIf(Predicate);
+ void setNoBrowse(bool);
+ static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current,
+ const std::string& key, bool noBrowse,
+ Broker* broker);
+ protected:
+ bool noBrowse;
+ Broker* broker;
+
+ const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_LEGACYLVQ_H*/
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index a16180f3ae..122c5b9c1a 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -400,22 +400,6 @@ bool Message::hasExpired()
return expiryPolicy && expiryPolicy->hasExpired(*this);
}
-boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
-{
- sys::Mutex::ScopedLock l(lock);
- Replacement::iterator i = replacement.find(qfor);
- if (i != replacement.end()){
- return i->second;
- }
- return empty;
-}
-
-void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
-{
- sys::Mutex::ScopedLock l(lock);
- replacement[qfor] = msg;
-}
-
namespace {
struct ScopedSet {
sys::Monitor& lock;
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index e8a8a19d53..2d0de27823 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -153,8 +153,6 @@ public:
void forcePersistent();
bool isForcedPersistent();
- boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const;
- void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor);
/** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
void setDequeueCompleteCallback(MessageCallback& cb);
@@ -163,8 +161,6 @@ public:
uint8_t getPriority() const;
private:
- typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
-
MessageAdapter& getAdapter() const;
void allDequeuesComplete();
@@ -183,7 +179,6 @@ public:
static TransferAdapter TRANSFER;
- mutable Replacement replacement;
mutable boost::intrusive_ptr<Message> empty;
sys::Monitor callbackLock;
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
new file mode 100644
index 0000000000..24b8f6f895
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+
+size_t MessageDeque::size()
+{
+ return messages.size();
+}
+
+bool MessageDeque::empty()
+{
+ return messages.empty();
+}
+
+void MessageDeque::reinsert(const QueuedMessage& message)
+{
+ messages.insert(lower_bound(messages.begin(), messages.end(), message), message);
+}
+
+MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position)
+{
+ if (!messages.empty()) {
+ QueuedMessage comp;
+ comp.position = position;
+ unsigned long diff = position.getValue() - messages.front().position.getValue();
+ long maxEnd = diff < messages.size()? diff : messages.size();
+ return lower_bound(messages.begin(),messages.begin()+maxEnd,comp);
+ } else {
+ return messages.end();
+ }
+}
+
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+{
+ Deque::iterator i = seek(position);
+ if (i != messages.end() && i->position == position) {
+ message = *i;
+ if (remove) messages.erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, true);
+}
+
+bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, false);
+}
+
+bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ if (messages.empty()) {
+ return false;
+ } else if (position < front().position) {
+ message = front();
+ return true;
+ } else {
+ Deque::iterator i = seek(position+1);
+ if (i != messages.end()) {
+ message = *i;
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
+
+QueuedMessage& MessageDeque::front()
+{
+ return messages.front();
+}
+
+void MessageDeque::pop()
+{
+ if (!messages.empty()) {
+ messages.pop_front();
+ }
+}
+
+bool MessageDeque::pop(QueuedMessage& out)
+{
+ if (messages.empty()) {
+ return false;
+ } else {
+ out = front();
+ messages.pop_front();
+ return true;
+ }
+}
+
+bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+{
+ messages.push_back(added);
+ return false;//adding a message never causes one to be removed for deque
+}
+
+void MessageDeque::foreach(Functor f)
+{
+ std::for_each(messages.begin(), messages.end(), f);
+}
+
+void MessageDeque::removeIf(Predicate p)
+{
+ for (Deque::iterator i = messages.begin(); i != messages.end();) {
+ if (p(*i)) {
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
+ }
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h
new file mode 100644
index 0000000000..0e1aef2986
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageDeque.h
@@ -0,0 +1,62 @@
+#ifndef QPID_BROKER_MESSAGEDEQUE_H
+#define QPID_BROKER_MESSAGEDEQUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/QueuedMessage.h"
+#include <deque>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Provides the standard FIFO queue behaviour.
+ */
+class MessageDeque : public Messages
+{
+ public:
+ size_t size();
+ bool empty();
+
+ void reinsert(const QueuedMessage&);
+ bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+ QueuedMessage& front();
+ void pop();
+ bool pop(QueuedMessage&);
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+ void foreach(Functor);
+ void removeIf(Predicate);
+
+ private:
+ typedef std::deque<QueuedMessage> Deque;
+ Deque messages;
+
+ Deque::iterator seek(const framing::SequenceNumber&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGEDEQUE_H*/
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp
new file mode 100644
index 0000000000..39e23df533
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/MessageMap.h"
+#include "qpid/broker/QueuedMessage.h"
+
+namespace qpid {
+namespace broker {
+namespace {
+const std::string EMPTY;
+}
+
+std::string MessageMap::getKey(const QueuedMessage& message)
+{
+ const framing::FieldTable* ft = message.payload->getApplicationHeaders();
+ if (ft) return ft->getAsString(key);
+ else return EMPTY;
+}
+
+size_t MessageMap::size()
+{
+ return messages.size();
+}
+
+bool MessageMap::empty()
+{
+ return messages.empty();
+}
+
+void MessageMap::reinsert(const QueuedMessage& message)
+{
+ std::string key = getKey(message);
+ Index::iterator i = index.find(key);
+ if (i == index.end()) {
+ index[key] = message;
+ messages[message.position] = message;
+ } //else message has already been replaced
+}
+
+bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(position);
+ if (i != messages.end()) {
+ message = i->second;
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ Ordering::iterator i = messages.find(position);
+ if (i != messages.end()) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ if (!messages.empty() && position < front().position) {
+ message = front();
+ return true;
+ } else {
+ Ordering::iterator i = messages.lower_bound(position+1);
+ if (i != messages.end()) {
+ message = i->second;
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
+
+QueuedMessage& MessageMap::front()
+{
+ return messages.begin()->second;
+}
+
+void MessageMap::pop()
+{
+ QueuedMessage dummy;
+ pop(dummy);
+}
+
+bool MessageMap::pop(QueuedMessage& out)
+{
+ Ordering::iterator i = messages.begin();
+ if (i != messages.end()) {
+ out = i->second;
+ erase(i);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+{
+ messages.erase(original.position);
+ messages[update.position] = update;
+ return update;
+}
+
+bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+{
+ std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
+ if (result.second) {
+ //there was no previous message for this key; nothing needs to
+ //be removed, just add the message into its correct position
+ messages[added.position] = added;
+ return false;
+ } else {
+ //there is already a message with that key which needs to be replaced
+ removed = result.first->second;
+ result.first->second = replace(result.first->second, added);
+ return true;
+ }
+}
+
+void MessageMap::foreach(Functor f)
+{
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+ f(i->second);
+ }
+}
+
+void MessageMap::removeIf(Predicate p)
+{
+ for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) {
+ if (p(i->second)) {
+ erase(i);
+ }
+ }
+}
+
+void MessageMap::erase(Ordering::iterator i)
+{
+ index.erase(getKey(i->second));
+ messages.erase(i);
+}
+
+MessageMap::MessageMap(const std::string& k) : key(k) {}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h
new file mode 100644
index 0000000000..1128a1d54a
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/MessageMap.h
@@ -0,0 +1,72 @@
+#ifndef QPID_BROKER_MESSAGEMAP_H
+#define QPID_BROKER_MESSAGEMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include "qpid/framing/SequenceNumber.h"
+#include <map>
+#include <string>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Provides a last value queue behaviour, whereby a messages replace
+ * any previous message with the same value for a defined property
+ * (i.e. the key).
+ */
+class MessageMap : public Messages
+{
+ public:
+ MessageMap(const std::string& key);
+ virtual ~MessageMap() {}
+
+ size_t size();
+ bool empty();
+
+ void reinsert(const QueuedMessage&);
+ virtual bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&);
+ virtual bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+ QueuedMessage& front();
+ void pop();
+ bool pop(QueuedMessage&);
+ virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+ void foreach(Functor);
+ virtual void removeIf(Predicate);
+
+ protected:
+ typedef std::map<std::string, QueuedMessage> Index;
+ typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering;
+ const std::string key;
+ Index index;
+ Ordering messages;
+
+ std::string getKey(const QueuedMessage&);
+ virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+ void erase(Ordering::iterator);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGEMAP_H*/
diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h
new file mode 100644
index 0000000000..0d75417640
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Messages.h
@@ -0,0 +1,117 @@
+#ifndef QPID_BROKER_MESSAGES_H
+#define QPID_BROKER_MESSAGES_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing {
+class SequenceNumber;
+}
+namespace broker {
+struct QueuedMessage;
+
+/**
+ * This interface abstracts out the access to the messages held for
+ * delivery by a Queue instance.
+ */
+class Messages
+{
+ public:
+ typedef boost::function1<void, QueuedMessage&> Functor;
+ typedef boost::function1<bool, QueuedMessage&> Predicate;
+
+ virtual ~Messages() {}
+ /**
+ * @return the number of messages available for delivery.
+ */
+ virtual size_t size() = 0;
+ /**
+ * @return true if there are no messages for delivery, false otherwise
+ */
+ virtual bool empty() = 0;
+
+ /**
+ * Re-inserts a message back into its original position - used
+ * when requeing released messages.
+ */
+ virtual void reinsert(const QueuedMessage&) = 0;
+ /**
+ * Remove the message at the specified position, returning true if
+ * found, false otherwise. The removed message is passed back via
+ * the second parameter.
+ */
+ virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ /**
+ * Find the message at the specified position, returning true if
+ * found, false otherwise. The matched message is passed back via
+ * the second parameter.
+ */
+ virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0;
+ /**
+ * Return the next message to be given to a browsing subscrption
+ * that has reached the specified poisition. The next messages is
+ * passed back via the second parameter.
+ *
+ * @return true if there is another message, false otherwise.
+ */
+ virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0;
+
+ /**
+ * Note: Caller is responsible for ensuring that there is a front
+ * (e.g. empty() returns false)
+ *
+ * @return the next message to be delivered
+ */
+ virtual QueuedMessage& front() = 0;
+ /**
+ * Removes the front message
+ */
+ virtual void pop() = 0;
+ /**
+ * @return true if there is a mesage to be delivered - in which
+ * case that message will be returned via the parameter and
+ * removed - otherwise false.
+ */
+ virtual bool pop(QueuedMessage&) = 0;
+ /**
+ * Pushes a message to the back of the 'queue'. For some types of
+ * queue this may cause another message to be removed; if that is
+ * the case the method will return true and the removed message
+ * will be passed out via the second parameter.
+ */
+ virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0;
+
+ /**
+ * Apply the functor to each message held
+ */
+ virtual void foreach(Functor) = 0;
+ /**
+ * Remove every message held that for which the specified
+ * predicate returns true
+ */
+ virtual void removeIf(Predicate) = 0;
+ private:
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_MESSAGES_H*/
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
new file mode 100644
index 0000000000..e07e73d323
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp
@@ -0,0 +1,212 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PriorityQueue.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/framing/reply_exceptions.h"
+#include <cmath>
+
+namespace qpid {
+namespace broker {
+
+PriorityQueue::PriorityQueue(int l) :
+ levels(l),
+ messages(levels, Deque()),
+ frontLevel(0), haveFront(false), cached(false) {}
+
+size_t PriorityQueue::size()
+{
+ size_t total(0);
+ for (int i = 0; i < levels; ++i) {
+ total += messages[i].size();
+ }
+ return total;
+}
+
+bool PriorityQueue::empty()
+{
+ for (int i = 0; i < levels; ++i) {
+ if (!messages[i].empty()) return false;
+ }
+ return true;
+}
+
+void PriorityQueue::reinsert(const QueuedMessage& message)
+{
+ uint p = getPriorityLevel(message);
+ messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
+ clearCache();
+}
+
+bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
+{
+ QueuedMessage comp;
+ comp.position = position;
+ for (int i = 0; i < levels; ++i) {
+ if (!messages[i].empty()) {
+ unsigned long diff = position.getValue() - messages[i].front().position.getValue();
+ long maxEnd = diff < messages[i].size() ? diff : messages[i].size();
+ Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
+ if (l != messages[i].end() && l->position == position) {
+ message = *l;
+ if (remove) {
+ messages[i].erase(l);
+ clearCache();
+ }
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, true);
+}
+
+bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ return find(position, message, false);
+}
+
+bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message)
+{
+ QueuedMessage match;
+ match.position = position+1;
+ Deque::iterator lowest;
+ bool found = false;
+ for (int i = 0; i < levels; ++i) {
+ Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match);
+ if (m != messages[i].end()) {
+ if (m->position == match.position) {
+ message = *m;
+ return true;
+ } else if (!found || m->position < lowest->position) {
+ lowest = m;
+ found = true;
+ }
+ }
+ }
+ if (found) {
+ message = *lowest;
+ }
+ return found;
+}
+
+QueuedMessage& PriorityQueue::front()
+{
+ if (checkFront()) {
+ return messages[frontLevel].front();
+ } else {
+ throw qpid::framing::InternalErrorException(QPID_MSG("No message available"));
+ }
+}
+
+bool PriorityQueue::pop(QueuedMessage& message)
+{
+ if (checkFront()) {
+ message = messages[frontLevel].front();
+ messages[frontLevel].pop_front();
+ clearCache();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void PriorityQueue::pop()
+{
+ QueuedMessage dummy;
+ pop(dummy);
+}
+
+bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
+{
+ messages[getPriorityLevel(added)].push_back(added);
+ clearCache();
+ return false;//adding a message never causes one to be removed for deque
+}
+
+void PriorityQueue::foreach(Functor f)
+{
+ for (int i = 0; i < levels; ++i) {
+ std::for_each(messages[i].begin(), messages[i].end(), f);
+ }
+}
+
+void PriorityQueue::removeIf(Predicate p)
+{
+ for (int priority = 0; priority < levels; ++priority) {
+ for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
+ if (p(*i)) {
+ i = messages[priority].erase(i);
+ clearCache();
+ } else {
+ ++i;
+ }
+ }
+ }
+}
+
+uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
+{
+ uint priority = m.payload->getPriority();
+ //Use AMQP 0-10 approach to mapping priorities to a fixed level
+ //(see rule priority-level-implementation)
+ const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
+ if (priority <= firstLevel) return 0;
+ return std::min(priority - firstLevel, (uint)levels-1);
+}
+
+void PriorityQueue::clearCache()
+{
+ cached = false;
+}
+
+bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
+{
+ for (int p = levels-1; p >= 0; --p) {
+ if (!m[p].empty()) {
+ l = p;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PriorityQueue::checkFront()
+{
+ if (!cached) {
+ haveFront = findFrontLevel(frontLevel, messages);
+ cached = true;
+ }
+ return haveFront;
+}
+
+uint PriorityQueue::getPriority(const QueuedMessage& message)
+{
+ const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
+ if (queue) return queue->getPriorityLevel(message);
+ else return 0;
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h
new file mode 100644
index 0000000000..4bf9d26a9d
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h
@@ -0,0 +1,78 @@
+#ifndef QPID_BROKER_PRIORITYQUEUE_H
+#define QPID_BROKER_PRIORITYQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include "qpid/sys/IntegerTypes.h"
+#include <deque>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Basic priority queue with a configurable number of recognised
+ * priority levels. This is implemented as a separate deque per
+ * priority level. Browsing is FIFO not priority order.
+ */
+class PriorityQueue : public Messages
+{
+ public:
+ PriorityQueue(int levels);
+ virtual ~PriorityQueue() {}
+ size_t size();
+ bool empty();
+
+ void reinsert(const QueuedMessage&);
+ bool remove(const framing::SequenceNumber&, QueuedMessage&);
+ bool find(const framing::SequenceNumber&, QueuedMessage&);
+ bool next(const framing::SequenceNumber&, QueuedMessage&);
+
+ QueuedMessage& front();
+ void pop();
+ bool pop(QueuedMessage&);
+ bool push(const QueuedMessage& added, QueuedMessage& removed);
+
+ void foreach(Functor);
+ void removeIf(Predicate);
+ static uint getPriority(const QueuedMessage&);
+ protected:
+ typedef std::deque<QueuedMessage> Deque;
+ typedef std::vector<Deque> PriorityLevels;
+ virtual bool findFrontLevel(uint& p, PriorityLevels&);
+
+ const int levels;
+ private:
+ PriorityLevels messages;
+ uint frontLevel;
+ bool haveFront;
+ bool cached;
+
+ bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove);
+ uint getPriorityLevel(const QueuedMessage&) const;
+ void clearCache();
+ bool checkFront();
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PRIORITYQUEUE_H*/
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 3de93ed74e..cfb32749a0 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -23,11 +23,16 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/LegacyLVQ.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/MessageMap.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
+#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
@@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max_count");
const std::string qpidNoLocal("no-local");
const std::string qpidTraceIdentity("qpid.trace.id");
const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueueKey("qpid.last_value_queue_key");
const std::string qpidLastValueQueue("qpid.last_value_queue");
const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string qpidPersistLastNode("qpid.persist_last_node");
const std::string qpidVQMatchProperty("qpid.LVQ_key");
const std::string qpidQueueEventGeneration("qpid.queue_event_generation");
+const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout");
//following feature is not ready for general use as it doesn't handle
//the case where a message is enqueued on more than one queue well enough:
const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers");
@@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _autodelete,
consumerCount(0),
exclusive(0),
noLocal(false),
- lastValueQueue(false),
- lastValueQueueNoBrowse(false),
persistLastNode(false),
inLastNodeFailure(false),
+ messages(new MessageDeque()),
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
eventMode(0),
- eventMgr(0),
insertSeqNo(0),
broker(b),
deleted(false),
- barrier(*this)
+ barrier(*this),
+ autoDeleteTimeout(0)
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){
} else {
enqueue(0, msg);
push(msg);
- mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
- mgntEnqStats(msg);
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
//content has not been loaded, need to ensure that lazy loading mode is set:
@@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
- mgntEnqStats(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage& msg){
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
- messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg);
+ messages->reinsert(msg);
listeners.populate(copy);
// for persistLastNode - don't force a message twice to disk, but force it if no force before
@@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage& msg){
copy.notify();
}
-void Queue::clearLVQIndex(const QueuedMessage& msg){
- assertClusterSafe();
- const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
-}
-
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
-
- Messages::iterator i = findAt(position);
- if (i != messages.end() ) {
- message = *i;
- if (lastValueQueue) {
- clearLVQIndex(*i);
- }
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
- messages.erase(i);
+ if (messages->remove(position, message)) {
+ QPID_LOG(debug, "Acquired message at " << position << " from " << name);
return true;
- }
- QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
- return false;
+ } else {
+ QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
+ return false;
+ }
}
bool Queue::acquire(const QueuedMessage& msg) {
- Mutex::ScopedLock locker(messageLock);
- assertClusterSafe();
-
- QPID_LOG(debug, "attempting to acquire " << msg.position);
- Messages::iterator i = findAt(msg.position);
- if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
- (!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
-
- clearLVQIndex(msg);
- QPID_LOG(debug,
- "Match found, acquire succeeded: " <<
- i->position << " == " << msg.position);
- messages.erase(i);
- return true;
- }
-
- QPID_LOG(debug, "Acquire failed for " << msg.position);
- return false;
+ QueuedMessage copy = msg;
+ return acquireMessageAt(msg.position, copy);
}
void Queue::notifyListener()
@@ -282,7 +251,7 @@ void Queue::notifyListener()
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
} else {
- QueuedMessage msg = getFront();
+ QueuedMessage msg = messages->front();
if (msg.payload->hasExpired()) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
popAndDequeue();
@@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- popMsg(msg);
+ pop();
return CONSUMED;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
//consumer wants the message
c->position = msg.position;
m = msg;
- if (!lastValueQueueNoBrowse) clearLVQIndex(msg);
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) m.payload = replacement;
- }
return true;
} else {
//browser hasn't got enough credit for the message
@@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::shared_ptr c)
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
- if (messages.size()) {
+ if (messages->size()) {
listeners.populate(set);
}
}
@@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_ptr c)
// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > c->position) {
- if (c->position < getFront().position) {
- msg = getFront();
- return true;
- } else {
- Messages::iterator pos = findAt(c->position);
- if (pos != messages.end() && pos+1 != messages.end()) {
- msg = *(pos+1);
- return true;
- }
- }
+ if (messages->next(c->position, msg)) {
+ return true;
+ } else {
+ listeners.addListener(c);
+ return false;
}
- listeners.addListener(c);
- return false;
}
-Queue::Messages::iterator Queue::findAt(SequenceNumber pos) {
-
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i!= messages.end() && i->position == pos)
- return i;
- }
- return messages.end(); // no match found.
-}
-
-
QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
- if(!messages.empty()){
- QueuedMessage compM;
- compM.position = pos;
- unsigned long diff = pos.getValue() - messages.front().position.getValue();
- long maxEnd = diff < messages.size()? diff : messages.size();
-
- Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM);
- if (i != messages.end())
- return *i;
- }
- return QueuedMessage();
+ QueuedMessage msg;
+ messages->find(pos, msg);
+ return msg;
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
}
void Queue::cancel(Consumer::shared_ptr c){
@@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr c){
QueuedMessage Queue::get(){
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
+ messages->pop(msg);
+ return msg;
+}
- if(!messages.empty()){
- msg = getFront();
- popMsg(msg);
+bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message)
+{
+ if (message.payload->hasExpired()) {
+ expired.push_back(message);
+ return true;
+ } else {
+ return false;
}
- return msg;
}
void Queue::purgeExpired()
@@ -492,37 +434,11 @@ void Queue::purgeExpired()
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
- //Note: This method is currently called periodically on the timer
- //thread. In a clustered broker this means that the purging does
- //not occur on the cluster event dispatch thread and consequently
- //that is not totally ordered w.r.t other events (including
- //publication of messages). However the cluster does ensure that
- //the actual expiration of messages (as distinct from the removing
- //of those expired messages from the queue) *is* consistently
- //ordered w.r.t. cluster events. This means that delivery of
- //messages is in general consistent across the cluster inspite of
- //any non-determinism in the triggering of a purge. However at
- //present purging a last value queue could potentially cause
- //inconsistencies in the cluster (as the order w.r.t publications
- //can affect the order in which messages appear in the
- //queue). Consequently periodic purging of an LVQ is not enabled
- //(expired messages will be removed on delivery and consolidated
- //by key as part of normal LVQ operation).
-
- if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) {
- Messages expired;
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- //Re-introduce management of LVQ-specific state here
- //if purging is renabled for that case (see note above)
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
- }
- }
+ messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
}
for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
- while((!purge_request || purge_count--) && !messages.empty()) {
+ while((!purge_request || purge_count--) && !messages->empty()) {
if (dest.get()) {
//
// If there is a destination exchange, stage the messages onto a reroute queue
// so they don't wind up getting purged more than once.
//
- DeliverableMessage msg(getFront().payload);
+ DeliverableMessage msg(messages->front().payload);
rerouteQueue.push_back(msg);
}
popAndDequeue();
@@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
- while((!qty || move_count--) && !messages.empty()) {
- QueuedMessage qmsg = getFront();
+ while((!qty || move_count--) && !messages->empty()) {
+ QueuedMessage qmsg = messages->front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- popMsg(qmsg);
+ pop();
dequeue(0, qmsg);
count++;
}
return count;
}
-void Queue::popMsg(QueuedMessage& qmsg)
+void Queue::pop()
{
assertClusterSafe();
- const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
- lvq.erase(key);
- }
- messages.pop_front();
+ messages->pop();
++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
QueueListeners::NotificationSet copy;
+ QueuedMessage removed;
+ bool dequeueRequired = false;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
- LVQ::iterator i;
- const framing::FieldTable* ft = msg->getApplicationHeaders();
- if (lastValueQueue && ft){
- string key = ft->getAsString(qpidVQMatchProperty);
-
- i = lvq.find(key);
- if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
- messages.push_back(qm);
- listeners.populate(copy);
- lvq[key] = msg;
- }else {
- boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
- if (!old) old = i->second;
- i->second->setReplacementMessage(msg,this);
- if (isRecovery) {
- //can't issue new requests for the store until
- //recovery is complete
- pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
- } else {
- Mutex::ScopedUnlock u(messageLock);
- dequeue(0, QueuedMessage(qm.queue, old, qm.position));
- }
- }
- }else {
- messages.push_back(qm);
- listeners.populate(copy);
- }
- if (eventMode) {
- if (eventMgr) eventMgr->enqueued(qm);
- else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
- }
- if (policy.get()) {
- policy->enqueued(qm);
- }
- if (flowLimit.get())
- flowLimit->enqueued(qm);
+ dequeueRequired = messages->push(qm, removed);
+ listeners.populate(copy);
+ enqueued(qm);
}
copy.notify();
-}
-
-QueuedMessage Queue::getFront()
-{
- QueuedMessage msg = messages.front();
- if (lastValueQueue) {
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) msg.payload = replacement;
+ if (dequeueRequired) {
+ if (isRecovery) {
+ //can't issue new requests for the store until
+ //recovery is complete
+ pendingDequeues.push_back(removed);
+ } else {
+ dequeue(0, removed);
+ }
}
- return msg;
}
-QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
+void isEnqueueComplete(uint32_t* result, const QueuedMessage& message)
{
- boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this);
- if (replacement.get()) {
- const framing::FieldTable* ft = replacement->getApplicationHeaders();
- if (ft) {
- string key = ft->getAsString(qpidVQMatchProperty);
- if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
- }
- msg.payload = replacement;
- }
- return msg;
+ if (message.payload->isIngressComplete()) (*result)++;
}
/** function only provided for unit tests, or code not in critical message path */
@@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
uint32_t count = 0;
- for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- //NOTE: don't need to use checkLvqReplace() here as it
- //is only relevant for LVQ which does not support persistence
- //so the enqueueComplete check has no effect
- if ( i->payload->isIngressComplete() ) count ++;
- }
-
+ messages->foreach(boost::bind(&isEnqueueComplete, &count, _1));
return count;
}
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
- return messages.size();
+ return messages->size();
}
uint32_t Queue::getConsumerCount() const
@@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(consumerLock);
- return autodelete && !consumerCount;
+ return autodelete && !consumerCount && !owner;
}
void Queue::clearLastNodeFailure()
@@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure()
inLastNodeFailure = false;
}
+void Queue::forcePersistent(QueuedMessage& message)
+{
+ if(!message.payload->isStoredOnQueue(shared_from_this())) {
+ message.payload->forcePersistent();
+ if (message.payload->isForcedPersistent() ){
+ enqueue(0, message.payload);
+ }
+ }
+}
+
void Queue::setLastNodeFailure()
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
try {
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
- }
- }
- }
+ messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
} catch (const std::exception& e) {
// Could not go into last node standing (for example journal not large enough)
QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
@@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
if (!u.acquired) return false;
if (policy.get() && !suppressPolicyCheck) {
- Messages dequeues;
+ std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
policy->tryEnqueue(msg);
@@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
*/
void Queue::popAndDequeue()
{
- QueuedMessage msg = getFront();
- popMsg(msg);
+ QueuedMessage msg = messages->front();
+ pop();
dequeue(0, msg);
}
@@ -845,11 +708,16 @@ void Queue::popAndDequeue()
void Queue::dequeued(const QueuedMessage& msg)
{
if (policy.get()) policy->dequeued(msg);
+ /** todo KAG make flowLimit an observer */
if (flowLimit.get())
flowLimit->dequeued(msg);
mgntDeqStats(msg.payload);
- if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
- eventMgr->dequeued(msg);
+ for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
+ try{
+ (*i)->dequeued(msg);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
+ }
}
}
@@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _settings)
configure(_settings);
}
+
+int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
+{
+ qpid::framing::FieldTable::ValuePtr v = settings.get(key);
+ if (!v) {
+ return 0;
+ } else if (v->convertsTo<int>()) {
+ return v->get<int>();
+ } else if (v->convertsTo<std::string>()){
+ std::string s = v->get<std::string>();
+ try {
+ return boost::lexical_cast<int>(s);
+ } catch(const boost::bad_lexical_cast&) {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
+ return 0;
+ }
+ } else {
+ QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
+ return 0;
+ }
+}
+
void Queue::configure(const FieldTable& _settings, bool recovering)
{
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
+ if (eventMode && broker) {
+ broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
+ }
if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
- (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) {
+ (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
- } else if (eventMgr && !eventMgr->isSync() ) {
+ } else if (broker && !(broker->getQueueEvents().isSync()) ) {
QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName());
}
FieldTable copy(_settings);
@@ -881,17 +774,30 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
} else {
setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings));
}
+ if (broker && broker->getManagementAgent()) {
+ ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings);
+ }
+
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal);
- lastValueQueue= _settings.get(qpidLastValueQueue);
- if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName());
-
- lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse);
- if (lastValueQueueNoBrowse){
- QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName());
- lastValueQueue = lastValueQueueNoBrowse;
+ std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey);
+ if (lvqKey.size()) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
+ messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ } else if (_settings.get(qpidLastValueQueueNoBrowse)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ } else if (_settings.get(qpidLastValueQueue)) {
+ QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
+ messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ } else {
+ std::auto_ptr<Messages> m = Fairshare::create(_settings);
+ if (m.get()) {
+ messages = m;
+ QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
+ }
}
persistLastNode= _settings.get(qpidPersistLastNode);
@@ -910,6 +816,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings);
+ autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
if (flowLimit.get())
@@ -924,8 +834,8 @@ void Queue::destroy()
{
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
- while(!messages.empty()){
- DeliverableMessage msg(getFront().payload);
+ while(!messages->empty()){
+ DeliverableMessage msg(messages->front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
msg.getMessage().getApplicationHeaders());
popAndDequeue();
@@ -939,6 +849,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
}
void Queue::notifyDeleted()
@@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
-void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
+struct AutoDeleteTask : qpid::sys::TimerTask
+{
+ Broker& broker;
+ Queue::shared_ptr queue;
+
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
+
+ void fire()
+ {
+ //need to detect case where queue was used after the task was
+ //created, but then became unused again before the task fired;
+ //in this case ignore this request as there will have already
+ //been a later task added
+ tryAutoDeleteImpl(broker, queue);
+ }
+};
+
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
+ AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
+ queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
+ broker.getClusterTimer().add(queue->autoDeleteTask);
+ QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
+ } else {
+ tryAutoDeleteImpl(broker, queue);
+ }
+}
+
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
@@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership()
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
+ //reset auto deletion timer if necessary
+ if (autoDeleteTimeout && autoDeleteTask) {
+ autoDeleteTask->cancel();
+ }
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;
@@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() {
int Queue::getEventMode() { return eventMode; }
-void Queue::setQueueEventManager(QueueEvents& mgr)
-{
- eventMgr = &mgr;
-}
-
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
@@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const std::string& key)
void Queue::enqueued(const QueuedMessage& m)
{
- if (m.payload) {
- if (policy.get()) {
- policy->recoverEnqueued(m.payload);
- policy->enqueued(m);
+ for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
+ try {
+ (*i)->enqueued(m);
+ } catch (const std::exception& e) {
+ QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
}
- if (flowLimit.get())
- flowLimit->enqueued(m);
- mgntEnqStats(m.payload);
+ }
+ if (policy.get()) {
+ policy->enqueued(m);
+ }
+ /** todo make flowlimit an observer */
+ if (flowLimit.get())
+ flowLimit->enqueued(m);
+ mgntEnqStats(m.payload);
+}
+
+void Queue::updateEnqueued(const QueuedMessage& m)
+{
+ if (m.payload) {
boost::intrusive_ptr<Message> payload = m.payload;
enqueue ( 0, payload, true );
+ if (policy.get()) {
+ policy->recoverEnqueued(payload);
+ }
+ enqueued(m);
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}
@@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
}
QueueListeners& Queue::getListeners() { return listeners; }
+Messages& Queue::getMessages() { return *messages; }
+const Messages& Queue::getMessages() const { return *messages; }
void Queue::checkNotDeleted()
{
@@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted()
}
}
+void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
+{
+ observers.insert(observer);
+}
+
void Queue::flush()
{
ScopedUse u(barrier);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 5af630f3c8..e8429128f7 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -26,14 +26,17 @@
#include "qpid/broker/OwnershipToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
#include "qpid/framing/amqp_types.h"
@@ -46,6 +49,7 @@
#include <vector>
#include <memory>
#include <deque>
+#include <set>
#include <algorithm>
namespace qpid {
@@ -86,10 +90,10 @@ class Queue : public boost::enable_shared_from_this<Queue>,
~ScopedUse() { if (acquired) barrier.release(); }
};
- typedef std::deque<QueuedMessage> Messages;
- typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
+ typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+
const std::string name;
const bool autodelete;
MessageStore* store;
@@ -97,16 +101,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
uint32_t consumerCount;
OwnershipToken* exclusive;
bool noLocal;
- bool lastValueQueue;
- bool lastValueQueueNoBrowse;
bool persistLastNode;
bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
QueueListeners listeners;
- Messages messages;
- Messages pendingDequeues;//used to avoid dequeuing during recovery
- LVQ lvq;
+ std::auto_ptr<Messages> messages;
+ std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Monitor messageLock;
mutable qpid::sys::Mutex ownershipLock;
@@ -122,12 +123,14 @@ class Queue : public boost::enable_shared_from_this<Queue>,
qmf::org::apache::qpid::broker::Queue* mgmtObject;
RateTracker dequeueTracker;
int eventMode;
- QueueEvents* eventMgr;
+ Observers observers;
bool insertSeqNo;
std::string seqNoKey;
Broker* broker;
bool deleted;
UsageBarrier barrier;
+ int autoDeleteTimeout;
+ boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -141,12 +144,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
bool isExcluded(boost::intrusive_ptr<Message>& msg);
+ void enqueued(const QueuedMessage& msg);
void dequeued(const QueuedMessage& msg);
- void popMsg(QueuedMessage& qmsg);
+ void pop();
void popAndDequeue();
QueuedMessage getFront();
- QueuedMessage& checkLvqReplace(QueuedMessage& msg);
- void clearLVQIndex(const QueuedMessage& msg);
+ void forcePersistent(QueuedMessage& msg);
+ int getEventMode();
inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
{
@@ -171,7 +175,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
}
}
- Messages::iterator findAt(framing::SequenceNumber pos);
void checkNotDeleted();
public:
@@ -277,7 +280,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
* thus are still logically on the queue) - used in
* clustered broker.
*/
- void enqueued(const QueuedMessage& msg);
+ void updateEnqueued(const QueuedMessage& msg);
/**
* Test whether the specified message (identified by its
@@ -322,13 +325,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** Apply f to each Message on the queue. */
template <class F> void eachMessage(F f) {
sys::Mutex::ScopedLock l(messageLock);
- if (lastValueQueue) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(checkLvqReplace(*i));
- }
- } else {
- std::for_each(messages.begin(), messages.end(), f);
- }
+ messages->foreach(f);
}
/** Apply f to each QueueBinding on the queue */
@@ -344,8 +341,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
/** return current position sequence number for the next message on the queue.
*/
QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
- int getEventMode();
- void setQueueEventManager(QueueEvents&);
+ void addObserver(boost::shared_ptr<QueueObserver>);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
/**
* Notify queue that recovery has completed.
@@ -354,6 +350,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
// For cluster update
QueueListeners& getListeners();
+ Messages& getMessages();
+ const Messages& getMessages() const;
/**
* Reserve space in policy for an enqueued message that
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
index bba054b0b8..2c540ff1ad 100644
--- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp
@@ -19,6 +19,8 @@
*
*/
#include "qpid/broker/QueueEvents.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
@@ -115,6 +117,29 @@ bool QueueEvents::isSync()
return sync;
}
+class EventGenerator : public QueueObserver
+{
+ public:
+ EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {}
+ void enqueued(const QueuedMessage& m)
+ {
+ manager.enqueued(m);
+ }
+ void dequeued(const QueuedMessage& m)
+ {
+ if (!enqueueOnly) manager.dequeued(m);
+ }
+ private:
+ QueueEvents& manager;
+ const bool enqueueOnly;
+};
+
+void QueueEvents::observe(Queue& queue, bool enqueueOnly)
+{
+ boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly));
+ queue.addObserver(observer);
+}
+
QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h
index c42752133e..fcddfe9092 100644
--- a/qpid/cpp/src/qpid/broker/QueueEvents.h
+++ b/qpid/cpp/src/qpid/broker/QueueEvents.h
@@ -63,6 +63,7 @@ class QueueEvents
QPID_BROKER_EXTERN void unregisterListener(const std::string& id);
void enable();
void disable();
+ void observe(Queue&, bool enqueueOnly);
//process all outstanding events
QPID_BROKER_EXTERN void shutdown();
QPID_BROKER_EXTERN bool isSync();
diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h
new file mode 100644
index 0000000000..a711213dee
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueObserver.h
@@ -0,0 +1,42 @@
+#ifndef QPID_BROKER_QUEUEOBSERVER_H
+#define QPID_BROKER_QUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace broker {
+
+class QueuedMessage;
+/**
+ * Interface for notifying classes who want to act as 'observers' of a
+ * queue of particular events.
+ */
+class QueueObserver
+{
+ public:
+ virtual ~QueueObserver() {}
+ virtual void enqueued(const QueuedMessage&) = 0;
+ virtual void dequeued(const QueuedMessage&) = 0;
+ private:
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index f311ea8321..4168221ad0 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/QueuePolicy.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/PriorityQueue.h"
#include "qpid/Exception.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/reply_exceptions.h"
@@ -213,7 +214,10 @@ RingQueuePolicy::RingQueuePolicy(const std::string& _name,
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
- return a.position < b.position;
+ int priorityA = PriorityQueue::getPriority(a);
+ int priorityB = PriorityQueue::getPriority(b);
+ if (priorityA == priorityB) return a.position < b.position;
+ else return priorityA < priorityB;
}
void RingQueuePolicy::enqueued(const QueuedMessage& m)
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 28b2d60cda..ea2531dae7 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -47,7 +47,6 @@ QueueRegistry::declare(const string& declareName, bool durable,
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker));
queues[name] = queue;
if (lastNode) queue->setLastNodeFailure();
- if (events) queue->setQueueEventManager(*events);
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
@@ -108,8 +107,3 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode)
}
lastNode = _lastNode;
}
-
-void QueueRegistry::setQueueEvents(QueueEvents* e)
-{
- events = e;
-}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index 66437f9665..57859fe639 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -96,8 +96,6 @@ class QueueRegistry {
*/
std::string generateName();
- void setQueueEvents(QueueEvents*);
-
/**
* Set the store to use. May only be called once.
*/
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 7106f85807..69b364ad7b 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -33,7 +33,7 @@ using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
- connection(c),
+ connection(c),
proxy(out),
clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
@@ -69,7 +69,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
- connection.closeChannel(channel.get());
+ connection.closeChannel(channel.get());
}
void SessionHandler::setState(const std::string& name, bool force) {
@@ -78,7 +78,7 @@ void SessionHandler::setState(const std::string& name, bool force) {
session = connection.broker.getSessionManager().attach(*this, id, force);
}
-void SessionHandler::detaching()
+void SessionHandler::detaching()
{
assert(session.get());
session->disableOutput();
@@ -98,7 +98,10 @@ void SessionHandler::attachAs(const std::string& name)
{
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
- session.reset(new SessionState(connection.getBroker(), *this, id, config));
+ // Delay creating management object till attached(). In a cluster,
+ // only the active link broker calls attachAs but all brokers
+ // receive the subsequent attached() call.
+ session.reset(new SessionState(connection.getBroker(), *this, id, config, true));
sendAttach(false);
}
@@ -109,6 +112,7 @@ void SessionHandler::attachAs(const std::string& name)
void SessionHandler::attached(const std::string& name)
{
if (session.get()) {
+ session->addManagementObject(); // Delayed from attachAs()
amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
@@ -117,5 +121,5 @@ void SessionHandler::attached(const std::string& name)
markReadyToSend();
}
}
-
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index d572e37d00..2e69102537 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -53,7 +53,8 @@ using qpid::sys::AbsTime;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
- Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ Broker& b, SessionHandler& h, const SessionId& id,
+ const SessionState::Configuration& config, bool delayManagement)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this, *this),
@@ -71,6 +72,12 @@ SessionState::SessionState(
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
}
+ if (!delayManagement) addManagementObject();
+ attach(h);
+}
+
+void SessionState::addManagementObject() {
+ if (GetManagementObject()) return; // Already added.
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -80,11 +87,11 @@ SessionState::SessionState(
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
- if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
+ if (rateFlowcontrol)
+ mgmtObject->set_maxClientRate(rateFlowcontrol->getRate());
agent->addObject(mgmtObject);
}
}
- attach(h);
}
SessionState::~SessionState() {
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index f4c10295b1..568e8593fa 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -73,7 +73,8 @@ class SessionState : public qpid::SessionState,
public framing::FrameHandler::InOutHandler
{
public:
- SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&);
+ SessionState(Broker&, SessionHandler&, const SessionId&,
+ const SessionState::Configuration&, bool delayManagement=false);
~SessionState();
bool isAttached() const { return handler; }
@@ -127,8 +128,11 @@ class SessionState : public qpid::SessionState,
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
- private:
+ // Used to delay creation of management object for sessions
+ // belonging to inter-broker bridges
+ void addManagementObject();
+ private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
new file mode 100644
index 0000000000..4f35884af8
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ThresholdAlerts.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qmf/org/apache/qpid/broker/EventQueueThresholdExceeded.h"
+
+namespace qpid {
+namespace broker {
+ThresholdAlerts::ThresholdAlerts(const std::string& n,
+ qpid::management::ManagementAgent& a,
+ const uint32_t ct,
+ const uint64_t st,
+ const long repeat)
+ : name(n), agent(a), countThreshold(ct), sizeThreshold(st),
+ repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
+ count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
+
+void ThresholdAlerts::enqueued(const QueuedMessage& m)
+{
+ size += m.payload->contentSize();
+ ++count;
+ if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
+ if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
+ || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) {
+ agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
+ lastAlert = qpid::sys::now();
+ }
+ }
+}
+
+void ThresholdAlerts::dequeued(const QueuedMessage& m)
+{
+ size -= m.payload->contentSize();
+ --count;
+ if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
+ lastAlert = qpid::sys::EPOCH;
+ }
+}
+
+
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const uint64_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval)
+{
+ if (countThreshold || sizeThreshold) {
+ boost::shared_ptr<QueueObserver> observer(
+ new ThresholdAlerts(queue.getName(), agent, countThreshold, sizeThreshold, repeatInterval)
+ );
+ queue.addObserver(observer);
+ }
+}
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::framing::FieldTable& settings)
+
+{
+ qpid::types::Variant::Map map;
+ qpid::amqp_0_10::translate(settings, map);
+ observe(queue, agent, map);
+}
+
+template <class T>
+class Option
+{
+ public:
+ Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
+ void addAlias(const std::string& name) { names.push_back(name); }
+ T get(const qpid::types::Variant::Map& settings) const
+ {
+ T value(defaultValue);
+ for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
+ if (get(settings, *i, value)) break;
+ }
+ return value;
+ }
+ private:
+ std::vector<std::string> names;
+ T defaultValue;
+
+ bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
+ {
+ qpid::types::Variant::Map::const_iterator i = settings.find(name);
+ if (i != settings.end()) {
+ try {
+ value = (T) i->second;
+ } catch (const qpid::types::InvalidConversion&) {
+ QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
+
+void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::types::Variant::Map& settings)
+
+{
+ //Note: aliases are keys defined by java broker
+ Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
+ repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
+
+ //If no explicit threshold settings were given use 80% of any
+ //limit from the policy.
+ const QueuePolicy* policy = queue.getPolicy();
+ Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0));
+ countThreshold.addAlias("x-qpid-maximum-message-count");
+ Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0));
+ sizeThreshold.addAlias("x-qpid-maximum-message-size");
+
+ observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
new file mode 100644
index 0000000000..e1f59252c4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
@@ -0,0 +1,73 @@
+#ifndef QPID_BROKER_THRESHOLDALERTS_H
+#define QPID_BROKER_THRESHOLDALERTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+#include <string>
+
+namespace qpid {
+namespace framing {
+class FieldTable;
+}
+namespace management {
+class ManagementAgent;
+}
+namespace broker {
+
+class Queue;
+/**
+ * Class to manage generation of QMF alerts when particular thresholds
+ * are breached on a queue.
+ */
+class ThresholdAlerts : public QueueObserver
+{
+ public:
+ ThresholdAlerts(const std::string& name,
+ qpid::management::ManagementAgent& agent,
+ const uint32_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const uint64_t countThreshold,
+ const uint64_t sizeThreshold,
+ const long repeatInterval);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::framing::FieldTable& settings);
+ static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
+ const qpid::types::Variant::Map& settings);
+ private:
+ const std::string name;
+ qpid::management::ManagementAgent& agent;
+ const uint32_t countThreshold;
+ const uint64_t sizeThreshold;
+ const qpid::sys::Duration repeatInterval;
+ uint64_t count;
+ uint64_t size;
+ qpid::sys::AbsTime lastAlert;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 6acd0a3ced..030b804143 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&)
window = capacity;
}
}
-
-qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!get(result, timeout)) throw NoMessageAvailable();
return result;
}
-
-qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
{
qpid::messaging::Message result;
if (!fetch(result, timeout)) throw NoMessageAvailable();
@@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur
return f.result;
}
-void ReceiverImpl::close()
-{
+void ReceiverImpl::close()
+{
execute<Close>();
}
@@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled()
return parent->getUnsettledAcks(destination);
}
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
- const qpid::messaging::Address& a) :
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a) :
- parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
+ parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
state(UNRESOLVED), capacity(0), window(0) {}
bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging:
}
}
-void ReceiverImpl::closeImpl()
-{
+void ReceiverImpl::closeImpl()
+{
sys::Mutex::ScopedLock l(lock);
if (state != CANCELLED) {
state = CANCELLED;
+ sync(session).messageStop(destination);
+ parent->releasePending(destination);
source->cancel(session, destination);
parent->receiverCancelled(destination);
}
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 6d98527627..75a71997fd 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command
{
qpid::messaging::Receiver result;
const qpid::messaging::Address& address;
-
+
CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createReceiverImpl(address); }
@@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command
{
qpid::messaging::Sender result;
const qpid::messaging::Address& address;
-
+
CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
Command(i), address(a) {}
void operator()() { result = impl.createSenderImpl(address); }
@@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const
throw KeyError(name);
} else {
return i->second;
- }
+ }
}
Receiver SessionImpl::getReceiver(const std::string& name) const
@@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT
}
}
-bool SessionImpl::accept(ReceiverImpl* receiver,
- qpid::messaging::Message* message,
+bool SessionImpl::accept(ReceiverImpl* receiver,
+ qpid::messaging::Message* message,
IncomingMessages::MessageTransfer& transfer)
{
if (receiver->getName() == transfer.getDestination()) {
@@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
}
@@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command
{
const std::string* destination;
uint32_t result;
-
+
Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getReceivableImpl(destination); }
};
@@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command
{
const std::string* destination;
uint32_t result;
-
+
UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
void operator()() { result = impl.getUnsettledAcksImpl(destination); }
};
@@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl()
getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
}
//ensure that stop has been processed and all previously sent
- //messages are available for release:
+ //messages are available for release:
session.sync();
incoming.releaseAll();
- session.txRollback();
+ session.txRollback();
for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
@@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name)
incoming.releasePending(name);
}
+void SessionImpl::releasePending(const std::string& name)
+{
+ ScopedLock l(lock);
+ incoming.releasePending(name);
+}
+
void SessionImpl::senderCancelled(const std::string& name)
{
ScopedLock l(lock);
@@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->open();
+ connection->open();
}
bool SessionImpl::backoff()
{
- return connection->backoff();
+ return connection->backoff();
}
qpid::messaging::Connection SessionImpl::getConnection() const
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 3dd5cd0189..2a2aa47df6 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl
void checkError();
bool hasError();
- bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout);
+ void releasePending(const std::string& destination);
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
@@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
} catch (const qpid::ConnectionException& e) {
throw qpid::messaging::ConnectionError(e.what());
} catch (const qpid::ChannelException& e) {
- throw qpid::messaging::MessagingException(e.what());
+ throw qpid::messaging::MessagingException(e.what());
}
}
@@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct Acknowledge1 : Command
{
qpid::messaging::Message& message;
-
+
Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
void operator()() { impl.acknowledgeImpl(message); }
};
-
+
struct CreateSender;
struct CreateReceiver;
struct UnsettledAcks;
@@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl
F f(*this);
return execute(f);
}
-
+
template <class F> void retry()
{
while (!execute<F>()) {}
}
-
+
template <class F, class P> bool execute1(P p)
{
F f(*this, p);
diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
index baeaafb478..f6e1c7a849 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -5,7 +5,7 @@
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
+ * "License "); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -33,6 +33,25 @@ using std::max;
using sys::Timer;
using sys::TimerTask;
+//
+// Note on use of Broker::getTimer() rather than getClusterTime in broker code.
+// The following uses of getTimer() are cluster safe:
+//
+// LinkRegistry: maintenance visits in timer can call Bridge::create/cancel
+// but these don't modify any management state.
+//
+// broker::Connection:
+// - Heartbeats use ClusterOrderOutput to ensure consistency
+// - timeout: aborts connection in timer, cluster does an orderly connection close.
+//
+// SessionState: scheduledCredit - uses ClusterOrderProxy
+// Broker::queueCleaner: cluster implements ExpiryPolicy for consistent expiry.
+//
+// Broker::dtxManager: dtx disabled with cluster.
+//
+// requestIOProcessing: called in doOutput.
+//
+
ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {
// Allow more generous overrun threshold with cluster as we
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index c7689577a7..e9b718e6de 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -32,6 +32,7 @@
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Queue.h"
@@ -528,7 +529,7 @@ void Connection::deliveryRecord(const string& qname,
m = getUpdateMessage();
m.queue = queue.get();
m.position = position;
- if (enqueued) queue->enqueued(m); //inform queue of the message
+ if (enqueued) queue->updateEnqueued(m); //inform queue of the message
} else { // Message at original position in original queue
m = queue->find(position);
}
@@ -548,6 +549,13 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
findQueue(qname)->setPosition(position);
}
+void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count)
+{
+ if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) {
+ QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies.");
+ }
+}
+
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index d90cdd898b..7ee85bf1aa 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -152,6 +152,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void expiryId(uint64_t);
void txStart();
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 4f6488a28a..8f751add9b 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -32,6 +32,7 @@
#include "qpid/client/ConnectionImpl.h"
#include "qpid/client/Future.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Fairshare.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
@@ -352,6 +353,10 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1));
ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition());
+ uint priority, count;
+ if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
+ ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
+ }
}
void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index c9d53c028b..cf065e1ba9 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -62,7 +62,7 @@ endif (MSVC)
# Like this to work with cmake 2.4 on Unix
set (qpid_test_boost_libs
- ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY})
+ ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} ${Boost_SYSTEM_LIBRARY})
# Macro to make it easier to remember where the tests are built
macro(remember_location testname)
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index fc1632b4e1..991ec847bf 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture
~QueueCreatePolicyFixture()
{
- admin.deleteQueue(address.getName());
+ admin.deleteQueue(address.getName());
}
};
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture
~ExchangeCreatePolicyFixture()
{
- admin.deleteExchange(address.getName());
+ admin.deleteExchange(address.getName());
}
};
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue)
s1.close();
Receiver r1 = fix.session.createReceiver(a1);
r1.close();
-
+
std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
Sender s2 = fix.session.createSender(a2);
s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification)
{
MessagingFixture fix;
fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
- BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
+ BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber)
QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
{
MessagingFixture fix;
-
+
std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
std::string browseAddress = "exclusive-queue; { mode: browse }";
Receiver receiver = fix.session.createReceiver(address);
fix.session.sync();
- Connection c2 = fix.newConnection();
+ Connection c2 = fix.newConnection();
c2.open();
Session s2 = c2.createSession();
-
+
BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
- c2.close();
+ c2.close();
+}
+
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+ MessagingFixture fix;
+ const uint capacity = 5;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+ Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+ Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+ receiver1.setCapacity(capacity);
+ receiver2.setCapacity(capacity*2);
+
+ Message out("test-message");
+ for (uint i = 0; i < capacity*2; ++i) {
+ sender.send(out);
+ }
+
+ receiver1.close();
+
+ // Make sure all pending messages were sent to the alternate
+ // exchange when the queue was deleted.
+ Message in;
+ for (uint i = 0; i < capacity*2; ++i) {
+ in = receiver2.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+ }
}
QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
@@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
messages.push_back(msg);
}
const uint batch(10); //acknowledge first 10 messages only
- for (uint i = 0; i < batch; ++i) {
+ for (uint i = 0; i < batch; ++i) {
other.acknowledge(messages[i]);
}
messages.clear();
@@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
other.close();
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < (count-batch); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
//check unacknowledged messages are still enqueued
other = fix.connection.createSession();
- receiver = other.createReceiver(fix.queue);
+ receiver = other.createReceiver(fix.queue);
for (uint i = 0; i < ((count-batch)/2); ++i) {
Message msg = receiver.fetch();
BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index c9e6d79ee7..7d17dd7bde 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -94,7 +94,7 @@ cluster_test_SOURCES = \
cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
-qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
+qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
endif
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 903a20ec28..f2ccd0ba84 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
}
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+ ClusterFixture::Args args;
+ prepareArgs(args, durableFlag);
+ ClusterFixture cluster(1, args, -1);
+ Client c0(cluster[0], "c0");
+
+ FieldTable arguments;
+ arguments.setInt("x-qpid-priorities", 10);
+ arguments.setInt("x-qpid-fairshare", 5);
+ c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+ //send messages of different priorities
+ for (int i = 0; i < 20; i++) {
+ Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+ msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+ c0.session.messageTransfer(arg::content=msg);
+ }
+
+ //pull off a couple of the messages (first four should be the top priority messages
+ for (int i = 0; i < 4; i++) {
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+ }
+
+ // Add another member
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ //pull off some more messages
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData());
+ BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+ //check queue has same content on both nodes
+ BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index e865a49813..1f77226b4d 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -65,6 +65,8 @@ op.add_option("--connection-options", type="str",
help="Connection options for senders & receivers")
op.add_option("--flow-control", default=0, type="int", metavar="N",
help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
+op.add_option("--durable", default=False, action="store_true",
+ help="Use durable queues and messages")
single_quote_re = re.compile("'")
def posix_quote(string):
@@ -76,7 +78,9 @@ def ssh_command(host, command):
return ["ssh", host] + [posix_quote(arg) for arg in command]
def start_receive(queue, index, opts, ready_queue, broker, host):
- address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option))
+ address_opts=["create:receiver"] + opts.receive_option
+ if opts.durable: address_opts += ["node:{durable:true}"]
+ address="%s;{%s}"%(queue,",".join(address_opts))
msg_total=opts.senders*opts.messages
messages = msg_total/opts.receivers;
if (index < msg_total%opts.receivers): messages += 1
@@ -111,7 +115,8 @@ def start_send(queue, opts, broker, host):
"--report-header=no",
"--timestamp=%s"%(opts.timestamp and "yes" or "no"),
"--sequence=no",
- "--flow-control", str(opts.flow_control)
+ "--flow-control", str(opts.flow_control),
+ "--durable", str(opts.durable)
]
command += opts.send_arg
if opts.connection_options:
@@ -140,12 +145,12 @@ def print_header(timestamp):
def parse(parser, lines): # Parse sender/receiver output
for l in lines:
fn_val = zip(parser, l)
-
+
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
def parse_senders(senders):
return parse([int],[first_line(p) for p in senders])
-
+
def parse_receivers(receivers):
return parse([int,float,float,float],[first_line(p) for p in receivers if p])
@@ -168,7 +173,7 @@ def print_summary(send_stats, recv_stats):
l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
print summary
-
+
class ReadyReceiver:
"""A receiver for ready messages"""
@@ -177,7 +182,7 @@ class ReadyReceiver:
self.connection = qpid.messaging.Connection(broker)
self.connection.open()
self.receiver = self.connection.session().receiver(
- "%s;{create:always,delete:always}"%(queue))
+ "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
self.receiver.session.sync()
self.timeout=2
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 28e229ca27..012d544a2e 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -206,6 +206,7 @@ int main(int argc, char ** argv)
if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
+ if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl;
if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
std::cout << "Properties: " << msg.getProperties() << std::endl;
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index c71cb83f9a..6a7e7838ce 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -56,6 +56,7 @@ struct Options : public qpid::Options
uint sendEos;
bool durable;
uint ttl;
+ uint priority;
std::string userid;
std::string correlationid;
string_vector properties;
@@ -84,6 +85,7 @@ struct Options : public qpid::Options
sendEos(0),
durable(false),
ttl(0),
+ priority(0),
contentString(),
contentSize(0),
contentStdin(false),
@@ -110,6 +112,7 @@ struct Options : public qpid::Options
("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input")
("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.")
("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds")
+ ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)")
("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property")
("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message")
("user-id", qpid::optValue(userid, "USERID"), "userid for message")
@@ -266,7 +269,14 @@ int main(int argc, char ** argv)
if (opts.ttl) {
msg.setTtl(Duration(opts.ttl));
}
- if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto));
+ if (opts.priority) {
+ msg.setPriority(opts.priority);
+ }
+ if (!opts.replyto.empty()) {
+ if (opts.flowControl)
+ throw Exception("Can't use reply-to and flow-control together");
+ msg.setReplyTo(Address(opts.replyto));
+ }
if (!opts.userid.empty()) msg.setUserId(opts.userid);
if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid);
opts.setProperties(msg);
@@ -305,13 +315,17 @@ int main(int argc, char ** argv)
if (opts.timestamp)
msg.getProperties()[TS] = int64_t(
qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
- if (opts.flowControl && ((sent % opts.flowControl) == 0)) {
- msg.setReplyTo(flowControlAddress);
- ++flowSent;
+ if (opts.flowControl) {
+ if ((sent % opts.flowControl) == 0) {
+ msg.setReplyTo(flowControlAddress);
+ ++flowSent;
+ }
+ else
+ msg.setReplyTo(Address()); // Clear the reply address.
}
-
sender.send(msg);
reporter.message(msg);
+
if (opts.tx && (sent % opts.tx == 0)) {
if (opts.rollbackFrequency &&
(++txCount % opts.rollbackFrequency == 0))
@@ -331,7 +345,6 @@ int main(int argc, char ** argv)
int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill);
if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC);
}
- msg = Message(); // Clear out contents and properties for next iteration
}
for ( ; flowSent>0; --flowSent)
flowControlReceiver.get(Duration::SECOND);
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 5e407a061f..be1c1f868c 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -275,6 +275,13 @@
<!-- Replicate encoded config objects - e.g. links and bridges. -->
<control name="config" code="0x37"><field name="encoded" type="str32"/></control>
+
+ <!-- Set the fairshare delivery related state of a replicated queue. -->
+ <control name="queue-fairshare-state" code="0x38">
+ <field name="queue" type="str8"/>
+ <field name="position" type="uint8"/>
+ <field name="count" type="uint8"/>
+ </control>
</class>
</amqp>
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 3c0b9f0434..6f4c11ae15 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -380,7 +380,8 @@ class Object(object):
dp.routing_key = self.getV2RoutingKey()
mp = self._broker.amqpSession.message_properties()
mp.content_type = "amqp/map"
- mp.user_id = self._broker.authUser
+ if self._broker.saslUser:
+ mp.user_id = self._broker.saslUser
mp.correlation_id = str(seq)
mp.app_id = "qmf2"
mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue)
@@ -1492,7 +1493,8 @@ class Session:
dp.routing_key = objectId.getV2RoutingKey()
mp = broker.amqpSession.message_properties()
mp.content_type = "amqp/map"
- mp.user_id = broker.authUser
+ if broker.saslUser:
+ mp.user_id = broker.saslUser
mp.correlation_id = str(seq)
mp.app_id = "qmf2"
mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_direct_queue)
@@ -2236,6 +2238,7 @@ class Broker(Thread):
self.connTimeout = connTimeout
self.authUser = authUser
self.authPass = authPass
+ self.saslUser = None
self.cv = Condition()
self.seqToAgentMap = {}
self.error = None
@@ -2409,6 +2412,11 @@ class Broker(Thread):
self.conn.start()
sock.settimeout(oldTimeout)
self.conn.aborted = oldAborted
+ uid = self.conn.user_id
+ if uid.__class__ == tuple and len(uid) == 2:
+ self.saslUser = uid[1]
+ else:
+ self.saslUser = None
# prevent topic queues from filling up (and causing the agents to
# disconnect) by discarding the oldest queued messages when full.
@@ -2588,7 +2596,8 @@ class Broker(Thread):
dp.routing_key = "console.request.agent_locate"
mp = self.amqpSession.message_properties()
mp.content_type = "amqp/list"
- mp.user_id = self.authUser
+ if self.saslUser:
+ mp.user_id = self.saslUser
mp.app_id = "qmf2"
mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue)
mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
@@ -2630,7 +2639,8 @@ class Broker(Thread):
dp.ttl = ttl
mp = self.amqpSession.message_properties()
mp.content_type = "x-application/qmf"
- mp.user_id = self.authUser
+ if self.saslUser:
+ mp.user_id = self.saslUser
mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
return Message(dp, mp, body)
@@ -3543,7 +3553,8 @@ class Agent:
dp.routing_key = self.getV2RoutingKey()
mp = self.broker.amqpSession.message_properties()
mp.content_type = "amqp/map"
- mp.user_id = self.broker.authUser
+ if self.broker.saslUser:
+ mp.user_id = self.broker.saslUser
mp.correlation_id = str(sequence)
mp.app_id = "qmf2"
mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
diff --git a/qpid/extras/sasl/m4/ac_pkg_swig.m4 b/qpid/extras/sasl/m4/ac_pkg_swig.m4
index 3bff433f80..6e385c067c 100644
--- a/qpid/extras/sasl/m4/ac_pkg_swig.m4
+++ b/qpid/extras/sasl/m4/ac_pkg_swig.m4
@@ -56,6 +56,8 @@
# Macro released by the Autoconf Archive. When you make and distribute a
# modified version of the Autoconf Macro, you may extend this special
# exception to the GPL to apply to your modified version as well.
+#
+# Fixed by Sandro Santilli to consider 2.0.0 > 1.3.37 (2010-06-15)
AC_DEFUN([AC_PROG_SWIG],[
AC_PATH_PROG([SWIG],[swig])
@@ -99,9 +101,9 @@ AC_DEFUN([AC_PROG_SWIG],[
if test -z "$available_patch" ; then
[available_patch=0]
fi
- if test $available_major -ne $required_major \
- -o $available_minor -ne $required_minor \
- -o $available_patch -lt $required_patch ; then
+ [required_full=`printf %2.2d%2.2d%2.2d%2.2d $required_major $required_minor $required_patch]`
+ [available_full=`printf %2.2d%2.2d%2.2d%2.2d $available_major $available_minor $available_patch]`
+ if test $available_full -lt $required_full; then
AC_MSG_WARN([SWIG version >= $1 is required. You have $swig_version. You should look at http://www.swig.org])
SWIG='echo "Error: SWIG version >= $1 is required. You have '"$swig_version"'. You should look at http://www.swig.org" ; false'
else
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index af0d8a3a1d..ab59fee020 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -304,7 +304,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
else
{
// use the default value set for all connections
- _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
+ _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish);
}
if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null)
diff --git a/qpid/java/common.xml b/qpid/java/common.xml
index b1f28dc062..066859b29f 100644
--- a/qpid/java/common.xml
+++ b/qpid/java/common.xml
@@ -70,6 +70,13 @@
</fileset>
</path>
+ <property name="maven.local.repo" value="${build.scratch}/maven-local-repo"/>
+ <property name="maven.unique.version" value="false"/>
+ <property name="maven.snapshot" value="true"/>
+ <condition property="maven.version.suffix" value="" else="-SNAPSHOT">
+ <isfalse value="${maven.snapshot}"/>
+ </condition>
+
<macrodef name="indirect">
<attribute name="name"/>
<attribute name="variable"/>
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index bce64075e5..0d9f8c0b28 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -28,6 +28,7 @@ import org.ietf.jgss.Oid;
import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import static org.apache.qpid.transport.Connection.State.OPEN;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
import org.apache.qpid.transport.util.Logger;
import javax.security.sasl.Sasl;
@@ -216,7 +217,14 @@ public class ClientDelegate extends ConnectionDelegate
}
}
- conn.setState(OPEN);
+ if (conn.isConnectionResuming())
+ {
+ conn.setState(RESUMING);
+ }
+ else
+ {
+ conn.setState(OPEN);
+ }
}
@Override
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index fd19fa0512..e5e10c0e07 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -25,6 +25,7 @@ import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
+import static org.apache.qpid.transport.Connection.State.RESUMING;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
@@ -63,7 +65,7 @@ public class Connection extends ConnectionInvoker
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
- public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+ public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
static class DefaultConnectionListener implements ConnectionListener
{
@@ -119,7 +121,8 @@ public class Connection extends ConnectionInvoker
private static final AtomicLong idGenerator = new AtomicLong(0);
private final long _connectionId = idGenerator.incrementAndGet();
-
+ private final AtomicBoolean connectionLost = new AtomicBoolean(false);
+
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -270,6 +273,8 @@ public class Connection extends ConnectionInvoker
close();
throw new ConnectionException("connect() timed out");
case OPEN:
+ case RESUMING:
+ connectionLost.set(false);
break;
case CLOSED:
throw new ConnectionException("connect() aborted");
@@ -313,6 +318,17 @@ public class Connection extends ConnectionInvoker
{
synchronized (lock)
{
+ Waiter w = new Waiter(lock, timeout);
+ while (w.hasTime() && state != OPEN && error == null)
+ {
+ w.await();
+ }
+
+ if (state != OPEN)
+ {
+ throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
+ }
+
Session ssn = _sessionFactory.newSession(this, name, expiry);
sessions.put(name, ssn);
map(ssn);
@@ -452,15 +468,25 @@ public class Connection extends ConnectionInvoker
{
for (Session ssn : sessions.values())
{
- map(ssn);
- ssn.attach();
- ssn.resume();
+ if (ssn.isTransacted())
+ {
+ removeSession(ssn);
+ ssn.setState(Session.State.CLOSED);
+ }
+ else
+ {
+ map(ssn);
+ ssn.attach();
+ ssn.resume();
+ }
}
+ setState(OPEN);
}
}
public void exception(ConnectionException e)
{
+ connectionLost.set(true);
synchronized (lock)
{
switch (state)
@@ -663,5 +689,10 @@ public class Connection extends ConnectionInvoker
{
return securityLayer;
}
+
+ public boolean isConnectionResuming()
+ {
+ return connectionLost.get();
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 4de578da18..214d4534c1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -532,11 +532,22 @@ public class Session extends SessionInvoker
{
if (m.getEncodedTrack() == Frame.L4)
{
+
+ if (state == DETACHED && transacted)
+ {
+ state = CLOSED;
+ delegate.closed(this);
+ connection.removeSession(this);
+ throw new SessionException(
+ "Session failed over, possibly in the middle of a transaction. " +
+ "Closing the session. Any Transaction in progress will be rolledback.");
+ }
+
if (m.hasPayload())
{
acquireCredit();
}
-
+
synchronized (commands)
{
if (state == DETACHED && m.isUnreliable())
@@ -1002,4 +1013,8 @@ public class Session extends SessionInvoker
this.transacted = b;
}
+ public boolean isTransacted(){
+ return transacted;
+ }
+
}
diff --git a/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert b/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert
deleted file mode 100644
index e6702108e6..0000000000
--- a/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert
+++ /dev/null
Binary files differ
diff --git a/qpid/java/module.xml b/qpid/java/module.xml
index 877ca130af..d3954a1544 100644
--- a/qpid/java/module.xml
+++ b/qpid/java/module.xml
@@ -219,10 +219,10 @@
<args>
<arg line='"${project.root}/genpom"'/>
<arg line='-s "${project.root}/lib/poms"'/>
- <arg line='-o "${build}/qpid-${module.name}.pom"'/>
+ <arg line='-o "${build.scratch}/qpid-${module.name}.pom"'/>
<arg line="-u http://qpid.apache.org"/>
<arg line="-g org.apache.qpid"/>
- <arg line="-v ${project.version}"/>
+ <arg line="-v ${project.version}${maven.version.suffix}"/>
<arg line="-p qpid"/>
<arg line='-m "${module.depends}"'/>
<arg line="-a ${module.name}"/>
@@ -235,12 +235,15 @@
<target name="release-mvn" depends="pom" if="module.genpom" description="Install the artifacts into the local repository and prepare the release">
<antcall target="build"/>
- <artifact:pom id="module.pom" file="${build}/qpid-${module.name}.pom"/>
+ <artifact:pom id="module.pom" file="${build.scratch}/qpid-${module.name}.pom"/>
- <artifact:install file="${module.jar}" pomRefId="module.pom"/>
+ <artifact:install file="${module.jar}" pomRefId="module.pom">
+ <localRepository path="${maven.local.repo}"/>
+ </artifact:install>
- <artifact:deploy file="${module.jar}" pomRefId="module.pom">
+ <artifact:deploy file="${module.jar}" pomRefId="module.pom" uniqueVersion="${maven.unique.version}">
<attach file="${module.source.jar}" classifier="sources"/>
+ <localRepository path="${maven.local.repo}"/>
<remoteRepository url="file://${module.release.base}/maven"/>
</artifact:deploy>
</target>
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
deleted file mode 100644
index ff0d58ad69..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.util.TimedRun;
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-
-import java.util.List;
-
-/**
- * Want to vary the number of regsitrations, messages and matches and measure
- * the corresponding variance in execution time.
- * <p/>
- * Each registration will contain the 'All' header, even registrations will
- * contain the 'Even' header and odd headers will contain the 'Odd' header.
- * In additions each regsitration will have a unique value for the 'Specific'
- * header as well.
- * <p/>
- * Messages can then be routed to all registrations, to even- or odd- registrations
- * or to a specific registration.
- *
- */
-public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest
-{
- private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC}
-
- private final TestQueue[] queues;
- private final Mode mode;
-
- public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException
- {
- this.mode = mode;
- queues = new TestQueue[registrations];
- for (int i = 0; i < queues.length; i++)
- {
- switch(mode)
- {
- case ALL:
- queues[i] = bind(new FastQueue("Queue" + i), "All");
- break;
- case ODD_OR_EVEN:
- queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i));
- break;
- case SPECIFIC:
- queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i);
- break;
- }
- }
- }
-
- void sendToAll(int count) throws AMQException
- {
- send(count, "All=True");
- }
-
- void sendToOdd(int count) throws AMQException
- {
- send(count, "All=True", "Odd=True");
- }
-
- void sendToEven(int count) throws AMQException
- {
- send(count, "All=True", "Even=True");
- }
-
- void sendToAllSpecifically(int count) throws AMQException
- {
- for (int i = 0; i < queues.length; i++)
- {
- sendToSpecific(count, i);
- }
- }
-
- void sendToSpecific(int count, int index) throws AMQException
- {
- send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index);
- }
-
- private void send(int count, String... headers) throws AMQException
- {
- for (int i = 0; i < count; i++)
- {
- route(new Message("Message" + i, headers));
- }
- }
-
- private static String oddOrEven(int i)
- {
- return (i % 2 == 0 ? "Even" : "Odd");
- }
-
- static class FastQueue extends TestQueue
- {
-
- public FastQueue(String name) throws AMQException
- {
- super(name);
- }
-
- public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException
- {
- //just discard as we are not testing routing functionality here
- }
- }
-
- static class Test extends TimedRun
- {
- private final Mode mode;
- private final int registrations;
- private final int count;
- private HeadersExchangePerformanceTest test;
-
- Test(Mode mode, int registrations, int count)
- {
- super(mode + ", registrations=" + registrations + ", count=" + count);
- this.mode = mode;
- this.registrations = registrations;
- this.count = count;
- }
-
- protected void setup() throws Exception
- {
- test = new HeadersExchangePerformanceTest(mode, registrations);
- run(100); //do a warm up run before times start
- }
-
- protected void teardown() throws Exception
- {
- test = null;
- System.gc();
- }
-
- protected void run() throws Exception
- {
- run(count);
- }
-
- private void run(int count) throws Exception
- {
- switch(mode)
- {
- case ALL:
- test.sendToAll(count);
- break;
- default:
- System.out.println("Test for " + mode + " not yet implemented.");
- }
- }
- }
-
- public static void main(String[] argv) throws Exception
- {
- int registrations = Integer.parseInt(argv[0]);
- int messages = Integer.parseInt(argv[1]);
- int iterations = Integer.parseInt(argv[2]);
- TimedRun test = new Test(Mode.ALL, registrations, messages);
- AveragedRun tests = new AveragedRun(test, iterations);
- System.out.println(tests.call());
- }
-}
-
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
deleted file mode 100644
index e76c164f64..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.codec.AMQDecoder;
-import org.apache.qpid.codec.AMQEncoder;
-import org.apache.qpid.framing.*;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.mina.filter.codec.ProtocolEncoderOutput;
-import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput;
-
-import junit.framework.TestCase;
-
-/**
- * This test suite tests the handling of protocol initiation frames and related issues.
- */
-public class TestProtocolInitiation extends TestCase implements ProtocolVersionList
-{
- private AMQPFastProtocolHandler _protocolHandler;
-
- private MockIoSession _mockIoSession;
-
- /**
- * We need to use the object encoder mechanism so to allow us to retrieve the
- * output (a bytebuffer) we define our own encoder output class. The encoder
- * writes the encoded data to this class, from where we can retrieve it during
- * the test run.
- */
- private class TestProtocolEncoderOutput implements ProtocolEncoderOutput
- {
- public ByteBuffer result;
-
- public void write(ByteBuffer buf)
- {
- result = buf;
- }
-
- public void mergeAll()
- {
- throw new UnsupportedOperationException();
- }
-
- public WriteFuture flush()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- private class TestProtocolDecoderOutput implements ProtocolDecoderOutput
- {
- public Object result;
-
- public void write(Object buf)
- {
- result = buf;
- }
-
- public void flush()
- {
- throw new UnsupportedOperationException();
- }
- }
-
- protected void setUp() throws Exception
- {
- super.setUp();
- _mockIoSession = new MockIoSession();
- _protocolHandler = new AMQPFastProtocolHandler(null, null);
- }
-
-
- /**
- * Tests that the AMQDecoder handles invalid protocol classes
- * @throws Exception
- */
- public void testDecoderValidateProtocolClass() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolClass = 2;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolClassException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolClassException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder handles invalid protocol instance numbers
- * @throws Exception
- */
- public void testDecoderValidatesProtocolInstance() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolInstance = 2;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolInstanceException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolInstanceException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder handles invalid protocol major
- * @throws Exception
- */
- public void testDecoderValidatesProtocolMajor() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolMajor = 2;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolVersionException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolVersionException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder handles invalid protocol minor
- * @throws Exception
- */
- public void testDecoderValidatesProtocolMinor() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.protocolMinor = 99;
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolVersionException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolVersionException, got " + e);
- }
- }
-
- /**
- * Tests that the AMQDecoder accepts a valid PI
- * @throws Exception
- */
- public void testDecoderValidatesHeader() throws Exception
- {
- try
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- pi.header = new char[] {'P', 'Q', 'M', 'A' };
- decodePI(pi);
- fail("expected exception did not occur");
- }
- catch (AMQProtocolHeaderException m)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected AMQProtocolHeaderException, got " + e);
- }
- }
-
- /**
- * Test that a valid header is passed by the decoder.
- * @throws Exception
- */
- public void testDecoderAcceptsValidHeader() throws Exception
- {
- ProtocolInitiation pi = createValidProtocolInitiation();
- decodePI(pi);
- }
-
- /**
- * This test checks that an invalid protocol header results in the
- * connection being closed.
- */
- public void testInvalidProtocolHeaderClosesConnection() throws Exception
- {
- AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test");
- _protocolHandler.exceptionCaught(_mockIoSession, pe);
- assertNotNull(_mockIoSession.getLastWrittenObject());
- Object piResponse = _mockIoSession.getLastWrittenObject();
- assertEquals(piResponse.getClass(), ProtocolInitiation.class);
- ProtocolInitiation pi = (ProtocolInitiation) piResponse;
- assertEquals("Protocol Initiation sent out was not the broker's expected header", pi,
- createValidProtocolInitiation());
- assertTrue("Session has not been closed", _mockIoSession.isClosing());
- }
-
- private ProtocolInitiation createValidProtocolInitiation()
- {
- /* Find last protocol version in protocol version list. Make sure last protocol version
- listed in the build file (build-module.xml) is the latest version which will be used
- here. */
- int i = pv.length - 1;
- return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]);
- }
-
- /**
- * Helper that encodes a protocol initiation and attempts to decode it
- * @param pi
- * @throws Exception
- */
- private void decodePI(ProtocolInitiation pi) throws Exception
- {
- // we need to do this test at the level of the decoder since we initially only expect PI frames
- // so the protocol handler is not set up to know whether it should be expecting a PI frame or
- // a different type of frame
- AMQDecoder decoder = new AMQDecoder(true);
- AMQEncoder encoder = new AMQEncoder();
- TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput();
- encoder.encode(_mockIoSession, pi, peo);
- TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput();
- decoder.decode(_mockIoSession, peo.result, pdo);
- ((ProtocolInitiation) pdo.result).checkVersion(this);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(TestProtocolInitiation.class);
- }
-}
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
deleted file mode 100644
index 11c0026455..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.ConcurrentTest;
-
-public class QueueConcurrentPerfTest extends QueuePerfTest
-{
- QueueConcurrentPerfTest(Factory factory, int queueCount, int messages)
- {
- super(factory, queueCount, messages);
- }
-
- public static void main(String[] argv) throws Exception
- {
- Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT};
- int iterations = 5;
- String label = argv.length > 0 ? argv[0]: null;
- System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
- //vary number of queues:
- for(Factory f : factories)
- {
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5));
- run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5));
- }
- }
-}
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
deleted file mode 100644
index 5b3857396d..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.TimedRun;
-import org.apache.qpid.server.util.RunStats;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class QueuePerfTest extends TimedRun
-{
- private final Factory _factory;
- private final int _queueCount;
- private final int _messages;
- private final String _msg = "";
- private List<Queue<String>> _queues;
-
- QueuePerfTest(Factory factory, int queueCount, int messages)
- {
- super(factory + ", " + queueCount + ", " + messages);
- _factory = factory;
- _queueCount = queueCount;
- _messages = messages;
- }
-
- protected void setup() throws Exception
- {
- //init
- int count = Integer.getInteger("prepopulate", 0);
-// System.err.println("Prepopulating with " + count + " items");
- _queues = new ArrayList<Queue<String>>(_queueCount);
- for (int i = 0; i < _queueCount; i++)
- {
- Queue<String> q = _factory.create();
- for(int j = 0; j < count; ++j)
- {
- q.add("Item"+ j);
- }
- _queues.add(q);
- }
- System.gc();
- }
-
- protected void teardown() throws Exception
- {
- System.gc();
- }
-
- protected void run() throws Exception
- {
- //dispatch
- for (int i = 0; i < _messages; i++)
- {
- for (Queue<String> q : _queues)
- {
- q.offer(_msg);
- q.poll();
- }
- }
- }
-
- static interface Factory
- {
- Queue<String> create();
- }
-
- static Factory CONCURRENT = new Factory()
- {
- public Queue<String> create()
- {
- return new ConcurrentLinkedQueue<String>();
- }
-
- public String toString()
- {
- return "ConcurrentLinkedQueue";
- }
-
- };
-
- static Factory SYNCHRONIZED = new Factory()
- {
- public Queue<String> create()
- {
- return new SynchronizedQueue<String>(new LinkedList<String>());
- }
-
-
- public String toString()
- {
- return "Synchronized LinkedList";
- }
- };
-
- static Factory PLAIN = new Factory()
- {
- public Queue<String> create()
- {
- return new LinkedList<String>();
- }
-
- public String toString()
- {
- return "Plain LinkedList";
- }
- };
-
- static class SynchronizedQueue<E> implements Queue<E>
- {
- private final Queue<E> queue;
-
- SynchronizedQueue(Queue<E> queue)
- {
- this.queue = queue;
- }
-
- public synchronized E element()
- {
- return queue.element();
- }
-
- public synchronized boolean offer(E o)
- {
- return queue.offer(o);
- }
-
- public synchronized E peek()
- {
- return queue.peek();
- }
-
- public synchronized E poll()
- {
- return queue.poll();
- }
-
- public synchronized E remove()
- {
- return queue.remove();
- }
-
- public synchronized int size()
- {
- return queue.size();
- }
-
- public synchronized boolean isEmpty()
- {
- return queue.isEmpty();
- }
-
- public synchronized boolean contains(Object o)
- {
- return queue.contains(o);
- }
-
- public synchronized Iterator<E> iterator()
- {
- return queue.iterator();
- }
-
- public synchronized Object[] toArray()
- {
- return queue.toArray();
- }
-
- public synchronized <T>T[] toArray(T[] a)
- {
- return queue.toArray(a);
- }
-
- public synchronized boolean add(E o)
- {
- return queue.add(o);
- }
-
- public synchronized boolean remove(Object o)
- {
- return queue.remove(o);
- }
-
- public synchronized boolean containsAll(Collection<?> c)
- {
- return queue.containsAll(c);
- }
-
- public synchronized boolean addAll(Collection<? extends E> c)
- {
- return queue.addAll(c);
- }
-
- public synchronized boolean removeAll(Collection<?> c)
- {
- return queue.removeAll(c);
- }
-
- public synchronized boolean retainAll(Collection<?> c)
- {
- return queue.retainAll(c);
- }
-
- public synchronized void clear()
- {
- queue.clear();
- }
- }
-
- static void run(String label, AveragedRun test) throws Exception
- {
- RunStats stats = test.call();
- System.out.println((label == null ? "" : label + ", ") + test
- + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin());
- }
-
- public static void main(String[] argv) throws Exception
- {
- Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT};
- int iterations = 5;
- String label = argv.length > 0 ? argv[0]: null;
- System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time");
- //vary number of queues:
-
- for(Factory f : factories)
- {
- run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations));
- run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations));
- }
- }
-
-}
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
deleted file mode 100644
index 9784b2f671..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.handler.OnCurrentThreadExecutor;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.MockIoSession;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
-import org.apache.qpid.server.util.AveragedRun;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.server.util.TimedRun;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class SendPerfTest extends TimedRun
-{
- private int _messages = 1000;
- private int _clients = 10;
- private List<AMQQueue> _queues;
-
- public SendPerfTest(int clients, int messages)
- {
- super("SendPerfTest, msgs=" + messages + ", clients=" + clients);
- _messages = messages;
- _clients = clients;
- }
-
- protected void setup() throws Exception
- {
- _queues = initQueues(_clients);
- System.gc();
- }
-
- protected void teardown() throws Exception
- {
- System.gc();
- }
-
- protected void run() throws Exception
- {
- deliver(_messages, _queues);
- }
-
- //have a dummy AMQProtocolSession that does nothing on the writeFrame()
- //set up x number of queues
- //create necessary bits and pieces to deliver a message
- //deliver y messages to each queue
-
- public static void main(String[] argv) throws Exception
- {
- ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new PropertiesConfiguration())));
- int clients = Integer.parseInt(argv[0]);
- int messages = Integer.parseInt(argv[1]);
- int iterations = Integer.parseInt(argv[2]);
- AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations);
- test.run();
- }
-
- /**
- * Delivers messages to a number of queues.
- * @param count the number of messages to deliver
- * @param queues the list of queues
- * @throws NoConsumersException
- */
- static void deliver(int count, List<AMQQueue> queues) throws AMQException
- {
- BasicPublishBody publish = new BasicPublishBody();
- publish.exchange = new NullExchange().getName();
- ContentHeaderBody header = new ContentHeaderBody();
- List<ContentBody> body = new ArrayList<ContentBody>();
- MessageStore messageStore = new SkeletonMessageStore();
- // channel can be null since it is only used in ack processing which does not apply to this test
- TransactionalContext txContext = new NonTransactionalContext(messageStore, null,
- new LinkedList<RequiredDeliveryException>());
- body.add(new ContentBody());
- MessageHandleFactory factory = new MessageHandleFactory();
- for (int i = 0; i < count; i++)
- {
- // this routes and delivers the message
- AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore,
- factory);
- }
- }
-
- static List<AMQQueue> initQueues(int number) throws AMQException
- {
- Exchange exchange = new NullExchange();
- List<AMQQueue> queues = new ArrayList<AMQQueue>(number);
- for (int i = 0; i < number; i++)
- {
- AMQQueue q = createQueue("Queue" + (i + 1));
- q.bind("routingKey", exchange);
- try
- {
- q.registerProtocolSession(createSession(), 1, "1", false);
- }
- catch (Exception e)
- {
- throw new AMQException("Error creating protocol session: " + e, e);
- }
- queues.add(q);
- }
- return queues;
- }
-
- static AMQQueue createQueue(String name) throws AMQException
- {
- return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
- new OnCurrentThreadExecutor());
- }
-
- static AMQProtocolSession createSession() throws Exception
- {
- IApplicationRegistry reg = ApplicationRegistry.getInstance();
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory);
- result.addChannel(new AMQChannel(1, null, null));
- return result;
- }
-
- static class NullExchange extends AbstractExchange
- {
- public String getName()
- {
- return "NullExchange";
- }
-
- protected ExchangeMBean createMBean()
- {
- return null;
- }
-
- public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
- {
- }
-
- public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
- {
- }
-
- public void route(AMQMessage payload) throws AMQException
- {
- }
- }
-}
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
deleted file mode 100644
index 1ae8d3205d..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.util;
-
-public class ConcurrentTest extends TimedRun
-{
- private final TimedRun _test;
- private final Thread[] _threads;
-
- public ConcurrentTest(TimedRun test, int threads)
- {
- super(test.toString());
- _test = test;
- _threads = new Thread[threads];
- }
-
- protected void setup() throws Exception
- {
- _test.setup();
- for(int i = 0; i < _threads.length; i++)
- {
- _threads[i] = new Thread(new Runner());
- }
- }
-
- protected void teardown() throws Exception
- {
- _test.teardown();
- }
-
- protected void run() throws Exception
- {
- for(Thread t : _threads)
- {
- t.start();
- }
- for(Thread t : _threads)
- {
- t.join();
- }
- }
-
- private class Runner implements Runnable
- {
- private Exception error;
-
- public void run()
- {
- try
- {
- _test.run();
- }
- catch(Exception e)
- {
- error = e;
- e.printStackTrace();
- }
- }
- }
-
-}
diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
deleted file mode 100644
index 04b6bceb4f..0000000000
--- a/qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.ack;
-
-import junit.framework.TestCase;
-import org.apache.log4j.Logger;
-import org.apache.log4j.xml.DOMConfigurator;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.util.TestApplicationRegistry;
-
-import javax.jms.*;
-
-public class DisconnectAndRedeliverTest extends InternalBrokerBaseCase
-{
- private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class);
-
- static
- {
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " + tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
- DOMConfigurator.configure("../broker/etc/log4j.xml");
- }
-
- @Override
- public void createBroker() throws Exception
- {
- super.createBroker();
- TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
- }
-
- @Override
- public void stopBroker()
- {
- TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
- super.stopBroker();
- }
-
- /**
- * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when
- * the channel is closed.
- *
- * @throws Exception
- */
- public void testDisconnectRedeliversMessages() throws Exception
- {
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
-
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
-
- Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- AMQQueue queue = new AMQQueue("someQ", "someQ", false, false);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
-
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
-
-
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
-
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. About to disconnect and reconnect");
-
- con.close();
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
-
- _logger.info("Starting second consumer connection");
- con.start();
-
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg2", tm.getText());
-
-
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg3", tm.getText());
-
-
- tm = (TextMessage) consumer.receive(3000);
- assertEquals("msg4", tm.getText());
-
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
-
- con.close();
-
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- _logger.info("Starting third consumer connection");
- con.start();
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
- con.close();
-
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- _logger.info("Starting fourth consumer connection");
- con.start();
- tm = (TextMessage) consumer.receive(3000);
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
- con.close();
-
- _logger.info("Actually:" + store.getMessageMetaDataMap().size());
- // assertTrue(store.getMessageMap().size() == 0);
- }
-
- /**
- * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be
- * requeued (due perhaps to the queue being deleted).
- *
- * @throws Exception
- */
- public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception
- {
-
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
- Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Queue queue = new AMQQueue("someQ", "someQ", false, true);
- MessageConsumer consumer = consumerSession.createConsumer(queue);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
-
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
- Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
-
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
-
- con2.close();
-
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- tm.acknowledge();
- _logger.info("Received and acknowledged first message");
- consumer.receive();
- consumer.receive();
- consumer.receive();
- _logger.info("Received all four messages. About to disconnect and reconnect");
-
- con.close();
- con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
- consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
-
- _logger.info("Starting second consumer connection");
- con.start();
-
- tm = (TextMessage) consumer.receiveNoWait();
- assertNull(tm);
- _logger.info("No messages redelivered as is expected");
-
- _logger.info("Actually:" + store.getMessageMetaDataMap().size());
- assertTrue(store.getMessageMetaDataMap().size() == 0);
- con.close();
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class);
- }
-}
diff --git a/qpid/java/tools/bin/perf_report.sh b/qpid/java/tools/bin/perf_report.sh
index 22c839e08c..e6b4c987e5 100755
--- a/qpid/java/tools/bin/perf_report.sh
+++ b/qpid/java/tools/bin/perf_report.sh
@@ -25,6 +25,10 @@
SUB_MEM=-Xmx1024M
PUB_MEM=-Xmx1024M
LOG_CONFIG="-Damqj.logging.level=WARN"
+QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
+DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
+TOPIC="amq.topic/test"
+DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
. setenv.sh
@@ -72,60 +76,65 @@ echo "==========================================================================
echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
echo "------------------------------------------------------------------------------------------------"
+# The message counts and warmup counts are set to very low values for quick testing of the script.
+# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k
+# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend
+# setting very low values to start with and experiment while increasing them slowly.
+
# Test 1 Trans Queue
-run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10"
+#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
# Test 2 Dura Queue
-run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
# Test 3 Dura Queue Sync
-run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true"
+run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
# Test 4 Dura Queue Sync Publish and Ack
-run_testcase "Dura_SyncPubAck" "-Ddurable=true -Dsync_ack=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
+run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
# Test 5 Topic
-run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10"
# Test 6 Durable Topic
-run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
# Test 7 Fanout
-run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10"
# Test 8 Small TX
-run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \
- "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
+run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \
+ "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
# Test 9 Large TX
-run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \
- "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
+run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \
+ "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
# Test 10 256 MSG
-run_testcase "Msg_256b" "" "-Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
# Test 11 512 MSG
-run_testcase "Msg_512b" "" "-Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
# Test 12 2048 MSG
-run_testcase "Msg_2048b" "" "-Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
# Test 13 Random size MSG
-run_testcase "Random_Msg_Size" "" "-Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
# Test 14 Random size MSG Durable
-run_testcase "Rand_Msg_Dura" "-Ddurable=true" "-Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
# Test 15 64K MSG
-run_testcase "Msg_64K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
# Test 16 Durable 64K MSG
-run_testcase "Msg_Durable_64K" "-Ddurable=true -Damqj.tcpNoDelay=true" \
- "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \
+ "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
# Test 17 500K MSG
-run_testcase "Msg_500K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
# Test 18 Durable 500K MSG
-run_testcase "Msg_Dura_500K" "-Damqj.tcpNoDelay=true -Ddurable=true" \
- "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \
+ "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/qpid/java/tools/etc/jndi.properties b/qpid/java/tools/etc/jndi.properties
deleted file mode 100644
index 454551c1b1..0000000000
--- a/qpid/java/tools/etc/jndi.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-
-# use the following property to configure the default connector
-#java.naming.provider.url - ignored.
-
-# register some connection factories
-# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.connectionFactory = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'
-
-# Register an AMQP destination in JNDI
-destination.transientQueue = BURL:direct://amq.direct//testQueueT?autodelete='true'
-destination.durableQueue = BURL:direct://amq.direct//testQueueD?durable='true'&autodelete='true'
-
-destination.transientTopic = BURL:topic://amq.topic//testTopicT?autodelete='true'
-destination.durableTopic = BURL:topic://amq.topic//testTopicD?durable='true'&autodelete='true'&clientid='test'&subscription='testQueueD'
-
-destination.fanoutQueue = BURL:fanout://amq.fanout//fanoutQueue?autodelete='true'
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index 88e75fb6a9..ac597d17de 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -30,6 +30,9 @@ import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
+import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
+
public class PerfBase
{
TestParams params;
@@ -45,48 +48,21 @@ public class PerfBase
}
public void setUp() throws Exception
- {
- Hashtable<String,String> env = new Hashtable<String,String>();
- env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
- env.put(Context.PROVIDER_URL, params.getProviderURL());
+ {
- Context ctx = null;
- try
+ if (params.getHost().equals("") || params.getPort() == -1)
{
- ctx = new InitialContext(env);
+ con = new AMQConnection(params.getUrl());
}
- catch(Exception e)
+ else
{
- throw new Exception("Error initializing JNDI",e);
-
+ con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test");
}
-
- ConnectionFactory conFac = null;
- try
- {
- conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
- }
- catch(Exception e)
- {
- throw new Exception("Error looking up connection factory",e);
- }
-
- con = conFac.createConnection();
con.start();
session = con.createSession(params.isTransacted(),
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
- try
- {
- dest = (Destination)ctx.lookup( params.isDurable()?
- params.getDurableDestination():
- params.getTransientDestination()
- );
- }
- catch(Exception e)
- {
- throw new Exception("Error looking up destination",e);
- }
+ dest = new AMQAnyDestination(params.getAddress());
}
public void handleError(Exception e,String msg)
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index f1b682ff32..89d6462a39 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -24,15 +24,22 @@ import javax.jms.Session;
public class TestParams
{
- private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
-
- private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
-
- private String connectionFactory = "connectionFactory";
-
- private String transientDest = "transientQueue";
+ /*
+ * By default the connection URL is used.
+ * This allows a user to easily specify a fully fledged URL any given property.
+ * Ex. SSL parameters
+ *
+ * By providing a host & port allows a user to simply override the URL.
+ * This allows to create multiple clients in test scripts easily,
+ * without having to deal with the long URL format.
+ */
+ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
+
+ private String host = "";
+
+ private int port = -1;
- private String durableDest = "durableQueue";
+ private String address = "queue; {create : always}";
private int msg_size = 1024;
@@ -60,11 +67,11 @@ public class TestParams
public TestParams()
{
- initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
- providerURL = System.getProperty("java.naming.provider.url",providerURL);
-
- transientDest = System.getProperty("transDest",transientDest);
- durableDest = System.getProperty("durableDest",durableDest);
+
+ url = System.getProperty("url",url);
+ host = System.getProperty("host","");
+ port = Integer.getInteger("port", -1);
+ address = System.getProperty("address","queue");
msg_size = Integer.getInteger("msg_size", 1024);
msg_type = Integer.getInteger("msg_type",1);
@@ -80,29 +87,29 @@ public class TestParams
random_msg_size = Boolean.getBoolean("random_msg_size");
}
- public int getAckMode()
+ public String getUrl()
{
- return ack_mode;
+ return url;
}
- public String getConnectionFactory()
+ public String getHost()
{
- return connectionFactory;
+ return host;
}
- public String getTransientDestination()
+ public int getPort()
{
- return transientDest;
+ return port;
}
- public String getDurableDestination()
+ public String getAddress()
{
- return durableDest;
+ return address;
}
- public String getInitialContextFactory()
+ public int getAckMode()
{
- return initialContextFactory;
+ return ack_mode;
}
public int getMsgCount()
@@ -125,11 +132,6 @@ public class TestParams
return durable;
}
- public String getProviderURL()
- {
- return providerURL;
- }
-
public boolean isTransacted()
{
return transacted;
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index cde77c47d2..303668eb36 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -381,6 +381,8 @@
<arg name="reason" type="lstr" desc="Reason for a failure"/>
<arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/>
<arg name="user" type="sstr" desc="Authentication identity"/>
+ <arg name="msgDepth" type="count32" desc="Current size of queue in messages"/>
+ <arg name="byteDepth" type="count32" desc="Current size of queue in bytes"/>
</eventArguments>
<event name="clientConnect" sev="inform" args="rhost, user"/>
@@ -396,5 +398,6 @@
<event name="unbind" sev="inform" args="rhost, user, exName, qName, key"/>
<event name="subscribe" sev="inform" args="rhost, user, qName, dest, excl, args"/>
<event name="unsubscribe" sev="inform" args="rhost, user, dest"/>
+ <event name="queueThresholdExceeded" sev="warn" args="qName, msgDepth, byteDepth"/>
</schema>
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
index f9315a6f90..921786af22 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -29,3 +29,7 @@ from message import *
from query import *
from queue import *
from tx import *
+from lvq import *
+from priority import *
+from threshold import *
+from extensions import *
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
new file mode 100644
index 0000000000..26ea3cb0e9
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase010
+from time import sleep
+
+class ExtensionTests(TestBase010):
+ """Tests for various extensions to AMQP 0-10"""
+
+ def test_timed_autodelete(self):
+ session = self.session
+ session2 = self.conn.session("another-session")
+ session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5})
+ session2.close()
+ result = session.queue_query(queue="my-queue")
+ self.assertEqual("my-queue", result.queue)
+ sleep(5)
+ result = session.queue_query(queue="my-queue")
+ self.assert_(not result.queue)
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py b/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
new file mode 100644
index 0000000000..8fd6b88d78
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import math
+
+class LVQTests (Base):
+ """
+ Test last value queue behaviour
+ """
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def test_simple(self):
+ snd = self.ssn.sender("lvq; {create: sender, delete: sender, node: {x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}",
+ durable=self.durable())
+ snd.send(create_message("a", "a-1"))
+ snd.send(create_message("b", "b-1"))
+ snd.send(create_message("a", "a-2"))
+ snd.send(create_message("a", "a-3"))
+ snd.send(create_message("c", "c-1"))
+ snd.send(create_message("c", "c-2"))
+
+ rcv = self.ssn.receiver("lvq; {mode: browse}")
+ assert fetch_all(rcv) == ["b-1", "a-3", "c-2"]
+
+ snd.send(create_message("b", "b-2"))
+ assert fetch_all(rcv) == ["b-2"]
+
+ snd.send(create_message("c", "c-3"))
+ snd.send(create_message("d", "d-1"))
+ assert fetch_all(rcv) == ["c-3", "d-1"]
+
+ snd.send(create_message("b", "b-3"))
+ assert fetch_all(rcv) == ["b-3"]
+
+ rcv.close()
+ rcv = self.ssn.receiver("lvq; {mode: browse}")
+ assert (fetch_all(rcv) == ["a-3", "c-3", "d-1", "b-3"])
+
+
+def create_message(key, content):
+ msg = Message(content=content)
+ msg.properties["lvq-key"] = key
+ return msg
+
+def fetch_all(rcv):
+ content = []
+ while True:
+ try:
+ content.append(rcv.fetch(0).content)
+ except Empty:
+ break
+ return content
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
new file mode 100644
index 0000000000..3651a1218b
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py
@@ -0,0 +1,237 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+from qpid.compat import set
+import math
+
+class PriorityTests (Base):
+ """
+ Test prioritised messaging
+ """
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def prioritised_delivery(self, priorities, levels=10):
+ """
+ Test that message on a queue are delivered in priority order.
+ """
+ msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+ snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels,
+ durable=self.durable())
+ for m in msgs: snd.send(m)
+
+ rcv = self.ssn.receiver(snd.target)
+ for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True):
+ msg = rcv.fetch(0)
+ #print "expected priority %s got %s" % (expected.priority, msg.priority)
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+
+ def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10):
+ msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+ limit_policy = "x-qpid-fairshare:%s" % default_limit
+ if limits:
+ for k, v in limits.items():
+ limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v)
+
+ snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}"
+ % (levels, limit_policy),
+ durable=self.durable())
+ for m in msgs: snd.send(m)
+
+ rcv = self.ssn.receiver(snd.target)
+ if limits:
+ limit_function = lambda x : limits.get(x, 0)
+ else:
+ limit_function = lambda x : default_limit
+ for expected in fairshare(sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True),
+ limit_function, levels):
+ msg = rcv.fetch(0)
+ #print "expected priority %s got %s" % (expected.priority, msg.priority)
+ assert msg.priority == expected.priority
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+
+ def test_prioritised_delivery_1(self):
+ self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10)
+
+ def test_prioritised_delivery_2(self):
+ self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5)
+
+ def test_fairshare_1(self):
+ self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3])
+
+ def test_fairshare_2(self):
+ self.fairshare_delivery(priorities = [10 for i in range(30)])
+
+ def test_fairshare_3(self):
+ self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3], limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8)
+
+ def test_browsing(self):
+ priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]
+ msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+ snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}",
+ durable=self.durable())
+ for m in msgs: snd.send(m)
+
+ rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}")
+ received = []
+ try:
+ while True: received.append(rcv.fetch(0))
+ except Empty: None
+ #check all messages on the queue were received by the browser; don't relay on any specific ordering at present
+ assert set([m.content for m in msgs]) == set([m.content for m in received])
+
+ def ring_queue_check(self, msgs):
+ """
+ Ensure that a ring queue removes lowest priority messages first.
+ """
+ snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"),
+ durable=self.durable())
+ for m in msgs: snd.send(m)
+
+ rcv = self.ssn.receiver(snd.target)
+ received = []
+ try:
+ while True: received.append(rcv.fetch(0))
+ except Empty: None
+
+ expected = []
+ for m in msgs:
+ while len(expected) > 9:
+ expected=sorted_(expected, key=lambda x: priority_level(x.priority,10))
+ expected.pop(0)
+ expected.append(m)
+ #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received])
+ assert [m.content for m in expected] == [m.content for m in received]
+
+ def test_ring_queue_1(self):
+ priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3]
+ seq = content("msg")
+ self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+ def test_ring_queue_2(self):
+ priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9]
+ seq = content("msg")
+ self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities])
+
+ def test_requeue(self):
+ priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]
+ msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
+
+ snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}",
+ durable=self.durable())
+ #want to have some messages requeued so enable prefetch on a dummy receiver
+ other = self.conn.session()
+ dummy = other.receiver("priority-queue")
+ dummy.capacity = 10
+
+ for m in msgs: snd.send(m)
+
+ #fetch some with dummy receiver on which prefetch is also enabled
+ for i in range(5):
+ msg = dummy.fetch(0)
+ #close session without acknowledgements to requeue messages
+ other.close()
+
+ #now test delivery works as expected after that
+ rcv = self.ssn.receiver(snd.target)
+ for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,10), reverse=True):
+ msg = rcv.fetch(0)
+ #print "expected priority %s got %s" % (expected.priority, msg.priority)
+ assert msg.content == expected.content
+ self.ssn.acknowledge(msg)
+
+def content(base, counter=1):
+ while True:
+ yield "%s-%s" % (base, counter)
+ counter += 1
+
+def address(name, create_policy="sender", delete_policy="receiver", arguments=None):
+ if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments
+ else: node = "node: {}"
+ return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node)
+
+def fairshare(msgs, limit, levels):
+ """
+ Generator to return prioritised messages in expected order for a given fairshare limit
+ """
+ count = 0
+ last_priority = None
+ postponed = []
+ while msgs or postponed:
+ if not msgs:
+ msgs = postponed
+ count = 0
+ last_priority = None
+ postponed = []
+ msg = msgs.pop(0)
+ if last_priority and priority_level(msg.priority, levels) == last_priority:
+ count += 1
+ else:
+ last_priority = priority_level(msg.priority, levels)
+ count = 1
+ l = limit(last_priority)
+ if (l and count > l):
+ postponed.append(msg)
+ else:
+ yield msg
+ return
+
+def effective_priority(value, levels):
+ """
+ Method to determine effective priority given a distinct number of
+ levels supported. Returns the lowest priority value that is of
+ equivalent priority to the value passed in.
+ """
+ if value <= 5-math.ceil(levels/2.0): return 0
+ if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0)
+ return value
+
+def priority_level(value, levels):
+ """
+ Method to determine which of a distinct number of priority levels
+ a given value falls into.
+ """
+ offset = 5-math.ceil(levels/2.0)
+ return min(max(value - offset, 0), levels-1)
+
+def sorted_(msgs, key=None, reverse=False):
+ """
+ Workaround lack of sorted builtin function in python 2.3 and lack
+ of keyword arguments to list.sort()
+ """
+ temp = msgs
+ temp.sort(key_to_cmp(key, reverse=reverse))
+ return temp
+
+def key_to_cmp(key, reverse=False):
+ if key:
+ if reverse: return lambda a, b: cmp(key(b), key(a))
+ else: return lambda a, b: cmp(key(a), key(b))
+ else:
+ return None
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py b/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
new file mode 100644
index 0000000000..bcd3c507e2
--- /dev/null
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import math
+
+class ThresholdTests (Base):
+ """
+ Test queue threshold events are sent and received correctly
+ """
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def do_threshold_test(self, key, value, messages):
+ rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#")
+ snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'%s':%s}}}}" % (key, value))
+ size = 0
+ count = 0
+ for m in messages:
+ snd.send(m)
+ count = count + 1
+ size = size + len(m.content)
+ event = rcv.fetch()
+ schema = event.content[0]["_schema_id"]
+ assert schema["_class_name"] == "queueThresholdExceeded"
+ values = event.content[0]["_values"]
+ assert values["qName"] == "ttq"
+ assert values["msgDepth"] == count, "msgDepth %s, expected %s" % (values["msgDepth"], count)
+ assert values["byteDepth"] == size, "byteDepth %s, expected %s" % (values["byteDepth"], size)
+
+ def test_alert_count(self):
+ self.do_threshold_test("qpid.alert_count", 5, [Message("msg-%s" % i) for i in range(5)])
+
+ def test_alert_size(self):
+ self.do_threshold_test("qpid.alert_size", 25, [Message("msg-%s" % i) for i in range(5)])
+
+ def test_alert_count_alias(self):
+ self.do_threshold_test("x-qpid-maximum-message-count", 10, [Message("msg-%s" % i) for i in range(10)])
+
+ def test_alert_size_alias(self):
+ self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)])
diff --git a/qpid/tools/src/py/.gitignore b/qpid/tools/src/py/.gitignore
index 3b402156c2..97cb05dc36 100644
--- a/qpid/tools/src/py/.gitignore
+++ b/qpid/tools/src/py/.gitignore
@@ -1,20 +1,22 @@
+
+#
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
+# http://www.apache.org/licenses/LICENSE-2.0
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# "License"); you may not use this file except in compliance
# KIND, either express or implied. See the License for the
+# Licensed to the Apache Software Foundation (ASF) under one
+# Unless required by applicable law or agreed to in writing,
+# distributed with this work for additional information
+# or more contributor license agreements. See the NOTICE file
+# regarding copyright ownership. The ASF licenses this file
+# software distributed under the License is distributed on an
# specific language governing permissions and limitations
+# to you under the Apache License, Version 2.0 (the
# under the License.
-#
-
+# with the License. You may obtain a copy of the License at
/qpid-clusterc
+/qpid-configc
+/qpid-routec