diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-17 14:08:14 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-17 14:08:14 +0000 |
commit | 16a49ba6ef283a5093780d28efaaa8483fc9010d (patch) | |
tree | 427908f0242a05033385a1dcd1a0af908bca44ea | |
parent | fca7b2ac23c76c402457ab605639ba4b16a5e3f1 (diff) | |
download | qpid-python-16a49ba6ef283a5093780d28efaaa8483fc9010d.tar.gz |
QPID-2935: sync with latest trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1071615 13f79535-47bb-0310-9956-ffa450edef68
89 files changed, 2925 insertions, 1825 deletions
diff --git a/qpid/bin/release.sh b/qpid/bin/release.sh index 913bbc7334..31c12e630c 100755 --- a/qpid/bin/release.sh +++ b/qpid/bin/release.sh @@ -208,7 +208,7 @@ fi if [ "JAVA" == "$JAVA" ] ; then pushd qpid-${VER}/java - ant build release release-bin release-mvn -Dsvnversion.output=${REV} + ant build release release-bin release-mvn -Dsvnversion.output=${REV} -Dmaven.snapshot=false popd cp qpid-${VER}/java/release/*.tar.gz artifacts/qpid-java-${VER}.tar.gz diff --git a/qpid/cpp/INSTALL b/qpid/cpp/INSTALL index 6628552ce5..6483d7de4e 100644 --- a/qpid/cpp/INSTALL +++ b/qpid/cpp/INSTALL @@ -67,6 +67,10 @@ Optional SSL support requires: * nss <http://www.mozilla.org/projects/security/pki/nss/> * nspr <http://www.mozilla.org/projects/nspr/> +Optional binding support for ruby requires: +* ruby and ruby devel <http://www.ruby-lang.org/en/> +* swig <http://www.swig.org/> + Qpid has been built using the GNU C++ compiler: * gcc <http://gcc.gnu.org/> (3.4.6) @@ -124,6 +128,9 @@ For the XML Exchange, include: # yum install xqilla-devel xerces-c-devel +Optional ruby binding support include: + # yum install ruby ruby-devel swig + Follow the manual installation instruction below for any packages not available through your distributions packaging tool. diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am index 9c3bd615d6..84207d43c4 100644 --- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am +++ b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am @@ -21,7 +21,7 @@ INCLUDE = -I$(top_srcdir)/include AM_CPPFLAGS = $(INCLUDE) -noinst_PROGRAMS=agent list_agents +noinst_PROGRAMS=agent list_agents print_events agent_SOURCES=agent.cpp agent_LDADD=$(top_builddir)/src/libqmf2.la @@ -29,3 +29,5 @@ agent_LDADD=$(top_builddir)/src/libqmf2.la list_agents_SOURCES=list_agents.cpp list_agents_LDADD=$(top_builddir)/src/libqmf2.la +print_events_SOURCES=print_events.cpp +print_events_LDADD=$(top_builddir)/src/libqmf2.la diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp b/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp new file mode 100644 index 0000000000..9883a19962 --- /dev/null +++ b/qpid/cpp/bindings/qmf2/examples/cpp/print_events.cpp @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/messaging/Connection.h> +#include <qpid/messaging/Duration.h> +#include <qmf/ConsoleSession.h> +#include <qmf/ConsoleEvent.h> +#include <qmf/Data.h> +#include <qpid/types/Variant.h> +#include <string> +#include <iostream> + +using namespace std; +using namespace qmf; +using qpid::types::Variant; +using qpid::messaging::Duration; + +int main(int argc, char** argv) +{ + string url("localhost"); + string connectionOptions; + string sessionOptions; + + if (argc > 1) + url = argv[1]; + if (argc > 2) + connectionOptions = argv[2]; + if (argc > 3) + sessionOptions = argv[3]; + + qpid::messaging::Connection connection(url, connectionOptions); + connection.open(); + + ConsoleSession session(connection, sessionOptions); + session.open(); + + while (true) { + ConsoleEvent event; + if (session.nextEvent(event)) { + if (event.getType() == CONSOLE_EVENT) { + const Data& data(event.getData(0)); + cout << "Event: timestamp=" << event.getTimestamp() << " severity=" << + event.getSeverity() << " content=" << data.getProperties() << endl; + } + } + } +} + diff --git a/qpid/cpp/bindings/qmf2/examples/python/agent.py b/qpid/cpp/bindings/qmf2/examples/python/agent.py index 66b7dbdc58..b24890f531 100755 --- a/qpid/cpp/bindings/qmf2/examples/python/agent.py +++ b/qpid/cpp/bindings/qmf2/examples/python/agent.py @@ -69,32 +69,41 @@ class ExampleAgent(AgentHandler): if addr == self.controlAddr: self.control.methodCount += 1 - if methodName == "stop": - self.session.methodSuccess(handle) - self.cancel() - - elif methodName == "echo": - handle.addReturnArgument("sequence", args["sequence"]) - handle.addReturnArgument("map", args["map"]) - self.session.methodSuccess(handle) - - elif methodName == "fail": - if args['useString']: - self.session.raiseException(handle, args['stringVal']) - else: - ex = Data(self.sch_exception) - ex.whatHappened = "It Failed" - ex.howBad = 75 - ex.details = args['details'] - self.session.raiseException(handle, ex) - - elif methodName == "create_child": - name = args['name'] - child = Data(self.sch_child) - child.name = name - addr = self.session.addData(child, name) - handle.addReturnArgument("childAddr", addr.asMap()) - self.session.methodSuccess(handle) + try: + if methodName == "stop": + self.session.methodSuccess(handle) + self.cancel() + + elif methodName == "echo": + handle.addReturnArgument("sequence", args["sequence"]) + handle.addReturnArgument("map", args["map"]) + self.session.methodSuccess(handle) + + elif methodName == "event": + ev = Data(self.sch_event) + ev.text = args['text'] + self.session.raiseEvent(ev, args['severity']) + self.session.methodSuccess(handle) + + elif methodName == "fail": + if args['useString']: + self.session.raiseException(handle, args['stringVal']) + else: + ex = Data(self.sch_exception) + ex.whatHappened = "It Failed" + ex.howBad = 75 + ex.details = args['details'] + self.session.raiseException(handle, ex) + + elif methodName == "create_child": + name = args['name'] + child = Data(self.sch_child) + child.name = name + addr = self.session.addData(child, name) + handle.addReturnArgument("childAddr", addr.asMap()) + self.session.methodSuccess(handle) + except BaseException, e: + self.session.raiseException(handle, "%r" % e) def setupSchema(self): @@ -128,6 +137,11 @@ class ExampleAgent(AgentHandler): echoMethod.addArgument(SchemaProperty("map", SCHEMA_DATA_MAP, direction=DIR_IN_OUT)) self.sch_control.addMethod(echoMethod) + eventMethod = SchemaMethod("event", desc="Raise an Event") + eventMethod.addArgument(SchemaProperty("text", SCHEMA_DATA_STRING, direction=DIR_IN)) + eventMethod.addArgument(SchemaProperty("severity", SCHEMA_DATA_INT, direction=DIR_IN)) + self.sch_control.addMethod(eventMethod) + failMethod = SchemaMethod("fail", desc="Expected to Fail") failMethod.addArgument(SchemaProperty("useString", SCHEMA_DATA_BOOL, direction=DIR_IN)) failMethod.addArgument(SchemaProperty("stringVal", SCHEMA_DATA_STRING, direction=DIR_IN)) @@ -146,11 +160,18 @@ class ExampleAgent(AgentHandler): self.sch_child.addProperty(SchemaProperty("name", SCHEMA_DATA_STRING)) ## + ## Declare the event class + ## + self.sch_event = Schema(SCHEMA_TYPE_EVENT, package, "event") + self.sch_event.addProperty(SchemaProperty("text", SCHEMA_DATA_STRING)) + + ## ## Register our schemata with the agent session. ## self.session.registerSchema(self.sch_exception) self.session.registerSchema(self.sch_control) self.session.registerSchema(self.sch_child) + self.session.registerSchema(self.sch_event) def populateData(self): diff --git a/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb b/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb index 712e5007be..41de7e5abe 100644 --- a/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb +++ b/qpid/cpp/bindings/qmf2/examples/ruby/find_agents.rb @@ -27,7 +27,7 @@ class FindAgents < Qmf2::ConsoleHandler end def agent_added(agent) - puts "Agent Added: #{agent.to_s}" + puts "Agent Added: #{agent.name}" end def agent_deleted(agent, reason) diff --git a/qpid/cpp/bindings/qmf2/python/qmf2.py b/qpid/cpp/bindings/qmf2/python/qmf2.py index 61a5453f8e..9f2d8556f4 100644 --- a/qpid/cpp/bindings/qmf2/python/qmf2.py +++ b/qpid/cpp/bindings/qmf2/python/qmf2.py @@ -165,7 +165,7 @@ class ConsoleHandler(Thread): reason = 'filter' if event.getAgentDelReason() == cqmf2.AGENT_DEL_AGED: reason = 'aged' - self.agentDeleted(Agent(event.getAgent(), reason)) + self.agentDeleted(Agent(event.getAgent()), reason) elif event.getType() == cqmf2.CONSOLE_AGENT_RESTART: self.agentRestarted(Agent(event.getAgent())) @@ -373,6 +373,16 @@ class AgentSession(object): else: self._impl.raiseException(handle, data) + def raiseEvent(self, data, severity=None): + """ + """ + if not severity: + self._impl.raiseEvent(data._impl) + else: + if (severity.__class__ != int and severity.__class__ != long) or severity < 0 or severity > 7: + raise Exception("Severity must be an int between 0..7") + self._impl.raiseEvent(data._impl, severity); + #=================================================================================================== # AGENT PROXY diff --git a/qpid/cpp/bindings/qmf2/ruby/qmf2.rb b/qpid/cpp/bindings/qmf2/ruby/qmf2.rb index 6d1741ebc0..c14ecba4e1 100644 --- a/qpid/cpp/bindings/qmf2/ruby/qmf2.rb +++ b/qpid/cpp/bindings/qmf2/ruby/qmf2.rb @@ -433,6 +433,14 @@ module Qmf2 def del_data(addr) @impl.del_data(addr.impl) end + + def raise_event(data, severity=nil) + if !severity + @impl.raiseEvent(data.impl) + else + @impl.raiseEvent(data.impl, severity) + end + end end ##============================================================================== diff --git a/qpid/cpp/bindings/qpid/Makefile.am b/qpid/cpp/bindings/qpid/Makefile.am index 07b51e6c64..ca9eda0c73 100644 --- a/qpid/cpp/bindings/qpid/Makefile.am +++ b/qpid/cpp/bindings/qpid/Makefile.am @@ -17,10 +17,11 @@ # under the License. # +SUBDIRS = dotnet + if HAVE_SWIG EXTRA_DIST = qpid.i -SUBDIRS = if HAVE_RUBY_DEVEL SUBDIRS += ruby diff --git a/qpid/cpp/bindings/qpid/dotnet/Makefile.am b/qpid/cpp/bindings/qpid/dotnet/Makefile.am new file mode 100644 index 0000000000..f2b106bcb2 --- /dev/null +++ b/qpid/cpp/bindings/qpid/dotnet/Makefile.am @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +EXTRA_DIST = winsdk_sources/winsdk_dotnet_examples.sln \ + winsdk_sources/examples/csharp.direct.receiver/csharp.direct.receiver.csproj \ + winsdk_sources/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj \ + winsdk_sources/examples/csharp.example.helloworld/csharp.example.helloworld.csproj \ + winsdk_sources/examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj \ + winsdk_sources/examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj \ + winsdk_sources/examples/csharp.example.server/csharp.example.server.csproj \ + winsdk_sources/examples/csharp.example.spout/csharp.example.spout.csproj \ + winsdk_sources/examples/csharp.example.drain/csharp.example.drain.csproj \ + winsdk_sources/examples/csharp.map.sender/csharp.map.sender.csproj \ + winsdk_sources/examples/csharp.map.receiver/csharp.map.receiver.csproj \ + winsdk_sources/examples/csharp.example.client/csharp.example.client.csproj \ + winsdk_sources/examples/csharp.direct.sender/csharp.direct.sender.csproj \ + examples/csharp.direct.receiver/csharp.direct.receiver.cs \ + examples/csharp.direct.receiver/csharp.direct.receiver.csproj \ + examples/csharp.direct.receiver/Properties/AssemblyInfo.cs \ + examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs \ + examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj \ + examples/csharp.map.callback.receiver/Properties/AssemblyInfo.cs \ + examples/powershell.example.helloworld/powershell.example.helloworld.ps1 \ + examples/csharp.example.helloworld/csharp.example.helloworld.cs \ + examples/csharp.example.helloworld/csharp.example.helloworld.csproj \ + examples/csharp.example.helloworld/Properties/AssemblyInfo.cs \ + examples/csharp.example.declare_queues/csharp.example.declare_queues.cs \ + examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj \ + examples/csharp.example.declare_queues/Properties/AssemblyInfo.cs \ + examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj \ + examples/csharp.map.callback.sender/Properties/AssemblyInfo.cs \ + examples/csharp.map.callback.sender/csharp.map.callback.sender.cs \ + examples/csharp.example.server/csharp.example.server.csproj \ + examples/csharp.example.server/csharp.example.server.cs \ + examples/csharp.example.server/Properties/AssemblyInfo.cs \ + examples/csharp.example.spout/csharp.example.spout.csproj \ + examples/csharp.example.spout/Options.cs \ + examples/csharp.example.spout/csharp.example.spout.cs \ + examples/csharp.example.spout/Properties/AssemblyInfo.cs \ + examples/csharp.example.drain/csharp.example.drain.cs \ + examples/csharp.example.drain/csharp.example.drain.csproj \ + examples/csharp.example.drain/Options.cs \ + examples/csharp.example.drain/Properties/AssemblyInfo.cs \ + examples/csharp.map.sender/csharp.map.sender.csproj \ + examples/csharp.map.sender/csharp.map.sender.cs \ + examples/csharp.map.sender/Properties/AssemblyInfo.cs \ + examples/visualbasic.example.client/visualbasic.example.client.vbproj \ + examples/visualbasic.example.client/MyProject/Resources.resx \ + examples/visualbasic.example.client/MyProject/Application.myapp \ + examples/visualbasic.example.client/MyProject/Settings.settings \ + examples/visualbasic.example.client/MyProject/Settings.Designer.vb \ + examples/visualbasic.example.client/MyProject/AssemblyInfo.vb \ + examples/visualbasic.example.client/MyProject/Application.Designer.vb \ + examples/visualbasic.example.client/MyProject/Resources.Designer.vb \ + examples/visualbasic.example.client/visualbasic.example.client.vb \ + examples/csharp.map.receiver/csharp.map.receiver.csproj \ + examples/csharp.map.receiver/csharp.map.receiver.cs \ + examples/csharp.map.receiver/Properties/AssemblyInfo.cs \ + examples/csharp.example.client/csharp.example.client.cs \ + examples/csharp.example.client/Properties/AssemblyInfo.cs \ + examples/csharp.example.client/csharp.example.client.csproj \ + examples/csharp.direct.sender/csharp.direct.sender.csproj \ + examples/csharp.direct.sender/csharp.direct.sender.cs \ + examples/csharp.direct.sender/Properties/AssemblyInfo.cs \ + configure-windows.ps1 \ + ReadMe.txt \ + org.apache.qpid.messaging.sln \ + test/messaging.test/messaging.test.address.cs \ + test/messaging.test/messaging.test.duration.cs \ + test/messaging.test/messaging.test.cs \ + test/messaging.test/messaging.test.message.cs \ + test/messaging.test/messaging.test.csproj \ + test/messaging.test/Properties/AssemblyInfo.cs \ + test/messaging.test/messaging.test.connection.cs \ + src/org.apache.qpid.messaging.vcproj \ + src/Message.cpp \ + src/Connection.cpp \ + src/TypeTranslator.h \ + src/AssemblyInfo.cpp \ + src/FailoverUpdates.h \ + src/sessionreceiver/sessionreceiver.cs \ + src/sessionreceiver/Properties/sessionreceiver-AssemblyInfo-template.cs \ + src/sessionreceiver/qpid.snk \ + src/sessionreceiver/org.apache.qpid.messaging.sessionreceiver.csproj \ + src/Sender.h \ + src/TypeTranslator.cpp \ + src/Receiver.h \ + src/Address.h \ + src/Sender.cpp \ + src/QpidTypeCheck.h \ + src/resource1.h \ + src/Duration.h \ + src/Session.h \ + src/Message.h \ + src/ReadMe.txt \ + src/Receiver.cpp \ + src/Address.cpp \ + src/app.rc \ + src/Session.cpp \ + src/org.apache.qpid.messaging.template.rc \ + src/qpid.snk \ + src/Connection.h \ + src/QpidException.h \ + src/QpidMarshal.h \ + src/FailoverUpdates.cpp \ + src/Duration.cpp \ + ../../../src/windows/resources/qpid-icon.ico \ + ../../../src/windows/resources/template-resource.rc \ + ../../../src/windows/resources/version-resource.h diff --git a/qpid/cpp/build-aux/.gitignore b/qpid/cpp/build-aux/.gitignore new file mode 100644 index 0000000000..42725ceff3 --- /dev/null +++ b/qpid/cpp/build-aux/.gitignore @@ -0,0 +1 @@ +/py-compile diff --git a/qpid/cpp/configure.ac b/qpid/cpp/configure.ac index ee1bade1c9..ea1a1b49ea 100644 --- a/qpid/cpp/configure.ac +++ b/qpid/cpp/configure.ac @@ -16,7 +16,7 @@ AC_INIT([qpidc], [dev@qpid.apache.org]) AC_CONFIG_AUX_DIR([build-aux]) -AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects]) +AM_INIT_AUTOMAKE([dist-bzip2 subdir-objects tar-ustar]) # Minimum Autoconf version required. AC_PREREQ(2.59) @@ -538,6 +538,7 @@ AC_CONFIG_FILES([ bindings/qpid/ruby/Makefile bindings/qpid/python/Makefile bindings/qpid/perl/Makefile + bindings/qpid/dotnet/Makefile bindings/qmf/Makefile bindings/qmf/ruby/Makefile bindings/qmf/python/Makefile diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index eb4bb72aad..4b5a1b1c2c 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -313,6 +313,10 @@ if (NOT Boost_FILESYSTEM_LIBRARY) set(Boost_FILESYSTEM_LIBRARY boost_filesystem) endif (NOT Boost_FILESYSTEM_LIBRARY) +if (NOT Boost_SYSTEM_LIBRARY) + set(Boost_SYSTEM_LIBRARY boost_system) +endif (NOT Boost_SYSTEM_LIBRARY) + if (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY) set(Boost_UNIT_TEST_FRAMEWORK_LIBRARY boost_unit_test_framework) endif (NOT Boost_UNIT_TEST_FRAMEWORK_LIBRARY) @@ -487,6 +491,7 @@ endif (BUILD_SASL) CHECK_LIBRARY_EXISTS (xerces-c _init "" HAVE_XERCES) CHECK_INCLUDE_FILE_CXX (xercesc/framework/MemBufInputSource.hpp HAVE_XERCES_H) CHECK_INCLUDE_FILE_CXX (xqilla/xqilla-simple.hpp HAVE_XQILLA_H) +CHECK_INCLUDE_FILE_CXX (xqilla/ast/XQEffectiveBooleanValue.hpp HAVE_XQ_EBV) set (xml_default ${xml_force}) if (CMAKE_SYSTEM_NAME STREQUAL Windows) @@ -510,6 +515,10 @@ if (BUILD_XML) message(FATAL_ERROR "XML Exchange support requested but XQilla headers not found") endif (NOT HAVE_XQILLA_H) + if (HAVE_XQ_EBV) + add_definitions(-DXQ_EFFECTIVE_BOOLEAN_VALUE_HPP) + endif (HAVE_XQ_EBV) + add_library (xml MODULE qpid/xml/XmlExchange.cpp qpid/xml/XmlExchange.h @@ -956,6 +965,11 @@ set (qpidbroker_SOURCES qpid/broker/Broker.cpp qpid/broker/Exchange.cpp qpid/broker/ExpiryPolicy.cpp + qpid/broker/Fairshare.cpp + qpid/broker/LegacyLVQ.cpp + qpid/broker/MessageDeque.cpp + qpid/broker/MessageMap.cpp + qpid/broker/PriorityQueue.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp qpid/broker/QueueListeners.cpp @@ -1006,6 +1020,7 @@ set (qpidbroker_SOURCES qpid/broker/SessionHandler.h qpid/broker/SessionHandler.cpp qpid/broker/System.cpp + qpid/broker/ThresholdAlerts.cpp qpid/broker/TopicExchange.cpp qpid/broker/TxAccept.cpp qpid/broker/TxBuffer.cpp @@ -1207,6 +1222,8 @@ install (TARGETS replication_exchange # file whereas older builds only have config.h on autoconf-generated builds. add_definitions(-DHAVE_CONFIG_H) +add_definitions(-DBOOST_FILESYSTEM_VERSION=2) + # Now create the config file from all the info learned above. configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8f00cefb33..6fafff7d54 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -24,6 +24,7 @@ SUBDIRS = . tests # using Visual Studio solutions/projects. windows_dist = \ qpid/client/windows/SaslFactory.cpp \ + qpid/client/windows/SslConnector.cpp \ qpid/log/windows/SinkOptions.cpp \ qpid/log/windows/SinkOptions.h \ ../include/qpid/sys/windows/check.h \ @@ -42,6 +43,8 @@ windows_dist = \ qpid/sys/windows/Shlib.cpp \ qpid/sys/windows/SocketAddress.cpp \ qpid/sys/windows/Socket.cpp \ + qpid/sys/windows/SslAsynchIO.cpp \ + qpid/sys/windows/SslAsynchIO.h \ qpid/sys/windows/StrError.cpp \ qpid/sys/windows/SystemInfo.cpp \ qpid/sys/windows/Thread.cpp \ @@ -51,7 +54,9 @@ windows_dist = \ qpid/sys/windows/uuid.h \ windows/QpiddBroker.cpp \ qpid/broker/windows/BrokerDefaults.cpp \ - qpid/broker/windows/SaslAuthenticator.cpp + qpid/broker/windows/SaslAuthenticator.cpp \ + qpid/broker/windows/SslProtocolFactory.cpp \ + qpid/messaging/HandleInstantiator.cpp EXTRA_DIST= $(platform_dist) $(rgen_srcs) $(windows_dist) @@ -122,6 +127,8 @@ qpidtest_SCRIPTS = tmoduledir = $(libdir)/qpid/tests tmodule_LTLIBRARIES= +AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2 + ## Automake macros to build libraries and executables. qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\" @@ -543,6 +550,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/ExchangeRegistry.h \ qpid/broker/ExpiryPolicy.cpp \ qpid/broker/ExpiryPolicy.h \ + qpid/broker/Fairshare.h \ + qpid/broker/Fairshare.cpp \ qpid/broker/FanOutExchange.cpp \ qpid/broker/FanOutExchange.h \ qpid/broker/FedOps.h \ @@ -550,6 +559,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HeadersExchange.cpp \ qpid/broker/HeadersExchange.h \ qpid/broker/AsyncCompletion.h \ + qpid/broker/LegacyLVQ.h \ + qpid/broker/LegacyLVQ.cpp \ qpid/broker/Link.cpp \ qpid/broker/Link.h \ qpid/broker/LinkRegistry.cpp \ @@ -560,9 +571,16 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageAdapter.h \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageBuilder.h \ + qpid/broker/MessageDeque.h \ + qpid/broker/MessageDeque.cpp \ + qpid/broker/MessageMap.h \ + qpid/broker/MessageMap.cpp \ + qpid/broker/Messages.h \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/MessageStoreModule.h \ + qpid/broker/PriorityQueue.h \ + qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.cpp \ @@ -584,6 +602,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueEvents.h \ qpid/broker/QueueListeners.cpp \ qpid/broker/QueueListeners.h \ + qpid/broker/QueueObserver.h \ qpid/broker/QueuePolicy.cpp \ qpid/broker/QueuePolicy.h \ qpid/broker/QueueRegistry.cpp \ @@ -632,6 +651,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SignalHandler.h \ qpid/broker/System.cpp \ qpid/broker/System.h \ + qpid/broker/ThresholdAlerts.cpp \ + qpid/broker/ThresholdAlerts.h \ qpid/broker/TopicExchange.cpp \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index 176cadf0c1..915f2a1c88 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -339,7 +339,7 @@ void AgentImpl::handleMethodResponse(const Variant::Map& response, const Message uint32_t correlator; boost::shared_ptr<SyncContext> context; - QPID_LOG(trace, "RCVD MethodResponse map=" << response); + QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response); aIter = response.find("_arguments"); if (aIter != response.end()) @@ -556,13 +556,14 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator) msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); msg.setSubject(directSubject); - if (!session.authUser.empty()) - msg.setUserId(session.authUser); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); encode(QueryImplAccess::get(query).asMap(), msg); - if (sender.isValid()) + if (sender.isValid()) { sender.send(msg); - - QPID_LOG(trace, "SENT QueryRequest to=" << name); + QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator); + } } @@ -583,13 +584,14 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); msg.setSubject(directSubject); - if (!session.authUser.empty()) - msg.setUserId(session.authUser); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); encode(map, msg); - if (sender.isValid()) + if (sender.isValid()) { sender.send(msg); - - QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name); + QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator); + } } void AgentImpl::sendSchemaRequest(const SchemaId& id) @@ -626,12 +628,13 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) msg.setReplyTo(session.replyAddress); msg.setContent(content); msg.setSubject(directSubject); - if (!session.authUser.empty()) - msg.setUserId(session.authUser); - if (sender.isValid()) + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); + if (sender.isValid()) { sender.send(msg); - - QPID_LOG(trace, "SENT V1SchemaRequest to=" << name); + QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject); + } } diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp index 30176a8c01..4c5a72a467 100644 --- a/qpid/cpp/src/qmf/AgentSession.cpp +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -571,7 +571,7 @@ void AgentSessionImpl::raiseEvent(const Data& data, int severity) encode(list, msg); topicSender.send(msg); - QPID_LOG(trace, "SENT EventIndication to=" << subject); + QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject); } @@ -625,7 +625,7 @@ void AgentSessionImpl::setAgentName() void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg) { - QPID_LOG(trace, "RCVD AgentLocateRequest"); + QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo()); if (!predicate.empty()) { Query agentQuery(QUERY_OBJECT); @@ -659,7 +659,7 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg) { - QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo()); + QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId()); // // Construct an AgentEvent to be sent to the application. @@ -719,7 +719,7 @@ void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Me void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg) { - QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo()); + QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId()); // // Construct an AgentEvent to be sent to the application or directly handled by the agent. diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index bb4458a0b9..e12c1152f6 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -65,7 +65,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), authUser(c.getAuthenticatedUsername()), maxAgentAgeMinutes(5), + connection(c), domain("default"), maxAgentAgeMinutes(5), opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { @@ -394,6 +394,7 @@ void ConsoleSessionImpl::sendAgentLocate() { Message msg; Variant::Map& headers(msg.getProperties()); + static const string subject("console.request.agent_locate"); headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; @@ -401,12 +402,12 @@ void ConsoleSessionImpl::sendAgentLocate() msg.setReplyTo(replyAddress); msg.setCorrelationId("agent-locate"); - msg.setSubject("console.request.agent_locate"); + msg.setSubject(subject); encode(agentQuery.getPredicate(), msg); topicSender.send(msg); - QPID_LOG(trace, "SENT AgentLocate to topic"); + QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject); } diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h index e495c1c1e8..675c8bcfb5 100644 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h @@ -72,7 +72,6 @@ namespace qmf { qpid::messaging::Sender directSender; qpid::messaging::Sender topicSender; std::string domain; - std::string authUser; uint32_t maxAgentAgeMinutes; bool listenOnDirect; bool strictSecurity; diff --git a/qpid/cpp/src/qpid/RefCountedBuffer.h b/qpid/cpp/src/qpid/RefCountedBuffer.h index c332325378..75a23862be 100644 --- a/qpid/cpp/src/qpid/RefCountedBuffer.h +++ b/qpid/cpp/src/qpid/RefCountedBuffer.h @@ -27,7 +27,7 @@ #include <boost/intrusive_ptr.hpp> namespace qpid { -// FIXME aconway 2008-09-06: easy to add alignment + /** * Reference-counted byte buffer. * No alignment guarantees. @@ -51,7 +51,7 @@ public: pointer(const pointer&); ~pointer(); pointer& operator=(const pointer&); - + char* get() { return cp(); } operator char*() { return cp(); } char& operator*() { return *cp(); } @@ -62,7 +62,7 @@ public: const char& operator*() const { return *cp(); } const char& operator[](size_t i) const { return cp()[i]; } }; - + /** Create a reference counted buffer of size n */ static pointer create(size_t n); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 2ef6933612..ebccdbe38f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -103,7 +103,8 @@ Broker::Options::Options(const std::string& name) : requireEncrypted(false), maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. - qmf2Support(false), + qmf2Support(true), + qmf1Support(true), queueFlowStopRatio(80), queueFlowResumeRatio(70) { @@ -125,7 +126,8 @@ Broker::Options::Options(const std::string& name) : ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") - ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") + ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -153,7 +155,7 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support, + managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, conf.qmf2Support) : 0), store(new NullMessageStore), @@ -225,7 +227,6 @@ Broker::Broker(const Broker::Options& conf) : } QueuePolicy::setDefaultMaxSize(conf.queueLimit); - queues.setQueueEvents(&queueEvents); // Early-Initialize plugins Plugin::earlyInitAll(*this); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 55823bc45a..9ed9ca3995 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -115,6 +115,7 @@ public: uint32_t maxSessionRate; bool asyncQueueEvents; bool qmf2Support; + bool qmf1Support; uint queueFlowStopRatio; // producer flow control: on uint queueFlowResumeRatio; // producer flow control: off diff --git a/qpid/cpp/src/qpid/broker/Fairshare.cpp b/qpid/cpp/src/qpid/broker/Fairshare.cpp new file mode 100644 index 0000000000..e6bbf86691 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Fairshare.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Fairshare.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { + +Fairshare::Fairshare(size_t levels, uint limit) : + PriorityQueue(levels), + limits(levels, limit), priority(levels-1), count(0) {} + + +void Fairshare::setLimit(size_t level, uint limit) +{ + limits[level] = limit; +} + +bool Fairshare::limitReached() +{ + uint l = limits[priority]; + return l && ++count > l; +} + +uint Fairshare::currentLevel() +{ + if (limitReached()) { + return nextLevel(); + } else { + return priority; + } +} + +uint Fairshare::nextLevel() +{ + count = 1; + if (priority) --priority; + else priority = levels-1; + return priority; +} + +bool Fairshare::isNull() +{ + for (int i = 0; i < levels; i++) if (limits[i]) return false; + return true; +} + +bool Fairshare::getState(uint& p, uint& c) const +{ + p = priority; + c = count; + return true; +} + +bool Fairshare::setState(uint p, uint c) +{ + priority = p; + count = c; + return true; +} + +bool Fairshare::findFrontLevel(uint& p, PriorityLevels& messages) +{ + const uint start = p = currentLevel(); + do { + if (!messages[p].empty()) return true; + } while ((p = nextLevel()) != start); + return false; +} + + + +bool Fairshare::getState(const Messages& m, uint& priority, uint& count) +{ + const Fairshare* fairshare = dynamic_cast<const Fairshare*>(&m); + return fairshare && fairshare->getState(priority, count); +} + +bool Fairshare::setState(Messages& m, uint priority, uint count) +{ + Fairshare* fairshare = dynamic_cast<Fairshare*>(&m); + return fairshare && fairshare->setState(priority, count); +} + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + +int getSetting(const qpid::framing::FieldTable& settings, const std::string& key, int minvalue, int maxvalue) +{ + return std::max(minvalue,std::min(getIntegerSetting(settings, key), maxvalue)); +} + +std::auto_ptr<Messages> Fairshare::create(const qpid::framing::FieldTable& settings) +{ + std::auto_ptr<Messages> result; + size_t levels = getSetting(settings, "x-qpid-priorities", 1, 100); + if (levels) { + uint defaultLimit = getIntegerSetting(settings, "x-qpid-fairshare"); + std::auto_ptr<Fairshare> fairshare(new Fairshare(levels, defaultLimit)); + for (uint i = 0; i < levels; i++) { + std::string key = (boost::format("x-qpid-fairshare-%1%") % i).str(); + if(settings.isSet(key)) { + fairshare->setLimit(i, getIntegerSetting(settings, key)); + } + } + + if (fairshare->isNull()) { + result = std::auto_ptr<Messages>(new PriorityQueue(levels)); + } else { + result = fairshare; + } + } + return result; +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Fairshare.h b/qpid/cpp/src/qpid/broker/Fairshare.h new file mode 100644 index 0000000000..6c4b87f857 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Fairshare.h @@ -0,0 +1,61 @@ +#ifndef QPID_BROKER_FAIRSHARE_H +#define QPID_BROKER_FAIRSHARE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace broker { + +/** + * Modifies a basic prioirty queue by limiting the number of messages + * from each priority level that are dispatched before allowing + * dispatch from the next level. + */ +class Fairshare : public PriorityQueue +{ + public: + Fairshare(size_t levels, uint limit); + bool getState(uint& priority, uint& count) const; + bool setState(uint priority, uint count); + void setLimit(size_t level, uint limit); + static std::auto_ptr<Messages> create(const qpid::framing::FieldTable& settings); + static bool getState(const Messages&, uint& priority, uint& count); + static bool setState(Messages&, uint priority, uint count); + private: + std::vector<uint> limits; + + uint priority; + uint count; + + uint currentLevel(); + uint nextLevel(); + bool isNull(); + bool limitReached(); + bool findFrontLevel(uint& p, PriorityLevels&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_FAIRSHARE_H*/ diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp new file mode 100644 index 0000000000..a811a86492 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { + +LegacyLVQ::LegacyLVQ(const std::string& k, bool b, Broker* br) : MessageMap(k), noBrowse(b), broker(br) {} + +void LegacyLVQ::setNoBrowse(bool b) +{ + noBrowse = b; +} + +bool LegacyLVQ::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end() && i->second.payload == message.payload) { + message = i->second; + erase(i); + return true; + } else { + return false; + } +} + +bool LegacyLVQ::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (MessageMap::next(position, message)) { + if (!noBrowse) index.erase(getKey(message)); + return true; + } else { + return false; + } +} + +bool LegacyLVQ::push(const QueuedMessage& added, QueuedMessage& removed) +{ + //Hack to disable LVQ behaviour on cluster update: + if (broker && broker->isClusterUpdatee()) { + messages[added.position] = added; + return false; + } else { + return MessageMap::push(added, removed); + } +} + +const QueuedMessage& LegacyLVQ::replace(const QueuedMessage& original, const QueuedMessage& update) +{ + //add the new message into the original position of the replaced message + Ordering::iterator i = messages.find(original.position); + i->second = update; + i->second.position = original.position; + return i->second; +} + +void LegacyLVQ::removeIf(Predicate p) +{ + //Note: This method is currently called periodically on the timer + //thread to expire messages. In a clustered broker this means that + //the purging does not occur on the cluster event dispatch thread + //and consequently that is not totally ordered w.r.t other events + //(including publication of messages). The cluster does ensure + //that the actual expiration of messages (as distinct from the + //removing of those expired messages from the queue) *is* + //consistently ordered w.r.t. cluster events. This means that + //delivery of messages is in general consistent across the cluster + //inspite of any non-determinism in the triggering of a + //purge. However at present purging a last value queue (of the + //legacy sort) could potentially cause inconsistencies in the + //cluster (as the order w.r.t publications can affect the order in + //which messages appear in the queue). Consequently periodic + //purging of an LVQ is not enabled if the broker is clustered + //(expired messages will be removed on delivery and consolidated + //by key as part of normal LVQ operation). + + //TODO: Is there a neater way to check whether broker is + //clustered? Here we assume that if the clustered timer is the + //same as the regular timer, we are not clustered: + if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer())) + MessageMap::removeIf(p); +} + +std::auto_ptr<Messages> LegacyLVQ::updateOrReplace(std::auto_ptr<Messages> current, + const std::string& key, bool noBrowse, Broker* broker) +{ + LegacyLVQ* lvq = dynamic_cast<LegacyLVQ*>(current.get()); + if (lvq) { + lvq->setNoBrowse(noBrowse); + return current; + } else { + return std::auto_ptr<Messages>(new LegacyLVQ(key, noBrowse, broker)); + } +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h new file mode 100644 index 0000000000..dd0fd7aaec --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.h @@ -0,0 +1,59 @@ +#ifndef QPID_BROKER_LEGACYLVQ_H +#define QPID_BROKER_LEGACYLVQ_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageMap.h" +#include <memory> + +namespace qpid { +namespace broker { +class Broker; + +/** + * This class encapsulates the behaviour of the old style LVQ where a + * message replacing another messages for the given key will use the + * position in the queue of the previous message. This however causes + * problems for browsing. Either browsers stop the coalescing of + * messages by key (default) or they may mis updates (if the no-browse + * option is specified). + */ +class LegacyLVQ : public MessageMap +{ + public: + LegacyLVQ(const std::string& key, bool noBrowse = false, Broker* broker = 0); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + void removeIf(Predicate); + void setNoBrowse(bool); + static std::auto_ptr<Messages> updateOrReplace(std::auto_ptr<Messages> current, + const std::string& key, bool noBrowse, + Broker* broker); + protected: + bool noBrowse; + Broker* broker; + + const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_LEGACYLVQ_H*/ diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index a16180f3ae..122c5b9c1a 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -400,22 +400,6 @@ bool Message::hasExpired() return expiryPolicy && expiryPolicy->hasExpired(*this); } -boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const -{ - sys::Mutex::ScopedLock l(lock); - Replacement::iterator i = replacement.find(qfor); - if (i != replacement.end()){ - return i->second; - } - return empty; -} - -void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor) -{ - sys::Mutex::ScopedLock l(lock); - replacement[qfor] = msg; -} - namespace { struct ScopedSet { sys::Monitor& lock; diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index e8a8a19d53..2d0de27823 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -153,8 +153,6 @@ public: void forcePersistent(); bool isForcedPersistent(); - boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; - void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); @@ -163,8 +161,6 @@ public: uint8_t getPriority() const; private: - typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; - MessageAdapter& getAdapter() const; void allDequeuesComplete(); @@ -183,7 +179,6 @@ public: static TransferAdapter TRANSFER; - mutable Replacement replacement; mutable boost::intrusive_ptr<Message> empty; sys::Monitor callbackLock; diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.cpp b/qpid/cpp/src/qpid/broker/MessageDeque.cpp new file mode 100644 index 0000000000..24b8f6f895 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageDeque.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { + +size_t MessageDeque::size() +{ + return messages.size(); +} + +bool MessageDeque::empty() +{ + return messages.empty(); +} + +void MessageDeque::reinsert(const QueuedMessage& message) +{ + messages.insert(lower_bound(messages.begin(), messages.end(), message), message); +} + +MessageDeque::Deque::iterator MessageDeque::seek(const framing::SequenceNumber& position) +{ + if (!messages.empty()) { + QueuedMessage comp; + comp.position = position; + unsigned long diff = position.getValue() - messages.front().position.getValue(); + long maxEnd = diff < messages.size()? diff : messages.size(); + return lower_bound(messages.begin(),messages.begin()+maxEnd,comp); + } else { + return messages.end(); + } +} + +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +{ + Deque::iterator i = seek(position); + if (i != messages.end() && i->position == position) { + message = *i; + if (remove) messages.erase(i); + return true; + } else { + return false; + } +} + +bool MessageDeque::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, true); +} + +bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, false); +} + +bool MessageDeque::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (messages.empty()) { + return false; + } else if (position < front().position) { + message = front(); + return true; + } else { + Deque::iterator i = seek(position+1); + if (i != messages.end()) { + message = *i; + return true; + } else { + return false; + } + } +} + +QueuedMessage& MessageDeque::front() +{ + return messages.front(); +} + +void MessageDeque::pop() +{ + if (!messages.empty()) { + messages.pop_front(); + } +} + +bool MessageDeque::pop(QueuedMessage& out) +{ + if (messages.empty()) { + return false; + } else { + out = front(); + messages.pop_front(); + return true; + } +} + +bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +{ + messages.push_back(added); + return false;//adding a message never causes one to be removed for deque +} + +void MessageDeque::foreach(Functor f) +{ + std::for_each(messages.begin(), messages.end(), f); +} + +void MessageDeque::removeIf(Predicate p) +{ + for (Deque::iterator i = messages.begin(); i != messages.end();) { + if (p(*i)) { + i = messages.erase(i); + } else { + ++i; + } + } +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageDeque.h b/qpid/cpp/src/qpid/broker/MessageDeque.h new file mode 100644 index 0000000000..0e1aef2986 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageDeque.h @@ -0,0 +1,62 @@ +#ifndef QPID_BROKER_MESSAGEDEQUE_H +#define QPID_BROKER_MESSAGEDEQUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/broker/QueuedMessage.h" +#include <deque> + +namespace qpid { +namespace broker { + +/** + * Provides the standard FIFO queue behaviour. + */ +class MessageDeque : public Messages +{ + public: + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + void removeIf(Predicate); + + private: + typedef std::deque<QueuedMessage> Deque; + Deque messages; + + Deque::iterator seek(const framing::SequenceNumber&); + bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEDEQUE_H*/ diff --git a/qpid/cpp/src/qpid/broker/MessageMap.cpp b/qpid/cpp/src/qpid/broker/MessageMap.cpp new file mode 100644 index 0000000000..39e23df533 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageMap.cpp @@ -0,0 +1,166 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/MessageMap.h" +#include "qpid/broker/QueuedMessage.h" + +namespace qpid { +namespace broker { +namespace { +const std::string EMPTY; +} + +std::string MessageMap::getKey(const QueuedMessage& message) +{ + const framing::FieldTable* ft = message.payload->getApplicationHeaders(); + if (ft) return ft->getAsString(key); + else return EMPTY; +} + +size_t MessageMap::size() +{ + return messages.size(); +} + +bool MessageMap::empty() +{ + return messages.empty(); +} + +void MessageMap::reinsert(const QueuedMessage& message) +{ + std::string key = getKey(message); + Index::iterator i = index.find(key); + if (i == index.end()) { + index[key] = message; + messages[message.position] = message; + } //else message has already been replaced +} + +bool MessageMap::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end()) { + message = i->second; + erase(i); + return true; + } else { + return false; + } +} + +bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + Ordering::iterator i = messages.find(position); + if (i != messages.end()) { + message = i->second; + return true; + } else { + return false; + } +} + +bool MessageMap::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + if (!messages.empty() && position < front().position) { + message = front(); + return true; + } else { + Ordering::iterator i = messages.lower_bound(position+1); + if (i != messages.end()) { + message = i->second; + return true; + } else { + return false; + } + } +} + +QueuedMessage& MessageMap::front() +{ + return messages.begin()->second; +} + +void MessageMap::pop() +{ + QueuedMessage dummy; + pop(dummy); +} + +bool MessageMap::pop(QueuedMessage& out) +{ + Ordering::iterator i = messages.begin(); + if (i != messages.end()) { + out = i->second; + erase(i); + return true; + } else { + return false; + } +} + +const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) +{ + messages.erase(original.position); + messages[update.position] = update; + return update; +} + +bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) +{ + std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added)); + if (result.second) { + //there was no previous message for this key; nothing needs to + //be removed, just add the message into its correct position + messages[added.position] = added; + return false; + } else { + //there is already a message with that key which needs to be replaced + removed = result.first->second; + result.first->second = replace(result.first->second, added); + return true; + } +} + +void MessageMap::foreach(Functor f) +{ + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + f(i->second); + } +} + +void MessageMap::removeIf(Predicate p) +{ + for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { + if (p(i->second)) { + erase(i); + } + } +} + +void MessageMap::erase(Ordering::iterator i) +{ + index.erase(getKey(i->second)); + messages.erase(i); +} + +MessageMap::MessageMap(const std::string& k) : key(k) {} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/MessageMap.h b/qpid/cpp/src/qpid/broker/MessageMap.h new file mode 100644 index 0000000000..1128a1d54a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/MessageMap.h @@ -0,0 +1,72 @@ +#ifndef QPID_BROKER_MESSAGEMAP_H +#define QPID_BROKER_MESSAGEMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/framing/SequenceNumber.h" +#include <map> +#include <string> + +namespace qpid { +namespace broker { + +/** + * Provides a last value queue behaviour, whereby a messages replace + * any previous message with the same value for a defined property + * (i.e. the key). + */ +class MessageMap : public Messages +{ + public: + MessageMap(const std::string& key); + virtual ~MessageMap() {} + + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + virtual bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + virtual bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + virtual bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + virtual void removeIf(Predicate); + + protected: + typedef std::map<std::string, QueuedMessage> Index; + typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering; + const std::string key; + Index index; + Ordering messages; + + std::string getKey(const QueuedMessage&); + virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&); + void erase(Ordering::iterator); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEMAP_H*/ diff --git a/qpid/cpp/src/qpid/broker/Messages.h b/qpid/cpp/src/qpid/broker/Messages.h new file mode 100644 index 0000000000..0d75417640 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Messages.h @@ -0,0 +1,117 @@ +#ifndef QPID_BROKER_MESSAGES_H +#define QPID_BROKER_MESSAGES_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/function.hpp> + +namespace qpid { +namespace framing { +class SequenceNumber; +} +namespace broker { +struct QueuedMessage; + +/** + * This interface abstracts out the access to the messages held for + * delivery by a Queue instance. + */ +class Messages +{ + public: + typedef boost::function1<void, QueuedMessage&> Functor; + typedef boost::function1<bool, QueuedMessage&> Predicate; + + virtual ~Messages() {} + /** + * @return the number of messages available for delivery. + */ + virtual size_t size() = 0; + /** + * @return true if there are no messages for delivery, false otherwise + */ + virtual bool empty() = 0; + + /** + * Re-inserts a message back into its original position - used + * when requeing released messages. + */ + virtual void reinsert(const QueuedMessage&) = 0; + /** + * Remove the message at the specified position, returning true if + * found, false otherwise. The removed message is passed back via + * the second parameter. + */ + virtual bool remove(const framing::SequenceNumber&, QueuedMessage&) = 0; + /** + * Find the message at the specified position, returning true if + * found, false otherwise. The matched message is passed back via + * the second parameter. + */ + virtual bool find(const framing::SequenceNumber&, QueuedMessage&) = 0; + /** + * Return the next message to be given to a browsing subscrption + * that has reached the specified poisition. The next messages is + * passed back via the second parameter. + * + * @return true if there is another message, false otherwise. + */ + virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; + + /** + * Note: Caller is responsible for ensuring that there is a front + * (e.g. empty() returns false) + * + * @return the next message to be delivered + */ + virtual QueuedMessage& front() = 0; + /** + * Removes the front message + */ + virtual void pop() = 0; + /** + * @return true if there is a mesage to be delivered - in which + * case that message will be returned via the parameter and + * removed - otherwise false. + */ + virtual bool pop(QueuedMessage&) = 0; + /** + * Pushes a message to the back of the 'queue'. For some types of + * queue this may cause another message to be removed; if that is + * the case the method will return true and the removed message + * will be passed out via the second parameter. + */ + virtual bool push(const QueuedMessage& added, QueuedMessage& removed) = 0; + + /** + * Apply the functor to each message held + */ + virtual void foreach(Functor) = 0; + /** + * Remove every message held that for which the specified + * predicate returns true + */ + virtual void removeIf(Predicate) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGES_H*/ diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.cpp b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp new file mode 100644 index 0000000000..e07e73d323 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.cpp @@ -0,0 +1,212 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PriorityQueue.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/framing/reply_exceptions.h" +#include <cmath> + +namespace qpid { +namespace broker { + +PriorityQueue::PriorityQueue(int l) : + levels(l), + messages(levels, Deque()), + frontLevel(0), haveFront(false), cached(false) {} + +size_t PriorityQueue::size() +{ + size_t total(0); + for (int i = 0; i < levels; ++i) { + total += messages[i].size(); + } + return total; +} + +bool PriorityQueue::empty() +{ + for (int i = 0; i < levels; ++i) { + if (!messages[i].empty()) return false; + } + return true; +} + +void PriorityQueue::reinsert(const QueuedMessage& message) +{ + uint p = getPriorityLevel(message); + messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message); + clearCache(); +} + +bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove) +{ + QueuedMessage comp; + comp.position = position; + for (int i = 0; i < levels; ++i) { + if (!messages[i].empty()) { + unsigned long diff = position.getValue() - messages[i].front().position.getValue(); + long maxEnd = diff < messages[i].size() ? diff : messages[i].size(); + Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp); + if (l != messages[i].end() && l->position == position) { + message = *l; + if (remove) { + messages[i].erase(l); + clearCache(); + } + return true; + } + } + } + return false; +} + +bool PriorityQueue::remove(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, true); +} + +bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message) +{ + return find(position, message, false); +} + +bool PriorityQueue::next(const framing::SequenceNumber& position, QueuedMessage& message) +{ + QueuedMessage match; + match.position = position+1; + Deque::iterator lowest; + bool found = false; + for (int i = 0; i < levels; ++i) { + Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match); + if (m != messages[i].end()) { + if (m->position == match.position) { + message = *m; + return true; + } else if (!found || m->position < lowest->position) { + lowest = m; + found = true; + } + } + } + if (found) { + message = *lowest; + } + return found; +} + +QueuedMessage& PriorityQueue::front() +{ + if (checkFront()) { + return messages[frontLevel].front(); + } else { + throw qpid::framing::InternalErrorException(QPID_MSG("No message available")); + } +} + +bool PriorityQueue::pop(QueuedMessage& message) +{ + if (checkFront()) { + message = messages[frontLevel].front(); + messages[frontLevel].pop_front(); + clearCache(); + return true; + } else { + return false; + } +} + +void PriorityQueue::pop() +{ + QueuedMessage dummy; + pop(dummy); +} + +bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) +{ + messages[getPriorityLevel(added)].push_back(added); + clearCache(); + return false;//adding a message never causes one to be removed for deque +} + +void PriorityQueue::foreach(Functor f) +{ + for (int i = 0; i < levels; ++i) { + std::for_each(messages[i].begin(), messages[i].end(), f); + } +} + +void PriorityQueue::removeIf(Predicate p) +{ + for (int priority = 0; priority < levels; ++priority) { + for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) { + if (p(*i)) { + i = messages[priority].erase(i); + clearCache(); + } else { + ++i; + } + } + } +} + +uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const +{ + uint priority = m.payload->getPriority(); + //Use AMQP 0-10 approach to mapping priorities to a fixed level + //(see rule priority-level-implementation) + const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0))); + if (priority <= firstLevel) return 0; + return std::min(priority - firstLevel, (uint)levels-1); +} + +void PriorityQueue::clearCache() +{ + cached = false; +} + +bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m) +{ + for (int p = levels-1; p >= 0; --p) { + if (!m[p].empty()) { + l = p; + return true; + } + } + return false; +} + +bool PriorityQueue::checkFront() +{ + if (!cached) { + haveFront = findFrontLevel(frontLevel, messages); + cached = true; + } + return haveFront; +} + +uint PriorityQueue::getPriority(const QueuedMessage& message) +{ + const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages())); + if (queue) return queue->getPriorityLevel(message); + else return 0; +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PriorityQueue.h b/qpid/cpp/src/qpid/broker/PriorityQueue.h new file mode 100644 index 0000000000..4bf9d26a9d --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PriorityQueue.h @@ -0,0 +1,78 @@ +#ifndef QPID_BROKER_PRIORITYQUEUE_H +#define QPID_BROKER_PRIORITYQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/sys/IntegerTypes.h" +#include <deque> +#include <vector> + +namespace qpid { +namespace broker { + +/** + * Basic priority queue with a configurable number of recognised + * priority levels. This is implemented as a separate deque per + * priority level. Browsing is FIFO not priority order. + */ +class PriorityQueue : public Messages +{ + public: + PriorityQueue(int levels); + virtual ~PriorityQueue() {} + size_t size(); + bool empty(); + + void reinsert(const QueuedMessage&); + bool remove(const framing::SequenceNumber&, QueuedMessage&); + bool find(const framing::SequenceNumber&, QueuedMessage&); + bool next(const framing::SequenceNumber&, QueuedMessage&); + + QueuedMessage& front(); + void pop(); + bool pop(QueuedMessage&); + bool push(const QueuedMessage& added, QueuedMessage& removed); + + void foreach(Functor); + void removeIf(Predicate); + static uint getPriority(const QueuedMessage&); + protected: + typedef std::deque<QueuedMessage> Deque; + typedef std::vector<Deque> PriorityLevels; + virtual bool findFrontLevel(uint& p, PriorityLevels&); + + const int levels; + private: + PriorityLevels messages; + uint frontLevel; + bool haveFront; + bool cached; + + bool find(const framing::SequenceNumber&, QueuedMessage&, bool remove); + uint getPriorityLevel(const QueuedMessage&) const; + void clearCache(); + bool checkFront(); +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PRIORITYQUEUE_H*/ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3de93ed74e..cfb32749a0 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -23,11 +23,16 @@ #include "qpid/broker/Queue.h" #include "qpid/broker/QueueEvents.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/LegacyLVQ.h" +#include "qpid/broker/MessageDeque.h" +#include "qpid/broker/MessageMap.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -67,11 +72,13 @@ const std::string qpidMaxCount("qpid.max_count"); const std::string qpidNoLocal("no-local"); const std::string qpidTraceIdentity("qpid.trace.id"); const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueueKey("qpid.last_value_queue_key"); const std::string qpidLastValueQueue("qpid.last_value_queue"); const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); const std::string qpidPersistLastNode("qpid.persist_last_node"); const std::string qpidVQMatchProperty("qpid.LVQ_key"); const std::string qpidQueueEventGeneration("qpid.queue_event_generation"); +const std::string qpidAutoDeleteTimeout("qpid.auto_delete_timeout"); //following feature is not ready for general use as it doesn't handle //the case where a message is enqueued on more than one queue well enough: const std::string qpidInsertSequenceNumbers("qpid.insert_sequence_numbers"); @@ -93,19 +100,18 @@ Queue::Queue(const string& _name, bool _autodelete, consumerCount(0), exclusive(0), noLocal(false), - lastValueQueue(false), - lastValueQueueNoBrowse(false), persistLastNode(false), inLastNodeFailure(false), + messages(new MessageDeque()), persistenceId(0), policyExceeded(false), mgmtObject(0), eventMode(0), - eventMgr(0), insertSeqNo(0), broker(b), deleted(false), - barrier(*this) + barrier(*this), + autoDeleteTimeout(0) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -160,7 +166,6 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ } else { enqueue(0, msg); push(msg); - mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -179,7 +184,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ msg->addToSyncList(shared_from_this(), store); } msg->enqueueComplete(); // mark the message as enqueued - mgntEnqStats(msg); if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) { //content has not been loaded, need to ensure that lazy loading mode is set: @@ -194,7 +198,6 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); - mgntEnqStats(msg); if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); @@ -208,7 +211,7 @@ void Queue::requeue(const QueuedMessage& msg){ Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return; msg.payload->enqueueComplete(); // mark the message as enqueued - messages.insert(lower_bound(messages.begin(), messages.end(), msg), msg); + messages->reinsert(msg); listeners.populate(copy); // for persistLastNode - don't force a message twice to disk, but force it if no force before @@ -223,57 +226,23 @@ void Queue::requeue(const QueuedMessage& msg){ copy.notify(); } -void Queue::clearLVQIndex(const QueuedMessage& msg){ - assertClusterSafe(); - const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0; - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } -} - bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) { Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - - Messages::iterator i = findAt(position); - if (i != messages.end() ) { - message = *i; - if (lastValueQueue) { - clearLVQIndex(*i); - } - QPID_LOG(debug, - "Acquired message at " << i->position << " from " << name); - messages.erase(i); + if (messages->remove(position, message)) { + QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; - } - QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); - return false; + } else { + QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position"); + return false; + } } bool Queue::acquire(const QueuedMessage& msg) { - Mutex::ScopedLock locker(messageLock); - assertClusterSafe(); - - QPID_LOG(debug, "attempting to acquire " << msg.position); - Messages::iterator i = findAt(msg.position); - if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set - (!lastValueQueue || - (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0 - ) { - - clearLVQIndex(msg); - QPID_LOG(debug, - "Match found, acquire succeeded: " << - i->position << " == " << msg.position); - messages.erase(i); - return true; - } - - QPID_LOG(debug, "Acquire failed for " << msg.position); - return false; + QueuedMessage copy = msg; + return acquireMessageAt(msg.position, copy); } void Queue::notifyListener() @@ -282,7 +251,7 @@ void Queue::notifyListener() QueueListeners::NotificationSet set; { Mutex::ScopedLock locker(messageLock); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -311,12 +280,12 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages.empty()) { + if (messages->empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; } else { - QueuedMessage msg = getFront(); + QueuedMessage msg = messages->front(); if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); popAndDequeue(); @@ -326,7 +295,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - popMsg(msg); + pop(); return CONSUMED; } else { //message(s) are available but consumer hasn't got enough credit @@ -352,11 +321,6 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) //consumer wants the message c->position = msg.position; m = msg; - if (!lastValueQueueNoBrowse) clearLVQIndex(msg); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) m.payload = replacement; - } return true; } else { //browser hasn't got enough credit for the message @@ -378,7 +342,7 @@ void Queue::removeListener(Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); listeners.removeListener(c); - if (messages.size()) { + if (messages->size()) { listeners.populate(set); } } @@ -399,52 +363,20 @@ bool Queue::dispatch(Consumer::shared_ptr c) // Find the next message bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { Mutex::ScopedLock locker(messageLock); - if (!messages.empty() && messages.back().position > c->position) { - if (c->position < getFront().position) { - msg = getFront(); - return true; - } else { - Messages::iterator pos = findAt(c->position); - if (pos != messages.end() && pos+1 != messages.end()) { - msg = *(pos+1); - return true; - } - } + if (messages->next(c->position, msg)) { + return true; + } else { + listeners.addListener(c); + return false; } - listeners.addListener(c); - return false; } -Queue::Messages::iterator Queue::findAt(SequenceNumber pos) { - - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i!= messages.end() && i->position == pos) - return i; - } - return messages.end(); // no match found. -} - - QueuedMessage Queue::find(SequenceNumber pos) const { Mutex::ScopedLock locker(messageLock); - if(!messages.empty()){ - QueuedMessage compM; - compM.position = pos; - unsigned long diff = pos.getValue() - messages.front().position.getValue(); - long maxEnd = diff < messages.size()? diff : messages.size(); - - Messages::const_iterator i = lower_bound(messages.begin(),messages.begin()+maxEnd,compM); - if (i != messages.end()) - return *i; - } - return QueuedMessage(); + QueuedMessage msg; + messages->find(pos, msg); + return msg; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ @@ -464,6 +396,10 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ consumerCount++; if (mgmtObject != 0) mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } } void Queue::cancel(Consumer::shared_ptr c){ @@ -478,12 +414,18 @@ void Queue::cancel(Consumer::shared_ptr c){ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); + messages->pop(msg); + return msg; +} - if(!messages.empty()){ - msg = getFront(); - popMsg(msg); +bool collect_if_expired(std::deque<QueuedMessage>& expired, QueuedMessage& message) +{ + if (message.payload->hasExpired()) { + expired.push_back(message); + return true; + } else { + return false; } - return msg; } void Queue::purgeExpired() @@ -492,37 +434,11 @@ void Queue::purgeExpired() //bother explicitly expiring if the rate of dequeues since last //attempt is less than one per second. - //Note: This method is currently called periodically on the timer - //thread. In a clustered broker this means that the purging does - //not occur on the cluster event dispatch thread and consequently - //that is not totally ordered w.r.t other events (including - //publication of messages). However the cluster does ensure that - //the actual expiration of messages (as distinct from the removing - //of those expired messages from the queue) *is* consistently - //ordered w.r.t. cluster events. This means that delivery of - //messages is in general consistent across the cluster inspite of - //any non-determinism in the triggering of a purge. However at - //present purging a last value queue could potentially cause - //inconsistencies in the cluster (as the order w.r.t publications - //can affect the order in which messages appear in the - //queue). Consequently periodic purging of an LVQ is not enabled - //(expired messages will be removed on delivery and consolidated - //by key as part of normal LVQ operation). - - if (dequeueTracker.sampleRatePerSecond() < 1 && !lastValueQueue) { - Messages expired; + if (dequeueTracker.sampleRatePerSecond() < 1) { + std::deque<QueuedMessage> expired; { Mutex::ScopedLock locker(messageLock); - for (Messages::iterator i = messages.begin(); i != messages.end();) { - //Re-introduce management of LVQ-specific state here - //if purging is renabled for that case (see note above) - if (i->payload->hasExpired()) { - expired.push_back(*i); - i = messages.erase(i); - } else { - ++i; - } - } + messages->removeIf(boost::bind(&collect_if_expired, expired, _1)); } for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -548,13 +464,13 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> uint32_t count = 0; // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages.empty()) { + while((!purge_request || purge_count--) && !messages->empty()) { if (dest.get()) { // // If there is a destination exchange, stage the messages onto a reroute queue // so they don't wind up getting purged more than once. // - DeliverableMessage msg(getFront().payload); + DeliverableMessage msg(messages->front().payload); rerouteQueue.push_back(msg); } popAndDequeue(); @@ -580,101 +496,53 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { uint32_t move_count = qty; // only comes into play if qty >0 uint32_t count = 0; // count how many were moved for returning - while((!qty || move_count--) && !messages.empty()) { - QueuedMessage qmsg = getFront(); + while((!qty || move_count--) && !messages->empty()) { + QueuedMessage qmsg = messages->front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - popMsg(qmsg); + pop(); dequeue(0, qmsg); count++; } return count; } -void Queue::popMsg(QueuedMessage& qmsg) +void Queue::pop() { assertClusterSafe(); - const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - lvq.erase(key); - } - messages.pop_front(); + messages->pop(); ++dequeueTracker; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ assertClusterSafe(); QueueListeners::NotificationSet copy; + QueuedMessage removed; + bool dequeueRequired = false; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); - LVQ::iterator i; - const framing::FieldTable* ft = msg->getApplicationHeaders(); - if (lastValueQueue && ft){ - string key = ft->getAsString(qpidVQMatchProperty); - - i = lvq.find(key); - if (i == lvq.end() || (broker && broker->isClusterUpdatee())) { - messages.push_back(qm); - listeners.populate(copy); - lvq[key] = msg; - }else { - boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this); - if (!old) old = i->second; - i->second->setReplacementMessage(msg,this); - if (isRecovery) { - //can't issue new requests for the store until - //recovery is complete - pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position)); - } else { - Mutex::ScopedUnlock u(messageLock); - dequeue(0, QueuedMessage(qm.queue, old, qm.position)); - } - } - }else { - messages.push_back(qm); - listeners.populate(copy); - } - if (eventMode) { - if (eventMgr) eventMgr->enqueued(qm); - else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName()); - } - if (policy.get()) { - policy->enqueued(qm); - } - if (flowLimit.get()) - flowLimit->enqueued(qm); + dequeueRequired = messages->push(qm, removed); + listeners.populate(copy); + enqueued(qm); } copy.notify(); -} - -QueuedMessage Queue::getFront() -{ - QueuedMessage msg = messages.front(); - if (lastValueQueue) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) msg.payload = replacement; + if (dequeueRequired) { + if (isRecovery) { + //can't issue new requests for the store until + //recovery is complete + pendingDequeues.push_back(removed); + } else { + dequeue(0, removed); + } } - return msg; } -QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) +void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); - if (replacement.get()) { - const framing::FieldTable* ft = replacement->getApplicationHeaders(); - if (ft) { - string key = ft->getAsString(qpidVQMatchProperty); - if (lvq.find(key) != lvq.end()){ - lvq[key] = replacement; - } - } - msg.payload = replacement; - } - return msg; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -682,20 +550,14 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const { Mutex::ScopedLock locker(messageLock); uint32_t count = 0; - for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - //NOTE: don't need to use checkLvqReplace() here as it - //is only relevant for LVQ which does not support persistence - //so the enqueueComplete check has no effect - if ( i->payload->isIngressComplete() ) count ++; - } - + messages->foreach(boost::bind(&isEnqueueComplete, &count, _1)); return count; } uint32_t Queue::getMessageCount() const { Mutex::ScopedLock locker(messageLock); - return messages.size(); + return messages->size(); } uint32_t Queue::getConsumerCount() const @@ -707,7 +569,7 @@ uint32_t Queue::getConsumerCount() const bool Queue::canAutoDelete() const { Mutex::ScopedLock locker(consumerLock); - return autodelete && !consumerCount; + return autodelete && !consumerCount && !owner; } void Queue::clearLastNodeFailure() @@ -715,21 +577,22 @@ void Queue::clearLastNodeFailure() inLastNodeFailure = false; } +void Queue::forcePersistent(QueuedMessage& message) +{ + if(!message.payload->isStoredOnQueue(shared_from_this())) { + message.payload->forcePersistent(); + if (message.payload->isForcedPersistent() ){ + enqueue(0, message.payload); + } + } +} + void Queue::setLastNodeFailure() { if (persistLastNode){ Mutex::ScopedLock locker(messageLock); try { - for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { - if (lastValueQueue) checkLvqReplace(*i); - // don't force a message twice to disk. - if(!i->payload->isStoredOnQueue(shared_from_this())) { - i->payload->forcePersistent(); - if (i->payload->isForcedPersistent() ){ - enqueue(0, i->payload); - } - } - } + messages->foreach(boost::bind(&Queue::forcePersistent, this, _1)); } catch (const std::exception& e) { // Could not go into last node standing (for example journal not large enough) QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what()); @@ -746,7 +609,7 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg if (!u.acquired) return false; if (policy.get() && !suppressPolicyCheck) { - Messages dequeues; + std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); policy->tryEnqueue(msg); @@ -833,8 +696,8 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) */ void Queue::popAndDequeue() { - QueuedMessage msg = getFront(); - popMsg(msg); + QueuedMessage msg = messages->front(); + pop(); dequeue(0, msg); } @@ -845,11 +708,16 @@ void Queue::popAndDequeue() void Queue::dequeued(const QueuedMessage& msg) { if (policy.get()) policy->dequeued(msg); + /** todo KAG make flowLimit an observer */ if (flowLimit.get()) flowLimit->dequeued(msg); mgntDeqStats(msg.payload); - if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) { - eventMgr->dequeued(msg); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->dequeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); + } } } @@ -863,16 +731,41 @@ void Queue::create(const FieldTable& _settings) configure(_settings); } + +int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key) +{ + qpid::framing::FieldTable::ValuePtr v = settings.get(key); + if (!v) { + return 0; + } else if (v->convertsTo<int>()) { + return v->get<int>(); + } else if (v->convertsTo<std::string>()){ + std::string s = v->get<std::string>(); + try { + return boost::lexical_cast<int>(s); + } catch(const boost::bad_lexical_cast&) { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s); + return 0; + } + } else { + QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v); + return 0; + } +} + void Queue::configure(const FieldTable& _settings, bool recovering) { eventMode = _settings.getAsInt(qpidQueueEventGeneration); + if (eventMode && broker) { + broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY); + } if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && - (!store || NullMessageStore::isNullStore(store) || (eventMode && eventMgr && !eventMgr->isSync()) )) { + (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) { if ( NullMessageStore::isNullStore(store)) { QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName()); - } else if (eventMgr && !eventMgr->isSync() ) { + } else if (broker && !(broker->getQueueEvents().isSync()) ) { QPID_LOG(warning, "Flow to disk not valid with async Queue Events:" << getName()); } FieldTable copy(_settings); @@ -881,17 +774,30 @@ void Queue::configure(const FieldTable& _settings, bool recovering) } else { setPolicy(QueuePolicy::createQueuePolicy(getName(), _settings)); } + if (broker && broker->getManagementAgent()) { + ThresholdAlerts::observe(*this, *(broker->getManagementAgent()), _settings); + } + //set this regardless of owner to allow use of no-local with exclusive consumers also noLocal = _settings.get(qpidNoLocal); QPID_LOG(debug, "Configured queue " << getName() << " with no-local=" << noLocal); - lastValueQueue= _settings.get(qpidLastValueQueue); - if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue for: " << getName()); - - lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); - if (lastValueQueueNoBrowse){ - QPID_LOG(debug, "Configured queue as Last Value Queue No Browse for: " << getName()); - lastValueQueue = lastValueQueueNoBrowse; + std::string lvqKey = _settings.getAsString(qpidLastValueQueueKey); + if (lvqKey.size()) { + QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); + messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + } else if (_settings.get(qpidLastValueQueueNoBrowse)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + } else if (_settings.get(qpidLastValueQueue)) { + QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); + messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + } else { + std::auto_ptr<Messages> m = Fairshare::create(_settings); + if (m.get()) { + messages = m; + QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } } persistLastNode= _settings.get(qpidPersistLastNode); @@ -910,6 +816,10 @@ void Queue::configure(const FieldTable& _settings, bool recovering) flowLimit = QueueFlowLimit::createQueueFlowLimit(this, _settings); + autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout); + if (autoDeleteTimeout) + QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); if (flowLimit.get()) @@ -924,8 +834,8 @@ void Queue::destroy() { if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); - while(!messages.empty()){ - DeliverableMessage msg(getFront().payload); + while(!messages->empty()){ + DeliverableMessage msg(messages->front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); popAndDequeue(); @@ -939,6 +849,7 @@ void Queue::destroy() store->destroy(*this); store = 0;//ensure we make no more calls to the store for this queue } + if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); } void Queue::notifyDeleted() @@ -1043,15 +954,46 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() return alternateExchange; } -void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue) { if (broker.getQueues().destroyIf(queue->getName(), boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { + QPID_LOG(debug, "Auto-deleting " << queue->getName()); queue->unbind(broker.getExchanges(), queue); queue->destroy(); } } +struct AutoDeleteTask : qpid::sys::TimerTask +{ + Broker& broker; + Queue::shared_ptr queue; + + AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + + void fire() + { + //need to detect case where queue was used after the task was + //created, but then became unused again before the task fired; + //in this case ignore this request as there will have already + //been a later task added + tryAutoDeleteImpl(broker, queue); + } +}; + +void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) +{ + if (queue->autoDeleteTimeout && queue->canAutoDelete()) { + AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC)); + queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time)); + broker.getClusterTimer().add(queue->autoDeleteTask); + QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated"); + } else { + tryAutoDeleteImpl(broker, queue); + } +} + bool Queue::isExclusiveOwner(const OwnershipToken* const o) const { Mutex::ScopedLock locker(ownershipLock); @@ -1066,6 +1008,10 @@ void Queue::releaseExclusiveOwnership() bool Queue::setExclusiveOwner(const OwnershipToken* const o) { + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); + } Mutex::ScopedLock locker(ownershipLock); if (owner) { return false; @@ -1154,11 +1100,6 @@ SequenceNumber Queue::getPosition() { int Queue::getEventMode() { return eventMode; } -void Queue::setQueueEventManager(QueueEvents& mgr) -{ - eventMgr = &mgr; -} - void Queue::recoveryComplete(ExchangeRegistry& exchanges) { // set the alternate exchange @@ -1184,16 +1125,31 @@ void Queue::insertSequenceNumbers(const std::string& key) void Queue::enqueued(const QueuedMessage& m) { - if (m.payload) { - if (policy.get()) { - policy->recoverEnqueued(m.payload); - policy->enqueued(m); + for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { + try { + (*i)->enqueued(m); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); } - if (flowLimit.get()) - flowLimit->enqueued(m); - mgntEnqStats(m.payload); + } + if (policy.get()) { + policy->enqueued(m); + } + /** todo make flowlimit an observer */ + if (flowLimit.get()) + flowLimit->enqueued(m); + mgntEnqStats(m.payload); +} + +void Queue::updateEnqueued(const QueuedMessage& m) +{ + if (m.payload) { boost::intrusive_ptr<Message> payload = m.payload; enqueue ( 0, payload, true ); + if (policy.get()) { + policy->recoverEnqueued(payload); + } + enqueued(m); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1205,6 +1161,8 @@ bool Queue::isEnqueued(const QueuedMessage& msg) } QueueListeners& Queue::getListeners() { return listeners; } +Messages& Queue::getMessages() { return *messages; } +const Messages& Queue::getMessages() const { return *messages; } void Queue::checkNotDeleted() { @@ -1213,6 +1171,11 @@ void Queue::checkNotDeleted() } } +void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) +{ + observers.insert(observer); +} + void Queue::flush() { ScopedUse u(barrier); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 5af630f3c8..e8429128f7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -26,14 +26,17 @@ #include "qpid/broker/OwnershipToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" +#include "qpid/broker/Messages.h" #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qpid/framing/amqp_types.h" @@ -46,6 +49,7 @@ #include <vector> #include <memory> #include <deque> +#include <set> #include <algorithm> namespace qpid { @@ -86,10 +90,10 @@ class Queue : public boost::enable_shared_from_this<Queue>, ~ScopedUse() { if (acquired) barrier.release(); } }; - typedef std::deque<QueuedMessage> Messages; - typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ; + typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; + const std::string name; const bool autodelete; MessageStore* store; @@ -97,16 +101,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t consumerCount; OwnershipToken* exclusive; bool noLocal; - bool lastValueQueue; - bool lastValueQueueNoBrowse; bool persistLastNode; bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; QueueListeners listeners; - Messages messages; - Messages pendingDequeues;//used to avoid dequeuing during recovery - LVQ lvq; + std::auto_ptr<Messages> messages; + std::deque<QueuedMessage> pendingDequeues;//used to avoid dequeuing during recovery mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Monitor messageLock; mutable qpid::sys::Mutex ownershipLock; @@ -122,12 +123,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, qmf::org::apache::qpid::broker::Queue* mgmtObject; RateTracker dequeueTracker; int eventMode; - QueueEvents* eventMgr; + Observers observers; bool insertSeqNo; std::string seqNoKey; Broker* broker; bool deleted; UsageBarrier barrier; + int autoDeleteTimeout; + boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -141,12 +144,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); + void enqueued(const QueuedMessage& msg); void dequeued(const QueuedMessage& msg); - void popMsg(QueuedMessage& qmsg); + void pop(); void popAndDequeue(); QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg); - void clearLVQIndex(const QueuedMessage& msg); + void forcePersistent(QueuedMessage& msg); + int getEventMode(); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { @@ -171,7 +175,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } - Messages::iterator findAt(framing::SequenceNumber pos); void checkNotDeleted(); public: @@ -277,7 +280,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, * thus are still logically on the queue) - used in * clustered broker. */ - void enqueued(const QueuedMessage& msg); + void updateEnqueued(const QueuedMessage& msg); /** * Test whether the specified message (identified by its @@ -322,13 +325,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { sys::Mutex::ScopedLock l(messageLock); - if (lastValueQueue) { - for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { - f(checkLvqReplace(*i)); - } - } else { - std::for_each(messages.begin(), messages.end(), f); - } + messages->foreach(f); } /** Apply f to each QueueBinding on the queue */ @@ -344,8 +341,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, /** return current position sequence number for the next message on the queue. */ QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); - int getEventMode(); - void setQueueEventManager(QueueEvents&); + void addObserver(boost::shared_ptr<QueueObserver>); QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); /** * Notify queue that recovery has completed. @@ -354,6 +350,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, // For cluster update QueueListeners& getListeners(); + Messages& getMessages(); + const Messages& getMessages() const; /** * Reserve space in policy for an enqueued message that diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.cpp b/qpid/cpp/src/qpid/broker/QueueEvents.cpp index bba054b0b8..2c540ff1ad 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.cpp +++ b/qpid/cpp/src/qpid/broker/QueueEvents.cpp @@ -19,6 +19,8 @@ * */ #include "qpid/broker/QueueEvents.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" @@ -115,6 +117,29 @@ bool QueueEvents::isSync() return sync; } +class EventGenerator : public QueueObserver +{ + public: + EventGenerator(QueueEvents& mgr, bool enqOnly) : manager(mgr), enqueueOnly(enqOnly) {} + void enqueued(const QueuedMessage& m) + { + manager.enqueued(m); + } + void dequeued(const QueuedMessage& m) + { + if (!enqueueOnly) manager.dequeued(m); + } + private: + QueueEvents& manager; + const bool enqueueOnly; +}; + +void QueueEvents::observe(Queue& queue, bool enqueueOnly) +{ + boost::shared_ptr<QueueObserver> observer(new EventGenerator(*this, enqueueOnly)); + queue.addObserver(observer); +} + QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {} diff --git a/qpid/cpp/src/qpid/broker/QueueEvents.h b/qpid/cpp/src/qpid/broker/QueueEvents.h index c42752133e..fcddfe9092 100644 --- a/qpid/cpp/src/qpid/broker/QueueEvents.h +++ b/qpid/cpp/src/qpid/broker/QueueEvents.h @@ -63,6 +63,7 @@ class QueueEvents QPID_BROKER_EXTERN void unregisterListener(const std::string& id); void enable(); void disable(); + void observe(Queue&, bool enqueueOnly); //process all outstanding events QPID_BROKER_EXTERN void shutdown(); QPID_BROKER_EXTERN bool isSync(); diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h new file mode 100644 index 0000000000..a711213dee --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueObserver.h @@ -0,0 +1,42 @@ +#ifndef QPID_BROKER_QUEUEOBSERVER_H +#define QPID_BROKER_QUEUEOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace broker { + +class QueuedMessage; +/** + * Interface for notifying classes who want to act as 'observers' of a + * queue of particular events. + */ +class QueueObserver +{ + public: + virtual ~QueueObserver() {} + virtual void enqueued(const QueuedMessage&) = 0; + virtual void dequeued(const QueuedMessage&) = 0; + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index f311ea8321..4168221ad0 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/QueuePolicy.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/PriorityQueue.h" #include "qpid/Exception.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/reply_exceptions.h" @@ -213,7 +214,10 @@ RingQueuePolicy::RingQueuePolicy(const std::string& _name, bool before(const QueuedMessage& a, const QueuedMessage& b) { - return a.position < b.position; + int priorityA = PriorityQueue::getPriority(a); + int priorityB = PriorityQueue::getPriority(b); + if (priorityA == priorityB) return a.position < b.position; + else return priorityA < priorityB; } void RingQueuePolicy::enqueued(const QueuedMessage& m) diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 28b2d60cda..ea2531dae7 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -47,7 +47,6 @@ QueueRegistry::declare(const string& declareName, bool durable, Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent, broker)); queues[name] = queue; if (lastNode) queue->setLastNodeFailure(); - if (events) queue->setQueueEventManager(*events); return std::pair<Queue::shared_ptr, bool>(queue, true); } else { @@ -108,8 +107,3 @@ void QueueRegistry::updateQueueClusterState(bool _lastNode) } lastNode = _lastNode; } - -void QueueRegistry::setQueueEvents(QueueEvents* e) -{ - events = e; -} diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 66437f9665..57859fe639 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -96,8 +96,6 @@ class QueueRegistry { */ std::string generateName(); - void setQueueEvents(QueueEvents*); - /** * Set the store to use. May only be called once. */ diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 7106f85807..69b364ad7b 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -33,7 +33,7 @@ using namespace qpid::sys; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : amqp_0_10::SessionHandler(&c.getOutput(), ch), - connection(c), + connection(c), proxy(out), clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0) {} @@ -69,7 +69,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); - connection.closeChannel(channel.get()); + connection.closeChannel(channel.get()); } void SessionHandler::setState(const std::string& name, bool force) { @@ -78,7 +78,7 @@ void SessionHandler::setState(const std::string& name, bool force) { session = connection.broker.getSessionManager().attach(*this, id, force); } -void SessionHandler::detaching() +void SessionHandler::detaching() { assert(session.get()); session->disableOutput(); @@ -98,7 +98,10 @@ void SessionHandler::attachAs(const std::string& name) { SessionId id(connection.getUserId(), name); SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig(); - session.reset(new SessionState(connection.getBroker(), *this, id, config)); + // Delay creating management object till attached(). In a cluster, + // only the active link broker calls attachAs but all brokers + // receive the subsequent attached() call. + session.reset(new SessionState(connection.getBroker(), *this, id, config, true)); sendAttach(false); } @@ -109,6 +112,7 @@ void SessionHandler::attachAs(const std::string& name) void SessionHandler::attached(const std::string& name) { if (session.get()) { + session->addManagementObject(); // Delayed from attachAs() amqp_0_10::SessionHandler::attached(name); } else { SessionId id(connection.getUserId(), name); @@ -117,5 +121,5 @@ void SessionHandler::attached(const std::string& name) markReadyToSend(); } } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d572e37d00..2e69102537 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -53,7 +53,8 @@ using qpid::sys::AbsTime; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( - Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + Broker& b, SessionHandler& h, const SessionId& id, + const SessionState::Configuration& config, bool delayManagement) : qpid::SessionState(id, config), broker(b), handler(&h), semanticState(*this, *this), @@ -71,6 +72,12 @@ SessionState::SessionState( QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support"); } } + if (!delayManagement) addManagementObject(); + attach(h); +} + +void SessionState::addManagementObject() { + if (GetManagementObject()) return; // Already added. Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); @@ -80,11 +87,11 @@ SessionState::SessionState( mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); - if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate); + if (rateFlowcontrol) + mgmtObject->set_maxClientRate(rateFlowcontrol->getRate()); agent->addObject(mgmtObject); } } - attach(h); } SessionState::~SessionState() { diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index f4c10295b1..568e8593fa 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -73,7 +73,8 @@ class SessionState : public qpid::SessionState, public framing::FrameHandler::InOutHandler { public: - SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&); + SessionState(Broker&, SessionHandler&, const SessionId&, + const SessionState::Configuration&, bool delayManagement=false); ~SessionState(); bool isAttached() const { return handler; } @@ -127,8 +128,11 @@ class SessionState : public qpid::SessionState, // the SessionState of a received Execution.Sync command. void addPendingExecutionSync(); - private: + // Used to delay creation of management object for sessions + // belonging to inter-broker bridges + void addManagementObject(); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp new file mode 100644 index 0000000000..4f35884af8 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp @@ -0,0 +1,139 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qmf/org/apache/qpid/broker/EventQueueThresholdExceeded.h" + +namespace qpid { +namespace broker { +ThresholdAlerts::ThresholdAlerts(const std::string& n, + qpid::management::ManagementAgent& a, + const uint32_t ct, + const uint64_t st, + const long repeat) + : name(n), agent(a), countThreshold(ct), sizeThreshold(st), + repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0), + count(0), size(0), lastAlert(qpid::sys::EPOCH) {} + +void ThresholdAlerts::enqueued(const QueuedMessage& m) +{ + size += m.payload->contentSize(); + ++count; + if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) { + if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH) + || qpid::sys::Duration(lastAlert, qpid::sys::now()) > repeatInterval) { + agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size)); + lastAlert = qpid::sys::now(); + } + } +} + +void ThresholdAlerts::dequeued(const QueuedMessage& m) +{ + size -= m.payload->contentSize(); + --count; + if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) { + lastAlert = qpid::sys::EPOCH; + } +} + + + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval) +{ + if (countThreshold || sizeThreshold) { + boost::shared_ptr<QueueObserver> observer( + new ThresholdAlerts(queue.getName(), agent, countThreshold, sizeThreshold, repeatInterval) + ); + queue.addObserver(observer); + } +} + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings) + +{ + qpid::types::Variant::Map map; + qpid::amqp_0_10::translate(settings, map); + observe(queue, agent, map); +} + +template <class T> +class Option +{ + public: + Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); } + void addAlias(const std::string& name) { names.push_back(name); } + T get(const qpid::types::Variant::Map& settings) const + { + T value(defaultValue); + for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) { + if (get(settings, *i, value)) break; + } + return value; + } + private: + std::vector<std::string> names; + T defaultValue; + + bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const + { + qpid::types::Variant::Map::const_iterator i = settings.find(name); + if (i != settings.end()) { + try { + value = (T) i->second; + } catch (const qpid::types::InvalidConversion&) { + QPID_LOG(warning, "Bad value for" << name << ": " << i->second); + } + return true; + } else { + return false; + } + } +}; + +void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings) + +{ + //Note: aliases are keys defined by java broker + Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60); + repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap"); + + //If no explicit threshold settings were given use 80% of any + //limit from the policy. + const QueuePolicy* policy = queue.getPolicy(); + Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy ? policy->getMaxCount()*0.8 : 0)); + countThreshold.addAlias("x-qpid-maximum-message-count"); + Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy ? policy->getMaxSize()*0.8 : 0)); + sizeThreshold.addAlias("x-qpid-maximum-message-size"); + + observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings)); +} + +}} diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.h b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h new file mode 100644 index 0000000000..e1f59252c4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.h @@ -0,0 +1,73 @@ +#ifndef QPID_BROKER_THRESHOLDALERTS_H +#define QPID_BROKER_THRESHOLDALERTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueObserver.h" +#include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" +#include <string> + +namespace qpid { +namespace framing { +class FieldTable; +} +namespace management { +class ManagementAgent; +} +namespace broker { + +class Queue; +/** + * Class to manage generation of QMF alerts when particular thresholds + * are breached on a queue. + */ +class ThresholdAlerts : public QueueObserver +{ + public: + ThresholdAlerts(const std::string& name, + qpid::management::ManagementAgent& agent, + const uint32_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + void enqueued(const QueuedMessage&); + void dequeued(const QueuedMessage&); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const uint64_t countThreshold, + const uint64_t sizeThreshold, + const long repeatInterval); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::framing::FieldTable& settings); + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, + const qpid::types::Variant::Map& settings); + private: + const std::string name; + qpid::management::ManagementAgent& agent; + const uint32_t countThreshold; + const uint64_t sizeThreshold; + const qpid::sys::Duration repeatInterval; + uint64_t count; + uint64_t size; + qpid::sys::AbsTime lastAlert; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_THRESHOLDALERTS_H*/ diff --git a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 6acd0a3ced..030b804143 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messaging::Message&) window = capacity; } } - -qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!get(result, timeout)) throw NoMessageAvailable(); return result; } - -qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!fetch(result, timeout)) throw NoMessageAvailable(); @@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Dur return f.result; } -void ReceiverImpl::close() -{ +void ReceiverImpl::close() +{ execute<Close>(); } @@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled() return parent->getUnsettledAcks(destination); } -ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, - const qpid::messaging::Address& a) : +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a) : - parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging: } } -void ReceiverImpl::closeImpl() -{ +void ReceiverImpl::closeImpl() +{ sys::Mutex::ScopedLock l(lock); if (state != CANCELLED) { state = CANCELLED; + sync(session).messageStop(destination); + parent->releasePending(destination); source->cancel(session, destination); parent->receiverCancelled(destination); } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 6d98527627..75a71997fd 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Command { qpid::messaging::Receiver result; const qpid::messaging::Address& address; - + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createReceiverImpl(address); } @@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Command { qpid::messaging::Sender result; const qpid::messaging::Address& address; - + CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createSenderImpl(address); } @@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std::string& name) const throw KeyError(name); } else { return i->second; - } + } } Receiver SessionImpl::getReceiver(const std::string& name) const @@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT } } -bool SessionImpl::accept(ReceiverImpl* receiver, - qpid::messaging::Message* message, +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, IncomingMessages::MessageTransfer& transfer) { if (receiver->getName() == transfer.getDestination()) { @@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } } @@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command { const std::string* destination; uint32_t result; - + Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getReceivableImpl(destination); } }; @@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Command { const std::string* destination; uint32_t result; - + UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getUnsettledAcksImpl(destination); } }; @@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl() getImplPtr<Receiver, ReceiverImpl>(i->second)->stop(); } //ensure that stop has been processed and all previously sent - //messages are available for release: + //messages are available for release: session.sync(); incoming.releaseAll(); - session.txRollback(); + session.txRollback(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr<Receiver, ReceiverImpl>(i->second)->start(); @@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(const std::string& name) incoming.releasePending(name); } +void SessionImpl::releasePending(const std::string& name) +{ + ScopedLock l(lock); + incoming.releasePending(name); +} + void SessionImpl::senderCancelled(const std::string& name) { ScopedLock l(lock); @@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->open(); + connection->open(); } bool SessionImpl::backoff() { - return connection->backoff(); + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 3dd5cd0189..2a2aa47df6 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,8 +79,9 @@ class SessionImpl : public qpid::messaging::SessionImpl void checkError(); bool hasError(); - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void releasePending(const std::string& destination); void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); @@ -110,7 +111,7 @@ class SessionImpl : public qpid::messaging::SessionImpl } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } @@ -206,11 +207,11 @@ class SessionImpl : public qpid::messaging::SessionImpl struct Acknowledge1 : Command { qpid::messaging::Message& message; - + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { impl.acknowledgeImpl(message); } }; - + struct CreateSender; struct CreateReceiver; struct UnsettledAcks; @@ -222,12 +223,12 @@ class SessionImpl : public qpid::messaging::SessionImpl F f(*this); return execute(f); } - + template <class F> void retry() { while (!execute<F>()) {} } - + template <class F, class P> bool execute1(P p) { F f(*this, p); diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp index baeaafb478..f6e1c7a849 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -5,7 +5,7 @@ * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance + * "License "); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -33,6 +33,25 @@ using std::max; using sys::Timer; using sys::TimerTask; +// +// Note on use of Broker::getTimer() rather than getClusterTime in broker code. +// The following uses of getTimer() are cluster safe: +// +// LinkRegistry: maintenance visits in timer can call Bridge::create/cancel +// but these don't modify any management state. +// +// broker::Connection: +// - Heartbeats use ClusterOrderOutput to ensure consistency +// - timeout: aborts connection in timer, cluster does an orderly connection close. +// +// SessionState: scheduledCredit - uses ClusterOrderProxy +// Broker::queueCleaner: cluster implements ExpiryPolicy for consistent expiry. +// +// Broker::dtxManager: dtx disabled with cluster. +// +// requestIOProcessing: called in doOutput. +// + ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) { // Allow more generous overrun threshold with cluster as we diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index c7689577a7..e9b718e6de 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -32,6 +32,7 @@ #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Queue.h" @@ -528,7 +529,7 @@ void Connection::deliveryRecord(const string& qname, m = getUpdateMessage(); m.queue = queue.get(); m.position = position; - if (enqueued) queue->enqueued(m); //inform queue of the message + if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue m = queue->find(position); } @@ -548,6 +549,13 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi findQueue(qname)->setPosition(position); } +void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) +{ + if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { + QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); + } +} + void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index d90cdd898b..7ee85bf1aa 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -152,6 +152,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); + void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); void expiryId(uint64_t); void txStart(); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 4f6488a28a..8f751add9b 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -32,6 +32,7 @@ #include "qpid/client/ConnectionImpl.h" #include "qpid/client/Future.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Fairshare.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" @@ -352,6 +353,10 @@ void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr< q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1)); ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition()); + uint priority, count; + if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { + ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); + } } void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index c9d53c028b..cf065e1ba9 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -62,7 +62,7 @@ endif (MSVC) # Like this to work with cmake 2.4 on Unix set (qpid_test_boost_libs - ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}) + ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} ${Boost_SYSTEM_LIBRARY}) # Macro to make it easier to remember where the tests are built macro(remember_location testname) diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index fc1632b4e1..991ec847bf 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public MessagingFixture ~QueueCreatePolicyFixture() { - admin.deleteQueue(address.getName()); + admin.deleteQueue(address.getName()); } }; @@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : public MessagingFixture ~ExchangeCreatePolicyFixture() { - admin.deleteExchange(address.getName()); + admin.deleteExchange(address.getName()); } }; @@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueue) s1.close(); Receiver r1 = fix.session.createReceiver(a1); r1.close(); - + std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); @@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerification) { MessagingFixture fix; fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}"); - BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); + BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); } QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) @@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscriber) QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser) { MessagingFixture fix; - + std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }"; std::string browseAddress = "exclusive-queue; { mode: browse }"; Receiver receiver = fix.session.createReceiver(address); fix.session.sync(); - Connection c2 = fix.newConnection(); + Connection c2 = fix.newConnection(); c2.open(); Session s2 = c2.createSession(); - + BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress)); - c2.close(); + c2.close(); +} + + +QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages) +{ + MessagingFixture fix; + const uint capacity = 5; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}"); + Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}"); + + receiver1.setCapacity(capacity); + receiver2.setCapacity(capacity*2); + + Message out("test-message"); + for (uint i = 0; i < capacity*2; ++i) { + sender.send(out); + } + + receiver1.close(); + + // Make sure all pending messages were sent to the alternate + // exchange when the queue was deleted. + Message in; + for (uint i = 0; i < capacity*2; ++i) { + in = receiver2.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } } QPID_AUTO_TEST_CASE(testAuthenticatedUsername) @@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) messages.push_back(msg); } const uint batch(10); //acknowledge first 10 messages only - for (uint i = 0; i < batch; ++i) { + for (uint i = 0; i < batch; ++i) { other.acknowledge(messages[i]); } messages.clear(); @@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) other.close(); other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < (count-batch); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); @@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) //check unacknowledged messages are still enqueued other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < ((count-batch)/2); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index c9e6d79ee7..7d17dd7bde 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -94,7 +94,7 @@ cluster_test_SOURCES = \ cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework -qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail +qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST) endif diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 903a20ec28..f2ccd0ba84 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -1191,5 +1191,41 @@ QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) { BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u); } +QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) { + ClusterFixture::Args args; + prepareArgs(args, durableFlag); + ClusterFixture cluster(1, args, -1); + Client c0(cluster[0], "c0"); + + FieldTable arguments; + arguments.setInt("x-qpid-priorities", 10); + arguments.setInt("x-qpid-fairshare", 5); + c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments); + + //send messages of different priorities + for (int i = 0; i < 20; i++) { + Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag); + msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5); + c0.session.messageTransfer(arg::content=msg); + } + + //pull off a couple of the messages (first four should be the top priority messages + for (int i = 0; i < 4; i++) { + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData()); + } + + // Add another member + cluster.add(); + Client c1(cluster[1], "c1"); + + //pull off some more messages + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData()); + BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData()); + + //check queue has same content on both nodes + BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index e865a49813..1f77226b4d 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -65,6 +65,8 @@ op.add_option("--connection-options", type="str", help="Connection options for senders & receivers") op.add_option("--flow-control", default=0, type="int", metavar="N", help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.") +op.add_option("--durable", default=False, action="store_true", + help="Use durable queues and messages") single_quote_re = re.compile("'") def posix_quote(string): @@ -76,7 +78,9 @@ def ssh_command(host, command): return ["ssh", host] + [posix_quote(arg) for arg in command] def start_receive(queue, index, opts, ready_queue, broker, host): - address="%s;{%s}"%(queue,",".join(["create:always"]+opts.receive_option)) + address_opts=["create:receiver"] + opts.receive_option + if opts.durable: address_opts += ["node:{durable:true}"] + address="%s;{%s}"%(queue,",".join(address_opts)) msg_total=opts.senders*opts.messages messages = msg_total/opts.receivers; if (index < msg_total%opts.receivers): messages += 1 @@ -111,7 +115,8 @@ def start_send(queue, opts, broker, host): "--report-header=no", "--timestamp=%s"%(opts.timestamp and "yes" or "no"), "--sequence=no", - "--flow-control", str(opts.flow_control) + "--flow-control", str(opts.flow_control), + "--durable", str(opts.durable) ] command += opts.send_arg if opts.connection_options: @@ -140,12 +145,12 @@ def print_header(timestamp): def parse(parser, lines): # Parse sender/receiver output for l in lines: fn_val = zip(parser, l) - + return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] def parse_senders(senders): return parse([int],[first_line(p) for p in senders]) - + def parse_receivers(receivers): return parse([int,float,float,float],[first_line(p) for p in receivers if p]) @@ -168,7 +173,7 @@ def print_summary(send_stats, recv_stats): l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) print summary - + class ReadyReceiver: """A receiver for ready messages""" @@ -177,7 +182,7 @@ class ReadyReceiver: self.connection = qpid.messaging.Connection(broker) self.connection.open() self.receiver = self.connection.session().receiver( - "%s;{create:always,delete:always}"%(queue)) + "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) self.receiver.session.sync() self.timeout=2 diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp index 28e229ca27..012d544a2e 100644 --- a/qpid/cpp/src/tests/qpid-receive.cpp +++ b/qpid/cpp/src/tests/qpid-receive.cpp @@ -206,6 +206,7 @@ int main(int argc, char ** argv) if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; + if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl; if (msg.getDurable()) std::cout << "Durable: true" << std::endl; if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl; std::cout << "Properties: " << msg.getProperties() << std::endl; diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index c71cb83f9a..6a7e7838ce 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -56,6 +56,7 @@ struct Options : public qpid::Options uint sendEos; bool durable; uint ttl; + uint priority; std::string userid; std::string correlationid; string_vector properties; @@ -84,6 +85,7 @@ struct Options : public qpid::Options sendEos(0), durable(false), ttl(0), + priority(0), contentString(), contentSize(0), contentStdin(false), @@ -110,6 +112,7 @@ struct Options : public qpid::Options ("send-eos", qpid::optValue(sendEos, "N"), "Send N EOS messages to mark end of input") ("durable", qpid::optValue(durable, "yes|no"), "Mark messages as durable.") ("ttl", qpid::optValue(ttl, "msecs"), "Time-to-live for messages, in milliseconds") + ("priority", qpid::optValue(priority, "PRIORITY"), "Priority for messages (higher value implies higher priority)") ("property,P", qpid::optValue(properties, "NAME=VALUE"), "specify message property") ("correlation-id", qpid::optValue(correlationid, "ID"), "correlation-id for message") ("user-id", qpid::optValue(userid, "USERID"), "userid for message") @@ -266,7 +269,14 @@ int main(int argc, char ** argv) if (opts.ttl) { msg.setTtl(Duration(opts.ttl)); } - if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto)); + if (opts.priority) { + msg.setPriority(opts.priority); + } + if (!opts.replyto.empty()) { + if (opts.flowControl) + throw Exception("Can't use reply-to and flow-control together"); + msg.setReplyTo(Address(opts.replyto)); + } if (!opts.userid.empty()) msg.setUserId(opts.userid); if (!opts.correlationid.empty()) msg.setCorrelationId(opts.correlationid); opts.setProperties(msg); @@ -305,13 +315,17 @@ int main(int argc, char ** argv) if (opts.timestamp) msg.getProperties()[TS] = int64_t( qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - if (opts.flowControl && ((sent % opts.flowControl) == 0)) { - msg.setReplyTo(flowControlAddress); - ++flowSent; + if (opts.flowControl) { + if ((sent % opts.flowControl) == 0) { + msg.setReplyTo(flowControlAddress); + ++flowSent; + } + else + msg.setReplyTo(Address()); // Clear the reply address. } - sender.send(msg); reporter.message(msg); + if (opts.tx && (sent % opts.tx == 0)) { if (opts.rollbackFrequency && (++txCount % opts.rollbackFrequency == 0)) @@ -331,7 +345,6 @@ int main(int argc, char ** argv) int64_t delay = qpid::sys::Duration(qpid::sys::now(), waitTill); if (delay > 0) qpid::sys::usleep(delay/qpid::sys::TIME_USEC); } - msg = Message(); // Clear out contents and properties for next iteration } for ( ; flowSent>0; --flowSent) flowControlReceiver.get(Duration::SECOND); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 5e407a061f..be1c1f868c 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -275,6 +275,13 @@ <!-- Replicate encoded config objects - e.g. links and bridges. --> <control name="config" code="0x37"><field name="encoded" type="str32"/></control> + + <!-- Set the fairshare delivery related state of a replicated queue. --> + <control name="queue-fairshare-state" code="0x38"> + <field name="queue" type="str8"/> + <field name="position" type="uint8"/> + <field name="count" type="uint8"/> + </control> </class> </amqp> diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 3c0b9f0434..6f4c11ae15 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -380,7 +380,8 @@ class Object(object): dp.routing_key = self.getV2RoutingKey() mp = self._broker.amqpSession.message_properties() mp.content_type = "amqp/map" - mp.user_id = self._broker.authUser + if self._broker.saslUser: + mp.user_id = self._broker.saslUser mp.correlation_id = str(seq) mp.app_id = "qmf2" mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue) @@ -1492,7 +1493,8 @@ class Session: dp.routing_key = objectId.getV2RoutingKey() mp = broker.amqpSession.message_properties() mp.content_type = "amqp/map" - mp.user_id = broker.authUser + if broker.saslUser: + mp.user_id = broker.saslUser mp.correlation_id = str(seq) mp.app_id = "qmf2" mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_direct_queue) @@ -2236,6 +2238,7 @@ class Broker(Thread): self.connTimeout = connTimeout self.authUser = authUser self.authPass = authPass + self.saslUser = None self.cv = Condition() self.seqToAgentMap = {} self.error = None @@ -2409,6 +2412,11 @@ class Broker(Thread): self.conn.start() sock.settimeout(oldTimeout) self.conn.aborted = oldAborted + uid = self.conn.user_id + if uid.__class__ == tuple and len(uid) == 2: + self.saslUser = uid[1] + else: + self.saslUser = None # prevent topic queues from filling up (and causing the agents to # disconnect) by discarding the oldest queued messages when full. @@ -2588,7 +2596,8 @@ class Broker(Thread): dp.routing_key = "console.request.agent_locate" mp = self.amqpSession.message_properties() mp.content_type = "amqp/list" - mp.user_id = self.authUser + if self.saslUser: + mp.user_id = self.saslUser mp.app_id = "qmf2" mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue) mp.application_headers = {'qmf.opcode':'_agent_locate_request'} @@ -2630,7 +2639,8 @@ class Broker(Thread): dp.ttl = ttl mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" - mp.user_id = self.authUser + if self.saslUser: + mp.user_id = self.saslUser mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) return Message(dp, mp, body) @@ -3543,7 +3553,8 @@ class Agent: dp.routing_key = self.getV2RoutingKey() mp = self.broker.amqpSession.message_properties() mp.content_type = "amqp/map" - mp.user_id = self.broker.authUser + if self.broker.saslUser: + mp.user_id = self.broker.saslUser mp.correlation_id = str(sequence) mp.app_id = "qmf2" mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue) diff --git a/qpid/extras/sasl/m4/ac_pkg_swig.m4 b/qpid/extras/sasl/m4/ac_pkg_swig.m4 index 3bff433f80..6e385c067c 100644 --- a/qpid/extras/sasl/m4/ac_pkg_swig.m4 +++ b/qpid/extras/sasl/m4/ac_pkg_swig.m4 @@ -56,6 +56,8 @@ # Macro released by the Autoconf Archive. When you make and distribute a # modified version of the Autoconf Macro, you may extend this special # exception to the GPL to apply to your modified version as well. +# +# Fixed by Sandro Santilli to consider 2.0.0 > 1.3.37 (2010-06-15) AC_DEFUN([AC_PROG_SWIG],[ AC_PATH_PROG([SWIG],[swig]) @@ -99,9 +101,9 @@ AC_DEFUN([AC_PROG_SWIG],[ if test -z "$available_patch" ; then [available_patch=0] fi - if test $available_major -ne $required_major \ - -o $available_minor -ne $required_minor \ - -o $available_patch -lt $required_patch ; then + [required_full=`printf %2.2d%2.2d%2.2d%2.2d $required_major $required_minor $required_patch]` + [available_full=`printf %2.2d%2.2d%2.2d%2.2d $available_major $available_minor $available_patch]` + if test $available_full -lt $required_full; then AC_MSG_WARN([SWIG version >= $1 is required. You have $swig_version. You should look at http://www.swig.org]) SWIG='echo "Error: SWIG version >= $1 is required. You have '"$swig_version"'. You should look at http://www.swig.org" ; false' else diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index af0d8a3a1d..ab59fee020 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -304,7 +304,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect else { // use the default value set for all connections - _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish); + _syncPublish = System.getProperty((ClientProperties.SYNC_PUBLISH_PROP_NAME),_syncPublish); } if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT) != null) diff --git a/qpid/java/common.xml b/qpid/java/common.xml index b1f28dc062..066859b29f 100644 --- a/qpid/java/common.xml +++ b/qpid/java/common.xml @@ -70,6 +70,13 @@ </fileset> </path> + <property name="maven.local.repo" value="${build.scratch}/maven-local-repo"/> + <property name="maven.unique.version" value="false"/> + <property name="maven.snapshot" value="true"/> + <condition property="maven.version.suffix" value="" else="-SNAPSHOT"> + <isfalse value="${maven.snapshot}"/> + </condition> + <macrodef name="indirect"> <attribute name="name"/> <attribute name="variable"/> diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index bce64075e5..0d9f8c0b28 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -28,6 +28,7 @@ import org.ietf.jgss.Oid; import org.apache.qpid.security.UsernamePasswordCallbackHandler; import static org.apache.qpid.transport.Connection.State.OPEN; +import static org.apache.qpid.transport.Connection.State.RESUMING; import org.apache.qpid.transport.util.Logger; import javax.security.sasl.Sasl; @@ -216,7 +217,14 @@ public class ClientDelegate extends ConnectionDelegate } } - conn.setState(OPEN); + if (conn.isConnectionResuming()) + { + conn.setState(RESUMING); + } + else + { + conn.setState(OPEN); + } } @Override diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index fd19fa0512..e5e10c0e07 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -25,6 +25,7 @@ import static org.apache.qpid.transport.Connection.State.CLOSING; import static org.apache.qpid.transport.Connection.State.NEW; import static org.apache.qpid.transport.Connection.State.OPEN; import static org.apache.qpid.transport.Connection.State.OPENING; +import static org.apache.qpid.transport.Connection.State.RESUMING; import java.util.ArrayList; import java.util.Collections; @@ -32,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; @@ -63,7 +65,7 @@ public class Connection extends ConnectionInvoker public static final int MAX_CHANNEL_MAX = 0xFFFF; public static final int MIN_USABLE_CHANNEL_NUM = 0; - public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD } + public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } static class DefaultConnectionListener implements ConnectionListener { @@ -119,7 +121,8 @@ public class Connection extends ConnectionInvoker private static final AtomicLong idGenerator = new AtomicLong(0); private final long _connectionId = idGenerator.incrementAndGet(); - + private final AtomicBoolean connectionLost = new AtomicBoolean(false); + public Connection() {} public void setConnectionDelegate(ConnectionDelegate delegate) @@ -270,6 +273,8 @@ public class Connection extends ConnectionInvoker close(); throw new ConnectionException("connect() timed out"); case OPEN: + case RESUMING: + connectionLost.set(false); break; case CLOSED: throw new ConnectionException("connect() aborted"); @@ -313,6 +318,17 @@ public class Connection extends ConnectionInvoker { synchronized (lock) { + Waiter w = new Waiter(lock, timeout); + while (w.hasTime() && state != OPEN && error == null) + { + w.await(); + } + + if (state != OPEN) + { + throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state); + } + Session ssn = _sessionFactory.newSession(this, name, expiry); sessions.put(name, ssn); map(ssn); @@ -452,15 +468,25 @@ public class Connection extends ConnectionInvoker { for (Session ssn : sessions.values()) { - map(ssn); - ssn.attach(); - ssn.resume(); + if (ssn.isTransacted()) + { + removeSession(ssn); + ssn.setState(Session.State.CLOSED); + } + else + { + map(ssn); + ssn.attach(); + ssn.resume(); + } } + setState(OPEN); } } public void exception(ConnectionException e) { + connectionLost.set(true); synchronized (lock) { switch (state) @@ -663,5 +689,10 @@ public class Connection extends ConnectionInvoker { return securityLayer; } + + public boolean isConnectionResuming() + { + return connectionLost.get(); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 4de578da18..214d4534c1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -532,11 +532,22 @@ public class Session extends SessionInvoker { if (m.getEncodedTrack() == Frame.L4) { + + if (state == DETACHED && transacted) + { + state = CLOSED; + delegate.closed(this); + connection.removeSession(this); + throw new SessionException( + "Session failed over, possibly in the middle of a transaction. " + + "Closing the session. Any Transaction in progress will be rolledback."); + } + if (m.hasPayload()) { acquireCredit(); } - + synchronized (commands) { if (state == DETACHED && m.isUnreliable()) @@ -1002,4 +1013,8 @@ public class Session extends SessionInvoker this.transacted = b; } + public boolean isTransacted(){ + return transacted; + } + } diff --git a/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert b/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert Binary files differdeleted file mode 100644 index e6702108e6..0000000000 --- a/qpid/java/common/src/main/resources/org/apache/qpid/ssl/qpid.cert +++ /dev/null diff --git a/qpid/java/module.xml b/qpid/java/module.xml index 877ca130af..d3954a1544 100644 --- a/qpid/java/module.xml +++ b/qpid/java/module.xml @@ -219,10 +219,10 @@ <args> <arg line='"${project.root}/genpom"'/> <arg line='-s "${project.root}/lib/poms"'/> - <arg line='-o "${build}/qpid-${module.name}.pom"'/> + <arg line='-o "${build.scratch}/qpid-${module.name}.pom"'/> <arg line="-u http://qpid.apache.org"/> <arg line="-g org.apache.qpid"/> - <arg line="-v ${project.version}"/> + <arg line="-v ${project.version}${maven.version.suffix}"/> <arg line="-p qpid"/> <arg line='-m "${module.depends}"'/> <arg line="-a ${module.name}"/> @@ -235,12 +235,15 @@ <target name="release-mvn" depends="pom" if="module.genpom" description="Install the artifacts into the local repository and prepare the release"> <antcall target="build"/> - <artifact:pom id="module.pom" file="${build}/qpid-${module.name}.pom"/> + <artifact:pom id="module.pom" file="${build.scratch}/qpid-${module.name}.pom"/> - <artifact:install file="${module.jar}" pomRefId="module.pom"/> + <artifact:install file="${module.jar}" pomRefId="module.pom"> + <localRepository path="${maven.local.repo}"/> + </artifact:install> - <artifact:deploy file="${module.jar}" pomRefId="module.pom"> + <artifact:deploy file="${module.jar}" pomRefId="module.pom" uniqueVersion="${maven.unique.version}"> <attach file="${module.source.jar}" classifier="sources"/> + <localRepository path="${maven.local.repo}"/> <remoteRepository url="file://${module.release.base}/maven"/> </artifact:deploy> </target> diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java deleted file mode 100644 index ff0d58ad69..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/exchange/HeadersExchangePerformanceTest.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.exchange; - -import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.NoConsumersException; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.ContentBody; - -import java.util.List; - -/** - * Want to vary the number of regsitrations, messages and matches and measure - * the corresponding variance in execution time. - * <p/> - * Each registration will contain the 'All' header, even registrations will - * contain the 'Even' header and odd headers will contain the 'Odd' header. - * In additions each regsitration will have a unique value for the 'Specific' - * header as well. - * <p/> - * Messages can then be routed to all registrations, to even- or odd- registrations - * or to a specific registration. - * - */ -public class HeadersExchangePerformanceTest extends AbstractHeadersExchangeTest -{ - private static enum Mode {ALL, ODD_OR_EVEN, SPECIFIC} - - private final TestQueue[] queues; - private final Mode mode; - - public HeadersExchangePerformanceTest(Mode mode, int registrations) throws AMQException - { - this.mode = mode; - queues = new TestQueue[registrations]; - for (int i = 0; i < queues.length; i++) - { - switch(mode) - { - case ALL: - queues[i] = bind(new FastQueue("Queue" + i), "All"); - break; - case ODD_OR_EVEN: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i)); - break; - case SPECIFIC: - queues[i] = bind(new FastQueue("Queue" + i), "All", oddOrEven(i), "Specific"+ i); - break; - } - } - } - - void sendToAll(int count) throws AMQException - { - send(count, "All=True"); - } - - void sendToOdd(int count) throws AMQException - { - send(count, "All=True", "Odd=True"); - } - - void sendToEven(int count) throws AMQException - { - send(count, "All=True", "Even=True"); - } - - void sendToAllSpecifically(int count) throws AMQException - { - for (int i = 0; i < queues.length; i++) - { - sendToSpecific(count, i); - } - } - - void sendToSpecific(int count, int index) throws AMQException - { - send(count, "All=True", oddOrEven(index) + "=True", "Specific=" + index); - } - - private void send(int count, String... headers) throws AMQException - { - for (int i = 0; i < count; i++) - { - route(new Message("Message" + i, headers)); - } - } - - private static String oddOrEven(int i) - { - return (i % 2 == 0 ? "Even" : "Odd"); - } - - static class FastQueue extends TestQueue - { - - public FastQueue(String name) throws AMQException - { - super(name); - } - - public void deliver(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws NoConsumersException - { - //just discard as we are not testing routing functionality here - } - } - - static class Test extends TimedRun - { - private final Mode mode; - private final int registrations; - private final int count; - private HeadersExchangePerformanceTest test; - - Test(Mode mode, int registrations, int count) - { - super(mode + ", registrations=" + registrations + ", count=" + count); - this.mode = mode; - this.registrations = registrations; - this.count = count; - } - - protected void setup() throws Exception - { - test = new HeadersExchangePerformanceTest(mode, registrations); - run(100); //do a warm up run before times start - } - - protected void teardown() throws Exception - { - test = null; - System.gc(); - } - - protected void run() throws Exception - { - run(count); - } - - private void run(int count) throws Exception - { - switch(mode) - { - case ALL: - test.sendToAll(count); - break; - default: - System.out.println("Test for " + mode + " not yet implemented."); - } - } - } - - public static void main(String[] argv) throws Exception - { - int registrations = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - TimedRun test = new Test(Mode.ALL, registrations, messages); - AveragedRun tests = new AveragedRun(test, iterations); - System.out.println(tests.call()); - } -} - diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java deleted file mode 100644 index e76c164f64..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/protocol/TestProtocolInitiation.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.codec.AMQDecoder; -import org.apache.qpid.codec.AMQEncoder; -import org.apache.qpid.framing.*; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; -import org.apache.mina.filter.codec.ProtocolEncoderOutput; -import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; - -import junit.framework.TestCase; - -/** - * This test suite tests the handling of protocol initiation frames and related issues. - */ -public class TestProtocolInitiation extends TestCase implements ProtocolVersionList -{ - private AMQPFastProtocolHandler _protocolHandler; - - private MockIoSession _mockIoSession; - - /** - * We need to use the object encoder mechanism so to allow us to retrieve the - * output (a bytebuffer) we define our own encoder output class. The encoder - * writes the encoded data to this class, from where we can retrieve it during - * the test run. - */ - private class TestProtocolEncoderOutput implements ProtocolEncoderOutput - { - public ByteBuffer result; - - public void write(ByteBuffer buf) - { - result = buf; - } - - public void mergeAll() - { - throw new UnsupportedOperationException(); - } - - public WriteFuture flush() - { - throw new UnsupportedOperationException(); - } - } - - private class TestProtocolDecoderOutput implements ProtocolDecoderOutput - { - public Object result; - - public void write(Object buf) - { - result = buf; - } - - public void flush() - { - throw new UnsupportedOperationException(); - } - } - - protected void setUp() throws Exception - { - super.setUp(); - _mockIoSession = new MockIoSession(); - _protocolHandler = new AMQPFastProtocolHandler(null, null); - } - - - /** - * Tests that the AMQDecoder handles invalid protocol classes - * @throws Exception - */ - public void testDecoderValidateProtocolClass() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolClass = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolClassException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolClassException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol instance numbers - * @throws Exception - */ - public void testDecoderValidatesProtocolInstance() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolInstance = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolInstanceException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolInstanceException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol major - * @throws Exception - */ - public void testDecoderValidatesProtocolMajor() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMajor = 2; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolVersionException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolVersionException, got " + e); - } - } - - /** - * Tests that the AMQDecoder handles invalid protocol minor - * @throws Exception - */ - public void testDecoderValidatesProtocolMinor() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.protocolMinor = 99; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolVersionException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolVersionException, got " + e); - } - } - - /** - * Tests that the AMQDecoder accepts a valid PI - * @throws Exception - */ - public void testDecoderValidatesHeader() throws Exception - { - try - { - ProtocolInitiation pi = createValidProtocolInitiation(); - pi.header = new char[] {'P', 'Q', 'M', 'A' }; - decodePI(pi); - fail("expected exception did not occur"); - } - catch (AMQProtocolHeaderException m) - { - // ok - } - catch (Exception e) - { - fail("expected AMQProtocolHeaderException, got " + e); - } - } - - /** - * Test that a valid header is passed by the decoder. - * @throws Exception - */ - public void testDecoderAcceptsValidHeader() throws Exception - { - ProtocolInitiation pi = createValidProtocolInitiation(); - decodePI(pi); - } - - /** - * This test checks that an invalid protocol header results in the - * connection being closed. - */ - public void testInvalidProtocolHeaderClosesConnection() throws Exception - { - AMQProtocolHeaderException pe = new AMQProtocolHeaderException("Test"); - _protocolHandler.exceptionCaught(_mockIoSession, pe); - assertNotNull(_mockIoSession.getLastWrittenObject()); - Object piResponse = _mockIoSession.getLastWrittenObject(); - assertEquals(piResponse.getClass(), ProtocolInitiation.class); - ProtocolInitiation pi = (ProtocolInitiation) piResponse; - assertEquals("Protocol Initiation sent out was not the broker's expected header", pi, - createValidProtocolInitiation()); - assertTrue("Session has not been closed", _mockIoSession.isClosing()); - } - - private ProtocolInitiation createValidProtocolInitiation() - { - /* Find last protocol version in protocol version list. Make sure last protocol version - listed in the build file (build-module.xml) is the latest version which will be used - here. */ - int i = pv.length - 1; - return new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]); - } - - /** - * Helper that encodes a protocol initiation and attempts to decode it - * @param pi - * @throws Exception - */ - private void decodePI(ProtocolInitiation pi) throws Exception - { - // we need to do this test at the level of the decoder since we initially only expect PI frames - // so the protocol handler is not set up to know whether it should be expecting a PI frame or - // a different type of frame - AMQDecoder decoder = new AMQDecoder(true); - AMQEncoder encoder = new AMQEncoder(); - TestProtocolEncoderOutput peo = new TestProtocolEncoderOutput(); - encoder.encode(_mockIoSession, pi, peo); - TestProtocolDecoderOutput pdo = new TestProtocolDecoderOutput(); - decoder.decode(_mockIoSession, peo.result, pdo); - ((ProtocolInitiation) pdo.result).checkVersion(this); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(TestProtocolInitiation.class); - } -} diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java deleted file mode 100644 index 11c0026455..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueueConcurrentPerfTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.ConcurrentTest; - -public class QueueConcurrentPerfTest extends QueuePerfTest -{ - QueueConcurrentPerfTest(Factory factory, int queueCount, int messages) - { - super(factory, queueCount, messages); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - for(Factory f : factories) - { - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 100, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 10000, 10000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 1000), iterations), 5)); - run(label, new AveragedRun(new ConcurrentTest(new QueuePerfTest(f, 1000, 100000), iterations), 5)); - } - } -} diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java deleted file mode 100644 index 5b3857396d..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/QueuePerfTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TimedRun; -import org.apache.qpid.server.util.RunStats; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class QueuePerfTest extends TimedRun -{ - private final Factory _factory; - private final int _queueCount; - private final int _messages; - private final String _msg = ""; - private List<Queue<String>> _queues; - - QueuePerfTest(Factory factory, int queueCount, int messages) - { - super(factory + ", " + queueCount + ", " + messages); - _factory = factory; - _queueCount = queueCount; - _messages = messages; - } - - protected void setup() throws Exception - { - //init - int count = Integer.getInteger("prepopulate", 0); -// System.err.println("Prepopulating with " + count + " items"); - _queues = new ArrayList<Queue<String>>(_queueCount); - for (int i = 0; i < _queueCount; i++) - { - Queue<String> q = _factory.create(); - for(int j = 0; j < count; ++j) - { - q.add("Item"+ j); - } - _queues.add(q); - } - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - //dispatch - for (int i = 0; i < _messages; i++) - { - for (Queue<String> q : _queues) - { - q.offer(_msg); - q.poll(); - } - } - } - - static interface Factory - { - Queue<String> create(); - } - - static Factory CONCURRENT = new Factory() - { - public Queue<String> create() - { - return new ConcurrentLinkedQueue<String>(); - } - - public String toString() - { - return "ConcurrentLinkedQueue"; - } - - }; - - static Factory SYNCHRONIZED = new Factory() - { - public Queue<String> create() - { - return new SynchronizedQueue<String>(new LinkedList<String>()); - } - - - public String toString() - { - return "Synchronized LinkedList"; - } - }; - - static Factory PLAIN = new Factory() - { - public Queue<String> create() - { - return new LinkedList<String>(); - } - - public String toString() - { - return "Plain LinkedList"; - } - }; - - static class SynchronizedQueue<E> implements Queue<E> - { - private final Queue<E> queue; - - SynchronizedQueue(Queue<E> queue) - { - this.queue = queue; - } - - public synchronized E element() - { - return queue.element(); - } - - public synchronized boolean offer(E o) - { - return queue.offer(o); - } - - public synchronized E peek() - { - return queue.peek(); - } - - public synchronized E poll() - { - return queue.poll(); - } - - public synchronized E remove() - { - return queue.remove(); - } - - public synchronized int size() - { - return queue.size(); - } - - public synchronized boolean isEmpty() - { - return queue.isEmpty(); - } - - public synchronized boolean contains(Object o) - { - return queue.contains(o); - } - - public synchronized Iterator<E> iterator() - { - return queue.iterator(); - } - - public synchronized Object[] toArray() - { - return queue.toArray(); - } - - public synchronized <T>T[] toArray(T[] a) - { - return queue.toArray(a); - } - - public synchronized boolean add(E o) - { - return queue.add(o); - } - - public synchronized boolean remove(Object o) - { - return queue.remove(o); - } - - public synchronized boolean containsAll(Collection<?> c) - { - return queue.containsAll(c); - } - - public synchronized boolean addAll(Collection<? extends E> c) - { - return queue.addAll(c); - } - - public synchronized boolean removeAll(Collection<?> c) - { - return queue.removeAll(c); - } - - public synchronized boolean retainAll(Collection<?> c) - { - return queue.retainAll(c); - } - - public synchronized void clear() - { - queue.clear(); - } - } - - static void run(String label, AveragedRun test) throws Exception - { - RunStats stats = test.call(); - System.out.println((label == null ? "" : label + ", ") + test - + ", " + stats.getAverage() + ", " + stats.getMax() + ", " + stats.getMin()); - } - - public static void main(String[] argv) throws Exception - { - Factory[] factories = new Factory[]{PLAIN, SYNCHRONIZED, CONCURRENT}; - int iterations = 5; - String label = argv.length > 0 ? argv[0]: null; - System.out.println((label == null ? "" : "Label, ") + "Queue Type, No. of Queues, No. of Operations, Avg Time, Min Time, Max Time"); - //vary number of queues: - - for(Factory f : factories) - { - run(label, new AveragedRun(new QueuePerfTest(f, 100, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 10000, 10000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 1000), iterations)); - run(label, new AveragedRun(new QueuePerfTest(f, 1000, 100000), iterations)); - } - } - -} diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java deleted file mode 100644 index 9784b2f671..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/queue/SendPerfTest.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.BasicPublishBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.RequiredDeliveryException; -import org.apache.qpid.server.configuration.ServerConfiguration; -import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.txn.NonTransactionalContext; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.handler.OnCurrentThreadExecutor; -import org.apache.qpid.server.protocol.AMQMinaProtocolSession; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.MockIoSession; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.registry.IApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; -import org.apache.qpid.server.util.AveragedRun; -import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.server.util.TimedRun; - -import java.util.ArrayList; -import java.util.List; -import java.util.LinkedList; - -public class SendPerfTest extends TimedRun -{ - private int _messages = 1000; - private int _clients = 10; - private List<AMQQueue> _queues; - - public SendPerfTest(int clients, int messages) - { - super("SendPerfTest, msgs=" + messages + ", clients=" + clients); - _messages = messages; - _clients = clients; - } - - protected void setup() throws Exception - { - _queues = initQueues(_clients); - System.gc(); - } - - protected void teardown() throws Exception - { - System.gc(); - } - - protected void run() throws Exception - { - deliver(_messages, _queues); - } - - //have a dummy AMQProtocolSession that does nothing on the writeFrame() - //set up x number of queues - //create necessary bits and pieces to deliver a message - //deliver y messages to each queue - - public static void main(String[] argv) throws Exception - { - ApplicationRegistry.initialise(new TestApplicationRegistry(new ServerConfiguration(new PropertiesConfiguration()))); - int clients = Integer.parseInt(argv[0]); - int messages = Integer.parseInt(argv[1]); - int iterations = Integer.parseInt(argv[2]); - AveragedRun test = new AveragedRun(new SendPerfTest(clients, messages), iterations); - test.run(); - } - - /** - * Delivers messages to a number of queues. - * @param count the number of messages to deliver - * @param queues the list of queues - * @throws NoConsumersException - */ - static void deliver(int count, List<AMQQueue> queues) throws AMQException - { - BasicPublishBody publish = new BasicPublishBody(); - publish.exchange = new NullExchange().getName(); - ContentHeaderBody header = new ContentHeaderBody(); - List<ContentBody> body = new ArrayList<ContentBody>(); - MessageStore messageStore = new SkeletonMessageStore(); - // channel can be null since it is only used in ack processing which does not apply to this test - TransactionalContext txContext = new NonTransactionalContext(messageStore, null, - new LinkedList<RequiredDeliveryException>()); - body.add(new ContentBody()); - MessageHandleFactory factory = new MessageHandleFactory(); - for (int i = 0; i < count; i++) - { - // this routes and delivers the message - AMQMessage msg = new AMQMessage(i, publish, txContext, header, queues, body, messageStore, - factory); - } - } - - static List<AMQQueue> initQueues(int number) throws AMQException - { - Exchange exchange = new NullExchange(); - List<AMQQueue> queues = new ArrayList<AMQQueue>(number); - for (int i = 0; i < number; i++) - { - AMQQueue q = createQueue("Queue" + (i + 1)); - q.bind("routingKey", exchange); - try - { - q.registerProtocolSession(createSession(), 1, "1", false); - } - catch (Exception e) - { - throw new AMQException("Error creating protocol session: " + e, e); - } - queues.add(q); - } - return queues; - } - - static AMQQueue createQueue(String name) throws AMQException - { - return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), - new OnCurrentThreadExecutor()); - } - - static AMQProtocolSession createSession() throws Exception - { - IApplicationRegistry reg = ApplicationRegistry.getInstance(); - AMQCodecFactory codecFactory = new AMQCodecFactory(true); - AMQMinaProtocolSession result = new AMQMinaProtocolSession(new MockIoSession(), reg.getQueueRegistry(), reg.getExchangeRegistry(), codecFactory); - result.addChannel(new AMQChannel(1, null, null)); - return result; - } - - static class NullExchange extends AbstractExchange - { - public String getName() - { - return "NullExchange"; - } - - protected ExchangeMBean createMBean() - { - return null; - } - - public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException - { - } - - public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException - { - } - - public void route(AMQMessage payload) throws AMQException - { - } - } -} diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java deleted file mode 100644 index 1ae8d3205d..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/server/util/ConcurrentTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.util; - -public class ConcurrentTest extends TimedRun -{ - private final TimedRun _test; - private final Thread[] _threads; - - public ConcurrentTest(TimedRun test, int threads) - { - super(test.toString()); - _test = test; - _threads = new Thread[threads]; - } - - protected void setup() throws Exception - { - _test.setup(); - for(int i = 0; i < _threads.length; i++) - { - _threads[i] = new Thread(new Runner()); - } - } - - protected void teardown() throws Exception - { - _test.teardown(); - } - - protected void run() throws Exception - { - for(Thread t : _threads) - { - t.start(); - } - for(Thread t : _threads) - { - t.join(); - } - } - - private class Runner implements Runnable - { - private Exception error; - - public void run() - { - try - { - _test.run(); - } - catch(Exception e) - { - error = e; - e.printStackTrace(); - } - } - } - -} diff --git a/qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java b/qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java deleted file mode 100644 index 04b6bceb4f..0000000000 --- a/qpid/java/systests/src/old_test/java/org/apache/qpid/test/unit/ack/DisconnectAndRedeliverTest.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.unit.ack; - -import junit.framework.TestCase; -import org.apache.log4j.Logger; -import org.apache.log4j.xml.DOMConfigurator; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.util.TestApplicationRegistry; - -import javax.jms.*; - -public class DisconnectAndRedeliverTest extends InternalBrokerBaseCase -{ - private static final Logger _logger = Logger.getLogger(DisconnectAndRedeliverTest.class); - - static - { - String workdir = System.getProperty("QPID_WORK"); - if (workdir == null || workdir.equals("")) - { - String tempdir = System.getProperty("java.io.tmpdir"); - System.out.println("QPID_WORK not set using tmp directory: " + tempdir); - System.setProperty("QPID_WORK", tempdir); - } - DOMConfigurator.configure("../broker/etc/log4j.xml"); - } - - @Override - public void createBroker() throws Exception - { - super.createBroker(); - TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); - } - - @Override - public void stopBroker() - { - TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); - super.stopBroker(); - } - - /** - * This tests that when there are unacknowledged messages on a channel they are requeued for delivery when - * the channel is closed. - * - * @throws Exception - */ - public void testDisconnectRedeliversMessages() throws Exception - { - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - - Session consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - AMQQueue queue = new AMQQueue("someQ", "someQ", false, false); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - - - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg2", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg3", tm.getText()); - - - tm = (TextMessage) consumer.receive(3000); - assertEquals("msg4", tm.getText()); - - _logger.info("Received redelivery of three messages. Acknowledging last message"); - tm.acknowledge(); - - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting third consumer connection"); - con.start(); - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - _logger.info("Starting fourth consumer connection"); - con.start(); - tm = (TextMessage) consumer.receive(3000); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - con.close(); - - _logger.info("Actually:" + store.getMessageMetaDataMap().size()); - // assertTrue(store.getMessageMap().size() == 0); - } - - /** - * Tests that unacknowledged messages are thrown away when the channel is closed and they cannot be - * requeued (due perhaps to the queue being deleted). - * - * @throws Exception - */ - public void testDisconnectWithTransientQueueThrowsAwayMessages() throws Exception - { - - Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); - Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Queue queue = new AMQQueue("someQ", "someQ", false, true); - MessageConsumer consumer = consumerSession.createConsumer(queue); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); - - Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); - Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = producerSession.createProducer(queue); - - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); - - con2.close(); - - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - tm.acknowledge(); - _logger.info("Received and acknowledged first message"); - consumer.receive(); - consumer.receive(); - consumer.receive(); - _logger.info("Received all four messages. About to disconnect and reconnect"); - - con.close(); - con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(queue); - - _logger.info("Starting second consumer connection"); - con.start(); - - tm = (TextMessage) consumer.receiveNoWait(); - assertNull(tm); - _logger.info("No messages redelivered as is expected"); - - _logger.info("Actually:" + store.getMessageMetaDataMap().size()); - assertTrue(store.getMessageMetaDataMap().size() == 0); - con.close(); - } - - public static junit.framework.Test suite() - { - return new junit.framework.TestSuite(DisconnectAndRedeliverTest.class); - } -} diff --git a/qpid/java/tools/bin/perf_report.sh b/qpid/java/tools/bin/perf_report.sh index 22c839e08c..e6b4c987e5 100755 --- a/qpid/java/tools/bin/perf_report.sh +++ b/qpid/java/tools/bin/perf_report.sh @@ -25,6 +25,10 @@ SUB_MEM=-Xmx1024M PUB_MEM=-Xmx1024M LOG_CONFIG="-Damqj.logging.level=WARN" +QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" +DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" +TOPIC="amq.topic/test" +DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" . setenv.sh @@ -72,60 +76,65 @@ echo "========================================================================== echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" echo "------------------------------------------------------------------------------------------------" +# The message counts and warmup counts are set to very low values for quick testing of the script. +# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k +# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend +# setting very low values to start with and experiment while increasing them slowly. + # Test 1 Trans Queue -run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10" +#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" # Test 2 Dura Queue -run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" # Test 3 Dura Queue Sync -run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true" +run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" # Test 4 Dura Queue Sync Publish and Ack -run_testcase "Dura_SyncPubAck" "-Ddurable=true -Dsync_ack=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" +run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" # Test 5 Topic -run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" # Test 6 Durable Topic -run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" # Test 7 Fanout -run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" # Test 8 Small TX -run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \ - "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" +run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" # Test 9 Large TX -run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \ - "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" +run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ + "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" # Test 10 256 MSG -run_testcase "Msg_256b" "" "-Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" # Test 11 512 MSG -run_testcase "Msg_512b" "" "-Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" # Test 12 2048 MSG -run_testcase "Msg_2048b" "" "-Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" # Test 13 Random size MSG -run_testcase "Random_Msg_Size" "" "-Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" # Test 14 Random size MSG Durable -run_testcase "Rand_Msg_Dura" "-Ddurable=true" "-Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" # Test 15 64K MSG -run_testcase "Msg_64K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" # Test 16 Durable 64K MSG -run_testcase "Msg_Durable_64K" "-Ddurable=true -Damqj.tcpNoDelay=true" \ - "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" # Test 17 500K MSG -run_testcase "Msg_500K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" # Test 18 Durable 500K MSG -run_testcase "Msg_Dura_500K" "-Damqj.tcpNoDelay=true -Ddurable=true" \ - "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ + "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/qpid/java/tools/etc/jndi.properties b/qpid/java/tools/etc/jndi.properties deleted file mode 100644 index 454551c1b1..0000000000 --- a/qpid/java/tools/etc/jndi.properties +++ /dev/null @@ -1,35 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory - -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -connectionfactory.connectionFactory = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' - -# Register an AMQP destination in JNDI -destination.transientQueue = BURL:direct://amq.direct//testQueueT?autodelete='true' -destination.durableQueue = BURL:direct://amq.direct//testQueueD?durable='true'&autodelete='true' - -destination.transientTopic = BURL:topic://amq.topic//testTopicT?autodelete='true' -destination.durableTopic = BURL:topic://amq.topic//testTopicD?durable='true'&autodelete='true'&clientid='test'&subscription='testQueueD' - -destination.fanoutQueue = BURL:fanout://amq.fanout//fanoutQueue?autodelete='true' diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 88e75fb6a9..ac597d17de 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -30,6 +30,9 @@ import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + public class PerfBase { TestParams params; @@ -45,48 +48,21 @@ public class PerfBase } public void setUp() throws Exception - { - Hashtable<String,String> env = new Hashtable<String,String>(); - env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory()); - env.put(Context.PROVIDER_URL, params.getProviderURL()); + { - Context ctx = null; - try + if (params.getHost().equals("") || params.getPort() == -1) { - ctx = new InitialContext(env); + con = new AMQConnection(params.getUrl()); } - catch(Exception e) + else { - throw new Exception("Error initializing JNDI",e); - + con = new AMQConnection(params.getHost(),params.getPort(),"guest","guest","test","test"); } - - ConnectionFactory conFac = null; - try - { - conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory()); - } - catch(Exception e) - { - throw new Exception("Error looking up connection factory",e); - } - - con = conFac.createConnection(); con.start(); session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - try - { - dest = (Destination)ctx.lookup( params.isDurable()? - params.getDurableDestination(): - params.getTransientDestination() - ); - } - catch(Exception e) - { - throw new Exception("Error looking up destination",e); - } + dest = new AMQAnyDestination(params.getAddress()); } public void handleError(Exception e,String msg) diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index f1b682ff32..89d6462a39 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -24,15 +24,22 @@ import javax.jms.Session; public class TestParams { - private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties"; - - private String connectionFactory = "connectionFactory"; - - private String transientDest = "transientQueue"; + /* + * By default the connection URL is used. + * This allows a user to easily specify a fully fledged URL any given property. + * Ex. SSL parameters + * + * By providing a host & port allows a user to simply override the URL. + * This allows to create multiple clients in test scripts easily, + * without having to deal with the long URL format. + */ + private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; + + private String host = ""; + + private int port = -1; - private String durableDest = "durableQueue"; + private String address = "queue; {create : always}"; private int msg_size = 1024; @@ -60,11 +67,11 @@ public class TestParams public TestParams() { - initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory); - providerURL = System.getProperty("java.naming.provider.url",providerURL); - - transientDest = System.getProperty("transDest",transientDest); - durableDest = System.getProperty("durableDest",durableDest); + + url = System.getProperty("url",url); + host = System.getProperty("host",""); + port = Integer.getInteger("port", -1); + address = System.getProperty("address","queue"); msg_size = Integer.getInteger("msg_size", 1024); msg_type = Integer.getInteger("msg_type",1); @@ -80,29 +87,29 @@ public class TestParams random_msg_size = Boolean.getBoolean("random_msg_size"); } - public int getAckMode() + public String getUrl() { - return ack_mode; + return url; } - public String getConnectionFactory() + public String getHost() { - return connectionFactory; + return host; } - public String getTransientDestination() + public int getPort() { - return transientDest; + return port; } - public String getDurableDestination() + public String getAddress() { - return durableDest; + return address; } - public String getInitialContextFactory() + public int getAckMode() { - return initialContextFactory; + return ack_mode; } public int getMsgCount() @@ -125,11 +132,6 @@ public class TestParams return durable; } - public String getProviderURL() - { - return providerURL; - } - public boolean isTransacted() { return transacted; diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index cde77c47d2..303668eb36 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -381,6 +381,8 @@ <arg name="reason" type="lstr" desc="Reason for a failure"/> <arg name="rhost" type="sstr" desc="Address (i.e. DNS name, IP address, etc.) of a remotely connected host"/> <arg name="user" type="sstr" desc="Authentication identity"/> + <arg name="msgDepth" type="count32" desc="Current size of queue in messages"/> + <arg name="byteDepth" type="count32" desc="Current size of queue in bytes"/> </eventArguments> <event name="clientConnect" sev="inform" args="rhost, user"/> @@ -396,5 +398,6 @@ <event name="unbind" sev="inform" args="rhost, user, exName, qName, key"/> <event name="subscribe" sev="inform" args="rhost, user, qName, dest, excl, args"/> <event name="unsubscribe" sev="inform" args="rhost, user, dest"/> + <event name="queueThresholdExceeded" sev="warn" args="qName, msgDepth, byteDepth"/> </schema> diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py index f9315a6f90..921786af22 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -29,3 +29,7 @@ from message import * from query import * from queue import * from tx import * +from lvq import * +from priority import * +from threshold import * +from extensions import * diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py new file mode 100644 index 0000000000..26ea3cb0e9 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/extensions.py @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from qpid.client import Client, Closed +from qpid.queue import Empty +from qpid.content import Content +from qpid.testlib import TestBase010 +from time import sleep + +class ExtensionTests(TestBase010): + """Tests for various extensions to AMQP 0-10""" + + def test_timed_autodelete(self): + session = self.session + session2 = self.conn.session("another-session") + session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5}) + session2.close() + result = session.queue_query(queue="my-queue") + self.assertEqual("my-queue", result.queue) + sleep(5) + result = session.queue_query(queue="my-queue") + self.assert_(not result.queue) diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py b/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py new file mode 100644 index 0000000000..8fd6b88d78 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/lvq.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +import math + +class LVQTests (Base): + """ + Test last value queue behaviour + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def test_simple(self): + snd = self.ssn.sender("lvq; {create: sender, delete: sender, node: {x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}", + durable=self.durable()) + snd.send(create_message("a", "a-1")) + snd.send(create_message("b", "b-1")) + snd.send(create_message("a", "a-2")) + snd.send(create_message("a", "a-3")) + snd.send(create_message("c", "c-1")) + snd.send(create_message("c", "c-2")) + + rcv = self.ssn.receiver("lvq; {mode: browse}") + assert fetch_all(rcv) == ["b-1", "a-3", "c-2"] + + snd.send(create_message("b", "b-2")) + assert fetch_all(rcv) == ["b-2"] + + snd.send(create_message("c", "c-3")) + snd.send(create_message("d", "d-1")) + assert fetch_all(rcv) == ["c-3", "d-1"] + + snd.send(create_message("b", "b-3")) + assert fetch_all(rcv) == ["b-3"] + + rcv.close() + rcv = self.ssn.receiver("lvq; {mode: browse}") + assert (fetch_all(rcv) == ["a-3", "c-3", "d-1", "b-3"]) + + +def create_message(key, content): + msg = Message(content=content) + msg.properties["lvq-key"] = key + return msg + +def fetch_all(rcv): + content = [] + while True: + try: + content.append(rcv.fetch(0).content) + except Empty: + break + return content diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py new file mode 100644 index 0000000000..3651a1218b --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py @@ -0,0 +1,237 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +from qpid.compat import set +import math + +class PriorityTests (Base): + """ + Test prioritised messaging + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def prioritised_delivery(self, priorities, levels=10): + """ + Test that message on a queue are delivered in priority order. + """ + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels, + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + + def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10): + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + limit_policy = "x-qpid-fairshare:%s" % default_limit + if limits: + for k, v in limits.items(): + limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v) + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}" + % (levels, limit_policy), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + if limits: + limit_function = lambda x : limits.get(x, 0) + else: + limit_function = lambda x : default_limit + for expected in fairshare(sorted_(msgs, key=lambda m: priority_level(m.priority,levels), reverse=True), + limit_function, levels): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.priority == expected.priority + assert msg.content == expected.content + self.ssn.acknowledge(msg) + + def test_prioritised_delivery_1(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10) + + def test_prioritised_delivery_2(self): + self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5) + + def test_fairshare_1(self): + self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3]) + + def test_fairshare_2(self): + self.fairshare_delivery(priorities = [10 for i in range(30)]) + + def test_fairshare_3(self): + self.fairshare_delivery(priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3], limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}, levels=8) + + def test_browsing(self): + priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3] + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + snd = self.ssn.sender("priority-queue; {create: sender, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver("priority-queue; {mode: browse, delete: receiver}") + received = [] + try: + while True: received.append(rcv.fetch(0)) + except Empty: None + #check all messages on the queue were received by the browser; don't relay on any specific ordering at present + assert set([m.content for m in msgs]) == set([m.content for m in received]) + + def ring_queue_check(self, msgs): + """ + Ensure that a ring queue removes lowest priority messages first. + """ + snd = self.ssn.sender(address("priority-ring-queue", arguments="x-qpid-priorities:10, 'qpid.policy_type':ring, 'qpid.max_count':10"), + durable=self.durable()) + for m in msgs: snd.send(m) + + rcv = self.ssn.receiver(snd.target) + received = [] + try: + while True: received.append(rcv.fetch(0)) + except Empty: None + + expected = [] + for m in msgs: + while len(expected) > 9: + expected=sorted_(expected, key=lambda x: priority_level(x.priority,10)) + expected.pop(0) + expected.append(m) + #print "sent %s; expected %s; got %s" % ([m.content for m in msgs], [m.content for m in expected], [m.content for m in received]) + assert [m.content for m in expected] == [m.content for m in received] + + def test_ring_queue_1(self): + priorities = [4,5,3,6,9,9,2,9,2,9,9,1,9,9,9,3,3,3,9,9,3,9,3,9,9,9,9,9,9,2,3] + seq = content("msg") + self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) + + def test_ring_queue_2(self): + priorities = [9,0,2,3,6,9,9,2,9,2,9,9,1,9,4,7,1,1,3,9,9,3,9,3,9,9,9,1,9,9,2,3,0,9] + seq = content("msg") + self.ring_queue_check([Message(content=seq.next(), priority = p) for p in priorities]) + + def test_requeue(self): + priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3] + msgs = [Message(content=str(uuid4()), priority = p) for p in priorities] + + snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:10}}}}", + durable=self.durable()) + #want to have some messages requeued so enable prefetch on a dummy receiver + other = self.conn.session() + dummy = other.receiver("priority-queue") + dummy.capacity = 10 + + for m in msgs: snd.send(m) + + #fetch some with dummy receiver on which prefetch is also enabled + for i in range(5): + msg = dummy.fetch(0) + #close session without acknowledgements to requeue messages + other.close() + + #now test delivery works as expected after that + rcv = self.ssn.receiver(snd.target) + for expected in sorted_(msgs, key=lambda m: priority_level(m.priority,10), reverse=True): + msg = rcv.fetch(0) + #print "expected priority %s got %s" % (expected.priority, msg.priority) + assert msg.content == expected.content + self.ssn.acknowledge(msg) + +def content(base, counter=1): + while True: + yield "%s-%s" % (base, counter) + counter += 1 + +def address(name, create_policy="sender", delete_policy="receiver", arguments=None): + if arguments: node = "node: {x-declare:{arguments:{%s}}}" % arguments + else: node = "node: {}" + return "%s; {create: %s, delete: %s, %s}" % (name, create_policy, delete_policy, node) + +def fairshare(msgs, limit, levels): + """ + Generator to return prioritised messages in expected order for a given fairshare limit + """ + count = 0 + last_priority = None + postponed = [] + while msgs or postponed: + if not msgs: + msgs = postponed + count = 0 + last_priority = None + postponed = [] + msg = msgs.pop(0) + if last_priority and priority_level(msg.priority, levels) == last_priority: + count += 1 + else: + last_priority = priority_level(msg.priority, levels) + count = 1 + l = limit(last_priority) + if (l and count > l): + postponed.append(msg) + else: + yield msg + return + +def effective_priority(value, levels): + """ + Method to determine effective priority given a distinct number of + levels supported. Returns the lowest priority value that is of + equivalent priority to the value passed in. + """ + if value <= 5-math.ceil(levels/2.0): return 0 + if value >= 4+math.floor(levels/2.0): return 4+math.floor(levels/2.0) + return value + +def priority_level(value, levels): + """ + Method to determine which of a distinct number of priority levels + a given value falls into. + """ + offset = 5-math.ceil(levels/2.0) + return min(max(value - offset, 0), levels-1) + +def sorted_(msgs, key=None, reverse=False): + """ + Workaround lack of sorted builtin function in python 2.3 and lack + of keyword arguments to list.sort() + """ + temp = msgs + temp.sort(key_to_cmp(key, reverse=reverse)) + return temp + +def key_to_cmp(key, reverse=False): + if key: + if reverse: return lambda a, b: cmp(key(b), key(a)) + else: return lambda a, b: cmp(key(a), key(b)) + else: + return None diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py b/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py new file mode 100644 index 0000000000..bcd3c507e2 --- /dev/null +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +import math + +class ThresholdTests (Base): + """ + Test queue threshold events are sent and received correctly + """ + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def do_threshold_test(self, key, value, messages): + rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#") + snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'%s':%s}}}}" % (key, value)) + size = 0 + count = 0 + for m in messages: + snd.send(m) + count = count + 1 + size = size + len(m.content) + event = rcv.fetch() + schema = event.content[0]["_schema_id"] + assert schema["_class_name"] == "queueThresholdExceeded" + values = event.content[0]["_values"] + assert values["qName"] == "ttq" + assert values["msgDepth"] == count, "msgDepth %s, expected %s" % (values["msgDepth"], count) + assert values["byteDepth"] == size, "byteDepth %s, expected %s" % (values["byteDepth"], size) + + def test_alert_count(self): + self.do_threshold_test("qpid.alert_count", 5, [Message("msg-%s" % i) for i in range(5)]) + + def test_alert_size(self): + self.do_threshold_test("qpid.alert_size", 25, [Message("msg-%s" % i) for i in range(5)]) + + def test_alert_count_alias(self): + self.do_threshold_test("x-qpid-maximum-message-count", 10, [Message("msg-%s" % i) for i in range(10)]) + + def test_alert_size_alias(self): + self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)]) diff --git a/qpid/tools/src/py/.gitignore b/qpid/tools/src/py/.gitignore index 3b402156c2..97cb05dc36 100644 --- a/qpid/tools/src/py/.gitignore +++ b/qpid/tools/src/py/.gitignore @@ -1,20 +1,22 @@ + +# # -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an +# http://www.apache.org/licenses/LICENSE-2.0 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "License"); you may not use this file except in compliance # KIND, either express or implied. See the License for the +# Licensed to the Apache Software Foundation (ASF) under one +# Unless required by applicable law or agreed to in writing, +# distributed with this work for additional information +# or more contributor license agreements. See the NOTICE file +# regarding copyright ownership. The ASF licenses this file +# software distributed under the License is distributed on an # specific language governing permissions and limitations +# to you under the Apache License, Version 2.0 (the # under the License. -# - +# with the License. You may obtain a copy of the License at /qpid-clusterc +/qpid-configc +/qpid-routec |