summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-05 00:30:44 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-05 00:30:44 +0000
commit8e7aa5d709fae37fb7f643f600274198bdea9745 (patch)
treedaf9d48272b27283353f8b2354d2f376306c46df
parentbd6a93bc29c3b4f8d2ed572f46e020a541feba9e (diff)
downloadqpid-python-8e7aa5d709fae37fb7f643f600274198bdea9745.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1564582 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/AMQP_1.07
-rw-r--r--cpp/bindings/qpid/dotnet/configure-windows.ps1102
-rw-r--r--cpp/design_docs/broker-acl-work.txt24
-rw-r--r--cpp/src/qpid/DataDir.cpp2
-rw-r--r--cpp/src/qpid/DataDir.h6
-rw-r--r--cpp/src/qpid/Exception.cpp12
-rw-r--r--cpp/src/qpid/acl/AclData.cpp14
-rw-r--r--cpp/src/qpid/acl/AclValidator.cpp16
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp17
-rw-r--r--cpp/src/qpid/broker/AclModule.h29
-rw-r--r--cpp/src/qpid/broker/Broker.cpp12
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.cpp2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp7
-rw-r--r--cpp/src/qpid/broker/PagedQueue.cpp17
-rw-r--r--cpp/src/qpid/broker/PagedQueue.h1
-rw-r--r--cpp/src/qpid/broker/QueueFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp28
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h8
-rw-r--r--cpp/src/qpid/broker/SessionHandlerObserver.h51
-rw-r--r--cpp/src/qpid/broker/posix/BrokerDefaults.cpp1
-rw-r--r--cpp/src/qpid/broker/windows/BrokerDefaults.cpp1
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp30
-rw-r--r--cpp/src/qpid/ha/ErrorListener.h60
-rw-r--r--cpp/src/qpid/ha/Primary.cpp46
-rw-r--r--cpp/src/qpid/ha/Primary.h2
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.cpp14
-rw-r--r--cpp/src/qpid/ha/PrimaryTxObserver.h6
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp45
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h17
-rw-r--r--cpp/src/qpid/ha/TxReplicator.cpp21
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h3
-rw-r--r--cpp/src/qpid/legacystore/StorePlugin.cpp2
-rw-r--r--cpp/src/qpid/legacystore/jrnl/wrfc.cpp6
-rw-r--r--cpp/src/qpid/linearstore/StorePlugin.cpp2
-rw-r--r--cpp/src/qpid/messaging/amqp/SenderContext.cpp16
-rw-r--r--cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp2
-rw-r--r--cpp/src/qpid/sys/MemoryMappedFile.h4
-rw-r--r--cpp/src/qpid/sys/posix/MemoryMappedFile.cpp31
-rw-r--r--cpp/src/qpid/sys/windows/MemoryMappedFile.cpp5
-rwxr-xr-xcpp/src/tests/acl.py223
-rwxr-xr-xcpp/src/tests/run_paged_queue_tests2
-rw-r--r--doc/book/src/cpp-broker/Security.xml38
-rw-r--r--doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml306
-rw-r--r--doc/book/src/java-broker/Java-Broker-Ports.xml4
-rw-r--r--doc/book/src/java-broker/commonEntities.xml6
-rw-r--r--doc/book/src/java-broker/images/JMX-Connect-MBeans.pngbin0 -> 66586 bytes
-rw-r--r--doc/book/src/java-broker/images/JMX-Connect-Remote.pngbin0 -> 66411 bytes
-rw-r--r--doc/book/src/jms-client-0-8/JMS-Client-Examples.xml2
-rw-r--r--doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml18
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/exchange.py6
-rw-r--r--tools/src/py/qpidstore/janal.py13
52 files changed, 1108 insertions, 191 deletions
diff --git a/cpp/AMQP_1.0 b/cpp/AMQP_1.0
index ef478c7a15..f5db31b8f5 100644
--- a/cpp/AMQP_1.0
+++ b/cpp/AMQP_1.0
@@ -164,7 +164,8 @@ warning is logged.
If the node is an exchange, then an outgoing link (i.e. messages to
travel out from broker) will cause a temporary, link-scoped queue to
be created on the broker and bound to the exchange. [See section on
-'Topics' below]
+'Topics' below]. The name of the queue will be of the form
+<container-id>_<link-name>.
Outgoing links may have a filter set on their source. The filters
currently supported by the broker are 'legacy-amqp-direct-binding',
@@ -272,7 +273,9 @@ trigger the establishment of the link. This gives the peer some means
of ensuring their expectations will be met.
The 'shared' capability allows subscriptions from an exchange to be
-shared by multiple receivers.
+shared by multiple receivers. Where this is specified the subscription
+queue created takes the name of the link (and does not include the
+container id).
The 'durable' capability will be added if the queue or exchange
refered to by the source or target is durable. The 'queue' capability
diff --git a/cpp/bindings/qpid/dotnet/configure-windows.ps1 b/cpp/bindings/qpid/dotnet/configure-windows.ps1
index e62e5e8bc3..4874899b7f 100644
--- a/cpp/bindings/qpid/dotnet/configure-windows.ps1
+++ b/cpp/bindings/qpid/dotnet/configure-windows.ps1
@@ -55,6 +55,10 @@
# - If a directory "looks like" is has already had CMake run in it
# then this script skips running CMake again.
#
+# * User chooses to include Proton or not.
+#
+# - Proton is included by having variable PROTON_ROOT reference the
+# directory where proton was installed.
#
# Prerequisites
#
@@ -143,6 +147,7 @@ $ErrorActionPreference='Stop'
#
$global:txtPath = '$env:PATH'
$global:txtQR = '$env:QPID_BUILD_ROOT'
+$global:txtPR = '$env:PROTON_ROOT'
$global:txtWH = 'Write-Host'
#############################
@@ -218,6 +223,37 @@ function SanityCheckBoostPath ($path=0)
#############################
+# SanityCheckProtonInstallPath
+# A path is a "proton install path" if it contains
+# both bin and include subdirectories.
+#
+function SanityCheckProtonInstallPath ($path=0)
+{
+ $result = $true
+ $displayPath = ""
+
+ if ($path -ne $null) {
+ $displayPath = $path
+
+ $toTest = ('include', 'bin')
+ foreach ($pattern in $toTest) {
+ $target = Join-Path $path $pattern
+ if (!(Test-Path -path $target)) {
+ $result = $false
+ }
+ }
+ } else {
+ $result = $false
+ }
+
+ if (! $result) {
+ Write-Host "The path ""$displayPath"" does not appear to be a Proton install root path."
+ }
+ $result
+}
+
+
+#############################
# SanityCheckBuildPath
# A path is a "build path" if it contains
# various subdirectories.
@@ -264,14 +300,16 @@ function WriteDotnetBindingSlnLauncherPs1
[string] $nBits,
[string] $outfileName,
[string] $studioVersion,
- [string] $studioSubdir
+ [string] $studioSubdir,
+ [string] $protonRoot
)
$out = @("#
# Launch $slnName in $studioVersion $vsPlatform ($nBits-bit) environment
#
-$global:txtPath = ""$boostRoot\lib;$global:txtPath""
+$global:txtPath = ""$protonRoot\bin;$boostRoot\lib;$global:txtPath""
$global:txtQR = ""$buildRoot""
+$global:txtPR = ""$protonRoot""
$global:txtWH ""Launch $slnName in $studioVersion $vsPlatform ($nBits-bit) environment.""
$cppDir\bindings\qpid\dotnet\$vsSubdir\$slnName
")
@@ -328,7 +366,8 @@ function WriteDotnetBindingEnvSetupBat
[string] $outfileName,
[string] $studioVersion,
[string] $studioSubdir,
- [string] $cmakeLine
+ [string] $cmakeLine,
+ [string] $protonRoot
)
$out = @("@ECHO OFF
@@ -344,8 +383,9 @@ REM The solution was generated with cmake command line:
REM $cmakeLine
ECHO %PATH% | FINDSTR /I boost > NUL
IF %ERRORLEVEL% EQU 0 ECHO WARNING: Boost is defined in your path multiple times!
-SET PATH=$boostRoot\lib;%PATH%
+SET PATH=$protonRoot\bin;$boostRoot\lib;%PATH%
SET QPID_BUILD_ROOT=$buildRoot
+SET PROTON_ROOT=$protonRoot
ECHO Environment set for $slnName $studioVersion $vsPlatform $nBits-bit development.
")
Write-Host " $buildRoot\$outfileName"
@@ -441,7 +481,7 @@ function SelectVisualStudioVersion {
SelectVisualStudioVersion
#############################
-# User dialog to get optional 32-bit boost and build paths
+# User dialog to get optional 32-bit boost, proton, and build paths
#
$boost32 = Select-Folder -message "Select 32-bit BOOST_ROOT folder for $global:vsVersion build. Press CANCEL to skip 32-bit processing."
@@ -456,6 +496,18 @@ if ($defined32) {
$make32 = $false
if ($defined32) {
+ $proton32folder = Select-Folder -message "Select 32-bit Proton install folder for $global:vsVersion build."
+
+ $found = ($proton32folder -ne $null) -and ($proton32folder -ne '')
+ if ($found) {
+ $found = SanityCheckProtonInstallPath $proton32folder
+ }
+ if ($found) {
+ $proton32cmake = """-DPROTON_ROOT=$proton32folder"""
+ } else {
+ $proton32cmake = ""
+ }
+
$build32 = Select-Folder -message "Select 32-bit QPID_BUILD_ROOT folder for $global:vsVersion build." -path $projRoot
$found = ($build32 -ne $null) -and ($build32 -ne '')
@@ -472,7 +524,7 @@ if ($defined32) {
}
#############################
-# User dialog to get optional 64-bit boost and build paths
+# User dialog to get optional 64-bit boost, proton, and build paths
#
$boost64 = Select-Folder -message "Select 64-bit BOOST_ROOT folder for $global:vsVersion build. Press CANCEL to skip 64-bit processing."
@@ -486,6 +538,18 @@ if ($defined64) {
$make64 = $false
if ($defined64) {
+ $proton64folder = Select-Folder -message "Select 64-bit Proton install folder for $global:vsVersion build."
+
+ $found = ($proton64folder -ne $null) -and ($proton64folder -ne '')
+ if ($found) {
+ $found = SanityCheckProtonInstallPath $proton64folder
+ }
+ if ($found) {
+ $proton64cmake = """-DPROTON_ROOT=$proton64folder"""
+ } else {
+ $proton64cmake = ""
+ }
+
$build64 = Select-Folder -message "Select 64-bit QPID_BUILD_ROOT folder for $global:vsVersion build." -path $projRoot
$found = ($build64 -ne $null) -and ($build64 -ne '')
@@ -508,10 +572,9 @@ if ($defined64) {
#
if ($make32) {
cd "$build32"
- Write-Host "Running 32-bit CMake in $build32 ..."
- $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_x86"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $cppDir"
- Write-Host "$global:cmakeCommadLine32"
- CMake -G "$global:cmakeGenerator" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_x86" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $cppDir
+ $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_$build32"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $proton32cmake $cppDir"
+ Write-Host "Running 32-bit CMake in $build32 : $global:cmakeCommandLine32"
+ CMake -G "$global:cmakeGenerator" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_$build32" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $proton32cmake $cppDir
} else {
Write-Host "Skipped 32-bit CMake."
}
@@ -522,9 +585,10 @@ if ($make32) {
if ($make64) {
cd "$build64"
Write-Host "Running 64-bit CMake in $build64"
- $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_x64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $cppDir"
- Write-Host "$global:cmakeCommadLine64"
- CMake -G "$global:cmakeGenerator Win64" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_x64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $cppDir
+ $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_$build64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $proton64cmake $cppDir"
+ Write-Host $global:cmakeCommandLine64
+ Write-Host ""
+ CMake -G "$global:cmakeGenerator Win64" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_$build64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $proton64cmake $cppDir
} else {
Write-Host "Skipped 64-bit CMake."
}
@@ -549,7 +613,8 @@ if ($defined32) {
-nBits "32" `
-outfileName "start-devenv-messaging-$global:vsSubdir-x86-32bit.ps1" `
-studioVersion "$global:vsVersion" `
- -studioSubdir "$global:vsSubdir"
+ -studioSubdir "$global:vsSubdir" `
+ -protonRoot "$proton32folder"
###########
@@ -577,7 +642,8 @@ if ($defined32) {
-outfileName "setenv-messaging-$global:vsSubdir-x86-32bit.bat" `
-studioVersion "$global:vsVersion" `
-studioSubdir "$global:vsSubdir" `
- -cmakeLine "$global:cmakeCommandLine32"
+ -cmakeLine "$global:cmakeCommandLine32" `
+ -protonRoot "$proton32folder"
} else {
Write-Host "Skipped writing 32-bit scripts."
@@ -615,7 +681,8 @@ if ($defined64) {
-psScriptName "start-devenv-messaging-$global:vsSubdir-x64-64bit.ps1" `
-outfileName "start-devenv-messaging-$global:vsSubdir-x64-64bit.bat" `
-studioVersion "$global:vsVersion" `
- -studioSubdir "$global:vsSubdir"
+ -studioSubdir "$global:vsSubdir" `
+ -protonRoot "$proton64folder"
###########
# Batch script (that you CALL from a command prompt)
@@ -629,7 +696,8 @@ if ($defined64) {
-outfileName "setenv-messaging-$global:vsSubdir-x64-64bit.bat" `
-studioVersion "$global:vsVersion" `
-studioSubdir "$global:vsSubdir" `
- -cmakeLine "$global:cmakeCommandLine64"
+ -cmakeLine "$global:cmakeCommandLine64" `
+ -protonRoot "$proton64folder"
} else {
Write-Host "Skipped writing 64-bit scripts."
diff --git a/cpp/design_docs/broker-acl-work.txt b/cpp/design_docs/broker-acl-work.txt
index e89e446a56..e587dc5198 100644
--- a/cpp/design_docs/broker-acl-work.txt
+++ b/cpp/design_docs/broker-acl-work.txt
@@ -28,16 +28,20 @@ in memory and on disk.
* Add property limit settings to CREATE QUEUE Acl rules.
-User Option Acl Limit Property Units
---------------- ---------------------- ---------------
-qpid.max_size queuemaxsizelowerlimit bytes
- queuemaxsizeupperlimit bytes
-qpid.max_count queuemaxcountlowerlimit messages
- queuemaxcountupperlimit messages
-qpid.file_size filemaxsizelowerlimit pages (64Kb per page)
- filemaxsizeupperlimit pages (64Kb per page)
-qpid.file_count filemaxcountlowerlimit files
- filemaxcountupperlimit files
+User Option Acl Limit Property Units
+--------------- ---------------------- ---------------
+qpid.max_size queuemaxsizelowerlimit bytes
+ queuemaxsizeupperlimit bytes
+qpid.max_count queuemaxcountlowerlimit messages
+ queuemaxcountupperlimit messages
+qpid.file_size filemaxsizelowerlimit pages (64Kb per page)
+ filemaxsizeupperlimit pages (64Kb per page)
+qpid.file_count filemaxcountlowerlimit files
+ filemaxcountupperlimit files
+qpid.max_pages_loaded pageslowerlimit pages
+ pagesupperlimit pages
+qpid.page_factor pagefactorlowerlimit integer (multiple of the platform-defined page size)
+ pagefactorlowerlimit integer (multiple of the platform-defined page size)
* Change rule match behavior to accomodate limit settings
diff --git a/cpp/src/qpid/DataDir.cpp b/cpp/src/qpid/DataDir.cpp
index cfdb536d2c..546df3dabd 100644
--- a/cpp/src/qpid/DataDir.cpp
+++ b/cpp/src/qpid/DataDir.cpp
@@ -26,7 +26,7 @@
namespace qpid {
-DataDir::DataDir (std::string path) :
+DataDir::DataDir (const std::string& path) :
enabled (!path.empty ()),
dirPath (path)
{
diff --git a/cpp/src/qpid/DataDir.h b/cpp/src/qpid/DataDir.h
index 828299f3ba..ec73d28796 100644
--- a/cpp/src/qpid/DataDir.h
+++ b/cpp/src/qpid/DataDir.h
@@ -42,11 +42,11 @@ class DataDir
public:
- QPID_COMMON_EXTERN DataDir (std::string path);
+ QPID_COMMON_EXTERN DataDir (const std::string& path);
QPID_COMMON_EXTERN ~DataDir ();
- bool isEnabled() { return enabled; }
- const std::string& getPath() { return dirPath; }
+ bool isEnabled() const { return enabled; }
+ const std::string& getPath() const { return dirPath; }
};
} // namespace qpid
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index a6696f06e1..999c2aeb52 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -45,16 +45,20 @@ Exception::Exception(const std::string& msg) throw() : message(msg) {
Exception::~Exception() throw() {}
-std::string Exception::getPrefix() const { return ""; }
+std::string Exception::getPrefix() const { return std::string(); }
std::string Exception::getMessage() const { return message; }
+namespace { const std::string COLON(": "); }
+
const char* Exception::what() const throw() {
// Construct the what string the first time it is needed.
if (whatStr.empty()) {
- whatStr = getPrefix();
- if (!whatStr.empty()) whatStr += ": ";
- whatStr += message;
+ if (message.compare(0, getPrefix().size(), getPrefix()) == 0 || // Already has prefix
+ getPrefix().empty()) // No prefix
+ whatStr = message;
+ else
+ whatStr = getPrefix() + COLON + message;
}
return whatStr.c_str();
}
diff --git a/cpp/src/qpid/acl/AclData.cpp b/cpp/src/qpid/acl/AclData.cpp
index a205ddabfc..48b5e462e5 100644
--- a/cpp/src/qpid/acl/AclData.cpp
+++ b/cpp/src/qpid/acl/AclData.cpp
@@ -160,6 +160,16 @@ bool AclData::lookupMatchRule(
// as rule's index.
propertyMapItr lookupParamItr;
switch (rulePropMapItr->first) {
+ case acl::SPECPROP_MAXPAGESLOWERLIMIT:
+ case acl::SPECPROP_MAXPAGESUPPERLIMIT:
+ lookupParamItr = params->find(PROP_MAXPAGES);
+ break;
+
+ case acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT:
+ case acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT:
+ lookupParamItr = params->find(PROP_MAXPAGEFACTOR);
+ break;
+
case acl::SPECPROP_MAXQUEUECOUNTUPPERLIMIT:
case acl::SPECPROP_MAXQUEUECOUNTLOWERLIMIT:
lookupParamItr = params->find(PROP_MAXQUEUECOUNT);
@@ -201,6 +211,8 @@ bool AclData::lookupMatchRule(
case acl::SPECPROP_MAXQUEUESIZEUPPERLIMIT:
case acl::SPECPROP_MAXFILECOUNTUPPERLIMIT:
case acl::SPECPROP_MAXFILESIZEUPPERLIMIT:
+ case acl::SPECPROP_MAXPAGESUPPERLIMIT:
+ case acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT:
limitChecked &=
compareInt(
rulePropMapItr->first,
@@ -213,6 +225,8 @@ bool AclData::lookupMatchRule(
case acl::SPECPROP_MAXQUEUESIZELOWERLIMIT:
case acl::SPECPROP_MAXFILECOUNTLOWERLIMIT:
case acl::SPECPROP_MAXFILESIZELOWERLIMIT:
+ case acl::SPECPROP_MAXPAGESLOWERLIMIT:
+ case acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT:
limitChecked &=
compareInt(
rulePropMapItr->first,
diff --git a/cpp/src/qpid/acl/AclValidator.cpp b/cpp/src/qpid/acl/AclValidator.cpp
index a077667a33..89e072000e 100644
--- a/cpp/src/qpid/acl/AclValidator.cpp
+++ b/cpp/src/qpid/acl/AclValidator.cpp
@@ -110,6 +110,22 @@ namespace acl {
boost::shared_ptr<PropertyType>(
new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+ validators.insert(Validator(acl::SPECPROP_MAXPAGESLOWERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
+ validators.insert(Validator(acl::SPECPROP_MAXPAGESUPPERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
+ validators.insert(Validator(acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
+ validators.insert(Validator(acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
std::string policyTypes[] = {"ring", "self-destruct", "reject"};
std::vector<std::string> v(policyTypes, policyTypes + sizeof(policyTypes) / sizeof(std::string));
validators.insert(Validator(acl::SPECPROP_POLICYTYPE,
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 5eedafc77b..43f39c2919 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -94,8 +94,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
catch(const SessionException& e) {
- QPID_LOG(error, "Execution exception: " << e.what());
- executionException(e.code, e.what()); // Let subclass handle this first.
+ executionException(e.code, e.what());
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -105,16 +104,13 @@ void SessionHandler::handleIn(AMQFrame& f) {
sendDetach();
}
catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception: " << e.what());
- channelException(e.code, e.what()); // Let subclass handle this first.
+ channelException(e.code, e.what());
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
- QPID_LOG(error, "Connection exception: " << e.what());
connectionException(e.code, e.getMessage());
}
catch(const std::exception& e) {
- QPID_LOG(error, "Unexpected exception: " << e.what());
connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
}
@@ -186,13 +182,14 @@ void SessionHandler::detach(const std::string& name) {
}
void SessionHandler::detached(const std::string& /*name*/, uint8_t code) {
- // Special case for detached: Don't check if we are
- // attached. Checking can lead to an endless game of "detached
- // tennis" on federated brokers.
awaitingDetached = false;
+ // Special case for detached: Don't throw if we are not attached. Doing so
+ // can lead to an endless game of "detached tennis" on federated brokers.
+ if (!getState()) return; // Already detached.
if (code != session::DETACH_CODE_NORMAL) {
sendReady = receiveReady = false;
- channelException(convert(code), "session.detached from peer.");
+ channelException(convert(code), Msg() << "Channel " << channel.get()
+ << " received session.detached from peer");
} else {
handleDetach();
}
diff --git a/cpp/src/qpid/broker/AclModule.h b/cpp/src/qpid/broker/AclModule.h
index c01697ace9..aa0ea0c6b0 100644
--- a/cpp/src/qpid/broker/AclModule.h
+++ b/cpp/src/qpid/broker/AclModule.h
@@ -77,6 +77,9 @@ namespace acl {
PROP_SCHEMAPACKAGE,
PROP_SCHEMACLASS,
PROP_POLICYTYPE,
+ PROP_PAGING,
+ PROP_MAXPAGES,
+ PROP_MAXPAGEFACTOR,
PROP_MAXQUEUESIZE,
PROP_MAXQUEUECOUNT,
PROP_MAXFILESIZE,
@@ -100,6 +103,7 @@ namespace acl {
SPECPROP_SCHEMAPACKAGE = PROP_SCHEMAPACKAGE,
SPECPROP_SCHEMACLASS = PROP_SCHEMACLASS,
SPECPROP_POLICYTYPE = PROP_POLICYTYPE,
+ SPECPROP_PAGING = PROP_PAGING,
SPECPROP_MAXQUEUESIZELOWERLIMIT,
SPECPROP_MAXQUEUESIZEUPPERLIMIT,
@@ -108,7 +112,11 @@ namespace acl {
SPECPROP_MAXFILESIZELOWERLIMIT,
SPECPROP_MAXFILESIZEUPPERLIMIT,
SPECPROP_MAXFILECOUNTLOWERLIMIT,
- SPECPROP_MAXFILECOUNTUPPERLIMIT };
+ SPECPROP_MAXFILECOUNTUPPERLIMIT,
+ SPECPROP_MAXPAGESLOWERLIMIT,
+ SPECPROP_MAXPAGESUPPERLIMIT,
+ SPECPROP_MAXPAGEFACTORLOWERLIMIT,
+ SPECPROP_MAXPAGEFACTORUPPERLIMIT };
// AclResult shared between ACL spec and ACL authorise interface
enum AclResult {
@@ -229,6 +237,9 @@ namespace acl {
if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return PROP_POLICYTYPE;
+ if (str.compare("paging") == 0) return PROP_PAGING;
+ if (str.compare("maxpages") == 0) return PROP_MAXPAGES;
+ if (str.compare("maxpagefactor") == 0) return PROP_MAXPAGEFACTOR;
if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE;
if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT;
if (str.compare("maxfilesize") == 0) return PROP_MAXFILESIZE;
@@ -249,6 +260,9 @@ namespace acl {
case PROP_SCHEMAPACKAGE: return "schemapackage";
case PROP_SCHEMACLASS: return "schemaclass";
case PROP_POLICYTYPE: return "policytype";
+ case PROP_PAGING: return "paging";
+ case PROP_MAXPAGES: return "maxpages";
+ case PROP_MAXPAGEFACTOR: return "maxpagefactor";
case PROP_MAXQUEUESIZE: return "maxqueuesize";
case PROP_MAXQUEUECOUNT: return "maxqueuecount";
case PROP_MAXFILESIZE: return "maxfilesize";
@@ -270,6 +284,7 @@ namespace acl {
if (str.compare("schemapackage") == 0) return SPECPROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return SPECPROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return SPECPROP_POLICYTYPE;
+ if (str.compare("paging") == 0) return SPECPROP_PAGING;
if (str.compare("queuemaxsizelowerlimit") == 0) return SPECPROP_MAXQUEUESIZELOWERLIMIT;
if (str.compare("queuemaxsizeupperlimit") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT;
if (str.compare("queuemaxcountlowerlimit") == 0) return SPECPROP_MAXQUEUECOUNTLOWERLIMIT;
@@ -278,6 +293,10 @@ namespace acl {
if (str.compare("filemaxsizeupperlimit") == 0) return SPECPROP_MAXFILESIZEUPPERLIMIT;
if (str.compare("filemaxcountlowerlimit") == 0) return SPECPROP_MAXFILECOUNTLOWERLIMIT;
if (str.compare("filemaxcountupperlimit") == 0) return SPECPROP_MAXFILECOUNTUPPERLIMIT;
+ if (str.compare("pageslowerlimit") == 0) return SPECPROP_MAXPAGESLOWERLIMIT;
+ if (str.compare("pagesupperlimit") == 0) return SPECPROP_MAXPAGESUPPERLIMIT;
+ if (str.compare("pagefactorlowerlimit") == 0) return SPECPROP_MAXPAGEFACTORLOWERLIMIT;
+ if (str.compare("pagefactorupperlimit") == 0) return SPECPROP_MAXPAGEFACTORUPPERLIMIT;
// Allow old names in ACL file as aliases for newly-named properties
if (str.compare("maxqueuesize") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT;
if (str.compare("maxqueuecount") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT;
@@ -297,6 +316,7 @@ namespace acl {
case SPECPROP_SCHEMAPACKAGE: return "schemapackage";
case SPECPROP_SCHEMACLASS: return "schemaclass";
case SPECPROP_POLICYTYPE: return "policytype";
+ case SPECPROP_PAGING: return "paging";
case SPECPROP_MAXQUEUESIZELOWERLIMIT: return "queuemaxsizelowerlimit";
case SPECPROP_MAXQUEUESIZEUPPERLIMIT: return "queuemaxsizeupperlimit";
case SPECPROP_MAXQUEUECOUNTLOWERLIMIT: return "queuemaxcountlowerlimit";
@@ -305,6 +325,10 @@ namespace acl {
case SPECPROP_MAXFILESIZEUPPERLIMIT: return "filemaxsizeupperlimit";
case SPECPROP_MAXFILECOUNTLOWERLIMIT: return "filemaxcountlowerlimit";
case SPECPROP_MAXFILECOUNTUPPERLIMIT: return "filemaxcountupperlimit";
+ case SPECPROP_MAXPAGESLOWERLIMIT: return "pageslowerlimit";
+ case SPECPROP_MAXPAGESUPPERLIMIT: return "pagesupperlimit";
+ case SPECPROP_MAXPAGEFACTORLOWERLIMIT: return "pagefactorlowerlimit";
+ case SPECPROP_MAXPAGEFACTORUPPERLIMIT: return "pagefactorupperlimit";
default: assert(false); // should never get here
}
return "";
@@ -381,6 +405,9 @@ namespace acl {
p4->insert(PROP_EXCLUSIVE);
p4->insert(PROP_AUTODELETE);
p4->insert(PROP_POLICYTYPE);
+ p4->insert(PROP_PAGING);
+ p4->insert(PROP_MAXPAGES);
+ p4->insert(PROP_MAXPAGEFACTOR);
p4->insert(PROP_MAXQUEUESIZE);
p4->insert(PROP_MAXQUEUECOUNT);
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 975acfdb87..4017fdbfe3 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -217,7 +217,9 @@ Broker::Broker(const Broker::Options& conf) :
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
- pagingDir(conf.pagingDir),
+ pagingDir(!conf.pagingDir.empty() ? conf.pagingDir :
+ dataDir.isEnabled() ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR :
+ std::string() ),
queues(this),
exchanges(this),
links(this),
@@ -385,11 +387,6 @@ Broker::Broker(const Broker::Options& conf) :
}
}
-std::string Broker::getPagingDirectoryPath()
-{
- return pagingDir.isEnabled() ? pagingDir.getPath() : dataDir.getPath();
-}
-
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
bool storeEnabled = store.get() != NULL;
@@ -1304,6 +1301,9 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy()));
+ params.insert(make_pair(acl::PROP_PAGING, settings.paging ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_MAXPAGES, boost::lexical_cast<string>(settings.maxPages)));
+ params.insert(make_pair(acl::PROP_MAXPAGEFACTOR, boost::lexical_cast<string>(settings.pageFactor)));
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount)));
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 4e46b71cc9..4bad8f2960 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -38,6 +38,7 @@
#include "qpid/broker/System.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/broker/BrokerObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/ConnectionCodec.h"
@@ -86,6 +87,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
struct Options : public qpid::Options {
static const std::string DEFAULT_DATA_DIR_LOCATION;
static const std::string DEFAULT_DATA_DIR_NAME;
+ static const std::string DEFAULT_PAGED_QUEUE_DIR;
QPID_BROKER_EXTERN Options(const std::string& name="Broker Options");
@@ -178,6 +180,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir dataDir;
DataDir pagingDir;
ConnectionObservers connectionObservers;
+ SessionHandlerObservers sessionHandlerObservers;
BrokerObservers brokerObservers;
QueueRegistry queues;
@@ -237,11 +240,11 @@ class Broker : public sys::Runnable, public Plugin::Target,
ExchangeRegistry& getExchanges() { return exchanges; }
LinkRegistry& getLinks() { return links; }
DtxManager& getDtxManager() { return dtxManager; }
- DataDir& getDataDir() { return dataDir; }
+ const DataDir& getDataDir() { return dataDir; }
+ const DataDir& getPagingDir() { return pagingDir; }
Options& getOptions() { return config; }
ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; }
- std::string getPagingDirectoryPath();
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -360,6 +363,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ SessionHandlerObservers& getSessionHandlerObservers() { return sessionHandlerObservers; }
BrokerObservers& getBrokerObservers() { return brokerObservers; }
/** Properties to be set on outgoing link connections */
diff --git a/cpp/src/qpid/broker/DtxTimeout.cpp b/cpp/src/qpid/broker/DtxTimeout.cpp
index 58700846ef..f317df5713 100644
--- a/cpp/src/qpid/broker/DtxTimeout.cpp
+++ b/cpp/src/qpid/broker/DtxTimeout.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
@@ -31,5 +32,6 @@ DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _
void DtxTimeout::fire()
{
+ QPID_LOG(debug, "DTX transaction timeouted, XID=" << xid << ", timeout=" << timeout);
mgr.timedout(xid);
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 19c7f107f6..585c7ba764 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -302,7 +302,12 @@ void HeadersExchange::route(Deliverable& msg)
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (match(i->args, msg.getMessage())) {
- b->push_back(i->binding);
+ /* check if a binding tothe same queue has not been already added to b */
+ std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >::iterator bi = b->begin();
+ while ((bi != b->end()) && ((*bi)->queue != i->binding->queue))
+ ++bi;
+ if (bi == b->end())
+ b->push_back(i->binding);
}
}
}
diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp
index 43208d74ee..afb330489b 100644
--- a/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/cpp/src/qpid/broker/PagedQueue.cpp
@@ -91,13 +91,16 @@ PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, u
: name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
expiryPolicy(e)
{
- path = file.open(name, directory);
- QPID_LOG(debug, "PagedQueue[" << path << "]");
+ if (directory.empty()) {
+ throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified"));
+ }
+ file.open(name, directory);
+ QPID_LOG(debug, "PagedQueue[" << name << "]");
}
PagedQueue::~PagedQueue()
{
- file.close(path);
+ file.close();
}
size_t PagedQueue::size()
@@ -133,7 +136,7 @@ bool PagedQueue::deleted(const QueueCursor& cursor)
void PagedQueue::publish(const Message& added)
{
if (encodedSize(added) > pageSize) {
- QPID_LOG(error, "Message is larger than page size for queue " << name << ", backed by " << path);
+ QPID_LOG(error, "Message is larger than page size for queue " << name);
throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name));
}
Used::reverse_iterator i = used.rbegin();
@@ -143,7 +146,7 @@ void PagedQueue::publish(const Message& added)
}
//used is empty or last page is full, need to add a new page
if (!newPage(added.getSequence()).add(added)) {
- QPID_LOG(error, "Could not add message to paged queue " << name << ", backed by " << path);
+ QPID_LOG(error, "Could not add message to paged queue " << name);
throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name));
}
}
@@ -388,14 +391,14 @@ void PagedQueue::load(Page& page)
}
page.load(file, protocols, expiryPolicy);
++loaded;
- QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
+ QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded");
}
void PagedQueue::unload(Page& page)
{
page.unload(file);
--loaded;
- QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded");
+ QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded");
}
diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h
index cb83fa9f34..e4c98f4119 100644
--- a/cpp/src/qpid/broker/PagedQueue.h
+++ b/cpp/src/qpid/broker/PagedQueue.h
@@ -79,7 +79,6 @@ class PagedQueue : public Messages {
qpid::sys::MemoryMappedFile file;
std::string name;
- std::string path;
const size_t pageSize;
const uint maxLoaded;
ProtocolRegistry& protocols;
diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp
index e60349edfb..08988ed4ac 100644
--- a/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/cpp/src/qpid/broker/QueueFactory.cpp
@@ -76,8 +76,10 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
QPID_LOG(warning, "Cannot create paged queue without broker context");
} else if (!qpid::sys::MemoryMappedFile::isSupported()) {
QPID_LOG(warning, "Cannot create paged queue; memory mapped file support not available on this platform");
+ } else if ( !broker->getPagingDir().isEnabled() ) {
+ QPID_LOG(warning, "Cannot create paged queue; no paging directory enabled");
} else {
- queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDirectoryPath(),
+ queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDir().getPath(),
settings.maxPages ? settings.maxPages : 4,
settings.pageFactor ? settings.pageFactor : 1,
broker->getProtocolRegistry(), broker->getExpiryPolicy()));
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index a8f5734af7..3d5faf2dab 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -33,11 +33,35 @@ using namespace framing;
using namespace std;
using namespace qpid::sys;
+namespace {
+class DefaultErrorListener : public SessionHandler::ErrorListener {
+ public:
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, "Connection exception: " << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, "Channel exception: " << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, "Execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, "Incoming execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {}
+
+ private:
+};
+}
+
SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
: qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
-{}
+ proxy(out),
+ errorListener(boost::shared_ptr<ErrorListener>(new DefaultErrorListener()))
+{
+ c.getBroker().getSessionHandlerObservers().newSessionHandler(*this);
+}
SessionHandler::~SessionHandler()
{
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index cb81084014..ee6baed0b6 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
@@ -76,8 +76,8 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- amqp_0_10::Connection& getConnection();
- const amqp_0_10::Connection& getConnection() const;
+ QPID_BROKER_EXTERN amqp_0_10::Connection& getConnection();
+ QPID_BROKER_EXTERN const amqp_0_10::Connection& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
@@ -86,7 +86,7 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
- void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+ QPID_BROKER_EXTERN void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
// Called by SessionAdapter
void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg);
diff --git a/cpp/src/qpid/broker/SessionHandlerObserver.h b/cpp/src/qpid/broker/SessionHandlerObserver.h
new file mode 100644
index 0000000000..6d0ea16254
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionHandlerObserver.h
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_SESSIONHANDLEROBSERVER_H
+#define QPID_BROKER_SESSIONHANDLEROBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Observers.h"
+
+namespace qpid {
+namespace broker {
+class SessionHandler;
+
+/**
+ * Observer of session handler events.
+ */
+class SessionHandlerObserver
+{
+ public:
+ virtual ~SessionHandlerObserver() {}
+ virtual void newSessionHandler(SessionHandler&) {}
+};
+
+
+class SessionHandlerObservers : public Observers<SessionHandlerObserver> {
+ public:
+ void newSessionHandler(SessionHandler& sh) {
+ each(boost::bind(&SessionHandlerObserver::newSessionHandler, _1, boost::ref(sh)));
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SESSIONHANDLEROBSERVER_H*/
diff --git a/cpp/src/qpid/broker/posix/BrokerDefaults.cpp b/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
index 9e463fa32d..9e118c19f7 100644
--- a/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
+++ b/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
@@ -27,6 +27,7 @@ namespace broker {
const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("/tmp");
const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("/.qpidd");
+const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("/pq");
std::string
Broker::Options::getHome() {
diff --git a/cpp/src/qpid/broker/windows/BrokerDefaults.cpp b/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
index b65440b5ad..260bd7c073 100644
--- a/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
+++ b/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
@@ -27,6 +27,7 @@ namespace broker {
const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("\\TEMP");
const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("\\QPIDD.DATA");
+const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("\\PQ");
std::string
Broker::Options::getHome() {
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index b1faf19e52..7928b6ab71 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -172,34 +172,29 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
-// Listens for errors on the bridge session.
-class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+// Report errors on the broker replication session.
+class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& lp, BrokerReplicator& br) :
- logPrefix(lp), brokerReplicator(br) {}
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
- void connectionException(framing::connection::CloseCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
}
- void channelException(framing::session::DetachCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
}
- void executionException(framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
}
-
- void incomingExecutionException(
- framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Incoming execution error: " << msg);
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
-
void detach() {
QPID_LOG(debug, logPrefix << "Session detached.");
}
private:
std::string logPrefix;
- BrokerReplicator& brokerReplicator;
};
/** Keep track of queues or exchanges during the update process to solve 2
@@ -328,8 +323,7 @@ void BrokerReplicator::initialize() {
boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
assert(result.second);
- result.first->setErrorListener(
- boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
+ result.first->setErrorListener(boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
broker.getConnectionObservers().add(shared_from_this());
}
diff --git a/cpp/src/qpid/ha/ErrorListener.h b/cpp/src/qpid/ha/ErrorListener.h
new file mode 100644
index 0000000000..1ae2078a11
--- /dev/null
+++ b/cpp/src/qpid/ha/ErrorListener.h
@@ -0,0 +1,60 @@
+#ifndef QPID_HA_ERRORLISTENER_H
+#define QPID_HA_ERRORLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace ha {
+
+/** Default ErrorListener for HA module */
+class ErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(error, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ERRORLISTENER_H*/
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index 3bb51b1813..b4d50d1652 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary& primary;
};
+class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
+ public:
+ PrimarySessionHandlerObserver(const std::string& logPrefix)
+ : errorListener(new PrimaryErrorListener(logPrefix)) {}
+ void newSessionHandler(broker::SessionHandler& sh) {
+ BrokerInfo info;
+ // Suppress error logging for backup connections
+ // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
+ if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
+ sh.setErrorListener(errorListener);
+ }
+ }
+ private:
+ boost::shared_ptr<PrimaryErrorListener> errorListener;
+};
+
+
} // namespace
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
+ sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
@@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
+
checkReady(); // Outside lock
// Allow client connections
@@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
haBroker.getObserver()->reset();
}
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 2e32515c9a..af368bca0f 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -42,6 +42,7 @@ class Queue;
class Connection;
class ConnectionObserver;
class BrokerObserver;
+class SessionHandlerObserver;
class TxBuffer;
class DtxBuffer;
}
@@ -152,6 +153,7 @@ class Primary : public Role
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
+ boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
ReplicaMap replicas;
TxMap txMap;
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index dc5bf15911..c94ced7024 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -98,7 +98,8 @@ PrimaryTxObserver::PrimaryTxObserver(
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
id(true),
- exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str())
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ empty(true)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -149,6 +150,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
checkState(SENDING, "Too late for enqueue");
+ empty = false;
enqueues[q] += m.getReplicationId();
txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
txQueue->deliver(m);
@@ -162,12 +164,9 @@ void PrimaryTxObserver::dequeue(
checkState(SENDING, "Too late for dequeue");
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ empty = false;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
- else {
- QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
- << LogMessageId(*q, pos, id));
- }
}
namespace {
@@ -221,8 +220,10 @@ void PrimaryTxObserver::commit() {
}
void PrimaryTxObserver::rollback() {
- QPID_LOG(debug, logPrefix << "Rollback");
Mutex::ScopedLock l(lock);
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (state != ENDED) {
txQueue->deliver(TxRollbackEvent().message());
end(l);
@@ -287,7 +288,6 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
- QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup);
// Normally the backup should be completed before it is cancelled.
if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
// Break the pointer cycle if backups have completed and we are done with txBuffer.
diff --git a/cpp/src/qpid/ha/PrimaryTxObserver.h b/cpp/src/qpid/ha/PrimaryTxObserver.h
index 105fee4d40..5b7c2e3e93 100644
--- a/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -55,8 +55,9 @@ class Primary;
* A TxReplicator on the backup replicates the tx-queue and creates
* a TxBuffer on the backup equivalent to the one on the primary.
*
- * Also observes the tx-queue for prepare-complete messages and
- * subscription cancellations.
+ * Creates an exchange to receive prepare-ok/prepare-fail messages from backups.
+ *
+ * Monitors for tx-queue subscription cancellations.
*
* THREAD SAFE: called in user connection thread for TX events,
* and in backup connection threads for prepare-completed events
@@ -122,6 +123,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueueIdsMap enqueues;
UuidSet backups; // All backups of transaction.
UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
+ bool empty; // True if the transaction is empty - no enqueues/dequeues.
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index 507df6ea5a..59b2013f59 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}
+// Debug log expected exceptions on queue replicator, check incoming execution
+// exceptions for "deleted on primary" conditions.
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
: queueReplicator(qr), logPrefix(qr->logPrefix) {}
- void connectionException(framing::connection::CloseCode, const std::string&) {}
- void channelException(framing::session::DetachCode, const std::string&) {}
- void executionException(framing::execution::ErrorCode, const std::string&) {}
-
- void incomingExecutionException(ErrorCode e, const std::string& msg) {
- queueReplicator->incomingExecutionException(e, msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(ErrorCode code, const std::string& msg) {
+ if (!queueReplicator->deletedOnPrimary(code, msg))
+ QPID_LOG(error, logPrefix << "Incoming "
+ << framing::createSessionException(code, msg).what());
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
@@ -197,20 +206,25 @@ void QueueReplicator::disconnect() {
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
+ QPID_LOG(debug, logPrefix << "Destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
+void QueueReplicator::destroy(Mutex::ScopedLock&) {
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+}
+
+
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
@@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
// get a not-found or resource-deleted exception before the
// BrokerReplicator gets the queue-delete event. Shut down the bridge by
// calling destroy(), we can let the BrokerReplicator delete the queue
// when the queue-delete arrives.
- QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ QPID_LOG(debug, logPrefix << "Deleted on primary: "
+ << framing::createSessionException(e, msg).what());
destroy();
+ return true;
}
- else
- QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+ return false;
}
// Unused Exchange methods.
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 22cd13a0a8..f94c6de116 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -46,11 +46,13 @@ class HaBroker;
class Settings;
/**
- * Exchange created on a backup broker to replicate a queue on the primary.
+ * Exchange created on a backup broker to receive replicated messages and
+ * replication events from a queue on the primary. It subscribes to the primary
+ * queue via a ReplicatingSubscription on the primary by passing special
+ * arguments to the subscribe command.
*
- * Puts replicated messages on the local queue, handles dequeue events.
- * Creates a ReplicatingSubscription on the primary by passing special
- * arguments to the consume command.
+ * It puts replicated messages on the local replica queue and handles dequeue
+ * events by removing local messages.
*
* THREAD SAFE: Called in different connection threads.
*/
@@ -74,7 +76,7 @@ class QueueReplicator : public broker::Exchange,
void disconnect(); // Called when we are disconnected from the primary.
- std::string getType() const;
+ virtual std::string getType() const;
void route(broker::Deliverable&);
@@ -101,7 +103,9 @@ class QueueReplicator : public broker::Exchange,
void initialize(); // Called as part of create()
virtual void deliver(const broker::Message&);
+
virtual void destroy(); // Called when the queue is destroyed.
+ virtual void destroy(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
@@ -124,8 +128,7 @@ class QueueReplicator : public broker::Exchange,
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
- void incomingExecutionException(framing::execution::ErrorCode e,
- const std::string& msg);
+ bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
std::string logPrefix;
std::string bridgeName;
diff --git a/cpp/src/qpid/ha/TxReplicator.cpp b/cpp/src/qpid/ha/TxReplicator.cpp
index 7ff03b5f92..d2a647ae8f 100644
--- a/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/cpp/src/qpid/ha/TxReplicator.cpp
@@ -87,7 +87,7 @@ TxReplicator::TxReplicator(
QueueReplicator(hb, txQueue, link),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- ended(false),
+ empty(true), ended(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -151,6 +151,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
enq = e;
+ empty = false;
}
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
@@ -163,6 +164,7 @@ void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
// prepared, then they are all receieved before the prepare event.
// We collect the events here so we can do a single scan of the queue in prepare.
dequeueState.add(e);
+ empty = false;
}
void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
@@ -227,7 +229,9 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
if (!txBuffer) return;
- QPID_LOG(debug, logPrefix << "Rollback");
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
end(l);
@@ -255,15 +259,12 @@ void TxReplicator::end(sys::Mutex::ScopedLock&) {
}
// Called when the tx queue is deleted.
-void TxReplicator::destroy() {
- {
- sys::Mutex::ScopedLock l(lock);
- if (!ended) {
- QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
- rollback(string(), l);
- }
+void TxReplicator::destroy(sys::Mutex::ScopedLock& l) {
+ if (!ended) {
+ if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback");
+ rollback(string(), l);
}
- QueueReplicator::destroy();
+ QueueReplicator::destroy(l);
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
index 7f1256699a..5c509d14a7 100644
--- a/cpp/src/qpid/ha/TxReplicator.h
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -67,7 +67,8 @@ class TxReplicator : public QueueReplicator {
// QueueReplicator overrides
void route(broker::Deliverable& deliverable);
- void destroy();
+ using QueueReplicator::destroy;
+ void destroy(sys::Mutex::ScopedLock&);
protected:
diff --git a/cpp/src/qpid/legacystore/StorePlugin.cpp b/cpp/src/qpid/legacystore/StorePlugin.cpp
index f9b77ce02c..5cba10d0f9 100644
--- a/cpp/src/qpid/legacystore/StorePlugin.cpp
+++ b/cpp/src/qpid/legacystore/StorePlugin.cpp
@@ -45,7 +45,7 @@ struct StorePlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
store.reset(new MessageStoreImpl(broker));
- DataDir& dataDir = broker->getDataDir ();
+ const DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
diff --git a/cpp/src/qpid/legacystore/jrnl/wrfc.cpp b/cpp/src/qpid/legacystore/jrnl/wrfc.cpp
index 43461b66a3..64ee65f1ac 100644
--- a/cpp/src/qpid/legacystore/jrnl/wrfc.cpp
+++ b/cpp/src/qpid/legacystore/jrnl/wrfc.cpp
@@ -125,7 +125,8 @@ wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
u_int32_t fwd_dblks = subm_dblks + enq_dsize_dblks + _enq_cap_offs_dblks;
u_int16_t findex = _fc_index;
fcntl* fcp = _curr_fc;
- bool in_use = false;
+ bool in_use = false; // at least one file contains an enqueued record
+ bool overwrite = false; // reached the original journal file we started with
while (fwd_dblks && !(findex != _fc_index && fcp->enqcnt()))
{
fwd_dblks -= fwd_dblks > _fsize_dblks ? _fsize_dblks : fwd_dblks;
@@ -133,12 +134,13 @@ wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
{
if (++findex == _lpmp->num_jfiles())
findex = 0;
+ overwrite |= findex == _fc_index;
fcp = _lpmp->get_fcntlp(findex);
}
in_use |= fcp->enqcnt() > 0;
}
// Return true if threshold exceeded
- return findex != _fc_index && in_use;
+ return (findex != _fc_index && in_use) || overwrite;
}
bool wrfc::wr_reset()
diff --git a/cpp/src/qpid/linearstore/StorePlugin.cpp b/cpp/src/qpid/linearstore/StorePlugin.cpp
index 6dfb2056bf..d64b44b5e3 100644
--- a/cpp/src/qpid/linearstore/StorePlugin.cpp
+++ b/cpp/src/qpid/linearstore/StorePlugin.cpp
@@ -46,7 +46,7 @@ struct StorePlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
store.reset(new MessageStoreImpl(broker));
- DataDir& dataDir = broker->getDataDir ();
+ const DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
diff --git a/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 0bf8bef27f..59e1832cfd 100644
--- a/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -95,11 +95,17 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
return true;
} else {
deliveries.push_back(Delivery(nextId++));
- Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
- *out = &delivery;
- return true;
+ try {
+ Delivery& delivery = deliveries.back();
+ delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
+ delivery.send(sender, unreliable);
+ *out = &delivery;
+ return true;
+ } catch (const std::exception& e) {
+ deliveries.pop_back();
+ --nextId;
+ throw SendError(e.what());
+ }
}
} else {
return false;
diff --git a/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
index 512e71230b..5ff24e7d33 100644
--- a/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
+++ b/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
@@ -370,7 +370,7 @@ MSSqlClfsProvider::earlyInitialize(Plugin::Target &target)
// Check the store dir option; if not specified, need to
// grab the broker's data dir.
if (options.storeDir.empty()) {
- DataDir& dir = store->getBroker()->getDataDir();
+ const DataDir& dir = store->getBroker()->getDataDir();
if (dir.isEnabled()) {
options.storeDir = dir.getPath();
}
diff --git a/cpp/src/qpid/sys/MemoryMappedFile.h b/cpp/src/qpid/sys/MemoryMappedFile.h
index 43e4c852af..ecf38e98e6 100644
--- a/cpp/src/qpid/sys/MemoryMappedFile.h
+++ b/cpp/src/qpid/sys/MemoryMappedFile.h
@@ -38,11 +38,11 @@ class MemoryMappedFile {
/**
* Opens a file that can be mapped by region into memory
*/
- QPID_COMMON_EXTERN std::string open(const std::string& name, const std::string& directory);
+ QPID_COMMON_EXTERN void open(const std::string& name, const std::string& directory);
/**
* Closes and removes the file that can be mapped by region into memory
*/
- QPID_COMMON_EXTERN void close(const std::string& path);
+ QPID_COMMON_EXTERN void close();
/**
* Returns the page size
*/
diff --git a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
index f647ab3943..b4292aa4bc 100644
--- a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
+++ b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
@@ -29,13 +29,14 @@
namespace qpid {
namespace sys {
namespace {
+const std::string PAGEFILE_PREFIX("pf_");
const std::string PATH_SEPARATOR("/");
const std::string ESCAPE("%");
const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.");
std::string getFileName(const std::string& name, const std::string& dir)
{
std::stringstream filename;
- if (dir.size()) filename << dir << PATH_SEPARATOR;
+ if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX;
size_t start = 0;
while (true) {
size_t i = name.find_first_not_of(VALID, start);
@@ -55,31 +56,39 @@ std::string getFileName(const std::string& name, const std::string& dir)
class MemoryMappedFilePrivate
{
friend class MemoryMappedFile;
+ std::string path;
int fd;
MemoryMappedFilePrivate() : fd(0) {}
};
MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {}
MemoryMappedFile::~MemoryMappedFile() { delete state; }
-std::string MemoryMappedFile::open(const std::string& name, const std::string& directory)
+void MemoryMappedFile::open(const std::string& name, const std::string& directory)
{
- std::string path = getFileName(name, directory);
+ // Ensure directory exists
+ if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) {
+ throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno)));
+ }
+
+ state->path = getFileName(name, directory);
- int flags = O_CREAT | O_EXCL | O_RDWR;
- int fd = ::open(path.c_str(), flags, S_IRUSR | S_IWUSR);
- if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
+ int flags = O_CREAT | O_TRUNC | O_RDWR;
+ int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR);
+ if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
state->fd = fd;
- return path;
}
-void MemoryMappedFile::close(const std::string& path)
+
+void MemoryMappedFile::close()
{
::close(state->fd);
- ::unlink(path.c_str());
+ ::unlink(state->path.c_str());
}
+
size_t MemoryMappedFile::getPageSize()
{
return ::sysconf(_SC_PAGE_SIZE);
}
+
char* MemoryMappedFile::map(size_t offset, size_t size)
{
int protection = PROT_READ | PROT_WRITE;
@@ -90,20 +99,24 @@ char* MemoryMappedFile::map(size_t offset, size_t size)
return region;
}
+
void MemoryMappedFile::unmap(char* region, size_t size)
{
::munmap(region, size);
}
+
void MemoryMappedFile::flush(char* region, size_t size)
{
::msync(region, size, MS_ASYNC);
}
+
void MemoryMappedFile::expand(size_t offset)
{
if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) {
throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno)));
}
}
+
bool MemoryMappedFile::isSupported()
{
return true;
diff --git a/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
index 9bb3fa2320..60b3df7da6 100644
--- a/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
+++ b/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
@@ -27,11 +27,10 @@ class MemoryMappedFilePrivate {};
MemoryMappedFile::MemoryMappedFile() : state(0) {}
MemoryMappedFile::~MemoryMappedFile() {}
-std::string MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/)
+void MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/)
{
- return std::string();
}
-void MemoryMappedFile::close(const std::string& /*path*/)
+void MemoryMappedFile::close()
{
}
size_t MemoryMappedFile::getPageSize()
diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py
index 66705e6d24..c9b2db64db 100755
--- a/cpp/src/tests/acl.py
+++ b/cpp/src/tests/acl.py
@@ -671,6 +671,118 @@ class ACLTests(TestBase010):
self.fail(result)
+ def test_illegal_pages_lower_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pageslowerlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pageslowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pageslowerlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pageslowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
+ def test_illegal_pages_upper_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagesupperlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pagesupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagesupperlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pagesupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
+ def test_illegal_pagefactor_lower_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorlowerlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pagefactorlowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorlowerlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pagefactorlowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
+ def test_illegal_pagefactor_upper_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorupperlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pagefactorupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorupperlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pagefactorupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
#=====================================
# ACL queue tests
#=====================================
@@ -687,6 +799,7 @@ class ACLTests(TestBase010):
aclf.write('acl deny bob@QPID purge queue name=q3\n')
aclf.write('acl deny bob@QPID delete queue name=q4\n')
aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
+ aclf.write('acl deny bob@QPID create queue name=q6 paging=true\n')
aclf.write('acl allow all all')
aclf.close()
@@ -739,6 +852,15 @@ class ACLTests(TestBase010):
try:
queue_options = {}
+ queue_options["qpid.paging"] = True
+ session.queue_declare(queue="q6", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=q6, qpid.paging=True");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
queue_options["qpid.max_count"] = 200
queue_options["qpid.max_size"] = 100
session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
@@ -972,6 +1094,107 @@ class ACLTests(TestBase010):
self.fail("ACL should allow queue delete request for q4");
#=====================================
+ # ACL paged tests
+ #=====================================
+
+ def test_paged_allow_mode(self):
+ """
+ Test cases for paged acl in allow mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=qf1 pageslowerlimit=1000\n')
+ aclf.write('acl deny bob@QPID create queue name=qf2 pagesupperlimit=100\n')
+ aclf.write('acl deny bob@QPID create queue name=qf3 pagefactorlowerlimit=10\n')
+ aclf.write('acl deny bob@QPID create queue name=qf4 pagefactorupperlimit=1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.max_pages_loaded"] = 500
+ session.queue_declare(queue="qf1", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf1, qpid.paging=True, qpid.max_pages_loaded=500");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.max_pages_loaded"] = 500
+ session.queue_declare(queue="qf2", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf2, qpid.paging=True, qpid.max_pages_loaded=500");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.page_factor"] = 5
+ session.queue_declare(queue="qf3", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf3, qpid.paging=True, qpid.page_factor=5");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.page_factor"] = 5
+ session.queue_declare(queue="qf4", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf4, qpid.paging=True, qpid.page_factor=5");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+
+ def test_paged_deny_mode(self):
+ """
+ Test cases for paged acl in deny mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create queue name=qf1 pageslowerlimit=100 pagesupperlimit=1000\n')
+ aclf.write('acl allow bob@QPID create queue name=qf2 pagefactorlowerlimit=1 pagefactorupperlimit=10\n')
+ aclf.write('acl allow anonymous all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.max_pages_loaded"] = 500
+ session.queue_declare(queue="qf1", arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=qf1, qpid.paging=True, qpid.max_pages_loaded=500");
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.page_factor"] = 5
+ session.queue_declare(queue="qf2", arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=qf2, qpid.paging=True, qpid.page_factor=5");
+ session = self.get_session('bob','bob')
+
+
+ #=====================================
# ACL file tests
#=====================================
diff --git a/cpp/src/tests/run_paged_queue_tests b/cpp/src/tests/run_paged_queue_tests
index 8f72ddd5b7..cd1dc72641 100755
--- a/cpp/src/tests/run_paged_queue_tests
+++ b/cpp/src/tests/run_paged_queue_tests
@@ -26,7 +26,7 @@ trap stop_broker INT TERM QUIT
export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
start_broker() {
- QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker"
+ QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir --paging-dir=$PWD/pqtest_data $MODULES --auth no) || fail "Could not start broker"
}
stop_broker() {
diff --git a/doc/book/src/cpp-broker/Security.xml b/doc/book/src/cpp-broker/Security.xml
index 7bf7034996..00795a05d8 100644
--- a/doc/book/src/cpp-broker/Security.xml
+++ b/doc/book/src/cpp-broker/Security.xml
@@ -421,7 +421,11 @@ com.sun.security.jgss.initiate {
filemaxsizelowerlimit |
filemaxsizeupperlimit |
filemaxcountlowerlimit |
- filemaxcountupperlimit ]
+ filemaxcountupperlimit |
+ pageslowerlimit |
+ pagesupperlimit |
+ pagefactorlowerlimit |
+ pagefactorupperlimit ]
acl permission {<group-name>|<user-name>|"all"} {action|"all"} [object|"all"
[property=<property-value> ...]]
@@ -728,6 +732,12 @@ com.sun.security.jgss.initiate {
<entry>Indicates the presence of an <parameter>exclusive</parameter> flag</entry>
<entry>CREATE QUEUE, ACCESS QUEUE</entry>
</row>
+ <row>
+ <entry> <command>paging</command> </entry>
+ <entry>Boolean</entry>
+ <entry>Indicates if the queue is paging queue</entry>
+ <entry>CREATE QUEUE, ACCESS QUEUE</entry>
+ </row>
<row>
<entry> <command>type</command> </entry>
<entry>String</entry>
@@ -806,6 +816,30 @@ com.sun.security.jgss.initiate {
<entry>Maximum value for file.max_count (files)</entry>
<entry>CREATE QUEUE, ACCESS QUEUE</entry>
</row>
+ <row>
+ <entry> <command>pageslowerlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Minimum value for number of pages in memory of paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
+ <row>
+ <entry> <command>pagesupperlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Maximum value for number of pages in memory of paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
+ <row>
+ <entry> <command>pagefactorlowerlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Minimum value for size of one page in paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
+ <row>
+ <entry> <command>pagefactorupperlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Maximum value for size of one page in paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
</tbody>
</tgroup>
</table>
@@ -910,7 +944,7 @@ com.sun.security.jgss.initiate {
<row>
<entry>create</entry>
<entry>queue</entry>
- <entry>name alternate durable exclusive autodelete policy queuemaxsizelowerlimit queuemaxsizeupperlimit queuemaxcountlowerlimit queuemaxcountupperlimit filemaxsizelowerlimit filemaxsizeupperlimit filemaxcountlowerlimit filemaxcountupperlimit</entry>
+ <entry>name alternate durable exclusive autodelete policy queuemaxsizelowerlimit queuemaxsizeupperlimit queuemaxcountlowerlimit queuemaxcountupperlimit filemaxsizelowerlimit filemaxsizeupperlimit filemaxcountlowerlimit filemaxcountupperlimit paging pageslowerlimit pagesupperlimit pagefactorlowerlimit pagefactorupperlimit</entry>
<entry></entry>
</row>
<row>
diff --git a/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml b/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml
index 5b4ea42d2b..026213c157 100644
--- a/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml
+++ b/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml
@@ -1,4 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
+<!DOCTYPE entities [
+<!ENTITY % entities SYSTEM "commonEntities.xml">
+%entities;
+]>
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -25,29 +29,293 @@
<section id="Java-Broker-Configuring-And-Managing-JMX-Management-Introduction">
<title>Introduction</title>
- <para>
- The brokers JMX Management Plugin provides the support for creating JMX MBeans for broker objects such as Queues, Exchanges, Connections etc.
- </para>
- <para>
- It is included into the brokers Initial Configuration by default, and is responsible for servicing the RMI and JMX_RMI ports configured on the broker, with the former serving as the RMI Registry used to advertise the actual JMX Connector Server started on the latter.
- </para>
+ <para>The JMX management plugin provides a series of managed beans (MBeans) allowing you to
+ control and monitor the Broker via an industry compliant interface. This provides a
+ convenient intergration point for a variety of Infrastructure Monitoring Solutions,
+ tools such as Jconsole and VisualVM, as well as custom Java programs and scripts.</para>
+ <para>The following sections describe how to connect to JMX, the configuration of the JMX
+ plugin covering topis including securing with SSL, programmatically interacting with
+ Qpid MBeans and finally a summary of all the MBeans made available from by the
+ plugin.</para>
+ <important>
+ <para>For new development work, the reader is directed towards the strategic <link
+ linkend="Java-Broker-Configuring-And-Managing-Web-Console">Web Management
+ Console</link> and the <link
+ linkend="Java-Broker-Configuring-And-Managing-REST-API">REST API</link>. Use the
+ Web/REST interfaces in preference to JMX whenever possible. The JMX interface may be
+ withdrawn in a future release.</para>
+ </important>
</section>
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-DefaultConfiguration">
+ <title>Default Configuration</title>
+ <para>By default, the Broker is shipped with JMX enabled.</para>
+ <para>The RMI registry port runs on port <literal>8999</literal> and the JMX connector on
+ port <literal>9099</literal>. The connector is not SSL protected. Qpid will use the
+ <ulink
+ url="&oracleJdkDocUrl;java/lang/management/ManagementFactory.html#getPlatformMBeanServer()"
+ >Platform MBeanServer</ulink>.</para>
+ <para>To change these settings, use the <link
+ linkend="Java-Broker-Configuring-And-Managing-Web-Console">Web Management
+ interface</link>.</para>
+ </section>
+
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX">
+ <title>Connecting to JMX</title>
+ <para>The following example uses Jconsole to illustrates how to connect to JMX and assume
+ the defaults described above. Jconsole is a management tool that comes with the JDK. It
+ provides a very simple view of the MBeans, but requires no special configuration to be
+ used with Qpid.</para>
+ <para>For full details of Jconsole itself refer to Oracle's <ulink url="&oracleJconsole;"
+ >JConsole Guide</ulink>.</para>
+ <para>Jconsole can be used to connect to local or remote Java processes. On startup, it
+ presents a list of all the Java processes running on the local host and also allows you
+ to specify a service url to connect to a Java process running on a remote host.</para>
+ <para>To start Jconsole on the command line, type:</para>
+ <programlisting><![CDATA[jconsole]]></programlisting>
+ <section
+ id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX-Local">
+ <title>Local</title>
+ <para>To connect to a Broker running locally, simply select the process from the list.
+ You can identify the Broker by looking for its classname
+ <literal>org.apache.qpid.server.Main</literal>.</para>
+ </section>
+ <section
+ id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX-Remote">
+ <title>Remote</title>
+ <para>To connect to a broker running remotely, provide the hostname and port number of
+ the <emphasis>RMI registry port</emphasis> (e.g. <literal>hostname:8999</literal>)
+ and a valid username and password.</para>
+ <para>You can also provide a service url in the form
+ <literal>service:jmx:rmi:///jndi/rmi://hostname:8999/jmxrmi</literal></para>
+ <figure>
+ <title>Making a remote JMX connection to a Broker using jconsole</title>
+ <graphic fileref="images/JMX-Connect-Remote.png"/>
+ </figure>
+ </section>
+ <para>Once you are connected expand the tree of nodes marked
+ <literal>org.apache.qpid</literal> to begin to interact with the Qpid MBeans.</para>
+ <figure>
+ <title>Qpid MBean hierarchy</title>
+ <graphic fileref="images/JMX-Connect-MBeans.png"/>
+ </figure>
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX-SSL">
+ <title>Connecting to a remote Broker protected by SSL</title>
+ <para>If you are connecting to a remote Broker whose JMX connector port has been secured
+ with SSL certificate signed by a private CA (or a self-signed certificate), you will
+ need to pass a trust store and trust store password to Jconsole. If this is
+ required, start jconsole with the following options:</para>
+ <programlisting><![CDATA[jconsole -J-Djavax.net.ssl.trustStore=jmxtruststore.jks -J-Djavax.net.ssl.trustStorePassword=password]]></programlisting>
+ </section>
+ </section>
+
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Example-Client">
+ <title>Example JMX Client</title>
+ <para>The following java snippet illustrates a JMX client that connects to Qpid over JMX
+ passing a userid and password, looks up the <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedBroker&qpidSrcSuffix;"
+ >ManagedBroker</ulink> object corresponding to the <literal>myvhost</literal>
+ virtualhost, then invokes a method on the virtualhost to create a new queue.</para>
+ <para>A full introduction to custom JMX clients is beyond the scope of this book. For this
+ the reader is directed toward Oracle's <ulink url="&oracleJmxTutorial;">JMX
+ tutorial.</ulink></para>
+ <example id="Java-Broker-Configuring-And-Managing-JMX-Example-Client-Code">
+ <title>JMX Client illustrating the creation of a new queue</title>
+ <programlisting language="java">
+Map&lt;String, Object&lt; environment = new HashMap&lt;String, Object&gt;();
+environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","password"});
+// Connect to service
+JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
+JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
+MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+// Object name for ManagedBroker mbean for virtualhost myvhost
+ObjectName objectName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"myvhost\"");
+// Get the ManagedBroker object
+ManagedBroker managedBroker = JMX.newMBeanProxy(mbsc, objectName, ManagedBroker.class);;
+
+// Create the queue named "myqueue"
+managedBroker.createNewQueue("myqueue", null, true);</programlisting>
+ </example>
+ <para>The Qpid classes required for a custom JMX client are included in the
+ <literal>qpid-management-common</literal> artefact.</para>
+ </section>
- <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-Configuration">
- <title>JMX Management Plugin Configuration</title>
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-MBeans">
+ <title>The MBeans</title>
+ <para>The following table summarises the available MBeans. The MBeans are self-describing:
+ each attribute and operation carry a description describing their purpose. This
+ description is visible when using tools such Jconsole. They are also available on
+ Management interfaces themselves (linked below).</para>
+ <table>
+ <title>Qpid Broker MBeans</title>
+ <tgroup cols="2">
+ <thead>
+ <row>
+ <entry>Management Interface</entry>
+ <entry>Object Name</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedBroker&qpidSrcSuffix;"
+ >ManagedBroker</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost="<replaceable>virtualhostname</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean corresponding to the named virtualhost. Allows operations
+ such as the creation/deletion of queues and exchanges on that
+ virtualhost and virtualhost levell statistics.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedQueue&qpidSrcSuffix;"
+ >ManagedQueue</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.Queue,VirtualHost="<replaceable>virtualhostname</replaceable>",name="<replaceable>queuename</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean corresponding to the named queue on the given virtualhost.
+ Allows queue management operations such as view message, move
+ message and clear queue. Exposes attributes such as queue depth and
+ durability.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedExchange&qpidSrcSuffix;"
+ >ManagedExchange</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="<replaceable>virtualhostname</replaceable>",name="<replaceable>amq.direct</replaceable>",ExchangeType=<replaceable>type</replaceable></literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean corresponding to the named exchange on the given
+ virtualhost. Allows exchange management operations such as the
+ creation and removal of bindings. The supported exchange types are
+ exposed by the <literal>exchangeTypes</literal> attribute of the
+ virtualhost.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedConnection&qpidSrcSuffix;"
+ >ManagedConnection</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.Connection,VirtualHost="<replaceable>virtualhostname</replaceable>",name="<replaceable>/peerid:ephemeralport</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean representing a active AMQP connection to the named virtual
+ host. Name is formed from the IP and ephemeral port of the peer.
+ Attributes include the client version and connection level
+ statistics.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/UserManagement&qpidSrcSuffix;"
+ >UserManagement</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=UserManagement,name="UserManagement-<replaceable>authentication
+ manager name</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>When using <link
+ linkend="Java-Broker-Security-PlainPasswordFile-Provider">Plain
+ password provider</link> or <link
+ linkend="Java-Broker-Security-Base64MD5PasswordFile-Provider"
+ >Base 64 MD5 password provider</link>, permits user operations
+ such creation and deletion of users. and password changes.</para>
+ </entry>
- <para>
- The JMX Management Plugin can be configured through the <link linkend="Java-Broker-Configuring-And-Managing-Web-Console">Web Management Console</link>
- and underlying REST management interface. By double-clicking on the JMX Management Plugin name in the object tree a tab for the plugin
- is displayed with its current settings, which can be changed by clicking on the "Edit" button.
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ServerInformation&qpidSrcSuffix;"
+ >ServerInformation</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=ServerInformation,name=ServerInformation</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>Exposes broker wide statistics, product version number and JMX
+ management API version number.</para>
+ </entry>
- The following attributes can be set on the JMX Management Plugin:
- <itemizedlist>
- <listitem><para><emphasis>Use Platform MBean Server</emphasis>. The JMX Management Plugin can start its own MBean Server or it can use the JVMs 'Platform MBean Server'.
- By default this is true, and the Platform MBean Server is used.</para></listitem>
- </itemizedlist>
- NOTE: Changes to the "Use Platform MBean Server" attribute only take effect at broker restart.
- </para>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/LoggingManagement&qpidSrcSuffix;"
+ >LoggingManagement</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=LoggingManagement,name=LoggingManagement</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean permitting control of the Broker's logging. Exposes
+ operations allow the logging level to be controlled at runtime
+ (without restarting the Broker) and others that allow changes to be
+ written back to the log4j.xml logging configuration file, or the
+ contents of the log4.xml file to be re-read at runtime.</para>
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
</section>
</section>
diff --git a/doc/book/src/java-broker/Java-Broker-Ports.xml b/doc/book/src/java-broker/Java-Broker-Ports.xml
index c322045336..37d675eacc 100644
--- a/doc/book/src/java-broker/Java-Broker-Ports.xml
+++ b/doc/book/src/java-broker/Java-Broker-Ports.xml
@@ -63,7 +63,7 @@
</para>
<para>
- SSL can be enabled forPorts with protocols that support it by selecting the 'SSL' transport, at which
+ SSL can be enabled for Ports with protocols that support it by selecting the 'SSL' transport, at which
point a configured <link linkend="Java-Broker-SSL-Keystore">KeyStore</link> must also be selected for the Port.
</para>
@@ -83,7 +83,7 @@
</important>
<important>
- Following deletion of an active Port, the port remains bound until the Broker is restarted. You should restart the broker
+ Following deletion of an active HTTP or JMX Port, the port remains bound until the Broker is restarted. You should restart the broker
immediately if you require preventing new connections on the port or disconnecting existing clients.
</important>
diff --git a/doc/book/src/java-broker/commonEntities.xml b/doc/book/src/java-broker/commonEntities.xml
index 69edde91e9..2e7a181d65 100644
--- a/doc/book/src/java-broker/commonEntities.xml
+++ b/doc/book/src/java-broker/commonEntities.xml
@@ -36,6 +36,7 @@
<!ENTITY oracleJdkDocUrl "http://docs.oracle.com/javase/6/docs/api/">
<!ENTITY oracleJeeDocUrl "http://docs.oracle.com/javaee/6/api/">
<!ENTITY oracleKeytool "http://docs.oracle.com/javase/6/docs/technotes/tools/solaris/keytool.html">
+<!ENTITY oracleJconsole "http://java.sun.com/javase/6/docs/technotes/guides/management/jconsole.html">
<!-- Oracle BDB JE-->
<!ENTITY oracleJeDownloadUrl "http://www.oracle.com/technetwork/products/berkeleydb/downloads/index.html?ssSourceSiteId=ocomen">
@@ -44,3 +45,8 @@
<!ENTITY oracleBdbJavaDocUrl "http://docs.oracle.com/cd/E17277_02/html/java/">
<!ENTITY oracleBdbProductVersion "5.0.97">
+<!ENTITY oracleJmxTutorial "http://docs.oracle.com/javase/tutorial/jmx/">
+
+<!ENTITY qpidSrc "http://svn.apache.org/viewvc/qpid/trunk/qpid/java/">
+<!ENTITY qpidManagementCommonSrc "&qpidSrc;management/common/src/main/java/">
+<!ENTITY qpidSrcSuffix ".java?view=co">
diff --git a/doc/book/src/java-broker/images/JMX-Connect-MBeans.png b/doc/book/src/java-broker/images/JMX-Connect-MBeans.png
new file mode 100644
index 0000000000..f766197166
--- /dev/null
+++ b/doc/book/src/java-broker/images/JMX-Connect-MBeans.png
Binary files differ
diff --git a/doc/book/src/java-broker/images/JMX-Connect-Remote.png b/doc/book/src/java-broker/images/JMX-Connect-Remote.png
new file mode 100644
index 0000000000..5fcf9dd497
--- /dev/null
+++ b/doc/book/src/java-broker/images/JMX-Connect-Remote.png
Binary files differ
diff --git a/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml b/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml
index d7dc481888..87abbb8bfb 100644
--- a/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml
+++ b/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml
@@ -226,7 +226,7 @@ public class StocksExample {
Message message = session.createMessage();
message.setStringProperty("instrument", "IBM");
- message.setStringProperty("price", "100");
+ message.setIntProperty("price", 100);
messageProducer.send(message);
session.commit();
diff --git a/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml b/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml
index 4da75875a8..7113831f04 100644
--- a/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml
+++ b/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml
@@ -136,7 +136,7 @@ amqp://username:password@clientid/test
</screen>
</example>
<para>For full details see <xref linkend="JMS-Client-0-8-Connection-URL"/></para>
- <note>Note, that a single broker failover is enabled by default. If the failover behaviour is not desired it can be switched off
+ <note><para>Note, that a single broker failover is enabled by default. If the failover behaviour is not desired it can be switched off
by setting a failover option to <emphasis>nofailover</emphasis> as in the example below
<example>
<title>Connection URL configured with nofailover</title>
@@ -145,7 +145,7 @@ amqp://username:password@clientid/test
?brokerlist='tcp://localhost:15672?failover='nofailover']]>
</screen>
</example>
- </note>
+ </para></note>
<!-- TODO perhaps mention ConnectionListener?-->
</section>
<section id="JMS-Client-0-8-Client-Understanding-Connection-Heartbeating">
@@ -284,7 +284,7 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
>MessageConsumer#receive()</ulink> for Consumer B on the same Session, the application
will hang indefinitely as even if messages suitable for B arrive at the Broker. Those
messages can never be sent to the Session as no space is available in prefetch. </para>
- <note>Please note, when the acknowlegement mode <emphasis>Session#SESSION_TRANSACTED</emphasis>
+ <note><para>Please note, when the acknowlegement mode <emphasis>Session#SESSION_TRANSACTED</emphasis>
or <emphasis>Session#CLIENT_ACKNOWLEDGE</emphasis> is set on a consuming session,
the prefetched messages are released from the prefetch buffer on transaction commit/rollback
(in case of acknowledgement mode <emphasis>Session#SESSION_TRANSACTED</emphasis> )
@@ -294,7 +294,7 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
the prefetched messages continue to remain in the prefetch buffer preventing the delivery of the following messages.
As result, the application might stop the receiving of the messages
until the transaction is committed/rolled back (for <emphasis>Session#SESSION_TRANSACTED</emphasis> )
- or received messages are acknowledged (for <emphasis>Session#CLIENT_ACKNOWLEDGE</emphasis>).</note>
+ or received messages are acknowledged (for <emphasis>Session#CLIENT_ACKNOWLEDGE</emphasis>).</para></note>
</section>
<section id="JMS-Client-0-8-Client-Understanding-Session-TemporaryQueues">
<title>TemporaryQueues</title>
@@ -379,10 +379,10 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
linkend="JMS-Client-0-8-System-Properties-DefaultMandatoryTopic"
><literal>qpid.default_mandatory_topic</literal></link> for Queues and Topics
respectively.</para>
- <note>Please note, according to AMQP specifications the mandatory flag on a message tells the server
+ <note><para>Please note, according to AMQP specifications the mandatory flag on a message tells the server
how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a
Return method. If this flag is zero, the server silently drops the message. Please, refer <ulink url="&amqpSrc;">AMQP specifications</ulink>
- for more details.</note>
+ for more details.</para></note>
</section>
<section id="JMS-Client-0-8-Client-Understanding-MessageProducer-CloseWhenNoRoute">
<title>Close When No Route</title>
@@ -508,8 +508,10 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
<literal>server</literal>.</para>
<para>See <ulink
url="&qpidJavaBrokerBook;Java-Broker-Runtime-Handling-Undeliverable-Messages.html#Java-Broker-Runtime-Handling-Undeliverable-Messages-Maximum-Delivery-Count"
- > Unhandling Undeliverable Messages</ulink> within the Java Broker book for full details
- of the functioning of this feature.</para>
+ > Handling Undeliverable Messages</ulink> within the Java Broker book for full details of
+ the functioning of this feature.</para>
+ <note><para>The optional JMS message header <literal>JMSXDeliveryCount</literal> is <emphasis>not</emphasis>
+ supported.</para></note>
</section>
</section>
<section id="JMS-Client-0-8-Client-Understanding-Destinations">
diff --git a/tests/src/py/qpid_tests/broker_0_10/exchange.py b/tests/src/py/qpid_tests/broker_0_10/exchange.py
index 315991d585..53de37f12c 100644
--- a/tests/src/py/qpid_tests/broker_0_10/exchange.py
+++ b/tests/src/py/qpid_tests/broker_0_10/exchange.py
@@ -456,6 +456,12 @@ class HeadersExchangeTests(TestHelper):
self.myBasicPublish({"irrelevant":0})
self.assertEmpty(self.q)
+ def testMultipleBindings(self):
+ self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="SomeKey", arguments={ 'x-match':'any', "name":"fred"})
+ self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="AnotherKey", arguments={ 'x-match':'all', "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.assertEmpty(self.q)
+
class MiscellaneousErrorsTests(TestHelper):
"""
diff --git a/tools/src/py/qpidstore/janal.py b/tools/src/py/qpidstore/janal.py
index 1f89207b4d..231b283c05 100644
--- a/tools/src/py/qpidstore/janal.py
+++ b/tools/src/py/qpidstore/janal.py
@@ -215,9 +215,18 @@ class TxnMap(object):
def _abort(self, xid):
"""Perform an abort operation for the given xid record"""
- for fid, hdr, lock in self.__map[xid]:
+ for _, hdr, _ in self.__map[xid]:
if isinstance(hdr, jrnl.DeqRec):
- self.__emap.unlock(hdr.deq_rid)
+ try:
+ self.__emap.unlock(hdr.deq_rid)
+ except jerr.NonExistentRecordError as err: # Not in emap, look in current transaction op list (TPL)
+ found_rid = False
+ for _, hdr1, _ in self.__map[xid]:
+ if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid:
+ found_rid = True
+ break
+ if not found_rid: # Not found in current transaction op list, re-throw error
+ raise err
del self.__map[xid]
def _commit(self, xid):