summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-02-04 11:13:42 +0000
committerKeith Wall <kwall@apache.org>2014-02-04 11:13:42 +0000
commita2ff3ed02e5c94c7b669a1be79505d831c017a19 (patch)
treed4191f515195280cf9083dbfb7da8853a8d2a53c
parentee9879e950b2721a996483e5346449ddad284db4 (diff)
parent6a6d0ce4b10814f4c535acc071aec0f5cf991037 (diff)
downloadqpid-python-a2ff3ed02e5c94c7b669a1be79505d831c017a19.tar.gz
NO-JIRA: Merge changes from the trunk in revisions 1562452:1564250 using
svn merge -r 1562452:1564250 https://svn.apache.org/repos/asf/qpid/trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564254 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/AMQP_1.07
-rw-r--r--qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1102
-rw-r--r--qpid/cpp/design_docs/broker-acl-work.txt24
-rw-r--r--qpid/cpp/src/qpid/DataDir.cpp2
-rw-r--r--qpid/cpp/src/qpid/DataDir.h6
-rw-r--r--qpid/cpp/src/qpid/Exception.cpp12
-rw-r--r--qpid/cpp/src/qpid/acl/AclData.cpp14
-rw-r--r--qpid/cpp/src/qpid/acl/AclValidator.cpp16
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/AclModule.h29
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h8
-rw-r--r--qpid/cpp/src/qpid/broker/DtxTimeout.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/PagedQueue.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/PagedQueue.h1
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFactory.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandlerObserver.h51
-rw-r--r--qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp30
-rw-r--r--qpid/cpp/src/qpid/ha/ErrorListener.h60
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp46
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h2
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp14
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryTxObserver.h6
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp45
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h17
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.cpp21
-rw-r--r--qpid/cpp/src/qpid/ha/TxReplicator.h3
-rw-r--r--qpid/cpp/src/qpid/legacystore/StorePlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp6
-rw-r--r--qpid/cpp/src/qpid/linearstore/StorePlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp16
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp2
-rw-r--r--qpid/cpp/src/qpid/sys/MemoryMappedFile.h4
-rw-r--r--qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp31
-rw-r--r--qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp5
-rwxr-xr-xqpid/cpp/src/tests/acl.py223
-rwxr-xr-xqpid/cpp/src/tests/run_paged_queue_tests2
-rw-r--r--qpid/doc/book/src/cpp-broker/Security.xml38
-rw-r--r--qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml306
-rw-r--r--qpid/doc/book/src/java-broker/Java-Broker-Ports.xml4
-rw-r--r--qpid/doc/book/src/java-broker/commonEntities.xml6
-rw-r--r--qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.pngbin0 -> 66586 bytes
-rw-r--r--qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.pngbin0 -> 66411 bytes
-rw-r--r--qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml2
-rw-r--r--qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml18
-rw-r--r--qpid/java/amqp-1-0-client-websocket/pom.xml20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java63
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java62
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java67
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java88
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java18
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java25
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java42
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java170
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java48
-rw-r--r--qpid/java/broker-plugins/management-http/pom.xml20
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java4
-rw-r--r--qpid/java/broker-plugins/websocket/pom.xml20
-rw-r--r--qpid/java/build.deps18
-rw-r--r--qpid/java/ivy.nexus.xml6
-rw-r--r--qpid/java/ivy.retrieve.xml18
-rw-r--r--qpid/java/jca/build.xml2
-rw-r--r--qpid/java/jca/example/build-geronimo-properties.xml3
-rw-r--r--qpid/java/jca/pom.xml4
-rw-r--r--qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml (renamed from qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml)4
-rw-r--r--qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml (renamed from qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml)2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java24
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py6
-rw-r--r--qpid/tools/src/py/qpidstore/janal.py13
86 files changed, 1515 insertions, 622 deletions
diff --git a/qpid/cpp/AMQP_1.0 b/qpid/cpp/AMQP_1.0
index ef478c7a15..f5db31b8f5 100644
--- a/qpid/cpp/AMQP_1.0
+++ b/qpid/cpp/AMQP_1.0
@@ -164,7 +164,8 @@ warning is logged.
If the node is an exchange, then an outgoing link (i.e. messages to
travel out from broker) will cause a temporary, link-scoped queue to
be created on the broker and bound to the exchange. [See section on
-'Topics' below]
+'Topics' below]. The name of the queue will be of the form
+<container-id>_<link-name>.
Outgoing links may have a filter set on their source. The filters
currently supported by the broker are 'legacy-amqp-direct-binding',
@@ -272,7 +273,9 @@ trigger the establishment of the link. This gives the peer some means
of ensuring their expectations will be met.
The 'shared' capability allows subscriptions from an exchange to be
-shared by multiple receivers.
+shared by multiple receivers. Where this is specified the subscription
+queue created takes the name of the link (and does not include the
+container id).
The 'durable' capability will be added if the queue or exchange
refered to by the source or target is durable. The 'queue' capability
diff --git a/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 b/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1
index e62e5e8bc3..4874899b7f 100644
--- a/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1
+++ b/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1
@@ -55,6 +55,10 @@
# - If a directory "looks like" is has already had CMake run in it
# then this script skips running CMake again.
#
+# * User chooses to include Proton or not.
+#
+# - Proton is included by having variable PROTON_ROOT reference the
+# directory where proton was installed.
#
# Prerequisites
#
@@ -143,6 +147,7 @@ $ErrorActionPreference='Stop'
#
$global:txtPath = '$env:PATH'
$global:txtQR = '$env:QPID_BUILD_ROOT'
+$global:txtPR = '$env:PROTON_ROOT'
$global:txtWH = 'Write-Host'
#############################
@@ -218,6 +223,37 @@ function SanityCheckBoostPath ($path=0)
#############################
+# SanityCheckProtonInstallPath
+# A path is a "proton install path" if it contains
+# both bin and include subdirectories.
+#
+function SanityCheckProtonInstallPath ($path=0)
+{
+ $result = $true
+ $displayPath = ""
+
+ if ($path -ne $null) {
+ $displayPath = $path
+
+ $toTest = ('include', 'bin')
+ foreach ($pattern in $toTest) {
+ $target = Join-Path $path $pattern
+ if (!(Test-Path -path $target)) {
+ $result = $false
+ }
+ }
+ } else {
+ $result = $false
+ }
+
+ if (! $result) {
+ Write-Host "The path ""$displayPath"" does not appear to be a Proton install root path."
+ }
+ $result
+}
+
+
+#############################
# SanityCheckBuildPath
# A path is a "build path" if it contains
# various subdirectories.
@@ -264,14 +300,16 @@ function WriteDotnetBindingSlnLauncherPs1
[string] $nBits,
[string] $outfileName,
[string] $studioVersion,
- [string] $studioSubdir
+ [string] $studioSubdir,
+ [string] $protonRoot
)
$out = @("#
# Launch $slnName in $studioVersion $vsPlatform ($nBits-bit) environment
#
-$global:txtPath = ""$boostRoot\lib;$global:txtPath""
+$global:txtPath = ""$protonRoot\bin;$boostRoot\lib;$global:txtPath""
$global:txtQR = ""$buildRoot""
+$global:txtPR = ""$protonRoot""
$global:txtWH ""Launch $slnName in $studioVersion $vsPlatform ($nBits-bit) environment.""
$cppDir\bindings\qpid\dotnet\$vsSubdir\$slnName
")
@@ -328,7 +366,8 @@ function WriteDotnetBindingEnvSetupBat
[string] $outfileName,
[string] $studioVersion,
[string] $studioSubdir,
- [string] $cmakeLine
+ [string] $cmakeLine,
+ [string] $protonRoot
)
$out = @("@ECHO OFF
@@ -344,8 +383,9 @@ REM The solution was generated with cmake command line:
REM $cmakeLine
ECHO %PATH% | FINDSTR /I boost > NUL
IF %ERRORLEVEL% EQU 0 ECHO WARNING: Boost is defined in your path multiple times!
-SET PATH=$boostRoot\lib;%PATH%
+SET PATH=$protonRoot\bin;$boostRoot\lib;%PATH%
SET QPID_BUILD_ROOT=$buildRoot
+SET PROTON_ROOT=$protonRoot
ECHO Environment set for $slnName $studioVersion $vsPlatform $nBits-bit development.
")
Write-Host " $buildRoot\$outfileName"
@@ -441,7 +481,7 @@ function SelectVisualStudioVersion {
SelectVisualStudioVersion
#############################
-# User dialog to get optional 32-bit boost and build paths
+# User dialog to get optional 32-bit boost, proton, and build paths
#
$boost32 = Select-Folder -message "Select 32-bit BOOST_ROOT folder for $global:vsVersion build. Press CANCEL to skip 32-bit processing."
@@ -456,6 +496,18 @@ if ($defined32) {
$make32 = $false
if ($defined32) {
+ $proton32folder = Select-Folder -message "Select 32-bit Proton install folder for $global:vsVersion build."
+
+ $found = ($proton32folder -ne $null) -and ($proton32folder -ne '')
+ if ($found) {
+ $found = SanityCheckProtonInstallPath $proton32folder
+ }
+ if ($found) {
+ $proton32cmake = """-DPROTON_ROOT=$proton32folder"""
+ } else {
+ $proton32cmake = ""
+ }
+
$build32 = Select-Folder -message "Select 32-bit QPID_BUILD_ROOT folder for $global:vsVersion build." -path $projRoot
$found = ($build32 -ne $null) -and ($build32 -ne '')
@@ -472,7 +524,7 @@ if ($defined32) {
}
#############################
-# User dialog to get optional 64-bit boost and build paths
+# User dialog to get optional 64-bit boost, proton, and build paths
#
$boost64 = Select-Folder -message "Select 64-bit BOOST_ROOT folder for $global:vsVersion build. Press CANCEL to skip 64-bit processing."
@@ -486,6 +538,18 @@ if ($defined64) {
$make64 = $false
if ($defined64) {
+ $proton64folder = Select-Folder -message "Select 64-bit Proton install folder for $global:vsVersion build."
+
+ $found = ($proton64folder -ne $null) -and ($proton64folder -ne '')
+ if ($found) {
+ $found = SanityCheckProtonInstallPath $proton64folder
+ }
+ if ($found) {
+ $proton64cmake = """-DPROTON_ROOT=$proton64folder"""
+ } else {
+ $proton64cmake = ""
+ }
+
$build64 = Select-Folder -message "Select 64-bit QPID_BUILD_ROOT folder for $global:vsVersion build." -path $projRoot
$found = ($build64 -ne $null) -and ($build64 -ne '')
@@ -508,10 +572,9 @@ if ($defined64) {
#
if ($make32) {
cd "$build32"
- Write-Host "Running 32-bit CMake in $build32 ..."
- $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_x86"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $cppDir"
- Write-Host "$global:cmakeCommadLine32"
- CMake -G "$global:cmakeGenerator" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_x86" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $cppDir
+ $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_$build32"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $proton32cmake $cppDir"
+ Write-Host "Running 32-bit CMake in $build32 : $global:cmakeCommandLine32"
+ CMake -G "$global:cmakeGenerator" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_$build32" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $proton32cmake $cppDir
} else {
Write-Host "Skipped 32-bit CMake."
}
@@ -522,9 +585,10 @@ if ($make32) {
if ($make64) {
cd "$build64"
Write-Host "Running 64-bit CMake in $build64"
- $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_x64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $cppDir"
- Write-Host "$global:cmakeCommadLine64"
- CMake -G "$global:cmakeGenerator Win64" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_x64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $cppDir
+ $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_$build64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $proton64cmake $cppDir"
+ Write-Host $global:cmakeCommandLine64
+ Write-Host ""
+ CMake -G "$global:cmakeGenerator Win64" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_$build64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $proton64cmake $cppDir
} else {
Write-Host "Skipped 64-bit CMake."
}
@@ -549,7 +613,8 @@ if ($defined32) {
-nBits "32" `
-outfileName "start-devenv-messaging-$global:vsSubdir-x86-32bit.ps1" `
-studioVersion "$global:vsVersion" `
- -studioSubdir "$global:vsSubdir"
+ -studioSubdir "$global:vsSubdir" `
+ -protonRoot "$proton32folder"
###########
@@ -577,7 +642,8 @@ if ($defined32) {
-outfileName "setenv-messaging-$global:vsSubdir-x86-32bit.bat" `
-studioVersion "$global:vsVersion" `
-studioSubdir "$global:vsSubdir" `
- -cmakeLine "$global:cmakeCommandLine32"
+ -cmakeLine "$global:cmakeCommandLine32" `
+ -protonRoot "$proton32folder"
} else {
Write-Host "Skipped writing 32-bit scripts."
@@ -615,7 +681,8 @@ if ($defined64) {
-psScriptName "start-devenv-messaging-$global:vsSubdir-x64-64bit.ps1" `
-outfileName "start-devenv-messaging-$global:vsSubdir-x64-64bit.bat" `
-studioVersion "$global:vsVersion" `
- -studioSubdir "$global:vsSubdir"
+ -studioSubdir "$global:vsSubdir" `
+ -protonRoot "$proton64folder"
###########
# Batch script (that you CALL from a command prompt)
@@ -629,7 +696,8 @@ if ($defined64) {
-outfileName "setenv-messaging-$global:vsSubdir-x64-64bit.bat" `
-studioVersion "$global:vsVersion" `
-studioSubdir "$global:vsSubdir" `
- -cmakeLine "$global:cmakeCommandLine64"
+ -cmakeLine "$global:cmakeCommandLine64" `
+ -protonRoot "$proton64folder"
} else {
Write-Host "Skipped writing 64-bit scripts."
diff --git a/qpid/cpp/design_docs/broker-acl-work.txt b/qpid/cpp/design_docs/broker-acl-work.txt
index e89e446a56..e587dc5198 100644
--- a/qpid/cpp/design_docs/broker-acl-work.txt
+++ b/qpid/cpp/design_docs/broker-acl-work.txt
@@ -28,16 +28,20 @@ in memory and on disk.
* Add property limit settings to CREATE QUEUE Acl rules.
-User Option Acl Limit Property Units
---------------- ---------------------- ---------------
-qpid.max_size queuemaxsizelowerlimit bytes
- queuemaxsizeupperlimit bytes
-qpid.max_count queuemaxcountlowerlimit messages
- queuemaxcountupperlimit messages
-qpid.file_size filemaxsizelowerlimit pages (64Kb per page)
- filemaxsizeupperlimit pages (64Kb per page)
-qpid.file_count filemaxcountlowerlimit files
- filemaxcountupperlimit files
+User Option Acl Limit Property Units
+--------------- ---------------------- ---------------
+qpid.max_size queuemaxsizelowerlimit bytes
+ queuemaxsizeupperlimit bytes
+qpid.max_count queuemaxcountlowerlimit messages
+ queuemaxcountupperlimit messages
+qpid.file_size filemaxsizelowerlimit pages (64Kb per page)
+ filemaxsizeupperlimit pages (64Kb per page)
+qpid.file_count filemaxcountlowerlimit files
+ filemaxcountupperlimit files
+qpid.max_pages_loaded pageslowerlimit pages
+ pagesupperlimit pages
+qpid.page_factor pagefactorlowerlimit integer (multiple of the platform-defined page size)
+ pagefactorlowerlimit integer (multiple of the platform-defined page size)
* Change rule match behavior to accomodate limit settings
diff --git a/qpid/cpp/src/qpid/DataDir.cpp b/qpid/cpp/src/qpid/DataDir.cpp
index cfdb536d2c..546df3dabd 100644
--- a/qpid/cpp/src/qpid/DataDir.cpp
+++ b/qpid/cpp/src/qpid/DataDir.cpp
@@ -26,7 +26,7 @@
namespace qpid {
-DataDir::DataDir (std::string path) :
+DataDir::DataDir (const std::string& path) :
enabled (!path.empty ()),
dirPath (path)
{
diff --git a/qpid/cpp/src/qpid/DataDir.h b/qpid/cpp/src/qpid/DataDir.h
index 828299f3ba..ec73d28796 100644
--- a/qpid/cpp/src/qpid/DataDir.h
+++ b/qpid/cpp/src/qpid/DataDir.h
@@ -42,11 +42,11 @@ class DataDir
public:
- QPID_COMMON_EXTERN DataDir (std::string path);
+ QPID_COMMON_EXTERN DataDir (const std::string& path);
QPID_COMMON_EXTERN ~DataDir ();
- bool isEnabled() { return enabled; }
- const std::string& getPath() { return dirPath; }
+ bool isEnabled() const { return enabled; }
+ const std::string& getPath() const { return dirPath; }
};
} // namespace qpid
diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp
index a6696f06e1..999c2aeb52 100644
--- a/qpid/cpp/src/qpid/Exception.cpp
+++ b/qpid/cpp/src/qpid/Exception.cpp
@@ -45,16 +45,20 @@ Exception::Exception(const std::string& msg) throw() : message(msg) {
Exception::~Exception() throw() {}
-std::string Exception::getPrefix() const { return ""; }
+std::string Exception::getPrefix() const { return std::string(); }
std::string Exception::getMessage() const { return message; }
+namespace { const std::string COLON(": "); }
+
const char* Exception::what() const throw() {
// Construct the what string the first time it is needed.
if (whatStr.empty()) {
- whatStr = getPrefix();
- if (!whatStr.empty()) whatStr += ": ";
- whatStr += message;
+ if (message.compare(0, getPrefix().size(), getPrefix()) == 0 || // Already has prefix
+ getPrefix().empty()) // No prefix
+ whatStr = message;
+ else
+ whatStr = getPrefix() + COLON + message;
}
return whatStr.c_str();
}
diff --git a/qpid/cpp/src/qpid/acl/AclData.cpp b/qpid/cpp/src/qpid/acl/AclData.cpp
index a205ddabfc..48b5e462e5 100644
--- a/qpid/cpp/src/qpid/acl/AclData.cpp
+++ b/qpid/cpp/src/qpid/acl/AclData.cpp
@@ -160,6 +160,16 @@ bool AclData::lookupMatchRule(
// as rule's index.
propertyMapItr lookupParamItr;
switch (rulePropMapItr->first) {
+ case acl::SPECPROP_MAXPAGESLOWERLIMIT:
+ case acl::SPECPROP_MAXPAGESUPPERLIMIT:
+ lookupParamItr = params->find(PROP_MAXPAGES);
+ break;
+
+ case acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT:
+ case acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT:
+ lookupParamItr = params->find(PROP_MAXPAGEFACTOR);
+ break;
+
case acl::SPECPROP_MAXQUEUECOUNTUPPERLIMIT:
case acl::SPECPROP_MAXQUEUECOUNTLOWERLIMIT:
lookupParamItr = params->find(PROP_MAXQUEUECOUNT);
@@ -201,6 +211,8 @@ bool AclData::lookupMatchRule(
case acl::SPECPROP_MAXQUEUESIZEUPPERLIMIT:
case acl::SPECPROP_MAXFILECOUNTUPPERLIMIT:
case acl::SPECPROP_MAXFILESIZEUPPERLIMIT:
+ case acl::SPECPROP_MAXPAGESUPPERLIMIT:
+ case acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT:
limitChecked &=
compareInt(
rulePropMapItr->first,
@@ -213,6 +225,8 @@ bool AclData::lookupMatchRule(
case acl::SPECPROP_MAXQUEUESIZELOWERLIMIT:
case acl::SPECPROP_MAXFILECOUNTLOWERLIMIT:
case acl::SPECPROP_MAXFILESIZELOWERLIMIT:
+ case acl::SPECPROP_MAXPAGESLOWERLIMIT:
+ case acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT:
limitChecked &=
compareInt(
rulePropMapItr->first,
diff --git a/qpid/cpp/src/qpid/acl/AclValidator.cpp b/qpid/cpp/src/qpid/acl/AclValidator.cpp
index a077667a33..89e072000e 100644
--- a/qpid/cpp/src/qpid/acl/AclValidator.cpp
+++ b/qpid/cpp/src/qpid/acl/AclValidator.cpp
@@ -110,6 +110,22 @@ namespace acl {
boost::shared_ptr<PropertyType>(
new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+ validators.insert(Validator(acl::SPECPROP_MAXPAGESLOWERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
+ validators.insert(Validator(acl::SPECPROP_MAXPAGESUPPERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
+ validators.insert(Validator(acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
+ validators.insert(Validator(acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT,
+ boost::shared_ptr<PropertyType>(
+ new IntPropertyType(0,std::numeric_limits<int64_t>::max()))));
+
std::string policyTypes[] = {"ring", "self-destruct", "reject"};
std::vector<std::string> v(policyTypes, policyTypes + sizeof(policyTypes) / sizeof(std::string));
validators.insert(Validator(acl::SPECPROP_POLICYTYPE,
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 5eedafc77b..43f39c2919 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -94,8 +94,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
catch(const SessionException& e) {
- QPID_LOG(error, "Execution exception: " << e.what());
- executionException(e.code, e.what()); // Let subclass handle this first.
+ executionException(e.code, e.what());
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -105,16 +104,13 @@ void SessionHandler::handleIn(AMQFrame& f) {
sendDetach();
}
catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception: " << e.what());
- channelException(e.code, e.what()); // Let subclass handle this first.
+ channelException(e.code, e.what());
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
- QPID_LOG(error, "Connection exception: " << e.what());
connectionException(e.code, e.getMessage());
}
catch(const std::exception& e) {
- QPID_LOG(error, "Unexpected exception: " << e.what());
connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what());
}
}
@@ -186,13 +182,14 @@ void SessionHandler::detach(const std::string& name) {
}
void SessionHandler::detached(const std::string& /*name*/, uint8_t code) {
- // Special case for detached: Don't check if we are
- // attached. Checking can lead to an endless game of "detached
- // tennis" on federated brokers.
awaitingDetached = false;
+ // Special case for detached: Don't throw if we are not attached. Doing so
+ // can lead to an endless game of "detached tennis" on federated brokers.
+ if (!getState()) return; // Already detached.
if (code != session::DETACH_CODE_NORMAL) {
sendReady = receiveReady = false;
- channelException(convert(code), "session.detached from peer.");
+ channelException(convert(code), Msg() << "Channel " << channel.get()
+ << " received session.detached from peer");
} else {
handleDetach();
}
diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h
index c01697ace9..aa0ea0c6b0 100644
--- a/qpid/cpp/src/qpid/broker/AclModule.h
+++ b/qpid/cpp/src/qpid/broker/AclModule.h
@@ -77,6 +77,9 @@ namespace acl {
PROP_SCHEMAPACKAGE,
PROP_SCHEMACLASS,
PROP_POLICYTYPE,
+ PROP_PAGING,
+ PROP_MAXPAGES,
+ PROP_MAXPAGEFACTOR,
PROP_MAXQUEUESIZE,
PROP_MAXQUEUECOUNT,
PROP_MAXFILESIZE,
@@ -100,6 +103,7 @@ namespace acl {
SPECPROP_SCHEMAPACKAGE = PROP_SCHEMAPACKAGE,
SPECPROP_SCHEMACLASS = PROP_SCHEMACLASS,
SPECPROP_POLICYTYPE = PROP_POLICYTYPE,
+ SPECPROP_PAGING = PROP_PAGING,
SPECPROP_MAXQUEUESIZELOWERLIMIT,
SPECPROP_MAXQUEUESIZEUPPERLIMIT,
@@ -108,7 +112,11 @@ namespace acl {
SPECPROP_MAXFILESIZELOWERLIMIT,
SPECPROP_MAXFILESIZEUPPERLIMIT,
SPECPROP_MAXFILECOUNTLOWERLIMIT,
- SPECPROP_MAXFILECOUNTUPPERLIMIT };
+ SPECPROP_MAXFILECOUNTUPPERLIMIT,
+ SPECPROP_MAXPAGESLOWERLIMIT,
+ SPECPROP_MAXPAGESUPPERLIMIT,
+ SPECPROP_MAXPAGEFACTORLOWERLIMIT,
+ SPECPROP_MAXPAGEFACTORUPPERLIMIT };
// AclResult shared between ACL spec and ACL authorise interface
enum AclResult {
@@ -229,6 +237,9 @@ namespace acl {
if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return PROP_POLICYTYPE;
+ if (str.compare("paging") == 0) return PROP_PAGING;
+ if (str.compare("maxpages") == 0) return PROP_MAXPAGES;
+ if (str.compare("maxpagefactor") == 0) return PROP_MAXPAGEFACTOR;
if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE;
if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT;
if (str.compare("maxfilesize") == 0) return PROP_MAXFILESIZE;
@@ -249,6 +260,9 @@ namespace acl {
case PROP_SCHEMAPACKAGE: return "schemapackage";
case PROP_SCHEMACLASS: return "schemaclass";
case PROP_POLICYTYPE: return "policytype";
+ case PROP_PAGING: return "paging";
+ case PROP_MAXPAGES: return "maxpages";
+ case PROP_MAXPAGEFACTOR: return "maxpagefactor";
case PROP_MAXQUEUESIZE: return "maxqueuesize";
case PROP_MAXQUEUECOUNT: return "maxqueuecount";
case PROP_MAXFILESIZE: return "maxfilesize";
@@ -270,6 +284,7 @@ namespace acl {
if (str.compare("schemapackage") == 0) return SPECPROP_SCHEMAPACKAGE;
if (str.compare("schemaclass") == 0) return SPECPROP_SCHEMACLASS;
if (str.compare("policytype") == 0) return SPECPROP_POLICYTYPE;
+ if (str.compare("paging") == 0) return SPECPROP_PAGING;
if (str.compare("queuemaxsizelowerlimit") == 0) return SPECPROP_MAXQUEUESIZELOWERLIMIT;
if (str.compare("queuemaxsizeupperlimit") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT;
if (str.compare("queuemaxcountlowerlimit") == 0) return SPECPROP_MAXQUEUECOUNTLOWERLIMIT;
@@ -278,6 +293,10 @@ namespace acl {
if (str.compare("filemaxsizeupperlimit") == 0) return SPECPROP_MAXFILESIZEUPPERLIMIT;
if (str.compare("filemaxcountlowerlimit") == 0) return SPECPROP_MAXFILECOUNTLOWERLIMIT;
if (str.compare("filemaxcountupperlimit") == 0) return SPECPROP_MAXFILECOUNTUPPERLIMIT;
+ if (str.compare("pageslowerlimit") == 0) return SPECPROP_MAXPAGESLOWERLIMIT;
+ if (str.compare("pagesupperlimit") == 0) return SPECPROP_MAXPAGESUPPERLIMIT;
+ if (str.compare("pagefactorlowerlimit") == 0) return SPECPROP_MAXPAGEFACTORLOWERLIMIT;
+ if (str.compare("pagefactorupperlimit") == 0) return SPECPROP_MAXPAGEFACTORUPPERLIMIT;
// Allow old names in ACL file as aliases for newly-named properties
if (str.compare("maxqueuesize") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT;
if (str.compare("maxqueuecount") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT;
@@ -297,6 +316,7 @@ namespace acl {
case SPECPROP_SCHEMAPACKAGE: return "schemapackage";
case SPECPROP_SCHEMACLASS: return "schemaclass";
case SPECPROP_POLICYTYPE: return "policytype";
+ case SPECPROP_PAGING: return "paging";
case SPECPROP_MAXQUEUESIZELOWERLIMIT: return "queuemaxsizelowerlimit";
case SPECPROP_MAXQUEUESIZEUPPERLIMIT: return "queuemaxsizeupperlimit";
case SPECPROP_MAXQUEUECOUNTLOWERLIMIT: return "queuemaxcountlowerlimit";
@@ -305,6 +325,10 @@ namespace acl {
case SPECPROP_MAXFILESIZEUPPERLIMIT: return "filemaxsizeupperlimit";
case SPECPROP_MAXFILECOUNTLOWERLIMIT: return "filemaxcountlowerlimit";
case SPECPROP_MAXFILECOUNTUPPERLIMIT: return "filemaxcountupperlimit";
+ case SPECPROP_MAXPAGESLOWERLIMIT: return "pageslowerlimit";
+ case SPECPROP_MAXPAGESUPPERLIMIT: return "pagesupperlimit";
+ case SPECPROP_MAXPAGEFACTORLOWERLIMIT: return "pagefactorlowerlimit";
+ case SPECPROP_MAXPAGEFACTORUPPERLIMIT: return "pagefactorupperlimit";
default: assert(false); // should never get here
}
return "";
@@ -381,6 +405,9 @@ namespace acl {
p4->insert(PROP_EXCLUSIVE);
p4->insert(PROP_AUTODELETE);
p4->insert(PROP_POLICYTYPE);
+ p4->insert(PROP_PAGING);
+ p4->insert(PROP_MAXPAGES);
+ p4->insert(PROP_MAXPAGEFACTOR);
p4->insert(PROP_MAXQUEUESIZE);
p4->insert(PROP_MAXQUEUECOUNT);
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 975acfdb87..4017fdbfe3 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -217,7 +217,9 @@ Broker::Broker(const Broker::Options& conf) :
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
- pagingDir(conf.pagingDir),
+ pagingDir(!conf.pagingDir.empty() ? conf.pagingDir :
+ dataDir.isEnabled() ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR :
+ std::string() ),
queues(this),
exchanges(this),
links(this),
@@ -385,11 +387,6 @@ Broker::Broker(const Broker::Options& conf) :
}
}
-std::string Broker::getPagingDirectoryPath()
-{
- return pagingDir.isEnabled() ? pagingDir.getPath() : dataDir.getPath();
-}
-
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
bool storeEnabled = store.get() != NULL;
@@ -1304,6 +1301,9 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE));
params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy()));
+ params.insert(make_pair(acl::PROP_PAGING, settings.paging ? _TRUE : _FALSE));
+ params.insert(make_pair(acl::PROP_MAXPAGES, boost::lexical_cast<string>(settings.maxPages)));
+ params.insert(make_pair(acl::PROP_MAXPAGEFACTOR, boost::lexical_cast<string>(settings.pageFactor)));
params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount())));
params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize())));
params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount)));
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 4e46b71cc9..4bad8f2960 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -38,6 +38,7 @@
#include "qpid/broker/System.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/broker/BrokerObservers.h"
#include "qpid/management/Manageable.h"
#include "qpid/sys/ConnectionCodec.h"
@@ -86,6 +87,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
struct Options : public qpid::Options {
static const std::string DEFAULT_DATA_DIR_LOCATION;
static const std::string DEFAULT_DATA_DIR_NAME;
+ static const std::string DEFAULT_PAGED_QUEUE_DIR;
QPID_BROKER_EXTERN Options(const std::string& name="Broker Options");
@@ -178,6 +180,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir dataDir;
DataDir pagingDir;
ConnectionObservers connectionObservers;
+ SessionHandlerObservers sessionHandlerObservers;
BrokerObservers brokerObservers;
QueueRegistry queues;
@@ -237,11 +240,11 @@ class Broker : public sys::Runnable, public Plugin::Target,
ExchangeRegistry& getExchanges() { return exchanges; }
LinkRegistry& getLinks() { return links; }
DtxManager& getDtxManager() { return dtxManager; }
- DataDir& getDataDir() { return dataDir; }
+ const DataDir& getDataDir() { return dataDir; }
+ const DataDir& getPagingDir() { return pagingDir; }
Options& getOptions() { return config; }
ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; }
- std::string getPagingDirectoryPath();
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -360,6 +363,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+ SessionHandlerObservers& getSessionHandlerObservers() { return sessionHandlerObservers; }
BrokerObservers& getBrokerObservers() { return brokerObservers; }
/** Properties to be set on outgoing link connections */
diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
index 58700846ef..f317df5713 100644
--- a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
+++ b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/sys/Time.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
@@ -31,5 +32,6 @@ DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _
void DtxTimeout::fire()
{
+ QPID_LOG(debug, "DTX transaction timeouted, XID=" << xid << ", timeout=" << timeout);
mgr.timedout(xid);
}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 19c7f107f6..585c7ba764 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -302,7 +302,12 @@ void HeadersExchange::route(Deliverable& msg)
if (p.get()) {
for (std::vector<BoundKey>::const_iterator i = p->begin(); i != p->end(); ++i) {
if (match(i->args, msg.getMessage())) {
- b->push_back(i->binding);
+ /* check if a binding tothe same queue has not been already added to b */
+ std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> >::iterator bi = b->begin();
+ while ((bi != b->end()) && ((*bi)->queue != i->binding->queue))
+ ++bi;
+ if (bi == b->end())
+ b->push_back(i->binding);
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.cpp b/qpid/cpp/src/qpid/broker/PagedQueue.cpp
index 43208d74ee..afb330489b 100644
--- a/qpid/cpp/src/qpid/broker/PagedQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/PagedQueue.cpp
@@ -91,13 +91,16 @@ PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, u
: name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0),
expiryPolicy(e)
{
- path = file.open(name, directory);
- QPID_LOG(debug, "PagedQueue[" << path << "]");
+ if (directory.empty()) {
+ throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified"));
+ }
+ file.open(name, directory);
+ QPID_LOG(debug, "PagedQueue[" << name << "]");
}
PagedQueue::~PagedQueue()
{
- file.close(path);
+ file.close();
}
size_t PagedQueue::size()
@@ -133,7 +136,7 @@ bool PagedQueue::deleted(const QueueCursor& cursor)
void PagedQueue::publish(const Message& added)
{
if (encodedSize(added) > pageSize) {
- QPID_LOG(error, "Message is larger than page size for queue " << name << ", backed by " << path);
+ QPID_LOG(error, "Message is larger than page size for queue " << name);
throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name));
}
Used::reverse_iterator i = used.rbegin();
@@ -143,7 +146,7 @@ void PagedQueue::publish(const Message& added)
}
//used is empty or last page is full, need to add a new page
if (!newPage(added.getSequence()).add(added)) {
- QPID_LOG(error, "Could not add message to paged queue " << name << ", backed by " << path);
+ QPID_LOG(error, "Could not add message to paged queue " << name);
throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name));
}
}
@@ -388,14 +391,14 @@ void PagedQueue::load(Page& page)
}
page.load(file, protocols, expiryPolicy);
++loaded;
- QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
+ QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded");
}
void PagedQueue::unload(Page& page)
{
page.unload(file);
--loaded;
- QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded");
+ QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded");
}
diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.h b/qpid/cpp/src/qpid/broker/PagedQueue.h
index cb83fa9f34..e4c98f4119 100644
--- a/qpid/cpp/src/qpid/broker/PagedQueue.h
+++ b/qpid/cpp/src/qpid/broker/PagedQueue.h
@@ -79,7 +79,6 @@ class PagedQueue : public Messages {
qpid::sys::MemoryMappedFile file;
std::string name;
- std::string path;
const size_t pageSize;
const uint maxLoaded;
ProtocolRegistry& protocols;
diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
index e60349edfb..08988ed4ac 100644
--- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp
@@ -76,8 +76,10 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
QPID_LOG(warning, "Cannot create paged queue without broker context");
} else if (!qpid::sys::MemoryMappedFile::isSupported()) {
QPID_LOG(warning, "Cannot create paged queue; memory mapped file support not available on this platform");
+ } else if ( !broker->getPagingDir().isEnabled() ) {
+ QPID_LOG(warning, "Cannot create paged queue; no paging directory enabled");
} else {
- queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDirectoryPath(),
+ queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getPagingDir().getPath(),
settings.maxPages ? settings.maxPages : 4,
settings.pageFactor ? settings.pageFactor : 1,
broker->getProtocolRegistry(), broker->getExpiryPolicy()));
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index a8f5734af7..3d5faf2dab 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -33,11 +33,35 @@ using namespace framing;
using namespace std;
using namespace qpid::sys;
+namespace {
+class DefaultErrorListener : public SessionHandler::ErrorListener {
+ public:
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, "Connection exception: " << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, "Channel exception: " << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, "Execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, "Incoming execution exception: " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {}
+
+ private:
+};
+}
+
SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
: qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
-{}
+ proxy(out),
+ errorListener(boost::shared_ptr<ErrorListener>(new DefaultErrorListener()))
+{
+ c.getBroker().getSessionHandlerObservers().newSessionHandler(*this);
+}
SessionHandler::~SessionHandler()
{
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index cb81084014..ee6baed0b6 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -21,7 +21,7 @@
* under the License.
*
*/
-
+#include "qpid/broker/BrokerImportExport.h"
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
@@ -76,8 +76,8 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- amqp_0_10::Connection& getConnection();
- const amqp_0_10::Connection& getConnection() const;
+ QPID_BROKER_EXTERN amqp_0_10::Connection& getConnection();
+ QPID_BROKER_EXTERN const amqp_0_10::Connection& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
@@ -86,7 +86,7 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
- void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
+ QPID_BROKER_EXTERN void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
// Called by SessionAdapter
void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg);
diff --git a/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h
new file mode 100644
index 0000000000..6d0ea16254
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_SESSIONHANDLEROBSERVER_H
+#define QPID_BROKER_SESSIONHANDLEROBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Observers.h"
+
+namespace qpid {
+namespace broker {
+class SessionHandler;
+
+/**
+ * Observer of session handler events.
+ */
+class SessionHandlerObserver
+{
+ public:
+ virtual ~SessionHandlerObserver() {}
+ virtual void newSessionHandler(SessionHandler&) {}
+};
+
+
+class SessionHandlerObservers : public Observers<SessionHandlerObserver> {
+ public:
+ void newSessionHandler(SessionHandler& sh) {
+ each(boost::bind(&SessionHandlerObserver::newSessionHandler, _1, boost::ref(sh)));
+ }
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_SESSIONHANDLEROBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp b/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
index 9e463fa32d..9e118c19f7 100644
--- a/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
+++ b/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp
@@ -27,6 +27,7 @@ namespace broker {
const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("/tmp");
const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("/.qpidd");
+const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("/pq");
std::string
Broker::Options::getHome() {
diff --git a/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp b/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
index b65440b5ad..260bd7c073 100644
--- a/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
+++ b/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp
@@ -27,6 +27,7 @@ namespace broker {
const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("\\TEMP");
const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("\\QPIDD.DATA");
+const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("\\PQ");
std::string
Broker::Options::getHome() {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index b1faf19e52..7928b6ab71 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -172,34 +172,29 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
-// Listens for errors on the bridge session.
-class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+// Report errors on the broker replication session.
+class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
public:
- ErrorListener(const std::string& lp, BrokerReplicator& br) :
- logPrefix(lp), brokerReplicator(br) {}
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
- void connectionException(framing::connection::CloseCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
}
- void channelException(framing::session::DetachCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
}
- void executionException(framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
}
-
- void incomingExecutionException(
- framing::execution::ErrorCode, const std::string& msg) {
- QPID_LOG(error, logPrefix << "Incoming execution error: " << msg);
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
}
-
void detach() {
QPID_LOG(debug, logPrefix << "Session detached.");
}
private:
std::string logPrefix;
- BrokerReplicator& brokerReplicator;
};
/** Keep track of queues or exchanges during the update process to solve 2
@@ -328,8 +323,7 @@ void BrokerReplicator::initialize() {
boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
assert(result.second);
- result.first->setErrorListener(
- boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
+ result.first->setErrorListener(boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
broker.getConnectionObservers().add(shared_from_this());
}
diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h
new file mode 100644
index 0000000000..1ae2078a11
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ErrorListener.h
@@ -0,0 +1,60 @@
+#ifndef QPID_HA_ERRORLISTENER_H
+#define QPID_HA_ERRORLISTENER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace ha {
+
+/** Default ErrorListener for HA module */
+class ErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(error, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ERRORLISTENER_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 3bb51b1813..b4d50d1652 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -33,6 +33,7 @@
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionHandlerObserver.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary& primary;
};
+class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
+ public:
+ PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+};
+
+class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
+ public:
+ PrimarySessionHandlerObserver(const std::string& logPrefix)
+ : errorListener(new PrimaryErrorListener(logPrefix)) {}
+ void newSessionHandler(broker::SessionHandler& sh) {
+ BrokerInfo info;
+ // Suppress error logging for backup connections
+ // TODO aconway 2014-01-31: Be more selective, suppress only expected errors?
+ if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) {
+ sh.setErrorListener(errorListener);
+ }
+ }
+ private:
+ boost::shared_ptr<PrimaryErrorListener> errorListener;
+};
+
+
} // namespace
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
haBroker(hb), membership(hb.getMembership()),
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
+ sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
queueLimits(logPrefix)
{
// Note that at this point, we are still rejecting client connections.
@@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
brokerObserver.reset(new PrimaryBrokerObserver(*this));
haBroker.getBroker().getBrokerObservers().add(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver);
+
checkReady(); // Outside lock
// Allow client connections
@@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
Primary::~Primary() {
if (timerTask) timerTask->cancel();
haBroker.getBroker().getBrokerObservers().remove(brokerObserver);
+ haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver);
haBroker.getObserver()->reset();
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 2e32515c9a..af368bca0f 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -42,6 +42,7 @@ class Queue;
class Connection;
class ConnectionObserver;
class BrokerObserver;
+class SessionHandlerObserver;
class TxBuffer;
class DtxBuffer;
}
@@ -152,6 +153,7 @@ class Primary : public Role
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::BrokerObserver> brokerObserver;
+ boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver;
boost::intrusive_ptr<sys::TimerTask> timerTask;
ReplicaMap replicas;
TxMap txMap;
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index dc5bf15911..c94ced7024 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -98,7 +98,8 @@ PrimaryTxObserver::PrimaryTxObserver(
replicationTest(hb.getSettings().replicateDefault.get()),
txBuffer(tx),
id(true),
- exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str())
+ exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
+ empty(true)
{
logPrefix = "Primary transaction "+shortStr(id)+": ";
@@ -149,6 +150,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m)
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m));
checkState(SENDING, "Too late for enqueue");
+ empty = false;
enqueues[q] += m.getReplicationId();
txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message());
txQueue->deliver(m);
@@ -162,12 +164,9 @@ void PrimaryTxObserver::dequeue(
checkState(SENDING, "Too late for dequeue");
if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues.
QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id));
+ empty = false;
txQueue->deliver(TxDequeueEvent(q->getName(), id).message());
}
- else {
- QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: "
- << LogMessageId(*q, pos, id));
- }
}
namespace {
@@ -221,8 +220,10 @@ void PrimaryTxObserver::commit() {
}
void PrimaryTxObserver::rollback() {
- QPID_LOG(debug, logPrefix << "Rollback");
Mutex::ScopedLock l(lock);
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (state != ENDED) {
txQueue->deliver(TxRollbackEvent().message());
end(l);
@@ -287,7 +288,6 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) {
Mutex::ScopedLock l(lock);
types::Uuid backup = rs.getBrokerInfo().getSystemId();
- QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup);
// Normally the backup should be completed before it is cancelled.
if (completed(backup, l)) error(backup, "Unexpected disconnect:", l);
// Break the pointer cycle if backups have completed and we are done with txBuffer.
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
index 105fee4d40..5b7c2e3e93 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
@@ -55,8 +55,9 @@ class Primary;
* A TxReplicator on the backup replicates the tx-queue and creates
* a TxBuffer on the backup equivalent to the one on the primary.
*
- * Also observes the tx-queue for prepare-complete messages and
- * subscription cancellations.
+ * Creates an exchange to receive prepare-ok/prepare-fail messages from backups.
+ *
+ * Monitors for tx-queue subscription cancellations.
*
* THREAD SAFE: called in user connection thread for TX events,
* and in backup connection threads for prepare-completed events
@@ -122,6 +123,7 @@ class PrimaryTxObserver : public broker::TransactionObserver,
QueueIdsMap enqueues;
UuidSet backups; // All backups of transaction.
UuidSet incomplete; // Incomplete backups (not yet responded to prepare)
+ bool empty; // True if the transaction is empty - no enqueues/dequeues.
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 507df6ea5a..59b2013f59 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
}
+// Debug log expected exceptions on queue replicator, check incoming execution
+// exceptions for "deleted on primary" conditions.
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
: queueReplicator(qr), logPrefix(qr->logPrefix) {}
- void connectionException(framing::connection::CloseCode, const std::string&) {}
- void channelException(framing::session::DetachCode, const std::string&) {}
- void executionException(framing::execution::ErrorCode, const std::string&) {}
-
- void incomingExecutionException(ErrorCode e, const std::string& msg) {
- queueReplicator->incomingExecutionException(e, msg);
+ void connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+ }
+ void channelException(framing::session::DetachCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+ }
+ void executionException(framing::execution::ErrorCode code, const std::string& msg) {
+ QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+ }
+ void incomingExecutionException(ErrorCode code, const std::string& msg) {
+ if (!queueReplicator->deletedOnPrimary(code, msg))
+ QPID_LOG(error, logPrefix << "Incoming "
+ << framing::createSessionException(code, msg).what());
}
void detach() {
QPID_LOG(debug, logPrefix << "Session detached");
@@ -197,20 +206,25 @@ void QueueReplicator::disconnect() {
// Called from Queue::destroyed()
void QueueReplicator::destroy() {
+ QPID_LOG(debug, logPrefix << "Destroyed");
boost::shared_ptr<Bridge> bridge2; // To call outside of lock
{
Mutex::ScopedLock l(lock);
if (!queue) return; // Already destroyed
- QPID_LOG(debug, logPrefix << "Destroyed");
bridge2 = bridge; // call close outside the lock.
- // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
- queue.reset();
- bridge.reset();
- getBroker()->getExchanges().destroy(getName());
+ destroy(l);
}
if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.
}
+void QueueReplicator::destroy(Mutex::ScopedLock&) {
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
+}
+
+
// Called in a broker connection thread when the bridge is created.
// Note: called with the Link lock held.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) {
@@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) {
nextId = decodeStr<IdEvent>(data).id;
}
-void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
+bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) {
if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
// If the queue is destroyed at the same time we are subscribing, we may
// get a not-found or resource-deleted exception before the
// BrokerReplicator gets the queue-delete event. Shut down the bridge by
// calling destroy(), we can let the BrokerReplicator delete the queue
// when the queue-delete arrives.
- QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg);
+ QPID_LOG(debug, logPrefix << "Deleted on primary: "
+ << framing::createSessionException(e, msg).what());
destroy();
+ return true;
}
- else
- QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg);
+ return false;
}
// Unused Exchange methods.
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 22cd13a0a8..f94c6de116 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -46,11 +46,13 @@ class HaBroker;
class Settings;
/**
- * Exchange created on a backup broker to replicate a queue on the primary.
+ * Exchange created on a backup broker to receive replicated messages and
+ * replication events from a queue on the primary. It subscribes to the primary
+ * queue via a ReplicatingSubscription on the primary by passing special
+ * arguments to the subscribe command.
*
- * Puts replicated messages on the local queue, handles dequeue events.
- * Creates a ReplicatingSubscription on the primary by passing special
- * arguments to the consume command.
+ * It puts replicated messages on the local replica queue and handles dequeue
+ * events by removing local messages.
*
* THREAD SAFE: Called in different connection threads.
*/
@@ -74,7 +76,7 @@ class QueueReplicator : public broker::Exchange,
void disconnect(); // Called when we are disconnected from the primary.
- std::string getType() const;
+ virtual std::string getType() const;
void route(broker::Deliverable&);
@@ -101,7 +103,9 @@ class QueueReplicator : public broker::Exchange,
void initialize(); // Called as part of create()
virtual void deliver(const broker::Message&);
+
virtual void destroy(); // Called when the queue is destroyed.
+ virtual void destroy(sys::Mutex::ScopedLock&);
sys::Mutex lock;
HaBroker& haBroker;
@@ -124,8 +128,7 @@ class QueueReplicator : public broker::Exchange,
void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&);
void idEvent(const std::string& data, sys::Mutex::ScopedLock&);
- void incomingExecutionException(framing::execution::ErrorCode e,
- const std::string& msg);
+ bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
std::string logPrefix;
std::string bridgeName;
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
index 7ff03b5f92..d2a647ae8f 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp
@@ -87,7 +87,7 @@ TxReplicator::TxReplicator(
QueueReplicator(hb, txQueue, link),
store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
channel(link->nextChannel()),
- ended(false),
+ empty(true), ended(false),
dequeueState(hb.getBroker().getQueues())
{
string id(getTxId(txQueue->getName()));
@@ -151,6 +151,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) {
decodeStr(data, e);
QPID_LOG(trace, logPrefix << "Enqueue: " << e);
enq = e;
+ empty = false;
}
void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
@@ -163,6 +164,7 @@ void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) {
// prepared, then they are all receieved before the prepare event.
// We collect the events here so we can do a single scan of the queue in prepare.
dequeueState.add(e);
+ empty = false;
}
void TxReplicator::DequeueState::add(const TxDequeueEvent& event) {
@@ -227,7 +229,9 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) {
void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) {
if (!txBuffer) return;
- QPID_LOG(debug, logPrefix << "Rollback");
+ // Don't bleat about rolling back empty transactions, this happens all the time
+ // when a session closes and rolls back its outstanding transaction.
+ if (!empty) QPID_LOG(debug, logPrefix << "Rollback");
if (context.get()) store->abort(*context);
txBuffer->rollback();
end(l);
@@ -255,15 +259,12 @@ void TxReplicator::end(sys::Mutex::ScopedLock&) {
}
// Called when the tx queue is deleted.
-void TxReplicator::destroy() {
- {
- sys::Mutex::ScopedLock l(lock);
- if (!ended) {
- QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback.");
- rollback(string(), l);
- }
+void TxReplicator::destroy(sys::Mutex::ScopedLock& l) {
+ if (!ended) {
+ if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback");
+ rollback(string(), l);
}
- QueueReplicator::destroy();
+ QueueReplicator::destroy(l);
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h
index 7f1256699a..5c509d14a7 100644
--- a/qpid/cpp/src/qpid/ha/TxReplicator.h
+++ b/qpid/cpp/src/qpid/ha/TxReplicator.h
@@ -67,7 +67,8 @@ class TxReplicator : public QueueReplicator {
// QueueReplicator overrides
void route(broker::Deliverable& deliverable);
- void destroy();
+ using QueueReplicator::destroy;
+ void destroy(sys::Mutex::ScopedLock&);
protected:
diff --git a/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp b/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp
index f9b77ce02c..5cba10d0f9 100644
--- a/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp
+++ b/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp
@@ -45,7 +45,7 @@ struct StorePlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
store.reset(new MessageStoreImpl(broker));
- DataDir& dataDir = broker->getDataDir ();
+ const DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
diff --git a/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp b/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp
index 43461b66a3..64ee65f1ac 100644
--- a/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp
+++ b/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp
@@ -125,7 +125,8 @@ wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
u_int32_t fwd_dblks = subm_dblks + enq_dsize_dblks + _enq_cap_offs_dblks;
u_int16_t findex = _fc_index;
fcntl* fcp = _curr_fc;
- bool in_use = false;
+ bool in_use = false; // at least one file contains an enqueued record
+ bool overwrite = false; // reached the original journal file we started with
while (fwd_dblks && !(findex != _fc_index && fcp->enqcnt()))
{
fwd_dblks -= fwd_dblks > _fsize_dblks ? _fsize_dblks : fwd_dblks;
@@ -133,12 +134,13 @@ wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const
{
if (++findex == _lpmp->num_jfiles())
findex = 0;
+ overwrite |= findex == _fc_index;
fcp = _lpmp->get_fcntlp(findex);
}
in_use |= fcp->enqcnt() > 0;
}
// Return true if threshold exceeded
- return findex != _fc_index && in_use;
+ return (findex != _fc_index && in_use) || overwrite;
}
bool wrfc::wr_reset()
diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
index 6dfb2056bf..d64b44b5e3 100644
--- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
+++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp
@@ -46,7 +46,7 @@ struct StorePlugin : public Plugin {
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
store.reset(new MessageStoreImpl(broker));
- DataDir& dataDir = broker->getDataDir ();
+ const DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 0bf8bef27f..59e1832cfd 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -95,11 +95,17 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
return true;
} else {
deliveries.push_back(Delivery(nextId++));
- Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
- delivery.send(sender, unreliable);
- *out = &delivery;
- return true;
+ try {
+ Delivery& delivery = deliveries.back();
+ delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
+ delivery.send(sender, unreliable);
+ *out = &delivery;
+ return true;
+ } catch (const std::exception& e) {
+ deliveries.pop_back();
+ --nextId;
+ throw SendError(e.what());
+ }
}
} else {
return false;
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
index 512e71230b..5ff24e7d33 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
@@ -370,7 +370,7 @@ MSSqlClfsProvider::earlyInitialize(Plugin::Target &target)
// Check the store dir option; if not specified, need to
// grab the broker's data dir.
if (options.storeDir.empty()) {
- DataDir& dir = store->getBroker()->getDataDir();
+ const DataDir& dir = store->getBroker()->getDataDir();
if (dir.isEnabled()) {
options.storeDir = dir.getPath();
}
diff --git a/qpid/cpp/src/qpid/sys/MemoryMappedFile.h b/qpid/cpp/src/qpid/sys/MemoryMappedFile.h
index 43e4c852af..ecf38e98e6 100644
--- a/qpid/cpp/src/qpid/sys/MemoryMappedFile.h
+++ b/qpid/cpp/src/qpid/sys/MemoryMappedFile.h
@@ -38,11 +38,11 @@ class MemoryMappedFile {
/**
* Opens a file that can be mapped by region into memory
*/
- QPID_COMMON_EXTERN std::string open(const std::string& name, const std::string& directory);
+ QPID_COMMON_EXTERN void open(const std::string& name, const std::string& directory);
/**
* Closes and removes the file that can be mapped by region into memory
*/
- QPID_COMMON_EXTERN void close(const std::string& path);
+ QPID_COMMON_EXTERN void close();
/**
* Returns the page size
*/
diff --git a/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
index f647ab3943..b4292aa4bc 100644
--- a/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
@@ -29,13 +29,14 @@
namespace qpid {
namespace sys {
namespace {
+const std::string PAGEFILE_PREFIX("pf_");
const std::string PATH_SEPARATOR("/");
const std::string ESCAPE("%");
const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.");
std::string getFileName(const std::string& name, const std::string& dir)
{
std::stringstream filename;
- if (dir.size()) filename << dir << PATH_SEPARATOR;
+ if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX;
size_t start = 0;
while (true) {
size_t i = name.find_first_not_of(VALID, start);
@@ -55,31 +56,39 @@ std::string getFileName(const std::string& name, const std::string& dir)
class MemoryMappedFilePrivate
{
friend class MemoryMappedFile;
+ std::string path;
int fd;
MemoryMappedFilePrivate() : fd(0) {}
};
MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {}
MemoryMappedFile::~MemoryMappedFile() { delete state; }
-std::string MemoryMappedFile::open(const std::string& name, const std::string& directory)
+void MemoryMappedFile::open(const std::string& name, const std::string& directory)
{
- std::string path = getFileName(name, directory);
+ // Ensure directory exists
+ if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) {
+ throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno)));
+ }
+
+ state->path = getFileName(name, directory);
- int flags = O_CREAT | O_EXCL | O_RDWR;
- int fd = ::open(path.c_str(), flags, S_IRUSR | S_IWUSR);
- if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
+ int flags = O_CREAT | O_TRUNC | O_RDWR;
+ int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR);
+ if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
state->fd = fd;
- return path;
}
-void MemoryMappedFile::close(const std::string& path)
+
+void MemoryMappedFile::close()
{
::close(state->fd);
- ::unlink(path.c_str());
+ ::unlink(state->path.c_str());
}
+
size_t MemoryMappedFile::getPageSize()
{
return ::sysconf(_SC_PAGE_SIZE);
}
+
char* MemoryMappedFile::map(size_t offset, size_t size)
{
int protection = PROT_READ | PROT_WRITE;
@@ -90,20 +99,24 @@ char* MemoryMappedFile::map(size_t offset, size_t size)
return region;
}
+
void MemoryMappedFile::unmap(char* region, size_t size)
{
::munmap(region, size);
}
+
void MemoryMappedFile::flush(char* region, size_t size)
{
::msync(region, size, MS_ASYNC);
}
+
void MemoryMappedFile::expand(size_t offset)
{
if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) {
throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno)));
}
}
+
bool MemoryMappedFile::isSupported()
{
return true;
diff --git a/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
index 9bb3fa2320..60b3df7da6 100644
--- a/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
+++ b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
@@ -27,11 +27,10 @@ class MemoryMappedFilePrivate {};
MemoryMappedFile::MemoryMappedFile() : state(0) {}
MemoryMappedFile::~MemoryMappedFile() {}
-std::string MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/)
+void MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/)
{
- return std::string();
}
-void MemoryMappedFile::close(const std::string& /*path*/)
+void MemoryMappedFile::close()
{
}
size_t MemoryMappedFile::getPageSize()
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 66705e6d24..c9b2db64db 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -671,6 +671,118 @@ class ACLTests(TestBase010):
self.fail(result)
+ def test_illegal_pages_lower_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pageslowerlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pageslowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pageslowerlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pageslowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
+ def test_illegal_pages_upper_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagesupperlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pagesupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagesupperlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pagesupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
+ def test_illegal_pagefactor_lower_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorlowerlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pagefactorlowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorlowerlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pagefactorlowerlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
+ def test_illegal_pagefactor_upper_limit_spec(self):
+ """
+ Test illegal paged queue policy
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorupperlimit=-1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "-1 is not a valid value for 'pagefactorupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=q2 pagefactorupperlimit=9223372036854775808\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ expected = "9223372036854775808 is not a valid value for 'pagefactorupperlimit', " \
+ "values should be between 0 and 9223372036854775807";
+ if (result.find(expected) == -1):
+ self.fail(result)
+
+
#=====================================
# ACL queue tests
#=====================================
@@ -687,6 +799,7 @@ class ACLTests(TestBase010):
aclf.write('acl deny bob@QPID purge queue name=q3\n')
aclf.write('acl deny bob@QPID delete queue name=q4\n')
aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n')
+ aclf.write('acl deny bob@QPID create queue name=q6 paging=true\n')
aclf.write('acl allow all all')
aclf.close()
@@ -739,6 +852,15 @@ class ACLTests(TestBase010):
try:
queue_options = {}
+ queue_options["qpid.paging"] = True
+ session.queue_declare(queue="q6", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=q6, qpid.paging=True");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
queue_options["qpid.max_count"] = 200
queue_options["qpid.max_size"] = 100
session.queue_declare(queue="q2", exclusive=True, arguments=queue_options)
@@ -972,6 +1094,107 @@ class ACLTests(TestBase010):
self.fail("ACL should allow queue delete request for q4");
#=====================================
+ # ACL paged tests
+ #=====================================
+
+ def test_paged_allow_mode(self):
+ """
+ Test cases for paged acl in allow mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny bob@QPID create queue name=qf1 pageslowerlimit=1000\n')
+ aclf.write('acl deny bob@QPID create queue name=qf2 pagesupperlimit=100\n')
+ aclf.write('acl deny bob@QPID create queue name=qf3 pagefactorlowerlimit=10\n')
+ aclf.write('acl deny bob@QPID create queue name=qf4 pagefactorupperlimit=1\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.max_pages_loaded"] = 500
+ session.queue_declare(queue="qf1", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf1, qpid.paging=True, qpid.max_pages_loaded=500");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.max_pages_loaded"] = 500
+ session.queue_declare(queue="qf2", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf2, qpid.paging=True, qpid.max_pages_loaded=500");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.page_factor"] = 5
+ session.queue_declare(queue="qf3", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf3, qpid.paging=True, qpid.page_factor=5");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.page_factor"] = 5
+ session.queue_declare(queue="qf4", arguments=queue_options)
+ self.fail("ACL should deny queue create request with name=qf4, qpid.paging=True, qpid.page_factor=5");
+ except qpid.session.SessionException, e:
+ self.assertEqual(403,e.args[0].error_code)
+ session = self.get_session('bob','bob')
+
+
+ def test_paged_deny_mode(self):
+ """
+ Test cases for paged acl in deny mode
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create queue name=qf1 pageslowerlimit=100 pagesupperlimit=1000\n')
+ aclf.write('acl allow bob@QPID create queue name=qf2 pagefactorlowerlimit=1 pagefactorupperlimit=10\n')
+ aclf.write('acl allow anonymous all all\n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.max_pages_loaded"] = 500
+ session.queue_declare(queue="qf1", arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=qf1, qpid.paging=True, qpid.max_pages_loaded=500");
+ session = self.get_session('bob','bob')
+
+ try:
+ queue_options = {}
+ queue_options["qpid.paging"] = True
+ queue_options["qpid.page_factor"] = 5
+ session.queue_declare(queue="qf2", arguments=queue_options)
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request with name=qf2, qpid.paging=True, qpid.page_factor=5");
+ session = self.get_session('bob','bob')
+
+
+ #=====================================
# ACL file tests
#=====================================
diff --git a/qpid/cpp/src/tests/run_paged_queue_tests b/qpid/cpp/src/tests/run_paged_queue_tests
index 8f72ddd5b7..cd1dc72641 100755
--- a/qpid/cpp/src/tests/run_paged_queue_tests
+++ b/qpid/cpp/src/tests/run_paged_queue_tests
@@ -26,7 +26,7 @@ trap stop_broker INT TERM QUIT
export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
start_broker() {
- QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker"
+ QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir --paging-dir=$PWD/pqtest_data $MODULES --auth no) || fail "Could not start broker"
}
stop_broker() {
diff --git a/qpid/doc/book/src/cpp-broker/Security.xml b/qpid/doc/book/src/cpp-broker/Security.xml
index 7bf7034996..00795a05d8 100644
--- a/qpid/doc/book/src/cpp-broker/Security.xml
+++ b/qpid/doc/book/src/cpp-broker/Security.xml
@@ -421,7 +421,11 @@ com.sun.security.jgss.initiate {
filemaxsizelowerlimit |
filemaxsizeupperlimit |
filemaxcountlowerlimit |
- filemaxcountupperlimit ]
+ filemaxcountupperlimit |
+ pageslowerlimit |
+ pagesupperlimit |
+ pagefactorlowerlimit |
+ pagefactorupperlimit ]
acl permission {<group-name>|<user-name>|"all"} {action|"all"} [object|"all"
[property=<property-value> ...]]
@@ -728,6 +732,12 @@ com.sun.security.jgss.initiate {
<entry>Indicates the presence of an <parameter>exclusive</parameter> flag</entry>
<entry>CREATE QUEUE, ACCESS QUEUE</entry>
</row>
+ <row>
+ <entry> <command>paging</command> </entry>
+ <entry>Boolean</entry>
+ <entry>Indicates if the queue is paging queue</entry>
+ <entry>CREATE QUEUE, ACCESS QUEUE</entry>
+ </row>
<row>
<entry> <command>type</command> </entry>
<entry>String</entry>
@@ -806,6 +816,30 @@ com.sun.security.jgss.initiate {
<entry>Maximum value for file.max_count (files)</entry>
<entry>CREATE QUEUE, ACCESS QUEUE</entry>
</row>
+ <row>
+ <entry> <command>pageslowerlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Minimum value for number of pages in memory of paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
+ <row>
+ <entry> <command>pagesupperlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Maximum value for number of pages in memory of paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
+ <row>
+ <entry> <command>pagefactorlowerlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Minimum value for size of one page in paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
+ <row>
+ <entry> <command>pagefactorupperlimit</command> </entry>
+ <entry>Integer</entry>
+ <entry>Maximum value for size of one page in paged queue</entry>
+ <entry>CREATE QUEUE</entry>
+ </row>
</tbody>
</tgroup>
</table>
@@ -910,7 +944,7 @@ com.sun.security.jgss.initiate {
<row>
<entry>create</entry>
<entry>queue</entry>
- <entry>name alternate durable exclusive autodelete policy queuemaxsizelowerlimit queuemaxsizeupperlimit queuemaxcountlowerlimit queuemaxcountupperlimit filemaxsizelowerlimit filemaxsizeupperlimit filemaxcountlowerlimit filemaxcountupperlimit</entry>
+ <entry>name alternate durable exclusive autodelete policy queuemaxsizelowerlimit queuemaxsizeupperlimit queuemaxcountlowerlimit queuemaxcountupperlimit filemaxsizelowerlimit filemaxsizeupperlimit filemaxcountlowerlimit filemaxcountupperlimit paging pageslowerlimit pagesupperlimit pagefactorlowerlimit pagefactorupperlimit</entry>
<entry></entry>
</row>
<row>
diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml b/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml
index 5b4ea42d2b..026213c157 100644
--- a/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml
+++ b/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml
@@ -1,4 +1,8 @@
<?xml version="1.0" encoding="utf-8"?>
+<!DOCTYPE entities [
+<!ENTITY % entities SYSTEM "commonEntities.xml">
+%entities;
+]>
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -25,29 +29,293 @@
<section id="Java-Broker-Configuring-And-Managing-JMX-Management-Introduction">
<title>Introduction</title>
- <para>
- The brokers JMX Management Plugin provides the support for creating JMX MBeans for broker objects such as Queues, Exchanges, Connections etc.
- </para>
- <para>
- It is included into the brokers Initial Configuration by default, and is responsible for servicing the RMI and JMX_RMI ports configured on the broker, with the former serving as the RMI Registry used to advertise the actual JMX Connector Server started on the latter.
- </para>
+ <para>The JMX management plugin provides a series of managed beans (MBeans) allowing you to
+ control and monitor the Broker via an industry compliant interface. This provides a
+ convenient intergration point for a variety of Infrastructure Monitoring Solutions,
+ tools such as Jconsole and VisualVM, as well as custom Java programs and scripts.</para>
+ <para>The following sections describe how to connect to JMX, the configuration of the JMX
+ plugin covering topis including securing with SSL, programmatically interacting with
+ Qpid MBeans and finally a summary of all the MBeans made available from by the
+ plugin.</para>
+ <important>
+ <para>For new development work, the reader is directed towards the strategic <link
+ linkend="Java-Broker-Configuring-And-Managing-Web-Console">Web Management
+ Console</link> and the <link
+ linkend="Java-Broker-Configuring-And-Managing-REST-API">REST API</link>. Use the
+ Web/REST interfaces in preference to JMX whenever possible. The JMX interface may be
+ withdrawn in a future release.</para>
+ </important>
</section>
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-DefaultConfiguration">
+ <title>Default Configuration</title>
+ <para>By default, the Broker is shipped with JMX enabled.</para>
+ <para>The RMI registry port runs on port <literal>8999</literal> and the JMX connector on
+ port <literal>9099</literal>. The connector is not SSL protected. Qpid will use the
+ <ulink
+ url="&oracleJdkDocUrl;java/lang/management/ManagementFactory.html#getPlatformMBeanServer()"
+ >Platform MBeanServer</ulink>.</para>
+ <para>To change these settings, use the <link
+ linkend="Java-Broker-Configuring-And-Managing-Web-Console">Web Management
+ interface</link>.</para>
+ </section>
+
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX">
+ <title>Connecting to JMX</title>
+ <para>The following example uses Jconsole to illustrates how to connect to JMX and assume
+ the defaults described above. Jconsole is a management tool that comes with the JDK. It
+ provides a very simple view of the MBeans, but requires no special configuration to be
+ used with Qpid.</para>
+ <para>For full details of Jconsole itself refer to Oracle's <ulink url="&oracleJconsole;"
+ >JConsole Guide</ulink>.</para>
+ <para>Jconsole can be used to connect to local or remote Java processes. On startup, it
+ presents a list of all the Java processes running on the local host and also allows you
+ to specify a service url to connect to a Java process running on a remote host.</para>
+ <para>To start Jconsole on the command line, type:</para>
+ <programlisting><![CDATA[jconsole]]></programlisting>
+ <section
+ id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX-Local">
+ <title>Local</title>
+ <para>To connect to a Broker running locally, simply select the process from the list.
+ You can identify the Broker by looking for its classname
+ <literal>org.apache.qpid.server.Main</literal>.</para>
+ </section>
+ <section
+ id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX-Remote">
+ <title>Remote</title>
+ <para>To connect to a broker running remotely, provide the hostname and port number of
+ the <emphasis>RMI registry port</emphasis> (e.g. <literal>hostname:8999</literal>)
+ and a valid username and password.</para>
+ <para>You can also provide a service url in the form
+ <literal>service:jmx:rmi:///jndi/rmi://hostname:8999/jmxrmi</literal></para>
+ <figure>
+ <title>Making a remote JMX connection to a Broker using jconsole</title>
+ <graphic fileref="images/JMX-Connect-Remote.png"/>
+ </figure>
+ </section>
+ <para>Once you are connected expand the tree of nodes marked
+ <literal>org.apache.qpid</literal> to begin to interact with the Qpid MBeans.</para>
+ <figure>
+ <title>Qpid MBean hierarchy</title>
+ <graphic fileref="images/JMX-Connect-MBeans.png"/>
+ </figure>
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-ConnectingToJMX-SSL">
+ <title>Connecting to a remote Broker protected by SSL</title>
+ <para>If you are connecting to a remote Broker whose JMX connector port has been secured
+ with SSL certificate signed by a private CA (or a self-signed certificate), you will
+ need to pass a trust store and trust store password to Jconsole. If this is
+ required, start jconsole with the following options:</para>
+ <programlisting><![CDATA[jconsole -J-Djavax.net.ssl.trustStore=jmxtruststore.jks -J-Djavax.net.ssl.trustStorePassword=password]]></programlisting>
+ </section>
+ </section>
+
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Example-Client">
+ <title>Example JMX Client</title>
+ <para>The following java snippet illustrates a JMX client that connects to Qpid over JMX
+ passing a userid and password, looks up the <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedBroker&qpidSrcSuffix;"
+ >ManagedBroker</ulink> object corresponding to the <literal>myvhost</literal>
+ virtualhost, then invokes a method on the virtualhost to create a new queue.</para>
+ <para>A full introduction to custom JMX clients is beyond the scope of this book. For this
+ the reader is directed toward Oracle's <ulink url="&oracleJmxTutorial;">JMX
+ tutorial.</ulink></para>
+ <example id="Java-Broker-Configuring-And-Managing-JMX-Example-Client-Code">
+ <title>JMX Client illustrating the creation of a new queue</title>
+ <programlisting language="java">
+Map&lt;String, Object&lt; environment = new HashMap&lt;String, Object&gt;();
+environment.put(JMXConnector.CREDENTIALS, new String[] {"admin","password"});
+// Connect to service
+JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8999/jmxrmi");
+JMXConnector jmxConnector = JMXConnectorFactory.connect(url, environment);
+MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();
+// Object name for ManagedBroker mbean for virtualhost myvhost
+ObjectName objectName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=\"myvhost\"");
+// Get the ManagedBroker object
+ManagedBroker managedBroker = JMX.newMBeanProxy(mbsc, objectName, ManagedBroker.class);;
+
+// Create the queue named "myqueue"
+managedBroker.createNewQueue("myqueue", null, true);</programlisting>
+ </example>
+ <para>The Qpid classes required for a custom JMX client are included in the
+ <literal>qpid-management-common</literal> artefact.</para>
+ </section>
- <section id="Java-Broker-Configuring-And-Managing-JMX-Management-Plugin-Configuration">
- <title>JMX Management Plugin Configuration</title>
+ <section id="Java-Broker-Configuring-And-Managing-JMX-Management-MBeans">
+ <title>The MBeans</title>
+ <para>The following table summarises the available MBeans. The MBeans are self-describing:
+ each attribute and operation carry a description describing their purpose. This
+ description is visible when using tools such Jconsole. They are also available on
+ Management interfaces themselves (linked below).</para>
+ <table>
+ <title>Qpid Broker MBeans</title>
+ <tgroup cols="2">
+ <thead>
+ <row>
+ <entry>Management Interface</entry>
+ <entry>Object Name</entry>
+ </row>
+ </thead>
+ <tbody>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedBroker&qpidSrcSuffix;"
+ >ManagedBroker</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost="<replaceable>virtualhostname</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean corresponding to the named virtualhost. Allows operations
+ such as the creation/deletion of queues and exchanges on that
+ virtualhost and virtualhost levell statistics.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedQueue&qpidSrcSuffix;"
+ >ManagedQueue</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.Queue,VirtualHost="<replaceable>virtualhostname</replaceable>",name="<replaceable>queuename</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean corresponding to the named queue on the given virtualhost.
+ Allows queue management operations such as view message, move
+ message and clear queue. Exposes attributes such as queue depth and
+ durability.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedExchange&qpidSrcSuffix;"
+ >ManagedExchange</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="<replaceable>virtualhostname</replaceable>",name="<replaceable>amq.direct</replaceable>",ExchangeType=<replaceable>type</replaceable></literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean corresponding to the named exchange on the given
+ virtualhost. Allows exchange management operations such as the
+ creation and removal of bindings. The supported exchange types are
+ exposed by the <literal>exchangeTypes</literal> attribute of the
+ virtualhost.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ManagedConnection&qpidSrcSuffix;"
+ >ManagedConnection</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=VirtualHost.Connection,VirtualHost="<replaceable>virtualhostname</replaceable>",name="<replaceable>/peerid:ephemeralport</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean representing a active AMQP connection to the named virtual
+ host. Name is formed from the IP and ephemeral port of the peer.
+ Attributes include the client version and connection level
+ statistics.</para>
+ </entry>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/UserManagement&qpidSrcSuffix;"
+ >UserManagement</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=UserManagement,name="UserManagement-<replaceable>authentication
+ manager name</replaceable>"</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>When using <link
+ linkend="Java-Broker-Security-PlainPasswordFile-Provider">Plain
+ password provider</link> or <link
+ linkend="Java-Broker-Security-Base64MD5PasswordFile-Provider"
+ >Base 64 MD5 password provider</link>, permits user operations
+ such creation and deletion of users. and password changes.</para>
+ </entry>
- <para>
- The JMX Management Plugin can be configured through the <link linkend="Java-Broker-Configuring-And-Managing-Web-Console">Web Management Console</link>
- and underlying REST management interface. By double-clicking on the JMX Management Plugin name in the object tree a tab for the plugin
- is displayed with its current settings, which can be changed by clicking on the "Edit" button.
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/ServerInformation&qpidSrcSuffix;"
+ >ServerInformation</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=ServerInformation,name=ServerInformation</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>Exposes broker wide statistics, product version number and JMX
+ management API version number.</para>
+ </entry>
- The following attributes can be set on the JMX Management Plugin:
- <itemizedlist>
- <listitem><para><emphasis>Use Platform MBean Server</emphasis>. The JMX Management Plugin can start its own MBean Server or it can use the JVMs 'Platform MBean Server'.
- By default this is true, and the Platform MBean Server is used.</para></listitem>
- </itemizedlist>
- NOTE: Changes to the "Use Platform MBean Server" attribute only take effect at broker restart.
- </para>
+ </row>
+ <row>
+ <entry morerows="1">
+ <para>
+ <ulink
+ url="&qpidManagementCommonSrc;org/apache/qpid/management/common/mbeans/LoggingManagement&qpidSrcSuffix;"
+ >LoggingManagement</ulink>
+ </para>
+ </entry>
+ <entry>
+ <para>
+ <literal>org.apache.qpid:type=LoggingManagement,name=LoggingManagement</literal>
+ </para>
+ </entry>
+ </row>
+ <row>
+ <entry>
+ <para>MBean permitting control of the Broker's logging. Exposes
+ operations allow the logging level to be controlled at runtime
+ (without restarting the Broker) and others that allow changes to be
+ written back to the log4j.xml logging configuration file, or the
+ contents of the log4.xml file to be re-read at runtime.</para>
+ </entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
</section>
</section>
diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml b/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml
index c322045336..37d675eacc 100644
--- a/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml
+++ b/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml
@@ -63,7 +63,7 @@
</para>
<para>
- SSL can be enabled forPorts with protocols that support it by selecting the 'SSL' transport, at which
+ SSL can be enabled for Ports with protocols that support it by selecting the 'SSL' transport, at which
point a configured <link linkend="Java-Broker-SSL-Keystore">KeyStore</link> must also be selected for the Port.
</para>
@@ -83,7 +83,7 @@
</important>
<important>
- Following deletion of an active Port, the port remains bound until the Broker is restarted. You should restart the broker
+ Following deletion of an active HTTP or JMX Port, the port remains bound until the Broker is restarted. You should restart the broker
immediately if you require preventing new connections on the port or disconnecting existing clients.
</important>
diff --git a/qpid/doc/book/src/java-broker/commonEntities.xml b/qpid/doc/book/src/java-broker/commonEntities.xml
index 69edde91e9..2e7a181d65 100644
--- a/qpid/doc/book/src/java-broker/commonEntities.xml
+++ b/qpid/doc/book/src/java-broker/commonEntities.xml
@@ -36,6 +36,7 @@
<!ENTITY oracleJdkDocUrl "http://docs.oracle.com/javase/6/docs/api/">
<!ENTITY oracleJeeDocUrl "http://docs.oracle.com/javaee/6/api/">
<!ENTITY oracleKeytool "http://docs.oracle.com/javase/6/docs/technotes/tools/solaris/keytool.html">
+<!ENTITY oracleJconsole "http://java.sun.com/javase/6/docs/technotes/guides/management/jconsole.html">
<!-- Oracle BDB JE-->
<!ENTITY oracleJeDownloadUrl "http://www.oracle.com/technetwork/products/berkeleydb/downloads/index.html?ssSourceSiteId=ocomen">
@@ -44,3 +45,8 @@
<!ENTITY oracleBdbJavaDocUrl "http://docs.oracle.com/cd/E17277_02/html/java/">
<!ENTITY oracleBdbProductVersion "5.0.97">
+<!ENTITY oracleJmxTutorial "http://docs.oracle.com/javase/tutorial/jmx/">
+
+<!ENTITY qpidSrc "http://svn.apache.org/viewvc/qpid/trunk/qpid/java/">
+<!ENTITY qpidManagementCommonSrc "&qpidSrc;management/common/src/main/java/">
+<!ENTITY qpidSrcSuffix ".java?view=co">
diff --git a/qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png b/qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png
new file mode 100644
index 0000000000..f766197166
--- /dev/null
+++ b/qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png
Binary files differ
diff --git a/qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png b/qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png
new file mode 100644
index 0000000000..5fcf9dd497
--- /dev/null
+++ b/qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png
Binary files differ
diff --git a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml
index d7dc481888..87abbb8bfb 100644
--- a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml
+++ b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml
@@ -226,7 +226,7 @@ public class StocksExample {
Message message = session.createMessage();
message.setStringProperty("instrument", "IBM");
- message.setStringProperty("price", "100");
+ message.setIntProperty("price", 100);
messageProducer.send(message);
session.commit();
diff --git a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml
index 4da75875a8..7113831f04 100644
--- a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml
+++ b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml
@@ -136,7 +136,7 @@ amqp://username:password@clientid/test
</screen>
</example>
<para>For full details see <xref linkend="JMS-Client-0-8-Connection-URL"/></para>
- <note>Note, that a single broker failover is enabled by default. If the failover behaviour is not desired it can be switched off
+ <note><para>Note, that a single broker failover is enabled by default. If the failover behaviour is not desired it can be switched off
by setting a failover option to <emphasis>nofailover</emphasis> as in the example below
<example>
<title>Connection URL configured with nofailover</title>
@@ -145,7 +145,7 @@ amqp://username:password@clientid/test
?brokerlist='tcp://localhost:15672?failover='nofailover']]>
</screen>
</example>
- </note>
+ </para></note>
<!-- TODO perhaps mention ConnectionListener?-->
</section>
<section id="JMS-Client-0-8-Client-Understanding-Connection-Heartbeating">
@@ -284,7 +284,7 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
>MessageConsumer#receive()</ulink> for Consumer B on the same Session, the application
will hang indefinitely as even if messages suitable for B arrive at the Broker. Those
messages can never be sent to the Session as no space is available in prefetch. </para>
- <note>Please note, when the acknowlegement mode <emphasis>Session#SESSION_TRANSACTED</emphasis>
+ <note><para>Please note, when the acknowlegement mode <emphasis>Session#SESSION_TRANSACTED</emphasis>
or <emphasis>Session#CLIENT_ACKNOWLEDGE</emphasis> is set on a consuming session,
the prefetched messages are released from the prefetch buffer on transaction commit/rollback
(in case of acknowledgement mode <emphasis>Session#SESSION_TRANSACTED</emphasis> )
@@ -294,7 +294,7 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
the prefetched messages continue to remain in the prefetch buffer preventing the delivery of the following messages.
As result, the application might stop the receiving of the messages
until the transaction is committed/rolled back (for <emphasis>Session#SESSION_TRANSACTED</emphasis> )
- or received messages are acknowledged (for <emphasis>Session#CLIENT_ACKNOWLEDGE</emphasis>).</note>
+ or received messages are acknowledged (for <emphasis>Session#CLIENT_ACKNOWLEDGE</emphasis>).</para></note>
</section>
<section id="JMS-Client-0-8-Client-Understanding-Session-TemporaryQueues">
<title>TemporaryQueues</title>
@@ -379,10 +379,10 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
linkend="JMS-Client-0-8-System-Properties-DefaultMandatoryTopic"
><literal>qpid.default_mandatory_topic</literal></link> for Queues and Topics
respectively.</para>
- <note>Please note, according to AMQP specifications the mandatory flag on a message tells the server
+ <note><para>Please note, according to AMQP specifications the mandatory flag on a message tells the server
how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a
Return method. If this flag is zero, the server silently drops the message. Please, refer <ulink url="&amqpSrc;">AMQP specifications</ulink>
- for more details.</note>
+ for more details.</para></note>
</section>
<section id="JMS-Client-0-8-Client-Understanding-MessageProducer-CloseWhenNoRoute">
<title>Close When No Route</title>
@@ -508,8 +508,10 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap
<literal>server</literal>.</para>
<para>See <ulink
url="&qpidJavaBrokerBook;Java-Broker-Runtime-Handling-Undeliverable-Messages.html#Java-Broker-Runtime-Handling-Undeliverable-Messages-Maximum-Delivery-Count"
- > Unhandling Undeliverable Messages</ulink> within the Java Broker book for full details
- of the functioning of this feature.</para>
+ > Handling Undeliverable Messages</ulink> within the Java Broker book for full details of
+ the functioning of this feature.</para>
+ <note><para>The optional JMS message header <literal>JMSXDeliveryCount</literal> is <emphasis>not</emphasis>
+ supported.</para></note>
</section>
</section>
<section id="JMS-Client-0-8-Client-Understanding-Destinations">
diff --git a/qpid/java/amqp-1-0-client-websocket/pom.xml b/qpid/java/amqp-1-0-client-websocket/pom.xml
index 205e0d5ab7..3862fb0fc5 100644
--- a/qpid/java/amqp-1-0-client-websocket/pom.xml
+++ b/qpid/java/amqp-1-0-client-websocket/pom.xml
@@ -44,15 +44,15 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -73,14 +73,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -93,7 +93,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -105,7 +105,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -118,7 +118,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -131,14 +131,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index b00d98637e..6a959df440 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -33,12 +33,14 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -374,9 +376,9 @@ public abstract class AbstractExchange implements Exchange
return getBindings().size();
}
- @Override
- public final List<? extends BaseQueue> route(final ServerMessage message,
- final InstanceProperties instanceProperties)
+
+ final List<? extends BaseQueue> route(final ServerMessage message,
+ final InstanceProperties instanceProperties)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
@@ -416,6 +418,59 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ List<? extends BaseQueue> queues = route(message, instanceProperties);
+
+ if(queues == null || queues.isEmpty())
+ {
+ Exchange altExchange = getAlternateExchange();
+ if(altExchange != null)
+ {
+ return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ else
+ {
+ final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
+
+ txn.enqueue(queues,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ for(int i = 0; i < baseQueues.length; i++)
+ {
+ try
+ {
+ baseQueues[i].enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ _reference.release();
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return queues.size();
+ }
+ }
+
protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
final InstanceProperties instanceProperties);
@@ -679,4 +734,6 @@ public abstract class AbstractExchange implements Exchange
public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
}
+
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index e2582019cd..71d0f8b4dd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -204,22 +207,6 @@ public class DefaultExchange implements Exchange
}
@Override
- public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties)
- {
- AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
- if(q == null)
- {
- List<AMQQueue> noQueues = Collections.emptyList();
- return noQueues;
- }
- else
- {
- return Collections.singletonList(q);
- }
-
- }
-
- @Override
public boolean isBound(AMQQueue queue)
{
return _virtualHost.getQueue(queue.getName()) == queue;
@@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange
{
return _id;
}
+
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final BaseQueue.PostEnqueueAction postEnqueueAction)
+ {
+ final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
+ if(q == null)
+ {
+ return 0;
+ }
+ else
+ {
+ txn.enqueue(q,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ q.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+ }
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 78455c9261..18e912e972 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -94,13 +95,17 @@ public interface Exchange extends ExchangeReferrer
void close() throws AMQException;
/**
- * Returns a list of queues to which to route this message. If there are
- * no queues the empty list must be returned.
- *
- * @return list of queues to which to route the message.
+ * Routes a message
+ * @param message the message to be routed
+ * @param instanceProperties the instance properties
+ * @param txn the transaction to enqueue within
+ * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
+ * @return the number of queues in which the message was enqueued performed
*/
- List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties);
-
+ int send(ServerMessage message,
+ InstanceProperties instanceProperties,
+ ServerTransaction txn,
+ BaseQueue.PostEnqueueAction postEnqueueAction);
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
new file mode 100644
index 0000000000..afd7ff0269
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+
+public interface MessageInstance
+{
+
+ boolean isAvailable();
+
+ boolean acquire();
+
+ boolean isAcquired();
+
+ void release();
+
+ void delete();
+
+ boolean isDeleted();
+
+ ServerMessage getMessage();
+
+ InstanceProperties getInstanceProperties();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 80ccbe1649..2aa1d1f473 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -22,11 +22,11 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
-public interface QueueEntry extends Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
@@ -177,26 +177,17 @@ public interface QueueEntry extends Comparable<QueueEntry>
AMQQueue getQueue();
- ServerMessage getMessage();
-
long getSize();
boolean getDeliveredToConsumer();
boolean expired() throws AMQException;
- boolean isAvailable();
-
- boolean isAcquired();
-
- boolean acquire();
boolean acquire(Subscription sub);
boolean acquiredBySubscription();
boolean isAcquiredBy(Subscription subscription);
- void release();
-
void setRedelivered();
boolean isRedelivered();
@@ -207,16 +198,7 @@ public interface QueueEntry extends Comparable<QueueEntry>
boolean isRejectedBy(long subscriptionId);
- void delete();
-
- /**
- * Returns true if entry is either DEQUED or DELETED state.
- *
- * @return true if entry is either DEQUED or DELETED state
- */
- boolean isDeleted();
-
- void routeToAlternate();
+ int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
boolean isQueueDeleted();
@@ -241,5 +223,4 @@ public interface QueueEntry extends Comparable<QueueEntry>
Filterable asFilterable();
- InstanceProperties getInstanceProperties();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index ed61f1acf6..461d493437 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction;
import java.util.EnumMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
else if(acquire())
{
- routeToAlternate();
+ routeToAlternate(null, null);
}
}
@@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry
dispose();
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
-
+ boolean autocommit = txn == null;
if (alternateExchange != null)
{
- List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), getInstanceProperties());
- final ServerMessage message = getMessage();
- if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
+ if(autocommit)
{
- queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties());
+ txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
+ int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
-
- if (queues != null && queues.size() != 0)
+ txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
{
- final List<? extends BaseQueue> rerouteQueues = queues;
- ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
-
- txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
+ public void postCommit()
{
- public void postCommit()
- {
- try
- {
- for (BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
-
- }
- });
-
- txn.dequeue(currentQueue, message, new ServerTransaction.Action()
- {
- public void postCommit()
- {
- delete();
- }
+ delete();
+ }
- public void onRollback()
- {
+ public void onRollback()
+ {
- }
- });
+ }
+ });
+ if(autocommit)
+ {
txn.commit();
}
+ return enqueues;
+
+ }
+ else
+ {
+ return 0;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d63d1946d3..87d11a892e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- if(_alternateExchange != null)
+
+ for(final QueueEntry entry : entries)
{
+ // TODO log requeues with a post enqueue action
+ int requeues = entry.routeToAlternate(null, txn);
- for(final QueueEntry entry : entries)
+ if(requeues == 0)
{
-
- List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
- if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
- {
- queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties());
- }
-
- final ServerMessage message = entry.getMessage();
- if(queues != null && queues.size() != 0)
- {
- final List<? extends BaseQueue> rerouteQueues = queues;
- txn.enqueue(rerouteQueues, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : rerouteQueues)
- {
- queue.enqueue(message);
- }
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
-
- }
- });
- txn.dequeue(this, entry.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- }
- });
- }
-
+ // TODO log discard
}
-
- _alternateExchange.removeReference(this);
}
- else
- {
- // TODO log discard
-
- for(final QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
- if(message != null)
- {
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- entry.delete();
- }
+ txn.commit();
- public void onRollback()
- {
- }
- });
- }
- }
+ if(_alternateExchange != null)
+ {
+ _alternateExchange.removeReference(this);
}
- txn.commit();
for (Task task : _deleteTaskList)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 7bd525c90f..764549626a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -312,10 +312,9 @@ public class TopicExchangeTest extends QpidTestCase
private int routeMessage(String routingKey, long messageNumber) throws AMQException
{
- ServerMessage serverMessage = mock(ServerMessage.class);
- when(serverMessage.getRoutingKey()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY);
ServerMessage message = mock(ServerMessage.class);
+ when(message.getRoutingKey()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(message, InstanceProperties.EMPTY);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 2e3231e208..d3c866f747 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
public class MockQueueEntry implements QueueEntry
{
@@ -62,9 +63,9 @@ public class MockQueueEntry implements QueueEntry
}
- public void routeToAlternate()
+ public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn)
{
-
+ return 0;
}
public boolean expired() throws AMQException
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index fe82f65115..bae5616042 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -102,6 +104,14 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
+ private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ entry.getQueue().checkCapacity(ServerSession.this);
+ }
+ };
public static interface MessageDispositionChangeListener
{
@@ -182,7 +192,9 @@ public class ServerSession extends Session
return isCommandsFull(id);
}
- public void enqueue(final MessageTransferMessage message, final List<? extends BaseQueue> queues)
+ public int enqueue(final MessageTransferMessage message,
+ final InstanceProperties instanceProperties,
+ final Exchange exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -190,10 +202,10 @@ public class ServerSession extends Session
_outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
+ int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction);
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction);
incrementOutstandingTxnsIfNecessary();
+ return enqueues;
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 73f8569384..72cce5e472 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate
}
};
- List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
- if(queues.isEmpty() && exchange.getAlternateExchange() != null)
- {
- final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(message, instanceProperties);
- if (!queues.isEmpty())
- {
- exchangeInUse = alternateExchange;
- }
- else
- {
- exchangeInUse = exchange;
- }
- }
- else
- {
- exchangeInUse = exchange;
- }
+ int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(!queues.isEmpty())
+ if(enqueues != 0)
{
- serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey()));
+ serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey()));
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 17d0e5cb64..357b565365 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -59,7 +59,6 @@ import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
void reject(final QueueEntry entry)
{
entry.setRedelivered();
- entry.routeToAlternate();
+ entry.routeToAlternate(null, null);
if(entry.isAcquiredBy(this))
{
entry.delete();
@@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
protected void sendToDLQOrDiscard(QueueEntry entry)
{
- final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
final LogActor logActor = CurrentActor.get();
final ServerMessage msg = entry.getMessage();
- if (alternateExchange != null)
+
+ int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
+
+ if (requeues == 0)
{
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties());
+ final AMQQueue queue = entry.getQueue();
+ final Exchange alternateExchange = queue.getAlternateExchange();
- if (destinationQueues == null || destinationQueues.isEmpty())
+ if(alternateExchange != null)
{
- entry.delete();
-
- logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
+ alternateExchange.getName()));
}
else
{
- entry.routeToAlternate();
-
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
- }
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
+ queue.getName(),
+ msg.getRoutingKey()));
}
}
- else
- {
- entry.delete();
- logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
- }
}
private boolean isMaxDeliveryLimitReached(QueueEntry entry)
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b7dc105cb7..c6d4151628 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
+
+ private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
+ private final ImmediateAction _immediateAction = new ImmediateAction();
+
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
+ final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate();
+
final InstanceProperties instanceProperties =
new InstanceProperties()
{
@@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
case EXPIRATION:
return amqMessage.getExpiration();
case IMMEDIATE:
- return _currentMessage.getMessagePublishInfo().isImmediate();
+ return immediate;
case PERSISTENT:
return amqMessage.isPersistent();
case MANDATORY:
@@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
};
- final List<? extends BaseQueue> destinationQueues =
- _currentMessage.getExchange().route(amqMessage, instanceProperties);
-
- if(destinationQueues == null || destinationQueues.isEmpty())
+ int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ immediate ? _immediateAction : _capacityCheckAction);
+ if(enqueues == 0)
{
handleUnroutableMessage(amqMessage);
}
else
{
- _transaction.enqueue(destinationQueues,
- amqMessage,
- new MessageDeliveryAction(amqMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
handle.flushToStore();
-
}
}
}
@@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
if(immediate)
{
- action = new ImmediateAction(queue);
+ action = new ImmediateAction();
}
else
{
@@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_reference.release();
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+
+ }
+ private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ {
+
+ public ImmediateAction()
{
- private final BaseQueue _queue;
+ }
- public ImmediateAction(BaseQueue queue)
- {
- _queue = queue;
- }
+ public void onEnqueue(QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
- public void onEnqueue(QueueEntry entry)
+ if (!entry.getDeliveredToConsumer() && entry.acquire())
{
- if (!entry.getDeliveredToConsumer() && entry.acquire())
- {
-
- ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(_queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ txn.dequeue(queue, entry.getMessage(),
+ new MessageAcknowledgeAction(entries)
+ {
+ @Override
+ public void postCommit()
{
- @Override
- public void postCommit()
+ try
{
- try
- {
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- super.postCommit();
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
}
+ super.postCommit();
}
- );
- txn.commit();
-
+ }
+ );
+ txn.commit();
- }
}
+ else
+ {
+ queue.checkCapacity(AMQChannel.this);
+ }
+
+ }
+ }
+
+ private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ {
+ @Override
+ public void onEnqueue(final QueueEntry entry)
+ {
+ AMQQueue queue = entry.getQueue();
+ queue.checkCapacity(AMQChannel.this);
}
}
@@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag);
+ final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
_logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
- return;
}
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final AMQQueue queue = rejectedQueueEntry.getQueue();
-
- final Exchange altExchange = queue.getAlternateExchange();
- unackedMap.remove(deliveryTag);
+ int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ {
+ @Override
+ public void onEnqueue(final QueueEntry requeueEntry)
+ {
+ _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
+ requeueEntry.getQueue().getName()));
+ }
+ }, null);
- if (altExchange == null)
+ if(requeues == 0)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- rejectedQueueEntry.delete();
- return;
- }
+ final AMQQueue queue = rejectedQueueEntry.getQueue();
+ final Exchange altExchange = queue.getAlternateExchange();
- final List<? extends BaseQueue> destinationQueues =
- altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties());
-
- if (destinationQueues == null || destinationQueues.isEmpty())
- {
- _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
- rejectedQueueEntry.delete();
- return;
- }
-
- rejectedQueueEntry.routeToAlternate();
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
- //output operational logging for each delivery post commit
- for (final BaseQueue destinationQueue : destinationQueues)
- {
- _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName()));
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 3b981b46b8..3d030890e0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
@@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
{
private static final Accepted ACCEPTED = new Accepted();
- private static final Outcome[] OUTCOMES = { ACCEPTED };
+ public static final Rejected REJECTED = new Rejected();
+ private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
private Exchange _exchange;
private TerminusDurability _durability;
@@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
return null;
}};
- List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
+ int enqueues = _exchange.send(message, instanceProperties, txn, null);
- if(queues == null || queues.isEmpty())
- {
- Exchange altExchange = _exchange.getAlternateExchange();
- if(altExchange != null)
- {
- queues = altExchange.route(message, instanceProperties);
- }
- }
-
- if(queues != null && !queues.isEmpty())
- {
- final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]);
-
- txn.enqueue(queues,message, new ServerTransaction.Action()
- {
- MessageReference _reference = message.newReference();
-
- public void postCommit()
- {
- for(int i = 0; i < baseQueues.length; i++)
- {
- try
- {
- baseQueues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- _reference.release();
- }
- });
- }
- return ACCEPTED;
+ return enqueues == 0 ? REJECTED : ACCEPTED;
}
TerminusDurability getDurability()
diff --git a/qpid/java/broker-plugins/management-http/pom.xml b/qpid/java/broker-plugins/management-http/pom.xml
index abc754902a..57b2dd863b 100644
--- a/qpid/java/broker-plugins/management-http/pom.xml
+++ b/qpid/java/broker-plugins/management-http/pom.xml
@@ -50,15 +50,15 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -79,14 +79,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -99,7 +99,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -112,7 +112,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -125,7 +125,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -138,14 +138,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index 99e9a8b5f0..3992c707f4 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -29,6 +29,7 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import javax.servlet.DispatcherType;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -78,7 +79,6 @@ import org.apache.qpid.server.plugin.PluginFactory;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.DispatcherType;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.SessionManager;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
@@ -409,7 +409,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem
root.addServlet(new ServletHolder(new LogFileServlet()), "/rest/logfile");
final SessionManager sessionManager = root.getSessionHandler().getSessionManager();
- sessionManager.setSessionCookie(JSESSIONID_COOKIE_PREFIX + lastPort);
+ sessionManager.getSessionCookieConfig().setName(JSESSIONID_COOKIE_PREFIX + lastPort);
sessionManager.setMaxInactiveInterval((Integer)getAttribute(TIME_OUT));
return server;
diff --git a/qpid/java/broker-plugins/websocket/pom.xml b/qpid/java/broker-plugins/websocket/pom.xml
index 2029bd33aa..fb55be05c8 100644
--- a/qpid/java/broker-plugins/websocket/pom.xml
+++ b/qpid/java/broker-plugins/websocket/pom.xml
@@ -38,15 +38,15 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -67,14 +67,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -87,7 +87,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -100,7 +100,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -113,7 +113,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -126,14 +126,14 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<scope>compile</scope>
<exclusions>
<exclusion>
diff --git a/qpid/java/build.deps b/qpid/java/build.deps
index 58dea7009e..4dc5b0ca46 100644
--- a/qpid/java/build.deps
+++ b/qpid/java/build.deps
@@ -35,7 +35,7 @@ geronimo-j2ee=lib/required/geronimo-j2ee-connector_1.5_spec-2.0.0.jar
geronimo-jta=lib/required/geronimo-jta_1.1_spec-1.1.1.jar
geronimo-kernel=lib/required/geronimo-kernel-2.2.1.jar
geronimo-openejb=lib/required/geronimo-ejb_3.0_spec-1.0.1.jar
-geronimo-servlet=lib/required/geronimo-servlet_2.5_spec-1.2.jar
+geronimo-servlet=lib/required/geronimo-servlet_3.0_spec-1.0.jar
junit=lib/required/junit-3.8.1.jar
mockito-all=lib/required/mockito-all-1.9.0.jar
@@ -49,14 +49,14 @@ slf4j-log4j=lib/required/slf4j-log4j12-1.6.4.jar
xalan=lib/required/xalan-2.7.0.jar
-jetty=lib/required/jetty-server-7.6.10.v20130312.jar
-jetty-continuation=lib/required/jetty-continuation-7.6.10.v20130312.jar
-jetty-security=lib/required/jetty-security-7.6.10.v20130312.jar
-jetty-util=lib/required/jetty-util-7.6.10.v20130312.jar
-jetty-io=lib/required/jetty-io-7.6.10.v20130312.jar
-jetty-http=lib/required/jetty-http-7.6.10.v20130312.jar
-jetty-servlet=lib/required/jetty-servlet-7.6.10.v20130312.jar
-jetty-websocket=lib/required/jetty-websocket-7.6.10.v20130312.jar
+jetty=lib/required/jetty-server-8.1.14.v20131031.jar
+jetty-continuation=lib/required/jetty-continuation-8.1.14.v20131031.jar
+jetty-security=lib/required/jetty-security-8.1.14.v20131031.jar
+jetty-util=lib/required/jetty-util-8.1.14.v20131031.jar
+jetty-io=lib/required/jetty-io-8.1.14.v20131031.jar
+jetty-http=lib/required/jetty-http-8.1.14.v20131031.jar
+jetty-servlet=lib/required/jetty-servlet-8.1.14.v20131031.jar
+jetty-websocket=lib/required/jetty-websocket-8.1.14.v20131031.jar
servlet-api=${geronimo-servlet}
dojo-version=1.9.1
diff --git a/qpid/java/ivy.nexus.xml b/qpid/java/ivy.nexus.xml
index e301bcb0cf..55b2d2d729 100644
--- a/qpid/java/ivy.nexus.xml
+++ b/qpid/java/ivy.nexus.xml
@@ -39,6 +39,12 @@
<artifact name="qpid-broker" type="jar.asc" ext="jar.asc"/>
<artifact name="qpid-broker" type="source" ext="jar" e:classifier="sources"/>
<artifact name="qpid-broker" type="source.asc" ext="jar.asc" e:classifier="sources"/>
+ <artifact name="qpid-broker-core" type="pom" ext="pom"/>
+ <artifact name="qpid-broker-core" type="pom.asc" ext="pom.asc"/>
+ <artifact name="qpid-broker-core" type="jar" ext="jar"/>
+ <artifact name="qpid-broker-core" type="jar.asc" ext="jar.asc"/>
+ <artifact name="qpid-broker-core" type="source" ext="jar" e:classifier="sources"/>
+ <artifact name="qpid-broker-core" type="source.asc" ext="jar.asc" e:classifier="sources"/>
<artifact name="qpid-broker-plugins-access-control" type="pom" ext="pom"/>
<artifact name="qpid-broker-plugins-access-control" type="pom.asc" ext="pom.asc"/>
<artifact name="qpid-broker-plugins-access-control" type="jar" ext="jar"/>
diff --git a/qpid/java/ivy.retrieve.xml b/qpid/java/ivy.retrieve.xml
index 388e2d0dc4..59b3fa70af 100644
--- a/qpid/java/ivy.retrieve.xml
+++ b/qpid/java/ivy.retrieve.xml
@@ -49,7 +49,7 @@
<dependency org="org.apache.geronimo.specs" name="geronimo-j2ee-connector_1.5_spec" rev="2.0.0" transitive="false"/>
<dependency org="org.apache.geronimo.specs" name="geronimo-jms_1.1_spec" rev="1.0" transitive="false"/>
<dependency org="org.apache.geronimo.specs" name="geronimo-jta_1.1_spec" rev="1.1.1" transitive="false"/>
- <dependency org="org.apache.geronimo.specs" name="geronimo-servlet_2.5_spec" rev="1.2" transitive="false"/>
+ <dependency org="org.apache.geronimo.specs" name="geronimo-servlet_3.0_spec" rev="1.0" transitive="false"/>
<dependency org="com.google.code.gson" name="gson" rev="2.0" transitive="false"/>
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.9.0" transitive="false"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.9.0" transitive="false"/>
@@ -61,14 +61,14 @@
<dependency org="org.mockito" name="mockito-all" rev="1.9.0" transitive="false"/>
<dependency org="org.slf4j" name="slf4j-api" rev="1.6.4" transitive="false"/>
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.4" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-server" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-websocket" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-continuation" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-io" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-http" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-security" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="7.6.10.v20130312" transitive="false"/>
- <dependency org="org.eclipse.jetty" name="jetty-util" rev="7.6.10.v20130312" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-server" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-websocket" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-continuation" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-io" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-http" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-security" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-servlet" rev="8.1.14.v20131031" transitive="false"/>
+ <dependency org="org.eclipse.jetty" name="jetty-util" rev="8.1.14.v20131031" transitive="false"/>
<dependency org="xalan" name="xalan" rev="2.7.0" transitive="false"/>
<dependency org="velocity" name="velocity" rev="1.4" transitive="false"/>
<dependency org="velocity" name="velocity-dep" rev="1.4" transitive="false"/>
diff --git a/qpid/java/jca/build.xml b/qpid/java/jca/build.xml
index 7137467e4b..83cc781ba9 100644
--- a/qpid/java/jca/build.xml
+++ b/qpid/java/jca/build.xml
@@ -24,7 +24,7 @@
<property name="module.name" value="jca"/>
<property name="module.genpom" value="true"/>
- <property name="module.genpom.args" value="-Sgeronimo-j2ee-connector_1.5_spec=provided -Sgeronimo-jta_1.1_spec=provided -Sgeronimo-jms_1.1_spec=provided -Sgeronimo-ejb_3.0_spec=provided -Sgeronimo-servlet_2.5_spec=provided -Sgeronimo-kernel=provided"/>
+ <property name="module.genpom.args" value="-Sgeronimo-j2ee-connector_1.5_spec=provided -Sgeronimo-jta_1.1_spec=provided -Sgeronimo-jms_1.1_spec=provided -Sgeronimo-ejb_3.0_spec=provided -Sgeronimo-servlet_3.0_spec=provided -Sgeronimo-kernel=provided"/>
<import file="../module.xml"/>
diff --git a/qpid/java/jca/example/build-geronimo-properties.xml b/qpid/java/jca/example/build-geronimo-properties.xml
index a20753117f..3c84b7634a 100644
--- a/qpid/java/jca/example/build-geronimo-properties.xml
+++ b/qpid/java/jca/example/build-geronimo-properties.xml
@@ -87,7 +87,6 @@
<path id="compile.classpath">
<fileset dir="${geronimo.home}/repository/org/apache/geronimo/specs">
<include name="geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar"/>
- <include name="geronimo-servlet_2.5_spec/1.2/geronimo-servlet_2.5_spec-1.2.jar"/>
<include name="geronimo-ejb_3.0_spec/1.0.1/geronimo-ejb_3.0_spec-1.0.1.jar"/>
<include name="geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar"/>
@@ -113,7 +112,7 @@
<fileset dir="${geronimo.home}/repository/org/apache/geronimo/specs">
<include name="geronimo-j2ee-connector_1.5_spec/2.0.0/geronimo-j2ee-connector_1.5_spec-2.0.0.jar"/>
<include name="geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar"/>
- <include name="geronimo-servlet_2.5_spec/1.2/geronimo-servlet_2.5_spec-1.2.jar"/>
+ <include name="geronimo-servlet_3.0_spec/1.0/geronimo-servlet_3.0_spec-1.0.jar"/>
<include name="geronimo-ejb_3.0_spec/1.0.1/geronimo-ejb_3.0_spec-1.0.1.jar"/>
<include name="geronimo-jta_1.1_spec/1.1.1/geronimo-jta_1.1_spec-1.1.1.jar"/>
</fileset>
diff --git a/qpid/java/jca/pom.xml b/qpid/java/jca/pom.xml
index 859b8aabac..c7a8de61fe 100644
--- a/qpid/java/jca/pom.xml
+++ b/qpid/java/jca/pom.xml
@@ -70,8 +70,8 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
<scope>provided</scope>
</dependency>
diff --git a/qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml b/qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml
index 11228afcfa..5e7093bb0a 100644
--- a/qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml
+++ b/qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml
@@ -17,6 +17,6 @@
-->
<dep>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-servlet_2.5_spec</artifactId>
- <version>1.2</version>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
</dep>
diff --git a/qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml
index 5beba95d17..10b7a4c499 100644
--- a/qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml
@@ -18,5 +18,5 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
</dep>
diff --git a/qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml
index 5c840bedd6..929fcbef3a 100644
--- a/qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml
index 9cec3998ea..42be6ad6ab 100644
--- a/qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml
index 9501750ba0..8079c78d96 100644
--- a/qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml
index 587860b50f..5b8160efd4 100644
--- a/qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.orbit</groupId>
diff --git a/qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml
index 4c0ff0a41b..5abcf03a18 100644
--- a/qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml
index f5c990248f..e134444e44 100644
--- a/qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml
@@ -18,5 +18,5 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
</dep>
diff --git a/qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml
index 4d3ebd1666..1592ca3d56 100644
--- a/qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml
+++ b/qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml
@@ -18,7 +18,7 @@
<dep>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>7.6.10.v20130312</version>
+ <version>8.1.14.v20131031</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 710a6c5731..17847fbdfc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -625,32 +625,10 @@ public class MessageStoreTest extends QpidTestCase
storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);
- final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY);
ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());
-
- trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
- for(BaseQueue queue : destinationQueues)
- {
- queue.enqueue(currentMessage);
- }
- }
- catch (AMQException e)
- {
- _logger.error("Problem enqueing message", e);
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null);
}
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
index 315991d585..53de37f12c 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py
@@ -456,6 +456,12 @@ class HeadersExchangeTests(TestHelper):
self.myBasicPublish({"irrelevant":0})
self.assertEmpty(self.q)
+ def testMultipleBindings(self):
+ self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="SomeKey", arguments={ 'x-match':'any', "name":"fred"})
+ self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="AnotherKey", arguments={ 'x-match':'all', "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.assertEmpty(self.q)
+
class MiscellaneousErrorsTests(TestHelper):
"""
diff --git a/qpid/tools/src/py/qpidstore/janal.py b/qpid/tools/src/py/qpidstore/janal.py
index 1f89207b4d..231b283c05 100644
--- a/qpid/tools/src/py/qpidstore/janal.py
+++ b/qpid/tools/src/py/qpidstore/janal.py
@@ -215,9 +215,18 @@ class TxnMap(object):
def _abort(self, xid):
"""Perform an abort operation for the given xid record"""
- for fid, hdr, lock in self.__map[xid]:
+ for _, hdr, _ in self.__map[xid]:
if isinstance(hdr, jrnl.DeqRec):
- self.__emap.unlock(hdr.deq_rid)
+ try:
+ self.__emap.unlock(hdr.deq_rid)
+ except jerr.NonExistentRecordError as err: # Not in emap, look in current transaction op list (TPL)
+ found_rid = False
+ for _, hdr1, _ in self.__map[xid]:
+ if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid:
+ found_rid = True
+ break
+ if not found_rid: # Not found in current transaction op list, re-throw error
+ raise err
del self.__map[xid]
def _commit(self, xid):