From 1299de40a497216d824224365b2e08e2d78b6f4b Mon Sep 17 00:00:00 2001 From: "Charles E. Rolke" Date: Wed, 29 Jan 2014 18:24:44 +0000 Subject: NO-JIRA: Update configuration script to handle a PROTON_ROOT. Expose the cmake command lines as they are executed. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1562539 13f79535-47bb-0310-9956-ffa450edef68 --- .../cpp/bindings/qpid/dotnet/configure-windows.ps1 | 102 +++++++++++++++++---- 1 file changed, 85 insertions(+), 17 deletions(-) diff --git a/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 b/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 index e62e5e8bc3..4874899b7f 100644 --- a/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 +++ b/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 @@ -55,6 +55,10 @@ # - If a directory "looks like" is has already had CMake run in it # then this script skips running CMake again. # +# * User chooses to include Proton or not. +# +# - Proton is included by having variable PROTON_ROOT reference the +# directory where proton was installed. # # Prerequisites # @@ -143,6 +147,7 @@ $ErrorActionPreference='Stop' # $global:txtPath = '$env:PATH' $global:txtQR = '$env:QPID_BUILD_ROOT' +$global:txtPR = '$env:PROTON_ROOT' $global:txtWH = 'Write-Host' ############################# @@ -217,6 +222,37 @@ function SanityCheckBoostPath ($path=0) } +############################# +# SanityCheckProtonInstallPath +# A path is a "proton install path" if it contains +# both bin and include subdirectories. +# +function SanityCheckProtonInstallPath ($path=0) +{ + $result = $true + $displayPath = "" + + if ($path -ne $null) { + $displayPath = $path + + $toTest = ('include', 'bin') + foreach ($pattern in $toTest) { + $target = Join-Path $path $pattern + if (!(Test-Path -path $target)) { + $result = $false + } + } + } else { + $result = $false + } + + if (! $result) { + Write-Host "The path ""$displayPath"" does not appear to be a Proton install root path." + } + $result +} + + ############################# # SanityCheckBuildPath # A path is a "build path" if it contains @@ -264,14 +300,16 @@ function WriteDotnetBindingSlnLauncherPs1 [string] $nBits, [string] $outfileName, [string] $studioVersion, - [string] $studioSubdir + [string] $studioSubdir, + [string] $protonRoot ) $out = @("# # Launch $slnName in $studioVersion $vsPlatform ($nBits-bit) environment # -$global:txtPath = ""$boostRoot\lib;$global:txtPath"" +$global:txtPath = ""$protonRoot\bin;$boostRoot\lib;$global:txtPath"" $global:txtQR = ""$buildRoot"" +$global:txtPR = ""$protonRoot"" $global:txtWH ""Launch $slnName in $studioVersion $vsPlatform ($nBits-bit) environment."" $cppDir\bindings\qpid\dotnet\$vsSubdir\$slnName ") @@ -328,7 +366,8 @@ function WriteDotnetBindingEnvSetupBat [string] $outfileName, [string] $studioVersion, [string] $studioSubdir, - [string] $cmakeLine + [string] $cmakeLine, + [string] $protonRoot ) $out = @("@ECHO OFF @@ -344,8 +383,9 @@ REM The solution was generated with cmake command line: REM $cmakeLine ECHO %PATH% | FINDSTR /I boost > NUL IF %ERRORLEVEL% EQU 0 ECHO WARNING: Boost is defined in your path multiple times! -SET PATH=$boostRoot\lib;%PATH% +SET PATH=$protonRoot\bin;$boostRoot\lib;%PATH% SET QPID_BUILD_ROOT=$buildRoot +SET PROTON_ROOT=$protonRoot ECHO Environment set for $slnName $studioVersion $vsPlatform $nBits-bit development. ") Write-Host " $buildRoot\$outfileName" @@ -441,7 +481,7 @@ function SelectVisualStudioVersion { SelectVisualStudioVersion ############################# -# User dialog to get optional 32-bit boost and build paths +# User dialog to get optional 32-bit boost, proton, and build paths # $boost32 = Select-Folder -message "Select 32-bit BOOST_ROOT folder for $global:vsVersion build. Press CANCEL to skip 32-bit processing." @@ -456,6 +496,18 @@ if ($defined32) { $make32 = $false if ($defined32) { + $proton32folder = Select-Folder -message "Select 32-bit Proton install folder for $global:vsVersion build." + + $found = ($proton32folder -ne $null) -and ($proton32folder -ne '') + if ($found) { + $found = SanityCheckProtonInstallPath $proton32folder + } + if ($found) { + $proton32cmake = """-DPROTON_ROOT=$proton32folder""" + } else { + $proton32cmake = "" + } + $build32 = Select-Folder -message "Select 32-bit QPID_BUILD_ROOT folder for $global:vsVersion build." -path $projRoot $found = ($build32 -ne $null) -and ($build32 -ne '') @@ -472,7 +524,7 @@ if ($defined32) { } ############################# -# User dialog to get optional 64-bit boost and build paths +# User dialog to get optional 64-bit boost, proton, and build paths # $boost64 = Select-Folder -message "Select 64-bit BOOST_ROOT folder for $global:vsVersion build. Press CANCEL to skip 64-bit processing." @@ -486,6 +538,18 @@ if ($defined64) { $make64 = $false if ($defined64) { + $proton64folder = Select-Folder -message "Select 64-bit Proton install folder for $global:vsVersion build." + + $found = ($proton64folder -ne $null) -and ($proton64folder -ne '') + if ($found) { + $found = SanityCheckProtonInstallPath $proton64folder + } + if ($found) { + $proton64cmake = """-DPROTON_ROOT=$proton64folder""" + } else { + $proton64cmake = "" + } + $build64 = Select-Folder -message "Select 64-bit QPID_BUILD_ROOT folder for $global:vsVersion build." -path $projRoot $found = ($build64 -ne $null) -and ($build64 -ne '') @@ -508,10 +572,9 @@ if ($defined64) { # if ($make32) { cd "$build32" - Write-Host "Running 32-bit CMake in $build32 ..." - $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_x86"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $cppDir" - Write-Host "$global:cmakeCommadLine32" - CMake -G "$global:cmakeGenerator" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_x86" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $cppDir + $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_$build32"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $proton32cmake $cppDir" + Write-Host "Running 32-bit CMake in $build32 : $global:cmakeCommandLine32" + CMake -G "$global:cmakeGenerator" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_$build32" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $proton32cmake $cppDir } else { Write-Host "Skipped 32-bit CMake." } @@ -522,9 +585,10 @@ if ($make32) { if ($make64) { cd "$build64" Write-Host "Running 64-bit CMake in $build64" - $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_x64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $cppDir" - Write-Host "$global:cmakeCommadLine64" - CMake -G "$global:cmakeGenerator Win64" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_x64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $cppDir + $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DBUILD_DOCS=No"" ""-DCMAKE_INSTALL_PREFIX=install_$build64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $proton64cmake $cppDir" + Write-Host $global:cmakeCommandLine64 + Write-Host "" + CMake -G "$global:cmakeGenerator Win64" "-DBUILD_DOCS=No" "-DCMAKE_INSTALL_PREFIX=install_$build64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $proton64cmake $cppDir } else { Write-Host "Skipped 64-bit CMake." } @@ -549,7 +613,8 @@ if ($defined32) { -nBits "32" ` -outfileName "start-devenv-messaging-$global:vsSubdir-x86-32bit.ps1" ` -studioVersion "$global:vsVersion" ` - -studioSubdir "$global:vsSubdir" + -studioSubdir "$global:vsSubdir" ` + -protonRoot "$proton32folder" ########### @@ -577,7 +642,8 @@ if ($defined32) { -outfileName "setenv-messaging-$global:vsSubdir-x86-32bit.bat" ` -studioVersion "$global:vsVersion" ` -studioSubdir "$global:vsSubdir" ` - -cmakeLine "$global:cmakeCommandLine32" + -cmakeLine "$global:cmakeCommandLine32" ` + -protonRoot "$proton32folder" } else { Write-Host "Skipped writing 32-bit scripts." @@ -615,7 +681,8 @@ if ($defined64) { -psScriptName "start-devenv-messaging-$global:vsSubdir-x64-64bit.ps1" ` -outfileName "start-devenv-messaging-$global:vsSubdir-x64-64bit.bat" ` -studioVersion "$global:vsVersion" ` - -studioSubdir "$global:vsSubdir" + -studioSubdir "$global:vsSubdir" ` + -protonRoot "$proton64folder" ########### # Batch script (that you CALL from a command prompt) @@ -629,7 +696,8 @@ if ($defined64) { -outfileName "setenv-messaging-$global:vsSubdir-x64-64bit.bat" ` -studioVersion "$global:vsVersion" ` -studioSubdir "$global:vsSubdir" ` - -cmakeLine "$global:cmakeCommandLine64" + -cmakeLine "$global:cmakeCommandLine64" ` + -protonRoot "$proton64folder" } else { Write-Host "Skipped writing 64-bit scripts." -- cgit v1.2.1 From 561105d8014ff4c9e6ad13a713c6ecf91d80a09b Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 30 Jan 2014 23:10:08 +0000 Subject: QPID-5485: By default use a subdirectory of the data dir called "pq" for page queue files. If neither data dir nor paging dir are set then error on any attempt to create a paged queue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563012 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Broker.cpp | 4 +-- qpid/cpp/src/qpid/broker/Broker.h | 1 + qpid/cpp/src/qpid/broker/PagedQueue.cpp | 17 +++++++----- qpid/cpp/src/qpid/broker/PagedQueue.h | 1 - qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp | 1 + .../cpp/src/qpid/broker/windows/BrokerDefaults.cpp | 1 + qpid/cpp/src/qpid/sys/MemoryMappedFile.h | 4 +-- qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp | 31 +++++++++++++++------- qpid/cpp/src/tests/run_paged_queue_tests | 2 +- 9 files changed, 40 insertions(+), 22 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 975acfdb87..7741d83665 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -217,7 +217,7 @@ Broker::Broker(const Broker::Options& conf) : store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), - pagingDir(conf.pagingDir), + pagingDir(!conf.pagingDir.empty() ? conf.pagingDir : !conf.noDataDir ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR : std::string() ), queues(this), exchanges(this), links(this), @@ -387,7 +387,7 @@ Broker::Broker(const Broker::Options& conf) : std::string Broker::getPagingDirectoryPath() { - return pagingDir.isEnabled() ? pagingDir.getPath() : dataDir.getPath(); + return pagingDir.getPath(); } void Broker::declareStandardExchange(const std::string& name, const std::string& type) diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 4e46b71cc9..5923d6c0e5 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -86,6 +86,7 @@ class Broker : public sys::Runnable, public Plugin::Target, struct Options : public qpid::Options { static const std::string DEFAULT_DATA_DIR_LOCATION; static const std::string DEFAULT_DATA_DIR_NAME; + static const std::string DEFAULT_PAGED_QUEUE_DIR; QPID_BROKER_EXTERN Options(const std::string& name="Broker Options"); diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.cpp b/qpid/cpp/src/qpid/broker/PagedQueue.cpp index 43208d74ee..afb330489b 100644 --- a/qpid/cpp/src/qpid/broker/PagedQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PagedQueue.cpp @@ -91,13 +91,16 @@ PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, u : name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0), expiryPolicy(e) { - path = file.open(name, directory); - QPID_LOG(debug, "PagedQueue[" << path << "]"); + if (directory.empty()) { + throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified")); + } + file.open(name, directory); + QPID_LOG(debug, "PagedQueue[" << name << "]"); } PagedQueue::~PagedQueue() { - file.close(path); + file.close(); } size_t PagedQueue::size() @@ -133,7 +136,7 @@ bool PagedQueue::deleted(const QueueCursor& cursor) void PagedQueue::publish(const Message& added) { if (encodedSize(added) > pageSize) { - QPID_LOG(error, "Message is larger than page size for queue " << name << ", backed by " << path); + QPID_LOG(error, "Message is larger than page size for queue " << name); throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name)); } Used::reverse_iterator i = used.rbegin(); @@ -143,7 +146,7 @@ void PagedQueue::publish(const Message& added) } //used is empty or last page is full, need to add a new page if (!newPage(added.getSequence()).add(added)) { - QPID_LOG(error, "Could not add message to paged queue " << name << ", backed by " << path); + QPID_LOG(error, "Could not add message to paged queue " << name); throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name)); } } @@ -388,14 +391,14 @@ void PagedQueue::load(Page& page) } page.load(file, protocols, expiryPolicy); ++loaded; - QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded"); + QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded"); } void PagedQueue::unload(Page& page) { page.unload(file); --loaded; - QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded"); + QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded"); } diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.h b/qpid/cpp/src/qpid/broker/PagedQueue.h index cb83fa9f34..e4c98f4119 100644 --- a/qpid/cpp/src/qpid/broker/PagedQueue.h +++ b/qpid/cpp/src/qpid/broker/PagedQueue.h @@ -79,7 +79,6 @@ class PagedQueue : public Messages { qpid::sys::MemoryMappedFile file; std::string name; - std::string path; const size_t pageSize; const uint maxLoaded; ProtocolRegistry& protocols; diff --git a/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp b/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp index 9e463fa32d..9e118c19f7 100644 --- a/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp +++ b/qpid/cpp/src/qpid/broker/posix/BrokerDefaults.cpp @@ -27,6 +27,7 @@ namespace broker { const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("/tmp"); const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("/.qpidd"); +const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("/pq"); std::string Broker::Options::getHome() { diff --git a/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp b/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp index b65440b5ad..260bd7c073 100644 --- a/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp +++ b/qpid/cpp/src/qpid/broker/windows/BrokerDefaults.cpp @@ -27,6 +27,7 @@ namespace broker { const std::string Broker::Options::DEFAULT_DATA_DIR_LOCATION("\\TEMP"); const std::string Broker::Options::DEFAULT_DATA_DIR_NAME("\\QPIDD.DATA"); +const std::string Broker::Options::DEFAULT_PAGED_QUEUE_DIR("\\PQ"); std::string Broker::Options::getHome() { diff --git a/qpid/cpp/src/qpid/sys/MemoryMappedFile.h b/qpid/cpp/src/qpid/sys/MemoryMappedFile.h index 43e4c852af..ecf38e98e6 100644 --- a/qpid/cpp/src/qpid/sys/MemoryMappedFile.h +++ b/qpid/cpp/src/qpid/sys/MemoryMappedFile.h @@ -38,11 +38,11 @@ class MemoryMappedFile { /** * Opens a file that can be mapped by region into memory */ - QPID_COMMON_EXTERN std::string open(const std::string& name, const std::string& directory); + QPID_COMMON_EXTERN void open(const std::string& name, const std::string& directory); /** * Closes and removes the file that can be mapped by region into memory */ - QPID_COMMON_EXTERN void close(const std::string& path); + QPID_COMMON_EXTERN void close(); /** * Returns the page size */ diff --git a/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp index f647ab3943..b4292aa4bc 100644 --- a/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp +++ b/qpid/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp @@ -29,13 +29,14 @@ namespace qpid { namespace sys { namespace { +const std::string PAGEFILE_PREFIX("pf_"); const std::string PATH_SEPARATOR("/"); const std::string ESCAPE("%"); const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-."); std::string getFileName(const std::string& name, const std::string& dir) { std::stringstream filename; - if (dir.size()) filename << dir << PATH_SEPARATOR; + if (dir.size()) filename << dir << PATH_SEPARATOR << PAGEFILE_PREFIX; size_t start = 0; while (true) { size_t i = name.find_first_not_of(VALID, start); @@ -55,31 +56,39 @@ std::string getFileName(const std::string& name, const std::string& dir) class MemoryMappedFilePrivate { friend class MemoryMappedFile; + std::string path; int fd; MemoryMappedFilePrivate() : fd(0) {} }; MemoryMappedFile::MemoryMappedFile() : state(new MemoryMappedFilePrivate) {} MemoryMappedFile::~MemoryMappedFile() { delete state; } -std::string MemoryMappedFile::open(const std::string& name, const std::string& directory) +void MemoryMappedFile::open(const std::string& name, const std::string& directory) { - std::string path = getFileName(name, directory); + // Ensure directory exists + if ( ::mkdir(directory.c_str(), S_IRWXU | S_IRGRP | S_IXGRP )!=0 && errno!=EEXIST ) { + throw qpid::Exception(QPID_MSG("Failed to create memory mapped file directory " << directory << ": " << qpid::sys::strError(errno))); + } + + state->path = getFileName(name, directory); - int flags = O_CREAT | O_EXCL | O_RDWR; - int fd = ::open(path.c_str(), flags, S_IRUSR | S_IWUSR); - if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]")); + int flags = O_CREAT | O_TRUNC | O_RDWR; + int fd = ::open(state->path.c_str(), flags, S_IRUSR | S_IWUSR); + if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << state->path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]")); state->fd = fd; - return path; } -void MemoryMappedFile::close(const std::string& path) + +void MemoryMappedFile::close() { ::close(state->fd); - ::unlink(path.c_str()); + ::unlink(state->path.c_str()); } + size_t MemoryMappedFile::getPageSize() { return ::sysconf(_SC_PAGE_SIZE); } + char* MemoryMappedFile::map(size_t offset, size_t size) { int protection = PROT_READ | PROT_WRITE; @@ -90,20 +99,24 @@ char* MemoryMappedFile::map(size_t offset, size_t size) return region; } + void MemoryMappedFile::unmap(char* region, size_t size) { ::munmap(region, size); } + void MemoryMappedFile::flush(char* region, size_t size) { ::msync(region, size, MS_ASYNC); } + void MemoryMappedFile::expand(size_t offset) { if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) { throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno))); } } + bool MemoryMappedFile::isSupported() { return true; diff --git a/qpid/cpp/src/tests/run_paged_queue_tests b/qpid/cpp/src/tests/run_paged_queue_tests index 8f72ddd5b7..cd1dc72641 100755 --- a/qpid/cpp/src/tests/run_paged_queue_tests +++ b/qpid/cpp/src/tests/run_paged_queue_tests @@ -26,7 +26,7 @@ trap stop_broker INT TERM QUIT export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH start_broker() { - QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker" + QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir --paging-dir=$PWD/pqtest_data $MODULES --auth no) || fail "Could not start broker" } stop_broker() { -- cgit v1.2.1 From 60057de2529d0aab0f5492be5a60f8ae87258e3f Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 31 Jan 2014 12:25:32 +0000 Subject: QPID-5529: remove failed send from delivery buffer git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563122 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 0bf8bef27f..59e1832cfd 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -95,11 +95,17 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: return true; } else { deliveries.push_back(Delivery(nextId++)); - Delivery& delivery = deliveries.back(); - delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); - *out = &delivery; - return true; + try { + Delivery& delivery = deliveries.back(); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); + delivery.send(sender, unreliable); + *out = &delivery; + return true; + } catch (const std::exception& e) { + deliveries.pop_back(); + --nextId; + throw SendError(e.what()); + } } } else { return false; -- cgit v1.2.1 From f7e49d4b3d51aa3b9c1e57032f1ade212552dbac Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Fri, 31 Jan 2014 15:51:15 +0000 Subject: QPID-5485: Fixes to broken original checkin - Fix signatures of dummy windows memory mapped file implementation - Fix detecting no data dir git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563152 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Broker.cpp | 4 +++- qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 7741d83665..7367336aff 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -217,7 +217,9 @@ Broker::Broker(const Broker::Options& conf) : store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), - pagingDir(!conf.pagingDir.empty() ? conf.pagingDir : !conf.noDataDir ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR : std::string() ), + pagingDir(!conf.pagingDir.empty() ? conf.pagingDir : + dataDir.isEnabled() ? dataDir.getPath() + Options::DEFAULT_PAGED_QUEUE_DIR : + std::string() ), queues(this), exchanges(this), links(this), diff --git a/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp index 9bb3fa2320..60b3df7da6 100644 --- a/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp +++ b/qpid/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp @@ -27,11 +27,10 @@ class MemoryMappedFilePrivate {}; MemoryMappedFile::MemoryMappedFile() : state(0) {} MemoryMappedFile::~MemoryMappedFile() {} -std::string MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/) +void MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/) { - return std::string(); } -void MemoryMappedFile::close(const std::string& /*path*/) +void MemoryMappedFile::close() { } size_t MemoryMappedFile::getPageSize() -- cgit v1.2.1 From e39e8353b5321a8ddd9a19054e83822a7b629684 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Fri, 31 Jan 2014 15:52:15 +0000 Subject: QPID-5527: Upgrade to Jetty 8 Applied patch from Emmanuel Bourg. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563153 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/amqp-1-0-client-websocket/pom.xml | 20 ++++++------ qpid/java/broker-plugins/management-http/pom.xml | 20 ++++++------ .../server/management/plugin/HttpManagement.java | 4 +-- qpid/java/broker-plugins/websocket/pom.xml | 20 ++++++------ qpid/java/build.deps | 18 +++++------ qpid/java/ivy.retrieve.xml | 18 +++++------ qpid/java/jca/build.xml | 2 +- .../java/jca/example/build-geronimo-properties.xml | 3 +- qpid/java/jca/pom.xml | 4 +-- .../lib/poms/geronimo-servlet_2.5_spec-1.2.xml | 22 ------------- .../lib/poms/geronimo-servlet_3.0_spec-1.0.xml | 22 +++++++++++++ .../poms/jetty-continuation-7.6.10.v20130312.xml | 22 ------------- .../poms/jetty-continuation-8.1.14.v20131031.xml | 22 +++++++++++++ qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml | 28 ----------------- qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml | 28 +++++++++++++++++ qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml | 28 ----------------- qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml | 28 +++++++++++++++++ .../lib/poms/jetty-security-7.6.10.v20130312.xml | 28 ----------------- .../lib/poms/jetty-security-8.1.14.v20131031.xml | 28 +++++++++++++++++ .../lib/poms/jetty-server-7.6.10.v20130312.xml | 36 ---------------------- .../lib/poms/jetty-server-8.1.14.v20131031.xml | 36 ++++++++++++++++++++++ .../lib/poms/jetty-servlet-7.6.10.v20130312.xml | 28 ----------------- .../lib/poms/jetty-servlet-8.1.14.v20131031.xml | 28 +++++++++++++++++ qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml | 22 ------------- qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml | 22 +++++++++++++ .../lib/poms/jetty-websocket-7.6.10.v20130312.xml | 36 ---------------------- .../lib/poms/jetty-websocket-8.1.14.v20131031.xml | 36 ++++++++++++++++++++++ 27 files changed, 304 insertions(+), 305 deletions(-) delete mode 100644 qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml create mode 100644 qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml delete mode 100644 qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml delete mode 100644 qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml create mode 100644 qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml diff --git a/qpid/java/amqp-1-0-client-websocket/pom.xml b/qpid/java/amqp-1-0-client-websocket/pom.xml index 205e0d5ab7..3862fb0fc5 100644 --- a/qpid/java/amqp-1-0-client-websocket/pom.xml +++ b/qpid/java/amqp-1-0-client-websocket/pom.xml @@ -44,15 +44,15 @@ org.apache.geronimo.specs - geronimo-servlet_2.5_spec - 1.2 + geronimo-servlet_3.0_spec + 1.0 compile org.eclipse.jetty jetty-server - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -73,14 +73,14 @@ org.eclipse.jetty jetty-continuation - 7.6.10.v20130312 + 8.1.14.v20131031 compile org.eclipse.jetty jetty-security - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -93,7 +93,7 @@ org.eclipse.jetty jetty-http - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -105,7 +105,7 @@ org.eclipse.jetty jetty-io - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -118,7 +118,7 @@ org.eclipse.jetty jetty-servlet - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -131,14 +131,14 @@ org.eclipse.jetty jetty-util - 7.6.10.v20130312 + 8.1.14.v20131031 compile org.eclipse.jetty jetty-websocket - 7.6.10.v20130312 + 8.1.14.v20131031 compile diff --git a/qpid/java/broker-plugins/management-http/pom.xml b/qpid/java/broker-plugins/management-http/pom.xml index abc754902a..57b2dd863b 100644 --- a/qpid/java/broker-plugins/management-http/pom.xml +++ b/qpid/java/broker-plugins/management-http/pom.xml @@ -50,15 +50,15 @@ org.apache.geronimo.specs - geronimo-servlet_2.5_spec - 1.2 + geronimo-servlet_3.0_spec + 1.0 compile org.eclipse.jetty jetty-server - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -79,14 +79,14 @@ org.eclipse.jetty jetty-continuation - 7.6.10.v20130312 + 8.1.14.v20131031 compile org.eclipse.jetty jetty-security - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -99,7 +99,7 @@ org.eclipse.jetty jetty-http - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -112,7 +112,7 @@ org.eclipse.jetty jetty-io - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -125,7 +125,7 @@ org.eclipse.jetty jetty-servlet - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -138,14 +138,14 @@ org.eclipse.jetty jetty-util - 7.6.10.v20130312 + 8.1.14.v20131031 compile org.eclipse.jetty jetty-websocket - 7.6.10.v20130312 + 8.1.14.v20131031 compile diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 039114056f..3375a784ea 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -29,6 +29,7 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import javax.servlet.DispatcherType; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -77,7 +78,6 @@ import org.apache.qpid.server.plugin.PluginFactory; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.DispatcherType; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.SessionManager; import org.eclipse.jetty.server.nio.SelectChannelConnector; @@ -396,7 +396,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem root.addServlet(new ServletHolder(new LogFileServlet()), "/rest/logfile"); final SessionManager sessionManager = root.getSessionHandler().getSessionManager(); - sessionManager.setSessionCookie(JSESSIONID_COOKIE_PREFIX + lastPort); + sessionManager.getSessionCookieConfig().setName(JSESSIONID_COOKIE_PREFIX + lastPort); sessionManager.setMaxInactiveInterval((Integer)getAttribute(TIME_OUT)); return server; diff --git a/qpid/java/broker-plugins/websocket/pom.xml b/qpid/java/broker-plugins/websocket/pom.xml index 2029bd33aa..fb55be05c8 100644 --- a/qpid/java/broker-plugins/websocket/pom.xml +++ b/qpid/java/broker-plugins/websocket/pom.xml @@ -38,15 +38,15 @@ org.apache.geronimo.specs - geronimo-servlet_2.5_spec - 1.2 + geronimo-servlet_3.0_spec + 1.0 compile org.eclipse.jetty jetty-server - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -67,14 +67,14 @@ org.eclipse.jetty jetty-continuation - 7.6.10.v20130312 + 8.1.14.v20131031 compile org.eclipse.jetty jetty-security - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -87,7 +87,7 @@ org.eclipse.jetty jetty-http - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -100,7 +100,7 @@ org.eclipse.jetty jetty-io - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -113,7 +113,7 @@ org.eclipse.jetty jetty-servlet - 7.6.10.v20130312 + 8.1.14.v20131031 compile @@ -126,14 +126,14 @@ org.eclipse.jetty jetty-util - 7.6.10.v20130312 + 8.1.14.v20131031 compile org.eclipse.jetty jetty-websocket - 7.6.10.v20130312 + 8.1.14.v20131031 compile diff --git a/qpid/java/build.deps b/qpid/java/build.deps index 58dea7009e..4dc5b0ca46 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -35,7 +35,7 @@ geronimo-j2ee=lib/required/geronimo-j2ee-connector_1.5_spec-2.0.0.jar geronimo-jta=lib/required/geronimo-jta_1.1_spec-1.1.1.jar geronimo-kernel=lib/required/geronimo-kernel-2.2.1.jar geronimo-openejb=lib/required/geronimo-ejb_3.0_spec-1.0.1.jar -geronimo-servlet=lib/required/geronimo-servlet_2.5_spec-1.2.jar +geronimo-servlet=lib/required/geronimo-servlet_3.0_spec-1.0.jar junit=lib/required/junit-3.8.1.jar mockito-all=lib/required/mockito-all-1.9.0.jar @@ -49,14 +49,14 @@ slf4j-log4j=lib/required/slf4j-log4j12-1.6.4.jar xalan=lib/required/xalan-2.7.0.jar -jetty=lib/required/jetty-server-7.6.10.v20130312.jar -jetty-continuation=lib/required/jetty-continuation-7.6.10.v20130312.jar -jetty-security=lib/required/jetty-security-7.6.10.v20130312.jar -jetty-util=lib/required/jetty-util-7.6.10.v20130312.jar -jetty-io=lib/required/jetty-io-7.6.10.v20130312.jar -jetty-http=lib/required/jetty-http-7.6.10.v20130312.jar -jetty-servlet=lib/required/jetty-servlet-7.6.10.v20130312.jar -jetty-websocket=lib/required/jetty-websocket-7.6.10.v20130312.jar +jetty=lib/required/jetty-server-8.1.14.v20131031.jar +jetty-continuation=lib/required/jetty-continuation-8.1.14.v20131031.jar +jetty-security=lib/required/jetty-security-8.1.14.v20131031.jar +jetty-util=lib/required/jetty-util-8.1.14.v20131031.jar +jetty-io=lib/required/jetty-io-8.1.14.v20131031.jar +jetty-http=lib/required/jetty-http-8.1.14.v20131031.jar +jetty-servlet=lib/required/jetty-servlet-8.1.14.v20131031.jar +jetty-websocket=lib/required/jetty-websocket-8.1.14.v20131031.jar servlet-api=${geronimo-servlet} dojo-version=1.9.1 diff --git a/qpid/java/ivy.retrieve.xml b/qpid/java/ivy.retrieve.xml index 388e2d0dc4..59b3fa70af 100644 --- a/qpid/java/ivy.retrieve.xml +++ b/qpid/java/ivy.retrieve.xml @@ -49,7 +49,7 @@ - + @@ -61,14 +61,14 @@ - - - - - - - - + + + + + + + + diff --git a/qpid/java/jca/build.xml b/qpid/java/jca/build.xml index 7137467e4b..83cc781ba9 100644 --- a/qpid/java/jca/build.xml +++ b/qpid/java/jca/build.xml @@ -24,7 +24,7 @@ - + diff --git a/qpid/java/jca/example/build-geronimo-properties.xml b/qpid/java/jca/example/build-geronimo-properties.xml index a20753117f..3c84b7634a 100644 --- a/qpid/java/jca/example/build-geronimo-properties.xml +++ b/qpid/java/jca/example/build-geronimo-properties.xml @@ -87,7 +87,6 @@ - @@ -113,7 +112,7 @@ - + diff --git a/qpid/java/jca/pom.xml b/qpid/java/jca/pom.xml index 859b8aabac..c7a8de61fe 100644 --- a/qpid/java/jca/pom.xml +++ b/qpid/java/jca/pom.xml @@ -70,8 +70,8 @@ org.apache.geronimo.specs - geronimo-servlet_2.5_spec - 1.2 + geronimo-servlet_3.0_spec + 1.0 provided diff --git a/qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml b/qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml deleted file mode 100644 index 11228afcfa..0000000000 --- a/qpid/java/lib/poms/geronimo-servlet_2.5_spec-1.2.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - org.apache.geronimo.specs - geronimo-servlet_2.5_spec - 1.2 - diff --git a/qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml b/qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml new file mode 100644 index 0000000000..5e7093bb0a --- /dev/null +++ b/qpid/java/lib/poms/geronimo-servlet_3.0_spec-1.0.xml @@ -0,0 +1,22 @@ + + + + org.apache.geronimo.specs + geronimo-servlet_3.0_spec + 1.0 + diff --git a/qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml deleted file mode 100644 index 5beba95d17..0000000000 --- a/qpid/java/lib/poms/jetty-continuation-7.6.10.v20130312.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - org.eclipse.jetty - jetty-continuation - 7.6.10.v20130312 - diff --git a/qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml new file mode 100644 index 0000000000..10b7a4c499 --- /dev/null +++ b/qpid/java/lib/poms/jetty-continuation-8.1.14.v20131031.xml @@ -0,0 +1,22 @@ + + + + org.eclipse.jetty + jetty-continuation + 8.1.14.v20131031 + diff --git a/qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml deleted file mode 100644 index 5c840bedd6..0000000000 --- a/qpid/java/lib/poms/jetty-http-7.6.10.v20130312.xml +++ /dev/null @@ -1,28 +0,0 @@ - - - - org.eclipse.jetty - jetty-http - 7.6.10.v20130312 - - - org.eclipse.jetty - jetty-io - - - diff --git a/qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml new file mode 100644 index 0000000000..929fcbef3a --- /dev/null +++ b/qpid/java/lib/poms/jetty-http-8.1.14.v20131031.xml @@ -0,0 +1,28 @@ + + + + org.eclipse.jetty + jetty-http + 8.1.14.v20131031 + + + org.eclipse.jetty + jetty-io + + + diff --git a/qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml deleted file mode 100644 index 9cec3998ea..0000000000 --- a/qpid/java/lib/poms/jetty-io-7.6.10.v20130312.xml +++ /dev/null @@ -1,28 +0,0 @@ - - - - org.eclipse.jetty - jetty-io - 7.6.10.v20130312 - - - org.eclipse.jetty - jetty-util - - - diff --git a/qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml new file mode 100644 index 0000000000..42be6ad6ab --- /dev/null +++ b/qpid/java/lib/poms/jetty-io-8.1.14.v20131031.xml @@ -0,0 +1,28 @@ + + + + org.eclipse.jetty + jetty-io + 8.1.14.v20131031 + + + org.eclipse.jetty + jetty-util + + + diff --git a/qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml deleted file mode 100644 index 9501750ba0..0000000000 --- a/qpid/java/lib/poms/jetty-security-7.6.10.v20130312.xml +++ /dev/null @@ -1,28 +0,0 @@ - - - - org.eclipse.jetty - jetty-security - 7.6.10.v20130312 - - - org.eclipse.jetty - jetty-server - - - diff --git a/qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml new file mode 100644 index 0000000000..8079c78d96 --- /dev/null +++ b/qpid/java/lib/poms/jetty-security-8.1.14.v20131031.xml @@ -0,0 +1,28 @@ + + + + org.eclipse.jetty + jetty-security + 8.1.14.v20131031 + + + org.eclipse.jetty + jetty-server + + + diff --git a/qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml deleted file mode 100644 index 587860b50f..0000000000 --- a/qpid/java/lib/poms/jetty-server-7.6.10.v20130312.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - org.eclipse.jetty - jetty-server - 7.6.10.v20130312 - - - org.eclipse.jetty.orbit - javax.servlet - - - org.eclipse.jetty - jetty-continuation - - - org.eclipse.jetty - jetty-http - - - diff --git a/qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml new file mode 100644 index 0000000000..5b8160efd4 --- /dev/null +++ b/qpid/java/lib/poms/jetty-server-8.1.14.v20131031.xml @@ -0,0 +1,36 @@ + + + + org.eclipse.jetty + jetty-server + 8.1.14.v20131031 + + + org.eclipse.jetty.orbit + javax.servlet + + + org.eclipse.jetty + jetty-continuation + + + org.eclipse.jetty + jetty-http + + + diff --git a/qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml deleted file mode 100644 index 4c0ff0a41b..0000000000 --- a/qpid/java/lib/poms/jetty-servlet-7.6.10.v20130312.xml +++ /dev/null @@ -1,28 +0,0 @@ - - - - org.eclipse.jetty - jetty-servlet - 7.6.10.v20130312 - - - org.eclipse.jetty - jetty-security - - - diff --git a/qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml new file mode 100644 index 0000000000..5abcf03a18 --- /dev/null +++ b/qpid/java/lib/poms/jetty-servlet-8.1.14.v20131031.xml @@ -0,0 +1,28 @@ + + + + org.eclipse.jetty + jetty-servlet + 8.1.14.v20131031 + + + org.eclipse.jetty + jetty-security + + + diff --git a/qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml deleted file mode 100644 index f5c990248f..0000000000 --- a/qpid/java/lib/poms/jetty-util-7.6.10.v20130312.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - - org.eclipse.jetty - jetty-util - 7.6.10.v20130312 - diff --git a/qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml new file mode 100644 index 0000000000..e134444e44 --- /dev/null +++ b/qpid/java/lib/poms/jetty-util-8.1.14.v20131031.xml @@ -0,0 +1,22 @@ + + + + org.eclipse.jetty + jetty-util + 8.1.14.v20131031 + diff --git a/qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml b/qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml deleted file mode 100644 index 4d3ebd1666..0000000000 --- a/qpid/java/lib/poms/jetty-websocket-7.6.10.v20130312.xml +++ /dev/null @@ -1,36 +0,0 @@ - - - - org.eclipse.jetty - jetty-websocket - 7.6.10.v20130312 - - - org.eclipse.jetty - jetty-util - - - org.eclipse.jetty - jetty-io - - - org.eclipse.jetty - jetty-http - - - diff --git a/qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml b/qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml new file mode 100644 index 0000000000..1592ca3d56 --- /dev/null +++ b/qpid/java/lib/poms/jetty-websocket-8.1.14.v20131031.xml @@ -0,0 +1,36 @@ + + + + org.eclipse.jetty + jetty-websocket + 8.1.14.v20131031 + + + org.eclipse.jetty + jetty-util + + + org.eclipse.jetty + jetty-io + + + org.eclipse.jetty + jetty-http + + + -- cgit v1.2.1 From f0e4a6536341cacdc4a1e7e4ce3f33f878cd1014 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Fri, 31 Jan 2014 21:05:26 +0000 Subject: QPID-5485: If no directory for paging file is enabled print warning (as if memory mapped files were not supported) but still create a queue. - Also improve DataDir const correctness. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563256 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/DataDir.cpp | 2 +- qpid/cpp/src/qpid/DataDir.h | 6 +++--- qpid/cpp/src/qpid/broker/Broker.cpp | 5 ----- qpid/cpp/src/qpid/broker/Broker.h | 4 ++-- qpid/cpp/src/qpid/broker/QueueFactory.cpp | 4 +++- qpid/cpp/src/qpid/legacystore/StorePlugin.cpp | 2 +- qpid/cpp/src/qpid/linearstore/StorePlugin.cpp | 2 +- qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp | 2 +- 8 files changed, 12 insertions(+), 15 deletions(-) diff --git a/qpid/cpp/src/qpid/DataDir.cpp b/qpid/cpp/src/qpid/DataDir.cpp index cfdb536d2c..546df3dabd 100644 --- a/qpid/cpp/src/qpid/DataDir.cpp +++ b/qpid/cpp/src/qpid/DataDir.cpp @@ -26,7 +26,7 @@ namespace qpid { -DataDir::DataDir (std::string path) : +DataDir::DataDir (const std::string& path) : enabled (!path.empty ()), dirPath (path) { diff --git a/qpid/cpp/src/qpid/DataDir.h b/qpid/cpp/src/qpid/DataDir.h index 828299f3ba..ec73d28796 100644 --- a/qpid/cpp/src/qpid/DataDir.h +++ b/qpid/cpp/src/qpid/DataDir.h @@ -42,11 +42,11 @@ class DataDir public: - QPID_COMMON_EXTERN DataDir (std::string path); + QPID_COMMON_EXTERN DataDir (const std::string& path); QPID_COMMON_EXTERN ~DataDir (); - bool isEnabled() { return enabled; } - const std::string& getPath() { return dirPath; } + bool isEnabled() const { return enabled; } + const std::string& getPath() const { return dirPath; } }; } // namespace qpid diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 7367336aff..73910ec6c4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -387,11 +387,6 @@ Broker::Broker(const Broker::Options& conf) : } } -std::string Broker::getPagingDirectoryPath() -{ - return pagingDir.getPath(); -} - void Broker::declareStandardExchange(const std::string& name, const std::string& type) { bool storeEnabled = store.get() != NULL; diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 5923d6c0e5..13679bd623 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -238,11 +238,11 @@ class Broker : public sys::Runnable, public Plugin::Target, ExchangeRegistry& getExchanges() { return exchanges; } LinkRegistry& getLinks() { return links; } DtxManager& getDtxManager() { return dtxManager; } - DataDir& getDataDir() { return dataDir; } + const DataDir& getDataDir() { return dataDir; } + const DataDir& getPagingDir() { return pagingDir; } Options& getOptions() { return config; } ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; } ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; } - std::string getPagingDirectoryPath(); void setExpiryPolicy(const boost::intrusive_ptr& e) { expiryPolicy = e; } boost::intrusive_ptr getExpiryPolicy() { return expiryPolicy; } diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp index e60349edfb..08988ed4ac 100644 --- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp @@ -76,8 +76,10 @@ boost::shared_ptr QueueFactory::create(const std::string& name, const Que QPID_LOG(warning, "Cannot create paged queue without broker context"); } else if (!qpid::sys::MemoryMappedFile::isSupported()) { QPID_LOG(warning, "Cannot create paged queue; memory mapped file support not available on this platform"); + } else if ( !broker->getPagingDir().isEnabled() ) { + QPID_LOG(warning, "Cannot create paged queue; no paging directory enabled"); } else { - queue->messages = std::auto_ptr(new PagedQueue(name, broker->getPagingDirectoryPath(), + queue->messages = std::auto_ptr(new PagedQueue(name, broker->getPagingDir().getPath(), settings.maxPages ? settings.maxPages : 4, settings.pageFactor ? settings.pageFactor : 1, broker->getProtocolRegistry(), broker->getExpiryPolicy())); diff --git a/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp b/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp index f9b77ce02c..5cba10d0f9 100644 --- a/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp @@ -45,7 +45,7 @@ struct StorePlugin : public Plugin { Broker* broker = dynamic_cast(&target); if (!broker) return; store.reset(new MessageStoreImpl(broker)); - DataDir& dataDir = broker->getDataDir (); + const DataDir& dataDir = broker->getDataDir (); if (options.storeDir.empty ()) { if (!dataDir.isEnabled ()) diff --git a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp index 6dfb2056bf..d64b44b5e3 100644 --- a/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp +++ b/qpid/cpp/src/qpid/linearstore/StorePlugin.cpp @@ -46,7 +46,7 @@ struct StorePlugin : public Plugin { Broker* broker = dynamic_cast(&target); if (!broker) return; store.reset(new MessageStoreImpl(broker)); - DataDir& dataDir = broker->getDataDir (); + const DataDir& dataDir = broker->getDataDir (); if (options.storeDir.empty ()) { if (!dataDir.isEnabled ()) diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp index 512e71230b..5ff24e7d33 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp @@ -370,7 +370,7 @@ MSSqlClfsProvider::earlyInitialize(Plugin::Target &target) // Check the store dir option; if not specified, need to // grab the broker's data dir. if (options.storeDir.empty()) { - DataDir& dir = store->getBroker()->getDataDir(); + const DataDir& dir = store->getBroker()->getDataDir(); if (dir.isEnabled()) { options.storeDir = dir.getPath(); } -- cgit v1.2.1 From 1810245333a77d791b198e1b5556d553742b897c Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 31 Jan 2014 23:19:21 +0000 Subject: QPID-5525: [Java Broker Documentation] Add simple JMX documentation: simple guide to connecting with jconsole, simple jmx client example and MBean summary with links to interfaces source files. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563336 13f79535-47bb-0310-9956-ffa450edef68 --- .../Java-Broker-Configuring-And-Managing-JMX.xml | 306 +++++++++++++++++++-- qpid/doc/book/src/java-broker/commonEntities.xml | 6 + .../src/java-broker/images/JMX-Connect-MBeans.png | Bin 0 -> 66586 bytes .../src/java-broker/images/JMX-Connect-Remote.png | Bin 0 -> 66411 bytes 4 files changed, 293 insertions(+), 19 deletions(-) create mode 100644 qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png create mode 100644 qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml b/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml index 5b4ea42d2b..026213c157 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Configuring-And-Managing-JMX.xml @@ -1,4 +1,8 @@ + +%entities; +]> @@ -44,3 +45,8 @@ + + + + + diff --git a/qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png b/qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png new file mode 100644 index 0000000000..f766197166 Binary files /dev/null and b/qpid/doc/book/src/java-broker/images/JMX-Connect-MBeans.png differ diff --git a/qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png b/qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png new file mode 100644 index 0000000000..5fcf9dd497 Binary files /dev/null and b/qpid/doc/book/src/java-broker/images/JMX-Connect-Remote.png differ -- cgit v1.2.1 From c89a0948ac307c6b8526f9cd94ff33e5548157f2 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Fri, 31 Jan 2014 23:35:50 +0000 Subject: QPID-5514: [Java Broker Documentation] Change ports section to note that fact that only HTTP/JMX ports remain listening after port deletion. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563343 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/doc/book/src/java-broker/Java-Broker-Ports.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml b/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml index c322045336..37d675eacc 100644 --- a/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml +++ b/qpid/doc/book/src/java-broker/Java-Broker-Ports.xml @@ -63,7 +63,7 @@ - SSL can be enabled forPorts with protocols that support it by selecting the 'SSL' transport, at which + SSL can be enabled for Ports with protocols that support it by selecting the 'SSL' transport, at which point a configured KeyStore must also be selected for the Port. @@ -83,7 +83,7 @@ - Following deletion of an active Port, the port remains bound until the Broker is restarted. You should restart the broker + Following deletion of an active HTTP or JMX Port, the port remains bound until the Broker is restarted. You should restart the broker immediately if you require preventing new connections on the port or disconnecting existing clients. -- cgit v1.2.1 From e9e3e6281da5172965a53ea83753b998646a3ec3 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Sat, 1 Feb 2014 00:08:56 +0000 Subject: NO-JIRA: [JMS Client 0-8 documentation] Correct pub/sub example to use a integer price property (suitable for the inequality in the example's message selector). Correct typo in 'Maximum Delivery Count' section and add note that JMSXDeliveryCount optional property is not supported. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563355 13f79535-47bb-0310-9956-ffa450edef68 --- .../book/src/jms-client-0-8/JMS-Client-Examples.xml | 2 +- .../src/jms-client-0-8/JMS-Client-Understanding.xml | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml index d7dc481888..87abbb8bfb 100644 --- a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml +++ b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Examples.xml @@ -226,7 +226,7 @@ public class StocksExample { Message message = session.createMessage(); message.setStringProperty("instrument", "IBM"); - message.setStringProperty("price", "100"); + message.setIntProperty("price", 100); messageProducer.send(message); session.commit(); diff --git a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml index 4da75875a8..7113831f04 100644 --- a/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml +++ b/qpid/doc/book/src/jms-client-0-8/JMS-Client-Understanding.xml @@ -136,7 +136,7 @@ amqp://username:password@clientid/test For full details see - Note, that a single broker failover is enabled by default. If the failover behaviour is not desired it can be switched off + Note, that a single broker failover is enabled by default. If the failover behaviour is not desired it can be switched off by setting a failover option to nofailover as in the example below Connection URL configured with nofailover @@ -145,7 +145,7 @@ amqp://username:password@clientid/test ?brokerlist='tcp://localhost:15672?failover='nofailover']]> - +
@@ -284,7 +284,7 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap >MessageConsumer#receive() for Consumer B on the same Session, the application will hang indefinitely as even if messages suitable for B arrive at the Broker. Those messages can never be sent to the Session as no space is available in prefetch. - Please note, when the acknowlegement mode Session#SESSION_TRANSACTED + Please note, when the acknowlegement mode Session#SESSION_TRANSACTED or Session#CLIENT_ACKNOWLEDGE is set on a consuming session, the prefetched messages are released from the prefetch buffer on transaction commit/rollback (in case of acknowledgement mode Session#SESSION_TRANSACTED ) @@ -294,7 +294,7 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap the prefetched messages continue to remain in the prefetch buffer preventing the delivery of the following messages. As result, the application might stop the receiving of the messages until the transaction is committed/rolled back (for Session#SESSION_TRANSACTED ) - or received messages are acknowledged (for Session#CLIENT_ACKNOWLEDGE). + or received messages are acknowledged (for Session#CLIENT_ACKNOWLEDGE).
TemporaryQueues @@ -379,10 +379,10 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap linkend="JMS-Client-0-8-System-Properties-DefaultMandatoryTopic" >qpid.default_mandatory_topic for Queues and Topics respectively. - Please note, according to AMQP specifications the mandatory flag on a message tells the server + Please note, according to AMQP specifications the mandatory flag on a message tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message. Please, refer AMQP specifications - for more details. + for more details.
Close When No Route @@ -508,8 +508,10 @@ amqp://guest:guest@clientid/?brokerlist='localhost:5671?trust_store='/path/to/ap server. See Unhandling Undeliverable Messages within the Java Broker book for full details - of the functioning of this feature. + > Handling Undeliverable Messages within the Java Broker book for full details of + the functioning of this feature. + The optional JMS message header JMSXDeliveryCount is not + supported.
-- cgit v1.2.1 From 3b036d9cef353c142c1cb0aec31cb8e8f96fa314 Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Sat, 1 Feb 2014 10:16:39 +0000 Subject: QPID-5530: [legacystore] store_chk raises "Operation on non-existent record: operation=unlock; rid=.." on aborted DTX transaction in TplStore git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563387 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/tools/src/py/qpidstore/janal.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/qpid/tools/src/py/qpidstore/janal.py b/qpid/tools/src/py/qpidstore/janal.py index 1f89207b4d..231b283c05 100644 --- a/qpid/tools/src/py/qpidstore/janal.py +++ b/qpid/tools/src/py/qpidstore/janal.py @@ -215,9 +215,18 @@ class TxnMap(object): def _abort(self, xid): """Perform an abort operation for the given xid record""" - for fid, hdr, lock in self.__map[xid]: + for _, hdr, _ in self.__map[xid]: if isinstance(hdr, jrnl.DeqRec): - self.__emap.unlock(hdr.deq_rid) + try: + self.__emap.unlock(hdr.deq_rid) + except jerr.NonExistentRecordError as err: # Not in emap, look in current transaction op list (TPL) + found_rid = False + for _, hdr1, _ in self.__map[xid]: + if isinstance(hdr1, jrnl.EnqRec) and hdr1.rid == hdr.deq_rid: + found_rid = True + break + if not found_rid: # Not found in current transaction op list, re-throw error + raise err del self.__map[xid] def _commit(self, xid): -- cgit v1.2.1 From ade50f17b8ffea099f8fffaaf283b2412f393bce Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Sat, 1 Feb 2014 12:03:30 +0000 Subject: QPID-5532: [C++ broker] Add debug log when timeouting DTX transaction git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563403 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/DtxTimeout.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp index 58700846ef..f317df5713 100644 --- a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp +++ b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/DtxManager.h" #include "qpid/sys/Time.h" +#include "qpid/log/Statement.h" using namespace qpid::broker; @@ -31,5 +32,6 @@ DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _ void DtxTimeout::fire() { + QPID_LOG(debug, "DTX transaction timeouted, XID=" << xid << ", timeout=" << timeout); mgr.timedout(xid); } -- cgit v1.2.1 From 6823d23dbeca328f4e860538a52015bc9313a6db Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 1 Feb 2014 15:40:47 +0000 Subject: QPID-5504 : Moving routing to Exchange from session classes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563431 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/AbstractExchange.java | 63 +++++++- .../qpid/server/exchange/DefaultExchange.java | 62 ++++++-- .../org/apache/qpid/server/exchange/Exchange.java | 17 ++- .../org/apache/qpid/server/queue/QueueEntry.java | 27 +--- .../apache/qpid/server/queue/QueueEntryImpl.java | 67 +++----- .../apache/qpid/server/queue/SimpleAMQQueue.java | 88 ++--------- .../qpid/server/exchange/TopicExchangeTest.java | 5 +- .../apache/qpid/server/queue/MockQueueEntry.java | 5 +- .../qpid/server/protocol/v0_10/ServerSession.java | 18 ++- .../protocol/v0_10/ServerSessionDelegate.java | 25 +-- .../server/protocol/v0_10/Subscription_0_10.java | 42 ++--- .../qpid/server/protocol/v0_8/AMQChannel.java | 170 +++++++++++---------- .../server/protocol/v1_0/ExchangeDestination.java | 48 +----- .../apache/qpid/server/store/MessageStoreTest.java | 24 +-- 14 files changed, 295 insertions(+), 366 deletions(-) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index b00d98637e..6a959df440 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -33,12 +33,14 @@ import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -374,9 +376,9 @@ public abstract class AbstractExchange implements Exchange return getBindings().size(); } - @Override - public final List route(final ServerMessage message, - final InstanceProperties instanceProperties) + + final List route(final ServerMessage message, + final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); @@ -416,6 +418,59 @@ public abstract class AbstractExchange implements Exchange return queues; } + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final BaseQueue.PostEnqueueAction postEnqueueAction) + { + List queues = route(message, instanceProperties); + + if(queues == null || queues.isEmpty()) + { + Exchange altExchange = getAlternateExchange(); + if(altExchange != null) + { + return altExchange.send(message, instanceProperties, txn, postEnqueueAction); + } + else + { + return 0; + } + } + else + { + final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); + + txn.enqueue(queues,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + for(int i = 0; i < baseQueues.length; i++) + { + try + { + baseQueues[i].enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + _reference.release(); + } + + public void onRollback() + { + _reference.release(); + } + }); + return queues.size(); + } + } + protected abstract List doRoute(final ServerMessage message, final InstanceProperties instanceProperties); @@ -679,4 +734,6 @@ public abstract class AbstractExchange implements Exchange public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException; } + + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index e2582019cd..71d0f8b4dd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -36,11 +36,14 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; public class DefaultExchange implements Exchange @@ -203,22 +206,6 @@ public class DefaultExchange implements Exchange } } - @Override - public List route(ServerMessage message, final InstanceProperties instanceProperties) - { - AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); - if(q == null) - { - List noQueues = Collections.emptyList(); - return noQueues; - } - else - { - return Collections.singletonList(q); - } - - } - @Override public boolean isBound(AMQQueue queue) { @@ -343,4 +330,47 @@ public class DefaultExchange implements Exchange { return _id; } + + public final int send(final ServerMessage message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final BaseQueue.PostEnqueueAction postEnqueueAction) + { + final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + if(q == null) + { + return 0; + } + else + { + txn.enqueue(q,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + q.enqueue(message, postEnqueueAction); + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + } + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 78455c9261..18e912e972 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; @@ -94,13 +95,17 @@ public interface Exchange extends ExchangeReferrer void close() throws AMQException; /** - * Returns a list of queues to which to route this message. If there are - * no queues the empty list must be returned. - * - * @return list of queues to which to route the message. + * Routes a message + * @param message the message to be routed + * @param instanceProperties the instance properties + * @param txn the transaction to enqueue within + * @param postEnqueueAction action to perform on the result of every enqueue (may be null) + * @return the number of queues in which the message was enqueued performed */ - List route(ServerMessage message, final InstanceProperties instanceProperties); - + int send(ServerMessage message, + InstanceProperties instanceProperties, + ServerTransaction txn, + BaseQueue.PostEnqueueAction postEnqueueAction); /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 80ccbe1649..2aa1d1f473 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -22,11 +22,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.ServerTransaction; -public interface QueueEntry extends Comparable +public interface QueueEntry extends MessageInstance, Comparable { @@ -177,26 +177,17 @@ public interface QueueEntry extends Comparable AMQQueue getQueue(); - ServerMessage getMessage(); - long getSize(); boolean getDeliveredToConsumer(); boolean expired() throws AMQException; - boolean isAvailable(); - - boolean isAcquired(); - - boolean acquire(); boolean acquire(Subscription sub); boolean acquiredBySubscription(); boolean isAcquiredBy(Subscription subscription); - void release(); - void setRedelivered(); boolean isRedelivered(); @@ -207,16 +198,7 @@ public interface QueueEntry extends Comparable boolean isRejectedBy(long subscriptionId); - void delete(); - - /** - * Returns true if entry is either DEQUED or DELETED state. - * - * @return true if entry is either DEQUED or DELETED state - */ - boolean isDeleted(); - - void routeToAlternate(); + int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn); boolean isQueueDeleted(); @@ -241,5 +223,4 @@ public interface QueueEntry extends Comparable Filterable asFilterable(); - InstanceProperties getInstanceProperties(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index ed61f1acf6..461d493437 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.txn.ServerTransaction; import java.util.EnumMap; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -250,7 +249,7 @@ public abstract class QueueEntryImpl implements QueueEntry } else if(acquire()) { - routeToAlternate(); + routeToAlternate(null, null); } } @@ -368,65 +367,43 @@ public abstract class QueueEntryImpl implements QueueEntry dispose(); } - public void routeToAlternate() + public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn) { final AMQQueue currentQueue = getQueue(); Exchange alternateExchange = currentQueue.getAlternateExchange(); - + boolean autocommit = txn == null; if (alternateExchange != null) { - List queues = alternateExchange.route(getMessage(), getInstanceProperties()); - final ServerMessage message = getMessage(); - if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null) + if(autocommit) { - queues = alternateExchange.getAlternateExchange().route(getMessage(), getInstanceProperties()); + txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); } + int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action); - - if (queues != null && queues.size() != 0) + txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action() { - final List rerouteQueues = queues; - ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore()); - - txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() + public void postCommit() { - public void postCommit() - { - try - { - for (BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - - } - }); - - txn.dequeue(currentQueue, message, new ServerTransaction.Action() - { - public void postCommit() - { - delete(); - } + delete(); + } - public void onRollback() - { + public void onRollback() + { - } - }); + } + }); + if(autocommit) + { txn.commit(); } + return enqueues; + + } + else + { + return 0; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index d63d1946d3..87d11a892e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1355,93 +1355,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - if(_alternateExchange != null) + + for(final QueueEntry entry : entries) { + // TODO log requeues with a post enqueue action + int requeues = entry.routeToAlternate(null, txn); - for(final QueueEntry entry : entries) + if(requeues == 0) { - - List queues = _alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); - if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null) - { - queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(), entry.getInstanceProperties()); - } - - final ServerMessage message = entry.getMessage(); - if(queues != null && queues.size() != 0) - { - final List rerouteQueues = queues; - txn.enqueue(rerouteQueues, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - try - { - for(BaseQueue queue : rerouteQueues) - { - queue.enqueue(message); - } - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - - } - }); - txn.dequeue(this, entry.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - entry.delete(); - } - - public void onRollback() - { - } - }); - } - + // TODO log discard } - - _alternateExchange.removeReference(this); } - else - { - // TODO log discard - - for(final QueueEntry entry : entries) - { - final ServerMessage message = entry.getMessage(); - if(message != null) - { - txn.dequeue(this, message, - new ServerTransaction.Action() - { - public void postCommit() - { - entry.delete(); - } + txn.commit(); - public void onRollback() - { - } - }); - } - } + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); } - txn.commit(); for (Task task : _deleteTaskList) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 7bd525c90f..764549626a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -312,10 +312,9 @@ public class TopicExchangeTest extends QpidTestCase private int routeMessage(String routingKey, long messageNumber) throws AMQException { - ServerMessage serverMessage = mock(ServerMessage.class); - when(serverMessage.getRoutingKey()).thenReturn(routingKey); - List queues = _exchange.route(serverMessage, InstanceProperties.EMPTY); ServerMessage message = mock(ServerMessage.class); + when(message.getRoutingKey()).thenReturn(routingKey); + List queues = _exchange.route(message, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 2e3231e208..d3c866f747 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.ServerTransaction; public class MockQueueEntry implements QueueEntry { @@ -62,9 +63,9 @@ public class MockQueueEntry implements QueueEntry } - public void routeToAlternate() + public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn) { - + return 0; } public boolean expired() throws AMQException diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index fe82f65115..bae5616042 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -53,6 +54,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -102,6 +104,14 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); + private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry entry) + { + entry.getQueue().checkCapacity(ServerSession.this); + } + }; public static interface MessageDispositionChangeListener { @@ -182,7 +192,9 @@ public class ServerSession extends Session return isCommandsFull(id); } - public void enqueue(final MessageTransferMessage message, final List queues) + public int enqueue(final MessageTransferMessage message, + final InstanceProperties instanceProperties, + final Exchange exchange) { if(_outstandingCredit.get() != UNLIMITED_CREDIT && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) @@ -190,10 +202,10 @@ public class ServerSession extends Session _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); } + int enqueues = exchange.send(message, instanceProperties, _transaction, _checkCapacityAction); getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; - _transaction.enqueue(queues,message, postTransactionAction); incrementOutstandingTxnsIfNecessary(); + return enqueues; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 973f706e0a..dcca696529 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -39,7 +39,6 @@ import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -337,28 +336,10 @@ public class ServerSessionDelegate extends SessionDelegate } }; - List queues = exchange.route(message, instanceProperties); - if(queues.isEmpty() && exchange.getAlternateExchange() != null) - { - final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(message, instanceProperties); - if (!queues.isEmpty()) - { - exchangeInUse = alternateExchange; - } - else - { - exchangeInUse = exchange; - } - } - else - { - exchangeInUse = exchange; - } + int enqueues = serverSession.enqueue(message, instanceProperties, exchange); - if(!queues.isEmpty()) + if(enqueues != 0) { - serverSession.enqueue(message, queues); storeMessage.flushToStore(); } else @@ -372,7 +353,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(), messageMetaData.getRoutingKey())); + serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchange.getName(), messageMetaData.getRoutingKey())); } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 17d0e5cb64..357b565365 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -59,7 +59,6 @@ import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -544,7 +543,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr void reject(final QueueEntry entry) { entry.setRedelivered(); - entry.routeToAlternate(); + entry.routeToAlternate(null, null); if(entry.isAcquiredBy(this)) { entry.delete(); @@ -575,35 +574,36 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr protected void sendToDLQOrDiscard(QueueEntry entry) { - final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); final LogActor logActor = CurrentActor.get(); final ServerMessage msg = entry.getMessage(); - if (alternateExchange != null) + + int requeues = entry.routeToAlternate(new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry requeueEntry) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); + + if (requeues == 0) { - final List destinationQueues = alternateExchange.route(entry.getMessage(), entry.getInstanceProperties()); + final AMQQueue queue = entry.getQueue(); + final Exchange alternateExchange = queue.getAlternateExchange(); - if (destinationQueues == null || destinationQueues.isEmpty()) + if(alternateExchange != null) { - entry.delete(); - - logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), + alternateExchange.getName())); } else { - entry.routeToAlternate(); - - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName())); - } + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), + queue.getName(), + msg.getRoutingKey())); } } - else - { - entry.delete(); - logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); - } } private boolean isMaxDeliveryLimitReached(QueueEntry entry) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index b7dc105cb7..c6d4151628 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -165,6 +165,11 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F private final TransactionTimeoutHelper _transactionTimeoutHelper; private final UUID _id = UUID.randomUUID(); + + private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); + private final ImmediateAction _immediateAction = new ImmediateAction(); + + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { @@ -330,6 +335,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { + final boolean immediate = _currentMessage.getMessagePublishInfo().isImmediate(); + final InstanceProperties instanceProperties = new InstanceProperties() { @@ -341,7 +348,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F case EXPIRATION: return amqMessage.getExpiration(); case IMMEDIATE: - return _currentMessage.getMessagePublishInfo().isImmediate(); + return immediate; case PERSISTENT: return amqMessage.isPersistent(); case MANDATORY: @@ -353,21 +360,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } }; - final List destinationQueues = - _currentMessage.getExchange().route(amqMessage, instanceProperties); - - if(destinationQueues == null || destinationQueues.isEmpty()) + int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction, + immediate ? _immediateAction : _capacityCheckAction); + if(enqueues == 0) { handleUnroutableMessage(amqMessage); } else { - _transaction.enqueue(destinationQueues, - amqMessage, - new MessageDeliveryAction(amqMessage, destinationQueues)); incrementOutstandingTxnsIfNecessary(); handle.flushToStore(); - } } } @@ -1258,7 +1260,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F if(immediate) { - action = new ImmediateAction(queue); + action = new ImmediateAction(); } else { @@ -1291,58 +1293,72 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _reference.release(); } - private class ImmediateAction implements BaseQueue.PostEnqueueAction + + } + private class ImmediateAction implements BaseQueue.PostEnqueueAction + { + + public ImmediateAction() { - private final BaseQueue _queue; + } - public ImmediateAction(BaseQueue queue) - { - _queue = queue; - } + public void onEnqueue(QueueEntry entry) + { + AMQQueue queue = entry.getQueue(); - public void onEnqueue(QueueEntry entry) + if (!entry.getDeliveredToConsumer() && entry.acquire()) { - if (!entry.getDeliveredToConsumer() && entry.acquire()) - { - - ServerTransaction txn = new LocalTransaction(_messageStore); - Collection entries = new ArrayList(1); - entries.add(entry); - final AMQMessage message = (AMQMessage) entry.getMessage(); - txn.dequeue(_queue, entry.getMessage(), - new MessageAcknowledgeAction(entries) + ServerTransaction txn = new LocalTransaction(_messageStore); + Collection entries = new ArrayList(1); + entries.add(entry); + final AMQMessage message = (AMQMessage) entry.getMessage(); + txn.dequeue(queue, entry.getMessage(), + new MessageAcknowledgeAction(entries) + { + @Override + public void postCommit() { - @Override - public void postCommit() + try { - try - { - final - ProtocolOutputConverter outputConverter = - _session.getProtocolOutputConverter(); - - outputConverter.writeReturn(message.getMessagePublishInfo(), - message.getContentHeaderBody(), - message, - _channelId, - AMQConstant.NO_CONSUMERS.getCode(), - IMMEDIATE_DELIVERY_REPLY_TEXT); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - super.postCommit(); + final + ProtocolOutputConverter outputConverter = + _session.getProtocolOutputConverter(); + + outputConverter.writeReturn(message.getMessagePublishInfo(), + message.getContentHeaderBody(), + message, + _channelId, + AMQConstant.NO_CONSUMERS.getCode(), + IMMEDIATE_DELIVERY_REPLY_TEXT); + } + catch (AMQException e) + { + throw new RuntimeException(e); } + super.postCommit(); } - ); - txn.commit(); - + } + ); + txn.commit(); - } } + else + { + queue.checkCapacity(AMQChannel.this); + } + + } + } + + private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction + { + @Override + public void onEnqueue(final QueueEntry entry) + { + AMQQueue queue = entry.getQueue(); + queue.checkCapacity(AMQChannel.this); } } @@ -1550,48 +1566,46 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F public void deadLetter(long deliveryTag) throws AMQException { final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap(); - final QueueEntry rejectedQueueEntry = unackedMap.get(deliveryTag); + final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag); if (rejectedQueueEntry == null) { _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag); - return; } else { final ServerMessage msg = rejectedQueueEntry.getMessage(); - final AMQQueue queue = rejectedQueueEntry.getQueue(); - - final Exchange altExchange = queue.getAlternateExchange(); - unackedMap.remove(deliveryTag); + int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction() + { + @Override + public void onEnqueue(final QueueEntry requeueEntry) + { + _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), + requeueEntry.getQueue().getName())); + } + }, null); - if (altExchange == null) + if(requeues == 0) { - _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - rejectedQueueEntry.delete(); - return; - } + final AMQQueue queue = rejectedQueueEntry.getQueue(); + final Exchange altExchange = queue.getAlternateExchange(); - final List destinationQueues = - altExchange.route(rejectedQueueEntry.getMessage(), rejectedQueueEntry.getInstanceProperties()); - - if (destinationQueues == null || destinationQueues.isEmpty()) - { - _logger.debug("Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); - rejectedQueueEntry.delete(); - return; - } - - rejectedQueueEntry.routeToAlternate(); + if (altExchange == null) + { + _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); - //output operational logging for each delivery post commit - for (final BaseQueue destinationQueue : destinationQueues) - { - _actor.message(_logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getName())); + } + else + { + _logger.debug( + "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: " + + deliveryTag); + _actor.message(_logSubject, + ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName())); + } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 3b981b46b8..3d030890e0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.amqp_1_0.type.messaging.Rejected; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; @@ -35,7 +36,8 @@ import org.apache.qpid.server.txn.ServerTransaction; public class ExchangeDestination implements ReceivingDestination, SendingDestination { private static final Accepted ACCEPTED = new Accepted(); - private static final Outcome[] OUTCOMES = { ACCEPTED }; + public static final Rejected REJECTED = new Rejected(); + private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED}; private Exchange _exchange; private TerminusDurability _durability; @@ -78,50 +80,10 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - List queues = _exchange.route(message, instanceProperties); + int enqueues = _exchange.send(message, instanceProperties, txn, null); - if(queues == null || queues.isEmpty()) - { - Exchange altExchange = _exchange.getAlternateExchange(); - if(altExchange != null) - { - queues = altExchange.route(message, instanceProperties); - } - } - - if(queues != null && !queues.isEmpty()) - { - final BaseQueue[] baseQueues = queues.toArray(new BaseQueue[queues.size()]); - - txn.enqueue(queues,message, new ServerTransaction.Action() - { - MessageReference _reference = message.newReference(); - - public void postCommit() - { - for(int i = 0; i < baseQueues.length; i++) - { - try - { - baseQueues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - _reference.release(); - } - - public void onRollback() - { - _reference.release(); - } - }); - } - return ACCEPTED; + return enqueues == 0 ? REJECTED : ACCEPTED; } TerminusDurability getDurability() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 861b225e6f..f92a133919 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -624,32 +624,10 @@ public class MessageStoreTest extends QpidTestCase storedMessage.flushToStore(); final AMQMessage currentMessage = new AMQMessage(storedMessage); - final List destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY); ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - - trans.enqueue(destinationQueues, currentMessage, new ServerTransaction.Action() { - public void postCommit() - { - try - { - for(BaseQueue queue : destinationQueues) - { - queue.enqueue(currentMessage); - } - } - catch (AMQException e) - { - _logger.error("Problem enqueing message", e); - } - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); + exchange.send(currentMessage, InstanceProperties.EMPTY, trans, null); } -- cgit v1.2.1 From 0ab80c9e7799a1935aad8777d92c166f661993ec Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Sun, 2 Feb 2014 14:01:16 +0000 Subject: QPID-5121: Store module does not raise exception when attempting to enqueue a message bigger than the journal size git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563613 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp b/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp index 43461b66a3..64ee65f1ac 100644 --- a/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp +++ b/qpid/cpp/src/qpid/legacystore/jrnl/wrfc.cpp @@ -125,7 +125,8 @@ wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const u_int32_t fwd_dblks = subm_dblks + enq_dsize_dblks + _enq_cap_offs_dblks; u_int16_t findex = _fc_index; fcntl* fcp = _curr_fc; - bool in_use = false; + bool in_use = false; // at least one file contains an enqueued record + bool overwrite = false; // reached the original journal file we started with while (fwd_dblks && !(findex != _fc_index && fcp->enqcnt())) { fwd_dblks -= fwd_dblks > _fsize_dblks ? _fsize_dblks : fwd_dblks; @@ -133,12 +134,13 @@ wrfc::enq_threshold(const u_int32_t enq_dsize_dblks) const { if (++findex == _lpmp->num_jfiles()) findex = 0; + overwrite |= findex == _fc_index; fcp = _lpmp->get_fcntlp(findex); } in_use |= fcp->enqcnt() > 0; } // Return true if threshold exceeded - return findex != _fc_index && in_use; + return (findex != _fc_index && in_use) || overwrite; } bool wrfc::wr_reset() -- cgit v1.2.1 From 18d6b9bf6eb65a8ffcfd1d3f0893f31de1ea3ced Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Sun, 2 Feb 2014 14:51:12 +0000 Subject: QPID-5519: ACL property/properties for paged queues git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563628 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/acl/AclData.cpp | 14 ++++++++++++++ qpid/cpp/src/qpid/acl/AclValidator.cpp | 16 ++++++++++++++++ qpid/cpp/src/qpid/broker/AclModule.h | 29 ++++++++++++++++++++++++++++- qpid/cpp/src/qpid/broker/Broker.cpp | 3 +++ 4 files changed, 61 insertions(+), 1 deletion(-) diff --git a/qpid/cpp/src/qpid/acl/AclData.cpp b/qpid/cpp/src/qpid/acl/AclData.cpp index a205ddabfc..48b5e462e5 100644 --- a/qpid/cpp/src/qpid/acl/AclData.cpp +++ b/qpid/cpp/src/qpid/acl/AclData.cpp @@ -160,6 +160,16 @@ bool AclData::lookupMatchRule( // as rule's index. propertyMapItr lookupParamItr; switch (rulePropMapItr->first) { + case acl::SPECPROP_MAXPAGESLOWERLIMIT: + case acl::SPECPROP_MAXPAGESUPPERLIMIT: + lookupParamItr = params->find(PROP_MAXPAGES); + break; + + case acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT: + case acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT: + lookupParamItr = params->find(PROP_MAXPAGEFACTOR); + break; + case acl::SPECPROP_MAXQUEUECOUNTUPPERLIMIT: case acl::SPECPROP_MAXQUEUECOUNTLOWERLIMIT: lookupParamItr = params->find(PROP_MAXQUEUECOUNT); @@ -201,6 +211,8 @@ bool AclData::lookupMatchRule( case acl::SPECPROP_MAXQUEUESIZEUPPERLIMIT: case acl::SPECPROP_MAXFILECOUNTUPPERLIMIT: case acl::SPECPROP_MAXFILESIZEUPPERLIMIT: + case acl::SPECPROP_MAXPAGESUPPERLIMIT: + case acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT: limitChecked &= compareInt( rulePropMapItr->first, @@ -213,6 +225,8 @@ bool AclData::lookupMatchRule( case acl::SPECPROP_MAXQUEUESIZELOWERLIMIT: case acl::SPECPROP_MAXFILECOUNTLOWERLIMIT: case acl::SPECPROP_MAXFILESIZELOWERLIMIT: + case acl::SPECPROP_MAXPAGESLOWERLIMIT: + case acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT: limitChecked &= compareInt( rulePropMapItr->first, diff --git a/qpid/cpp/src/qpid/acl/AclValidator.cpp b/qpid/cpp/src/qpid/acl/AclValidator.cpp index a077667a33..89e072000e 100644 --- a/qpid/cpp/src/qpid/acl/AclValidator.cpp +++ b/qpid/cpp/src/qpid/acl/AclValidator.cpp @@ -110,6 +110,22 @@ namespace acl { boost::shared_ptr( new IntPropertyType(0,std::numeric_limits::max())))); + validators.insert(Validator(acl::SPECPROP_MAXPAGESLOWERLIMIT, + boost::shared_ptr( + new IntPropertyType(0,std::numeric_limits::max())))); + + validators.insert(Validator(acl::SPECPROP_MAXPAGESUPPERLIMIT, + boost::shared_ptr( + new IntPropertyType(0,std::numeric_limits::max())))); + + validators.insert(Validator(acl::SPECPROP_MAXPAGEFACTORLOWERLIMIT, + boost::shared_ptr( + new IntPropertyType(0,std::numeric_limits::max())))); + + validators.insert(Validator(acl::SPECPROP_MAXPAGEFACTORUPPERLIMIT, + boost::shared_ptr( + new IntPropertyType(0,std::numeric_limits::max())))); + std::string policyTypes[] = {"ring", "self-destruct", "reject"}; std::vector v(policyTypes, policyTypes + sizeof(policyTypes) / sizeof(std::string)); validators.insert(Validator(acl::SPECPROP_POLICYTYPE, diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index c01697ace9..934a11789f 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -77,6 +77,9 @@ namespace acl { PROP_SCHEMAPACKAGE, PROP_SCHEMACLASS, PROP_POLICYTYPE, + PROP_PAGING, + PROP_MAXPAGES, + PROP_MAXPAGEFACTOR, PROP_MAXQUEUESIZE, PROP_MAXQUEUECOUNT, PROP_MAXFILESIZE, @@ -100,6 +103,7 @@ namespace acl { SPECPROP_SCHEMAPACKAGE = PROP_SCHEMAPACKAGE, SPECPROP_SCHEMACLASS = PROP_SCHEMACLASS, SPECPROP_POLICYTYPE = PROP_POLICYTYPE, + SPECPROP_PAGING = PROP_PAGING, SPECPROP_MAXQUEUESIZELOWERLIMIT, SPECPROP_MAXQUEUESIZEUPPERLIMIT, @@ -108,7 +112,11 @@ namespace acl { SPECPROP_MAXFILESIZELOWERLIMIT, SPECPROP_MAXFILESIZEUPPERLIMIT, SPECPROP_MAXFILECOUNTLOWERLIMIT, - SPECPROP_MAXFILECOUNTUPPERLIMIT }; + SPECPROP_MAXFILECOUNTUPPERLIMIT, + SPECPROP_MAXPAGESLOWERLIMIT, + SPECPROP_MAXPAGESUPPERLIMIT, + SPECPROP_MAXPAGEFACTORLOWERLIMIT, + SPECPROP_MAXPAGEFACTORUPPERLIMIT }; // AclResult shared between ACL spec and ACL authorise interface enum AclResult { @@ -229,6 +237,9 @@ namespace acl { if (str.compare("schemapackage") == 0) return PROP_SCHEMAPACKAGE; if (str.compare("schemaclass") == 0) return PROP_SCHEMACLASS; if (str.compare("policytype") == 0) return PROP_POLICYTYPE; + if (str.compare("paging") == 0) return PROP_PAGING; + if (str.compare("maxpages") == 0) return PROP_MAXPAGES; + if (str.compare("maxpagefactor") == 0) return PROP_MAXPAGEFACTOR; if (str.compare("maxqueuesize") == 0) return PROP_MAXQUEUESIZE; if (str.compare("maxqueuecount") == 0) return PROP_MAXQUEUECOUNT; if (str.compare("maxfilesize") == 0) return PROP_MAXFILESIZE; @@ -249,6 +260,9 @@ namespace acl { case PROP_SCHEMAPACKAGE: return "schemapackage"; case PROP_SCHEMACLASS: return "schemaclass"; case PROP_POLICYTYPE: return "policytype"; + case PROP_PAGING: return "paging"; + case PROP_MAXPAGES: return "maxpages"; + case PROP_MAXPAGEFACTOR: return "maxpagefactor"; case PROP_MAXQUEUESIZE: return "maxqueuesize"; case PROP_MAXQUEUECOUNT: return "maxqueuecount"; case PROP_MAXFILESIZE: return "maxfilesize"; @@ -270,6 +284,7 @@ namespace acl { if (str.compare("schemapackage") == 0) return SPECPROP_SCHEMAPACKAGE; if (str.compare("schemaclass") == 0) return SPECPROP_SCHEMACLASS; if (str.compare("policytype") == 0) return SPECPROP_POLICYTYPE; + if (str.compare("paging") == 0) return SPECPROP_PAGING; if (str.compare("queuemaxsizelowerlimit") == 0) return SPECPROP_MAXQUEUESIZELOWERLIMIT; if (str.compare("queuemaxsizeupperlimit") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT; if (str.compare("queuemaxcountlowerlimit") == 0) return SPECPROP_MAXQUEUECOUNTLOWERLIMIT; @@ -278,6 +293,10 @@ namespace acl { if (str.compare("filemaxsizeupperlimit") == 0) return SPECPROP_MAXFILESIZEUPPERLIMIT; if (str.compare("filemaxcountlowerlimit") == 0) return SPECPROP_MAXFILECOUNTLOWERLIMIT; if (str.compare("filemaxcountupperlimit") == 0) return SPECPROP_MAXFILECOUNTUPPERLIMIT; + if (str.compare("pageslowerlimit") == 0) return SPECPROP_MAXPAGESLOWERLIMIT; + if (str.compare("pagesupperlimit") == 0) return SPECPROP_MAXPAGESUPPERLIMIT; + if (str.compare("pagefactorlowerlimit") == 0) return SPECPROP_MAXPAGEFACTORLOWERLIMIT; + if (str.compare("pagefactorupperlimit") == 0) return SPECPROP_MAXPAGEFACTORUPPERLIMIT; // Allow old names in ACL file as aliases for newly-named properties if (str.compare("maxqueuesize") == 0) return SPECPROP_MAXQUEUESIZEUPPERLIMIT; if (str.compare("maxqueuecount") == 0) return SPECPROP_MAXQUEUECOUNTUPPERLIMIT; @@ -297,6 +316,7 @@ namespace acl { case SPECPROP_SCHEMAPACKAGE: return "schemapackage"; case SPECPROP_SCHEMACLASS: return "schemaclass"; case SPECPROP_POLICYTYPE: return "policytype"; + case SPECPROP_PAGING: return "paging"; case SPECPROP_MAXQUEUESIZELOWERLIMIT: return "queuemaxsizelowerlimit"; case SPECPROP_MAXQUEUESIZEUPPERLIMIT: return "queuemaxsizeupperlimit"; case SPECPROP_MAXQUEUECOUNTLOWERLIMIT: return "queuemaxcountlowerlimit"; @@ -305,6 +325,10 @@ namespace acl { case SPECPROP_MAXFILESIZEUPPERLIMIT: return "filemaxsizeupperlimit"; case SPECPROP_MAXFILECOUNTLOWERLIMIT: return "filemaxcountlowerlimit"; case SPECPROP_MAXFILECOUNTUPPERLIMIT: return "filemaxcountupperlimit"; + case SPECPROP_MAXPAGESLOWERLIMIT: return "pageslowerlimit"; + case SPECPROP_MAXPAGESUPPERLIMIT: return "pageslowerlimit"; + case SPECPROP_MAXPAGEFACTORLOWERLIMIT: return "pagefactorlowerlimit"; + case SPECPROP_MAXPAGEFACTORUPPERLIMIT: return "pagefactorlowerlimit"; default: assert(false); // should never get here } return ""; @@ -381,6 +405,9 @@ namespace acl { p4->insert(PROP_EXCLUSIVE); p4->insert(PROP_AUTODELETE); p4->insert(PROP_POLICYTYPE); + p4->insert(PROP_PAGING); + p4->insert(PROP_MAXPAGES); + p4->insert(PROP_MAXPAGEFACTOR); p4->insert(PROP_MAXQUEUESIZE); p4->insert(PROP_MAXQUEUECOUNT); diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 73910ec6c4..4017fdbfe3 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1301,6 +1301,9 @@ std::pair, bool> Broker::createQueue( params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy())); + params.insert(make_pair(acl::PROP_PAGING, settings.paging ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_MAXPAGES, boost::lexical_cast(settings.maxPages))); + params.insert(make_pair(acl::PROP_MAXPAGEFACTOR, boost::lexical_cast(settings.pageFactor))); params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast(settings.maxDepth.getCount()))); params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast(settings.maxDepth.getSize()))); params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast(settings.maxFileCount))); -- cgit v1.2.1 From af3acd46675d3597f209f2a0c6c2988c6c170c51 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Sun, 2 Feb 2014 20:19:08 +0000 Subject: NO-JIRA: add missing broker-core entries for the Ivy upload task git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563704 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/ivy.nexus.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/qpid/java/ivy.nexus.xml b/qpid/java/ivy.nexus.xml index e301bcb0cf..55b2d2d729 100644 --- a/qpid/java/ivy.nexus.xml +++ b/qpid/java/ivy.nexus.xml @@ -39,6 +39,12 @@ + + + + + + -- cgit v1.2.1 From ca612300f94b01958740eca34e9deee099351a84 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 3 Feb 2014 01:58:01 +0000 Subject: QPID-5504 : Adding missing class git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563755 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/message/MessageInstance.java | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java new file mode 100644 index 0000000000..afd7ff0269 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + + +public interface MessageInstance +{ + + boolean isAvailable(); + + boolean acquire(); + + boolean isAcquired(); + + void release(); + + void delete(); + + boolean isDeleted(); + + ServerMessage getMessage(); + + InstanceProperties getInstanceProperties(); +} -- cgit v1.2.1 From c3b93ca895efa1175e5891ba73b0407e421ad31e Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Mon, 3 Feb 2014 12:49:01 +0000 Subject: QPID-5534: [C++ broker] Headers exchange can route a message to one queue multiple times git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563863 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 7 ++++++- qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 19c7f107f6..585c7ba764 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -302,7 +302,12 @@ void HeadersExchange::route(Deliverable& msg) if (p.get()) { for (std::vector::const_iterator i = p->begin(); i != p->end(); ++i) { if (match(i->args, msg.getMessage())) { - b->push_back(i->binding); + /* check if a binding tothe same queue has not been already added to b */ + std::vector >::iterator bi = b->begin(); + while ((bi != b->end()) && ((*bi)->queue != i->binding->queue)) + ++bi; + if (bi == b->end()) + b->push_back(i->binding); } } } diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py index 315991d585..53de37f12c 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/exchange.py @@ -456,6 +456,12 @@ class HeadersExchangeTests(TestHelper): self.myBasicPublish({"irrelevant":0}) self.assertEmpty(self.q) + def testMultipleBindings(self): + self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="SomeKey", arguments={ 'x-match':'any', "name":"fred"}) + self.session.exchange_bind(queue="q", exchange="amq.match", binding_key="AnotherKey", arguments={ 'x-match':'all', "age":3}) + self.myAssertPublishGet({"name":"fred", "age":3}) + self.assertEmpty(self.q) + class MiscellaneousErrorsTests(TestHelper): """ -- cgit v1.2.1 From 1f71694af9d86eec479e41404d8cf8affd448a70 Mon Sep 17 00:00:00 2001 From: Pavel Moravec Date: Mon, 3 Feb 2014 12:59:37 +0000 Subject: QPID-5519: ACL property/properties for paged queues - fixed typo, added tests and documentation git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563866 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/design_docs/broker-acl-work.txt | 24 ++-- qpid/cpp/src/qpid/broker/AclModule.h | 4 +- qpid/cpp/src/tests/acl.py | 223 ++++++++++++++++++++++++++++++ qpid/doc/book/src/cpp-broker/Security.xml | 38 ++++- 4 files changed, 275 insertions(+), 14 deletions(-) diff --git a/qpid/cpp/design_docs/broker-acl-work.txt b/qpid/cpp/design_docs/broker-acl-work.txt index e89e446a56..e587dc5198 100644 --- a/qpid/cpp/design_docs/broker-acl-work.txt +++ b/qpid/cpp/design_docs/broker-acl-work.txt @@ -28,16 +28,20 @@ in memory and on disk. * Add property limit settings to CREATE QUEUE Acl rules. -User Option Acl Limit Property Units ---------------- ---------------------- --------------- -qpid.max_size queuemaxsizelowerlimit bytes - queuemaxsizeupperlimit bytes -qpid.max_count queuemaxcountlowerlimit messages - queuemaxcountupperlimit messages -qpid.file_size filemaxsizelowerlimit pages (64Kb per page) - filemaxsizeupperlimit pages (64Kb per page) -qpid.file_count filemaxcountlowerlimit files - filemaxcountupperlimit files +User Option Acl Limit Property Units +--------------- ---------------------- --------------- +qpid.max_size queuemaxsizelowerlimit bytes + queuemaxsizeupperlimit bytes +qpid.max_count queuemaxcountlowerlimit messages + queuemaxcountupperlimit messages +qpid.file_size filemaxsizelowerlimit pages (64Kb per page) + filemaxsizeupperlimit pages (64Kb per page) +qpid.file_count filemaxcountlowerlimit files + filemaxcountupperlimit files +qpid.max_pages_loaded pageslowerlimit pages + pagesupperlimit pages +qpid.page_factor pagefactorlowerlimit integer (multiple of the platform-defined page size) + pagefactorlowerlimit integer (multiple of the platform-defined page size) * Change rule match behavior to accomodate limit settings diff --git a/qpid/cpp/src/qpid/broker/AclModule.h b/qpid/cpp/src/qpid/broker/AclModule.h index 934a11789f..aa0ea0c6b0 100644 --- a/qpid/cpp/src/qpid/broker/AclModule.h +++ b/qpid/cpp/src/qpid/broker/AclModule.h @@ -326,9 +326,9 @@ namespace acl { case SPECPROP_MAXFILECOUNTLOWERLIMIT: return "filemaxcountlowerlimit"; case SPECPROP_MAXFILECOUNTUPPERLIMIT: return "filemaxcountupperlimit"; case SPECPROP_MAXPAGESLOWERLIMIT: return "pageslowerlimit"; - case SPECPROP_MAXPAGESUPPERLIMIT: return "pageslowerlimit"; + case SPECPROP_MAXPAGESUPPERLIMIT: return "pagesupperlimit"; case SPECPROP_MAXPAGEFACTORLOWERLIMIT: return "pagefactorlowerlimit"; - case SPECPROP_MAXPAGEFACTORUPPERLIMIT: return "pagefactorlowerlimit"; + case SPECPROP_MAXPAGEFACTORUPPERLIMIT: return "pagefactorupperlimit"; default: assert(false); // should never get here } return ""; diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 66705e6d24..c9b2db64db 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -671,6 +671,118 @@ class ACLTests(TestBase010): self.fail(result) + def test_illegal_pages_lower_limit_spec(self): + """ + Test illegal paged queue policy + """ + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pageslowerlimit=-1\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "-1 is not a valid value for 'pageslowerlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pageslowerlimit=9223372036854775808\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "9223372036854775808 is not a valid value for 'pageslowerlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + + def test_illegal_pages_upper_limit_spec(self): + """ + Test illegal paged queue policy + """ + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pagesupperlimit=-1\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "-1 is not a valid value for 'pagesupperlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pagesupperlimit=9223372036854775808\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "9223372036854775808 is not a valid value for 'pagesupperlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + + def test_illegal_pagefactor_lower_limit_spec(self): + """ + Test illegal paged queue policy + """ + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pagefactorlowerlimit=-1\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "-1 is not a valid value for 'pagefactorlowerlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pagefactorlowerlimit=9223372036854775808\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "9223372036854775808 is not a valid value for 'pagefactorlowerlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + + def test_illegal_pagefactor_upper_limit_spec(self): + """ + Test illegal paged queue policy + """ + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pagefactorupperlimit=-1\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "-1 is not a valid value for 'pagefactorupperlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=q2 pagefactorupperlimit=9223372036854775808\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + expected = "9223372036854775808 is not a valid value for 'pagefactorupperlimit', " \ + "values should be between 0 and 9223372036854775807"; + if (result.find(expected) == -1): + self.fail(result) + + #===================================== # ACL queue tests #===================================== @@ -687,6 +799,7 @@ class ACLTests(TestBase010): aclf.write('acl deny bob@QPID purge queue name=q3\n') aclf.write('acl deny bob@QPID delete queue name=q4\n') aclf.write('acl deny bob@QPID create queue name=q5 maxqueuesize=1000 maxqueuecount=100\n') + aclf.write('acl deny bob@QPID create queue name=q6 paging=true\n') aclf.write('acl allow all all') aclf.close() @@ -737,6 +850,15 @@ class ACLTests(TestBase010): self.assertEqual(403,e.args[0].error_code) session = self.get_session('bob','bob') + try: + queue_options = {} + queue_options["qpid.paging"] = True + session.queue_declare(queue="q6", arguments=queue_options) + self.fail("ACL should deny queue create request with name=q6, qpid.paging=True"); + except qpid.session.SessionException, e: + self.assertEqual(403,e.args[0].error_code) + session = self.get_session('bob','bob') + try: queue_options = {} queue_options["qpid.max_count"] = 200 @@ -971,6 +1093,107 @@ class ACLTests(TestBase010): if (403 == e.args[0].error_code): self.fail("ACL should allow queue delete request for q4"); + #===================================== + # ACL paged tests + #===================================== + + def test_paged_allow_mode(self): + """ + Test cases for paged acl in allow mode + """ + aclf = self.get_acl_file() + aclf.write('acl deny bob@QPID create queue name=qf1 pageslowerlimit=1000\n') + aclf.write('acl deny bob@QPID create queue name=qf2 pagesupperlimit=100\n') + aclf.write('acl deny bob@QPID create queue name=qf3 pagefactorlowerlimit=10\n') + aclf.write('acl deny bob@QPID create queue name=qf4 pagefactorupperlimit=1\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + try: + queue_options = {} + queue_options["qpid.paging"] = True + queue_options["qpid.max_pages_loaded"] = 500 + session.queue_declare(queue="qf1", arguments=queue_options) + self.fail("ACL should deny queue create request with name=qf1, qpid.paging=True, qpid.max_pages_loaded=500"); + except qpid.session.SessionException, e: + self.assertEqual(403,e.args[0].error_code) + session = self.get_session('bob','bob') + + try: + queue_options = {} + queue_options["qpid.paging"] = True + queue_options["qpid.max_pages_loaded"] = 500 + session.queue_declare(queue="qf2", arguments=queue_options) + self.fail("ACL should deny queue create request with name=qf2, qpid.paging=True, qpid.max_pages_loaded=500"); + except qpid.session.SessionException, e: + self.assertEqual(403,e.args[0].error_code) + session = self.get_session('bob','bob') + + try: + queue_options = {} + queue_options["qpid.paging"] = True + queue_options["qpid.page_factor"] = 5 + session.queue_declare(queue="qf3", arguments=queue_options) + self.fail("ACL should deny queue create request with name=qf3, qpid.paging=True, qpid.page_factor=5"); + except qpid.session.SessionException, e: + self.assertEqual(403,e.args[0].error_code) + session = self.get_session('bob','bob') + + try: + queue_options = {} + queue_options["qpid.paging"] = True + queue_options["qpid.page_factor"] = 5 + session.queue_declare(queue="qf4", arguments=queue_options) + self.fail("ACL should deny queue create request with name=qf4, qpid.paging=True, qpid.page_factor=5"); + except qpid.session.SessionException, e: + self.assertEqual(403,e.args[0].error_code) + session = self.get_session('bob','bob') + + + def test_paged_deny_mode(self): + """ + Test cases for paged acl in deny mode + """ + aclf = self.get_acl_file() + aclf.write('acl allow bob@QPID create queue name=qf1 pageslowerlimit=100 pagesupperlimit=1000\n') + aclf.write('acl allow bob@QPID create queue name=qf2 pagefactorlowerlimit=1 pagefactorupperlimit=10\n') + aclf.write('acl allow anonymous all all\n') + aclf.write('acl deny all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + try: + queue_options = {} + queue_options["qpid.paging"] = True + queue_options["qpid.max_pages_loaded"] = 500 + session.queue_declare(queue="qf1", arguments=queue_options) + except qpid.session.SessionException, e: + if (403 == e.args[0].error_code): + self.fail("ACL should allow queue create request with name=qf1, qpid.paging=True, qpid.max_pages_loaded=500"); + session = self.get_session('bob','bob') + + try: + queue_options = {} + queue_options["qpid.paging"] = True + queue_options["qpid.page_factor"] = 5 + session.queue_declare(queue="qf2", arguments=queue_options) + except qpid.session.SessionException, e: + if (403 == e.args[0].error_code): + self.fail("ACL should allow queue create request with name=qf2, qpid.paging=True, qpid.page_factor=5"); + session = self.get_session('bob','bob') + + #===================================== # ACL file tests #===================================== diff --git a/qpid/doc/book/src/cpp-broker/Security.xml b/qpid/doc/book/src/cpp-broker/Security.xml index 7bf7034996..00795a05d8 100644 --- a/qpid/doc/book/src/cpp-broker/Security.xml +++ b/qpid/doc/book/src/cpp-broker/Security.xml @@ -421,7 +421,11 @@ com.sun.security.jgss.initiate { filemaxsizelowerlimit | filemaxsizeupperlimit | filemaxcountlowerlimit | - filemaxcountupperlimit ] + filemaxcountupperlimit | + pageslowerlimit | + pagesupperlimit | + pagefactorlowerlimit | + pagefactorupperlimit ] acl permission {||"all"} {action|"all"} [object|"all" [property= ...]] @@ -728,6 +732,12 @@ com.sun.security.jgss.initiate { Indicates the presence of an exclusive flag CREATE QUEUE, ACCESS QUEUE + + paging + Boolean + Indicates if the queue is paging queue + CREATE QUEUE, ACCESS QUEUE + type String @@ -806,6 +816,30 @@ com.sun.security.jgss.initiate { Maximum value for file.max_count (files) CREATE QUEUE, ACCESS QUEUE + + pageslowerlimit + Integer + Minimum value for number of pages in memory of paged queue + CREATE QUEUE + + + pagesupperlimit + Integer + Maximum value for number of pages in memory of paged queue + CREATE QUEUE + + + pagefactorlowerlimit + Integer + Minimum value for size of one page in paged queue + CREATE QUEUE + + + pagefactorupperlimit + Integer + Maximum value for size of one page in paged queue + CREATE QUEUE + @@ -910,7 +944,7 @@ com.sun.security.jgss.initiate { create queue - name alternate durable exclusive autodelete policy queuemaxsizelowerlimit queuemaxsizeupperlimit queuemaxcountlowerlimit queuemaxcountupperlimit filemaxsizelowerlimit filemaxsizeupperlimit filemaxcountlowerlimit filemaxcountupperlimit + name alternate durable exclusive autodelete policy queuemaxsizelowerlimit queuemaxsizeupperlimit queuemaxcountlowerlimit queuemaxcountupperlimit filemaxsizelowerlimit filemaxsizeupperlimit filemaxcountlowerlimit filemaxcountupperlimit paging pageslowerlimit pagesupperlimit pagefactorlowerlimit pagefactorupperlimit -- cgit v1.2.1 From fe915e0f5573921b28cbc2b7b6f7d15ce90a05a9 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 3 Feb 2014 15:24:14 +0000 Subject: NO-JIRA: minor additions to readme git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1563950 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/AMQP_1.0 | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/qpid/cpp/AMQP_1.0 b/qpid/cpp/AMQP_1.0 index ef478c7a15..f5db31b8f5 100644 --- a/qpid/cpp/AMQP_1.0 +++ b/qpid/cpp/AMQP_1.0 @@ -164,7 +164,8 @@ warning is logged. If the node is an exchange, then an outgoing link (i.e. messages to travel out from broker) will cause a temporary, link-scoped queue to be created on the broker and bound to the exchange. [See section on -'Topics' below] +'Topics' below]. The name of the queue will be of the form +_. Outgoing links may have a filter set on their source. The filters currently supported by the broker are 'legacy-amqp-direct-binding', @@ -272,7 +273,9 @@ trigger the establishment of the link. This gives the peer some means of ensuring their expectations will be met. The 'shared' capability allows subscriptions from an exchange to be -shared by multiple receivers. +shared by multiple receivers. Where this is specified the subscription +queue created takes the name of the link (and does not include the +container id). The 'durable' capability will be added if the queue or exchange refered to by the source or target is durable. The 'queue' capability -- cgit v1.2.1 From c8bdc3035a0bd1a8527b648f555b4f8b4f413e98 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 3 Feb 2014 19:17:02 +0000 Subject: QPID-5528: HA Clean up error messages around rolled-back transactions. A simple transaction test on a 3 node cluster generates a lot of errors and rollback messages in the broker logs even though the test code never rolls back a transaction. E.g. qpid-cluster-benchmark -b 20.0.20.200 -n1 -m 1000 -q3 -s2 -r2 --send-arg=--tx --send-arg=10 --receive-arg=--tx --receive-arg=10 The errors are caused by queues being deleted while backup brokers are using them. This happens a lot in the transaction test because a transactional session must create a new transaction when the previous one closes. When the session closes the open transaction is rolled back automatically. Thus there is almost always an empty transaction that is created then immediately rolled back at the end of the session. Backup brokers may still be in the process of subscribing to the transaction's replication queue at this point, causing (harmlesss) errors. This commit takes the following steps to clean up the unwanted error and rollback messages: HA TX messages cleaned up: - Remove log messages about rolling back/destroying empty transactions. - Remove misleading "backup disconnected" message for cancelled transactions. - Remove spurious warning about ignored unreplicated dequeues. - Include TxReplicator destroy in QueueReplicator mutex, idempotence check before destroy. Allow HA to suppress/modify broker exception logging: - Move broker exception logging into ErrorListener - Every SessionHandler has DefaultErrorListener that does the same logging as before. - Added SessionHandlerObserver to allow plugins to change the error listener. - HA plugin set ErrorListeners to log harmless exceptions as HA debug messages. Unrelated cleanup: - Broker now logs "incoming execution exceptions" as debug messages rather than ignoring. - Exception prefixes: don't add the prefix if already present. The exception test above should now pass without errors or rollback messages in the logs. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564010 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/Exception.cpp | 12 +++-- qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 17 +++---- qpid/cpp/src/qpid/broker/Broker.h | 3 ++ qpid/cpp/src/qpid/broker/SessionHandler.cpp | 28 ++++++++++- qpid/cpp/src/qpid/broker/SessionHandlerObserver.h | 51 +++++++++++++++++++ qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 30 +++++------- qpid/cpp/src/qpid/ha/ErrorListener.h | 60 +++++++++++++++++++++++ qpid/cpp/src/qpid/ha/Primary.cpp | 46 +++++++++++++++++ qpid/cpp/src/qpid/ha/Primary.h | 2 + qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp | 14 +++--- qpid/cpp/src/qpid/ha/PrimaryTxObserver.h | 6 ++- qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 45 +++++++++++------ qpid/cpp/src/qpid/ha/QueueReplicator.h | 17 ++++--- qpid/cpp/src/qpid/ha/TxReplicator.cpp | 21 ++++---- qpid/cpp/src/qpid/ha/TxReplicator.h | 3 +- 15 files changed, 279 insertions(+), 76 deletions(-) create mode 100644 qpid/cpp/src/qpid/broker/SessionHandlerObserver.h create mode 100644 qpid/cpp/src/qpid/ha/ErrorListener.h diff --git a/qpid/cpp/src/qpid/Exception.cpp b/qpid/cpp/src/qpid/Exception.cpp index a6696f06e1..999c2aeb52 100644 --- a/qpid/cpp/src/qpid/Exception.cpp +++ b/qpid/cpp/src/qpid/Exception.cpp @@ -45,16 +45,20 @@ Exception::Exception(const std::string& msg) throw() : message(msg) { Exception::~Exception() throw() {} -std::string Exception::getPrefix() const { return ""; } +std::string Exception::getPrefix() const { return std::string(); } std::string Exception::getMessage() const { return message; } +namespace { const std::string COLON(": "); } + const char* Exception::what() const throw() { // Construct the what string the first time it is needed. if (whatStr.empty()) { - whatStr = getPrefix(); - if (!whatStr.empty()) whatStr += ": "; - whatStr += message; + if (message.compare(0, getPrefix().size(), getPrefix()) == 0 || // Already has prefix + getPrefix().empty()) // No prefix + whatStr = message; + else + whatStr = getPrefix() + COLON + message; } return whatStr.c_str(); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 5eedafc77b..43f39c2919 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -94,8 +94,7 @@ void SessionHandler::handleIn(AMQFrame& f) { } } catch(const SessionException& e) { - QPID_LOG(error, "Execution exception: " << e.what()); - executionException(e.code, e.what()); // Let subclass handle this first. + executionException(e.code, e.what()); framing::AMQP_AllProxy::Execution execution(channel); AMQMethodBody* m = f.getMethod(); SequenceNumber commandId; @@ -105,16 +104,13 @@ void SessionHandler::handleIn(AMQFrame& f) { sendDetach(); } catch(const ChannelException& e){ - QPID_LOG(error, "Channel exception: " << e.what()); - channelException(e.code, e.what()); // Let subclass handle this first. + channelException(e.code, e.what()); peer.detached(name, e.code); } catch(const ConnectionException& e) { - QPID_LOG(error, "Connection exception: " << e.what()); connectionException(e.code, e.getMessage()); } catch(const std::exception& e) { - QPID_LOG(error, "Unexpected exception: " << e.what()); connectionException(connection::CLOSE_CODE_FRAMING_ERROR, e.what()); } } @@ -186,13 +182,14 @@ void SessionHandler::detach(const std::string& name) { } void SessionHandler::detached(const std::string& /*name*/, uint8_t code) { - // Special case for detached: Don't check if we are - // attached. Checking can lead to an endless game of "detached - // tennis" on federated brokers. awaitingDetached = false; + // Special case for detached: Don't throw if we are not attached. Doing so + // can lead to an endless game of "detached tennis" on federated brokers. + if (!getState()) return; // Already detached. if (code != session::DETACH_CODE_NORMAL) { sendReady = receiveReady = false; - channelException(convert(code), "session.detached from peer."); + channelException(convert(code), Msg() << "Channel " << channel.get() + << " received session.detached from peer"); } else { handleDetach(); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 13679bd623..4bad8f2960 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -38,6 +38,7 @@ #include "qpid/broker/System.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" +#include "qpid/broker/SessionHandlerObserver.h" #include "qpid/broker/BrokerObservers.h" #include "qpid/management/Manageable.h" #include "qpid/sys/ConnectionCodec.h" @@ -179,6 +180,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir dataDir; DataDir pagingDir; ConnectionObservers connectionObservers; + SessionHandlerObservers sessionHandlerObservers; BrokerObservers brokerObservers; QueueRegistry queues; @@ -361,6 +363,7 @@ class Broker : public sys::Runnable, public Plugin::Target, ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } + SessionHandlerObservers& getSessionHandlerObservers() { return sessionHandlerObservers; } BrokerObservers& getBrokerObservers() { return brokerObservers; } /** Properties to be set on outgoing link connections */ diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index a8f5734af7..3d5faf2dab 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -33,11 +33,35 @@ using namespace framing; using namespace std; using namespace qpid::sys; +namespace { +class DefaultErrorListener : public SessionHandler::ErrorListener { + public: + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(error, "Connection exception: " << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(error, "Channel exception: " << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, "Execution exception: " << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, "Incoming execution exception: " << framing::createSessionException(code, msg).what()); + } + void detach() {} + + private: +}; +} + SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch) : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), - proxy(out) -{} + proxy(out), + errorListener(boost::shared_ptr(new DefaultErrorListener())) +{ + c.getBroker().getSessionHandlerObservers().newSessionHandler(*this); +} SessionHandler::~SessionHandler() { diff --git a/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h new file mode 100644 index 0000000000..6d0ea16254 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SessionHandlerObserver.h @@ -0,0 +1,51 @@ +#ifndef QPID_BROKER_SESSIONHANDLEROBSERVER_H +#define QPID_BROKER_SESSIONHANDLEROBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Observers.h" + +namespace qpid { +namespace broker { +class SessionHandler; + +/** + * Observer of session handler events. + */ +class SessionHandlerObserver +{ + public: + virtual ~SessionHandlerObserver() {} + virtual void newSessionHandler(SessionHandler&) {} +}; + + +class SessionHandlerObservers : public Observers { + public: + void newSessionHandler(SessionHandler& sh) { + each(boost::bind(&SessionHandlerObserver::newSessionHandler, _1, boost::ref(sh))); + } +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SESSIONHANDLEROBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index b1faf19e52..7928b6ab71 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -172,34 +172,29 @@ Variant::Map asMapVoid(const Variant& value) { } } // namespace -// Listens for errors on the bridge session. -class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { +// Report errors on the broker replication session. +class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener { public: - ErrorListener(const std::string& lp, BrokerReplicator& br) : - logPrefix(lp), brokerReplicator(br) {} + ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} - void connectionException(framing::connection::CloseCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Connection error: " << msg); + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); } - void channelException(framing::session::DetachCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Channel error: " << msg); + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what()); } - void executionException(framing::execution::ErrorCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Execution error: " << msg); + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what()); } - - void incomingExecutionException( - framing::execution::ErrorCode, const std::string& msg) { - QPID_LOG(error, logPrefix << "Incoming execution error: " << msg); + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } - void detach() { QPID_LOG(debug, logPrefix << "Session detached."); } private: std::string logPrefix; - BrokerReplicator& brokerReplicator; }; /** Keep track of queues or exchanges during the update process to solve 2 @@ -328,8 +323,7 @@ void BrokerReplicator::initialize() { boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2) ); assert(result.second); - result.first->setErrorListener( - boost::shared_ptr(new ErrorListener(logPrefix, *this))); + result.first->setErrorListener(boost::shared_ptr(new ErrorListener(logPrefix))); broker.getConnectionObservers().add(shared_from_this()); } diff --git a/qpid/cpp/src/qpid/ha/ErrorListener.h b/qpid/cpp/src/qpid/ha/ErrorListener.h new file mode 100644 index 0000000000..1ae2078a11 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ErrorListener.h @@ -0,0 +1,60 @@ +#ifndef QPID_HA_ERRORLISTENER_H +#define QPID_HA_ERRORLISTENER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/SessionHandler.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace ha { + +/** Default ErrorListener for HA module */ +class ErrorListener : public broker::SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); + } + void detach() { + QPID_LOG(error, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; +}; + + +}} // namespace qpid::ha + +#endif /*!QPID_HA_ERRORLISTENER_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 3bb51b1813..b4d50d1652 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/BrokerObserver.h" #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/SessionHandlerObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" @@ -87,12 +88,54 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary& primary; }; +class PrimaryErrorListener : public broker::SessionHandler::ErrorListener { + public: + PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} + + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; +}; + +class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver { + public: + PrimarySessionHandlerObserver(const std::string& logPrefix) + : errorListener(new PrimaryErrorListener(logPrefix)) {} + void newSessionHandler(broker::SessionHandler& sh) { + BrokerInfo info; + // Suppress error logging for backup connections + // TODO aconway 2014-01-31: Be more selective, suppress only expected errors? + if (ha::ConnectionObserver::getBrokerInfo(sh.getConnection(), info)) { + sh.setErrorListener(errorListener); + } + } + private: + boost::shared_ptr errorListener; +}; + + } // namespace Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : haBroker(hb), membership(hb.getMembership()), logPrefix("Primary: "), active(false), replicationTest(hb.getSettings().replicateDefault.get()), + sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)), queueLimits(logPrefix) { // Note that at this point, we are still rejecting client connections. @@ -124,6 +167,8 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : } brokerObserver.reset(new PrimaryBrokerObserver(*this)); haBroker.getBroker().getBrokerObservers().add(brokerObserver); + haBroker.getBroker().getSessionHandlerObservers().add(sessionHandlerObserver); + checkReady(); // Outside lock // Allow client connections @@ -134,6 +179,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : Primary::~Primary() { if (timerTask) timerTask->cancel(); haBroker.getBroker().getBrokerObservers().remove(brokerObserver); + haBroker.getBroker().getSessionHandlerObservers().remove(sessionHandlerObserver); haBroker.getObserver()->reset(); } diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 2e32515c9a..af368bca0f 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -42,6 +42,7 @@ class Queue; class Connection; class ConnectionObserver; class BrokerObserver; +class SessionHandlerObserver; class TxBuffer; class DtxBuffer; } @@ -152,6 +153,7 @@ class Primary : public Role BackupMap backups; boost::shared_ptr connectionObserver; boost::shared_ptr brokerObserver; + boost::shared_ptr sessionHandlerObserver; boost::intrusive_ptr timerTask; ReplicaMap replicas; TxMap txMap; diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp index dc5bf15911..c94ced7024 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp @@ -98,7 +98,8 @@ PrimaryTxObserver::PrimaryTxObserver( replicationTest(hb.getSettings().replicateDefault.get()), txBuffer(tx), id(true), - exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()) + exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()), + empty(true) { logPrefix = "Primary transaction "+shortStr(id)+": "; @@ -149,6 +150,7 @@ void PrimaryTxObserver::enqueue(const QueuePtr& q, const broker::Message& m) if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Enqueue: " << LogMessageId(*q, m)); checkState(SENDING, "Too late for enqueue"); + empty = false; enqueues[q] += m.getReplicationId(); txQueue->deliver(TxEnqueueEvent(q->getName(), m.getReplicationId()).message()); txQueue->deliver(m); @@ -162,12 +164,9 @@ void PrimaryTxObserver::dequeue( checkState(SENDING, "Too late for dequeue"); if (replicationTest.useLevel(*q) == ALL) { // Ignore unreplicated queues. QPID_LOG(trace, logPrefix << "Dequeue: " << LogMessageId(*q, pos, id)); + empty = false; txQueue->deliver(TxDequeueEvent(q->getName(), id).message()); } - else { - QPID_LOG(warning, logPrefix << "Dequeue skipped, queue not replicated: " - << LogMessageId(*q, pos, id)); - } } namespace { @@ -221,8 +220,10 @@ void PrimaryTxObserver::commit() { } void PrimaryTxObserver::rollback() { - QPID_LOG(debug, logPrefix << "Rollback"); Mutex::ScopedLock l(lock); + // Don't bleat about rolling back empty transactions, this happens all the time + // when a session closes and rolls back its outstanding transaction. + if (!empty) QPID_LOG(debug, logPrefix << "Rollback"); if (state != ENDED) { txQueue->deliver(TxRollbackEvent().message()); end(l); @@ -287,7 +288,6 @@ void PrimaryTxObserver::txPrepareFailEvent(const string& data) { void PrimaryTxObserver::cancel(const ReplicatingSubscription& rs) { Mutex::ScopedLock l(lock); types::Uuid backup = rs.getBrokerInfo().getSystemId(); - QPID_LOG(debug, logPrefix << "Backup disconnected: " << backup); // Normally the backup should be completed before it is cancelled. if (completed(backup, l)) error(backup, "Unexpected disconnect:", l); // Break the pointer cycle if backups have completed and we are done with txBuffer. diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h index 105fee4d40..5b7c2e3e93 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h +++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h @@ -55,8 +55,9 @@ class Primary; * A TxReplicator on the backup replicates the tx-queue and creates * a TxBuffer on the backup equivalent to the one on the primary. * - * Also observes the tx-queue for prepare-complete messages and - * subscription cancellations. + * Creates an exchange to receive prepare-ok/prepare-fail messages from backups. + * + * Monitors for tx-queue subscription cancellations. * * THREAD SAFE: called in user connection thread for TX events, * and in backup connection threads for prepare-completed events @@ -122,6 +123,7 @@ class PrimaryTxObserver : public broker::TransactionObserver, QueueIdsMap enqueues; UuidSet backups; // All backups of transaction. UuidSet incomplete; // Incomplete backups (not yet responded to prepare) + bool empty; // True if the transaction is empty - no enqueues/dequeues. }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 507df6ea5a..59b2013f59 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -73,17 +73,26 @@ void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) { registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1)); } +// Debug log expected exceptions on queue replicator, check incoming execution +// exceptions for "deleted on primary" conditions. class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { public: ErrorListener(const boost::shared_ptr& qr) : queueReplicator(qr), logPrefix(qr->logPrefix) {} - void connectionException(framing::connection::CloseCode, const std::string&) {} - void channelException(framing::session::DetachCode, const std::string&) {} - void executionException(framing::execution::ErrorCode, const std::string&) {} - - void incomingExecutionException(ErrorCode e, const std::string& msg) { - queueReplicator->incomingExecutionException(e, msg); + void connectionException(framing::connection::CloseCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what()); + } + void channelException(framing::session::DetachCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what()); + } + void executionException(framing::execution::ErrorCode code, const std::string& msg) { + QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); + } + void incomingExecutionException(ErrorCode code, const std::string& msg) { + if (!queueReplicator->deletedOnPrimary(code, msg)) + QPID_LOG(error, logPrefix << "Incoming " + << framing::createSessionException(code, msg).what()); } void detach() { QPID_LOG(debug, logPrefix << "Session detached"); @@ -197,20 +206,25 @@ void QueueReplicator::disconnect() { // Called from Queue::destroyed() void QueueReplicator::destroy() { + QPID_LOG(debug, logPrefix << "Destroyed"); boost::shared_ptr bridge2; // To call outside of lock { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed - QPID_LOG(debug, logPrefix << "Destroyed"); bridge2 = bridge; // call close outside the lock. - // Need to drop shared pointers to avoid pointer cycles keeping this in memory. - queue.reset(); - bridge.reset(); - getBroker()->getExchanges().destroy(getName()); + destroy(l); } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. } +void QueueReplicator::destroy(Mutex::ScopedLock&) { + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); +} + + // Called in a broker connection thread when the bridge is created. // Note: called with the Link lock held. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler_) { @@ -306,18 +320,19 @@ void QueueReplicator::idEvent(const string& data, Mutex::ScopedLock&) { nextId = decodeStr(data).id; } -void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) { +bool QueueReplicator::deletedOnPrimary(ErrorCode e, const std::string& msg) { if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) { // If the queue is destroyed at the same time we are subscribing, we may // get a not-found or resource-deleted exception before the // BrokerReplicator gets the queue-delete event. Shut down the bridge by // calling destroy(), we can let the BrokerReplicator delete the queue // when the queue-delete arrives. - QPID_LOG(debug, logPrefix << "Deleted on primary: " << msg); + QPID_LOG(debug, logPrefix << "Deleted on primary: " + << framing::createSessionException(e, msg).what()); destroy(); + return true; } - else - QPID_LOG(error, logPrefix << "Incoming execution exception: " << msg); + return false; } // Unused Exchange methods. diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 22cd13a0a8..f94c6de116 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -46,11 +46,13 @@ class HaBroker; class Settings; /** - * Exchange created on a backup broker to replicate a queue on the primary. + * Exchange created on a backup broker to receive replicated messages and + * replication events from a queue on the primary. It subscribes to the primary + * queue via a ReplicatingSubscription on the primary by passing special + * arguments to the subscribe command. * - * Puts replicated messages on the local queue, handles dequeue events. - * Creates a ReplicatingSubscription on the primary by passing special - * arguments to the consume command. + * It puts replicated messages on the local replica queue and handles dequeue + * events by removing local messages. * * THREAD SAFE: Called in different connection threads. */ @@ -74,7 +76,7 @@ class QueueReplicator : public broker::Exchange, void disconnect(); // Called when we are disconnected from the primary. - std::string getType() const; + virtual std::string getType() const; void route(broker::Deliverable&); @@ -101,7 +103,9 @@ class QueueReplicator : public broker::Exchange, void initialize(); // Called as part of create() virtual void deliver(const broker::Message&); + virtual void destroy(); // Called when the queue is destroyed. + virtual void destroy(sys::Mutex::ScopedLock&); sys::Mutex lock; HaBroker& haBroker; @@ -124,8 +128,7 @@ class QueueReplicator : public broker::Exchange, void dequeueEvent(const std::string& data, sys::Mutex::ScopedLock&); void idEvent(const std::string& data, sys::Mutex::ScopedLock&); - void incomingExecutionException(framing::execution::ErrorCode e, - const std::string& msg); + bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg); std::string logPrefix; std::string bridgeName; diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.cpp b/qpid/cpp/src/qpid/ha/TxReplicator.cpp index 7ff03b5f92..d2a647ae8f 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/TxReplicator.cpp @@ -87,7 +87,7 @@ TxReplicator::TxReplicator( QueueReplicator(hb, txQueue, link), store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0), channel(link->nextChannel()), - ended(false), + empty(true), ended(false), dequeueState(hb.getBroker().getQueues()) { string id(getTxId(txQueue->getName())); @@ -151,6 +151,7 @@ void TxReplicator::enqueue(const string& data, sys::Mutex::ScopedLock&) { decodeStr(data, e); QPID_LOG(trace, logPrefix << "Enqueue: " << e); enq = e; + empty = false; } void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { @@ -163,6 +164,7 @@ void TxReplicator::dequeue(const string& data, sys::Mutex::ScopedLock&) { // prepared, then they are all receieved before the prepare event. // We collect the events here so we can do a single scan of the queue in prepare. dequeueState.add(e); + empty = false; } void TxReplicator::DequeueState::add(const TxDequeueEvent& event) { @@ -227,7 +229,9 @@ void TxReplicator::commit(const string&, sys::Mutex::ScopedLock& l) { void TxReplicator::rollback(const string&, sys::Mutex::ScopedLock& l) { if (!txBuffer) return; - QPID_LOG(debug, logPrefix << "Rollback"); + // Don't bleat about rolling back empty transactions, this happens all the time + // when a session closes and rolls back its outstanding transaction. + if (!empty) QPID_LOG(debug, logPrefix << "Rollback"); if (context.get()) store->abort(*context); txBuffer->rollback(); end(l); @@ -255,15 +259,12 @@ void TxReplicator::end(sys::Mutex::ScopedLock&) { } // Called when the tx queue is deleted. -void TxReplicator::destroy() { - { - sys::Mutex::ScopedLock l(lock); - if (!ended) { - QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback."); - rollback(string(), l); - } +void TxReplicator::destroy(sys::Mutex::ScopedLock& l) { + if (!ended) { + if (!empty) QPID_LOG(error, logPrefix << "Destroyed prematurely, rollback"); + rollback(string(), l); } - QueueReplicator::destroy(); + QueueReplicator::destroy(l); } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/TxReplicator.h b/qpid/cpp/src/qpid/ha/TxReplicator.h index 7f1256699a..5c509d14a7 100644 --- a/qpid/cpp/src/qpid/ha/TxReplicator.h +++ b/qpid/cpp/src/qpid/ha/TxReplicator.h @@ -67,7 +67,8 @@ class TxReplicator : public QueueReplicator { // QueueReplicator overrides void route(broker::Deliverable& deliverable); - void destroy(); + using QueueReplicator::destroy; + void destroy(sys::Mutex::ScopedLock&); protected: -- cgit v1.2.1 From 6a6d0ce4b10814f4c535acc071aec0f5cf991037 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 3 Feb 2014 22:37:29 +0000 Subject: QPID-5528: HA add missing QPID_BROKER_EXTERN declarations. Missing from previous commit r1564010 | QPID-5528: HA Clean up error messages around rolled-back transactions. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1564120 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/SessionHandler.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index cb81084014..ee6baed0b6 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -21,7 +21,7 @@ * under the License. * */ - +#include "qpid/broker/BrokerImportExport.h" #include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" @@ -76,8 +76,8 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - amqp_0_10::Connection& getConnection(); - const amqp_0_10::Connection& getConnection() const; + QPID_BROKER_EXTERN amqp_0_10::Connection& getConnection(); + QPID_BROKER_EXTERN const amqp_0_10::Connection& getConnection() const; framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } @@ -86,7 +86,7 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { void attached(const std::string& name);//used by 'pushing' inter-broker bridges void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges - void setErrorListener(boost::shared_ptr e) { errorListener = e; } + QPID_BROKER_EXTERN void setErrorListener(boost::shared_ptr e) { errorListener = e; } // Called by SessionAdapter void incomingExecutionException(framing::execution::ErrorCode, const std::string& msg); -- cgit v1.2.1