summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2012-12-17 12:33:05 +0000
committerAlex Rudyy <orudyy@apache.org>2012-12-17 12:33:05 +0000
commita9a7379887b39b248c8c435276a8e18bf0f22d98 (patch)
treec88a1ead116e17f89ce73b398033f42b3027a139
parentb3c7409db4cded6d116d851fab5f3863afaa00c8 (diff)
downloadqpid-python-a9a7379887b39b248c8c435276a8e18bf0f22d98.tar.gz
merge from trunk up to revision 1422060
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-qpid-4390@1422901 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am11
-rwxr-xr-xqpid/cpp/bindings/qpid/examples/perl/drain.pl2
-rw-r--r--qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl59
-rw-r--r--qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb3
-rw-r--r--qpid/cpp/bindings/swig_perl_typemaps.i12
-rw-r--r--qpid/cpp/etc/CMakeLists.txt5
-rw-r--r--qpid/cpp/etc/Makefile.am4
-rw-r--r--qpid/cpp/etc/qpidd.service13
-rw-r--r--qpid/cpp/examples/makedist.mk1
-rw-r--r--qpid/cpp/examples/messaging/Makefile.am9
-rw-r--r--qpid/cpp/examples/old_api/direct/Makefile.am6
-rw-r--r--qpid/cpp/examples/old_api/failover/Makefile.am6
-rw-r--r--qpid/cpp/examples/old_api/fanout/Makefile.am2
-rw-r--r--qpid/cpp/examples/old_api/pub-sub/Makefile.am4
-rw-r--r--qpid/cpp/examples/old_api/request-response/Makefile.am4
-rw-r--r--qpid/cpp/examples/old_api/tradedemo/Makefile.am6
-rw-r--r--qpid/cpp/examples/old_api/xml-exchange/Makefile.am6
-rw-r--r--qpid/cpp/include/qpid/management/Manageable.h7
-rw-r--r--qpid/cpp/include/qpid/management/ManagementObject.h9
-rwxr-xr-xqpid/cpp/managementgen/qmfgen/schema.py8
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Class.h3
-rw-r--r--qpid/cpp/src/CMakeLists.txt17
-rw-r--r--qpid/cpp/src/Makefile.am14
-rw-r--r--qpid/cpp/src/acl.mk1
-rw-r--r--qpid/cpp/src/amqp.cmake1
-rw-r--r--qpid/cpp/src/ha.mk1
-rw-r--r--qpid/cpp/src/posix/QpiddBroker.cpp4
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.cpp2
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.h2
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp42
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryAdapter.h0
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h4
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.h3
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.cpp0
-rw-r--r--qpid/cpp/src/qpid/broker/LegacyLVQ.h0
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h2
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableMessage.h3
-rw-r--r--qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h1
-rw-r--r--qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp73
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h14
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/System.h2
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/TxOpVisitor.h0
-rw-r--r--qpid/cpp/src/qpid/broker/TxPublish.cpp0
-rw-r--r--qpid/cpp/src/qpid/broker/TxPublish.h0
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedSession.h2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp34
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp28
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h6
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h6
-rw-r--r--qpid/cpp/src/qpid/management/Manageable.cpp10
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp10
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h2
-rw-r--r--qpid/cpp/src/qpid/store/CMakeLists.txt7
-rw-r--r--qpid/cpp/src/rdma.cmake1
-rw-r--r--qpid/cpp/src/ssl.cmake3
-rw-r--r--qpid/cpp/src/ssl.mk2
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp50
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt5
-rw-r--r--qpid/cpp/src/tests/Makefile.am25
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp21
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/federation.py106
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py12
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py67
-rw-r--r--qpid/cpp/src/tests/testagent.mk2
-rw-r--r--qpid/cpp/src/xml.mk2
-rw-r--r--qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml60
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java28
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java81
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java47
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java2
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java183
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java5
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java2
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java2
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java6
-rw-r--r--qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java2
-rw-r--r--qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider18
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory18
-rw-r--r--qpid/java/broker-plugins/management-http/build.xml8
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory18
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java22
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java16
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java2
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory18
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType18
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory20
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java2
-rw-r--r--qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java20
-rw-r--r--qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef2
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js22
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js22
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java20
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java16
131 files changed, 1402 insertions, 348 deletions
diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
index b0321d4e5d..8bf56ead91 100644
--- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
+++ b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
@@ -21,16 +21,19 @@ INCLUDE = -I$(top_srcdir)/include
AM_CPPFLAGS = $(INCLUDE)
+TYPES_LIB=$(top_builddir)/src/libqpidtypes.la
+MESSAGING_LIB=$(top_builddir)/src/libqpidmessaging.la
+
noinst_PROGRAMS=agent event_driven_list_agents list_agents print_events
agent_SOURCES=agent.cpp
-agent_LDADD=$(top_builddir)/src/libqmf2.la -lqpidtypes -lqpidmessaging
+agent_LDADD=$(top_builddir)/src/libqmf2.la $(TYPES_LIB) $(MESSAGING_LIB)
list_agents_SOURCES=list_agents.cpp
-list_agents_LDADD=$(top_builddir)/src/libqmf2.la -lqpidmessaging
+list_agents_LDADD=$(top_builddir)/src/libqmf2.la $(MESSAGING_LIB)
event_driven_list_agents_SOURCES=event_driven_list_agents.cpp
-event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la -lqpidmessaging
+event_driven_list_agents_LDADD=$(top_builddir)/src/libqmf2.la $(MESSAGING_LIB)
print_events_SOURCES=print_events.cpp
-print_events_LDADD=$(top_builddir)/src/libqmf2.la -lqpidtypes -lqpidmessaging
+print_events_LDADD=$(top_builddir)/src/libqmf2.la $(TYPES_LIB) $(MESSAGING_LIB)
diff --git a/qpid/cpp/bindings/qpid/examples/perl/drain.pl b/qpid/cpp/bindings/qpid/examples/perl/drain.pl
index 2da28f2867..e66184a160 100755
--- a/qpid/cpp/bindings/qpid/examples/perl/drain.pl
+++ b/qpid/cpp/bindings/qpid/examples/perl/drain.pl
@@ -78,7 +78,7 @@ eval {
my $redelivered = ($message->get_redelivered) ? "redelivered=True, " : "";
print "Message(" . $redelivered . "properties=" . printProperties($message->get_properties()) . ", content='";
if ($message->get_content_type() eq "amqp/map") {
- my $content = qpid::messasging::decode_map($message);
+ my $content = qpid::messaging::decode_map($message);
map{ print "\n$_ => $content->{$_}"; } keys %{$content};
}
else {
diff --git a/qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl b/qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl
new file mode 100644
index 0000000000..2c1e698abb
--- /dev/null
+++ b/qpid/cpp/bindings/qpid/perl/test/test-null-inside-map.pl
@@ -0,0 +1,59 @@
+#!/usr/bin/perl
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+use strict;
+use warnings;
+use Data::Dumper;
+
+use cqpid_perl;
+
+my $broker = ( @ARGV > 0 ) ? $ARGV[0] : "localhost:5672";
+my $address = ( @ARGV > 1 ) ? $ARGV[0] : "amq.match";
+my $connectionOptions = ( @ARGV > 2 ) ? $ARGV[1] : "";
+
+my $in_address = "amq.match; {link:{x-bindings:[{exchange: 'amq.match', arguments:{'x-match': 'all', 'header2' : 'value2'}}]}}";
+
+my $connection = new cqpid_perl::Connection($broker, $connectionOptions);
+
+eval {
+ $connection->open();
+ my $session = $connection->createSession();
+
+ my $receiver = $session->createReceiver($in_address);
+ my $sender = $session->createSender($address);
+
+ my $hash = { id => 1234, name => "Blah\x00Blah" };
+ my $outmsg = new cqpid_perl::Message("Hello\x00World");
+ cqpid_perl::encode($hash, $outmsg);
+ $outmsg->setProperty("header2", "value2");
+ $sender->send($outmsg);
+
+ my $message = $receiver->fetch($cqpid_perl::Duration::SECOND);
+
+ print Dumper($message->getProperties());
+
+ print $message->getContent() . "\n";
+ my $outmap = cqpid_perl::decodeMap($message);
+ print Dumper($outmap);
+ $session->acknowledge();
+
+ $connection->close();
+};
+
+die $@ if ($@);
diff --git a/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb b/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb
index 90292d4bec..fc9e65d562 100644
--- a/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb
+++ b/qpid/cpp/bindings/qpid/ruby/ext/cqpid/extconf.rb
@@ -26,9 +26,10 @@
require 'mkmf'
# Setup the build environment.
-$CFLAGS = "-fPIC -fno-inline -x c++"
+$CFLAGS = "-fPIC -fno-inline -x c++ -lstdc++"
REQUIRED_LIBRARIES = [
+ 'stdc++',
'qpidclient',
'qpidcommon',
'qpidmessaging',
diff --git a/qpid/cpp/bindings/swig_perl_typemaps.i b/qpid/cpp/bindings/swig_perl_typemaps.i
index 831576a7d4..02e2d4a6b6 100644
--- a/qpid/cpp/bindings/swig_perl_typemaps.i
+++ b/qpid/cpp/bindings/swig_perl_typemaps.i
@@ -47,7 +47,9 @@
return qpid::types::Variant((float)SvNV(value));
}
else if (SvPOK(value)) {
- return qpid::types::Variant(std::string(SvPV_nolen(value)));
+ STRLEN len;
+ char *ptr = SvPV(value, len);
+ return qpid::types::Variant(std::string(ptr, len));
}
}
return qpid::types::Variant();
@@ -173,7 +175,7 @@
argvi++;
}
-%typemap (in) uint16_t, uint32_t, uint64_t {
+%typemap (in) uint8_t, uint16_t, uint32_t, uint64_t {
if (SvIOK($input)) {
$1 = ($1_ltype)SvUV($input);
}
@@ -182,12 +184,12 @@
}
}
-%typemap (out) uint16_t, uint32_t, uint64_t {
+%typemap (out) uint8_t, uint16_t, uint32_t, uint64_t {
sv_setuv($result, (UV)$1);
argvi++;
}
-%typemap (in) int32_t, int64_t {
+%typemap (in) int8_t, int16_t, int32_t, int64_t {
if (SvIOK($input)) {
$1 = ($1_ltype)SvIV($input);
}
@@ -196,7 +198,7 @@
}
}
-%typemap (out) int32_t, int64_t {
+%typemap (out) int8_t, int16_t, int32_t, int64_t {
sv_setiv($result, (IV)$1);
argvi++;
}
diff --git a/qpid/cpp/etc/CMakeLists.txt b/qpid/cpp/etc/CMakeLists.txt
index d9266537b0..014842c9c7 100644
--- a/qpid/cpp/etc/CMakeLists.txt
+++ b/qpid/cpp/etc/CMakeLists.txt
@@ -23,11 +23,6 @@ install(FILES qpidc.conf
install(FILES qpidd.conf
DESTINATION ${QPID_INSTALL_CONFDIR}
COMPONENT ${QPID_COMPONENT_BROKER})
-if (UNIX)
- install(FILES qpidd.service
- DESTINATION ${QPID_INSTALL_SYSTEMDDIR}
- COMPONENT ${QPID_COMPONENT_BROKER})
-endif (UNIX)
if (BUILD_SASL)
install(FILES sasl2/qpidd.conf
DESTINATION ${QPID_INSTALL_SASLDIR}
diff --git a/qpid/cpp/etc/Makefile.am b/qpid/cpp/etc/Makefile.am
index 80c5fc51eb..aa41c65b37 100644
--- a/qpid/cpp/etc/Makefile.am
+++ b/qpid/cpp/etc/Makefile.am
@@ -20,7 +20,7 @@ SASL_CONF = sasl2/qpidd.conf
EXTRA_DIST = \
$(SASL_CONF) \
- qpidd.service qpidd.in qpidd-primary.in qpidd.conf qpidc.conf CMakeLists.txt \
+ qpidd.in qpidd-primary.in qpidd.conf qpidc.conf CMakeLists.txt \
cluster.conf-example.xml.in
confdir = $(sysconfdir)/qpid
@@ -53,5 +53,3 @@ CLEANFILES = qpidd qpidd-primary cluster.conf-example.xml
initddir = $(sysconfdir)/init.d
nobase_initd_SCRIPTS = qpidd qpidd-primary
-systemddir = /usr/lib/systemd/system
-nobase_systemd_SCRIPTS = qpidd.service
diff --git a/qpid/cpp/etc/qpidd.service b/qpid/cpp/etc/qpidd.service
deleted file mode 100644
index a6549834f4..0000000000
--- a/qpid/cpp/etc/qpidd.service
+++ /dev/null
@@ -1,13 +0,0 @@
-[Unit]
-Description=An AMQP message broker daemon.
-Documentation=man:qpidd(1) http://qpid.apache.org/
-Requires=network.target
-
-[Service]
-User=qpidd
-Group=qpidd
-Type=simple
-ExecStart=/usr/sbin/qpidd --config /etc/qpidd.conf
-
-[Install]
-WantedBy=multi-user.target
diff --git a/qpid/cpp/examples/makedist.mk b/qpid/cpp/examples/makedist.mk
index c494af5e8f..9a1568d427 100644
--- a/qpid/cpp/examples/makedist.mk
+++ b/qpid/cpp/examples/makedist.mk
@@ -20,6 +20,7 @@
AM_CXXFLAGS = $(WARNING_CFLAGS)
INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include
CLIENT_LIB=$(top_builddir)/src/libqpidclient.la
+COMMON_LIB=$(top_builddir)/src/libqpidcommon.la
CONSOLE_LIB=$(top_builddir)/src/libqmfconsole.la
CLIENTFLAGS=-lqpidclient
CONSOLEFLAGS=-lqmfconsole
diff --git a/qpid/cpp/examples/messaging/Makefile.am b/qpid/cpp/examples/messaging/Makefile.am
index d5303f4437..f11ca20c71 100644
--- a/qpid/cpp/examples/messaging/Makefile.am
+++ b/qpid/cpp/examples/messaging/Makefile.am
@@ -22,6 +22,7 @@ examplesdir=$(pkgdatadir)/examples/messaging
AM_CXXFLAGS = $(WARNING_CFLAGS)
INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include
CLIENT_LIB=$(top_builddir)/src/libqpidmessaging.la
+TYPES_LIB=$(top_builddir)/src/libqpidtypes.la
CLIENTFLAGS=-lqpidmessaging
noinst_PROGRAMS=drain spout client server map_sender map_receiver hello_world hello_xml
@@ -33,10 +34,10 @@ hello_xml_SOURCES=hello_xml.cpp
hello_xml_LDADD=$(CLIENT_LIB)
drain_SOURCES=drain.cpp OptionParser.h OptionParser.cpp
-drain_LDADD=$(CLIENT_LIB) -lqpidtypes
+drain_LDADD=$(CLIENT_LIB) $(TYPES_LIB)
spout_SOURCES=spout.cpp OptionParser.h OptionParser.cpp
-spout_LDADD=$(CLIENT_LIB) -lqpidtypes
+spout_LDADD=$(CLIENT_LIB) $(TYPES_LIB)
client_SOURCES=client.cpp
client_LDADD=$(CLIENT_LIB)
@@ -45,10 +46,10 @@ server_SOURCES=server.cpp
server_LDADD=$(CLIENT_LIB)
map_sender_SOURCES=map_sender.cpp
-map_sender_LDADD=$(CLIENT_LIB) -lqpidtypes
+map_sender_LDADD=$(CLIENT_LIB) $(TYPES_LIB)
map_receiver_SOURCES=map_receiver.cpp
-map_receiver_LDADD=$(CLIENT_LIB) -lqpidtypes
+map_receiver_LDADD=$(CLIENT_LIB) $(TYPES_LIB)
examples_DATA= \
hello_world.cpp \
diff --git a/qpid/cpp/examples/old_api/direct/Makefile.am b/qpid/cpp/examples/old_api/direct/Makefile.am
index 09709c2bf4..18957c84f4 100644
--- a/qpid/cpp/examples/old_api/direct/Makefile.am
+++ b/qpid/cpp/examples/old_api/direct/Makefile.am
@@ -23,13 +23,13 @@ include $(top_srcdir)/examples/makedist.mk
noinst_PROGRAMS=direct_producer listener declare_queues
direct_producer_SOURCES=direct_producer.cpp
-direct_producer_LDADD=$(CLIENT_LIB) -lqpidcommon
+direct_producer_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
listener_SOURCES=listener.cpp
-listener_LDADD=$(CLIENT_LIB) -lqpidcommon
+listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
declare_queues_SOURCES=declare_queues.cpp
-declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon
+declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
examples_DATA= \
direct_producer.cpp \
diff --git a/qpid/cpp/examples/old_api/failover/Makefile.am b/qpid/cpp/examples/old_api/failover/Makefile.am
index 516c3625c1..60e99b9ed6 100644
--- a/qpid/cpp/examples/old_api/failover/Makefile.am
+++ b/qpid/cpp/examples/old_api/failover/Makefile.am
@@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk
noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender
declare_queues_SOURCES=declare_queues.cpp
-declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon
+declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
resuming_receiver_SOURCES=resuming_receiver.cpp
-resuming_receiver_LDADD=$(CLIENT_LIB) -lqpidcommon
+resuming_receiver_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
replaying_sender_SOURCES=replaying_sender.cpp
-replaying_sender_LDADD=$(CLIENT_LIB) -lqpidcommon
+replaying_sender_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
examples_DATA= \
declare_queues.cpp \
diff --git a/qpid/cpp/examples/old_api/fanout/Makefile.am b/qpid/cpp/examples/old_api/fanout/Makefile.am
index 797312a72d..06e84b47b6 100644
--- a/qpid/cpp/examples/old_api/fanout/Makefile.am
+++ b/qpid/cpp/examples/old_api/fanout/Makefile.am
@@ -26,7 +26,7 @@ fanout_producer_SOURCES=fanout_producer.cpp
fanout_producer_LDADD=$(CLIENT_LIB)
listener_SOURCES=listener.cpp
-listener_LDADD=$(CLIENT_LIB) -lqpidcommon
+listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
examples_DATA= \
fanout_producer.cpp \
diff --git a/qpid/cpp/examples/old_api/pub-sub/Makefile.am b/qpid/cpp/examples/old_api/pub-sub/Makefile.am
index fc61236475..e8e19e4c32 100644
--- a/qpid/cpp/examples/old_api/pub-sub/Makefile.am
+++ b/qpid/cpp/examples/old_api/pub-sub/Makefile.am
@@ -24,10 +24,10 @@ include $(top_srcdir)/examples/makedist.mk
noinst_PROGRAMS=topic_listener topic_publisher
topic_listener_SOURCES=topic_listener.cpp
-topic_listener_LDADD=$(CLIENT_LIB) -lqpidcommon
+topic_listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
topic_publisher_SOURCES=topic_publisher.cpp
-topic_publisher_LDADD=$(CLIENT_LIB) -lqpidcommon
+topic_publisher_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
examples_DATA= \
topic_listener.cpp \
diff --git a/qpid/cpp/examples/old_api/request-response/Makefile.am b/qpid/cpp/examples/old_api/request-response/Makefile.am
index 92f5bc6558..cf10ae81db 100644
--- a/qpid/cpp/examples/old_api/request-response/Makefile.am
+++ b/qpid/cpp/examples/old_api/request-response/Makefile.am
@@ -24,10 +24,10 @@ include $(top_srcdir)/examples/makedist.mk
noinst_PROGRAMS=client server
client_SOURCES=client.cpp
-client_LDADD=$(CLIENT_LIB) -lqpidcommon
+client_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
server_SOURCES=server.cpp
-server_LDADD=$(CLIENT_LIB) -lqpidcommon
+server_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
examples_DATA= \
server.cpp \
diff --git a/qpid/cpp/examples/old_api/tradedemo/Makefile.am b/qpid/cpp/examples/old_api/tradedemo/Makefile.am
index a05bbc3780..9932d87a6b 100644
--- a/qpid/cpp/examples/old_api/tradedemo/Makefile.am
+++ b/qpid/cpp/examples/old_api/tradedemo/Makefile.am
@@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk
noinst_PROGRAMS=topic_listener topic_publisher declare_queues
topic_listener_SOURCES=topic_listener.cpp
-topic_listener_LDADD=$(CLIENT_LIB) -lqpidcommon
+topic_listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
topic_publisher_SOURCES=topic_publisher.cpp
-topic_publisher_LDADD=$(CLIENT_LIB) -lqpidcommon
+topic_publisher_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
declare_queues_SOURCES=declare_queues.cpp
-declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon
+declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
examples_DATA= \
diff --git a/qpid/cpp/examples/old_api/xml-exchange/Makefile.am b/qpid/cpp/examples/old_api/xml-exchange/Makefile.am
index 9391806849..d4bc6ba233 100644
--- a/qpid/cpp/examples/old_api/xml-exchange/Makefile.am
+++ b/qpid/cpp/examples/old_api/xml-exchange/Makefile.am
@@ -24,13 +24,13 @@ include $(top_srcdir)/examples/makedist.mk
noinst_PROGRAMS=declare_queues xml_producer listener
declare_queues_SOURCES=declare_queues.cpp
-declare_queues_LDADD=$(CLIENT_LIB) -lqpidcommon
+declare_queues_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
xml_producer_SOURCES=xml_producer.cpp
-xml_producer_LDADD=$(CLIENT_LIB) -lqpidcommon
+xml_producer_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
listener_SOURCES=listener.cpp
-listener_LDADD=$(CLIENT_LIB) -lqpidcommon
+listener_LDADD=$(CLIENT_LIB) $(COMMON_LIB)
EXTRA_DIST= \
README.txt \
diff --git a/qpid/cpp/include/qpid/management/Manageable.h b/qpid/cpp/include/qpid/management/Manageable.h
index e72dc0b332..ede5c29e43 100644
--- a/qpid/cpp/include/qpid/management/Manageable.h
+++ b/qpid/cpp/include/qpid/management/Manageable.h
@@ -55,8 +55,11 @@ class QPID_COMMON_EXTERN Manageable
//
// This accessor function returns a pointer to the management object.
//
- virtual ManagementObject* GetManagementObject() const;
- virtual ManagementObject::shared_ptr GetManagementObjectShared() const;
+#ifdef _IN_QPID_BROKER
+ virtual ManagementObject::shared_ptr GetManagementObject() const = 0;
+#else
+ virtual ManagementObject* GetManagementObject() const = 0;
+#endif
// Every "Manageable" object must implement ManagementMethod. This
// function is called when a remote management client invokes a method
diff --git a/qpid/cpp/include/qpid/management/ManagementObject.h b/qpid/cpp/include/qpid/management/ManagementObject.h
index 2aca6fb1c5..93fbec7bc7 100644
--- a/qpid/cpp/include/qpid/management/ManagementObject.h
+++ b/qpid/cpp/include/qpid/management/ManagementObject.h
@@ -25,10 +25,13 @@
#include "qpid/management/Mutex.h"
#include "qpid/types/Variant.h"
-#include <boost/shared_ptr.hpp>
#include <map>
#include <vector>
+#ifdef _IN_QPID_BROKER
+#include <boost/shared_ptr.hpp>
+#endif
+
namespace qpid {
namespace management {
@@ -155,7 +158,9 @@ protected:
QPID_COMMON_EXTERN uint32_t writeTimestampsSize() const;
public:
+#ifdef _IN_QPID_BROKER
typedef boost::shared_ptr<ManagementObject> shared_ptr;
+#endif
QPID_COMMON_EXTERN static const uint8_t MD5_LEN = 16;
QPID_COMMON_EXTERN static int maxThreads;
@@ -229,8 +234,10 @@ protected:
//QPID_COMMON_EXTERN void mapDecode(const types::Variant::Map& map);
};
+#ifdef _IN_QPID_BROKER
typedef std::map<ObjectId, ManagementObject::shared_ptr> ManagementObjectMap;
typedef std::vector<ManagementObject::shared_ptr> ManagementObjectVector;
+#endif
}}
diff --git a/qpid/cpp/managementgen/qmfgen/schema.py b/qpid/cpp/managementgen/qmfgen/schema.py
index cfbc88f7a9..7bf161dc2b 100755
--- a/qpid/cpp/managementgen/qmfgen/schema.py
+++ b/qpid/cpp/managementgen/qmfgen/schema.py
@@ -1523,12 +1523,8 @@ class SchemaClass:
def genParentRefAssignment (self, stream, variables):
for config in self.properties:
if config.isParentRef == 1:
- if variables['genForBroker']:
- stream.write (config.getName () + \
- " = _parent->GetManagementObjectShared()->getObjectId ();")
- else:
- stream.write (config.getName () + \
- " = _parent->GetManagementObject()->getObjectId ();")
+ stream.write (config.getName () + \
+ " = _parent->GetManagementObject()->getObjectId();")
return
def genSchemaMD5 (self, stream, variables):
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Class.h b/qpid/cpp/managementgen/qmfgen/templates/Class.h
index 362d268aba..cd43cef7f4 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Class.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Class.h
@@ -26,6 +26,7 @@
#include "qpid/management/ManagementObject.h"
/*MGEN:IF(Root.InBroker)*/
#include "qmf/BrokerImportExport.h"
+#include <boost/shared_ptr.hpp>
/*MGEN:ENDIF*/
#include <limits>
@@ -79,7 +80,9 @@ namespace qmf {
void aggregatePerThreadStats(struct PerThreadStats*) const;
/*MGEN:ENDIF*/
public:
+/*MGEN:IF(Root.InBroker)*/
typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
+/*MGEN:ENDIF*/
/*MGEN:Root.ExternMethod*/ static void writeSchema(std::string& schema);
/*MGEN:Root.ExternMethod*/ void mapEncodeValues(::qpid::types::Variant::Map& map,
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 579e792b62..731451754f 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -277,7 +277,15 @@ if (CMAKE_COMPILER_IS_GNUCXX)
set (CATCH_UNDEFINED "")
endif (CMAKE_SYSTEM_NAME STREQUAL SunOS)
set (COMPILER_FLAGS "-fvisibility-inlines-hidden")
- set (HIDE_SYMBOL_FLAGS "-fvisibility=hidden")
+ # gcc 4.1.2 on RHEL 5 needs -Wno-attributes to avoid an error that's fixed
+ # in later gcc versions.
+ execute_process(COMMAND ${CMAKE_CXX_COMPILER} -dumpversion
+ OUTPUT_VARIABLE GCC_VERSION)
+ if (GCC_VERSION VERSION_EQUAL 4.1.2)
+ set (HIDE_SYMBOL_FLAGS "-fvisibility=hidden -Wno-attributes")
+ else (GCC_VERSION VERSION_EQUAL 4.1.2)
+ set (HIDE_SYMBOL_FLAGS "-fvisibility=hidden")
+ endif (GCC_VERSION VERSION_EQUAL 4.1.2)
endif (CMAKE_COMPILER_IS_GNUCXX)
if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro)
@@ -578,6 +586,7 @@ if (BUILD_XML)
target_link_libraries (xml xerces-c xqilla qpidbroker pthread)
set_target_properties (xml PROPERTIES
PREFIX ""
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${CATCH_UNDEFINED}")
install (TARGETS xml
DESTINATION ${QPIDD_MODULE_DIR}
@@ -614,6 +623,7 @@ if (BUILD_ACL)
target_link_libraries (acl qpidbroker ${Boost_PROGRAM_OPTIONS_LIBRARY})
set_target_properties (acl PROPERTIES
PREFIX ""
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${CATCH_UNDEFINED}")
install (TARGETS acl
DESTINATION ${QPIDD_MODULE_DIR}
@@ -661,7 +671,7 @@ if (BUILD_HA)
)
add_library (ha MODULE ${ha_SOURCES})
- set_target_properties (ha PROPERTIES PREFIX "")
+ set_target_properties (ha PROPERTIES PREFIX "" COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (ha qpidtypes qpidcommon qpidbroker qpidmessaging)
if (CMAKE_COMPILER_IS_GNUCXX)
set_target_properties (ha PROPERTIES
@@ -1225,7 +1235,7 @@ set (qpidbroker_SOURCES
add_msvc_version (qpidbroker library dll)
add_library (qpidbroker SHARED ${qpidbroker_SOURCES})
target_link_libraries (qpidbroker qpidcommon ${qpidbroker_platform_LIBS})
-set_target_properties (qpidbroker PROPERTIES VERSION ${qpidbroker_version})
+set_target_properties (qpidbroker PROPERTIES VERSION ${qpidbroker_version} COMPILE_DEFINITIONS _IN_QPID_BROKER)
if (MSVC)
set_target_properties (qpidbroker PROPERTIES COMPILE_FLAGS /wd4290)
endif (MSVC)
@@ -1244,6 +1254,7 @@ add_msvc_version (qpidd application exe)
add_executable (qpidd ${qpidd_SOURCES})
target_link_libraries (qpidd qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_FILESYSTEM_LIBRARY})
+set_target_properties (qpidd PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
install (TARGETS qpidd RUNTIME
DESTINATION ${QPID_INSTALL_SBINDIR}
COMPONENT ${QPID_COMPONENT_BROKER})
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 91ff0621c0..cdddd22c41 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -139,9 +139,10 @@ tmoduleexecdir = $(libdir)/qpid/tests
tmoduleexec_LTLIBRARIES=
AM_CXXFLAGS += -DBOOST_FILESYSTEM_VERSION=2
+BROKER_CXXFLAGS = -D_IN_QPID_BROKER
## Automake macros to build libraries and executables.
-qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduleexecdir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
+qpidd_CXXFLAGS = $(AM_CXXFLAGS) $(BROKER_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduleexecdir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduleexecdir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\"
qpidd_LDADD = \
@@ -259,7 +260,7 @@ rdma_la_LIBADD = \
-libverbs
rdma_la_LDFLAGS = $(PLUGINLDFLAGS)
rdma_la_CXXFLAGS = \
- $(AM_CXXFLAGS) -Wno-missing-field-initializers
+ $(AM_CXXFLAGS) -Wno-missing-field-initializers -D_IN_QPID_BROKER
dmoduleexec_LTLIBRARIES += \
rdma.la
@@ -759,6 +760,7 @@ libqpidbroker_la_SOURCES = \
QPIDBROKER_VERSION_INFO = 2:0:0
libqpidbroker_la_LDFLAGS = -version-info $(QPIDBROKER_VERSION_INFO)
+libqpidbroker_la_CXXFLAGS=$(AM_CXXFLAGS) $(BROKER_CXXFLAGS)
if HAVE_PROTON
@@ -793,7 +795,7 @@ amqp_la_SOURCES = \
qpid/broker/amqp/Translation.h \
qpid/broker/amqp/Translation.cpp
-amqp_la_CXXFLAGS=$(AM_CXXFLAGS) $(PROTON_CFLAGS)
+amqp_la_CXXFLAGS=$(AM_CXXFLAGS) $(BROKER_CXXFLAGS) $(PROTON_CFLAGS)
amqp_la_LDFLAGS = $(PLUGINLDFLAGS) $(PROTON_LIBS)
cmoduleexec_LTLIBRARIES += amqpc.la
@@ -950,6 +952,7 @@ libqpidmessaging_la_LDFLAGS = -version-info $(QPIDMESSAGING_VERSION_INFO)
# NOTE: only public header files (which should be in ../include)
# should go in this list. Private headers should go in the SOURCES
# list for one of the libraries or executables that includes it.
+# Also included are the swig descriptor files.
nobase_include_HEADERS += \
../include/qpid/Address.h \
@@ -1036,7 +1039,10 @@ nobase_include_HEADERS += \
../include/qpid/types/Exception.h \
../include/qpid/types/Uuid.h \
../include/qpid/types/Variant.h \
- ../include/qpid/types/ImportExport.h
+ ../include/qpid/types/ImportExport.h \
+ ../include/qpid/qpid.i \
+ ../include/qmf/qmfengine.i \
+ ../include/qmf/qmf2.i
# Create the default data directory
install-data-local:
diff --git a/qpid/cpp/src/acl.mk b/qpid/cpp/src/acl.mk
index dfbcd06f4c..87821a3741 100644
--- a/qpid/cpp/src/acl.mk
+++ b/qpid/cpp/src/acl.mk
@@ -43,4 +43,5 @@ if SUNOS
endif
acl_la_LDFLAGS = $(PLUGINLDFLAGS)
+acl_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index 355e591cf6..718e6fe342 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -82,6 +82,7 @@ if (BUILD_AMQP)
PREFIX ""
COMPILE_FLAGS "${PROTON_COMPILE_FLAGS}"
LINK_FLAGS "${PROTON_LINK_FLAGS}")
+ set_target_properties (amqp PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
install (TARGETS amqp
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 0cc0760d94..31b3bc243d 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -59,3 +59,4 @@ ha_la_SOURCES = \
ha_la_LIBADD = libqpidbroker.la libqpidmessaging.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
+ha_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index 40857f411f..a681a6d18d 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon {
uint16_t port=brokerPtr->getPort(options->daemon.transport);
ready(port); // Notify parent.
if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) {
- boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port);
+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
}
brokerPtr->run();
}
@@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *options) {
uint16_t port = brokerPtr->getPort(myOptions->daemon.transport);
cout << port << endl;
if (options->broker.enableMgmt) {
- boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port);
+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
}
}
brokerPtr->run();
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp
index 3634c0cdc1..61e0b56104 100644
--- a/qpid/cpp/src/qpid/acl/Acl.cpp
+++ b/qpid/cpp/src/qpid/acl/Acl.cpp
@@ -317,7 +317,7 @@ Acl::~Acl(){
broker->getConnectionObservers().remove(connectionCounter);
}
-ManagementObject::shared_ptr Acl::GetManagementObjectShared(void) const
+ManagementObject::shared_ptr Acl::GetManagementObject(void) const
{
return mgmtObject;
}
diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h
index 8c1a925713..ea3c6586a3 100644
--- a/qpid/cpp/src/qpid/acl/Acl.h
+++ b/qpid/cpp/src/qpid/acl/Acl.h
@@ -117,7 +117,7 @@ private:
bool readAclFile(std::string& aclFile, std::string& errorText);
Manageable::status_t lookup (management::Args& args, std::string& text);
Manageable::status_t lookupPublish(management::Args& args, std::string& text);
- virtual qpid::management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
+ virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 09b7fa58e9..a48789973a 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -654,7 +654,10 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
{
- moveNewObjectsLH();
+ {
+ sys::Mutex::ScopedLock lock(agentLock);
+ moveNewObjectsLH(lock);
+ }
Variant::Map inMap;
Variant::Map::const_iterator i;
@@ -985,14 +988,37 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(
return result.first;
}
-void ManagementAgentImpl::moveNewObjectsLH()
+// note well: caller must hold agentLock when calling this!
+void ManagementAgentImpl::moveNewObjectsLH(const sys::Mutex::ScopedLock& /*agentLock*/)
{
sys::Mutex::ScopedLock lock(addLock);
- for (ObjectMap::iterator iter = newManagementObjects.begin();
- iter != newManagementObjects.end();
- iter++)
- managementObjects[iter->first] = iter->second;
- newManagementObjects.clear();
+ ObjectMap::iterator newObj = newManagementObjects.begin();
+ while (newObj != newManagementObjects.end()) {
+ // before adding a new mgmt object, check for duplicates:
+ ObjectMap::iterator oldObj = managementObjects.find(newObj->first);
+ if (oldObj == managementObjects.end()) {
+ managementObjects[newObj->first] = newObj->second;
+ newManagementObjects.erase(newObj++); // post inc iterator safe!
+ } else {
+ // object exists with same object id. This may be legit, for example, when a
+ // recently deleted object is re-added before the mgmt poll runs.
+ if (newObj->second->isDeleted()) {
+ // @TODO fixme: we missed an add-delete for the new object
+ QPID_LOG(warning, "Mgmt Object deleted before update sent, oid=" << newObj->first);
+ newManagementObjects.erase(newObj++); // post inc iterator safe!
+ } else if (oldObj->second->isDeleted()) {
+ // skip adding newObj, try again later once oldObj has been cleaned up by poll
+ ++newObj;
+ } else {
+ // real bad - two objects exist with same OID. This is a bug in the application
+ QPID_LOG(error, "Detected two Mgmt Objects using the same object id! oid=" << newObj->first
+ << ", this is bad!");
+ // what to do here? Can't erase an active obj - owner has a pointer to it.
+ // for now I punt. Maybe the flood of log messages will get someone's attention :P
+ ++newObj;
+ }
+ }
+ }
}
void ManagementAgentImpl::addClassLocal(uint8_t classKind,
@@ -1060,7 +1086,7 @@ void ManagementAgentImpl::periodicProcessing()
if (!connected)
return;
- moveNewObjectsLH();
+ moveNewObjectsLH(lock);
//
// Clear the been-here flag on all objects in the map.
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 53f3c13a91..d801989f64 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -261,7 +261,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void storeData(bool requested=false);
void retrieveData(std::string& vendor, std::string& product, std::string& inst);
PackageMap::iterator findOrAddPackage(const std::string& name);
- void moveNewObjectsLH();
+ void moveNewObjectsLH(const sys::Mutex::ScopedLock& agentLock);
void addClassLocal (uint8_t classKind,
PackageMap::iterator pIter,
const std::string& className,
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 90cb1a79ed..d7844b50ce 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -49,6 +49,11 @@ using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace {
+const std::string QPID_REPLICATE("qpid.replicate");
+const std::string NONE("none");
+}
+
namespace qpid {
namespace broker {
@@ -298,7 +303,7 @@ uint32_t Bridge::encodedSize() const
+ 2; // sync
}
-management::ManagementObject::shared_ptr Bridge::GetManagementObjectShared (void) const
+management::ManagementObject::shared_ptr Bridge::GetManagementObject(void) const
{
return mgmtObject;
}
@@ -333,6 +338,7 @@ void Bridge::propagateBinding(const string& key, const string& tagList,
}
string newTagList(tagList + string(tagList.empty() ? "" : ",") + localTag);
+ bindArgs.setString(QPID_REPLICATE, NONE);
bindArgs.setString(qpidFedOp, op);
bindArgs.setString(qpidFedTags, newTagList);
if (origin.empty())
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 9f99c9ce01..da397b8f77 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -73,7 +73,7 @@ class Bridge : public PersistableConfig,
bool isDetached() const { return detached; }
- management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ management::ManagementObject::shared_ptr GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
management::Args& args,
std::string& text);
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 292820abe4..094dd63527 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) :
systemObject = System::shared_ptr(system);
mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
- mgmtObject->set_systemRef(system->GetManagementObjectShared()->getObjectId());
+ mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
mgmtObject->set_connBacklog(conf.connectionBacklog);
@@ -454,7 +454,7 @@ Broker::~Broker() {
QPID_LOG(notice, "Shut down");
}
-ManagementObject::shared_ptr Broker::GetManagementObjectShared(void) const
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
return mgmtObject;
}
@@ -1250,6 +1250,7 @@ void Broker::bind(const std::string& queueName,
QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName
<< " queue:" << queueName
<< " key:" << key
+ << " arguments:" << arguments
<< " user:" << userId
<< " rhost:" << connectionId);
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index eecfd3925c..0a8f406dbf 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -235,7 +235,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
uint32_t methodId, management::Args& args, std::string& text);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 238bb71fb5..3cb30a82e3 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -402,7 +402,7 @@ SessionHandler& Connection::getChannel(ChannelId id) {
return *ptr_map_ptr(i);
}
-ManagementObject::shared_ptr Connection::GetManagementObjectShared(void) const
+ManagementObject::shared_ptr Connection::GetManagementObject(void) const
{
return mgmtObject;
}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 91470dc3df..2f25b0e3f9 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler,
void closeChannel(framing::ChannelId channel);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
diff --git a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h b/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index 2fa7ce0fc5..773a99d2c9 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -70,7 +70,7 @@ bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, con
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
Mutex::ScopedLock l(lock);
- Binding::shared_ptr b(new Binding(routingKey, queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr b(new Binding(routingKey, queue, this, args ? *args : FieldTable(), fedOrigin));
BoundKey& bk = bindings[routingKey];
if (exclusiveBinding) bk.queues.clear();
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index 20bd76f645..9098c75f0b 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -177,7 +177,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
mgmtExchange->set_autoDelete(false);
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared());
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
}
}
}
@@ -198,7 +198,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared());
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
}
}
@@ -227,7 +227,7 @@ void Exchange::setAlternate(Exchange::shared_ptr _alternate)
alternate = _alternate;
if (mgmtExchange != 0) {
if (alternate.get() != 0)
- mgmtExchange->set_altExchange(alternate->GetManagementObjectShared()->getObjectId());
+ mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
else
mgmtExchange->clr_altExchange();
}
@@ -294,7 +294,7 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges)
}
}
-ManagementObject::shared_ptr Exchange::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
{
return mgmtExchange;
}
@@ -352,7 +352,7 @@ Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchang
Exchange::Binding::~Binding ()
{
if (mgmtBinding != 0) {
- _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared());
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0)
mo->dec_bindingCount();
mgmtBinding->resourceDestroy ();
@@ -367,7 +367,7 @@ void Exchange::Binding::startManagement()
if (broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared());
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0) {
management::ObjectId queueId = mo->getObjectId();
@@ -383,7 +383,7 @@ void Exchange::Binding::startManagement()
}
}
-ManagementObject::shared_ptr Exchange::Binding::GetManagementObjectShared () const
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
{
return mgmtBinding;
}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index ec9a0bea2f..70ed393f64 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -58,7 +58,7 @@ public:
framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
void startManagement();
- management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ management::ManagementObject::shared_ptr GetManagementObject() const;
};
private:
@@ -210,7 +210,7 @@ public:
static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
// Federation hooks
class DynamicBridge {
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index 56c894c129..43c67af810 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -54,7 +54,7 @@ bool FanOutExchange::bind(Queue::shared_ptr queue, const string& /*key*/, const
bool propagate = false;
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
- Binding::shared_ptr binding (new Binding ("", queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding ("", queue, this, args ? *args : FieldTable(), fedOrigin));
if (bindings.add_unless(binding, MatchQueue(queue))) {
binding->startManagement();
propagate = fedBinding.addOrigin(queue->getName(), fedOrigin);
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 02c05852ff..ea7fce4ff6 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -48,6 +48,7 @@ namespace {
const std::string empty;
// federation related args and values
+ const std::string QPID_RESERVED("qpid.");
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
const std::string qpidFedOrigin("qpid.fed.origin");
@@ -200,8 +201,8 @@ bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, co
//matching (they are internally added properties
//controlling binding propagation but not relevant to
//actual routing)
- Binding::shared_ptr binding (new Binding (bindingKey, queue, this, extra_args));
- BoundKey bk(binding);
+ Binding::shared_ptr binding (new Binding (bindingKey, queue, this, args ? *args : FieldTable()));
+ BoundKey bk(binding, extra_args);
if (bindings.add_unless(bk, MatchArgs(queue, &extra_args))) {
binding->startManagement();
propagate = bk.fedBinding.addOrigin(queue->getName(), fedOrigin);
@@ -282,7 +283,7 @@ void HeadersExchange::route(Deliverable& msg)
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
- Matcher matcher(i->binding->args);
+ Matcher matcher(i->args);
msg.getMessage().processProperties(matcher);
if (matcher.matches()) {
b->push_back(i->binding);
@@ -298,7 +299,7 @@ bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, cons
Bindings::ConstPtr p = bindings.snapshot();
if (p.get()){
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if ( (!args || equal((*i).binding->args, *args)) && (!queue || (*i).binding->queue == queue)) {
+ if ( (!args || equal((*i).args, *args)) && (!queue || (*i).binding->queue == queue)) {
return true;
}
}
@@ -315,10 +316,7 @@ void HeadersExchange::getNonFedArgs(const FieldTable* args, FieldTable& nonFedAr
for (qpid::framing::FieldTable::ValueMap::const_iterator i=args->begin(); i != args->end(); ++i)
{
- const string & name(i->first);
- if (name == qpidFedOp ||
- name == qpidFedTags ||
- name == qpidFedOrigin)
+ if (i->first.find(QPID_RESERVED) == 0)
{
continue;
}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h
index 2e4669a018..67ba793ba8 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h
@@ -38,8 +38,9 @@ class HeadersExchange : public virtual Exchange {
struct BoundKey
{
Binding::shared_ptr binding;
+ qpid::framing::FieldTable args;
FedBinding fedBinding;
- BoundKey(Binding::shared_ptr binding_) : binding(binding_) {}
+ BoundKey(Binding::shared_ptr binding_, const qpid::framing::FieldTable& args_) : binding(binding_), args(args_) {}
};
struct MatchArgs
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.h b/qpid/cpp/src/qpid/broker/LegacyLVQ.h
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/broker/LegacyLVQ.h
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index db789d79cf..0c18e08cd1 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -292,8 +292,8 @@ void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
- if (!hideManagement() && connection->GetManagementObjectShared()) {
- mgmtObject->set_connectionRef(connection->GetManagementObjectShared()->getObjectId());
+ if (!hideManagement() && connection->GetManagementObject()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
}
// Get default URL from known-hosts if not already set
@@ -669,7 +669,7 @@ uint32_t Link::encodedSize() const
+ password.size() + 1;
}
-ManagementObject::shared_ptr Link::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr Link::GetManagementObject(void) const
{
return mgmtObject;
}
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 2087b5259c..97511de08f 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -183,7 +183,7 @@ class Link : public PersistableConfig, public management::Manageable {
static bool isEncodedLink(const std::string& key);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
// manage the exchange owned by this link
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 271e8476f9..8af61bb49a 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -198,7 +198,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete));
mgmtObject->set_arguments(settings.asMap());
agent->addObject(mgmtObject, 0, store != 0);
- brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared());
+ brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
}
@@ -1108,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
{
if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
{
- ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObjectShared();
+ ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
if (childObj != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1154,7 +1154,7 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
alternateExchange = exchange;
if (mgmtObject) {
if (exchange.get() != 0)
- mgmtObject->set_altExchange(exchange->GetManagementObjectShared()->getObjectId());
+ mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
else
mgmtObject->clr_altExchange();
}
@@ -1258,7 +1258,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
externalQueueStore = inst;
if (inst) {
- ManagementObject::shared_ptr childObj = inst->GetManagementObjectShared();
+ ManagementObject::shared_ptr childObj = inst->GetManagementObject();
if (childObj != 0 && mgmtObject != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1306,7 +1306,7 @@ void Queue::countLoadedFromDisk(uint64_t size) const
}
-ManagementObject::shared_ptr Queue::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr Queue::GetManagementObject(void) const
{
return mgmtObject;
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 25cefd144d..bf1103902e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -340,7 +340,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 9d6053669b..944cc7e838 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
- queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObjectShared());
+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
if (queueMgmtObj) {
queueMgmtObj->set_flowStopped(isFlowControlActive());
}
diff --git a/qpid/cpp/src/qpid/broker/RecoverableMessage.h b/qpid/cpp/src/qpid/broker/RecoverableMessage.h
index c98857ceb0..aafcd756d5 100644
--- a/qpid/cpp/src/qpid/broker/RecoverableMessage.h
+++ b/qpid/cpp/src/qpid/broker/RecoverableMessage.h
@@ -22,12 +22,14 @@
*
*/
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/Buffer.h"
namespace qpid {
namespace broker {
+class ExpiryPolicy;
/**
* The interface through which messages are reloaded on recovery.
@@ -38,6 +40,7 @@ public:
typedef boost::shared_ptr<RecoverableMessage> shared_ptr;
virtual void setPersistenceId(uint64_t id) = 0;
virtual void setRedelivered() = 0;
+ virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
/**
* Used by store to determine whether to load content on recovery
* or let message load its own content as and when it requires it.
diff --git a/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
index f3ead261c1..a46f5a3676 100644
--- a/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
+++ b/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
@@ -37,6 +37,7 @@ public:
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
+ void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep);
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(boost::shared_ptr<Queue> queue);
diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 6d831563e2..ab89a46a46 100644
--- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -186,6 +186,11 @@ void RecoverableMessageImpl::setRedelivered()
msg.deliver();//increment delivery count (but at present that isn't recorded durably)
}
+void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep)
+{
+ msg.computeExpiration(ep);
+}
+
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
{
dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 0dc8d6cdfe..0965381fcd 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -37,9 +37,11 @@
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/FedOps.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <iostream>
#include <sstream>
@@ -48,6 +50,11 @@
#include <assert.h>
+namespace {
+const std::string X_SCOPE("x-scope");
+const std::string SESSION("session");
+}
+
namespace qpid {
namespace broker {
@@ -87,6 +94,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
+ unbindSessionBindings();
requeue();
//now unsubscribe, which may trigger queue deletion and thus
@@ -303,14 +311,14 @@ Consumer(_name, type),
deliveryCount(0),
protocols(parent->getSession().getBroker().getProtocolRegistry())
{
- if (parent != 0 && queue.get() != 0 && queue->GetManagementObjectShared() !=0)
+ if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
if (agent != 0)
{
- mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObjectShared()->getObjectId(), getTag(),
+ mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -318,7 +326,7 @@ Consumer(_name, type),
}
}
-ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
{
return mgmtObject;
}
@@ -803,4 +811,63 @@ void SemanticState::detached()
}
}
+void SemanticState::addBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey, const framing::FieldTable& arguments)
+{
+ QPID_LOG (debug, "SemanticState::addBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey << ", "
+ << "args=" << arguments << "]");
+ std::string fedOp = arguments.getAsString(qpidFedOp);
+ if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) {
+ fedOp = fedOpBind;
+ }
+ std::string fedOrigin = arguments.getAsString(qpidFedOrigin);
+ if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) {
+ bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin));
+ }
+ else if (fedOp == fedOpUnbind) {
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin));
+ }
+}
+
+void SemanticState::removeBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey)
+{
+ QPID_LOG (debug, "SemanticState::removeBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey)
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, ""));
+}
+
+void SemanticState::unbindSessionBindings()
+{
+ //unbind session-scoped bindings
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ QPID_LOG (debug, "SemanticState::unbindSessionBindings ["
+ << "queue=" << i->get<0>() << ", "
+ << "exchange=" << i->get<1>()<< ", "
+ << "key=" << i->get<2>() << ", "
+ << "fedOrigin=" << i->get<3>() << "]");
+ try {
+ std::string fedOrigin = i->get<3>();
+ if (!fedOrigin.empty()) {
+ framing::FieldTable fedArguments;
+ fedArguments.setString(qpidFedOp, fedOpUnbind);
+ fedArguments.setString(qpidFedOrigin, fedOrigin);
+ session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments,
+ userID, connectionId);
+ } else {
+ session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(),
+ userID, connectionId);
+ }
+ }
+ catch (...) {
+ }
+ }
+ bindings.clear();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index afb527b0f5..f873c5c656 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -46,10 +46,12 @@
#include <list>
#include <map>
+#include <set>
#include <vector>
#include <boost/enable_shared_from_this.hpp>
#include <boost/cast.hpp>
+#include <boost/tuple/tuple.hpp>
namespace qpid {
namespace broker {
@@ -163,7 +165,7 @@ class SemanticState : private boost::noncopyable {
// manageable entry points
QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
- GetManagementObjectShared(void) const;
+ GetManagementObject(void) const;
QPID_BROKER_EXTERN management::Manageable::status_t
ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
@@ -173,6 +175,8 @@ class SemanticState : private boost::noncopyable {
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef boost::tuple<std::string, std::string, std::string, std::string> Binding;
+ typedef std::set<Binding> Bindings;
SessionState& session;
ConsumerImplMap consumers;
@@ -190,6 +194,8 @@ class SemanticState : private boost::noncopyable {
//needed for queue delete events in auto-delete:
const std::string connectionId;
+ Bindings bindings;
+
void checkDtxTimeout();
bool complete(DeliveryRecord&);
@@ -197,6 +203,7 @@ class SemanticState : private boost::noncopyable {
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
void disable(ConsumerImpl::shared_ptr);
+ void unbindSessionBindings();
public:
@@ -271,6 +278,11 @@ class SemanticState : private boost::noncopyable {
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
DtxBufferMap& getSuspendedXids() { return suspendedXids; }
+
+ void addBinding(const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+ void removeBinding(const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 0263ff2a58..b679aebbfa 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
getConnection().getUserId(), getConnection().getUrl());
+ state.addBinding(queueName, exchangeName, routingKey, arguments);
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
+ state.removeBinding(queueName, exchangeName, routingKey);
getBroker().unbind(queueName, exchangeName, routingKey,
getConnection().getUserId(), getConnection().getUrl());
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index f48bf653fb..a6494bc362 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -65,7 +65,7 @@ SessionState::SessionState(
}
void SessionState::addManagementObject() {
- if (GetManagementObjectShared()) return; // Already added.
+ if (GetManagementObject()) return; // Already added.
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -127,7 +127,7 @@ void SessionState::attach(SessionHandler& h) {
if (mgmtObject != 0)
{
mgmtObject->set_attached (1);
- mgmtObject->set_connectionRef (h.getConnection().GetManagementObjectShared()->getObjectId());
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
asyncCommandCompleter->attached();
@@ -148,7 +148,7 @@ void SessionState::giveReadCredit(int32_t credit) {
getConnection().outputTasks.giveReadCredit(credit);
}
-ManagementObject::shared_ptr SessionState::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
{
return mgmtObject;
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 06643fdbef..ae28df8026 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -110,7 +110,7 @@ class SessionState : public qpid::SessionState,
const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h
index 179a3275a7..52643fb2d5 100644
--- a/qpid/cpp/src/qpid/broker/System.h
+++ b/qpid/cpp/src/qpid/broker/System.h
@@ -45,7 +45,7 @@ class System : public management::Manageable
System (std::string _dataDir, Broker* broker = 0);
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
+ management::ManagementObject::shared_ptr GetManagementObject(void) const
{ return mgmtObject; }
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index c11389bb17..d49464b4e1 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, cons
}
}
- Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin));
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
diff --git a/qpid/cpp/src/qpid/broker/TxOpVisitor.h b/qpid/cpp/src/qpid/broker/TxOpVisitor.h
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/broker/TxOpVisitor.h
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/broker/TxPublish.cpp b/qpid/cpp/src/qpid/broker/TxPublish.cpp
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/broker/TxPublish.cpp
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h
deleted file mode 100644
index e69de29bb2..0000000000
--- a/qpid/cpp/src/qpid/broker/TxPublish.h
+++ /dev/null
diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h
index c4b1c280e1..599b821870 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.h
+++ b/qpid/cpp/src/qpid/broker/Vhost.h
@@ -40,7 +40,7 @@ class Vhost : public management::Manageable
Vhost (management::Manageable* parentBroker, Broker* broker = 0);
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
+ management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
void setFederationTag(const std::string& tag);
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
index 8daf860f8e..0253ba5552 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -73,7 +73,7 @@ void ManagedConnection::setSaslSsf(int ssf)
connection->set_saslSsf(ssf);
}
-qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObjectShared() const
+qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
{
return connection;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
index f1514d11c5..e2d0376918 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -44,7 +44,7 @@ class ManagedConnection : public qpid::management::Manageable, public Connection
std::string getUserid() const;
void setSaslMechanism(const std::string&);
void setSaslSsf(int);
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
void outgoingMessageSent();
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
index 0fe20f68ab..f36a1e8da4 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
@@ -37,7 +37,7 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSessio
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObjectShared()->getObjectId(), id,
+ subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id,
false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map()));
agent->addObject(subscription);
subscription->set_creditMode("n/a");
@@ -48,7 +48,7 @@ ManagedOutgoingLink::~ManagedOutgoingLink()
if (subscription != 0) subscription->resourceDestroy();
}
-qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObjectShared() const
+qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const
{
return subscription;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
index 19667da698..20a1095db2 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
@@ -39,7 +39,7 @@ class ManagedOutgoingLink : public qpid::management::Manageable
public:
ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic);
virtual ~ManagedOutgoingLink();
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
void outgoingMessageSent();
void outgoingMessageAccepted();
void outgoingMessageRejected();
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
index f1c4940118..9bef0e842b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
@@ -38,7 +38,7 @@ ManagedSession::ManagedSession(Broker& broker, ManagedConnection& p, const std::
session->set_attached(true);
session->set_detachedLifespan(0);
session->clr_expireTime();
- session->set_connectionRef(parent.GetManagementObjectShared()->getObjectId());
+ session->set_connectionRef(parent.GetManagementObject()->getObjectId());
agent->addObject(session);
}
}
@@ -48,7 +48,7 @@ ManagedSession::~ManagedSession()
if (session) session->resourceDestroy();
}
-qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObjectShared() const
+qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const
{
return session;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
index 2f62c8705a..1f56964bb6 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
@@ -40,7 +40,7 @@ class ManagedSession : public qpid::management::Manageable, public OwnershipToke
public:
ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id);
virtual ~ManagedSession();
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
void incomingMessageAccepted();
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 6b88111732..8f3eb3bf90 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
queues.find(values[QNAME].asString());
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
- << " key=" << key);
+ << " key=" << key
+ << " args=" << args);
queue->bind(exchange, key, args);
}
}
@@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
+ exchange->unbind(queue, key, 0);
}
}
@@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
boost::shared_ptr<Queue> queue = queues.find(qName);
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
- << " key:" << key);
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ << " key:" << key
+ << " args:" << args);
queue->bind(exchange, key, args);
}
}
@@ -837,6 +840,13 @@ void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
}
}
+// Callback function for accumulating exchange candidates
+namespace {
+ void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
+ c.push_back(i);
+ }
+}
+
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected");
connection = 0;
@@ -844,7 +854,7 @@ void BrokerReplicator::disconnected() {
vector<boost::shared_ptr<Exchange> > collect;
// Make a copy so we can work outside the ExchangeRegistry lock
exchanges.eachExchange(
- boost::bind(&vector<boost::shared_ptr<Exchange> >::push_back, ref(collect), _1));
+ boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
for_each(collect.begin(), collect.end(),
boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 1358baf0e1..8c16a5ea38 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -79,6 +79,11 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
}
}
+namespace {
+const std::string NONE("none");
+bool isNone(const std::string& x) { return x.empty() || x == NONE; }
+}
+
// Called in Plugin::initialize
void HaBroker::initialize() {
@@ -110,11 +115,10 @@ void HaBroker::initialize() {
backup.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
+ if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+ if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
- if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
- if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
-
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
@@ -182,7 +186,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
break;
}
case _qmf::HaBroker::METHOD_SETPUBLICURL: {
- setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+ setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_REPLICATE: {
@@ -217,19 +221,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url) {
+void HaBroker::setPublicUrl(const Url& url) {
Mutex::ScopedLock l(lock);
- if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
- clientUrl = url;
- updateClientUrl(l);
-}
-
-void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
- Url url = clientUrl.empty() ? brokerUrl : clientUrl;
+ publicUrl = url;
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
+ QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -238,10 +236,8 @@ void HaBroker::setBrokerUrl(const Url& url) {
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
- // Updating broker URL also updates defaulted client URL:
- if (clientUrl.empty()) updateClientUrl(l);
b = backup;
}
if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 4b3f1d49c1..76dbf57a0c 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -71,7 +71,7 @@ class HaBroker : public management::Manageable
void initialize();
// Implement Manageable.
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
@@ -100,7 +100,7 @@ class HaBroker : public management::Manageable
types::Uuid getSystemId() const { return systemId; }
private:
- void setClientUrl(const Url&);
+ void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -125,7 +125,7 @@ class HaBroker : public management::Manageable
boost::shared_ptr<Backup> backup;
boost::shared_ptr<Primary> primary;
qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
- Url clientUrl, brokerUrl;
+ Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
BrokerStatus status;
BrokerInfo brokerInfo;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 3d77a4cbd1..5edb98c135 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -33,9 +33,11 @@ struct Options : public qpid::Options {
addOptions()
("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
+ ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"),
+ "Enable replication of specific queues without joining a cluster")
("ha-brokers-url", optValue(settings.brokerUrl,"URL"),
"URL with address of each broker in the cluster.")
- ("ha-public-url", optValue(settings.clientUrl,"URL"),
+ ("ha-public-url", optValue(settings.publicUrl,"URL"),
"URL advertized to clients to connect to the cluster.")
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
@@ -68,7 +70,7 @@ struct HaPlugin : public Plugin {
void earlyInitialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) {
+ if (broker && (settings.cluster || settings.queueReplication)) {
if (!broker->getManagementAgent()) {
QPID_LOG(info, "HA plugin disabled because management is disabled");
if (settings.cluster)
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 1be068063a..53b61415cf 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -35,12 +35,14 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5),
+ Settings() : cluster(false), queueReplication(false),
+ replicateDefault(NONE), backupTimeout(5),
flowMessages(100), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
- std::string clientUrl;
+ bool queueReplication; // True if enabled.
+ std::string publicUrl;
std::string brokerUrl;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
diff --git a/qpid/cpp/src/qpid/management/Manageable.cpp b/qpid/cpp/src/qpid/management/Manageable.cpp
index 322ec16656..651215ffb5 100644
--- a/qpid/cpp/src/qpid/management/Manageable.cpp
+++ b/qpid/cpp/src/qpid/management/Manageable.cpp
@@ -41,16 +41,6 @@ string Manageable::StatusText (status_t status, string text)
return "??";
}
-ManagementObject* Manageable::GetManagementObject(void) const
-{
- return 0;
-}
-
-ManagementObject::shared_ptr Manageable::GetManagementObjectShared() const
-{
- return ManagementObject::shared_ptr();
-}
-
Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&)
{
return STATUS_UNKNOWN_METHOD;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 3f647ba052..7b8808c0a0 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -698,7 +698,7 @@ void ManagementAgent::periodicProcessing (void)
//
if (publish) {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
qpid::sys::MemStat::loadMemInfo(memstat.get());
}
@@ -1722,7 +1722,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
- ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObjectShared()->getObjectId();
+ ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
moveNewObjects();
@@ -1754,7 +1754,7 @@ void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& reply
agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
- agent->mgmtObject->set_registeredTo (broker->GetManagementObjectShared()->getObjectId());
+ agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
@@ -1831,7 +1831,7 @@ void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey,
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
@@ -1945,7 +1945,7 @@ void ManagementAgent::handleGetQuery(const string& body, const string& rte, cons
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
/*
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index 9df5825e32..7f1a2e3e66 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -211,7 +211,7 @@ private:
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
- ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; }
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
diff --git a/qpid/cpp/src/qpid/store/CMakeLists.txt b/qpid/cpp/src/qpid/store/CMakeLists.txt
index 9abdf0ae3d..31623f8e84 100644
--- a/qpid/cpp/src/qpid/store/CMakeLists.txt
+++ b/qpid/cpp/src/qpid/store/CMakeLists.txt
@@ -42,6 +42,7 @@ if (CMAKE_COMPILER_IS_GNUCXX)
set_target_properties (store PROPERTIES
PREFIX ""
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
endif (CMAKE_COMPILER_IS_GNUCXX)
@@ -54,7 +55,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
endif (MSVC)
endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
-set_target_properties (store PROPERTIES VERSION ${qpidc_version})
+set_target_properties (store PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ VERSION ${qpidc_version})
install (TARGETS store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
@@ -81,6 +84,7 @@ if (BUILD_MSSQL)
ms-sql/State.cpp
ms-sql/TplRecordset.cpp
ms-sql/VariantHelper.cpp)
+ set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
install (TARGETS mssql_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
@@ -110,6 +114,7 @@ if (BUILD_MSCLFS)
ms-sql/State.cpp
ms-sql/VariantHelper.cpp)
include_directories(ms-sql)
+ set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (msclfs_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} clfsw32.lib)
install (TARGETS msclfs_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
diff --git a/qpid/cpp/src/rdma.cmake b/qpid/cpp/src/rdma.cmake
index 21597f85d2..1d355e7ae6 100644
--- a/qpid/cpp/src/rdma.cmake
+++ b/qpid/cpp/src/rdma.cmake
@@ -79,6 +79,7 @@ if (BUILD_RDMA)
add_library (rdma MODULE qpid/sys/RdmaIOPlugin.cpp)
target_link_libraries (rdma qpidbroker rdmawrap)
set_target_properties (rdma PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${CATCH_UNDEFINED}"
PREFIX "")
diff --git a/qpid/cpp/src/ssl.cmake b/qpid/cpp/src/ssl.cmake
index b1a1ba9fa3..b7ad58b9f0 100644
--- a/qpid/cpp/src/ssl.cmake
+++ b/qpid/cpp/src/ssl.cmake
@@ -90,7 +90,8 @@ if (BUILD_SSL)
target_link_libraries (ssl qpidbroker sslcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
set_target_properties (ssl PROPERTIES
PREFIX ""
- COMPILE_FLAGS ${NSS_COMPILE_FLAGS})
+ COMPILE_FLAGS "${NSS_COMPILE_FLAGS}"
+ COMPILE_DEFINITIONS _IN_QPID_BROKER)
if (CMAKE_COMPILER_IS_GNUCXX)
set_target_properties(ssl PROPERTIES
LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
diff --git a/qpid/cpp/src/ssl.mk b/qpid/cpp/src/ssl.mk
index 24ba8f585e..ff2aa502d6 100644
--- a/qpid/cpp/src/ssl.mk
+++ b/qpid/cpp/src/ssl.mk
@@ -39,7 +39,7 @@ ssl_la_SOURCES = \
ssl_la_LIBADD= libqpidbroker.la libsslcommon.la
-ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFLAGS)
+ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFLAGS) -D_IN_QPID_BROKER
ssl_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index 71e1945d94..9c21e51a18 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -123,7 +123,7 @@ class TestManageable : public qpid::management::Manageable
mgmtObj = tmp;
};
~TestManageable() { mgmtObj.reset(); }
- management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObj; };
+ management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; };
static void validateTestObjectProperties(_qmf::TestObject& to)
{
// verify the default values are as expected. We don't check 'string1',
@@ -209,11 +209,11 @@ QPID_AUTO_TEST_CASE(v1ObjPublish)
// create a manageable test object
TestManageable *tm = new TestManageable(agent, std::string("obj1"));
- uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize();
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObjectShared(), 1);
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
@@ -234,7 +234,7 @@ QPID_AUTO_TEST_CASE(v1ObjPublish)
// destroy the object
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
// wait for the deleted object to be published
@@ -272,9 +272,9 @@ QPID_AUTO_TEST_CASE(v2ObjPublish)
TestManageable *tm = new TestManageable(agent, std::string("obj2"));
- Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObjectShared()->getPackageName(), "#");
+ Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#");
- agent->addObject(tm->GetManagementObjectShared(), "testobj-1");
+ agent->addObject(tm->GetManagementObject(), "testobj-1");
// wait for the object to be published
Message m1;
@@ -295,7 +295,7 @@ QPID_AUTO_TEST_CASE(v2ObjPublish)
// destroy the object
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
// wait for the deleted object to be published
@@ -335,11 +335,11 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj)
// create a manageable test object
TestManageable *tm = new TestManageable(agent, std::string("myObj"));
- uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize();
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObjectShared(), 1);
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
@@ -352,7 +352,7 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj)
// destroy the object, then immediately export (before the next poll cycle)
::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
agent->exportDeletedObjects( delObjs );
BOOST_CHECK(delObjs.size() == 1);
@@ -399,11 +399,11 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj)
// create a manageable test object
TestManageable *tm = new TestManageable(agent, std::string("anObj"));
- uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize();
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObjectShared(), 1);
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
@@ -416,7 +416,7 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj)
// destroy the object, then immediately export (before the next poll cycle)
::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
agent->exportDeletedObjects( delObjs );
BOOST_CHECK(delObjs.size() == 1);
@@ -478,8 +478,8 @@ QPID_AUTO_TEST_CASE(v1ExportFastDelObj)
// add, then immediately delete and export the object...
::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->addObject(tm->GetManagementObjectShared(), 999);
- tm->GetManagementObjectShared()->resourceDestroy();
+ agent->addObject(tm->GetManagementObject(), 999);
+ tm->GetManagementObject()->resourceDestroy();
agent->exportDeletedObjects( delObjs );
BOOST_CHECK(delObjs.size() == 1);
@@ -511,8 +511,8 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
// FOR ALL OBJECTS, so objLen will be the same. Otherwise the
// decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length).
TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObjectShared()->writePropertiesSize();
- agent->addObject(tm->GetManagementObjectShared(), i + 1);
+ objLen = tm->GetManagementObject()->writePropertiesSize();
+ agent->addObject(tm->GetManagementObject(), i + 1);
tmv.push_back(tm);
}
@@ -531,7 +531,7 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
uint32_t delCount = 0;
for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObjectShared()->resourceDestroy();
+ tmv[i]->GetManagementObject()->resourceDestroy();
delCount++;
}
@@ -604,8 +604,8 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
std::stringstream key;
key << "testobj-" << i;
TestManageable *tm = new TestManageable(agent, key.str());
- if (tm->GetManagementObjectShared()->writePropertiesSize()) {}
- agent->addObject(tm->GetManagementObjectShared(), key.str());
+ if (tm->GetManagementObject()->writePropertiesSize()) {}
+ agent->addObject(tm->GetManagementObject(), key.str());
tmv.push_back(tm);
}
@@ -624,7 +624,7 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
uint32_t delCount = 0;
for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObjectShared()->resourceDestroy();
+ tmv[i]->GetManagementObject()->resourceDestroy();
delCount++;
}
@@ -689,12 +689,12 @@ QPID_AUTO_TEST_CASE(v2RapidRestoreObj)
TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
- Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObjectShared()->getPackageName(), "#");
+ Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
// add, then immediately delete and re-add a copy of the object
- agent->addObject(tm1->GetManagementObjectShared(), "testobj-1");
- tm1->GetManagementObjectShared()->resourceDestroy();
- agent->addObject(tm2->GetManagementObjectShared(), "testobj-1");
+ agent->addObject(tm1->GetManagementObject(), "testobj-1");
+ tm1->GetManagementObject()->resourceDestroy();
+ agent->addObject(tm2->GetManagementObject(), "testobj-1");
// expect: a delete notification, then an update notification
TestObjectVector objs;
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index b0af187087..63afc46831 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -158,6 +158,7 @@ add_executable (unit_test unit_test
target_link_libraries (unit_test
${qpid_test_boost_libs}
qpidmessaging qpidbroker qmfconsole)
+set_target_properties (unit_test PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
remember_location(unit_test)
add_library (shlibtest MODULE shlibtest.cpp)
@@ -327,7 +328,9 @@ endif (PYTHON_EXECUTABLE)
add_library(test_store MODULE test_store.cpp)
target_link_libraries (test_store qpidbroker qpidcommon)
-set_target_properties (test_store PROPERTIES PREFIX "")
+set_target_properties (test_store PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ PREFIX "")
add_library (dlclose_noop MODULE dlclose_noop.c)
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 55387f0091..4184b5f38a 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -17,7 +17,7 @@
# under the License.
#
-AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK
+AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK -D_IN_QPID_BROKER
INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src
PUBLIC_INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include # Use public API only
QMF_GEN=$(top_srcdir)/managementgen/qmf-gen
@@ -28,6 +28,7 @@ abs_srcdir=@abs_srcdir@
extra_libs =
lib_client = $(abs_builddir)/../libqpidclient.la
lib_messaging = $(abs_builddir)/../libqpidmessaging.la
+lib_types = $(abs_builddir)/../libqpidtypes.la
lib_common = $(abs_builddir)/../libqpidcommon.la
lib_broker = $(abs_builddir)/../libqpidbroker.la
lib_console = $(abs_builddir)/../libqmfconsole.la
@@ -154,7 +155,7 @@ receiver_SOURCES = \
receiver.cpp \
TestOptions.h \
ConnectionOptions.h
-receiver_LDADD = $(lib_client) -lboost_program_options -lqpidcommon
+receiver_LDADD = $(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS += sender
sender_SOURCES = \
@@ -162,7 +163,7 @@ sender_SOURCES = \
TestOptions.h \
ConnectionOptions.h \
Statistics.cpp
-sender_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes -lqpidclient
+sender_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) $(lib_client)
qpidexectest_PROGRAMS += qpid-receive
qpid_receive_SOURCES = \
@@ -171,7 +172,7 @@ qpid_receive_SOURCES = \
ConnectionOptions.h \
Statistics.h \
Statistics.cpp
-qpid_receive_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes
+qpid_receive_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types)
qpidexectest_PROGRAMS += qpid-send
qpid_send_SOURCES = \
@@ -180,42 +181,42 @@ qpid_send_SOURCES = \
ConnectionOptions.h \
Statistics.h \
Statistics.cpp
-qpid_send_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes
+qpid_send_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types)
qpidexectest_PROGRAMS+=qpid-perftest
qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES)
-qpid_perftest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_perftest_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-txtest
qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES)
qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
-qpid_txtest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_txtest_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-latency-test
qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
-qpid_latency_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_latency_test_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-client-test
qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
-qpid_client_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_client_test_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-topic-listener
qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-topic-publisher
qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-ping
qpid_ping_INCLUDES=$(PUBLIC_INCLUDES)
qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h
-qpid_ping_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_ping_LDADD=$(lib_client) -lboost_program_options $(lib_common)
#
# Other test programs
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index edb50fce9c..55cff046e2 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testLinkBindingCleanup)
+{
+ MessagingFixture fix;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+
+ Connection connection = fix.newConnection();
+ connection.open();
+
+ Session session(connection.createSession());
+ Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}");
+ Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}");
+ connection.close();
+
+ sender.send(Message("test-message"), true);
+
+ // The session-scoped binding should be removed when receiver1's network connection is lost
+ Message in;
+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 0597a933a3..24f4bcadf9 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -203,7 +203,9 @@ class Popen(subprocess.Popen):
self.wait()
def kill(self):
- self.expect = EXPECT_EXIT_FAIL
+ # Set to EXPECT_UNKNOWN, EXPECT_EXIT_FAIL creates a race condition
+ # if the process exits normally concurrent with the call to kill.
+ self.expect = EXPECT_UNKNOWN
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
diff --git a/qpid/cpp/src/tests/federation.py b/qpid/cpp/src/tests/federation.py
index dcd074eda9..6477c6effd 100755
--- a/qpid/cpp/src/tests/federation.py
+++ b/qpid/cpp/src/tests/federation.py
@@ -2604,3 +2604,109 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_bounce_unbinds_named_queue(self):
+ """ Verify that a propagated binding is removed when the connection is
+ bounced
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_declare(exchange="fedX", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ timeout = time() + 10
+ while my_exchange is None and time() <= timeout:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ self.fail("QMF failed to find new exchange!")
+ exchanges.append(my_exchange)
+
+ # on the destination broker, create a binding for propagation
+ self._brokers[0].client_session.queue_declare(queue="fedDstQ")
+ self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud")
+
+ # on the source broker, create a bridge queue
+ self._brokers[1].client_session.queue_declare(queue="fedSrcQ")
+
+ # connect B1 --> B0
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-dynamic",
+ {"host":self._brokers[1].host,
+ "port":self._brokers[1].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # bridge the "fedX" exchange:
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-dynamic",
+ {"link":"Link-dynamic",
+ "src":"fedX",
+ "dest":"fedX",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # wait for the inter-broker links to become operational
+ operational = False
+ timeout = time() + 10
+ while not operational and time() <= timeout:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ self.failUnless(operational, "inter-broker links failed to become operational.")
+
+ # wait until the binding key has propagated to the src broker
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount < 1 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 1)
+
+ #
+ # Tear down the bridges between the two exchanges, then wait
+ # for the bindings to be cleaned up
+ #
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount != 0 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 0)
+
+ self._brokers[1].client_session.queue_delete(queue="fedSrcQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_delete(exchange="fedX")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index d7885d9622..4efbfdba3d 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -100,7 +100,7 @@ class HaBroker(Broker):
self.qpid_ha_script.main_except(["", "-b", url]+args)
def promote(self): self.ready(); self.qpid_ha(["promote"])
- def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+ def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
@@ -113,10 +113,12 @@ class HaBroker(Broker):
self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self):
+ def qmf(self):
hb = self.agent().getHaBroker()
hb.update()
- return hb.status
+ return hb
+
+ def ha_status(self): return self.qmf().status
def wait_status(self, status):
def try_get_status():
@@ -234,7 +236,9 @@ class HaCluster(object):
def update_urls(self):
self.url = ",".join([b.host_port() for b in self])
if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self: b.set_brokers_url(self.url)
+ for b in self:
+ b.set_brokers_url(self.url)
+ b.set_public_url(self.url)
def connect(self, i):
"""Connect with reconnect_urls"""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index bc5566ae63..b29ff42627 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -279,11 +279,13 @@ class ReplicationTests(HaBrokerTest):
"""Test replication of individual queues outside of cluster mode"""
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
- primary = HaBroker(self, name="primary", ha_cluster=False)
+ primary = HaBroker(self, name="primary", ha_cluster=False,
+ args=["--ha-queue-replication=yes"]);
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False)
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
br = backup.connect().session().receiver("q;{create:always}")
# Set up replication with qpid-ha
@@ -304,7 +306,8 @@ class ReplicationTests(HaBrokerTest):
finally: l.restore()
def test_queue_replica_failover(self):
- """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+ """Test individual queue replication from a cluster to a standalone
+ backup broker, verify it fails over."""
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
cluster = HaCluster(self, 2)
@@ -312,7 +315,8 @@ class ReplicationTests(HaBrokerTest):
pc = cluster.connect(0)
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False)
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
@@ -474,6 +478,23 @@ class ReplicationTests(HaBrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
+ def test_replicate_binding(self):
+ """Verify that binding replication can be disabled"""
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ ps = primary.connect().session()
+ ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+ ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+ backup.wait_backup("q")
+
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+ backup.promote()
+ bs = backup.connect_admin().session()
+ bs.sender("ex").send(Message("msg"))
+ self.assert_browse_retry(bs, "q", [])
+
def test_invalid_replication(self):
"""Verify that we reject an attempt to declare a queue with invalid replication value."""
cluster = HaCluster(self, 1, ha_replicate="all")
@@ -761,9 +782,9 @@ acl deny all all
cluster[1].wait_queue("q0")
cluster[1].wait_queue("q1")
cluster[0].kill()
- cluster[1].wait_queue("q1") # Not timed out yet
- cluster[1].wait_no_queue("q1", timeout=2) # Wait for timeout
- cluster[1].wait_no_queue("q0", timeout=2)
+ cluster[1].wait_queue("q1") # Not timed out yet
+ cluster[1].wait_no_queue("q1") # Wait for timeout
+ cluster[1].wait_no_queue("q0")
def test_alt_exchange_dup(self):
"""QPID-4349: if a queue has an alterante exchange and is deleted the
@@ -1114,6 +1135,38 @@ class RecoveryTests(HaBrokerTest):
cluster.bounce(0, promote_next=False)
cluster[0].promote()
+
+class ConfigurationTests(HaBrokerTest):
+ """Tests for configuration settings."""
+
+ def test_client_broker_url(self):
+ """Check that setting of broker and public URLs obeys correct defaulting
+ and precedence"""
+
+ def check(broker, brokers, public):
+ qmf = broker.qmf()
+ self.assertEqual(brokers, qmf.brokersUrl)
+ self.assertEqual(public, qmf.publicUrl)
+
+ def start(brokers, public, known=None):
+ args=[]
+ if brokers: args.append("--ha-brokers-url="+brokers)
+ if public: args.append("--ha-public-url="+public)
+ if known: args.append("--known-hosts-url="+known)
+ return HaBroker(self, args=args)
+
+ # Both set explictily, no defaulting
+ b = start("foo:123", "bar:456")
+ check(b, "amqp:tcp:foo:123", "amqp:tcp:bar:456")
+ b.set_brokers_url("foo:999")
+ check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:456")
+ b.set_public_url("bar:999")
+ check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:999")
+
+ # Allow "none" to mean "not set"
+ b = start("none", "none")
+ check(b, "", "")
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")
diff --git a/qpid/cpp/src/tests/testagent.mk b/qpid/cpp/src/tests/testagent.mk
index 9f530621c6..0492f3e3bb 100644
--- a/qpid/cpp/src/tests/testagent.mk
+++ b/qpid/cpp/src/tests/testagent.mk
@@ -46,6 +46,6 @@ testagent-testagent.$(OBJEXT): $(TESTAGENT_GEN_SRC)
qpidexectest_PROGRAMS+=testagent
testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen
testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC)
-testagent_LDADD=$(top_builddir)/src/libqmf.la -lqpidcommon -lqpidtypes -lqpidclient
+testagent_LDADD=$(top_builddir)/src/libqmf.la $(top_builddir)/src/libqpidcommon.la $(top_builddir)/src/libqpidtypes.la $(top_builddir)/src/libqpidclient.la
EXTRA_DIST+=testagent.xml
diff --git a/qpid/cpp/src/xml.mk b/qpid/cpp/src/xml.mk
index baf3803647..9376cfd54a 100644
--- a/qpid/cpp/src/xml.mk
+++ b/qpid/cpp/src/xml.mk
@@ -24,6 +24,6 @@ xml_la_SOURCES = \
qpid/xml/XmlExchangePlugin.cpp
xml_la_LIBADD = -lxerces-c -lxqilla libqpidbroker.la
-
+xml_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER
xml_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
index 65ce108aef..55893387a4 100644
--- a/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
+++ b/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
@@ -173,6 +173,13 @@ under the License.
</listitem>
</itemizedlist>
</para>
+ <para>
+ You should not enable the old and new cluster modules at the same time
+ in a broker, as they may interfere with each other. In other words you
+ should not set <literal>cluster-name</literal> at the same time as
+ either <literal>ha-cluster</literal> or
+ <literal>ha-queue-replication</literal>
+ </para>
</section>
<section>
<title>Limitations</title>
@@ -254,6 +261,14 @@ under the License.
</row>
<row>
<entry>
+ <literal>ha-queue-replication <replaceable>yes|no</replaceable></literal>
+ </entry>
+ <entry>
+ Enable replication of specific queues without joining a cluster, see <xref linkend="ha-queue-replication"/>.
+ </entry>
+ </row>
+ <row>
+ <entry>
<literal>ha-brokers-url <replaceable>URL</replaceable></literal>
</entry>
<entry>
@@ -273,8 +288,7 @@ ssl_addr = "ssl:" host [":" port]'
</footnote>
used by cluster brokers to connect to each other. The URL should
contain a comma separated list of the broker addresses, rather than a
- virtual IP address. For example:
- <literal>amqp:node1.exaple.com,node2.exaple.com,node3.exaple.com</literal>
+ virtual IP address.
</para>
</entry>
</row>
@@ -282,14 +296,18 @@ ssl_addr = "ssl:" host [":" port]'
<entry><literal>ha-public-url <replaceable>URL</replaceable></literal> </entry>
<entry>
<para>
- The URL <footnoteref linkend="ha-url-grammar"/> used by clients to connect to the cluster. This can be a list or
- a single virtual IP address. A virtual IP address is recommended as it
- simplifies deployment. If not specified this defaults to the value of
- <literal>ha-brokers-url</literal>.
+ The URL <footnoteref linkend="ha-url-grammar"/> is advertised to
+ clients as the "known-hosts" for fail-over. It can be a list or
+ a single virtual IP address. A virtual IP address is recommended.
</para>
<para>
- This option allows you to put client traffic on a different network from
- broker traffic, which is recommended.
+ Using this option you can put client and broker traffic on
+ separate networks, which is recommended.
+ </para>
+ <para>
+ Note: When HA clustering is enabled the broker option
+ <literal>known-hosts-url</literal> is ignored and over-ridden by
+ the <literal>ha-public-url</literal> setting.
</para>
</entry>
</row>
@@ -548,7 +566,7 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl
</section>
<section id="ha-creating-replicated">
- <title>Creating replicated queues and exchanges</title>
+ <title>Controlling replication of queues and exchanges</title>
<para>
By default, queues and exchanges are not replicated automatically. You can change
the default behavior by setting the <literal>ha-replicate</literal> configuration
@@ -849,6 +867,30 @@ NOTE: fencing is not shown, you must configure fencing appropriately for your cl
or to simulate a cluster on a single node. For deployment, a resource manager is required.
</para>
</section>
+ <section id="ha-queue-replication">
+ <title>Replicating specific queues</title>
+ <para>
+ In addition to the automatic replication performed in a cluster, you can
+ set up replication for specific queues between arbitrary brokers, even if
+ the brokers are not members of a cluster. The command:
+ </para>
+ <programlisting>
+ qpid-ha replicate <replaceable>QUEUE</replaceable> <replaceable>REMOTE-BROKER</replaceable>
+ </programlisting>
+ <para>
+ sets up replication of <replaceable>QUEUE</replaceable> on <replaceable>REMOTE-BROKER</replaceable> to <replaceable>QUEUE</replaceable> on the current broker.
+ </para>
+ <para>
+ Set the configuration option
+ <literal>ha-queue-replication=yes</literal> on both brokers to enable this
+ feature on non-cluster brokers. It is automatically enabled for brokers
+ that are part of a cluster.
+ </para>
+ <para>
+ Note that this feature does not provide automatic fail-over, for that you
+ need to run a cluster.
+ </para>
+ </section>
</section>
<!-- LocalWords: scalability rgmanager multicast RGManager mailto LVQ qpidd IP dequeued Transactional username
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
index 66cfe10771..4856a7c491 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
@@ -43,6 +43,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
private String _remoteHost;
private boolean _ssl;
+ private String _queuePrefix;
+ private String _topicPrefix;
public ConnectionFactoryImpl(final String host,
final int port,
@@ -90,12 +92,15 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
public ConnectionImpl createConnection() throws JMSException
{
- return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl);
+ return createConnection(_username, _password);
}
public ConnectionImpl createConnection(final String username, final String password) throws JMSException
{
- return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ connection.setQueuePrefix(_queuePrefix);
+ connection.setTopicPrefix(_topicPrefix);
+ return connection;
}
public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException
@@ -211,4 +216,23 @@ public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnection
return connection;
}
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queuePrefix)
+ {
+ _queuePrefix = queuePrefix;
+ }
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
index 417f6f71e1..be1c2d6514 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
@@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transport.Container;
import javax.jms.*;
import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -50,6 +49,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
private final String _remoteHost;
private final boolean _ssl;
private String _clientId;
+ private String _queuePrefix;
+ private String _topicPrefix;
private static enum State
@@ -379,4 +380,78 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
_isTopicConnection = topicConnection;
}
+
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queueprefix)
+ {
+ _queuePrefix = queueprefix;
+ }
+
+ DecodedDestination toDecodedDestination(DestinationImpl dest)
+ {
+ String address = dest.getAddress();
+ Set<String> kind = null;
+ Class clazz = dest.getClass();
+ if( clazz==QueueImpl.class )
+ {
+ kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+ if( _queuePrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_queuePrefix) )
+ {
+ address = _queuePrefix+address;
+ }
+ }
+ }
+ else if( clazz==TopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+ if( _topicPrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_topicPrefix) )
+ {
+ address = _topicPrefix+address;
+ }
+ }
+ }
+ else if( clazz==TemporaryQueueImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+ }
+ else if( clazz==TemporaryTopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ DecodedDestination toDecodedDestination(String address, Set<String> kind)
+ {
+ if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+ {
+ return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+ }
+ if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+ {
+ return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+ }
+ return new DecodedDestination(address, kind);
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
new file mode 100644
index 0000000000..74e98c2163
--- /dev/null
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import java.util.Set;
+
+/**
+* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+*/
+class DecodedDestination
+{
+ private final String _address;
+ private final Set<String> _attributes;
+
+ DecodedDestination(String address, Set<String> kind)
+ {
+ _address = address;
+ _attributes = kind;
+ }
+
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ public Set<String> getAttributes()
+ {
+ return _attributes;
+ }
+}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
index eb34cead08..3c15c74d6f 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
@@ -127,7 +127,7 @@ public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, Topi
{
try
{
- return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+ return _session.getClientSession(). createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
_linkName, _durable, getFilters(), null);
}
catch (AmqpErrorException e)
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
index f1056b94fd..fba50c5477 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
@@ -50,14 +50,24 @@ public abstract class MessageImpl implements Message
static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
- private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type");
+ static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type");
+
+ static final String QUEUE_ATTRIBUTE = "queue";
+ static final String TOPIC_ATTRIBUTE = "topic";
+ static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+ static final Set<String> JMS_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE);
+ static final Set<String> JMS_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE);
+ static final Set<String> JMS_TEMP_QUEUE_ATTRIBUTES = set(QUEUE_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
+ static final Set<String> JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE);
private Header _header;
private Properties _properties;
private ApplicationProperties _applicationProperties;
private Footer _footer;
- public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
- private SessionImpl _sessionImpl;
+ private final SessionImpl _sessionImpl;
private boolean _readOnly;
private MessageAnnotations _messageAnnotations;
@@ -171,45 +181,53 @@ public abstract class MessageImpl implements Message
public DestinationImpl getJMSReplyTo() throws JMSException
{
- return DestinationImpl.valueOf(getReplyTo());
+ return toDestination(getReplyTo(), splitCommaSeparateSet((String) getMessageAnnotation(REPLY_TO_TYPE)));
}
public void setJMSReplyTo(Destination destination) throws NonAMQPDestinationException
{
- if(destination == null)
+ if( destination==null )
{
setReplyTo(null);
- }
- else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
- {
- setReplyTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ messageAnnotationMap().remove(REPLY_TO_TYPE);
}
else
{
- throw new NonAMQPDestinationException(destination);
+ DecodedDestination dd = toDecodedDestination(destination);
+ setReplyTo(dd.getAddress());
+ messageAnnotationMap().put(REPLY_TO_TYPE, join(",", dd.getAttributes()));
}
}
public DestinationImpl getJMSDestination() throws JMSException
{
- return _isFromQueue ? QueueImpl.valueOf(getTo())
- : _isFromTopic ? TopicImpl.valueOf(getTo())
- : DestinationImpl.valueOf(getTo());
+ Set<String> type = splitCommaSeparateSet((String) getMessageAnnotation(TO_TYPE));
+ if( type==null )
+ {
+ if( _isFromQueue )
+ {
+ type = JMS_QUEUE_ATTRIBUTES;
+ }
+ else if( _isFromTopic )
+ {
+ type = JMS_TOPIC_ATTRIBUTES;
+ }
+ }
+ return toDestination(getTo(), type);
}
public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
{
- if(destination == null)
+ if( destination==null )
{
setTo(null);
- }
- else if (destination instanceof org.apache.qpid.amqp_1_0.jms.Destination)
- {
- setTo(((org.apache.qpid.amqp_1_0.jms.Destination)destination).getAddress());
+ messageAnnotationMap().remove(TO_TYPE);
}
else
{
- throw new NonAMQPDestinationException(destination);
+ DecodedDestination dd = toDecodedDestination(destination);
+ setTo(dd.getAddress());
+ messageAnnotationMap().put(TO_TYPE, join(",", dd.getAttributes()));
}
}
@@ -264,22 +282,13 @@ public abstract class MessageImpl implements Message
public String getJMSType() throws JMSException
{
- Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
- final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
-
+ final Object attrValue = getMessageAnnotation(JMS_TYPE);
return attrValue instanceof String ? attrValue.toString() : null;
}
public void setJMSType(String s) throws JMSException
{
- Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
- if(messageAttrs == null)
- {
- messageAttrs = new HashMap();
- _messageAnnotations = new MessageAnnotations(messageAttrs);
- }
-
- messageAttrs.put(JMS_TYPE, s);
+ messageAnnotationMap().put(JMS_TYPE, s);
}
public long getJMSExpiration() throws JMSException
@@ -1206,4 +1215,118 @@ public abstract class MessageImpl implements Message
}
abstract Collection<Section> getSections();
+
+ DecodedDestination toDecodedDestination(Destination destination) throws NonAMQPDestinationException
+ {
+ if(destination == null)
+ {
+ return null;
+ }
+ if (destination instanceof DestinationImpl)
+ {
+ return _sessionImpl.getConnection().toDecodedDestination((DestinationImpl) destination);
+ }
+ throw new NonAMQPDestinationException(destination);
+ }
+
+ DestinationImpl toDestination(String address, Set<String> kind)
+ {
+ if( address == null )
+ {
+ return null;
+ }
+
+ // If destination prefixes are in play, we have to strip the the prefix, and we might
+ // be able to infer the kind, if we don't know it yet.
+ DecodedDestination decoded = _sessionImpl.getConnection().toDecodedDestination(address, kind);
+ address = decoded.getAddress();
+ kind = decoded.getAttributes();
+
+ if( kind == null )
+ {
+ return DestinationImpl.valueOf(address);
+ }
+ if( kind.contains(QUEUE_ATTRIBUTE) )
+ {
+ if( kind.contains(TEMPORARY_ATTRIBUTE) )
+ {
+ return new TemporaryQueueImpl(address, null, _sessionImpl);
+ }
+ else
+ {
+ return QueueImpl.valueOf(address);
+ }
+ }
+ else if ( kind.contains(TOPIC_ATTRIBUTE) )
+ {
+ if( kind.contains(TEMPORARY_ATTRIBUTE) )
+ {
+ return new TemporaryTopicImpl(address, null, _sessionImpl);
+ }
+ else
+ {
+ return TopicImpl.valueOf(address);
+ }
+ }
+
+ return DestinationImpl.valueOf(address);
+ }
+
+ private Object getMessageAnnotation(Symbol key)
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ return messageAttrs == null ? null : messageAttrs.get(key);
+ }
+
+ private Map messageAnnotationMap()
+ {
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ if(messageAttrs == null)
+ {
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
+ }
+ return messageAttrs;
+ }
+
+ Set<String> splitCommaSeparateSet(String value)
+ {
+ if( value == null )
+ {
+ return null;
+ }
+ HashSet<String> rc = new HashSet<String>();
+ for( String x: value.split("\\s*,\\s*") )
+ {
+ rc.add(x);
+ }
+ return rc;
+ }
+
+ private static Set<String> set(String ...args)
+ {
+ HashSet<String> s = new HashSet<String>();
+ for (String arg : args)
+ {
+ s.add(arg);
+ }
+ return Collections.unmodifiableSet(s);
+ }
+
+ static final String join(String sep, Iterable items)
+ {
+ StringBuilder result = new StringBuilder();
+
+ for (Object o : items)
+ {
+ if (result.length() > 0)
+ {
+ result.append(sep);
+ }
+ result.append(o.toString());
+ }
+
+ return result.toString();
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
index 5bb8845eb7..badc20472b 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
@@ -20,7 +20,6 @@ package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
@@ -61,7 +60,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
{
try
{
- _sender = _session.getClientSession().createSender(_destination.getAddress());
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
}
catch (Sender.SenderCreationException e)
{
@@ -297,7 +296,7 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP
try
{
_destination = (DestinationImpl) destination;
- _sender = _session.getClientSession().createSender(_destination.getAddress());
+ _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
send(message, deliveryMode, priority, ttl);
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
index 75003c0d77..8fab315b10 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
@@ -100,7 +100,7 @@ public class QueueBrowserImpl implements QueueBrowser
{
try
{
- _receiver = _session.getClientSession().createReceiver(_queue.getAddress(),
+ _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
StdDistMode.COPY,
AcknowledgeMode.AMO, null,
false,
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
index d46ed7183f..67b597f5cf 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
@@ -41,7 +41,7 @@ public class QueueReceiverImpl extends MessageConsumerImpl implements QueueRecei
{
try
{
- return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+ return getSession().getClientSession().createMovingReceiver(getSession().toAddress(getDestination()));
}
catch (AmqpErrorException e)
{
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
index cedf9a180a..58b7d4f625 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
@@ -899,4 +899,10 @@ public class SessionImpl implements Session, QueueSession, TopicSession
{
_isTopicSession = topicSession;
}
+
+ String toAddress(DestinationImpl dest)
+ {
+ return _connection.toDecodedDestination(dest).getAddress();
+ }
+
}
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
index 52d8c412ec..f267794796 100644
--- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
+++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
@@ -66,7 +66,7 @@ public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSub
{
try
{
- String address = getDestination().getAddress();
+ String address = getSession().toAddress(getDestination());
Receiver receiver = getSession().getClientSession().createReceiver(address,
StdDistMode.COPY, AcknowledgeMode.ALO,
getLinkName(), isDurable(), getFilters(),
diff --git a/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider b/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider
index b5bc947612..8ece9627b0 100644
--- a/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider
+++ b/qpid/java/bdbstore/jmx/src/main/resources/META-INF/services/org.apache.qpid.server.jmx.MBeanProvider
@@ -1 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBeanProvider
diff --git a/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory b/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory
index 85b942383f..b6c429baab 100644
--- a/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory
+++ b/qpid/java/broker-plugins/access-control/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AccessControlFactory
@@ -1 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
org.apache.qpid.server.security.access.plugins.DefaultAccessControlFactory
diff --git a/qpid/java/broker-plugins/management-http/build.xml b/qpid/java/broker-plugins/management-http/build.xml
index 734d762f17..abf35d9c88 100644
--- a/qpid/java/broker-plugins/management-http/build.xml
+++ b/qpid/java/broker-plugins/management-http/build.xml
@@ -33,11 +33,9 @@
<!-- Flagfile used to determine if uwar needs to be done. ._ is part of Ant's defaultexcludes so wont appear bundles -->
<property name="dojo.uptodate.flagfile" value="${module.classes}/resources/dojo/._dojouptodate.timestamp" />
- <uptodate property="unwardojo.done" targetfile="${dojo.uptodate.flagfile}" srcfile="${project.root}/${dojo}" />
-
<target name="precompile" depends="unwardojo" />
- <target name="unwardojo" unless="unwardojo.done">
+ <target name="unwardojo" depends="check-unwardojo.done" unless="unwardojo.done">
<unwar src="${project.root}/${dojo}" dest="${module.classes}/resources/dojo">
<patternset>
<exclude name="META-INF/**" />
@@ -48,5 +46,9 @@
<touch file="${dojo.uptodate.flagfile}" />
</target>
+ <target name="check-unwardojo.done">
+ <uptodate property="unwardojo.done" targetfile="${dojo.uptodate.flagfile}" srcfile="${project.root}/${dojo}" />
+ </target>
+
<target name="bundle" depends="bundle-tasks" />
</project>
diff --git a/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory b/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory
index 0565b60e64..7ffb9a9013 100644
--- a/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory
+++ b/qpid/java/broker-plugins/management-http/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory
@@ -1 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
org.apache.qpid.server.management.plugin.HttpManagementFactory
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory b/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory
index afbe217301..8fa778269e 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory
+++ b/qpid/java/broker-plugins/management-jmx/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.PluginFactory
@@ -1 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
org.apache.qpid.server.jmx.JMXManagementFactory
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index d84ff145e4..ac4fda2985 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -111,7 +111,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
private long _deliveryTag = 0;
- /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
private AMQQueue _defaultQueue;
/** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
@@ -207,10 +207,6 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
private void incrementOutstandingTxnsIfNecessary()
{
@@ -1485,11 +1481,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
+ final long transactionStartTime = _transaction.getTransactionStartTime();
+ final long transactionUpdateTime = _txnUpdateTime.get();
+ if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
{
long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
+ long openTime = currentTime - transactionStartTime;
+ long idleTime = currentTime - transactionUpdateTime;
_transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
index 11f16690ef..096d5265ed 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import java.util.UUID;
@@ -30,4 +50,4 @@ public class DirectExchangeType implements ExchangeType<DirectExchange>
{
return ExchangeDefaults.DIRECT_EXCHANGE_NAME;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
index de70373703..0371a363de 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import java.util.UUID;
@@ -28,4 +48,4 @@ public class FanoutExchangeType implements ExchangeType<FanoutExchange>
{
return ExchangeDefaults.FANOUT_EXCHANGE_NAME;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
index 61d9a3c2b0..ed4d57d0f8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import java.util.UUID;
@@ -29,4 +49,4 @@ public class HeadersExchangeType implements ExchangeType<HeadersExchange>
return ExchangeDefaults.HEADERS_EXCHANGE_NAME;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
index cb4e747a2d..25a3549e61 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.exchange;
import java.util.UUID;
@@ -30,4 +50,4 @@ public class TopicExchangeType implements ExchangeType<TopicExchange>
{
return ExchangeDefaults.TOPIC_EXCHANGE_NAME;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java
index f67c7a1c6a..8cf121b3d9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractManagementActor.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.logging.actors;
import java.security.AccessController;
@@ -45,4 +65,4 @@ public abstract class AbstractManagementActor extends AbstractActor
}
return identity;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index 440db73bea..d9e5e1c473 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -573,7 +573,29 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public void closed()
{
-
+ try
+ {
+ _delegate = new ClosedDelegateProtocolEngine();
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Connection from " + getRemoteAddress() + " was closed before any protocol version was established.");
+ }
+ }
+ catch(Exception e)
+ {
+ //ignore
+ }
+ finally
+ {
+ try
+ {
+ _network.close();
+ }
+ catch(Exception e)
+ {
+ //ignore
+ }
+ }
}
public void writerIdle()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index eed55a2e85..075ed2a87c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -42,7 +42,6 @@ import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -449,11 +448,6 @@ public class ServerSession extends Session
return _transaction.isTransactional();
}
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
-
public void selectTx()
{
_transaction = new LocalTransaction(this.getMessageStore());
@@ -591,7 +585,7 @@ public class ServerSession extends Session
/**
* Update last transaction activity timestamp
*/
- public void updateTransactionalActivity()
+ private void updateTransactionalActivity()
{
if (isTransactional())
{
@@ -709,11 +703,13 @@ public class ServerSession extends Session
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
+ final long transactionStartTime = _transaction.getTransactionStartTime();
+ final long transactionUpdateTime = _txnUpdateTime.get();
+ if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
{
long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
+ long openTime = currentTime - transactionStartTime;
+ long idleTime = currentTime - transactionUpdateTime;
_transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
index 3fbcff7e2c..f11fb1086e 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
@@ -50,7 +50,7 @@ public class LocalTransaction implements ServerTransaction
private volatile Transaction _transaction;
private MessageStore _transactionLog;
- private long _txnStartTime = 0L;
+ private volatile long _txnStartTime = 0L;
private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory
index d49510530d..8ff67030ef 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.AuthenticationManagerFactory
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory
org.apache.qpid.server.security.auth.manager.Base64MD5PasswordFileAuthenticationManagerFactory
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType
index 26aa70c31f..4ad646b7a0 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ExchangeType
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
org.apache.qpid.server.exchange.DirectExchangeType
org.apache.qpid.server.exchange.TopicExchangeType
org.apache.qpid.server.exchange.FanoutExchangeType
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory
index 33ae92181b..6bfb55ff18 100644
--- a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory
@@ -1 +1,19 @@
-org.apache.qpid.server.security.group.FileGroupManagerFactory \ No newline at end of file
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.security.group.FileGroupManagerFactory
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
index 28e1d5a87e..f0eb83ad24 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Drain.java
@@ -88,7 +88,7 @@ public class Drain extends OptionParser
}
}
}
-
+ consumer.close();
ssn.close();
con.close();
}
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
index 61ff2dfc19..09e813f8c1 100644
--- a/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
+++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/Spout.java
@@ -100,6 +100,7 @@ public class Spout extends OptionParser
System.out.println(msg);
System.out.println("-------------------------------\n");
}
+ producer.close();
ssn.close();
con.close();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 12b174198a..9a7f5241a5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1095,7 +1095,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
- public boolean isExchangeExist(AMQDestination dest,boolean assertNode)
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
ExchangeQueryResult result = getQpidSession().exchangeQuery(dest.getAddressName(), Option.NONE).get();
@@ -1118,6 +1118,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setExchangeClass(new AMQShortString(result.getType()));
}
}
+
+ if (assertNode)
+ {
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ }
+ }
+
return match;
}
@@ -1137,9 +1146,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
(result.getExclusive() == node.isExclusive()) &&
(matchProps(result.getArguments(),node.getDeclareArgs()));
}
- else if (match)
+
+ if (assertNode)
{
- // should I use the queried details to update the local data structure.
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +", Result was : " + result);
+ }
}
}
catch(SessionException e)
@@ -1218,32 +1231,32 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
{
case AMQDestination.QUEUE_TYPE:
{
- if (isQueueExist(dest,assertNode))
+ if(createNode)
{
setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
break;
}
- else if(createNode)
+ else if (isQueueExist(dest,assertNode))
{
setLegacyFieldsForQueueType(dest);
- handleQueueNodeCreation(dest,noLocal);
break;
- }
+ }
}
case AMQDestination.TOPIC_TYPE:
{
- if (isExchangeExist(dest,assertNode))
+ if(createNode)
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
+ handleExchangeNodeCreation(dest);
break;
}
- else if(createNode)
+ else if (isExchangeExist(dest,assertNode))
{
setLegacyFiledsForTopicType(dest);
verifySubject(dest);
- handleExchangeNodeCreation(dest);
break;
}
}
@@ -1322,6 +1335,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
arguments.put(AddressHelper.NO_LOCAL, noLocal);
}
+ if (link.isDurable() && queueName.startsWith("TempQueue"))
+ {
+ throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
+ }
+
getQpidSession().queueDeclare(queueName,
queueProps.getAlternateExchange(), arguments,
queueProps.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
index 9b291b48f7..72fc74e19c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
@@ -283,7 +283,7 @@ public class AddressHelper
{
MapAccessor xDeclareMapAccessor = new MapAccessor(xDeclareMap);
queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,AUTO_DELETE,true));
- queue.setAutoDelete(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true));
+ queue.setExclusive(getBooleanProperty(xDeclareMapAccessor,EXCLUSIVE,true));
queue.setAlternateExchange(xDeclareMapAccessor.getString(ALT_EXCHANGE));
queue.setDeclareArgs((Map<String,Object>)xDeclareMap.get(ARGUMENTS));
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java
index a602dcbfd4..7401168978 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/messaging/address/AddressHelperTest.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client.messaging.address;
import org.apache.qpid.client.AMQDestination;
diff --git a/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef b/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef
index 29998c89dd..46696bf942 100644
--- a/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef
+++ b/qpid/java/perftests/etc/chartdefs/1050-VaryingNumberOfProducerSessionsSingleConnection.chartdef
@@ -20,7 +20,7 @@
chartType=XYLINE
chartTitle=Varying number of producer sessions on single connection
chartSubtitle=Persistent messages (1024b)
-chartDescription=1-80P transacted on single connection, 20C auto-ack on separate connections, persistent, message payload 1KB.
+chartDescription=1-80P transacted on single connection, 20C transacted on separate connections, persistent, message payload 1KB.
xAxisTitle=Number of producer sessions
yAxisTitle=Throughput (KB/s)
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js
index 07f8bf9d92..527300eff4 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/ConfigReaderTest-test-config.js
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
jsonObject = {
"_tests":
QPID.iterations( { "__ACK_MODE": [ 0, 1 ] },
@@ -31,4 +51,4 @@ jsonObject = {
)
})
-} \ No newline at end of file
+}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js
index f64af82feb..eab98e8bd7 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/controller/config/JavaScriptConfigEvaluatorTest-test-config.js
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
jsonObject = {
"_countries":
QPID.iterations( { "__ITERATING_VALUE": [ 0, 1 ] },
@@ -20,4 +40,4 @@ jsonObject = {
)
})
-} \ No newline at end of file
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
index e9798f15b5..0e55557806 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.client.failover;
import java.net.InetSocketAddress;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 974e8d6e96..6041600364 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -96,7 +96,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,false));
// create always -------------------------------------------
@@ -124,7 +124,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
cons = jmsSession.createConsumer(dest);
@@ -159,7 +159,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
@@ -175,7 +175,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
"doesn't resolve to an exchange or a queue"));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
@@ -776,7 +776,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testSubscriptionForSameDestination() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createTopic("ADDR:amq.topic/foo; {link:{durable:true}}");
+ Destination dest = ssn.createTopic("ADDR:amq.topic/foo");
MessageConsumer consumer1 = ssn.createConsumer(dest);
MessageConsumer consumer2 = ssn.createConsumer(dest);
MessageProducer prod = ssn.createProducer(dest);
@@ -1033,7 +1033,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
@@ -1049,7 +1049,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
@@ -1066,7 +1066,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
}
/**