diff options
author | Alex Rudyy <orudyy@apache.org> | 2012-12-17 12:33:05 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2012-12-17 12:33:05 +0000 |
commit | a9a7379887b39b248c8c435276a8e18bf0f22d98 (patch) | |
tree | c88a1ead116e17f89ce73b398033f42b3027a139 | |
parent | b3c7409db4cded6d116d851fab5f3863afaa00c8 (diff) | |
download | qpid-python-a9a7379887b39b248c8c435276a8e18bf0f22d98.tar.gz |
merge from trunk up to revision 1422060
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-qpid-4390@1422901 13f79535-47bb-0310-9956-ffa450edef68
131 files changed, 1402 insertions, 348 deletions
diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am index b0321d4e5d..8bf56ead91 100644 --- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am +++ b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am @@ -21,16 +21,19 @@ INCLUDE = -I$(top_srcdir)/include AM_CPPFLAGS = $(INCLUDE) +TYPES_LIB=$(top_builddir)/src/libqpidtypes.la +MESSAGING_LIB=$(top_builddir)/src/libqpidmessaging.la + noinst_PROGRAMS=agent event_driven_list_agents list_agents print_events agent_SOURCES=agent.cpp -agent_LDADD=$(top_builddir)/src/libqmf2.la -lqpidtypes -lqpidmessaging +agent_LDADD=$(top_builddir)/src/libqmf2.la $(TYPES_LIB) $(MESSAGING_LIB) list_agents_SOURCES=list_agents.cpp -list_agents_LDADD=$(top_builddir)/src/libqmf2.la -lqpidmessaging +list_agents_LDADD=$(top_builddir)/src/libqmf2.la $(MESSAGING_LIB) event_driven_list_agents_SOURCES=event_driven_list_agents.cpp -event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la -lqpidmessaging +event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la $(MESSAGING_LIB) print_events_SOURCES=print_events.cpp -print_events_LDADD=$(top_builddir)/src/libqmf2.la -lqpidtypes -lqpidmessaging +print_events_LDADD=$(top_builddir)/src/libqmf2.la $(TYPES_LIB) $(MESSAGING_LIB) diff --git a/qpid/cpp/bindings/qpid/examples/perl/drain.pl b/qpid/cpp/bindings/qpid/examples/perl/drain.pl index 2da28f2867..e66184a160 100755 --- a/qpid/cpp/bindings/qpid/examples/perl/drain.pl +++ b/qpid/cpp/bindings/qpid/examples/perl/drain.pl @@ -78,7 +78,7 @@ eval { my $redelivered = ($message->get_redelivered) ? "redelivered=True, " : ""; print "Message(" . $redelivered . "properties=" . printProperties($message->get_properties()) . ", content='"; if ($message->get_content_type() eq "amqp/map") { - my $content = qpid::messasging::decode_map($message); + my $content = qpid::messaging::decode_map($message); map{ print "\n$_ => $content->{$_}"; } keys %{$content}; } else { diff --git a/qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl b/qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl new file mode 100644 index 0000000000..2c1e698abb --- /dev/null +++ b/qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl @@ -0,0 +1,59 @@ +#!/usr/bin/perl +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +use strict; +use warnings; +use Data::Dumper; + +use cqpid_perl; + +my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672"; +my $address = ( @ARGV > 1 ) ? $ARGV[0] : "amq.match"; +my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : ""; + +my $in_address = "amq.match; {link:{x-bindings:[{exchange: 'amq.match', arguments:{'x-match': 'all', 'header2' : 'value2'}}]}}"; + +my $connection = new cqpid_perl::Connection($broker, $connectionOptions); + +eval { + $connection->open(); + my $session = $connection->createSession(); + + my $receiver = $session->createReceiver($in_address); + my $sender = $session->createSender($address); + + my $hash = { id => 1234, name => "Blah\x00Blah" }; + my $outmsg = new cqpid_perl::Message("Hello\x00World"); + cqpid_perl::encode($hash, $outmsg); + $outmsg->setProperty("header2", "value2"); + $sender->send($outmsg); + + my $message = $receiver->fetch($cqpid_perl::Duration::SECOND); + + print Dumper($message->getProperties()); + + print $message->getContent() . "\n"; + my $outmap = cqpid_perl::decodeMap($message); + print Dumper($outmap); + $session->acknowledge(); + + $connection->close(); +}; + +die $@ if ($@); diff --git a/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb b/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb index 90292d4bec..fc9e65d562 100644 --- a/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb +++ b/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb @@ -26,9 +26,10 @@ require 'mkmf' # Setup the build environment. -$CFLAGS = "-fPIC -fno-inline -x c++" +$CFLAGS = "-fPIC -fno-inline -x c++ -lstdc++" REQUIRED_LIBRARIES = [ + 'stdc++', 'qpidclient', 'qpidcommon', 'qpidmessaging', diff --git a/qpid/cpp/bindings/swig_perl_typemaps.i b/qpid/cpp/bindings/swig_perl_typemaps.i index 831576a7d4..02e2d4a6b6 100644 --- a/qpid/cpp/bindings/swig_perl_typemaps.i +++ b/qpid/cpp/bindings/swig_perl_typemaps.i @@ -47,7 +47,9 @@ return qpid::types::Variant((float)SvNV(value)); } else if (SvPOK(value)) { - return qpid::types::Variant(std::string(SvPV_nolen(value))); + STRLEN len; + char *ptr = SvPV(value, len); + return qpid::types::Variant(std::string(ptr, len)); } } return qpid::types::Variant(); @@ -173,7 +175,7 @@ argvi++; } -%typemap (in) uint16_t, uint32_t, uint64_t { +%typemap (in) uint8_t, uint16_t, uint32_t, uint64_t { if (SvIOK($input)) { $1 = ($1_ltype)SvUV($input); } @@ -182,12 +184,12 @@ } } -%typemap (out) uint16_t, uint32_t, uint64_t { +%typemap (out) uint8_t, uint16_t, uint32_t, uint64_t { sv_setuv($result, (UV)$1); argvi++; } -%typemap (in) int32_t, int64_t { +%typemap (in) int8_t, int16_t, int32_t, int64_t { if (SvIOK($input)) { $1 = ($1_ltype)SvIV($input); } @@ -196,7 +198,7 @@ } } -%typemap (out) int32_t, int64_t { +%typemap (out) int8_t, int16_t, int32_t, int64_t { sv_setiv($result, (IV)$1); argvi++; } diff --git a/qpid/cpp/etc/CMakeLists.txt b/qpid/cpp/etc/CMakeLists.txt index d9266537b0..014842c9c7 100644 --- a/qpid/cpp/etc/CMakeLists.txt +++ b/qpid/cpp/etc/CMakeLists.txt @@ -23,11 +23,6 @@ install(FILES qpidc.conf install(FILES qpidd.conf DESTINATION ${QPID_INSTALL_CONFDIR} COMPONENT ${QPID_COMPONENT_BROKER}) -if (UNIX) - install(FILES qpidd.service - DESTINATION ${QPID_INSTALL_SYSTEMDDIR} - COMPONENT ${QPID_COMPONENT_BROKER}) -endif (UNIX) if (BUILD_SASL) install(FILES sasl2/qpidd.conf DESTINATION ${QPID_INSTALL_SASLDIR} diff --git a/qpid/cpp/etc/Makefile.am b/qpid/cpp/etc/Makefile.am index 80c5fc51eb..aa41c65b37 100644 --- a/qpid/cpp/etc/Makefile.am +++ b/qpid/cpp/etc/Makefile.am @@ -20,7 +20,7 @@ SASL_CONF = sasl2/qpidd.conf EXTRA_DIST = \ $(SASL_CONF) \ - qpidd.service qpidd.in qpidd-primary.in qpidd.conf qpidc.conf CMakeLists.txt \ + qpidd.in qpidd-primary.in qpidd.conf qpidc.conf CMakeLists.txt \ cluster.conf-example.xml.in confdir = $(sysconfdir)/qpid @@ -53,5 +53,3 @@ CLEANFILES = qpidd qpidd-primary cluster.conf-example.xml initddir = $(sysconfdir)/init.d nobase_initd_SCRIPTS = qpidd qpidd-primary -systemddir = /usr/lib/systemd/system -nobase_systemd_SCRIPTS = qpidd.service diff --git a/qpid/cpp/etc/qpidd.service b/qpid/cpp/etc/qpidd.service deleted file mode 100644 index a6549834f4..0000000000 --- a/qpid/cpp/etc/qpidd.service +++ /dev/null @@ -1,13 +0,0 @@ -[Unit] -Description=An AMQP message broker daemon. -Documentation=man:qpidd(1) http://qpid.apache.org/ -Requires=network.target - -[Service] -User=qpidd -Group=qpidd -Type=simple -ExecStart=/usr/sbin/qpidd --config /etc/qpidd.conf - -[Install] -WantedBy=multi-user.target diff --git a/qpid/cpp/examples/makedist.mk b/qpid/cpp/examples/makedist.mk index c494af5e8f..9a1568d427 100644 --- a/qpid/cpp/examples/makedist.mk +++ b/qpid/cpp/examples/makedist.mk @@ -20,6 +20,7 @@ AM_CXXFLAGS = $(WARNING_CFLAGS) INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include CLIENT_LIB=$(top_builddir)/src/libqpidclient.la +COMMON_LIB=$(top_builddir)/src/libqpidcommon.la CONSOLE_LIB=$(top_builddir)/src/libqmfconsole.la CLIENTFLAGS=-lqpidclient CONSOLEFLAGS=-lqmfconsole diff --git a/qpid/cpp/examples/messaging/Makefile.am b/qpid/cpp/examples/messaging/Makefile.am index d5303f4437..f11ca20c71 100644 --- a/qpid/cpp/examples/messaging/Makefile.am +++ b/qpid/cpp/examples/messaging/Makefile.am @@ -22,6 +22,7 @@ examplesdir=$(pkgdatadir)/examples/messaging AM_CXXFLAGS = $(WARNING_CFLAGS) INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include CLIENT_LIB=$(top_builddir)/src/libqpidmessaging.la +TYPES_LIB=$(top_builddir)/src/libqpidtypes.la CLIENTFLAGS=-lqpidmessaging noinst_PROGRAMS=drain spout client server map_sender map_receiver hello_world hello_xml @@ -33,10 +34,10 @@ hello_xml_SOURCES=hello_xml.cpp hello_xml_LDADD=$(CLIENT_LIB) drain_SOURCES=drain.cpp OptionParser.h OptionParser.cpp -drain_LDADD=$(CLIENT_LIB) -lqpidtypes +drain_LDADD=$(CLIENT_LIB) $(TYPES_LIB) spout_SOURCES=spout.cpp OptionParser.h OptionParser.cpp -spout_LDADD=$(CLIENT_LIB) -lqpidtypes +spout_LDADD=$(CLIENT_LIB) $(TYPES_LIB) client_SOURCES=client.cpp client_LDADD=$(CLIENT_LIB) @@ -45,10 +46,10 @@ server_SOURCES=server.cpp server_LDADD=$(CLIENT_LIB) map_sender_SOURCES=map_sender.cpp -map_sender_LDADD=$(CLIENT_LIB) -lqpidtypes +map_sender_LDADD=$(CLIENT_LIB) $(TYPES_LIB) map_receiver_SOURCES=map_receiver.cpp -map_receiver_LDADD=$(CLIENT_LIB) -lqpidtypes +map_receiver_LDADD=$(CLIENT_LIB) $(TYPES_LIB) examples_DATA= \ hello_world.cpp \ diff --git a/qpid/cpp/examples/old_api/direct/Makefile.am b/qpid/cpp/examples/old_api/direct/Makefile.am index 09709c2bf4..18957c84f4 100644 --- a/qpid/cpp/examples/old_api/direct/Makefile.am +++ b/qpid/cpp/examples/old_api/direct/Makefile.am @@ -23,13 +23,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=direct_producer listener declare_queues direct_producer_SOURCES=direct_producer.cpp -direct_producer_LDADD=$(CLIENT_LIB) -lqpidcommon +direct_producer_LDADD=$(CLIENT_LIB) $(COMMON_LIB) listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) -lqpidcommon +listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB) declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon +declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB) examples_DATA= \ direct_producer.cpp \ diff --git a/qpid/cpp/examples/old_api/failover/Makefile.am b/qpid/cpp/examples/old_api/failover/Makefile.am index 516c3625c1..60e99b9ed6 100644 --- a/qpid/cpp/examples/old_api/failover/Makefile.am +++ b/qpid/cpp/examples/old_api/failover/Makefile.am @@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon +declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB) resuming_receiver_SOURCES=resuming_receiver.cpp -resuming_receiver_LDADD=$(CLIENT_LIB) -lqpidcommon +resuming_receiver_LDADD=$(CLIENT_LIB) $(COMMON_LIB) replaying_sender_SOURCES=replaying_sender.cpp -replaying_sender_LDADD=$(CLIENT_LIB) -lqpidcommon +replaying_sender_LDADD=$(CLIENT_LIB) $(COMMON_LIB) examples_DATA= \ declare_queues.cpp \ diff --git a/qpid/cpp/examples/old_api/fanout/Makefile.am b/qpid/cpp/examples/old_api/fanout/Makefile.am index 797312a72d..06e84b47b6 100644 --- a/qpid/cpp/examples/old_api/fanout/Makefile.am +++ b/qpid/cpp/examples/old_api/fanout/Makefile.am @@ -26,7 +26,7 @@ fanout_producer_SOURCES=fanout_producer.cpp fanout_producer_LDADD=$(CLIENT_LIB) listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) -lqpidcommon +listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB) examples_DATA= \ fanout_producer.cpp \ diff --git a/qpid/cpp/examples/old_api/pub-sub/Makefile.am b/qpid/cpp/examples/old_api/pub-sub/Makefile.am index fc61236475..e8e19e4c32 100644 --- a/qpid/cpp/examples/old_api/pub-sub/Makefile.am +++ b/qpid/cpp/examples/old_api/pub-sub/Makefile.am @@ -24,10 +24,10 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=topic_listener topic_publisher topic_listener_SOURCES=topic_listener.cpp -topic_listener_LDADD=$(CLIENT_LIB) -lqpidcommon +topic_listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB) topic_publisher_SOURCES=topic_publisher.cpp -topic_publisher_LDADD=$(CLIENT_LIB) -lqpidcommon +topic_publisher_LDADD=$(CLIENT_LIB) $(COMMON_LIB) examples_DATA= \ topic_listener.cpp \ diff --git a/qpid/cpp/examples/old_api/request-response/Makefile.am b/qpid/cpp/examples/old_api/request-response/Makefile.am index 92f5bc6558..cf10ae81db 100644 --- a/qpid/cpp/examples/old_api/request-response/Makefile.am +++ b/qpid/cpp/examples/old_api/request-response/Makefile.am @@ -24,10 +24,10 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=client server client_SOURCES=client.cpp -client_LDADD=$(CLIENT_LIB) -lqpidcommon +client_LDADD=$(CLIENT_LIB) $(COMMON_LIB) server_SOURCES=server.cpp -server_LDADD=$(CLIENT_LIB) -lqpidcommon +server_LDADD=$(CLIENT_LIB) $(COMMON_LIB) examples_DATA= \ server.cpp \ diff --git a/qpid/cpp/examples/old_api/tradedemo/Makefile.am b/qpid/cpp/examples/old_api/tradedemo/Makefile.am index a05bbc3780..9932d87a6b 100644 --- a/qpid/cpp/examples/old_api/tradedemo/Makefile.am +++ b/qpid/cpp/examples/old_api/tradedemo/Makefile.am @@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=topic_listener topic_publisher declare_queues topic_listener_SOURCES=topic_listener.cpp -topic_listener_LDADD=$(CLIENT_LIB) -lqpidcommon +topic_listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB) topic_publisher_SOURCES=topic_publisher.cpp -topic_publisher_LDADD=$(CLIENT_LIB) -lqpidcommon +topic_publisher_LDADD=$(CLIENT_LIB) $(COMMON_LIB) declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon +declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB) examples_DATA= \ diff --git a/qpid/cpp/examples/old_api/xml-exchange/Makefile.am b/qpid/cpp/examples/old_api/xml-exchange/Makefile.am index 9391806849..d4bc6ba233 100644 --- a/qpid/cpp/examples/old_api/xml-exchange/Makefile.am +++ b/qpid/cpp/examples/old_api/xml-exchange/Makefile.am @@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk noinst_PROGRAMS=declare_queues xml_producer listener declare_queues_SOURCES=declare_queues.cpp -declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon +declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB) xml_producer_SOURCES=xml_producer.cpp -xml_producer_LDADD=$(CLIENT_LIB) -lqpidcommon +xml_producer_LDADD=$(CLIENT_LIB) $(COMMON_LIB) listener_SOURCES=listener.cpp -listener_LDADD=$(CLIENT_LIB) -lqpidcommon +listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB) EXTRA_DIST= \ README.txt \ diff --git a/qpid/cpp/include/qpid/management/Manageable.h b/qpid/cpp/include/qpid/management/Manageable.h index e72dc0b332..ede5c29e43 100644 --- a/qpid/cpp/include/qpid/management/Manageable.h +++ b/qpid/cpp/include/qpid/management/Manageable.h @@ -55,8 +55,11 @@ class QPID_COMMON_EXTERN Manageable // // This accessor function returns a pointer to the management object. // - virtual ManagementObject* GetManagementObject() const; - virtual ManagementObject::shared_ptr GetManagementObjectShared() const; +#ifdef _IN_QPID_BROKER + virtual ManagementObject::shared_ptr GetManagementObject() const = 0; +#else + virtual ManagementObject* GetManagementObject() const = 0; +#endif // Every "Manageable" object must implement ManagementMethod. This // function is called when a remote management client invokes a method diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h index 2aca6fb1c5..93fbec7bc7 100644 --- a/qpid/cpp/include/qpid/management/ManagementObject.h +++ b/qpid/cpp/include/qpid/management/ManagementObject.h @@ -25,10 +25,13 @@ #include "qpid/management/Mutex.h" #include "qpid/types/Variant.h" -#include <boost/shared_ptr.hpp> #include <map> #include <vector> +#ifdef _IN_QPID_BROKER +#include <boost/shared_ptr.hpp> +#endif + namespace qpid { namespace management { @@ -155,7 +158,9 @@ protected: QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const; public: +#ifdef _IN_QPID_BROKER typedef boost::shared_ptr<ManagementObject> shared_ptr; +#endif QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16; QPID_COMMON_EXTERN static int maxThreads; @@ -229,8 +234,10 @@ protected: //QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map); }; +#ifdef _IN_QPID_BROKER typedef std::map<ObjectId, ManagementObject::shared_ptr> ManagementObjectMap; typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector; +#endif }} diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py index cfbc88f7a9..7bf161dc2b 100755 --- a/qpid/cpp/managementgen/qmfgen/schema.py +++ b/qpid/cpp/managementgen/qmfgen/schema.py @@ -1523,12 +1523,8 @@ class SchemaClass: def genParentRefAssignment (self, stream, variables): for config in self.properties: if config.isParentRef == 1: - if variables['genForBroker']: - stream.write (config.getName () + \ - " = _parent->GetManagementObjectShared()->getObjectId ();") - else: - stream.write (config.getName () + \ - " = _parent->GetManagementObject()->getObjectId ();") + stream.write (config.getName () + \ + " = _parent->GetManagementObject()->getObjectId();") return def genSchemaMD5 (self, stream, variables): diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h index 362d268aba..cd43cef7f4 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Class.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h @@ -26,6 +26,7 @@ #include "qpid/management/ManagementObject.h" /*MGEN:IF(Root.InBroker)*/ #include "qmf/BrokerImportExport.h" +#include <boost/shared_ptr.hpp> /*MGEN:ENDIF*/ #include <limits> @@ -79,7 +80,9 @@ namespace qmf { void aggregatePerThreadStats(struct PerThreadStats*) const; /*MGEN:ENDIF*/ public: +/*MGEN:IF(Root.InBroker)*/ typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr; +/*MGEN:ENDIF*/ /*MGEN:Root.ExternMethod*/ static void writeSchema(std::string& schema); /*MGEN:Root.ExternMethod*/ void mapEncodeValues(::qpid::types::Variant::Map& map, diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 579e792b62..731451754f 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -277,7 +277,15 @@ if (CMAKE_COMPILER_IS_GNUCXX) set (CATCH_UNDEFINED "") endif (CMAKE_SYSTEM_NAME STREQUAL SunOS) set (COMPILER_FLAGS "-fvisibility-inlines-hidden") - set (HIDE_SYMBOL_FLAGS "-fvisibility=hidden") + # gcc 4.1.2 on RHEL 5 needs -Wno-attributes to avoid an error that's fixed + # in later gcc versions. + execute_process(COMMAND ${CMAKE_CXX_COMPILER} -dumpversion + OUTPUT_VARIABLE GCC_VERSION) + if (GCC_VERSION VERSION_EQUAL 4.1.2) + set (HIDE_SYMBOL_FLAGS "-fvisibility=hidden -Wno-attributes") + else (GCC_VERSION VERSION_EQUAL 4.1.2) + set (HIDE_SYMBOL_FLAGS "-fvisibility=hidden") + endif (GCC_VERSION VERSION_EQUAL 4.1.2) endif (CMAKE_COMPILER_IS_GNUCXX) if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro) @@ -578,6 +586,7 @@ if (BUILD_XML) target_link_libraries (xml xerces-c xqilla qpidbroker pthread) set_target_properties (xml PROPERTIES PREFIX "" + COMPILE_DEFINITIONS _IN_QPID_BROKER LINK_FLAGS "${CATCH_UNDEFINED}") install (TARGETS xml DESTINATION ${QPIDD_MODULE_DIR} @@ -614,6 +623,7 @@ if (BUILD_ACL) target_link_libraries (acl qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY}) set_target_properties (acl PROPERTIES PREFIX "" + COMPILE_DEFINITIONS _IN_QPID_BROKER LINK_FLAGS "${CATCH_UNDEFINED}") install (TARGETS acl DESTINATION ${QPIDD_MODULE_DIR} @@ -661,7 +671,7 @@ if (BUILD_HA) ) add_library (ha MODULE ${ha_SOURCES}) - set_target_properties (ha PROPERTIES PREFIX "") + set_target_properties (ha PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER) target_link_libraries (ha qpidtypes qpidcommon qpidbroker qpidmessaging) if (CMAKE_COMPILER_IS_GNUCXX) set_target_properties (ha PROPERTIES @@ -1225,7 +1235,7 @@ set (qpidbroker_SOURCES add_msvc_version (qpidbroker library dll) add_library (qpidbroker SHARED ${qpidbroker_SOURCES}) target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS}) -set_target_properties (qpidbroker PROPERTIES VERSION ${qpidbroker_version}) +set_target_properties (qpidbroker PROPERTIES VERSION ${qpidbroker_version} COMPILE_DEFINITIONS _IN_QPID_BROKER) if (MSVC) set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290) endif (MSVC) @@ -1244,6 +1254,7 @@ add_msvc_version (qpidd application exe) add_executable (qpidd ${qpidd_SOURCES}) target_link_libraries (qpidd qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_FILESYSTEM_LIBRARY}) +set_target_properties (qpidd PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) install (TARGETS qpidd RUNTIME DESTINATION ${QPID_INSTALL_SBINDIR} COMPONENT ${QPID_COMPONENT_BROKER}) diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 91ff0621c0..cdddd22c41 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -139,9 +139,10 @@ tmoduleexecdir = $(libdir)/qpid/tests tmoduleexec_LTLIBRARIES= AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2 +BROKER_CXXFLAGS = -D_IN_QPID_BROKER ## Automake macros to build libraries and executables. -qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduleexecdir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\" +qpidd_CXXFLAGS = $(AM_CXXFLAGS) $(BROKER_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduleexecdir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\" libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduleexecdir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\" qpidd_LDADD = \ @@ -259,7 +260,7 @@ rdma_la_LIBADD = \ -libverbs rdma_la_LDFLAGS = $(PLUGINLDFLAGS) rdma_la_CXXFLAGS = \ - $(AM_CXXFLAGS) -Wno-missing-field-initializers + $(AM_CXXFLAGS) -Wno-missing-field-initializers -D_IN_QPID_BROKER dmoduleexec_LTLIBRARIES += \ rdma.la @@ -759,6 +760,7 @@ libqpidbroker_la_SOURCES = \ QPIDBROKER_VERSION_INFO = 2:0:0 libqpidbroker_la_LDFLAGS = -version-info $(QPIDBROKER_VERSION_INFO) +libqpidbroker_la_CXXFLAGS=$(AM_CXXFLAGS) $(BROKER_CXXFLAGS) if HAVE_PROTON @@ -793,7 +795,7 @@ amqp_la_SOURCES = \ qpid/broker/amqp/Translation.h \ qpid/broker/amqp/Translation.cpp -amqp_la_CXXFLAGS=$(AM_CXXFLAGS) $(PROTON_CFLAGS) +amqp_la_CXXFLAGS=$(AM_CXXFLAGS) $(BROKER_CXXFLAGS) $(PROTON_CFLAGS) amqp_la_LDFLAGS = $(PLUGINLDFLAGS) $(PROTON_LIBS) cmoduleexec_LTLIBRARIES += amqpc.la @@ -950,6 +952,7 @@ libqpidmessaging_la_LDFLAGS = -version-info $(QPIDMESSAGING_VERSION_INFO) # NOTE: only public header files (which should be in ../include) # should go in this list. Private headers should go in the SOURCES # list for one of the libraries or executables that includes it. +# Also included are the swig descriptor files. nobase_include_HEADERS += \ ../include/qpid/Address.h \ @@ -1036,7 +1039,10 @@ nobase_include_HEADERS += \ ../include/qpid/types/Exception.h \ ../include/qpid/types/Uuid.h \ ../include/qpid/types/Variant.h \ - ../include/qpid/types/ImportExport.h + ../include/qpid/types/ImportExport.h \ + ../include/qpid/qpid.i \ + ../include/qmf/qmfengine.i \ + ../include/qmf/qmf2.i # Create the default data directory install-data-local: diff --git a/qpid/cpp/src/acl.mk b/qpid/cpp/src/acl.mk index dfbcd06f4c..87821a3741 100644 --- a/qpid/cpp/src/acl.mk +++ b/qpid/cpp/src/acl.mk @@ -43,4 +43,5 @@ if SUNOS endif acl_la_LDFLAGS = $(PLUGINLDFLAGS) +acl_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 355e591cf6..718e6fe342 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -82,6 +82,7 @@ if (BUILD_AMQP) PREFIX "" COMPILE_FLAGS "${PROTON_COMPILE_FLAGS}" LINK_FLAGS "${PROTON_LINK_FLAGS}") + set_target_properties (amqp PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) install (TARGETS amqp DESTINATION ${QPIDD_MODULE_DIR} COMPONENT ${QPID_COMPONENT_BROKER}) diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 0cc0760d94..31b3bc243d 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -59,3 +59,4 @@ ha_la_SOURCES = \ ha_la_LIBADD = libqpidbroker.la libqpidmessaging.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) +ha_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index 40857f411f..a681a6d18d 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon { uint16_t port=brokerPtr->getPort(options->daemon.transport); ready(port); // Notify parent. if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) { - boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); } brokerPtr->run(); } @@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *options) { uint16_t port = brokerPtr->getPort(myOptions->daemon.transport); cout << port << endl; if (options->broker.enableMgmt) { - boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); } } brokerPtr->run(); diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 3634c0cdc1..61e0b56104 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -317,7 +317,7 @@ Acl::~Acl(){ broker->getConnectionObservers().remove(connectionCounter); } -ManagementObject::shared_ptr Acl::GetManagementObjectShared(void) const +ManagementObject::shared_ptr Acl::GetManagementObject(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index 8c1a925713..ea3c6586a3 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -117,7 +117,7 @@ private: bool readAclFile(std::string& aclFile, std::string& errorText); Manageable::status_t lookup (management::Args& args, std::string& text); Manageable::status_t lookupPublish(management::Args& args, std::string& text); - virtual qpid::management::ManagementObject::shared_ptr GetManagementObjectShared(void) const; + virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 09b7fa58e9..a48789973a 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -654,7 +654,10 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk) { - moveNewObjectsLH(); + { + sys::Mutex::ScopedLock lock(agentLock); + moveNewObjectsLH(lock); + } Variant::Map inMap; Variant::Map::const_iterator i; @@ -985,14 +988,37 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage( return result.first; } -void ManagementAgentImpl::moveNewObjectsLH() +// note well: caller must hold agentLock when calling this! +void ManagementAgentImpl::moveNewObjectsLH(const sys::Mutex::ScopedLock& /*agentLock*/) { sys::Mutex::ScopedLock lock(addLock); - for (ObjectMap::iterator iter = newManagementObjects.begin(); - iter != newManagementObjects.end(); - iter++) - managementObjects[iter->first] = iter->second; - newManagementObjects.clear(); + ObjectMap::iterator newObj = newManagementObjects.begin(); + while (newObj != newManagementObjects.end()) { + // before adding a new mgmt object, check for duplicates: + ObjectMap::iterator oldObj = managementObjects.find(newObj->first); + if (oldObj == managementObjects.end()) { + managementObjects[newObj->first] = newObj->second; + newManagementObjects.erase(newObj++); // post inc iterator safe! + } else { + // object exists with same object id. This may be legit, for example, when a + // recently deleted object is re-added before the mgmt poll runs. + if (newObj->second->isDeleted()) { + // @TODO fixme: we missed an add-delete for the new object + QPID_LOG(warning, "Mgmt Object deleted before update sent, oid=" << newObj->first); + newManagementObjects.erase(newObj++); // post inc iterator safe! + } else if (oldObj->second->isDeleted()) { + // skip adding newObj, try again later once oldObj has been cleaned up by poll + ++newObj; + } else { + // real bad - two objects exist with same OID. This is a bug in the application + QPID_LOG(error, "Detected two Mgmt Objects using the same object id! oid=" << newObj->first + << ", this is bad!"); + // what to do here? Can't erase an active obj - owner has a pointer to it. + // for now I punt. Maybe the flood of log messages will get someone's attention :P + ++newObj; + } + } + } } void ManagementAgentImpl::addClassLocal(uint8_t classKind, @@ -1060,7 +1086,7 @@ void ManagementAgentImpl::periodicProcessing() if (!connected) return; - moveNewObjectsLH(); + moveNewObjectsLH(lock); // // Clear the been-here flag on all objects in the map. diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 53f3c13a91..d801989f64 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -261,7 +261,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void storeData(bool requested=false); void retrieveData(std::string& vendor, std::string& product, std::string& inst); PackageMap::iterator findOrAddPackage(const std::string& name); - void moveNewObjectsLH(); + void moveNewObjectsLH(const sys::Mutex::ScopedLock& agentLock); void addClassLocal (uint8_t classKind, PackageMap::iterator pIter, const std::string& className, diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 90cb1a79ed..d7844b50ce 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -49,6 +49,11 @@ using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; +namespace { +const std::string QPID_REPLICATE("qpid.replicate"); +const std::string NONE("none"); +} + namespace qpid { namespace broker { @@ -298,7 +303,7 @@ uint32_t Bridge::encodedSize() const + 2; // sync } -management::ManagementObject::shared_ptr Bridge::GetManagementObjectShared (void) const +management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const { return mgmtObject; } @@ -333,6 +338,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList, } string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag); + bindArgs.setString(QPID_REPLICATE, NONE); bindArgs.setString(qpidFedOp, op); bindArgs.setString(qpidFedTags, newTagList); if (origin.empty()) diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 9f99c9ce01..da397b8f77 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -73,7 +73,7 @@ class Bridge : public PersistableConfig, bool isDetached() const { return detached; } - management::ManagementObject::shared_ptr GetManagementObjectShared() const; + management::ManagementObject::shared_ptr GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 292820abe4..094dd63527 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) : systemObject = System::shared_ptr(system); mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker")); - mgmtObject->set_systemRef(system->GetManagementObjectShared()->getObjectId()); + mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); mgmtObject->set_connBacklog(conf.connectionBacklog); @@ -454,7 +454,7 @@ Broker::~Broker() { QPID_LOG(notice, "Shut down"); } -ManagementObject::shared_ptr Broker::GetManagementObjectShared(void) const +ManagementObject::shared_ptr Broker::GetManagementObject(void) const { return mgmtObject; } @@ -1250,6 +1250,7 @@ void Broker::bind(const std::string& queueName, QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName << " queue:" << queueName << " key:" << key + << " arguments:" << arguments << " user:" << userId << " rhost:" << connectionId); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index eecfd3925c..0a8f406dbf 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -235,7 +235,7 @@ class Broker : public sys::Runnable, public Plugin::Target, SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared() const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const; QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 238bb71fb5..3cb30a82e3 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -402,7 +402,7 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject::shared_ptr Connection::GetManagementObjectShared(void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 91470dc3df..2f25b0e3f9 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObjectShared (void) const; + management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); diff --git a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h +++ /dev/null diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 2fa7ce0fc5..773a99d2c9 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { Mutex::ScopedLock l(lock); - Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin)); BoundKey& bk = bindings[routingKey]; if (exclusiveBinding) bk.queues.clear(); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 20bd76f645..9098c75f0b 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -177,7 +177,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : mgmtExchange->set_autoDelete(false); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared()); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); } } } @@ -198,7 +198,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared()); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); } } @@ -227,7 +227,7 @@ void Exchange::setAlternate(Exchange::shared_ptr _alternate) alternate = _alternate; if (mgmtExchange != 0) { if (alternate.get() != 0) - mgmtExchange->set_altExchange(alternate->GetManagementObjectShared()->getObjectId()); + mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId()); else mgmtExchange->clr_altExchange(); } @@ -294,7 +294,7 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges) } } -ManagementObject::shared_ptr Exchange::GetManagementObjectShared (void) const +ManagementObject::shared_ptr Exchange::GetManagementObject (void) const { return mgmtExchange; } @@ -352,7 +352,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang Exchange::Binding::~Binding () { if (mgmtBinding != 0) { - _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared()); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); if (mo != 0) mo->dec_bindingCount(); mgmtBinding->resourceDestroy (); @@ -367,7 +367,7 @@ void Exchange::Binding::startManagement() if (broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared()); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); if (mo != 0) { management::ObjectId queueId = mo->getObjectId(); @@ -383,7 +383,7 @@ void Exchange::Binding::startManagement() } } -ManagementObject::shared_ptr Exchange::Binding::GetManagementObjectShared () const +ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const { return mgmtBinding; } diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index ec9a0bea2f..70ed393f64 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -58,7 +58,7 @@ public: framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); void startManagement(); - management::ManagementObject::shared_ptr GetManagementObjectShared() const; + management::ManagementObject::shared_ptr GetManagementObject() const; }; private: @@ -210,7 +210,7 @@ public: static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared(void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; // Federation hooks class DynamicBridge { diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 56c894c129..43c67af810 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const bool propagate = false; if (args == 0 || fedOp.empty() || fedOp == fedOpBind) { - Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin)); if (bindings.add_unless(binding, MatchQueue(queue))) { binding->startManagement(); propagate = fedBinding.addOrigin(queue->getName(), fedOrigin); diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 02c05852ff..ea7fce4ff6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -48,6 +48,7 @@ namespace { const std::string empty; // federation related args and values + const std::string QPID_RESERVED("qpid."); const std::string qpidFedOp("qpid.fed.op"); const std::string qpidFedTags("qpid.fed.tags"); const std::string qpidFedOrigin("qpid.fed.origin"); @@ -200,8 +201,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co //matching (they are internally added properties //controlling binding propagation but not relevant to //actual routing) - Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args)); - BoundKey bk(binding); + Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable())); + BoundKey bk(binding, extra_args); if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) { binding->startManagement(); propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin); @@ -282,7 +283,7 @@ void HeadersExchange::route(Deliverable& msg) Bindings::ConstPtr p = bindings.snapshot(); if (p.get()) { for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - Matcher matcher(i->binding->args); + Matcher matcher(i->args); msg.getMessage().processProperties(matcher); if (matcher.matches()) { b->push_back(i->binding); @@ -298,7 +299,7 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons Bindings::ConstPtr p = bindings.snapshot(); if (p.get()){ for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) { - if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) { + if ( (!args || equal((*i).args, *args)) && (!queue || (*i).binding->queue == queue)) { return true; } } @@ -315,10 +316,7 @@ void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedAr for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i) { - const string & name(i->first); - if (name == qpidFedOp || - name == qpidFedTags || - name == qpidFedOrigin) + if (i->first.find(QPID_RESERVED) == 0) { continue; } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 2e4669a018..67ba793ba8 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -38,8 +38,9 @@ class HeadersExchange : public virtual Exchange { struct BoundKey { Binding::shared_ptr binding; + qpid::framing::FieldTable args; FedBinding fedBinding; - BoundKey(Binding::shared_ptr binding_) : binding(binding_) {} + BoundKey(Binding::shared_ptr binding_, const qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {} }; struct MatchArgs diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ /dev/null diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h +++ /dev/null diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index db789d79cf..0c18e08cd1 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -292,8 +292,8 @@ void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; - if (!hideManagement() && connection->GetManagementObjectShared()) { - mgmtObject->set_connectionRef(connection->GetManagementObjectShared()->getObjectId()); + if (!hideManagement() && connection->GetManagementObject()) { + mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); } // Get default URL from known-hosts if not already set @@ -669,7 +669,7 @@ uint32_t Link::encodedSize() const + password.size() + 1; } -ManagementObject::shared_ptr Link::GetManagementObjectShared (void) const +ManagementObject::shared_ptr Link::GetManagementObject(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 2087b5259c..97511de08f 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -183,7 +183,7 @@ class Link : public PersistableConfig, public management::Manageable { static bool isEncodedLink(const std::string& key); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObjectShared(void) const; + management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); // manage the exchange owned by this link diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 271e8476f9..8af61bb49a 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -198,7 +198,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete)); mgmtObject->set_arguments(settings.asMap()); agent->addObject(mgmtObject, 0, store != 0); - brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared()); + brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); } @@ -1108,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) { - ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObjectShared(); + ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1154,7 +1154,7 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) alternateExchange = exchange; if (mgmtObject) { if (exchange.get() != 0) - mgmtObject->set_altExchange(exchange->GetManagementObjectShared()->getObjectId()); + mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId()); else mgmtObject->clr_altExchange(); } @@ -1258,7 +1258,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { externalQueueStore = inst; if (inst) { - ManagementObject::shared_ptr childObj = inst->GetManagementObjectShared(); + ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1306,7 +1306,7 @@ void Queue::countLoadedFromDisk(uint64_t size) const } -ManagementObject::shared_ptr Queue::GetManagementObjectShared (void) const +ManagementObject::shared_ptr Queue::GetManagementObject(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 25cefd144d..bf1103902e 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -340,7 +340,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared (void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 9d6053669b..944cc7e838 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); broker = queue->getBroker(); - queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObjectShared()); + queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); if (queueMgmtObj) { queueMgmtObj->set_flowStopped(isFlowControlActive()); } diff --git a/qpid/cpp/src/qpid/broker/RecoverableMessage.h b/qpid/cpp/src/qpid/broker/RecoverableMessage.h index c98857ceb0..aafcd756d5 100644 --- a/qpid/cpp/src/qpid/broker/RecoverableMessage.h +++ b/qpid/cpp/src/qpid/broker/RecoverableMessage.h @@ -22,12 +22,14 @@ * */ +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> #include "qpid/framing/amqp_types.h" #include "qpid/framing/Buffer.h" namespace qpid { namespace broker { +class ExpiryPolicy; /** * The interface through which messages are reloaded on recovery. @@ -38,6 +40,7 @@ public: typedef boost::shared_ptr<RecoverableMessage> shared_ptr; virtual void setPersistenceId(uint64_t id) = 0; virtual void setRedelivered() = 0; + virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0; /** * Used by store to determine whether to load content on recovery * or let message load its own content as and when it requires it. diff --git a/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h index f3ead261c1..a46f5a3676 100644 --- a/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h +++ b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h @@ -37,6 +37,7 @@ public: ~RecoverableMessageImpl() {}; void setPersistenceId(uint64_t id); void setRedelivered(); + void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep); bool loadContent(uint64_t available); void decodeContent(framing::Buffer& buffer); void recover(boost::shared_ptr<Queue> queue); diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 6d831563e2..ab89a46a46 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -186,6 +186,11 @@ void RecoverableMessageImpl::setRedelivered() msg.deliver();//increment delivery count (but at present that isn't recorded durably) } +void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep) +{ + msg.computeExpiration(ep); +} + void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) { dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 0dc8d6cdfe..0965381fcd 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -37,9 +37,11 @@ #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/FedOps.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/tuple/tuple_comparison.hpp> #include <iostream> #include <sstream> @@ -48,6 +50,11 @@ #include <assert.h> +namespace { +const std::string X_SCOPE("x-scope"); +const std::string SESSION("session"); +} + namespace qpid { namespace broker { @@ -87,6 +94,7 @@ void SemanticState::closed() { if (dtxBuffer.get()) { dtxBuffer->fail(); } + unbindSessionBindings(); requeue(); //now unsubscribe, which may trigger queue deletion and thus @@ -303,14 +311,14 @@ Consumer(_name, type), deliveryCount(0), protocols(parent->getSession().getBroker().getProtocolRegistry()) { - if (parent != 0 && queue.get() != 0 && queue->GetManagementObjectShared() !=0) + if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) { ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); if (agent != 0) { - mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObjectShared()->getObjectId(), getTag(), + mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments))); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -318,7 +326,7 @@ Consumer(_name, type), } } -ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObjectShared (void) const +ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const { return mgmtObject; } @@ -803,4 +811,63 @@ void SemanticState::detached() } } +void SemanticState::addBinding(const string& queueName, const string& exchangeName, + const string& routingKey, const framing::FieldTable& arguments) +{ + QPID_LOG (debug, "SemanticState::addBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey << ", " + << "args=" << arguments << "]"); + std::string fedOp = arguments.getAsString(qpidFedOp); + if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) { + fedOp = fedOpBind; + } + std::string fedOrigin = arguments.getAsString(qpidFedOrigin); + if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) { + bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } + else if (fedOp == fedOpUnbind) { + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } +} + +void SemanticState::removeBinding(const string& queueName, const string& exchangeName, + const string& routingKey) +{ + QPID_LOG (debug, "SemanticState::removeBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey) + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); +} + +void SemanticState::unbindSessionBindings() +{ + //unbind session-scoped bindings + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + QPID_LOG (debug, "SemanticState::unbindSessionBindings [" + << "queue=" << i->get<0>() << ", " + << "exchange=" << i->get<1>()<< ", " + << "key=" << i->get<2>() << ", " + << "fedOrigin=" << i->get<3>() << "]"); + try { + std::string fedOrigin = i->get<3>(); + if (!fedOrigin.empty()) { + framing::FieldTable fedArguments; + fedArguments.setString(qpidFedOp, fedOpUnbind); + fedArguments.setString(qpidFedOrigin, fedOrigin); + session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments, + userID, connectionId); + } else { + session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(), + userID, connectionId); + } + } + catch (...) { + } + } + bindings.clear(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index afb527b0f5..f873c5c656 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -46,10 +46,12 @@ #include <list> #include <map> +#include <set> #include <vector> #include <boost/enable_shared_from_this.hpp> #include <boost/cast.hpp> +#include <boost/tuple/tuple.hpp> namespace qpid { namespace broker { @@ -163,7 +165,7 @@ class SemanticState : private boost::noncopyable { // manageable entry points QPID_BROKER_EXTERN management::ManagementObject::shared_ptr - GetManagementObjectShared(void) const; + GetManagementObject(void) const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); @@ -173,6 +175,8 @@ class SemanticState : private boost::noncopyable { private: typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; + typedef boost::tuple<std::string, std::string, std::string, std::string> Binding; + typedef std::set<Binding> Bindings; SessionState& session; ConsumerImplMap consumers; @@ -190,6 +194,8 @@ class SemanticState : private boost::noncopyable { //needed for queue delete events in auto-delete: const std::string connectionId; + Bindings bindings; + void checkDtxTimeout(); bool complete(DeliveryRecord&); @@ -197,6 +203,7 @@ class SemanticState : private boost::noncopyable { void requestDispatch(); void cancel(ConsumerImpl::shared_ptr); void disable(ConsumerImpl::shared_ptr); + void unbindSessionBindings(); public: @@ -271,6 +278,11 @@ class SemanticState : private boost::noncopyable { void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); DtxBufferMap& getSuspendedXids() { return suspendedXids; } + + void addBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey, const framing::FieldTable& arguments); + void removeBinding(const std::string& queueName, const std::string& exchangeName, + const std::string& routingKey); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 0263ff2a58..b679aebbfa 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, { getBroker().bind(queueName, exchangeName, routingKey, arguments, getConnection().getUserId(), getConnection().getUrl()); + state.addBinding(queueName, exchangeName, routingKey, arguments); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { + state.removeBinding(queueName, exchangeName, routingKey); getBroker().unbind(queueName, exchangeName, routingKey, getConnection().getUserId(), getConnection().getUrl()); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index f48bf653fb..a6494bc362 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -65,7 +65,7 @@ SessionState::SessionState( } void SessionState::addManagementObject() { - if (GetManagementObjectShared()) return; // Already added. + if (GetManagementObject()) return; // Already added. Manageable* parent = broker.GetVhostObject (); if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); @@ -127,7 +127,7 @@ void SessionState::attach(SessionHandler& h) { if (mgmtObject != 0) { mgmtObject->set_attached (1); - mgmtObject->set_connectionRef (h.getConnection().GetManagementObjectShared()->getObjectId()); + mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId()); mgmtObject->set_channelId (h.getChannel()); } asyncCommandCompleter->attached(); @@ -148,7 +148,7 @@ void SessionState::giveReadCredit(int32_t credit) { getConnection().outputTasks.giveReadCredit(credit); } -ManagementObject::shared_ptr SessionState::GetManagementObjectShared (void) const +ManagementObject::shared_ptr SessionState::GetManagementObject(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 06643fdbef..ae28df8026 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -110,7 +110,7 @@ class SessionState : public qpid::SessionState, const qpid::types::Variant::Map& annotations, bool sync); // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObjectShared (void) const; + management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 179a3275a7..52643fb2d5 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -45,7 +45,7 @@ class System : public management::Manageable System (std::string _dataDir, Broker* broker = 0); - management::ManagementObject::shared_ptr GetManagementObjectShared (void) const + management::ManagementObject::shared_ptr GetManagementObject(void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index c11389bb17..d49464b4e1 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons } } - Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin)); + Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin)); binding->startManagement(); bk->bindingVector.push_back(binding); nBindings++; diff --git a/qpid/cpp/src/qpid/broker/TxOpVisitor.h b/qpid/cpp/src/qpid/broker/TxOpVisitor.h deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/broker/TxOpVisitor.h +++ /dev/null diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/broker/TxPublish.cpp +++ /dev/null diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h deleted file mode 100644 index e69de29bb2..0000000000 --- a/qpid/cpp/src/qpid/broker/TxPublish.h +++ /dev/null diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h index c4b1c280e1..599b821870 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.h +++ b/qpid/cpp/src/qpid/broker/Vhost.h @@ -40,7 +40,7 @@ class Vhost : public management::Manageable Vhost (management::Manageable* parentBroker, Broker* broker = 0); - management::ManagementObject::shared_ptr GetManagementObjectShared (void) const + management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } void setFederationTag(const std::string& tag); }; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index 8daf860f8e..0253ba5552 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -73,7 +73,7 @@ void ManagedConnection::setSaslSsf(int ssf) connection->set_saslSsf(ssf); } -qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObjectShared() const +qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const { return connection; } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index f1514d11c5..e2d0376918 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -44,7 +44,7 @@ class ManagedConnection : public qpid::management::Manageable, public Connection std::string getUserid() const; void setSaslMechanism(const std::string&); void setSaslSsf(int); - qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const; + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; bool isLocal(const ConnectionToken* t) const; void incomingMessageReceived(); void outgoingMessageSent(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp index 0fe20f68ab..f36a1e8da4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -37,7 +37,7 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSessio { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObjectShared()->getObjectId(), id, + subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id, false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map())); agent->addObject(subscription); subscription->set_creditMode("n/a"); @@ -48,7 +48,7 @@ ManagedOutgoingLink::~ManagedOutgoingLink() if (subscription != 0) subscription->resourceDestroy(); } -qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObjectShared() const +qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const { return subscription; } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h index 19667da698..20a1095db2 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h @@ -39,7 +39,7 @@ class ManagedOutgoingLink : public qpid::management::Manageable public: ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic); virtual ~ManagedOutgoingLink(); - qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const; + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; void outgoingMessageSent(); void outgoingMessageAccepted(); void outgoingMessageRejected(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp index f1c4940118..9bef0e842b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp @@ -38,7 +38,7 @@ ManagedSession::ManagedSession(Broker& broker, ManagedConnection& p, const std:: session->set_attached(true); session->set_detachedLifespan(0); session->clr_expireTime(); - session->set_connectionRef(parent.GetManagementObjectShared()->getObjectId()); + session->set_connectionRef(parent.GetManagementObject()->getObjectId()); agent->addObject(session); } } @@ -48,7 +48,7 @@ ManagedSession::~ManagedSession() if (session) session->resourceDestroy(); } -qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObjectShared() const +qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const { return session; } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h index 2f62c8705a..1f56964bb6 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h @@ -40,7 +40,7 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke public: ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id); virtual ~ManagedSession(); - qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const; + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; bool isLocal(const ConnectionToken* t) const; void incomingMessageReceived(); void incomingMessageAccepted(); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 6b88111732..8f3eb3bf90 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = queues.find(values[QNAME].asString()); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && + replicationTest.replicateLevel(args)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() - << " key=" << key); + << " key=" << key + << " args=" << args); queue->bind(exchange, key, args); } } @@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); - exchange->unbind(queue, key, &args); + exchange->unbind(queue, key, 0); } } @@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = exchanges.find(exName); boost::shared_ptr<Queue> queue = queues.find(qName); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + // Automatically replicate binding if queue and exchange exist and are replicated if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) && + replicationTest.replicateLevel(args)) { string key = values[BINDING_KEY].asString(); QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName << " queue:" << qName - << " key:" << key); - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + << " key:" << key + << " args:" << args); queue->bind(exchange, key, args); } } @@ -837,6 +840,13 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { } } +// Callback function for accumulating exchange candidates +namespace { + void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) { + c.push_back(i); + } +} + void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected"); connection = 0; @@ -844,7 +854,7 @@ void BrokerReplicator::disconnected() { vector<boost::shared_ptr<Exchange> > collect; // Make a copy so we can work outside the ExchangeRegistry lock exchanges.eachExchange( - boost::bind(&vector<boost::shared_ptr<Exchange> >::push_back, ref(collect), _1)); + boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1)); for_each(collect.begin(), collect.end(), boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 1358baf0e1..8c16a5ea38 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -79,6 +79,11 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) } } +namespace { +const std::string NONE("none"); +bool isNone(const std::string& x) { return x.empty() || x == NONE; } +} + // Called in Plugin::initialize void HaBroker::initialize() { @@ -110,11 +115,10 @@ void HaBroker::initialize() { backup.reset(new Backup(*this, settings)); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo)); + if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl)); + if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl)); } - if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl)); - if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl)); - // NOTE: lock is not needed in a constructor, but create one // to pass to functions that have a ScopedLock parameter. @@ -182,7 +186,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, break; } case _qmf::HaBroker::METHOD_SETPUBLICURL: { - setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url)); + setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url)); break; } case _qmf::HaBroker::METHOD_REPLICATE: { @@ -217,19 +221,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } -void HaBroker::setClientUrl(const Url& url) { +void HaBroker::setPublicUrl(const Url& url) { Mutex::ScopedLock l(lock); - if (url.empty()) throw Exception("Invalid empty URL for HA client failover"); - clientUrl = url; - updateClientUrl(l); -} - -void HaBroker::updateClientUrl(Mutex::ScopedLock&) { - Url url = clientUrl.empty() ? brokerUrl : clientUrl; + publicUrl = url; mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, logPrefix << "Setting client URL to: " << url); + QPID_LOG(debug, logPrefix << "Setting public URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { @@ -238,10 +236,8 @@ void HaBroker::setBrokerUrl(const Url& url) { Mutex::ScopedLock l(lock); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); - QPID_LOG(info, logPrefix << "Broker URL set to: " << url); + QPID_LOG(info, logPrefix << "Brokers URL set to: " << url); if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url); - // Updating broker URL also updates defaulted client URL: - if (clientUrl.empty()) updateClientUrl(l); b = backup; } if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 4b3f1d49c1..76dbf57a0c 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -71,7 +71,7 @@ class HaBroker : public management::Manageable void initialize(); // Implement Manageable. - qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObject; } + qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; } management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); @@ -100,7 +100,7 @@ class HaBroker : public management::Manageable types::Uuid getSystemId() const { return systemId; } private: - void setClientUrl(const Url&); + void setPublicUrl(const Url&); void setBrokerUrl(const Url&); void updateClientUrl(sys::Mutex::ScopedLock&); @@ -125,7 +125,7 @@ class HaBroker : public management::Manageable boost::shared_ptr<Backup> backup; boost::shared_ptr<Primary> primary; qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; - Url clientUrl, brokerUrl; + Url publicUrl, brokerUrl; std::vector<Url> knownBrokers; BrokerStatus status; BrokerInfo brokerInfo; diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 3d77a4cbd1..5edb98c135 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -33,9 +33,11 @@ struct Options : public qpid::Options { addOptions() ("ha-cluster", optValue(settings.cluster, "yes|no"), "Join a HA active/passive cluster.") + ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"), + "Enable replication of specific queues without joining a cluster") ("ha-brokers-url", optValue(settings.brokerUrl,"URL"), "URL with address of each broker in the cluster.") - ("ha-public-url", optValue(settings.clientUrl,"URL"), + ("ha-public-url", optValue(settings.publicUrl,"URL"), "URL advertized to clients to connect to the cluster.") ("ha-replicate", optValue(settings.replicateDefault, "LEVEL"), @@ -68,7 +70,7 @@ struct HaPlugin : public Plugin { void earlyInitialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker) { + if (broker && (settings.cluster || settings.queueReplication)) { if (!broker->getManagementAgent()) { QPID_LOG(info, "HA plugin disabled because management is disabled"); if (settings.cluster) diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 1be068063a..53b61415cf 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -35,12 +35,14 @@ namespace ha { class Settings { public: - Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5), + Settings() : cluster(false), queueReplication(false), + replicateDefault(NONE), backupTimeout(5), flowMessages(100), flowBytes(0) {} bool cluster; // True if we are a cluster member. - std::string clientUrl; + bool queueReplication; // True if enabled. + std::string publicUrl; std::string brokerUrl; Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp index 322ec16656..651215ffb5 100644 --- a/qpid/cpp/src/qpid/management/Manageable.cpp +++ b/qpid/cpp/src/qpid/management/Manageable.cpp @@ -41,16 +41,6 @@ string Manageable::StatusText (status_t status, string text) return "??"; } -ManagementObject* Manageable::GetManagementObject(void) const -{ - return 0; -} - -ManagementObject::shared_ptr Manageable::GetManagementObjectShared() const -{ - return ManagementObject::shared_ptr(); -} - Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&) { return STATUS_UNKNOWN_METHOD; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 3f647ba052..7b8808c0a0 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -698,7 +698,7 @@ void ManagementAgent::periodicProcessing (void) // if (publish) { uint64_t uptime = sys::Duration(startTime, sys::now()); - boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); qpid::sys::MemStat::loadMemInfo(memstat.get()); } @@ -1722,7 +1722,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply string label; uint32_t requestedBrokerBank, requestedAgentBank; uint32_t assignedBank; - ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObjectShared()->getObjectId(); + ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; moveNewObjects(); @@ -1754,7 +1754,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); - agent->mgmtObject->set_registeredTo (broker->GetManagementObjectShared()->getObjectId()); + agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); agent->mgmtObject->set_brokerBank (brokerBank); agent->mgmtObject->set_agentBank (assignedBank); @@ -1831,7 +1831,7 @@ void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } @@ -1945,7 +1945,7 @@ void ManagementAgent::handleGetQuery(const string& body, const string& rte, cons if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } /* diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 9df5825e32..7f1a2e3e66 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -211,7 +211,7 @@ private: ObjectId connectionRef; qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} - ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; } + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); void mapEncode(qpid::types::Variant::Map& _map) const; diff --git a/qpid/cpp/src/qpid/store/CMakeLists.txt b/qpid/cpp/src/qpid/store/CMakeLists.txt index 9abdf0ae3d..31623f8e84 100644 --- a/qpid/cpp/src/qpid/store/CMakeLists.txt +++ b/qpid/cpp/src/qpid/store/CMakeLists.txt @@ -42,6 +42,7 @@ if (CMAKE_COMPILER_IS_GNUCXX) set_target_properties (store PROPERTIES PREFIX "" + COMPILE_DEFINITIONS _IN_QPID_BROKER LINK_FLAGS "${GCC_CATCH_UNDEFINED}") endif (CMAKE_COMPILER_IS_GNUCXX) @@ -54,7 +55,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) endif (MSVC) endif (CMAKE_SYSTEM_NAME STREQUAL Windows) -set_target_properties (store PROPERTIES VERSION ${qpidc_version}) +set_target_properties (store PROPERTIES + COMPILE_DEFINITIONS _IN_QPID_BROKER + VERSION ${qpidc_version}) install (TARGETS store # RUNTIME DESTINATION ${QPIDD_MODULE_DIR} COMPONENT ${QPID_COMPONENT_BROKER}) @@ -81,6 +84,7 @@ if (BUILD_MSSQL) ms-sql/State.cpp ms-sql/TplRecordset.cpp ms-sql/VariantHelper.cpp) + set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) install (TARGETS mssql_store # RUNTIME DESTINATION ${QPIDD_MODULE_DIR} @@ -110,6 +114,7 @@ if (BUILD_MSCLFS) ms-sql/State.cpp ms-sql/VariantHelper.cpp) include_directories(ms-sql) + set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) target_link_libraries (msclfs_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} clfsw32.lib) install (TARGETS msclfs_store # RUNTIME DESTINATION ${QPIDD_MODULE_DIR} diff --git a/qpid/cpp/src/rdma.cmake b/qpid/cpp/src/rdma.cmake index 21597f85d2..1d355e7ae6 100644 --- a/qpid/cpp/src/rdma.cmake +++ b/qpid/cpp/src/rdma.cmake @@ -79,6 +79,7 @@ if (BUILD_RDMA) add_library (rdma MODULE qpid/sys/RdmaIOPlugin.cpp) target_link_libraries (rdma qpidbroker rdmawrap) set_target_properties (rdma PROPERTIES + COMPILE_DEFINITIONS _IN_QPID_BROKER LINK_FLAGS "${CATCH_UNDEFINED}" PREFIX "") diff --git a/qpid/cpp/src/ssl.cmake b/qpid/cpp/src/ssl.cmake index b1a1ba9fa3..b7ad58b9f0 100644 --- a/qpid/cpp/src/ssl.cmake +++ b/qpid/cpp/src/ssl.cmake @@ -90,7 +90,8 @@ if (BUILD_SSL) target_link_libraries (ssl qpidbroker sslcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}) set_target_properties (ssl PROPERTIES PREFIX "" - COMPILE_FLAGS ${NSS_COMPILE_FLAGS}) + COMPILE_FLAGS "${NSS_COMPILE_FLAGS}" + COMPILE_DEFINITIONS _IN_QPID_BROKER) if (CMAKE_COMPILER_IS_GNUCXX) set_target_properties(ssl PROPERTIES LINK_FLAGS "${GCC_CATCH_UNDEFINED}") diff --git a/qpid/cpp/src/ssl.mk b/qpid/cpp/src/ssl.mk index 24ba8f585e..ff2aa502d6 100644 --- a/qpid/cpp/src/ssl.mk +++ b/qpid/cpp/src/ssl.mk @@ -39,7 +39,7 @@ ssl_la_SOURCES = \ ssl_la_LIBADD= libqpidbroker.la libsslcommon.la -ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFLAGS) +ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFLAGS) -D_IN_QPID_BROKER ssl_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 71e1945d94..9c21e51a18 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -123,7 +123,7 @@ class TestManageable : public qpid::management::Manageable mgmtObj = tmp; }; ~TestManageable() { mgmtObj.reset(); } - management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObj; }; + management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; }; static void validateTestObjectProperties(_qmf::TestObject& to) { // verify the default values are as expected. We don't check 'string1', @@ -209,11 +209,11 @@ QPID_AUTO_TEST_CASE(v1ObjPublish) // create a manageable test object TestManageable *tm = new TestManageable(agent, std::string("obj1")); - uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize(); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent->addObject(tm->GetManagementObjectShared(), 1); + agent->addObject(tm->GetManagementObject(), 1); // wait for the object to be published Message m1; @@ -234,7 +234,7 @@ QPID_AUTO_TEST_CASE(v1ObjPublish) // destroy the object - tm->GetManagementObjectShared()->resourceDestroy(); + tm->GetManagementObject()->resourceDestroy(); // wait for the deleted object to be published @@ -272,9 +272,9 @@ QPID_AUTO_TEST_CASE(v2ObjPublish) TestManageable *tm = new TestManageable(agent, std::string("obj2")); - Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObjectShared()->getPackageName(), "#"); + Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); - agent->addObject(tm->GetManagementObjectShared(), "testobj-1"); + agent->addObject(tm->GetManagementObject(), "testobj-1"); // wait for the object to be published Message m1; @@ -295,7 +295,7 @@ QPID_AUTO_TEST_CASE(v2ObjPublish) // destroy the object - tm->GetManagementObjectShared()->resourceDestroy(); + tm->GetManagementObject()->resourceDestroy(); // wait for the deleted object to be published @@ -335,11 +335,11 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj) // create a manageable test object TestManageable *tm = new TestManageable(agent, std::string("myObj")); - uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize(); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent->addObject(tm->GetManagementObjectShared(), 1); + agent->addObject(tm->GetManagementObject(), 1); // wait for the object to be published Message m1; @@ -352,7 +352,7 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj) // destroy the object, then immediately export (before the next poll cycle) ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObjectShared()->resourceDestroy(); + tm->GetManagementObject()->resourceDestroy(); agent->exportDeletedObjects( delObjs ); BOOST_CHECK(delObjs.size() == 1); @@ -399,11 +399,11 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj) // create a manageable test object TestManageable *tm = new TestManageable(agent, std::string("anObj")); - uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize(); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent->addObject(tm->GetManagementObjectShared(), 1); + agent->addObject(tm->GetManagementObject(), 1); // wait for the object to be published Message m1; @@ -416,7 +416,7 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj) // destroy the object, then immediately export (before the next poll cycle) ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObjectShared()->resourceDestroy(); + tm->GetManagementObject()->resourceDestroy(); agent->exportDeletedObjects( delObjs ); BOOST_CHECK(delObjs.size() == 1); @@ -478,8 +478,8 @@ QPID_AUTO_TEST_CASE(v1ExportFastDelObj) // add, then immediately delete and export the object... ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->addObject(tm->GetManagementObjectShared(), 999); - tm->GetManagementObjectShared()->resourceDestroy(); + agent->addObject(tm->GetManagementObject(), 999); + tm->GetManagementObject()->resourceDestroy(); agent->exportDeletedObjects( delObjs ); BOOST_CHECK(delObjs.size() == 1); @@ -511,8 +511,8 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) // FOR ALL OBJECTS, so objLen will be the same. Otherwise the // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). TestManageable *tm = new TestManageable(agent, key.str()); - objLen = tm->GetManagementObjectShared()->writePropertiesSize(); - agent->addObject(tm->GetManagementObjectShared(), i + 1); + objLen = tm->GetManagementObject()->writePropertiesSize(); + agent->addObject(tm->GetManagementObject(), i + 1); tmv.push_back(tm); } @@ -531,7 +531,7 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) uint32_t delCount = 0; for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObjectShared()->resourceDestroy(); + tmv[i]->GetManagementObject()->resourceDestroy(); delCount++; } @@ -604,8 +604,8 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) std::stringstream key; key << "testobj-" << i; TestManageable *tm = new TestManageable(agent, key.str()); - if (tm->GetManagementObjectShared()->writePropertiesSize()) {} - agent->addObject(tm->GetManagementObjectShared(), key.str()); + if (tm->GetManagementObject()->writePropertiesSize()) {} + agent->addObject(tm->GetManagementObject(), key.str()); tmv.push_back(tm); } @@ -624,7 +624,7 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) uint32_t delCount = 0; for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObjectShared()->resourceDestroy(); + tmv[i]->GetManagementObject()->resourceDestroy(); delCount++; } @@ -689,12 +689,12 @@ QPID_AUTO_TEST_CASE(v2RapidRestoreObj) TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); - Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObjectShared()->getPackageName(), "#"); + Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); // add, then immediately delete and re-add a copy of the object - agent->addObject(tm1->GetManagementObjectShared(), "testobj-1"); - tm1->GetManagementObjectShared()->resourceDestroy(); - agent->addObject(tm2->GetManagementObjectShared(), "testobj-1"); + agent->addObject(tm1->GetManagementObject(), "testobj-1"); + tm1->GetManagementObject()->resourceDestroy(); + agent->addObject(tm2->GetManagementObject(), "testobj-1"); // expect: a delete notification, then an update notification TestObjectVector objs; diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index b0af187087..63afc46831 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -158,6 +158,7 @@ add_executable (unit_test unit_test target_link_libraries (unit_test ${qpid_test_boost_libs} qpidmessaging qpidbroker qmfconsole) +set_target_properties (unit_test PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER) remember_location(unit_test) add_library (shlibtest MODULE shlibtest.cpp) @@ -327,7 +328,9 @@ endif (PYTHON_EXECUTABLE) add_library(test_store MODULE test_store.cpp) target_link_libraries (test_store qpidbroker qpidcommon) -set_target_properties (test_store PROPERTIES PREFIX "") +set_target_properties (test_store PROPERTIES + COMPILE_DEFINITIONS _IN_QPID_BROKER + PREFIX "") add_library (dlclose_noop MODULE dlclose_noop.c) diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 55387f0091..4184b5f38a 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -17,7 +17,7 @@ # under the License. # -AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK +AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK -D_IN_QPID_BROKER INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src PUBLIC_INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include # Use public API only QMF_GEN=$(top_srcdir)/managementgen/qmf-gen @@ -28,6 +28,7 @@ abs_srcdir=@abs_srcdir@ extra_libs = lib_client = $(abs_builddir)/../libqpidclient.la lib_messaging = $(abs_builddir)/../libqpidmessaging.la +lib_types = $(abs_builddir)/../libqpidtypes.la lib_common = $(abs_builddir)/../libqpidcommon.la lib_broker = $(abs_builddir)/../libqpidbroker.la lib_console = $(abs_builddir)/../libqmfconsole.la @@ -154,7 +155,7 @@ receiver_SOURCES = \ receiver.cpp \ TestOptions.h \ ConnectionOptions.h -receiver_LDADD = $(lib_client) -lboost_program_options -lqpidcommon +receiver_LDADD = $(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS += sender sender_SOURCES = \ @@ -162,7 +163,7 @@ sender_SOURCES = \ TestOptions.h \ ConnectionOptions.h \ Statistics.cpp -sender_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes -lqpidclient +sender_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) $(lib_client) qpidexectest_PROGRAMS += qpid-receive qpid_receive_SOURCES = \ @@ -171,7 +172,7 @@ qpid_receive_SOURCES = \ ConnectionOptions.h \ Statistics.h \ Statistics.cpp -qpid_receive_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes +qpid_receive_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) qpidexectest_PROGRAMS += qpid-send qpid_send_SOURCES = \ @@ -180,42 +181,42 @@ qpid_send_SOURCES = \ ConnectionOptions.h \ Statistics.h \ Statistics.cpp -qpid_send_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes +qpid_send_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) qpidexectest_PROGRAMS+=qpid-perftest qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES) -qpid_perftest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_perftest_LDADD=$(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS+=qpid-txtest qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES) qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h -qpid_txtest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_txtest_LDADD=$(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS+=qpid-latency-test qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h -qpid_latency_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_latency_test_LDADD=$(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS+=qpid-client-test qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES) qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h -qpid_client_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_client_test_LDADD=$(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS+=qpid-topic-listener qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h -qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS+=qpid-topic-publisher qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES) qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h -qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options $(lib_common) qpidexectest_PROGRAMS+=qpid-ping qpid_ping_INCLUDES=$(PUBLIC_INCLUDES) qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h -qpid_ping_LDADD=$(lib_client) -lboost_program_options -lqpidcommon +qpid_ping_LDADD=$(lib_client) -lboost_program_options $(lib_common) # # Other test programs diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index edb50fce9c..55cff046e2 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testLinkBindingCleanup) +{ + MessagingFixture fix; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + + Connection connection = fix.newConnection(); + connection.open(); + + Session session(connection.createSession()); + Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}"); + Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}"); + connection.close(); + + sender.send(Message("test-message"), true); + + // The session-scoped binding should be removed when receiver1's network connection is lost + Message in; + BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 0597a933a3..24f4bcadf9 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -203,7 +203,9 @@ class Popen(subprocess.Popen): self.wait() def kill(self): - self.expect = EXPECT_EXIT_FAIL + # Set to EXPECT_UNKNOWN, EXPECT_EXIT_FAIL creates a race condition + # if the process exits normally concurrent with the call to kill. + self.expect = EXPECT_UNKNOWN try: subprocess.Popen.kill(self) except AttributeError: # No terminate method try: diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py index dcd074eda9..6477c6effd 100755 --- a/qpid/cpp/src/tests/federation.py +++ b/qpid/cpp/src/tests/federation.py @@ -2604,3 +2604,109 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_dynamic_bounce_unbinds_named_queue(self): + """ Verify that a propagated binding is removed when the connection is + bounced + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers[0:2]: + _b.client_session.exchange_declare(exchange="fedX", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + timeout = time() + 10 + while my_exchange is None and time() <= timeout: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX": + my_exchange = ooo + break + if my_exchange is None: + self.fail("QMF failed to find new exchange!") + exchanges.append(my_exchange) + + # on the destination broker, create a binding for propagation + self._brokers[0].client_session.queue_declare(queue="fedDstQ") + self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud") + + # on the source broker, create a bridge queue + self._brokers[1].client_session.queue_declare(queue="fedSrcQ") + + # connect B1 --> B0 + result = self._brokers[0].qmf_object.create( "link", + "Link-dynamic", + {"host":self._brokers[1].host, + "port":self._brokers[1].port}, False) + self.assertEqual(result.status, 0) + + # bridge the "fedX" exchange: + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-dynamic", + {"link":"Link-dynamic", + "src":"fedX", + "dest":"fedX", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # wait for the inter-broker links to become operational + operational = False + timeout = time() + 10 + while not operational and time() <= timeout: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + self.failUnless(operational, "inter-broker links failed to become operational.") + + # wait until the binding key has propagated to the src broker + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount < 1 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 1) + + # + # Tear down the bridges between the two exchanges, then wait + # for the bindings to be cleaned up + # + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + exchanges[1].update() + timeout = time() + 10 + while exchanges[1].bindingCount != 0 and time() <= timeout: + exchanges[1].update() + self.failUnless(exchanges[1].bindingCount == 0) + + self._brokers[1].client_session.queue_delete(queue="fedSrcQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers[0:2]: + _b.client_session.exchange_delete(exchange="fedX") + + self._teardown_brokers() + + self.verify_cleanup() diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index d7885d9622..4efbfdba3d 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -100,7 +100,7 @@ class HaBroker(Broker): self.qpid_ha_script.main_except(["", "-b", url]+args) def promote(self): self.ready(); self.qpid_ha(["promote"]) - def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) + def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) @@ -113,10 +113,12 @@ class HaBroker(Broker): self._agent = QmfAgent(self.host_port()) return self._agent - def ha_status(self): + def qmf(self): hb = self.agent().getHaBroker() hb.update() - return hb.status + return hb + + def ha_status(self): return self.qmf().status def wait_status(self, status): def try_get_status(): @@ -234,7 +236,9 @@ class HaCluster(object): def update_urls(self): self.url = ",".join([b.host_port() for b in self]) if len(self) > 1: # No failover addresses on a 1 cluster. - for b in self: b.set_brokers_url(self.url) + for b in self: + b.set_brokers_url(self.url) + b.set_public_url(self.url) def connect(self, i): """Connect with reconnect_urls""" diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index bc5566ae63..b29ff42627 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -279,11 +279,13 @@ class ReplicationTests(HaBrokerTest): """Test replication of individual queues outside of cluster mode""" l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. try: - primary = HaBroker(self, name="primary", ha_cluster=False) + primary = HaBroker(self, name="primary", ha_cluster=False, + args=["--ha-queue-replication=yes"]); pc = primary.connect() ps = pc.session().sender("q;{create:always}") pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False) + backup = HaBroker(self, name="backup", ha_cluster=False, + args=["--ha-queue-replication=yes"]) br = backup.connect().session().receiver("q;{create:always}") # Set up replication with qpid-ha @@ -304,7 +306,8 @@ class ReplicationTests(HaBrokerTest): finally: l.restore() def test_queue_replica_failover(self): - """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" + """Test individual queue replication from a cluster to a standalone + backup broker, verify it fails over.""" l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. try: cluster = HaCluster(self, 2) @@ -312,7 +315,8 @@ class ReplicationTests(HaBrokerTest): pc = cluster.connect(0) ps = pc.session().sender("q;{create:always}") pr = pc.session().receiver("q;{create:always}") - backup = HaBroker(self, name="backup", ha_cluster=False) + backup = HaBroker(self, name="backup", ha_cluster=False, + args=["--ha-queue-replication=yes"]) br = backup.connect().session().receiver("q;{create:always}") backup.replicate(cluster.url, "q") ps.send("a") @@ -474,6 +478,23 @@ class ReplicationTests(HaBrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass + def test_replicate_binding(self): + """Verify that binding replication can be disabled""" + primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) + primary.promote() + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) + ps = primary.connect().session() + ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}") + ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}") + backup.wait_backup("q") + + primary.kill() + assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die + backup.promote() + bs = backup.connect_admin().session() + bs.sender("ex").send(Message("msg")) + self.assert_browse_retry(bs, "q", []) + def test_invalid_replication(self): """Verify that we reject an attempt to declare a queue with invalid replication value.""" cluster = HaCluster(self, 1, ha_replicate="all") @@ -761,9 +782,9 @@ acl deny all all cluster[1].wait_queue("q0") cluster[1].wait_queue("q1") cluster[0].kill() - cluster[1].wait_queue("q1") # Not timed out yet - cluster[1].wait_no_queue("q1", timeout=2) # Wait for timeout - cluster[1].wait_no_queue("q0", timeout=2) + cluster[1].wait_queue("q1") # Not timed out yet + cluster[1].wait_no_queue("q1") # Wait for timeout + cluster[1].wait_no_queue("q0") def test_alt_exchange_dup(self): """QPID-4349: if a queue has an alterante exchange and is deleted the @@ -1114,6 +1135,38 @@ class RecoveryTests(HaBrokerTest): cluster.bounce(0, promote_next=False) cluster[0].promote() + +class ConfigurationTests(HaBrokerTest): + """Tests for configuration settings.""" + + def test_client_broker_url(self): + """Check that setting of broker and public URLs obeys correct defaulting + and precedence""" + + def check(broker, brokers, public): + qmf = broker.qmf() + self.assertEqual(brokers, qmf.brokersUrl) + self.assertEqual(public, qmf.publicUrl) + + def start(brokers, public, known=None): + args=[] + if brokers: args.append("--ha-brokers-url="+brokers) + if public: args.append("--ha-public-url="+public) + if known: args.append("--known-hosts-url="+known) + return HaBroker(self, args=args) + + # Both set explictily, no defaulting + b = start("foo:123", "bar:456") + check(b, "amqp:tcp:foo:123", "amqp:tcp:bar:456") + b.set_brokers_url("foo:999") + check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:456") + b.set_public_url("bar:999") + check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:999") + + # Allow "none" to mean "not set" + b = start("none", "none") + check(b, "", "") + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) qpid_ha = os.getenv("QPID_HA_EXEC") diff --git a/qpid/cpp/src/tests/testagent.mk b/qpid/cpp/src/tests/testagent.mk index 9f530621c6..0492f3e3bb 100644 --- a/qpid/cpp/src/tests/testagent.mk +++ b/qpid/cpp/src/tests/testagent.mk @@ -46,6 +46,6 @@ testagent-testagent.$(OBJEXT): $(TESTAGENT_GEN_SRC) qpidexectest_PROGRAMS+=testagent testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC) -testagent_LDADD=$(top_builddir)/src/libqmf.la -lqpidcommon -lqpidtypes -lqpidclient +testagent_LDADD=$(top_builddir)/src/libqmf.la $(top_builddir)/src/libqpidcommon.la $(top_builddir)/src/libqpidtypes.la $(top_builddir)/src/libqpidclient.la EXTRA_DIST+=testagent.xml diff --git a/qpid/cpp/src/xml.mk b/qpid/cpp/src/xml.mk index baf3803647..9376cfd54a 100644 --- a/qpid/cpp/src/xml.mk +++ b/qpid/cpp/src/xml.mk @@ -24,6 +24,6 @@ xml_la_SOURCES = \ qpid/xml/XmlExchangePlugin.cpp xml_la_LIBADD = -lxerces-c -lxqilla libqpidbroker.la - +xml_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER xml_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml index 65ce108aef..55893387a4 100644 --- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml +++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml @@ -173,6 +173,13 @@ under the License. </listitem> </itemizedlist> </para> + <para> + You should not enable the old and new cluster modules at the same time + in a broker, as they may interfere with each other. In other words you + should not set <literal>cluster-name</literal> at the same time as + either <literal>ha-cluster</literal> or + <literal>ha-queue-replication</literal> + </para> </section> <section> <title>Limitations</title> @@ -254,6 +261,14 @@ under the License. </row> <row> <entry> + <literal>ha-queue-replication <replaceable>yes|no</replaceable></literal> + </entry> + <entry> + Enable replication of specific queues without joining a cluster, see <xref linkend="ha-queue-replication"/>. + </entry> + </row> + <row> + <entry> <literal>ha-brokers-url <replaceable>URL</replaceable></literal> </entry> <entry> @@ -273,8 +288,7 @@ ssl_addr = "ssl:" host [":" port]' </footnote> used by cluster brokers to connect to each other. The URL should contain a comma separated list of the broker addresses, rather than a - virtual IP address. For example: - <literal>amqp:node1.exaple.com,node2.exaple.com,node3.exaple.com</literal> + virtual IP address. </para> </entry> </row> @@ -282,14 +296,18 @@ ssl_addr = "ssl:" host [":" port]' <entry><literal>ha-public-url <replaceable>URL</replaceable></literal> </entry> <entry> <para> - The URL <footnoteref linkend="ha-url-grammar"/> used by clients to connect to the cluster. This can be a list or - a single virtual IP address. A virtual IP address is recommended as it - simplifies deployment. If not specified this defaults to the value of - <literal>ha-brokers-url</literal>. + The URL <footnoteref linkend="ha-url-grammar"/> is advertised to + clients as the "known-hosts" for fail-over. It can be a list or + a single virtual IP address. A virtual IP address is recommended. </para> <para> - This option allows you to put client traffic on a different network from - broker traffic, which is recommended. + Using this option you can put client and broker traffic on + separate networks, which is recommended. + </para> + <para> + Note: When HA clustering is enabled the broker option + <literal>known-hosts-url</literal> is ignored and over-ridden by + the <literal>ha-public-url</literal> setting. </para> </entry> </row> @@ -548,7 +566,7 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl </section> <section id="ha-creating-replicated"> - <title>Creating replicated queues and exchanges</title> + <title>Controlling replication of queues and exchanges</title> <para> By default, queues and exchanges are not replicated automatically. You can change the default behavior by setting the <literal>ha-replicate</literal> configuration @@ -849,6 +867,30 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl or to simulate a cluster on a single node. For deployment, a resource manager is required. </para> </section> + <section id="ha-queue-replication"> + <title>Replicating specific queues</title> + <para> + In addition to the automatic replication performed in a cluster, you can + set up replication for specific queues between arbitrary brokers, even if + the brokers are not members of a cluster. The command: + </para> + <programlisting> + qpid-ha replicate <replaceable>QUEUE</replaceable> <replaceable>REMOTE-BROKER</replaceable> + </programlisting> + <para> + sets up replication of <replaceable>QUEUE</replaceable> on <replaceable>REMOTE-BROKER</replaceable> to <replaceable>QUEUE</replaceable> on the current broker. + </para> + <para> + Set the configuration option + <literal>ha-queue-replication=yes</literal> on both brokers to enable this + feature on non-cluster brokers. It is automatically enabled for brokers + that are part of a cluster. + </para> + <para> + Note that this feature does not provide automatic fail-over, for that you + need to run a cluster. + </para> + </section> </section> <!-- LocalWords: scalability rgmanager multicast RGManager mailto LVQ qpidd IP dequeued Transactional username diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java index 66cfe10771..4856a7c491 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java @@ -43,6 +43,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection private String _remoteHost; private boolean _ssl; + private String _queuePrefix; + private String _topicPrefix; public ConnectionFactoryImpl(final String host, final int port, @@ -90,12 +92,15 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection public ConnectionImpl createConnection() throws JMSException { - return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl); + return createConnection(_username, _password); } public ConnectionImpl createConnection(final String username, final String password) throws JMSException { - return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl); + connection.setQueuePrefix(_queuePrefix); + connection.setTopicPrefix(_topicPrefix); + return connection; } public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException @@ -211,4 +216,23 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection return connection; } + public String getTopicPrefix() + { + return _topicPrefix; + } + + public void setTopicPrefix(String topicPrefix) + { + _topicPrefix = topicPrefix; + } + + public String getQueuePrefix() + { + return _queuePrefix; + } + + public void setQueuePrefix(String queuePrefix) + { + _queuePrefix = queuePrefix; + } } diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 417f6f71e1..be1c2d6514 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transport.Container; import javax.jms.*;
import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -50,6 +49,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect private final String _remoteHost;
private final boolean _ssl;
private String _clientId;
+ private String _queuePrefix;
+ private String _topicPrefix;
private static enum State
@@ -379,4 +380,78 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect {
_isTopicConnection = topicConnection;
}
+
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queueprefix)
+ {
+ _queuePrefix = queueprefix;
+ }
+
+ DecodedDestination toDecodedDestination(DestinationImpl dest)
+ {
+ String address = dest.getAddress();
+ Set<String> kind = null;
+ Class clazz = dest.getClass();
+ if( clazz==QueueImpl.class )
+ {
+ kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+ if( _queuePrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_queuePrefix) )
+ {
+ address = _queuePrefix+address;
+ }
+ }
+ }
+ else if( clazz==TopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+ if( _topicPrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_topicPrefix) )
+ {
+ address = _topicPrefix+address;
+ }
+ }
+ }
+ else if( clazz==TemporaryQueueImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+ }
+ else if( clazz==TemporaryTopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ DecodedDestination toDecodedDestination(String address, Set<String> kind)
+ {
+ if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+ {
+ return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+ }
+ if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+ {
+ return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+ }
+ return new DecodedDestination(address, kind);
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java new file mode 100644 index 0000000000..74e98c2163 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.amqp_1_0.jms.impl; + +import java.util.Set; + +/** +* @author <a href="http://hiramchirino.com">Hiram Chirino</a> +*/ +class DecodedDestination +{ + private final String _address; + private final Set<String> _attributes; + + DecodedDestination(String address, Set<String> kind) + { + _address = address; + _attributes = kind; + } + + public String getAddress() + { + return _address; + } + + public Set<String> getAttributes() + { + return _attributes; + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java index eb34cead08..3c15c74d6f 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java @@ -127,7 +127,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi {
try
{
- return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+ return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
_linkName, _durable, getFilters(), null);
}
catch (AmqpErrorException e)
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java index f1056b94fd..fba50c5477 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java @@ -50,14 +50,24 @@ public abstract class MessageImpl implements Message static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
- private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type");
+ static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type");
+
+ static final String QUEUE_ATTRIBUTE = "queue";
+ static final String TOPIC_ATTRIBUTE = "topic";
+ static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+ static final Set<String> JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE);
+ static final Set<String> JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE);
+ static final Set<String> JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
+ static final Set<String> JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
private Header _header;
private Properties _properties;
private ApplicationProperties _applicationProperties;
private Footer _footer;
- public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
- private SessionImpl _sessionImpl;
+ private final SessionImpl _sessionImpl;
private boolean _readOnly;
private MessageAnnotations _messageAnnotations;
@@ -171,45 +181,53 @@ public abstract class MessageImpl implements Message public DestinationImpl getJMSReplyTo() throws JMSException
{
- return DestinationImpl.valueOf(getReplyTo());
+ return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
}
public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
{
- if(destination == null)
+ if( destination==null )
{
setReplyTo(null);
- }
- else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
- {
- setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ messageAnnotationMap().remove(REPLY_TO_TYPE);
}
else
{
- throw new NonAMQPDestinationException(destination);
+ DecodedDestination dd = toDecodedDestination(destination);
+ setReplyTo(dd.getAddress());
+ messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
}
}
public DestinationImpl getJMSDestination() throws JMSException
{
- return _isFromQueue ? QueueImpl.valueOf(getTo())
- : _isFromTopic ? TopicImpl.valueOf(getTo())
- : DestinationImpl.valueOf(getTo());
+ Set<String> type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE));
+ if( type==null )
+ {
+ if( _isFromQueue )
+ {
+ type = JMS_QUEUE_ATTRIBUTES;
+ }
+ else if( _isFromTopic )
+ {
+ type = JMS_TOPIC_ATTRIBUTES;
+ }
+ }
+ return toDestination(getTo(), type);
}
public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
{
- if(destination == null)
+ if( destination==null )
{
setTo(null);
- }
- else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
- {
- setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ messageAnnotationMap().remove(TO_TYPE);
}
else
{
- throw new NonAMQPDestinationException(destination);
+ DecodedDestination dd = toDecodedDestination(destination);
+ setTo(dd.getAddress());
+ messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes()));
}
}
@@ -264,22 +282,13 @@ public abstract class MessageImpl implements Message public String getJMSType() throws JMSException
{
- Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
- final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
-
+ final Object attrValue = getMessageAnnotation(JMS_TYPE);
return attrValue instanceof String ? attrValue.toString() : null;
}
public void setJMSType(String s) throws JMSException
{
- Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
- if(messageAttrs == null)
- {
- messageAttrs = new HashMap();
- _messageAnnotations = new MessageAnnotations(messageAttrs);
- }
-
- messageAttrs.put(JMS_TYPE, s);
+ messageAnnotationMap().put(JMS_TYPE, s);
}
public long getJMSExpiration() throws JMSException
@@ -1206,4 +1215,118 @@ public abstract class MessageImpl implements Message }
abstract Collection<Section> getSections();
+
+ DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException
+ {
+ if(destination == null)
+ {
+ return null;
+ }
+ if (destination instanceof DestinationImpl)
+ {
+ return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination);
+ }
+ throw new NonAMQPDestinationException(destination);
+ }
+
+ DestinationImpl toDestination(String address, Set<String> kind)
+ {
+ if( address == null )
+ {
+ return null;
+ }
+
+ // If destination prefixes are in play, we have to strip the the prefix, and we might
+ // be able to infer the kind, if we don't know it yet.
+ DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind);
+ address = decoded.getAddress();
+ kind = decoded.getAttributes();
+
+ if( kind == null )
+ {
+ return DestinationImpl.valueOf(address);
+ }
+ if( kind.contains(QUEUE_ATTRIBUTE) )
+ {
+ if( kind.contains(TEMPORARY_ATTRIBUTE) )
+ {
+ return new TemporaryQueueImpl(address, null, _sessionImpl);
+ }
+ else
+ {
+ return QueueImpl.valueOf(address);
+ }
+ }
+ else if ( kind.contains(TOPIC_ATTRIBUTE) )
+ {
+ if( kind.contains(TEMPORARY_ATTRIBUTE) )
+ {
+ return new TemporaryTopicImpl(address, null, _sessionImpl);
+ }
+ else
+ {
+ return TopicImpl.valueOf(address);
+ }
+ }
+
+ return DestinationImpl.valueOf(address);
+ }
+
+ private Object getMessageAnnotation(Symbol key)
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ return messageAttrs == null ? null : messageAttrs.get(key);
+ }
+
+ private Map messageAnnotationMap()
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ if(messageAttrs == null)
+ {
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
+ }
+ return messageAttrs;
+ }
+
+ Set<String> splitCommaSeparateSet(String value)
+ {
+ if( value == null )
+ {
+ return null;
+ }
+ HashSet<String> rc = new HashSet<String>();
+ for( String x: value.split("\\s*,\\s*") )
+ {
+ rc.add(x);
+ }
+ return rc;
+ }
+
+ private static Set<String> set(String ...args)
+ {
+ HashSet<String> s = new HashSet<String>();
+ for (String arg : args)
+ {
+ s.add(arg);
+ }
+ return Collections.unmodifiableSet(s);
+ }
+
+ static final String join(String sep, Iterable items)
+ {
+ StringBuilder result = new StringBuilder();
+
+ for (Object o : items)
+ {
+ if (result.length() > 0)
+ {
+ result.append(sep);
+ }
+ result.append(o.toString());
+ }
+
+ return result.toString();
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index 5bb8845eb7..badc20472b 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.impl; import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
@@ -61,7 +60,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP {
try
{
- _sender = _session.getClientSession().createSender(_destination.getAddress());
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
}
catch (Sender.SenderCreationException e)
{
@@ -297,7 +296,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP try
{
_destination = (DestinationImpl) destination;
- _sender = _session.getClientSession().createSender(_destination.getAddress());
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
send(message, deliveryMode, priority, ttl);
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java index 75003c0d77..8fab315b10 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java @@ -100,7 +100,7 @@ public class QueueBrowserImpl implements QueueBrowser {
try
{
- _receiver = _session.getClientSession().createReceiver(_queue.getAddress(),
+ _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
StdDistMode.COPY,
AcknowledgeMode.AMO, null,
false,
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java index d46ed7183f..67b597f5cf 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java @@ -41,7 +41,7 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei {
try
{
- return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+ return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
}
catch (AmqpErrorException e)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java index cedf9a180a..58b7d4f625 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java @@ -899,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession {
_isTopicSession = topicSession;
}
+
+ String toAddress(DestinationImpl dest)
+ {
+ return _connection.toDecodedDestination(dest).getAddress();
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java index 52d8c412ec..f267794796 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java @@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub {
try
{
- String address = getDestination().getAddress();
+ String address = getSession().toAddress(getDestination());
Receiver receiver = getSession().getClientSession().createReceiver(address,
StdDistMode.COPY, AcknowledgeMode.ALO,
getLinkName(), isDurable(), getFilters(),
diff --git a/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider b/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider index b5bc947612..8ece9627b0 100644 --- a/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider +++ b/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider @@ -1 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider diff --git a/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory b/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory index 85b942383f..b6c429baab 100644 --- a/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory +++ b/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory @@ -1 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.security.access.plugins.DefaultAccessControlFactory diff --git a/qpid/java/broker-plugins/management-http/build.xml b/qpid/java/broker-plugins/management-http/build.xml index 734d762f17..abf35d9c88 100644 --- a/qpid/java/broker-plugins/management-http/build.xml +++ b/qpid/java/broker-plugins/management-http/build.xml @@ -33,11 +33,9 @@ <!-- Flagfile used to determine if uwar needs to be done. ._ is part of Ant's defaultexcludes so wont appear bundles --> <property name="dojo.uptodate.flagfile" value="${module.classes}/resources/dojo/._dojouptodate.timestamp" /> - <uptodate property="unwardojo.done" targetfile="${dojo.uptodate.flagfile}" srcfile="${project.root}/${dojo}" /> - <target name="precompile" depends="unwardojo" /> - <target name="unwardojo" unless="unwardojo.done"> + <target name="unwardojo" depends="check-unwardojo.done" unless="unwardojo.done"> <unwar src="${project.root}/${dojo}" dest="${module.classes}/resources/dojo"> <patternset> <exclude name="META-INF/**" /> @@ -48,5 +46,9 @@ <touch file="${dojo.uptodate.flagfile}" /> </target> + <target name="check-unwardojo.done"> + <uptodate property="unwardojo.done" targetfile="${dojo.uptodate.flagfile}" srcfile="${project.root}/${dojo}" /> + </target> + <target name="bundle" depends="bundle-tasks" /> </project> diff --git a/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory b/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory index 0565b60e64..7ffb9a9013 100644 --- a/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory +++ b/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory @@ -1 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.management.plugin.HttpManagementFactory diff --git a/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory b/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory index afbe217301..8fa778269e 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory +++ b/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory @@ -1 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.jmx.JMXManagementFactory diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index d84ff145e4..ac4fda2985 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -111,7 +111,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ private long _deliveryTag = 0; - /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ + /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */ private AMQQueue _defaultQueue; /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */ @@ -207,10 +207,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } private void incrementOutstandingTxnsIfNecessary() { @@ -1485,11 +1481,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (inTransaction()) + final long transactionStartTime = _transaction.getTransactionStartTime(); + final long transactionUpdateTime = _txnUpdateTime.get(); + if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0) { long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); + long openTime = currentTime - transactionStartTime; + long idleTime = currentTime - transactionUpdateTime; _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java index 11f16690ef..096d5265ed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.exchange; import java.util.UUID; @@ -30,4 +50,4 @@ public class DirectExchangeType implements ExchangeType<DirectExchange> { return ExchangeDefaults.DIRECT_EXCHANGE_NAME; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java index de70373703..0371a363de 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.exchange; import java.util.UUID; @@ -28,4 +48,4 @@ public class FanoutExchangeType implements ExchangeType<FanoutExchange> { return ExchangeDefaults.FANOUT_EXCHANGE_NAME; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java index 61d9a3c2b0..ed4d57d0f8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.exchange; import java.util.UUID; @@ -29,4 +49,4 @@ public class HeadersExchangeType implements ExchangeType<HeadersExchange> return ExchangeDefaults.HEADERS_EXCHANGE_NAME; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java index cb4e747a2d..25a3549e61 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.exchange; import java.util.UUID; @@ -30,4 +50,4 @@ public class TopicExchangeType implements ExchangeType<TopicExchange> { return ExchangeDefaults.TOPIC_EXCHANGE_NAME; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java index f67c7a1c6a..8cf121b3d9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.logging.actors; import java.security.AccessController; @@ -45,4 +65,4 @@ public abstract class AbstractManagementActor extends AbstractActor } return identity; } -}
\ No newline at end of file +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 440db73bea..d9e5e1c473 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -573,7 +573,29 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine public void closed() { - + try + { + _delegate = new ClosedDelegateProtocolEngine(); + if(_logger.isDebugEnabled()) + { + _logger.debug("Connection from " + getRemoteAddress() + " was closed before any protocol version was established."); + } + } + catch(Exception e) + { + //ignore + } + finally + { + try + { + _network.close(); + } + catch(Exception e) + { + //ignore + } + } } public void writerIdle() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index eed55a2e85..075ed2a87c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -42,7 +42,6 @@ import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -449,11 +448,6 @@ public class ServerSession extends Session return _transaction.isTransactional(); } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } - public void selectTx() { _transaction = new LocalTransaction(this.getMessageStore()); @@ -591,7 +585,7 @@ public class ServerSession extends Session /** * Update last transaction activity timestamp */ - public void updateTransactionalActivity() + private void updateTransactionalActivity() { if (isTransactional()) { @@ -709,11 +703,13 @@ public class ServerSession extends Session public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (inTransaction()) + final long transactionStartTime = _transaction.getTransactionStartTime(); + final long transactionUpdateTime = _txnUpdateTime.get(); + if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0) { long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); + long openTime = currentTime - transactionStartTime; + long idleTime = currentTime - transactionUpdateTime; _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 3fbcff7e2c..f11fb1086e 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -50,7 +50,7 @@ public class LocalTransaction implements ServerTransaction private volatile Transaction _transaction; private MessageStore _transactionLog; - private long _txnStartTime = 0L; + private volatile long _txnStartTime = 0L; private StoreFuture _asyncTran; public LocalTransaction(MessageStore transactionLog) diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory index d49510530d..8ff67030ef 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory org.apache.qpid.server.security.auth.manager.Base64MD5PasswordFileAuthenticationManagerFactory org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType index 26aa70c31f..4ad646b7a0 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# org.apache.qpid.server.exchange.DirectExchangeType org.apache.qpid.server.exchange.TopicExchangeType org.apache.qpid.server.exchange.FanoutExchangeType diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory index 33ae92181b..6bfb55ff18 100644 --- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory @@ -1 +1,19 @@ -org.apache.qpid.server.security.group.FileGroupManagerFactory
\ No newline at end of file +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.security.group.FileGroupManagerFactory diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java index 28e1d5a87e..f0eb83ad24 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java @@ -88,7 +88,7 @@ public class Drain extends OptionParser } } } - + consumer.close(); ssn.close(); con.close(); } diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java index 61ff2dfc19..09e813f8c1 100644 --- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java @@ -100,6 +100,7 @@ public class Spout extends OptionParser System.out.println(msg); System.out.println("-------------------------------\n"); } + producer.close(); ssn.close(); con.close(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 12b174198a..9a7f5241a5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1095,7 +1095,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } - public boolean isExchangeExist(AMQDestination dest,boolean assertNode) + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get(); @@ -1118,6 +1118,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setExchangeClass(new AMQShortString(result.getType())); } } + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + } + } + return match; } @@ -1137,9 +1146,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic (result.getExclusive() == node.isExclusive()) && (matchProps(result.getArguments(),node.getDeclareArgs())); } - else if (match) + + if (assertNode) { - // should I use the queried details to update the local data structure. + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +", Result was : " + result); + } } } catch(SessionException e) @@ -1218,32 +1231,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic { case AMQDestination.QUEUE_TYPE: { - if (isQueueExist(dest,assertNode)) + if(createNode) { setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); break; } - else if(createNode) + else if (isQueueExist(dest,assertNode)) { setLegacyFieldsForQueueType(dest); - handleQueueNodeCreation(dest,noLocal); break; - } + } } case AMQDestination.TOPIC_TYPE: { - if (isExchangeExist(dest,assertNode)) + if(createNode) { setLegacyFiledsForTopicType(dest); verifySubject(dest); + handleExchangeNodeCreation(dest); break; } - else if(createNode) + else if (isExchangeExist(dest,assertNode)) { setLegacyFiledsForTopicType(dest); verifySubject(dest); - handleExchangeNodeCreation(dest); break; } } @@ -1322,6 +1335,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic arguments.put(AddressHelper.NO_LOCAL, noLocal); } + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + } + getQpidSession().queueDeclare(queueName, queueProps.getAlternateExchange(), arguments, queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 9b291b48f7..72fc74e19c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -283,7 +283,7 @@ public class AddressHelper { MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap); queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true)); - queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); + queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true)); queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE)); queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS)); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java index a602dcbfd4..7401168978 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.client.messaging.address; import org.apache.qpid.client.AMQDestination; diff --git a/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef b/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef index 29998c89dd..46696bf942 100644 --- a/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef +++ b/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef @@ -20,7 +20,7 @@ chartType=XYLINE chartTitle=Varying number of producer sessions on single connection chartSubtitle=Persistent messages (1024b) -chartDescription=1-80P transacted on single connection, 20C auto-ack on separate connections, persistent, message payload 1KB. +chartDescription=1-80P transacted on single connection, 20C transacted on separate connections, persistent, message payload 1KB. xAxisTitle=Number of producer sessions yAxisTitle=Throughput (KB/s) diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js index 07f8bf9d92..527300eff4 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ jsonObject = { "_tests": QPID.iterations( { "__ACK_MODE": [ 0, 1 ] }, @@ -31,4 +51,4 @@ jsonObject = { ) }) -}
\ No newline at end of file +} diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js index f64af82feb..eab98e8bd7 100644 --- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js +++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ jsonObject = { "_countries": QPID.iterations( { "__ITERATING_VALUE": [ 0, 1 ] }, @@ -20,4 +40,4 @@ jsonObject = { ) }) -}
\ No newline at end of file +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java index e9798f15b5..0e55557806 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.client.failover; import java.net.InetSocketAddress; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 974e8d6e96..6041600364 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -96,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); // create always ------------------------------------------- @@ -124,7 +124,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); cons = jmsSession.createConsumer(dest); @@ -159,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; @@ -175,7 +175,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase "doesn't resolve to an exchange or a queue")); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( @@ -776,7 +776,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testSubscriptionForSameDestination() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}"); + Destination dest = ssn.createTopic("ADDR:amq.topic/foo"); MessageConsumer consumer1 = ssn.createConsumer(dest); MessageConsumer consumer2 = ssn.createConsumer(dest); MessageProducer prod = ssn.createProducer(dest); @@ -1033,7 +1033,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; @@ -1049,7 +1049,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; @@ -1066,7 +1066,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); } /** |