From 6998892f4c84794d1fb5847925d03770df9621d7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 7 Feb 2014 15:24:50 +0000 Subject: Merged from trunk git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-amqp-1-0-management@1565688 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/bindings/qmf2/ruby/ruby.i | 2 +- cpp/bindings/qpid/perl/perl.i | 2 +- cpp/bindings/qpid/ruby/ruby.i | 42 +- cpp/include/qpid/swig_perl_typemaps.i | 2 +- cpp/include/qpid/swig_ruby_typemaps.i | 2 +- cpp/src/CMakeLists.txt | 2 + cpp/src/qpid/broker/Broker.cpp | 6 +- cpp/src/qpid/broker/Broker.h | 2 + cpp/src/qpid/broker/DtxManager.cpp | 15 +- cpp/src/qpid/broker/DtxManager.h | 3 +- cpp/src/qpid/broker/SessionAdapter.cpp | 2 + cpp/src/qpid/client/windows/SslConnector.cpp | 264 +---------- cpp/src/qpid/ha/Backup.cpp | 8 +- cpp/src/qpid/ha/BrokerReplicator.cpp | 56 ++- cpp/src/qpid/ha/BrokerReplicator.h | 4 +- cpp/src/qpid/ha/ErrorListener.h | 60 --- cpp/src/qpid/ha/QueueReplicator.cpp | 24 +- cpp/src/qpid/ha/QueueReplicator.h | 2 +- cpp/src/qpid/linearstore/ISSUES | 37 +- cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 13 +- cpp/src/qpid/linearstore/MessageStoreImpl.h | 2 - cpp/src/qpid/linearstore/TxnCtxt.cpp | 4 +- cpp/src/qpid/linearstore/journal/JournalFile.cpp | 68 +-- cpp/src/qpid/linearstore/journal/JournalFile.h | 13 +- .../linearstore/journal/LinearFileController.cpp | 82 ++-- .../linearstore/journal/LinearFileController.h | 13 +- .../qpid/linearstore/journal/RecoveryManager.cpp | 195 +++++--- cpp/src/qpid/linearstore/journal/RecoveryManager.h | 24 +- cpp/src/qpid/linearstore/journal/deq_rec.cpp | 8 +- cpp/src/qpid/linearstore/journal/deq_rec.h | 4 +- cpp/src/qpid/linearstore/journal/enq_rec.cpp | 8 +- cpp/src/qpid/linearstore/journal/enq_rec.h | 4 +- cpp/src/qpid/linearstore/journal/jcntl.cpp | 20 +- cpp/src/qpid/linearstore/journal/jcntl.h | 2 + cpp/src/qpid/linearstore/journal/jerrno.cpp | 4 +- cpp/src/qpid/linearstore/journal/jrec.h | 2 +- cpp/src/qpid/linearstore/journal/txn_rec.cpp | 8 +- cpp/src/qpid/linearstore/journal/txn_rec.h | 4 +- cpp/src/qpid/linearstore/journal/wmgr.cpp | 100 +++-- cpp/src/qpid/linearstore/journal/wmgr.h | 12 +- cpp/src/qpid/log/Logger.cpp | 2 +- cpp/src/qpid/log/Statement.cpp | 53 +-- cpp/src/qpid/log/Statement.h | 1 + cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 5 + cpp/src/qpid/messaging/amqp/ConnectionContext.h | 1 + cpp/src/qpid/messaging/amqp/SslTransport.cpp | 10 +- cpp/src/qpid/messaging/amqp/TransportContext.h | 3 + cpp/src/qpid/sys/windows/SslAsynchIO.cpp | 16 +- cpp/src/qpid/sys/windows/SslAsynchIO.h | 2 + cpp/src/qpid/sys/windows/SslCredential.cpp | 273 ++++++++++++ cpp/src/qpid/sys/windows/SslCredential.h | 81 ++++ cpp/src/tests/brokertest.py | 2 +- cpp/src/tests/ha_test.py | 2 +- cpp/src/tests/linearstore/tx-test-soak.sh | 254 +++++++++++ java/amqp-1-0-client-jms/pom.xml | 17 +- java/amqp-1-0-client-websocket/pom.xml | 138 +----- java/amqp-1-0-client/pom.xml | 13 +- java/amqp-1-0-common/pom.xml | 10 +- java/bdbstore/jmx/pom.xml | 25 +- java/bdbstore/pom.xml | 27 +- java/bdbstore/systests/pom.xml | 27 +- java/broker-core/pom.xml | 50 +-- java/broker-plugins/access-control/pom.xml | 18 +- java/broker-plugins/amqp-0-10-protocol/pom.xml | 20 +- java/broker-plugins/amqp-0-8-protocol/pom.xml | 18 +- java/broker-plugins/amqp-1-0-protocol/pom.xml | 17 +- .../amqp-msg-conv-0-10-to-1-0/pom.xml | 24 +- .../amqp-msg-conv-0-8-to-0-10/pom.xml | 21 +- .../amqp-msg-conv-0-8-to-1-0/pom.xml | 24 +- java/broker-plugins/derby-store/pom.xml | 21 +- java/broker-plugins/jdbc-provider-bone/pom.xml | 15 +- java/broker-plugins/jdbc-store/pom.xml | 19 +- java/broker-plugins/management-http/pom.xml | 118 +---- java/broker-plugins/management-jmx/pom.xml | 19 +- java/broker-plugins/memory-store/pom.xml | 12 +- java/broker-plugins/websocket/pom.xml | 119 +---- java/broker/pom.xml | 52 ++- java/client/pom.xml | 18 +- java/common/pom.xml | 20 +- java/jca/pom.xml | 22 +- java/jca/rar/pom.xml | 40 +- java/management/common/pom.xml | 10 +- java/management/example/pom.xml | 13 +- java/perftests/pom.xml | 31 +- java/perftests/visualisation-jfc/pom.xml | 35 +- java/pom.xml | 489 ++++++++++++++++----- java/qpid-test-utils/pom.xml | 18 +- java/systests/pom.xml | 91 ++-- java/tools/pom.xml | 19 +- tests/src/py/qpid_tests/broker_0_10/dtx.py | 20 +- tools/src/py/qls/efp.py | 3 +- tools/src/py/qls/err.py | 10 + tools/src/py/qls/jrnl.py | 194 +++++--- tools/src/py/qpid-ha | 3 - tools/src/py/qpid_qls_analyze.py | 8 +- 95 files changed, 2074 insertions(+), 1624 deletions(-) delete mode 100644 cpp/src/qpid/ha/ErrorListener.h create mode 100644 cpp/src/qpid/sys/windows/SslCredential.cpp create mode 100644 cpp/src/qpid/sys/windows/SslCredential.h create mode 100755 cpp/src/tests/linearstore/tx-test-soak.sh diff --git a/cpp/bindings/qmf2/ruby/ruby.i b/cpp/bindings/qmf2/ruby/ruby.i index 0254017555..65d0770224 100644 --- a/cpp/bindings/qmf2/ruby/ruby.i +++ b/cpp/bindings/qmf2/ruby/ruby.i @@ -30,7 +30,7 @@ } catch (qpid::types::Exception& mex) { static VALUE qmferror = rb_define_class("QmfError", rb_eStandardError); - rb_raise(qmferror, mex.what()); + rb_raise(qmferror, "%s", mex.what()); } } diff --git a/cpp/bindings/qpid/perl/perl.i b/cpp/bindings/qpid/perl/perl.i index 0d118ae0fb..4dc2665c2b 100644 --- a/cpp/bindings/qpid/perl/perl.i +++ b/cpp/bindings/qpid/perl/perl.i @@ -27,7 +27,7 @@ $action } catch (qpid::messaging::MessagingException& mex) { - Perl_croak(aTHX_ mex.what()); + Perl_croak(aTHX_ "%s", mex.what()); } } diff --git a/cpp/bindings/qpid/ruby/ruby.i b/cpp/bindings/qpid/ruby/ruby.i index 34388a2b16..a2f2ffab4c 100644 --- a/cpp/bindings/qpid/ruby/ruby.i +++ b/cpp/bindings/qpid/ruby/ruby.i @@ -34,86 +34,86 @@ } catch(qpid::messaging::ConnectionError& error) { static VALUE merror = rb_define_class("ConnectionError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::TransportFailure& error) { static VALUE merror = rb_define_class("TransportFailure", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::TransactionAborted& error) { static VALUE merror = rb_define_class("TransactionAborted", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::TransactionError& error) { static VALUE merror = rb_define_class("TransactionError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::UnauthorizedAccess& error) { static VALUE merror = rb_define_class("UnauthorizedAccess", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::SessionError& error) { static VALUE merror = rb_define_class("SessionError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::TargetCapacityExceeded& error) { static VALUE merror = rb_define_class("TargetCapacityExceeded", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::SendError& error) { static VALUE merror = rb_define_class("SendError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::SenderError& error) { static VALUE merror = rb_define_class("SenderError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::NoMessageAvailable& error) { static VALUE merror = rb_define_class("NoMessageAvailable", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::FetchError& error) { static VALUE merror = rb_define_class("FetchError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::ReceiverError& error) { static VALUE merror = rb_define_class("ReceiverError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::InvalidOptionString& error) { static VALUE merror = rb_define_class("InvalidOptionString", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::KeyError& error) { static VALUE merror = rb_define_class("KeyError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::AssertionFailed& error) { static VALUE merror = rb_define_class("AssertionFailed", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::NotFound& error) { static VALUE merror = rb_define_class("NotFound", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::ResolutionError& error) { static VALUE merror = rb_define_class("ResolutionError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::MalformedAddress& error) { static VALUE merror = rb_define_class("MalformedAddress", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::AddressError& error) { static VALUE merror = rb_define_class("AddressError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::LinkError& error) { static VALUE merror = rb_define_class("LinkError", eMessagingError); - rb_raise(merror, error.what()); + rb_raise(merror, "%s", error.what()); } catch(qpid::messaging::MessagingException& error) { - rb_raise(eMessagingError, error.what()); + rb_raise(eMessagingError, "%s", error.what()); } } diff --git a/cpp/include/qpid/swig_perl_typemaps.i b/cpp/include/qpid/swig_perl_typemaps.i index f1425ebd67..6c0e6d8bac 100644 --- a/cpp/include/qpid/swig_perl_typemaps.i +++ b/cpp/include/qpid/swig_perl_typemaps.i @@ -120,7 +120,7 @@ } } } catch (qpid::types::Exception& ex) { - Perl_croak(aTHX_ ex.what()); + Perl_croak(aTHX_ "%s", ex.what()); } if (!result) diff --git a/cpp/include/qpid/swig_ruby_typemaps.i b/cpp/include/qpid/swig_ruby_typemaps.i index 1a07cc86b0..4e07088bce 100644 --- a/cpp/include/qpid/swig_ruby_typemaps.i +++ b/cpp/include/qpid/swig_ruby_typemaps.i @@ -106,7 +106,7 @@ } } catch (qpid::types::Exception& ex) { static VALUE error = rb_define_class("Error", rb_eStandardError); - rb_raise(error, ex.what()); + rb_raise(error, "%s", ex.what()); } return result; diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 46b526579f..d8b823fc8e 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -460,6 +460,8 @@ if (BUILD_SSL) if (CMAKE_SYSTEM_NAME STREQUAL Windows) set (sslcommon_SOURCES qpid/sys/windows/SslAsynchIO.cpp + qpid/sys/windows/SslCredential.cpp + qpid/sys/windows/SslCredential.h qpid/sys/windows/util.cpp qpid/sys/windows/util.h ) diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 4017fdbfe3..7f076e92cd 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -148,6 +148,8 @@ Broker::Options::Options(const std::string& name) : timestampRcvMsgs(false), // set the 0.10 timestamp delivery property linkMaintenanceInterval(2*sys::TIME_SEC), linkHeartbeatInterval(120*sys::TIME_SEC), + dtxDefaultTimeout(60), // 60s + dtxMaxTimeout(3600), // 3600s maxNegotiateTime(10000) // 10s { int c = sys::SystemInfo::concurrency(); @@ -192,6 +194,8 @@ Broker::Options::Options(const std::string& name) : "Interval to check link health and re-connect if need be") ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"), "Heartbeat interval for a federation link") + ("dtx-default-timeout", optValue(dtxDefaultTimeout, "SECONDS"), "Default timeout for DTX transaction before aborting it") + ("dtx-max-timeout", optValue(dtxMaxTimeout, "SECONDS"), "Maximum allowed timeout for DTX transaction. A value of zero disables maximum timeout limit checks and allows arbitrarily large timeout settings.") ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation") ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag") ; @@ -224,7 +228,7 @@ Broker::Broker(const Broker::Options& conf) : exchanges(this), links(this), factory(new SecureConnectionFactory(*this)), - dtxManager(*timer.get()), + dtxManager(*timer.get(), getOptions().dtxDefaultTimeout), sessionManager( qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 4bad8f2960..5d1e241be9 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -121,6 +121,8 @@ class Broker : public sys::Runnable, public Plugin::Target, bool timestampRcvMsgs; sys::Duration linkMaintenanceInterval; sys::Duration linkHeartbeatInterval; + uint32_t dtxDefaultTimeout; // Default timeout of a DTX transaction + uint32_t dtxMaxTimeout; // Maximal timeout of a DTX transaction uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation std::string fedTag; diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 5ba1ce4dac..4fb82bb41b 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -62,7 +62,12 @@ namespace { } -DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} +DtxManager::DtxManager(qpid::sys::Timer& t, uint32_t _dtxDefaultTimeout) : + store(0), + timer(&t), + dtxDefaultTimeout(_dtxDefaultTimeout) +{ +} DtxManager::~DtxManager() {} @@ -150,8 +155,12 @@ DtxWorkRecord* DtxManager::createWork(const std::string& xid) if (i != work.end()) { throw NotAllowedException(QPID_MSG("Xid " << convert(xid) << " is already known (use 'join' to add work to an existing xid)")); } else { - std::string ncxid = xid; // Work around const correctness problems in ptr_map. - return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); + std::string ncxid = xid; // Work around const correctness problems with work.insert + DtxWorkRecord* dtxWorkRecord = new DtxWorkRecord(xid, store); + work.insert(ncxid, dtxWorkRecord); + if (dtxDefaultTimeout>0) + setTimeout(xid, dtxDefaultTimeout); + return dtxWorkRecord; } } diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index 000fc7b4b8..ad30ed61a0 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -44,12 +44,13 @@ class DtxManager{ TransactionalStore* store; qpid::sys::Mutex lock; qpid::sys::Timer* timer; + uint32_t dtxDefaultTimeout; void remove(const std::string& xid); DtxWorkRecord* createWork(const std::string& xid); public: - DtxManager(sys::Timer&); + DtxManager(sys::Timer&, uint32_t _dtxDefaultTimeout=0); ~DtxManager(); void start(const std::string& xid, boost::intrusive_ptr work); void join(const std::string& xid, boost::intrusive_ptr work); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 5b1a6aa267..7c2d1cf9f5 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -678,6 +678,8 @@ DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, uint32_t timeout) { + if ((timeout > getBroker().getOptions().dtxMaxTimeout) && (getBroker().getOptions().dtxMaxTimeout > 0)) + throw InvalidArgumentException(QPID_MSG("xid " << xid << " has timeout " << timeout << " bigger than maximum allowed " << getBroker().getOptions().dtxMaxTimeout)); getBroker().getDtxManager().setTimeout(DtxManager::convert(xid), timeout); } diff --git a/cpp/src/qpid/client/windows/SslConnector.cpp b/cpp/src/qpid/client/windows/SslConnector.cpp index 4f4ef4f559..d0be818df0 100644 --- a/cpp/src/qpid/client/windows/SslConnector.cpp +++ b/cpp/src/qpid/client/windows/SslConnector.cpp @@ -32,26 +32,14 @@ #include "qpid/sys/windows/check.h" #include "qpid/sys/windows/util.h" #include "qpid/sys/windows/SslAsynchIO.h" +#include "qpid/sys/windows/SslCredential.h" #include #include -// security.h needs to see this to distinguish from kernel use. -#define SECURITY_WIN32 -#include -#include -#undef SECURITY_WIN32 #include -/* - * Note on client certificates: The Posix/NSS implementation performs a lazy - * client certificate search part way through the ssl handshake if the server - * requests one. Here, it is not known in advance if the server will - * request the certificate so the certificate is pre-loaded (even if never - * used). To match the Linux behavior, client certificate load problems are - * remembered and reported later if appropriate, but do not prevent the - * connection attempt. - */ + namespace qpid { namespace client { @@ -61,34 +49,16 @@ using qpid::sys::Socket; class SslConnector : public qpid::client::TCPConnector { - struct SavedError { - std::string logMessage; - std::string error; - void set(const std::string &lm, const std::string es); - void set(const std::string &lm, int status); - void clear(); - bool pending(); - }; - qpid::sys::windows::ClientSslAsynchIO *shim; boost::shared_ptr poller; std::string brokerHost; - HCERTSTORE certStore; - PCCERT_CONTEXT cert; - SCHANNEL_CRED cred; - CredHandle credHandle; - TimeStamp credExpiry; - SavedError clientCertError; + qpid::sys::windows::SslCredential sslCredential; + bool certLoaded; - virtual ~SslConnector(); void negotiationDone(SECURITY_STATUS status); void connect(const std::string& host, const std::string& port); void connected(const Socket&); - PCCERT_CONTEXT findCertificate(const std::string& name); - void loadPrivCertStore(); - std::string getPasswd(const std::string& filename); - void importHostCert(const ConnectionSettings&); public: SslConnector(boost::shared_ptr, @@ -127,15 +97,12 @@ namespace { void SslConnector::negotiationDone(SECURITY_STATUS status) { if (status == SEC_E_OK) { - clientCertError.clear(); initAmqp(); } else { - if (status == SEC_E_INCOMPLETE_CREDENTIALS && clientCertError.pending()) { + if (status == SEC_E_INCOMPLETE_CREDENTIALS && !certLoaded) { // Server requested a client cert but we supplied none for the following reason: - if (!clientCertError.logMessage.empty()) - QPID_LOG(warning, clientCertError.logMessage); - connectFailed(QPID_MSG(clientCertError.error)); + connectFailed(QPID_MSG(sslCredential.error())); } else connectFailed(QPID_MSG(qpid::sys::strError(status))); @@ -146,46 +113,15 @@ SslConnector::SslConnector(boost::shared_ptr p, framing::ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) - : TCPConnector(p, ver, settings, cimpl), shim(0), poller(p), certStore(0), cert(0) + : TCPConnector(p, ver, settings, cimpl), shim(0), poller(p) { - SecInvalidateHandle(&credHandle); - memset(&cred, 0, sizeof(cred)); - cred.dwVersion = SCHANNEL_CRED_VERSION; - cred.dwFlags = SCH_CRED_NO_DEFAULT_CREDS; const std::string& name = (settings.sslCertName != "") ? settings.sslCertName : qpid::sys::ssl::SslOptions::global.certName; - cert = findCertificate(name); - if (cert != NULL) { - // assign the certificate into the credentials - cred.paCred = &cert; - cred.cCreds = 1; - } - - SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL, - UNISP_NAME, - SECPKG_CRED_OUTBOUND, - NULL, - &cred, - NULL, - NULL, - &credHandle, - &credExpiry); - if (status != SEC_E_OK) - throw QPID_WINDOWS_ERROR(status); + certLoaded = sslCredential.load(name); QPID_LOG(debug, "SslConnector created for " << ver.toString()); } -SslConnector::~SslConnector() -{ - if (SecIsValidHandle(&credHandle)) - ::FreeCredentialsHandle(&credHandle); - if (cert) - ::CertFreeCertificateContext(cert); - if (certStore) - ::CertCloseStore(certStore, CERT_CLOSE_STORE_FORCE_FLAG); -} - void SslConnector::connect(const std::string& host, const std::string& port) { brokerHost = host; TCPConnector::connect(host, port); @@ -194,7 +130,7 @@ void SslConnector::connect(const std::string& host, const std::string& port) { void SslConnector::connected(const Socket& s) { shim = new qpid::sys::windows::ClientSslAsynchIO(brokerHost, s, - credHandle, + sslCredential.handle(), boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), boost::bind(&SslConnector::disconnected, this, _1), @@ -206,186 +142,4 @@ void SslConnector::connected(const Socket& s) { shim->start(poller); } - -void SslConnector::loadPrivCertStore() -{ - // Get a handle to the system store or pkcs#12 file - qpid::sys::ssl::SslOptions& opts = qpid::sys::ssl::SslOptions::global; - if (opts.certFilename.empty()) { - // opening a system store, names are not case sensitive - std::string store = opts.certStore.empty() ? "my" : opts.certStore; - std::transform(store.begin(), store.end(), store.begin(), ::tolower); - // map confusing GUI name to actual registry store name - if (store == "personal") - store = "my"; - certStore = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A, 0, NULL, - CERT_STORE_OPEN_EXISTING_FLAG | CERT_STORE_READONLY_FLAG | - CERT_SYSTEM_STORE_CURRENT_USER, store.c_str()); - if (!certStore) { - HRESULT status = GetLastError(); - clientCertError.set(Msg() << "Could not open system certificate store: " << store, status); - return; - } - QPID_LOG(debug, "SslConnector using certifcates from system store: " << store); - } else { - // opening the store from file and populating it with a private key - HANDLE certFileHandle = NULL; - certFileHandle = CreateFile(opts.certFilename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - if (INVALID_HANDLE_VALUE == certFileHandle) { - HRESULT status = GetLastError(); - clientCertError.set(Msg() << "Failed to open the file holding the private key: " << opts.certFilename, status); - return; - } - std::vector certEncoded; - DWORD certEncodedSize = 0L; - const DWORD fileSize = GetFileSize(certFileHandle, NULL); - if (INVALID_FILE_SIZE != fileSize) { - certEncoded.resize(fileSize); - bool result = false; - result = ReadFile(certFileHandle, &certEncoded[0], - fileSize, - &certEncodedSize, - NULL); - if (!result) { - // the read failed, return the error as an HRESULT - HRESULT status = GetLastError(); - CloseHandle(certFileHandle); - clientCertError.set(Msg() << "Reading the private key from file failed " << opts.certFilename, status); - return; - } - } - else { - HRESULT status = GetLastError(); - clientCertError.set(Msg() << "Unable to read the certificate file " << opts.certFilename, status); - return; - } - CloseHandle(certFileHandle); - - CRYPT_DATA_BLOB blobData; - blobData.cbData = certEncodedSize; - blobData.pbData = &certEncoded[0]; - - // get passwd from file and convert to null terminated wchar_t (Windows UCS2) - std::string passwd = getPasswd(opts.certPasswordFile); - if (clientCertError.pending()) - return; - int pwlen = passwd.length(); - std::vector pwUCS2(pwlen + 1, L'\0'); - int nwc = MultiByteToWideChar(CP_UTF8, MB_ERR_INVALID_CHARS, passwd.data(), pwlen, &pwUCS2[0], pwlen); - if (!nwc) { - HRESULT status = GetLastError(); - clientCertError.set("Error converting password from UTF8", status); - return; - } - - certStore = PFXImportCertStore(&blobData, &pwUCS2[0], 0); - if (certStore == NULL) { - HRESULT status = GetLastError(); - clientCertError.set("Failed to open the certificate store", status); - return; - } - QPID_LOG(debug, "SslConnector using certificate from pkcs#12 file: " << opts.certFilename); - } -} - - -PCCERT_CONTEXT SslConnector::findCertificate(const std::string& name) -{ - loadPrivCertStore(); - if (clientCertError.pending()) - return NULL; - - // search for the certificate by Friendly Name - PCCERT_CONTEXT tmpctx = NULL; - while (tmpctx = CertEnumCertificatesInStore(certStore, tmpctx)) { - DWORD len = CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, - 0, NULL, NULL, 0); - if (len == 1) - continue; - std::vector ctxname(len); - CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, - 0, NULL, &ctxname[0], len); - bool found = !name.compare(&ctxname[0]); - if (found) - break; - } - - // verify whether some certificate has been found - if (tmpctx == NULL) { - clientCertError.set(Msg() << "Client SSL/TLS certificate not found in the certificate store for name " << name, - "client certificate not found"); - } - return tmpctx; -} - - -std::string SslConnector::getPasswd(const std::string& filename) -{ - std::string passwd; - if (filename == "") - return passwd; - - HANDLE pwfHandle = CreateFile(filename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - - if (INVALID_HANDLE_VALUE == pwfHandle) { - HRESULT status = GetLastError(); - clientCertError.set(Msg() << "Failed to open the password file: " << filename, status); - return passwd; - } - - const DWORD fileSize = GetFileSize(pwfHandle, NULL); - if (fileSize == INVALID_FILE_SIZE) { - CloseHandle(pwfHandle); - clientCertError.set("", "Cannot read password file"); - return passwd; - } - - std::vector pwbuf; - pwbuf.resize(fileSize); - DWORD nbytes = 0; - if (!ReadFile(pwfHandle, &pwbuf[0], fileSize, &nbytes, NULL)) { - HRESULT status = GetLastError(); - CloseHandle(pwfHandle); - clientCertError.set("Error reading password file", status); - return passwd; - } - CloseHandle(pwfHandle); - - if (nbytes == 0) - return passwd; - - while (nbytes) { - if ((pwbuf[nbytes-1] == 012) || (pwbuf[nbytes-1] == 015)) - nbytes--; - else - break; - } - - if (nbytes) - passwd.assign(&pwbuf[0], nbytes); - - return passwd; -} - -void SslConnector::SavedError::set(const std::string &lm, const std::string es) { - logMessage = lm; - error = es; -} - -void SslConnector::SavedError::set(const std::string &lm, int status) { - logMessage = lm; - error = qpid::sys::strError(status); -} - -void SslConnector::SavedError::clear() { - logMessage.clear(); - error.clear(); -} - -bool SslConnector::SavedError::pending() { - return !logMessage.empty() || !error.empty(); -} - }}} // namespace qpid::client::windows diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index beae53d85f..9d50b1c665 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -63,18 +63,16 @@ void Backup::setBrokerUrl(const Url& brokers) { QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; types::Uuid uuid(true); - std::pair result; - result = broker.getLinks().declare( + link = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), brokers[0].host, brokers[0].port, protocol, false, // durable settings.mechanism, settings.username, settings.password, - false); // no amq.failover - don't want to use client URL. - link = result.first; + false).first; // no amq.failover - don't want to use client URL. replicator = BrokerReplicator::create(haBroker, link); broker.getExchanges().registerExchange(replicator); } - link->setUrl(brokers); // Outside the lock, once set link doesn't change. + link->setUrl(brokers); } void Backup::stop(Mutex::ScopedLock&) { diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 7928b6ab71..3957ef5a0c 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -270,7 +270,8 @@ template std::string key() { } boost::shared_ptr BrokerReplicator::create( - HaBroker& hb, const boost::shared_ptr& l) { + HaBroker& hb, const boost::shared_ptr& l) +{ boost::shared_ptr br(new BrokerReplicator(hb, l)); br->initialize(); return br; @@ -330,13 +331,21 @@ void BrokerReplicator::initialize() { BrokerReplicator::~BrokerReplicator() {} namespace { -void collectQueueReplicators( - const boost::shared_ptr& ex, - set >& collect) -{ - boost::shared_ptr qr(boost::dynamic_pointer_cast(ex)); - if (qr) collect.insert(qr); -} +struct QueueReplicators : public std::deque > { + QueueReplicators(const ExchangeRegistry& er) { addAll(er); } + + /** Add the exchange if it is a QueueReplicator. */ + void add(const boost::shared_ptr& ex) { + boost::shared_ptr qr = + boost::dynamic_pointer_cast(ex); + if (qr) push_back(qr); + } + /** Add all QueueReplicator in the ExchangeRegistry. */ + void addAll(const ExchangeRegistry& er) { + // Make copy of exchanges so we can work outside the registry lock. + er.eachExchange(boost::bind(&QueueReplicators::add, this, _1)); + } +}; } // namespace void BrokerReplicator::shutdown() { @@ -877,35 +886,22 @@ void BrokerReplicator::forced(broker::Connection& c, const std::string& message) string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } -void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr ex) { - boost::shared_ptr qr(boost::dynamic_pointer_cast(ex)); - if (qr) { - qr->disconnect(); - if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { - // Transactions are aborted on failover so clean up tx-queues - deleteQueue(qr->getQueue()->getName()); - } +void BrokerReplicator::disconnectedQueueReplicator( + const boost::shared_ptr& qr) +{ + qr->disconnect(); + if (TxReplicator::isTxQueue(qr->getQueue()->getName())) { + // Transactions are aborted on failover so clean up tx-queues + deleteQueue(qr->getQueue()->getName()); } } -typedef vector > ExchangeVector; - -// Callback function for accumulating exchange candidates -namespace { -void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) { - ev.push_back(i); -} -} - // Called by ConnectionObserver::disconnected, disconnected from the network side. void BrokerReplicator::disconnected() { QPID_LOG(info, logPrefix << "Disconnected from primary " << primary); connect = 0; - - // Make copy of exchanges so we can work outside the registry lock. - ExchangeVector exs; - exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1)); - for_each(exs.begin(), exs.end(), + QueueReplicators qrs(broker.getExchanges()); + for_each(qrs.begin(), qrs.end(), boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1)); } diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index 445406ad19..1e051878ae 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -108,8 +108,6 @@ class BrokerReplicator : public broker::Exchange, typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&); typedef qpid::sys::unordered_map EventDispatchMap; - typedef qpid::sys::unordered_map QueueReplicatorMap; - class UpdateTracker; class ErrorListener; @@ -152,7 +150,7 @@ class BrokerReplicator : public broker::Exchange, void deleteQueue(const std::string& name, bool purge=true); void deleteExchange(const std::string& name); - void disconnectedQueueReplicator(boost::shared_ptr); + void disconnectedQueueReplicator(const boost::shared_ptr&); void disconnected(); void setMembership(const types::Variant::List&); // Set membership from list. diff --git a/cpp/src/qpid/ha/ErrorListener.h b/cpp/src/qpid/ha/ErrorListener.h deleted file mode 100644 index 1ae2078a11..0000000000 --- a/cpp/src/qpid/ha/ErrorListener.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_HA_ERRORLISTENER_H -#define QPID_HA_ERRORLISTENER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/SessionHandler.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/reply_exceptions.h" - -namespace qpid { -namespace ha { - -/** Default ErrorListener for HA module */ -class ErrorListener : public broker::SessionHandler::ErrorListener { - public: - ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {} - - void connectionException(framing::connection::CloseCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what()); - } - void channelException(framing::session::DetachCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << framing::createChannelException(code, msg).what()); - } - void executionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << framing::createSessionException(code, msg).what()); - } - void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) { - QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); - } - void detach() { - QPID_LOG(error, logPrefix << "Session detached."); - } - - private: - std::string logPrefix; -}; - - -}} // namespace qpid::ha - -#endif /*!QPID_HA_ERRORLISTENER_H*/ diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index 59b2013f59..6881896f5e 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -39,6 +39,7 @@ #include "qpid/Msg.h" #include "qpid/assert.h" #include +#include #include @@ -90,7 +91,8 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what()); } void incomingExecutionException(ErrorCode code, const std::string& msg) { - if (!queueReplicator->deletedOnPrimary(code, msg)) + boost::shared_ptr qr = queueReplicator.lock(); + if (qr && !qr->deletedOnPrimary(code, msg)) QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what()); } @@ -98,7 +100,7 @@ class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { QPID_LOG(debug, logPrefix << "Session detached"); } private: - boost::shared_ptr queueReplicator; + boost::weak_ptr queueReplicator; std::string logPrefix; }; @@ -112,9 +114,12 @@ class QueueReplicator::QueueObserver : public broker::QueueObserver { void consumerAdded( const Consumer& ) {} void consumerRemoved( const Consumer& ) {} // Queue observer is destroyed when the queue is. - void destroy() { queueReplicator->destroy(); } + void destroy() { + boost::shared_ptr qr = queueReplicator.lock(); + if (qr) qr->destroy(); + } private: - boost::shared_ptr queueReplicator; + boost::weak_ptr queueReplicator; }; boost::shared_ptr QueueReplicator::create( @@ -171,8 +176,7 @@ void QueueReplicator::initialize() { throw Exception(QPID_MSG("Duplicate queue replicator " << getName())); // Enable callback to initializeBridge - std::pair result = - queue->getBroker()->getLinks().declare( + boost::shared_ptr b = queue->getBroker()->getLinks().declare( bridgeName, *link, false, // durable @@ -189,10 +193,10 @@ void QueueReplicator::initialize() { // Include shared_ptr to self to ensure we are not deleted // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) - ); - bridge = result.first; - bridge->setErrorListener( + ).first; + b->setErrorListener( boost::shared_ptr(new ErrorListener(shared_from_this()))); + bridge = b; // bridge is a weak_ptr to avoid a cycle. // Enable callback to destroy() queue->getObservers().add( @@ -211,7 +215,7 @@ void QueueReplicator::destroy() { { Mutex::ScopedLock l(lock); if (!queue) return; // Already destroyed - bridge2 = bridge; // call close outside the lock. + bridge2 = bridge.lock(); // !call close outside the lock. destroy(l); } if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock. diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index f94c6de116..757f12c7a9 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -112,7 +112,7 @@ class QueueReplicator : public broker::Exchange, const BrokerInfo brokerInfo; DispatchMap dispatch; boost::shared_ptr link; - boost::shared_ptr bridge; + boost::weak_ptr bridge; boost::shared_ptr queue; broker::SessionHandler* sessionHandler; diff --git a/cpp/src/qpid/linearstore/ISSUES b/cpp/src/qpid/linearstore/ISSUES index 8c5b08bb61..7f4d7750d0 100644 --- a/cpp/src/qpid/linearstore/ISSUES +++ b/cpp/src/qpid/linearstore/ISSUES @@ -26,6 +26,7 @@ Current/pending: 5359 - Linearstore: Implement new management schema and wire into store 5360 - Linearstore: Evaluate and rework logging to produce a consistent log output 5361 - Linearstore: No tests for linearstore functionality currently exist + svn r.1564893 2014-02-05: Added tx-test-soak.sh * No existing tests for linearstore: ** Basic broker-level tests for txn and non-txn recovery ** Store-level tests which check write boundary conditions @@ -34,29 +35,29 @@ Current/pending: ** Basic performance tests 5362 - Linearstore: No store tools exist for examining the journals svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. + svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze + svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze + svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze * Store analysis and status * Recovery/reading of message content * Empty file pool status and management 5464 - [linearstore] Incompletely created journal files accumulate in EFP - 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message - * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh - 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message - * Probability: 6 of 600 (1.0%) using tx-test-soak.sh - * If broker is started a second time after failure, it starts correctly and test completes ok. 5484 1035843 Slow performance for producers - svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers. - - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue : create() failed: jexception 0x0000 + svn r.1558592 2014-01-15 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers. + svn r.1558913 2014-01-16 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number. + * Recommend rebuilding and testing for performance again with these two fixes. Marked POST. +# - 1036026 [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue : create() failed: jexception 0x0000 UNABLE TO REPRODUCE - but Frantizek has additional info - 1039522 Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL * Possible dup of 1039525 - * May be fixed by QPID-5483 - waiting for needinfo + * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing - 1039525 Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL * Possible dup of 1039522 - * May be fixed by QPID-5483 - waiting for needinfo - 5487 - [linearstore] Replace use of /dev/urandom with c random generator calls + * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing +# - 1049870 [LinearStore] auto-delete property does not survive restart -Fixed/closed: -============= +Fixed/closed (in commit order): +=============================== Q-JIRA RHBZ Description / Comments ------ ------- ---------------------- 5357 1052518 Linearstore: Empty file recycling not functional @@ -85,6 +86,17 @@ NO-JIRA - Added missing Apache copyright/license text svn r.1558589 2014-01-15: Proposed fix * May be linked to RHBZ 1039522 - waiting for needinfo * May be linked to RHBZ 1039525 - waiting for needinfo + 5487 1054448 [linearstore] Replace use of /dev/urandom with c random generator calls + svn r.1558913 2014-01-16: Proposed fix + 5480 1053749 [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message + svn r.1564877 2014-02-05: Proposed fix + * Probability: 6 of 600 (1.0%) using tx-test-soak.sh + * If broker is started a second time after failure, it starts correctly and test completes ok. + * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally). + * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary + 5479 1053701 [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message + * Probability: 2 of 600 (0.3%) using tx-test-soak.sh + * Fixed by checkin for QPID-5480, no longer able to reproduce. Marked POST. Future: ======= @@ -101,3 +113,4 @@ Code tidy-up * Member names: xxx_ * Rename classes, functions and variables to camel-case * Add Doxygen docs to classes +* Make fid's consistent in name (fid, file_id, pfid) and format (hex vs decimal) diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 483b494c2c..ff5b41b962 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -593,7 +593,7 @@ void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry_) std::ostringstream oss; oss << "Recovered transaction prepared list:"; for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) { - oss << std::endl << " " << str2hexnum(i->xid); + oss << std::endl << " " << qpid::linearstore::journal::jcntl::str2hexnum(i->xid); } QLS_LOG(debug, oss.str()); @@ -1292,7 +1292,7 @@ void MessageStoreImpl::completed(TxnCtxt& txn_, mgmtObject->inc_tplTxnAborts(); } } catch (const std::exception& e) { - QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what()); + QLS_LOG(error, "Error completing xid " << qpid::linearstore::journal::jcntl::str2hexnum(txn_.getXid()) << ": " << e.what()); throw; } } @@ -1516,15 +1516,6 @@ void MessageStoreImpl::journalDeleted(JournalImpl& j_) { journalList.erase(j_.id()); } -std::string MessageStoreImpl::str2hexnum(const std::string& str) { - std::ostringstream oss; - oss << "(" << str.size() << ")0x" << std::hex; - for (unsigned i=str.size(); i>0; --i) { - oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; - } - return oss.str(); -} - MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) : qpid::Options(name_), truncateFlag(defTruncateFlag), diff --git a/cpp/src/qpid/linearstore/MessageStoreImpl.h b/cpp/src/qpid/linearstore/MessageStoreImpl.h index 3157b9be9d..c2eb0deab0 100644 --- a/cpp/src/qpid/linearstore/MessageStoreImpl.h +++ b/cpp/src/qpid/linearstore/MessageStoreImpl.h @@ -235,8 +235,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem } void chkTplStoreInit(); - static std::string str2hexnum(const std::string& str); - public: typedef boost::shared_ptr shared_ptr; diff --git a/cpp/src/qpid/linearstore/TxnCtxt.cpp b/cpp/src/qpid/linearstore/TxnCtxt.cpp index 743d12989a..df2e7a442d 100644 --- a/cpp/src/qpid/linearstore/TxnCtxt.cpp +++ b/cpp/src/qpid/linearstore/TxnCtxt.cpp @@ -52,7 +52,9 @@ void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) { jc->txn_abort(dtokp.get(), getXid()); } } catch (const qpid::linearstore::journal::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Error commit") + e.what()); + std::ostringstream oss; + oss << "Error during " << (commit ? "commit" : "abort") << ": " << e.what(); + THROW_STORE_EXCEPTION(oss.str()); } } } diff --git a/cpp/src/qpid/linearstore/journal/JournalFile.cpp b/cpp/src/qpid/linearstore/journal/JournalFile.cpp index 1b2025bd5a..fc6ced4fd2 100644 --- a/cpp/src/qpid/linearstore/journal/JournalFile.cpp +++ b/cpp/src/qpid/linearstore/journal/JournalFile.cpp @@ -27,18 +27,18 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include -//#include // DEBUG - namespace qpid { namespace linearstore { namespace journal { JournalFile::JournalFile(const std::string& fqFileName, const efpIdentity_t& efpIdentity, - const uint64_t fileSeqNum) : + const uint64_t fileSeqNum, + const std::string queueName) : efpIdentity_(efpIdentity), fqFileName_(fqFileName), fileSeqNum_(fileSeqNum), + queueName_(queueName), serial_(getRandom64()), firstRecordOffset_(0ULL), fileHandle_(-1), @@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::string& fqFileName, {} JournalFile::JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader) : + const ::file_hdr_t& fileHeader, + const std::string queueName) : efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib), fqFileName_(fqFileName), fileSeqNum_(fileHeader._file_number), + queueName_(queueName), serial_(fileHeader._rhdr._serial), firstRecordOffset_(fileHeader._fro), fileHandle_(-1), @@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::string& fqFileName, fileHeaderPtr_(0), aioControlBlockPtr_(0), fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES), + initializedFlag_(false), enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0), submittedDblkCount_("JournalFile::submittedDblkCount", 0), completedDblkCount_("JournalFile::completedDblkCount", 0), @@ -78,18 +82,21 @@ JournalFile::~JournalFile() { void JournalFile::initialize(const uint32_t completedDblkCount) { - if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) - { - std::ostringstream oss; - oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); - oss << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + if (!initializedFlag_) { + if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024)) + { + std::ostringstream oss; + oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024); + oss << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize"); + } + fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; + aioControlBlockPtr_ = new aio_cb; + initializedFlag_ = true; } - fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_; - aioControlBlockPtr_ = new aio_cb; if (completedDblkCount > 0UL) { - submittedDblkCount_.add(completedDblkCount); - completedDblkCount_.add(completedDblkCount); + submittedDblkCount_.set(completedDblkCount); + completedDblkCount_.set(completedDblkCount); } } @@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, const efpDataSize_kib_t efpDataSize_kib, const uint16_t userFlags, const uint64_t recordId, - const uint64_t firstRecordOffset, - const std::string queueName) { + const uint64_t firstRecordOffset) { firstRecordOffset_ = firstRecordOffset; ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib); ::file_hdr_init(fileHeaderBasePtr_, @@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(io_context_t ioContextPtr, recordId, firstRecordOffset, fileSeqNum_, - queueName.size(), - queueName.data()); - aio::prep_pwrite(aioControlBlockPtr_, - fileHandle_, - (void*)fileHeaderBasePtr_, - QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024, - 0UL); - if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); + queueName_.size(), + queueName_.data()); + const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024; + aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL); + if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) { + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0"; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite"); + } addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS); incrOutstandingAioOperationCount(); } @@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_context_t ioContextPtr, aio_cb* aioControlBlockPtr, void* data, uint32_t dataSize_dblks) { - aio::prep_pwrite_2(aioControlBlockPtr, - fileHandle_, - data, - dataSize_dblks * QLS_DBLK_SIZE_BYTES, - submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES); + const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES; + const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES; + aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs); pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb) pcbp->_wdblks = dataSize_dblks; pcbp->_jfp = this; if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) { - throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details + std::ostringstream oss; + oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex << fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs; + throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite"); } addSubmittedDblkCount(dataSize_dblks); incrOutstandingAioOperationCount(); diff --git a/cpp/src/qpid/linearstore/journal/JournalFile.h b/cpp/src/qpid/linearstore/journal/JournalFile.h index f0ad432fd8..e33830ef7f 100644 --- a/cpp/src/qpid/linearstore/journal/JournalFile.h +++ b/cpp/src/qpid/linearstore/journal/JournalFile.h @@ -38,6 +38,7 @@ protected: const efpIdentity_t efpIdentity_; const std::string fqFileName_; const uint64_t fileSeqNum_; + const std::string queueName_; const uint64_t serial_; uint64_t firstRecordOffset_; int fileHandle_; @@ -46,6 +47,7 @@ protected: ::file_hdr_t* fileHeaderPtr_; aio_cb* aioControlBlockPtr_; uint32_t fileSize_dblks_; ///< File size in data blocks, including file header + bool initializedFlag_; AtomicCounter enqueuedRecordCount_; ///< Count of enqueued records AtomicCounter submittedDblkCount_; ///< Write file count (data blocks) for submitted AIO @@ -56,10 +58,12 @@ public: // Constructor for creating new file with known fileSeqNum and random serial JournalFile(const std::string& fqFileName, const efpIdentity_t& efpIdentity, - const uint64_t fileSeqNum); + const uint64_t fileSeqNum, + const std::string queueName); // Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param JournalFile(const std::string& fqFileName, - const ::file_hdr_t& fileHeader); + const ::file_hdr_t& fileHeader, + const std::string queueName); virtual ~JournalFile(); void initialize(const uint32_t completedDblkCount); @@ -76,13 +80,13 @@ public: const efpDataSize_kib_t efpDataSize_kib, const uint16_t userFlags, const uint64_t recordId, - const uint64_t firstRecordOffset, - const std::string queueName); + const uint64_t firstRecordOffset); void asyncPageWrite(io_context_t ioContextPtr, aio_cb* aioControlBlockPtr, void* data, uint32_t dataSize_dblks); + uint32_t getSubmittedDblkCount() const; uint32_t getEnqueuedRecordCount() const; uint32_t incrEnqueuedRecordCount(); uint32_t decrEnqueuedRecordCount(); @@ -109,7 +113,6 @@ protected: static uint64_t getRandom64(); bool isOpen() const; - uint32_t getSubmittedDblkCount() const; uint32_t addSubmittedDblkCount(const uint32_t a); uint32_t getCompletedDblkCount() const; diff --git a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp index 5483f3bb94..86d1b0e93c 100644 --- a/cpp/src/qpid/linearstore/journal/LinearFileController.cpp +++ b/cpp/src/qpid/linearstore/journal/LinearFileController.cpp @@ -25,8 +25,6 @@ #include "qpid/linearstore/journal/jcntl.h" #include "qpid/linearstore/journal/JournalFile.h" -//#include // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -34,10 +32,10 @@ namespace journal { LinearFileController::LinearFileController(jcntl& jcntlRef) : jcntlRef_(jcntlRef), emptyFilePoolPtr_(0), - currentJournalFilePtr_(0), fileSeqCounter_("LinearFileController::fileSeqCounter", 0), recordIdCounter_("LinearFileController::recordIdCounter", 0), - decrCounter_("LinearFileController::decrCounter", 0) + decrCounter_("LinearFileController::decrCounter", 0), + currentJournalFilePtr_(0) {} LinearFileController::~LinearFileController() {} @@ -53,7 +51,7 @@ void LinearFileController::initialize(const std::string& journalDirectory, void LinearFileController::finalize() { if (currentJournalFilePtr_) { currentJournalFilePtr_->close(); - currentJournalFilePtr_ = NULL; + currentJournalFilePtr_ = 0; } while (!journalFileList_.empty()) { delete journalFileList_.front(); @@ -62,17 +60,21 @@ void LinearFileController::finalize() { } void LinearFileController::addJournalFile(JournalFile* journalFilePtr, - const uint32_t completedDblkCount) { - if (currentJournalFilePtr_) { + const uint32_t completedDblkCount, + const bool makeCurrentFlag) { + if (makeCurrentFlag && currentJournalFilePtr_) { currentJournalFilePtr_->close(); + currentJournalFilePtr_ = 0; } journalFilePtr->initialize(completedDblkCount); - currentJournalFilePtr_ = journalFilePtr; { slock l(journalFileListMutex_); - journalFileList_.push_back(currentJournalFilePtr_); + journalFileList_.push_back(journalFilePtr); + } + if (makeCurrentFlag) { + currentJournalFilePtr_ = journalFilePtr; + currentJournalFilePtr_->open(); } - currentJournalFilePtr_->open(); } efpDataSize_sblks_t LinearFileController::dataSize_sblks() const { @@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController::fileSize_sblks() const { return emptyFilePoolPtr_->fileSize_sblks(); } +void LinearFileController::getNextJournalFile() { + if (currentJournalFilePtr_) + currentJournalFilePtr_->close(); + pullEmptyFileFromEfp(); +} + uint64_t LinearFileController::getNextRecordId() { return recordIdCounter_.increment(); } -void LinearFileController::pullEmptyFileFromEfp() { - if (currentJournalFilePtr_) - currentJournalFilePtr_->close(); - std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name -//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG - addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); +void LinearFileController::removeFileToEfp(const std::string& fileName) { + if (emptyFilePoolPtr_) { + emptyFilePoolPtr_->returnEmptyFile(fileName); + } } void LinearFileController::restoreEmptyFile(const std::string& fileName) { @@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyFile(const std::string& fileName) { void LinearFileController::purgeEmptyFilesToEfp() { slock l(journalFileListMutex_); - purgeEmptyFilesToEfpNoLock(); + while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records + emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); + delete journalFileList_.front(); + journalFileList_.pop_front(); + } } uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { @@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t file } uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount(); // TODO: Re-evaluate after testing and profiling @@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t file // records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data // records) without impacting performance or performing badly (leaving excessive empty files in the journals). if (decrCounter_.increment() % 100ULL == 0ULL) { - purgeEmptyFilesToEfpNoLock(); + purgeEmptyFilesToEfp(); } return r; } uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->addCompletedDblkCount(a); } uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) { - slock l(journalFileListMutex_); return find(fileSeqNumber)->decrOutstandingAioOperationCount(); } @@ -142,12 +149,11 @@ void LinearFileController::asyncFileHeaderWrite(io_context_t ioContextPtr, const uint64_t recordId, const uint64_t firstRecordOffset) { currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr, - emptyFilePoolPtr_->getPartitionNumber(), - emptyFilePoolPtr_->dataSize_kib(), - userFlags, - recordId, - firstRecordOffset, - jcntlRef_.id()); + emptyFilePoolPtr_->getPartitionNumber(), + emptyFilePoolPtr_->dataSize_kib(), + userFlags, + recordId, + firstRecordOffset); } void LinearFileController::asyncPageWrite(io_context_t ioContextPtr, @@ -195,8 +201,8 @@ void LinearFileController::addJournalFile(const std::string& fileName, const efpIdentity_t& efpIdentity, const uint64_t fileNumber, const uint32_t completedDblkCount) { - JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber); - addJournalFile(jfp, completedDblkCount); + JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id()); + addJournalFile(jfp, completedDblkCount, true); } void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const { @@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJournalFileValid() const { return currentJournalFilePtr_ != 0; } -// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) { - if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) + if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber) return currentJournalFilePtr_; + + slock l(journalFileListMutex_); for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) { if ((*i)->getFileSeqNum() == fileSeqNumber) { return *i; } } + std::ostringstream oss; oss << "fileSeqNumber=" << fileSeqNumber; throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find"); @@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFileSeqNum() { return fileSeqCounter_.increment(); } -void LinearFileController::purgeEmptyFilesToEfpNoLock() { -//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG - while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && - journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records -//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG - emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName()); - delete journalFileList_.front(); - journalFileList_.pop_front(); - } +void LinearFileController::pullEmptyFileFromEfp() { + std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name + addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0); } }}} diff --git a/cpp/src/qpid/linearstore/journal/LinearFileController.h b/cpp/src/qpid/linearstore/journal/LinearFileController.h index 933b9792a4..05f08144b9 100644 --- a/cpp/src/qpid/linearstore/journal/LinearFileController.h +++ b/cpp/src/qpid/linearstore/journal/LinearFileController.h @@ -44,12 +44,12 @@ protected: jcntl& jcntlRef_; std::string journalDirectory_; EmptyFilePool* emptyFilePoolPtr_; - JournalFile* currentJournalFilePtr_; AtomicCounter fileSeqCounter_; AtomicCounter recordIdCounter_; AtomicCounter decrCounter_; JournalFileList_t journalFileList_; + JournalFile* currentJournalFilePtr_; smutex journalFileListMutex_; public: @@ -62,12 +62,14 @@ public: void finalize(); void addJournalFile(JournalFile* journalFilePtr, - const uint32_t completedDblkCount); + const uint32_t completedDblkCount, + const bool makeCurrentFlag); efpDataSize_sblks_t dataSize_sblks() const; efpFileSize_sblks_t fileSize_sblks() const; + void getNextJournalFile(); uint64_t getNextRecordId(); - void pullEmptyFileFromEfp(); + void removeFileToEfp(const std::string& fileName); void restoreEmptyFile(const std::string& fileName); void purgeEmptyFilesToEfp(); @@ -105,11 +107,12 @@ protected: bool checkCurrentJournalFileValid() const; JournalFile* find(const efpFileCount_t fileSeqNumber); uint64_t getNextFileSeqNum(); - void purgeEmptyFilesToEfpNoLock(); + void pullEmptyFileFromEfp(); }; typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr, - const uint32_t completedDblkCount); + const uint32_t completedDblkCount, + const bool makeCurrentFlag); }}} diff --git a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 72308cc929..a1cec53ca1 100644 --- a/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid, const uint64_t pendingTransaction_(ptxn) {} - bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) { return a.recordId_ < b.recordId_; } +RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) : + journalFilePtr_(journalFilePtr), + completedDblkCount_(completedDblkCount) +{} + RecoveryManager::RecoveryManager(const std::string& journalDirectory, const std::string& queuename, enq_map& enqueueMapRef, @@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const std::string& journalDirectory, highestRecordId_(0ULL), highestFileNumber_(0ULL), lastFileFullFlag_(false), + initial_fid_(0), currentSerial_(0), efpFileSize_kib_(0) {} -RecoveryManager::~RecoveryManager() {} +RecoveryManager::~RecoveryManager() { + for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { + delete i->second; + } + fileNumberMap_.clear(); +} void RecoveryManager::analyzeJournals(const std::vector* preparedTransactionListPtr, EmptyFilePoolManager* emptyFilePoolManager, @@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(const std::vector* preparedTr *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib(); - // Check for file full condition - lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; - if (!journalEmptyFlag_) { // Read all records, establish remaining enqueued records @@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(const std::vector* preparedTr inFileStream_.close(); } + // Check for file full condition + lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024; + // Remove leading files which have no enqueued records removeEmptyFiles(*emptyFilePoolPtrPtr); @@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(const std::vector* preparedTr // Unlock any affected enqueues in emap for (tdl_itr_t i=tdl.begin(); ienq_flag_) { // enq op - decrement enqueue count - fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount(); + fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND @@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, } } while (!foundRecord); - if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) { + if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) { if (!getFile(recordIdListConstItr_->fileId_, false)) { std::ostringstream oss; oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_; @@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, ::rec_tail_t enqueueTail; inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t)); uint32_t cs = checksum.getChecksum(); -//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs); if (res != 0) { std::stringstream oss; @@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingRecord(void** const dataPtrPtr, void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { if (journalEmptyFlag_) { - if (uninitializedJournal_.size() > 0) { - lfcPtr->restoreEmptyFile(uninitializedJournal_); + if (uninitFileList_.size() > 0) { + std::string uninitFile = uninitFileList_.back(); + uninitFileList_.pop_back(); + lfcPtr->restoreEmptyFile(uninitFile); } } else { for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) { - uint32_t fileDblkCount = i->first == highestFileNumber_ ? // Is this this last file? - endOffset_ / QLS_DBLK_SIZE_BYTES : // Last file uses _endOffset - efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full - (lfcPtr->*fnPtr)(i->second, fileDblkCount); + (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_); } } + + std::ostringstream oss; + bool logFlag = !notNeededFilesList_.empty(); + if (logFlag) { + oss << "Files removed from head of journal: prior truncation during recovery:"; + } + while (!notNeededFilesList_.empty()) { + lfcPtr->removeFileToEfp(notNeededFilesList_.back()); + oss << std::endl << " * " << notNeededFilesList_.back(); + notNeededFilesList_.pop_back(); + } + if (logFlag) { + journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str()); + } } std::string RecoveryManager::toString(const std::string& jid) { @@ -285,7 +307,7 @@ std::string RecoveryManager::toString(const std::string& jid) { oss << " Number of journal files = " << fileNumberMap_.size() << std::endl; oss << " Journal File List:" << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { - std::string fqFileName = k->second->getFqFileName(); + std::string fqFileName = k->second->journalFilePtr_->getFqFileName(); oss << " " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl; } oss << " Enqueue Counts: [ "; @@ -293,7 +315,7 @@ std::string RecoveryManager::toString(const std::string& jid) { if (l != fileNumberMap_.begin()) { oss << ", "; } - oss << l->second->getEnqueuedRecordCount(); + oss << l->second->journalFilePtr_->getEnqueuedRecordCount(); } oss << " ]" << std::endl; oss << " Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl; @@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { << std::setw(10) << "--------" << std::endl; for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) { - std::string fqFileName = k->second->getFqFileName(); + std::string fqFileName = k->second->journalFilePtr_->getFqFileName(); + std::ostringstream fid; + fid << std::hex << "0x" << k->first; std::ostringstream fro; - fro << std::hex << "0x" << k->second->getFirstRecordOffset(); - oss << indentStr << std::setw(7) << k->first + fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset(); + oss << indentStr << std::setw(7) << fid.str() << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1) << std::setw(16) << fro.str() - << std::setw(12) << k->second->getEnqueuedRecordCount() - << std::setw(5) << k->second->getEfpIdentity().pn_ - << std::setw(9) << k->second->getEfpIdentity().ds_ << "k" + << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount() + << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_ + << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k" << std::endl; } oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ << @@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl; oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl; oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl; - oss << indentStr << "Enqueued records (txn & non-txn):"; + //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report } return oss.str(); } @@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const std::string& jid, const int indent) { void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { std::string headerQueueName; ::file_hdr_t fileHeader; - directoryList_t directoryList; + stringList_t directoryList; jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); - for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { + for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { readJournalFileHeader(*i, fileHeader, headerQueueName); if (headerQueueName.empty()) { std::ostringstream oss; - if (uninitializedJournal_.empty()) { - oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file."; - journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str()); - uninitializedJournal_ = *i; - } else { - oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring"; - journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); - } + oss << "Journal file " << (*i) << " is uninitialized"; + journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + uninitFileList_.push_back(*i); } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); } else { - JournalFile* jfp = new JournalFile(*i, fileHeader); - fileNumberMap_[fileHeader._file_number] = jfp; + JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_); + std::pair res = fileNumberMap_.insert( + std::pair(fileHeader._file_number, new RecoveredFileData_t(jfp, 0))); + if (!res.second) { + std::ostringstream oss; + oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal."; + throw jexception(oss.str()); // TODO: complete this exception + } if (fileHeader._file_number > highestFileNumber_) { highestFileNumber_ = fileHeader._file_number; } @@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { - currentJournalFileConstItr_ = fileNumberMap_.begin(); + currentJournalFileItr_ = fileNumberMap_.begin(); } } @@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(bool checkEof) { } } -void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) { +void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) { if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) { std::ostringstream oss; oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition; @@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) if (sblkOffset) { std::ostringstream oss1; - oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber(); + oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid; oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec; oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required."; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str()); - std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); + fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid); + std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary); if (!outFileStream.good()) { throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment"); } @@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment"); } std::ostringstream oss2; - oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber(); + oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid; oss2 << " offs=0x" << currentPosn; journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str()); currentPosn = outFileStream.tellp(); @@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) std::free(writeBuffer); journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed."); } - endOffset_ = currentPosn; + lastRecord(start_fid, currentPosn); } bool RecoveryManager::decodeRecord(jrec& record, std::size_t& cumulativeSizeRead, ::rec_hdr_t& headerRecord, - std::streampos& fileOffset) + const uint64_t start_fid, + const std::streampos recordOffset) { - std::streampos start_file_offs = fileOffset; - if (highestRecordId_ == 0) { highestRecordId_ = headerRecord._rid; } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit @@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec& record, bool done = false; while (!done) { try { - done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead); + done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset); } catch (const jexception& e) { if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) { @@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec& record, } else { journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what()); } - checkJournalAlignment(start_file_offs); + checkJournalAlignment(start_fid, recordOffset); return false; } if (!done && needNextFile()) { if (!getNextFile(false)) { + checkJournalAlignment(start_fid, recordOffset); return false; } } @@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec& record, } std::string RecoveryManager::getCurrentFileName() const { - return currentJournalFileConstItr_->second->getFqFileName(); + return currentJournalFileItr_->second->journalFilePtr_->getFqFileName(); } uint64_t RecoveryManager::getCurrentFileNumber() const { - return currentJournalFileConstItr_->first; + return currentJournalFileItr_->first; } bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) { @@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordO //std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ } - currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber); - if (currentJournalFileConstItr_ == fileNumberMap_.end()) { + currentJournalFileItr_ = fileNumberMap_.find(fileNumber); + if (currentJournalFileItr_ == fileNumberMap_.end()) { return false; } inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary); @@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool jumpToFirstRecordOffsetFlag) { if (inFileStream_.is_open()) { inFileStream_.close(); //std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG - if (++currentJournalFileConstItr_ == fileNumberMap_.end()) { + currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; + if (++currentJournalFileItr_ == fileNumberMap_.end()) { return false; } inFileStream_.clear(); // clear eof flag, req'd for older versions of c++ @@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeader() rec_hdr_t h; bool hdr_ok = false; - std::streampos file_pos; + uint64_t file_id = 0; + std::streampos file_pos = 0; while (!hdr_ok) { if (needNextFile()) { if (!getNextFile(true)) { return false; } } + file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum(); file_pos = inFileStream_.tellg(); if (file_pos == std::streampos(-1)) { std::ostringstream oss; @@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeader() } } + uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary switch(h._magic) { case QLS_ENQ_MAGIC: { //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } enq_rec er; - uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary - if (!decodeRecord(er, cum_size_read, h, file_pos)) { + if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) { return false; } if (!er.is_transient()) { // Ignore transient msgs - fileNumberMap_[start_fid]->incrEnqueuedRecordCount(); + fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount(); if (er.xid_size()) { er.get_xid(&xidp); if (xidp == 0) { @@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } deq_rec dr; - uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary - if (!decodeRecord(dr, cum_size_read, h, file_pos)) { + if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) { return false; } if (dr.xid_size()) { @@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeader() } else { uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error - fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); + fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount(); } } } @@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } txn_rec ar; - if (!decodeRecord(ar, cum_size_read, h, file_pos)) { + if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) { return false; } // Delete this txn from tmap, unlock any locked records in emap @@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeader() txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) { - fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount(); + fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount(); } else { enqueueMapRef_.unlock(itr->drid_); // ignore not found error } @@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeader() { //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) { - endOffset_ = file_pos; + lastRecord(file_id, file_pos); return false; } txn_rec cr; - if (!decodeRecord(cr, cum_size_read, h, file_pos)) { + if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) { return false; } // Delete this txn from tmap, process records into emap @@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeader() } else { // txn dequeue uint64_t enq_fid; if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error - fileNumberMap_[enq_fid]->decrEnqueuedRecordCount(); + fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount(); } } } @@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeader() inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t)); checkFileStreamOk(false); if (needNextFile()) { + file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES; if (!getNextFile(false)) { + lastRecord(start_fid, file_pos); return false; } } @@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeader() break; case 0: //std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG - checkJournalAlignment(file_pos); + checkJournalAlignment(getCurrentFileNumber(), file_pos); return false; default: //std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG // Stop as this is the overwrite boundary. - checkJournalAlignment(file_pos); + checkJournalAlignment(getCurrentFileNumber(), file_pos); return false; } return true; } +void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) { + endOffset_ = endOffset; + initial_fid_ = file_id; + fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES; + + // Remove any files in fileNumberMap_ beyond initial_fid_ + fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id); + if (++unwantedFirstItr != fileNumberMap_.end()) { + fileNumberMapItr_t itr = unwantedFirstItr; + notNeededFilesList_.push_back(unwantedFirstItr->second->journalFilePtr_->getFqFileName()); + while (++itr != fileNumberMap_.end()) { + notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName()); + delete itr->second->journalFilePtr_; + delete itr->second; + } + fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end()); + } +} + bool RecoveryManager::needNextFile() { if (inFileStream_.is_open()) { return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024); @@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() { currentSerial_ = fhdr._rhdr._serial; } else { inFileStream_.close(); - if (currentJournalFileConstItr_ == fileNumberMap_.begin()) { + if (currentJournalFileItr_ == fileNumberMap_.begin()) { journalEmptyFlag_ = true; } return false; @@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) { - while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { -//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG - emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName()); + while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) { + RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second; + emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName()); + delete rfdp->journalFilePtr_; + delete rfdp; fileNumberMap_.erase(fileNumberMap_.begin()->first); } } diff --git a/cpp/src/qpid/linearstore/journal/RecoveryManager.h b/cpp/src/qpid/linearstore/journal/RecoveryManager.h index 997596938b..e19f92e305 100644 --- a/cpp/src/qpid/linearstore/journal/RecoveryManager.h +++ b/cpp/src/qpid/linearstore/journal/RecoveryManager.h @@ -51,15 +51,21 @@ struct RecoveredRecordData_t { RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn); }; +struct RecoveredFileData_t { + JournalFile* journalFilePtr_; + uint32_t completedDblkCount_; + RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount); +}; + bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b); class RecoveryManager { protected: // Types - typedef std::vector directoryList_t; - typedef directoryList_t::const_iterator directoryListConstItr_t; - typedef std::map fileNumberMap_t; + typedef std::vector stringList_t; + typedef stringList_t::const_iterator stringListConstItr_t; + typedef std::map fileNumberMap_t; typedef fileNumberMap_t::iterator fileNumberMapItr_t; typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t; typedef std::vector recordIdList_t; @@ -74,18 +80,20 @@ protected: // Initial journal analysis data fileNumberMap_t fileNumberMap_; ///< File number - JournalFilePtr map + stringList_t notNeededFilesList_; ///< Files not needed and to be returned to EFP + stringList_t uninitFileList_; ///< File name of uninitialized journal files found during header analysis bool journalEmptyFlag_; ///< Journal data files empty std::streamoff firstRecordOffset_; ///< First record offset in ffid std::streamoff endOffset_; ///< End offset (first byte past last record) uint64_t highestRecordId_; ///< Highest rid found uint64_t highestFileNumber_; ///< Highest file number found bool lastFileFullFlag_; ///< Last file is full - std::string uninitializedJournal_; ///< File name of uninitialized journal found during header analysis + uint64_t initial_fid_; ///< File id where initial write after recovery will occur // State for recovery of individual enqueued records uint64_t currentSerial_; uint32_t efpFileSize_kib_; - fileNumberMapConstItr_t currentJournalFileConstItr_; + fileNumberMapConstItr_t currentJournalFileItr_; std::string currentFileName_; std::ifstream inFileStream_; recordIdList_t recordIdList_; @@ -121,16 +129,18 @@ public: protected: void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity); void checkFileStreamOk(bool checkEof); - void checkJournalAlignment(const std::streampos recordPosition); + void checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition); bool decodeRecord(jrec& record, std::size_t& cumulativeSizeRead, ::rec_hdr_t& recordHeader, - std::streampos& fileOffset); + const uint64_t start_fid, + const std::streampos recordOffset); std::string getCurrentFileName() const; uint64_t getCurrentFileNumber() const; bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag); bool getNextFile(bool jumpToFirstRecordOffsetFlag); bool getNextRecordHeader(); + void lastRecord(const uint64_t file_id, const std::streamoff endOffset); bool needNextFile(); void prepareRecordList(); bool readFileHeader(); diff --git a/cpp/src/qpid/linearstore/journal/deq_rec.cpp b/cpp/src/qpid/linearstore/journal/deq_rec.cpp index a4882aaa9c..90ca27d082 100644 --- a/cpp/src/qpid/linearstore/journal/deq_rec.cpp +++ b/cpp/src/qpid/linearstore/journal/deq_rec.cpp @@ -181,7 +181,7 @@ deq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch } bool -deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) +deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) { if (rec_offs == 0) { @@ -228,7 +228,7 @@ deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } - check_rec_tail(); + check_rec_tail(rec_start); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -274,7 +274,7 @@ deq_rec::rec_size() const } void -deq_rec::check_rec_tail() const { +deq_rec::check_rec_tail(const std::streampos rec_start) const { Checksum checksum; checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t)); if (_deq_hdr._xidsize > 0) { @@ -284,7 +284,7 @@ deq_rec::check_rec_tail() const { uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; - oss << std::hex; + oss << std::endl << " Record offset: 0x" << std::hex << rec_start; if (res & ::REC_TAIL_MAGIC_ERR_MASK) { oss << std::endl << " Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; } diff --git a/cpp/src/qpid/linearstore/journal/deq_rec.h b/cpp/src/qpid/linearstore/journal/deq_rec.h index ead0eed72a..9f55032e76 100644 --- a/cpp/src/qpid/linearstore/journal/deq_rec.h +++ b/cpp/src/qpid/linearstore/journal/deq_rec.h @@ -49,7 +49,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const uint64_t drid, const void* const xidp, const std::size_t xidlen, const bool txn_coml_commit); uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); - bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start); inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); } inline uint64_t rid() const { return _deq_hdr._rhdr._rid; } @@ -59,7 +59,7 @@ public: inline std::size_t data_size() const { return 0; } // This record never carries data std::size_t xid_size() const; std::size_t rec_size() const; - void check_rec_tail() const; + void check_rec_tail(const std::streampos rec_start) const; private: virtual void clean(); diff --git a/cpp/src/qpid/linearstore/journal/enq_rec.cpp b/cpp/src/qpid/linearstore/journal/enq_rec.cpp index f95a722308..0fecd90cbf 100644 --- a/cpp/src/qpid/linearstore/journal/enq_rec.cpp +++ b/cpp/src/qpid/linearstore/journal/enq_rec.cpp @@ -218,7 +218,7 @@ enq_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch } bool -enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) +enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) { if (rec_offs == 0) { @@ -291,7 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } - check_rec_tail(); + check_rec_tail(rec_start); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -352,7 +352,7 @@ enq_rec::rec_size(const std::size_t xidsize, const std::size_t dsize, const bool } void -enq_rec::check_rec_tail() const { +enq_rec::check_rec_tail(const std::streampos rec_start) const { Checksum checksum; checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t)); if (_enq_hdr._xidsize > 0) { @@ -365,7 +365,7 @@ enq_rec::check_rec_tail() const { uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; - oss << std::hex; + oss << std::endl << " Record offset: 0x" << std::hex << rec_start; if (res & ::REC_TAIL_MAGIC_ERR_MASK) { oss << std::endl << " Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic; } diff --git a/cpp/src/qpid/linearstore/journal/enq_rec.h b/cpp/src/qpid/linearstore/journal/enq_rec.h index 1655e2cc4d..d85cde42f5 100644 --- a/cpp/src/qpid/linearstore/journal/enq_rec.h +++ b/cpp/src/qpid/linearstore/journal/enq_rec.h @@ -51,7 +51,7 @@ public: void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen, const void* const xidp, const std::size_t xidlen, const bool transient, const bool external); uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); - bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start); std::size_t get_xid(void** const xidpp); std::size_t get_data(void** const datapp); @@ -63,7 +63,7 @@ public: std::size_t rec_size() const; static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external); inline uint64_t rid() const { return _enq_hdr._rhdr._rid; } - void check_rec_tail() const; + void check_rec_tail(const std::streampos rec_start) const; private: virtual void clean(); diff --git a/cpp/src/qpid/linearstore/journal/jcntl.cpp b/cpp/src/qpid/linearstore/journal/jcntl.cpp index 55bac4a2e5..ab367754d5 100644 --- a/cpp/src/qpid/linearstore/journal/jcntl.cpp +++ b/cpp/src/qpid/linearstore/journal/jcntl.cpp @@ -21,6 +21,7 @@ #include "qpid/linearstore/journal/jcntl.h" +#include #include "qpid/linearstore/journal/data_tok.h" #include "qpid/linearstore/journal/JournalLog.h" @@ -90,7 +91,7 @@ jcntl::initialize(EmptyFilePool* efpp, _linearFileController.finalize(); _jdir.clear_dir(); // Clear any existing journal files _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL); - _linearFileController.pullEmptyFileFromEfp(); + _linearFileController.getNextJournalFile(); _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS); _init_flag = true; } @@ -120,6 +121,9 @@ jcntl::recover(EmptyFilePoolManager* efpmp, _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5)); _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber()); _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController); + if (_recoveryManager.isLastFileFull()) { + _linearFileController.getNextJournalFile(); + } _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset())); @@ -316,6 +320,20 @@ jcntl::getLinearFileControllerRef() { return _linearFileController; } +// static +std::string +jcntl::str2hexnum(const std::string& str) { + if (str.empty()) { + return ""; + } + std::ostringstream oss; + oss << "(" << str.size() << ")0x" << std::hex; + for (unsigned i=str.size(); i>0; --i) { + oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1]; + } + return oss.str(); +} + iores jcntl::flush(const bool block_till_aio_cmpl) { diff --git a/cpp/src/qpid/linearstore/journal/jcntl.h b/cpp/src/qpid/linearstore/journal/jcntl.h index 2db0e707a7..94c00d2fab 100644 --- a/cpp/src/qpid/linearstore/journal/jcntl.h +++ b/cpp/src/qpid/linearstore/journal/jcntl.h @@ -537,6 +537,8 @@ public: inline virtual void instr_incr_outstanding_aio_cnt() {} inline virtual void instr_decr_outstanding_aio_cnt() {} + static std::string str2hexnum(const std::string& str); + protected: static bool _init; static bool init_statics(); diff --git a/cpp/src/qpid/linearstore/journal/jerrno.cpp b/cpp/src/qpid/linearstore/journal/jerrno.cpp index 01c432d37b..8765396b31 100644 --- a/cpp/src/qpid/linearstore/journal/jerrno.cpp +++ b/cpp/src/qpid/linearstore/journal/jerrno.cpp @@ -167,8 +167,8 @@ jerrno::__init() _err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found"; // class jrec, enq_rec, deq_rec, txn_rec - _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header."; - _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid data record tail."; + _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid record header."; + _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid record tail."; // class wmgr _err_map[JERR_WMGR_BADPGSTATE] = "JERR_WMGR_BADPGSTATE: Page buffer in illegal state for operation."; diff --git a/cpp/src/qpid/linearstore/journal/jrec.h b/cpp/src/qpid/linearstore/journal/jrec.h index 7645e646f6..cad0e5d7a2 100644 --- a/cpp/src/qpid/linearstore/journal/jrec.h +++ b/cpp/src/qpid/linearstore/journal/jrec.h @@ -98,7 +98,7 @@ public: * \returns Number of data-blocks encoded. */ virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0; - virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0; + virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) = 0; virtual std::string& str(std::string& str) const = 0; virtual std::size_t data_size() const = 0; diff --git a/cpp/src/qpid/linearstore/journal/txn_rec.cpp b/cpp/src/qpid/linearstore/journal/txn_rec.cpp index 1368fd4be2..298ab608b1 100644 --- a/cpp/src/qpid/linearstore/journal/txn_rec.cpp +++ b/cpp/src/qpid/linearstore/journal/txn_rec.cpp @@ -176,7 +176,7 @@ txn_rec::encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Ch } bool -txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) +txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) { if (rec_offs == 0) { @@ -218,7 +218,7 @@ txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) assert(!ifsp->fail() && !ifsp->bad()); return false; } - check_rec_tail(); + check_rec_tail(rec_start); } ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size()); assert(!ifsp->fail() && !ifsp->bad()); @@ -266,7 +266,7 @@ txn_rec::rec_size() const } void -txn_rec::check_rec_tail() const { +txn_rec::check_rec_tail(const std::streampos rec_start) const { Checksum checksum; checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t)); if (_txn_hdr._xidsize > 0) { @@ -276,7 +276,7 @@ txn_rec::check_rec_tail() const { uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs); if (res != 0) { std::stringstream oss; - oss << std::hex; + oss << std::endl << " Record offset: 0x" << std::hex << rec_start; if (res & ::REC_TAIL_MAGIC_ERR_MASK) { oss << std::endl << " Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; } diff --git a/cpp/src/qpid/linearstore/journal/txn_rec.h b/cpp/src/qpid/linearstore/journal/txn_rec.h index daca16d9d4..4552071595 100644 --- a/cpp/src/qpid/linearstore/journal/txn_rec.h +++ b/cpp/src/qpid/linearstore/journal/txn_rec.h @@ -49,7 +49,7 @@ public: void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp, const std::size_t xidlen); uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum); - bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs); + bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start); std::size_t get_xid(void** const xidpp); std::string& str(std::string& str) const; @@ -57,7 +57,7 @@ public: std::size_t xid_size() const; std::size_t rec_size() const; inline uint64_t rid() const { return _txn_hdr._rhdr._rid; } - void check_rec_tail() const; + void check_rec_tail(const std::streampos rec_start) const; private: virtual void clean(); diff --git a/cpp/src/qpid/linearstore/journal/wmgr.cpp b/cpp/src/qpid/linearstore/journal/wmgr.cpp index c246837a8d..7c590713f5 100644 --- a/cpp/src/qpid/linearstore/journal/wmgr.cpp +++ b/cpp/src/qpid/linearstore/journal/wmgr.cpp @@ -30,8 +30,6 @@ #include "qpid/linearstore/journal/LinearFileController.h" #include "qpid/linearstore/journal/utils/file_hdr.h" -//#include // DEBUG - namespace qpid { namespace linearstore { namespace journal { @@ -49,7 +47,7 @@ wmgr::wmgr(jcntl* jc, _deq_busy(false), _abort_busy(false), _commit_busy(false), - _txn_pending_set() + _txn_pending_map() {} wmgr::wmgr(jcntl* jc, @@ -67,7 +65,7 @@ wmgr::wmgr(jcntl* jc, _deq_busy(false), _abort_busy(false), _commit_busy(false), - _txn_pending_set() + _txn_pending_map() {} wmgr::~wmgr() @@ -281,6 +279,7 @@ wmgr::dequeue(data_tok* dtokp, _deq_busy = true; } //std::cout << "---+++ wmgr::dequeue() DEQ rid=0x" << std::hex << rid << " drid=0x" << dequeue_rid << " " << std::dec << std::flush; // DEBUG + std::string xid((const char*)xid_ptr, xid_len); bool done = false; Checksum checksum; while (!done) @@ -292,9 +291,27 @@ wmgr::dequeue(data_tok* dtokp, uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, (_cache_pgsize_sblks * QLS_SBLK_SIZE_DBLKS) - _pg_offset_dblks, checksum); - // Remember fid which contains the record header in case record is split over several files if (data_offs_dblks == 0) { - dtokp->set_fid(_lfc.getCurrentFileSeqNum()); + uint64_t fid; + short eres = _emap.get_pfid(dtokp->dequeue_rid(), fid); + if (eres == enq_map::EMAP_OK) { + dtokp->set_fid(fid); + } else if (xid_len > 0) { + txn_data_list_t tdl = _tmap.get_tdata_list(xid); + bool found = false; + for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) { + if (i->rid_ == dtokp->dequeue_rid()) { + found = true; + dtokp->set_fid(i->pfid_); + break; + } + } + if (!found) { + throw jexception("rid found in neither emap nor tmap, transactional"); + } + } else { + throw jexception("rid not found in emap, non-transactional"); + } } _pg_offset_dblks += ret; _cached_offset_dblks += ret; @@ -325,7 +342,7 @@ wmgr::dequeue(data_tok* dtokp, if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; + oss << std::hex << "emap: rid=0x" << rid; throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); } if (eres == enq_map::EMAP_LOCKED) @@ -335,10 +352,6 @@ wmgr::dequeue(data_tok* dtokp, throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); } } -//std::cout << "[0x" << std::hex << _lfc.getEnqueuedRecordCount(fid) << std::dec << std::flush; // DEBUG -//try { - _lfc.decrEnqueuedRecordCount(fid); -//} catch (std::exception& e) { std::cout << "***OOPS*** " << e.what() << " cfid=" << _lfc.getCurrentFileSeqNum() << " fid=" << fid << std::flush; throw; } } done = true; @@ -427,14 +440,16 @@ wmgr::abort(data_tok* dtokp, // Delete this txn from tmap, unlock any locked records in emap std::string xid((const char*)xid_ptr, xid_len); txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + fidl_t fidl; for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (!itr->enq_flag_) _emap.unlock(itr->drid_); // ignore rid not found error - if (itr->enq_flag_) - _lfc.decrEnqueuedRecordCount(itr->pfid_); + if (itr->enq_flag_) { + fidl.push_back(itr->pfid_); + } } - std::pair::iterator, bool> res = _txn_pending_set.insert(xid); + std::pair res = _txn_pending_map.insert(std::pair(xid, fidl)); if (!res.second) { std::ostringstream oss; @@ -526,6 +541,7 @@ wmgr::commit(data_tok* dtokp, // Delete this txn from tmap, process records into emap std::string xid((const char*)xid_ptr, xid_len); txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found + fidl_t fidl; for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) { if (itr->enq_flag_) // txn enqueue @@ -547,20 +563,20 @@ wmgr::commit(data_tok* dtokp, if (eres == enq_map::EMAP_RID_NOT_FOUND) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue"); + oss << std::hex << "emap: rid=0x" << itr->drid_; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit"); } if (eres == enq_map::EMAP_LOCKED) { std::ostringstream oss; - oss << std::hex << "rid=0x" << rid; - throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue"); + oss << std::hex << "rid=0x" << itr->drid_; + throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit"); } } - _lfc.decrEnqueuedRecordCount(fid); + fidl.push_back(fid); } } - std::pair::iterator, bool> res = _txn_pending_set.insert(xid); + std::pair res = _txn_pending_map.insert(std::pair(xid, fidl)); if (!res.second) { std::ostringstream oss; @@ -695,7 +711,7 @@ wmgr::get_next_file() { _pg_cntr = 0; //std::cout << "&&&&& wmgr::get_next_file(): " << status_str() << std::flush << std::endl; // DEBUG - _lfc.pullEmptyFileFromEfp(); + _lfc.getNextJournalFile(); } int32_t @@ -757,7 +773,7 @@ wmgr::get_events(timespec* const timeout, data_tok* dtokp = pcbp->_pdtokl->at(k); if (dtokp->decr_pg_cnt() == 0) { - std::set::iterator it; + pending_txn_map_itr_t it; switch (dtokp->wstate()) { case data_tok::ENQ_SUBM: @@ -770,6 +786,9 @@ wmgr::get_events(timespec* const timeout, _tmap.set_aio_compl(dtokp->xid(), dtokp->rid()); break; case data_tok::DEQ_SUBM: + if (!dtokp->has_xid()) { + _lfc.decrEnqueuedRecordCount(dtokp->fid()); + } dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::DEQ); @@ -781,31 +800,35 @@ wmgr::get_events(timespec* const timeout, dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::ABORTED); - it = _txn_pending_set.find(dtokp->xid()); - if (it == _txn_pending_set.end()) + it = _txn_pending_map.find(dtokp->xid()); + if (it == _txn_pending_map.end()) { std::ostringstream oss; - oss << std::hex << "_txn_pending_set: abort xid=\""; - oss << dtokp->xid() << "\""; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", - "get_events"); + oss << std::hex << "_txn_pending_set: abort xid=\"" + << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events"); + } + for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) { + _lfc.decrEnqueuedRecordCount(*i); } - _txn_pending_set.erase(it); + _txn_pending_map.erase(it); break; case data_tok::COMMIT_SUBM: dtokl.push_back(dtokp); tot_data_toks++; dtokp->set_wstate(data_tok::COMMITTED); - it = _txn_pending_set.find(dtokp->xid()); - if (it == _txn_pending_set.end()) + it = _txn_pending_map.find(dtokp->xid()); + if (it == _txn_pending_map.end()) { std::ostringstream oss; - oss << std::hex << "_txn_pending_set: commit xid=\""; - oss << dtokp->xid() << "\""; - throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", - "get_events"); + oss << std::hex << "_txn_pending_set: commit xid=\"" + << qpid::linearstore::journal::jcntl::str2hexnum(dtokp->xid()) << "\""; + throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "get_events"); + } + for (fidl_itr_t i=it->second.begin(); i!=it->second.end(); ++i) { + _lfc.decrEnqueuedRecordCount(*i); } - _txn_pending_set.erase(it); + _txn_pending_map.erase(it); break; case data_tok::ENQ_PART: case data_tok::DEQ_PART: @@ -858,8 +881,8 @@ wmgr::is_txn_synced(const std::string& xid) if (_tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED) return false; // Check for outstanding commit/aborts - std::set::iterator it = _txn_pending_set.find(xid); - return it == _txn_pending_set.end(); + pending_txn_map_itr_t it = _txn_pending_map.find(xid); + return it == _txn_pending_map.end(); } void @@ -871,7 +894,6 @@ wmgr::initialize(aio_callback* const cbp, pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); wmgr::clean(); _page_cb_arr[0]._state = IN_USE; - _ddtokl.clear(); _cached_offset_dblks = 0; _enq_busy = false; } diff --git a/cpp/src/qpid/linearstore/journal/wmgr.h b/cpp/src/qpid/linearstore/journal/wmgr.h index 8837e51e97..8eaa2364ad 100644 --- a/cpp/src/qpid/linearstore/journal/wmgr.h +++ b/cpp/src/qpid/linearstore/journal/wmgr.h @@ -22,9 +22,11 @@ #ifndef QPID_LINEARSTORE_JOURNAL_WMGR_H #define QPID_LINEARSTORE_JOURNAL_WMGR_H +#include +#include #include "qpid/linearstore/journal/enums.h" #include "qpid/linearstore/journal/pmgr.h" -#include +#include namespace qpid { namespace linearstore { @@ -51,11 +53,15 @@ class LinearFileController; class wmgr : public pmgr { private: + typedef std::vector fidl_t; + typedef fidl_t::iterator fidl_itr_t; + typedef std::map pending_txn_map_t; + typedef pending_txn_map_t::iterator pending_txn_map_itr_t; + LinearFileController& _lfc; ///< Linear File Controller ref uint32_t _max_dtokpp; ///< Max data writes per page uint32_t _max_io_wait_us; ///< Max wait in microseconds till submit uint32_t _cached_offset_dblks; ///< Amount of unwritten data in page (dblocks) - std::deque _ddtokl; ///< Deferred dequeue data_tok list // TODO: Convert _enq_busy etc into a proper threadsafe lock // TODO: Convert to enum? Are these encodes mutually exclusive? @@ -70,7 +76,7 @@ private: enq_rec _enq_rec; ///< Enqueue record used for encoding/decoding deq_rec _deq_rec; ///< Dequeue record used for encoding/decoding txn_rec _txn_rec; ///< Transaction record used for encoding/decoding - std::set _txn_pending_set; ///< Set containing xids of pending commits/aborts + pending_txn_map_t _txn_pending_map; ///< Set containing xids of pending commits/aborts public: wmgr(jcntl* jc, diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index 8dd970c3b9..7b1e22f80c 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -106,7 +106,7 @@ void Logger::log(const Statement& s, const std::string& msg) { os << s.file << ":"; if (flags&LINE) os << dec << s.line << ":"; - if (flags&FUNCTION) + if ((flags&FUNCTION) && s.function) os << s.function << ":"; if (flags & (FILE|LINE|FUNCTION)) os << " "; diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp index e91756cb9a..86b069fd91 100644 --- a/cpp/src/qpid/log/Statement.cpp +++ b/cpp/src/qpid/log/Statement.cpp @@ -144,54 +144,33 @@ void Statement::log(const std::string& message) { Statement::Initializer::Initializer(Statement& s) : statement(s) { - // QPID-3891 + // QPID-3891: // From the given BOOST_CURRENT_FUNCTION name extract only the // namespace-qualified-functionName, skipping return and calling args. // Given: // qpid::name::space::Function(args) // Return: // "qpid::name::space::Function". - if (s.function != NULL) { - bool foundOParen(false); - const char * opPtr; - for (opPtr = s.function; *opPtr != '\0'; opPtr++) { - if (*opPtr == '(') { - foundOParen = true; - break; - } - } - - if (foundOParen) { - const char * bPtr = opPtr; - for (bPtr = opPtr; bPtr > s.function; bPtr--) { - if (bPtr[-1] == ' ') { - break; - } - } - - size_t nStoreSize = opPtr - bPtr; - if (nStoreSize > 0) { - // Note: the struct into which this name is stored - // is static and is never deleted. - char * nStore = new char[nStoreSize + 1]; - std::copy (bPtr, opPtr, nStore); - nStore[nStoreSize] = '\0'; - - s.function = nStore; - } else { - // Ignore zero length name - } - } else { - // No name found - do nothing - } - } else { - // no function-name pointer to process + if (s.function) { + const char* end = s.function + strlen(s.function); + const char* fEnd = std::find(s.function, end, '('); + typedef std::reverse_iterator Reverse; + const char* fBegin = find(Reverse(fEnd), Reverse(s.function), ' ').base(); + size_t n = fEnd - fBegin; + char* name = new char[n+1]; + std::copy(fBegin, fEnd, name); + name[n] = '\0'; + s.function = name; } - Statement::categorize(s); Logger::instance().add(s); } +Statement::Initializer::~Initializer() { + delete[] const_cast(statement.function); + statement.function = 0; +} + namespace { const char* names[LevelTraits::COUNT] = { diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h index 20fd3745b6..410ffaaa3d 100644 --- a/cpp/src/qpid/log/Statement.h +++ b/cpp/src/qpid/log/Statement.h @@ -114,6 +114,7 @@ struct Statement { struct Initializer { QPID_COMMON_EXTERN Initializer(Statement& s); + QPID_COMMON_EXTERN ~Initializer(); Statement& statement; }; }; diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 07367b8aa8..4230a0d644 100644 --- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -819,6 +819,11 @@ qpid::sys::Codec& ConnectionContext::getCodec() return *this; } +const qpid::messaging::ConnectionOptions* ConnectionContext::getOptions() +{ + return this; +} + std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) { qpid::sys::ScopedLock l(lock); diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 2b5e85a6f5..75afeba46a 100644 --- a/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -102,6 +102,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void activateOutput(); qpid::sys::Codec& getCodec(); + const qpid::messaging::ConnectionOptions* getOptions(); //ConnectionCodec interface: std::size_t decode(const char* buffer, std::size_t size); std::size_t encode(char* buffer, std::size_t size); diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.cpp b/cpp/src/qpid/messaging/amqp/SslTransport.cpp index a62a553d90..953e989f5f 100644 --- a/cpp/src/qpid/messaging/amqp/SslTransport.cpp +++ b/cpp/src/qpid/messaging/amqp/SslTransport.cpp @@ -20,6 +20,7 @@ */ #include "SslTransport.h" #include "TransportContext.h" +#include "qpid/messaging/ConnectionOptions.h" #include "qpid/sys/ssl/SslSocket.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/ConnectionCodec.h" @@ -52,7 +53,14 @@ struct StaticInit } -SslTransport::SslTransport(TransportContext& c, boost::shared_ptr p) : context(c), connector(0), aio(0), poller(p) {} +SslTransport::SslTransport(TransportContext& c, boost::shared_ptr p) : context(c), connector(0), aio(0), poller(p) +{ + const ConnectionOptions* options = context.getOptions(); + if (options->sslCertName != "") { + QPID_LOG(debug, "ssl-cert-name = " << options->sslCertName); + socket.setCertName(options->sslCertName); + } +} void SslTransport::connect(const std::string& host, const std::string& port) { diff --git a/cpp/src/qpid/messaging/amqp/TransportContext.h b/cpp/src/qpid/messaging/amqp/TransportContext.h index 57192b5976..3ff353c19c 100644 --- a/cpp/src/qpid/messaging/amqp/TransportContext.h +++ b/cpp/src/qpid/messaging/amqp/TransportContext.h @@ -26,6 +26,8 @@ namespace sys { class Codec; } namespace messaging { +class ConnectionOptions; + namespace amqp { /** @@ -38,6 +40,7 @@ class TransportContext public: virtual ~TransportContext() {} virtual qpid::sys::Codec& getCodec() = 0; + virtual const qpid::messaging::ConnectionOptions* getOptions() = 0; virtual void closed() = 0; virtual void opened() = 0; private: diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp index a733ece74c..00fd96b6ef 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.cpp +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -182,15 +182,19 @@ void SslAsynchIO::notifyPendingWrite() { } void SslAsynchIO::queueWriteClose() { - if (state == Negotiating) { - // Never got going, so don't bother trying to close SSL down orderly. + { + qpid::sys::Mutex::ScopedLock l(lock); + if (state == Negotiating) { + // Never got going, so don't bother trying to close SSL down orderly. + state = ShuttingDown; + aio->queueWriteClose(); + return; + } + if (state == ShuttingDown) + return; state = ShuttingDown; - aio->queueWriteClose(); - return; } - state = ShuttingDown; - DWORD shutdown = SCHANNEL_SHUTDOWN; SecBuffer shutBuff; shutBuff.cbBuffer = sizeof(DWORD); diff --git a/cpp/src/qpid/sys/windows/SslAsynchIO.h b/cpp/src/qpid/sys/windows/SslAsynchIO.h index f80285c305..aab5c0021e 100644 --- a/cpp/src/qpid/sys/windows/SslAsynchIO.h +++ b/cpp/src/qpid/sys/windows/SslAsynchIO.h @@ -26,6 +26,7 @@ #include "qpid/sys/IntegerTypes.h" #include "qpid/sys/Poller.h" #include "qpid/CommonImportExport.h" +#include "qpid/sys/Mutex.h" #include #include #include @@ -86,6 +87,7 @@ protected: // AsynchIO layer below that's actually doing the I/O qpid::sys::AsynchIO *aio; + Mutex lock; // Track what the state of the SSL session is. Have to know when it's // time to notify the upper layer that the session is up, and also to diff --git a/cpp/src/qpid/sys/windows/SslCredential.cpp b/cpp/src/qpid/sys/windows/SslCredential.cpp new file mode 100644 index 0000000000..667f0f1ef0 --- /dev/null +++ b/cpp/src/qpid/sys/windows/SslCredential.cpp @@ -0,0 +1,273 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include +#include +#include "qpid/Msg.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/windows/check.h" +#include "qpid/sys/windows/util.h" +#include "qpid/sys/windows/SslCredential.h" + + +namespace qpid { +namespace sys { +namespace windows { + + +SslCredential::SslCredential() : certStore(0), cert(0) +{ + SecInvalidateHandle(&credHandle); + memset(&cred, 0, sizeof(cred)); + cred.dwVersion = SCHANNEL_CRED_VERSION; + cred.dwFlags = SCH_CRED_NO_DEFAULT_CREDS; +} + +SslCredential::~SslCredential() +{ + if (SecIsValidHandle(&credHandle)) + ::FreeCredentialsHandle(&credHandle); + if (cert) + ::CertFreeCertificateContext(cert); + if (certStore) + ::CertCloseStore(certStore, CERT_CLOSE_STORE_FORCE_FLAG); +} + +bool SslCredential::load(const std::string& certName) +{ + cert = findCertificate(certName); + if (cert != NULL) { + // assign the certificate into the credentials + cred.paCred = &cert; + cred.cCreds = 1; + } + + SECURITY_STATUS status = ::AcquireCredentialsHandle(NULL, + UNISP_NAME, + SECPKG_CRED_OUTBOUND, + NULL, + &cred, + NULL, + NULL, + &credHandle, + &credExpiry); + if (status != SEC_E_OK) + throw QPID_WINDOWS_ERROR(status); + + return (cert != NULL); +} + +CredHandle SslCredential::handle() +{ + return credHandle; +} + +std::string SslCredential::error() +{ + // Certificate needed after all. Return main error and log additional context + if (!loadError.logMessage.empty()) + QPID_LOG(warning, loadError.logMessage); + return loadError.error; +} + +void SslCredential::loadPrivCertStore() +{ + // Get a handle to the system store or pkcs#12 file + qpid::sys::ssl::SslOptions& opts = qpid::sys::ssl::SslOptions::global; + if (opts.certFilename.empty()) { + // opening a system store, names are not case sensitive + std::string store = opts.certStore.empty() ? "my" : opts.certStore; + std::transform(store.begin(), store.end(), store.begin(), ::tolower); + // map confusing GUI name to actual registry store name + if (store == "personal") + store = "my"; + certStore = ::CertOpenStore(CERT_STORE_PROV_SYSTEM_A, 0, NULL, + CERT_STORE_OPEN_EXISTING_FLAG | CERT_STORE_READONLY_FLAG | + CERT_SYSTEM_STORE_CURRENT_USER, store.c_str()); + if (!certStore) { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Could not open system certificate store: " << store, status); + return; + } + QPID_LOG(debug, "SslConnector using certifcates from system store: " << store); + } else { + // opening the store from file and populating it with a private key + HANDLE certFileHandle = NULL; + certFileHandle = CreateFile(opts.certFilename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + if (INVALID_HANDLE_VALUE == certFileHandle) { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Failed to open the file holding the private key: " << opts.certFilename, status); + return; + } + std::vector certEncoded; + DWORD certEncodedSize = 0L; + const DWORD fileSize = GetFileSize(certFileHandle, NULL); + if (INVALID_FILE_SIZE != fileSize) { + certEncoded.resize(fileSize); + bool result = false; + result = ReadFile(certFileHandle, &certEncoded[0], + fileSize, + &certEncodedSize, + NULL); + if (!result) { + // the read failed, return the error as an HRESULT + HRESULT status = GetLastError(); + CloseHandle(certFileHandle); + loadError.set(Msg() << "Reading the private key from file failed " << opts.certFilename, status); + return; + } + } + else { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Unable to read the certificate file " << opts.certFilename, status); + return; + } + CloseHandle(certFileHandle); + + CRYPT_DATA_BLOB blobData; + blobData.cbData = certEncodedSize; + blobData.pbData = &certEncoded[0]; + + // get passwd from file and convert to null terminated wchar_t (Windows UCS2) + std::string passwd = getPasswd(opts.certPasswordFile); + if (loadError.pending()) + return; + int pwlen = passwd.length(); + std::vector pwUCS2(pwlen + 1, L'\0'); + int nwc = MultiByteToWideChar(CP_UTF8, MB_ERR_INVALID_CHARS, passwd.data(), pwlen, &pwUCS2[0], pwlen); + if (!nwc) { + HRESULT status = GetLastError(); + loadError.set("Error converting password from UTF8", status); + return; + } + + certStore = PFXImportCertStore(&blobData, &pwUCS2[0], 0); + if (certStore == NULL) { + HRESULT status = GetLastError(); + loadError.set("Failed to open the certificate store", status); + return; + } + QPID_LOG(debug, "SslConnector using certificate from pkcs#12 file: " << opts.certFilename); + } +} + + +PCCERT_CONTEXT SslCredential::findCertificate(const std::string& name) +{ + loadPrivCertStore(); + if (loadError.pending()) + return NULL; + + // search for the certificate by Friendly Name + PCCERT_CONTEXT tmpctx = NULL; + while (tmpctx = CertEnumCertificatesInStore(certStore, tmpctx)) { + DWORD len = CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, + 0, NULL, NULL, 0); + if (len == 1) + continue; + std::vector ctxname(len); + CertGetNameString(tmpctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, + 0, NULL, &ctxname[0], len); + bool found = !name.compare(&ctxname[0]); + if (found) + break; + } + + // verify whether some certificate has been found + if (tmpctx == NULL) { + loadError.set(Msg() << "Client SSL/TLS certificate not found in the certificate store for name " << name, + "client certificate not found"); + } + return tmpctx; +} + + +std::string SslCredential::getPasswd(const std::string& filename) +{ + std::string passwd; + if (filename == "") + return passwd; + + HANDLE pwfHandle = CreateFile(filename.c_str(), GENERIC_READ, 0, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + + if (INVALID_HANDLE_VALUE == pwfHandle) { + HRESULT status = GetLastError(); + loadError.set(Msg() << "Failed to open the password file: " << filename, status); + return passwd; + } + + const DWORD fileSize = GetFileSize(pwfHandle, NULL); + if (fileSize == INVALID_FILE_SIZE) { + CloseHandle(pwfHandle); + loadError.set("", "Cannot read password file"); + return passwd; + } + + std::vector pwbuf; + pwbuf.resize(fileSize); + DWORD nbytes = 0; + if (!ReadFile(pwfHandle, &pwbuf[0], fileSize, &nbytes, NULL)) { + HRESULT status = GetLastError(); + CloseHandle(pwfHandle); + loadError.set("Error reading password file", status); + return passwd; + } + CloseHandle(pwfHandle); + + if (nbytes == 0) + return passwd; + + while (nbytes) { + if ((pwbuf[nbytes-1] == 012) || (pwbuf[nbytes-1] == 015)) + nbytes--; + else + break; + } + + if (nbytes) + passwd.assign(&pwbuf[0], nbytes); + + return passwd; +} + +void SslCredential::SavedError::set(const std::string &lm, const std::string es) { + logMessage = lm; + error = es; +} + +void SslCredential::SavedError::set(const std::string &lm, int status) { + logMessage = lm; + error = qpid::sys::strError(status); +} + +void SslCredential::SavedError::clear() { + logMessage.clear(); + error.clear(); +} + +bool SslCredential::SavedError::pending() { + return !logMessage.empty() || !error.empty(); +} + +}}} diff --git a/cpp/src/qpid/sys/windows/SslCredential.h b/cpp/src/qpid/sys/windows/SslCredential.h new file mode 100644 index 0000000000..ba16dcdab5 --- /dev/null +++ b/cpp/src/qpid/sys/windows/SslCredential.h @@ -0,0 +1,81 @@ +#ifndef _sys_SslCredential +#define _sys_SslCredential +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/CommonImportExport.h" + +#include +// security.h needs to see this to distinguish from kernel use. +#define SECURITY_WIN32 +#include +#include +#undef SECURITY_WIN32 + +namespace qpid { +namespace sys { +namespace windows { + +/* + * Manage certificate data structures for SChannel. + * + * Note on client certificates: The Posix/NSS implementation performs a lazy + * client certificate search part way through the ssl handshake if the server + * requests one. Here, it is not known in advance if the server will + * request the certificate so the certificate is pre-loaded (even if never + * used). To match the Linux behavior, client certificate load problems are + * remembered and reported later if appropriate, but do not prevent the + * connection attempt. + */ + +class SslCredential { +public: + QPID_COMMON_EXTERN SslCredential(); + QPID_COMMON_EXTERN ~SslCredential(); + QPID_COMMON_EXTERN bool load(const std::string& certName); + QPID_COMMON_EXTERN CredHandle handle(); + QPID_COMMON_EXTERN std::string error(); + +private: + struct SavedError { + std::string logMessage; + std::string error; + void set(const std::string &lm, const std::string es); + void set(const std::string &lm, int status); + void clear(); + bool pending(); + }; + + HCERTSTORE certStore; + PCCERT_CONTEXT cert; + SCHANNEL_CRED cred; + CredHandle credHandle; + TimeStamp credExpiry; + SavedError loadError; + + PCCERT_CONTEXT findCertificate(const std::string& name); + void loadPrivCertStore(); + std::string getPasswd(const std::string& filename); +}; + +}}} + +#endif // _sys_SslCredential diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index c7fcf2c3af..4b6820e4fd 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -278,7 +278,7 @@ class Broker(Popen): cmd += ["--log-to-stderr=no"] # Add default --log-enable arguments unless args already has --log arguments. - if not next((l for l in args if l.startswith("--log")), None): + if not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+"] if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, diff --git a/cpp/src/tests/ha_test.py b/cpp/src/tests/ha_test.py index 4606fab746..748c8ef0c1 100755 --- a/cpp/src/tests/ha_test.py +++ b/cpp/src/tests/ha_test.py @@ -137,7 +137,7 @@ class HaBroker(Broker): "--max-negotiate-time=1000", "--ha-cluster=%s"%ha_cluster] # Add default --log-enable arguments unless args already has --log arguments. - if not next((l for l in args if l.startswith("--log")), None): + if not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+", "--log-enable=debug+:ha::"] if ha_replicate is not None: args += [ "--ha-replicate=%s"%ha_replicate ] diff --git a/cpp/src/tests/linearstore/tx-test-soak.sh b/cpp/src/tests/linearstore/tx-test-soak.sh new file mode 100755 index 0000000000..fa05e0a4a8 --- /dev/null +++ b/cpp/src/tests/linearstore/tx-test-soak.sh @@ -0,0 +1,254 @@ +#! /bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# tx-test-soak +# +# Basic test methodology: +# 1. Start broker +# 2. Run qpid-txtest against broker using randomly generated parameters +# 3. After some time, kill the broker using SIGKILL +# 4. Restart broker, recover messages +# 5. Run qpid-txtest against broker in check mode, which checks that all expected messages are present. +# 6. Wash, rinse, repeat... The number of runs is determined by ${NUM_RUNS} + +NUM_RUNS=1000 +BASE_DIR=${HOME}/RedHat +CMAKE_BUILD_DIR=${BASE_DIR}/q.cm + +# Infrequently adjusted +RESULT_BASE_DIR_PREFIX=${BASE_DIR}/results.tx-test-soak +RECOVER_TIME_PER_QUEUE=1 +STORE_MODULE="linearstore.so" +BROKER_LOG_LEVEL="info+" +BROKER_MANAGEMENT="no" # "no" or "yes" +TRUNCATE_INTERVAL=10 +MAX_DISK_PERC_USED=90 + +# Consts (don't adjust these...) +export BASE_DIR +RELATIVE_BASE_DIR=`python -c "import os,os.path; print os.path.relpath(os.environ['BASE_DIR'], os.environ['PWD'])"` +LOG_FILE_NAME=log.txt +QPIDD_FN=qpidd +QPIDD=${CMAKE_BUILD_DIR}/src/${QPIDD_FN} +TXTEST=${CMAKE_BUILD_DIR}/src/tests/qpid-txtest +QPIDD_BASE_ARGS="--load-module ${STORE_MODULE} -m ${BROKER_MANAGEMENT} --auth no --default-flow-stop-threshold 0 --default-flow-resume-threshold 0 --default-queue-limit 0 --store-dir ${BASE_DIR} --log-enable ${BROKER_LOG_LEVEL} --log-to-stderr no --log-to-stdout no" +TXTEST_INIT_STR="--init yes --transfer no --check no" +TXTEST_RUN_STR="--init no --transfer yes --check no" +TXTEST_CHK_STR="--init no --transfer no --check yes" +SUCCESS_MSG="All expected messages were retrieved." +TIMESTAMP_FORMAT="+%Y-%m-%d_%H:%M:%S" +ANSI_RED="\e[1;31m" +ANSI_NONE="\e[0m" +DEFAULT_EFP_DIR=2048k +DEFAULT_EFP_SIZE=2101248 +SIG_KILL=-9 +SIG_TERM=-15 + +# Creates a random number into the variable named in string $1 in the range [$2..$3] (both inclusive). +# $1: variable name as string to which random value is assigned +# $2: minimum inclusive range of random number +# $3: maximum inclusive range of random number +get_random() { + eval $1=`python -S -c "import random; print random.randint($2,$3)"` +} + +# Uses anon-uniform distribution to set a random message size. +# Most messages must be small (0 - 1k), but we need a few medium (10k) and large (100k) ones also. +# Sets message size into var ${MSG_SIZE} +set_message_size() { + local key=0 + get_random "key" 1 10 + if (( "${key}" == "10" )); then # 1 out of 10 - very large + get_random "MSG_SIZE" 100000 1000000 + FILE_SIZE_MULTIPLIER=3 + elif (( "${key}" >= "8" )); then # 2 out of 10 - large + get_random "MSG_SIZE" 10000 100000 + FILE_SIZE_MULTIPLIER=2 + elif (( "${key}" >= "6" )); then # 2 out of 10 - medium + get_random "MSG_SIZE" 1000 10000 + FILE_SIZE_MULTIPLIER=1 + else # 5 out of 10 - small + get_random "MSG_SIZE" 10 1000 + FILE_SIZE_MULTIPLIER=1 + fi +} + +# Start or restart broker +# $1: Log suffix: either "A" or "B". If "A", broker is started with truncation, otherwise broker is restarted with recovery. +# $2: Truncate flag - only used if Log suffix is "A": if true, then truncate store +# The PID of the broker is returned in ${QPIDD_PID} +start_broker() { + local truncate_val + local truncate_str + if [[ "$1" == "A" ]]; then + if [[ $2 == true ]]; then + truncate_val="yes" + truncate_str="(Store truncated)" + if [[ -e ${BASE_DIR}/qls/p001/efp/${DEFAULT_EFP_DIR} ]]; then + for f in ${BASE_DIR}/qls/p001/efp/${DEFAULT_EFP_DIR}/*; do + local filesize=`stat -c%s "${f}"` + if (( ${filesize} != ${DEFAULT_EFP_SIZE} )); then + rm ${f} + fi + done + fi + else + truncate_val="no" + fi + else + truncate_val="no" + fi + echo "${QPIDD} ${QPIDD_BASE_ARGS} --truncate ${truncate_val} --log-to-file ${RESULT_DIR}/qpidd.$1.log &" > ${RESULT_DIR}/qpidd.$1.cmd + ${QPIDD} ${QPIDD_BASE_ARGS} --truncate ${truncate_val} --log-to-file ${RESULT_DIR}/qpidd.$1.log & + QPIDD_PID=$! + echo "Broker PID=${QPIDD_PID} ${truncate_str}" | tee -a ${LOG_FILE} +} + +# Start or evaluate results of transaction test client +# $1: Log suffix flag: either "A" or "B". If "A", client is started in test mode, otherwise client evaluates recovery. +start_tx_test() { + local tx_test_params="--messages-per-tx ${MSGS_PER_TX} --tx-count 1000000 --total-messages ${TOT_MSGS} --size ${MSG_SIZE} --queues ${NUM_QUEUES}" + if [[ "$1" == "A" ]]; then + # Run in background + echo "${TXTEST##*/} parameters: ${tx_test_params}" | tee -a ${LOG_FILE} + echo "${TXTEST} ${tx_test_params} ${TXTEST_INIT_STR} &> ${RESULT_DIR}/txtest.$1.log" > ${RESULT_DIR}/txtest.$1.cmd + ${TXTEST} ${tx_test_params} ${TXTEST_INIT_STR} &> ${RESULT_DIR}/txtest.$1.log + echo "${TXTEST} ${tx_test_params} ${TXTEST_RUN_STR} &> ${RESULT_DIR}/txtest.$1.log &" >> ${RESULT_DIR}/txtest.$1.cmd + ${TXTEST} ${tx_test_params} ${TXTEST_RUN_STR} &> ${RESULT_DIR}/txtest.$1.log & + else + # Run in foreground + #echo "${TXTEST##*/} ${tx_test_params} ${TXTEST_CHK_STR}" | tee -a ${LOG_FILE} + echo "${TXTEST} ${tx_test_params} ${TXTEST_CHK_STR} &> ${RESULT_DIR}/txtest.$1.log" > ${RESULT_DIR}/txtest.$1.cmd + ${TXTEST} ${tx_test_params} ${TXTEST_CHK_STR} &> ${RESULT_DIR}/txtest.$1.log + fi +} + +# Search for the presence of core.* files, move them into the current result directory and run gdb against them. +# No params +process_core_files() { + ls core.* &> /dev/null + if (( "$?" == "0" )); then + for cf in core.*; do + gdb --batch --quiet -ex "thread apply all bt" -ex "quit" ${QPIDD} ${cf} &> ${RESULT_DIR}/${cf##*/}.gdb.txt + gdb --batch --quiet -ex "thread apply all bt full" -ex "quit" ${QPIDD} ${cf} &> ${RESULT_DIR}/${cf##*/}.gdb-full.txt + cat ${RESULT_DIR}/${cf##*/}.gdb.txt + mv ${cf} ${RESULT_DIR}/ + echo "Core file ${cf##*/} found and recovered" + done + fi +} + +# Kill a process quietly +# $1: Signal +# $2: PID +kill_process() { + kill ${1} ${2} &>> ${LOG_FILE} + wait ${2} &>> ${LOG_FILE} +} + +# Check that test can run: No other copy of qpidd running, enough disk space +check_ready_to_run() { + # Check no copy of qpidd is running + PID=`pgrep ${QPIDD_FN}` + if [[ "$?" == "0" ]]; then + echo "ERROR: qpidd running as pid ${PID}" + exit 1 + fi + # Check disk is < 90% full + local perc_full=`df -h ${HOME} | tail -1 | awk '{print substr($5,0, length($5)-1)}'` + if (( ${perc_full} >= ${MAX_DISK_PERC_USED} )); then + echo "ERROR: Disk is too close to full (${perc_full}%)" + exit 2 + fi +} + +ulimit -c unlimited # Allow core files to be created + +RESULT_BASE_DIR_SUFFIX=`date "${TIMESTAMP_FORMAT}"` +RESULT_BASE_DIR="${RESULT_BASE_DIR_PREFIX}.${RESULT_BASE_DIR_SUFFIX}" +LOG_FILE=${RESULT_BASE_DIR}/${LOG_FILE_NAME} +if [[ -n "${RESULT_BASE_DIR}" ]]; then + rm -rf ${RESULT_BASE_DIR} +fi + +mkdir -p ${RESULT_BASE_DIR} +for rn in `seq ${NUM_RUNS}`; do + # === Prepare result dir, check ready to run test, set run vars === + RESULT_DIR=${RESULT_BASE_DIR}/run_${rn} + mkdir -p ${RESULT_DIR} + check_ready_to_run + if (( (${rn} - 1) % ${TRUNCATE_INTERVAL} == 0 )) || [[ -n ${ERROR_FLAG} ]]; then + TRUNCATE_FLAG=true + else + TRUNCATE_FLAG=false + fi + set_message_size + get_random "MSGS_PER_TX" 1 20 + get_random "TOT_MSGS" 100 1000 + get_random "NUM_QUEUES" 2 15 + MIN_RUNTIME=$(( 20 * ${FILE_SIZE_MULTIPLIER} )) + MAX_RUNTIME=$(( 120 * ${FILE_SIZE_MULTIPLIER} )) + get_random "RUN_TIME" ${MIN_RUNTIME} ${MAX_RUNTIME} + RECOVER_TIME=$(( ${NUM_QUEUES} * ${RECOVER_TIME_PER_QUEUE} * ${FILE_SIZE_MULTIPLIER} )) + echo "Run ${rn} of ${NUM_RUNS} ==============" | tee -a ${LOG_FILE} + + # === PART A: Initial run of qpid-txtest === + start_broker "A" ${TRUNCATE_FLAG} + sleep ${RECOVER_TIME} # Need a way to test if broker has started here + start_tx_test "A" + echo "Running for ${RUN_TIME} secs..." | tee -a ${LOG_FILE} + sleep ${RUN_TIME} + kill_process ${SIG_KILL} ${QPIDD_PID} + sleep 2 + tar -czf ${RESULT_DIR}/qls_B.tar.gz ${RELATIVE_BASE_DIR}/qls + + # === PART B: Recovery and check === + start_broker "B" + echo "Recover time=${RECOVER_TIME} secs..." | tee -a ${LOG_FILE} + sleep ${RECOVER_TIME} # Need a way to test if broker has started here + start_tx_test "B" + sleep 1 + kill_process ${SIG_TERM} ${QPIDD_PID} + sleep 2 + PID=`pgrep ${QPIDD_FN}` + if [[ "$?" == "0" ]]; then + kill_process ${SIG_KILL} ${PID} + sleep 2 + fi + tar -czf ${RESULT_DIR}/qls_C.tar.gz ${RELATIVE_BASE_DIR}/qls + + # === Check for errors, cores and exceptions in logs === + grep -Hn "jexception" ${RESULT_DIR}/qpidd.A.log | tee -a ${LOG_FILE} + grep -Hn "jexception" ${RESULT_DIR}/qpidd.B.log | tee -a ${LOG_FILE} + grep "${SUCCESS_MSG}" ${RESULT_DIR}/txtest.B.log &> /dev/null + if [[ "$?" != "0" ]]; then + echo "ERROR in run ${rn}" >> ${LOG_FILE} + echo -e "${ANSI_RED}ERROR${ANSI_NONE} in run ${rn}" + ERROR_FLAG=true + else + unset ERROR_FLAG + fi + sleep 2 + process_core_files + echo | tee -a ${LOG_FILE} +done + diff --git a/java/amqp-1-0-client-jms/pom.xml b/java/amqp-1-0-client-jms/pom.xml index ffcfd223ae..2b73700eed 100644 --- a/java/amqp-1-0-client-jms/pom.xml +++ b/java/amqp-1-0-client-jms/pom.xml @@ -16,34 +16,35 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-amqp-1-0-client-jms + 0.28-SNAPSHOT + Qpid AMQP 1.0 Client JMS + AMQP 1.0 compliant JMS module org.apache.qpid qpid-amqp-1-0-client - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-amqp-1-0-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided diff --git a/java/amqp-1-0-client-websocket/pom.xml b/java/amqp-1-0-client-websocket/pom.xml index 3862fb0fc5..78da6d3b9e 100644 --- a/java/amqp-1-0-client-websocket/pom.xml +++ b/java/amqp-1-0-client-websocket/pom.xml @@ -16,145 +16,41 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-amqp-1-0-client-websocket + 0.28-SNAPSHOT + Qpid AMQP 1.0 Client WebSocket + AMQP 1.0 compliant WebSocket module - org.apache.qpid qpid-amqp-1-0-client - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-amqp-1-0-common - 0.26-SNAPSHOT - compile + ${project.version} + + org.apache.geronimo.specs + geronimo-servlet_3.0_spec + - - org.apache.geronimo.specs - geronimo-servlet_3.0_spec - 1.0 - compile - - - - org.eclipse.jetty - jetty-server - 8.1.14.v20131031 - compile - - - org.eclipse.jetty.orbit - javax.servlet - - - org.eclipse.jetty - jetty-continuation - - - org.eclipse.jetty - jetty-http - - - - - - org.eclipse.jetty - jetty-continuation - 8.1.14.v20131031 - compile - - - - org.eclipse.jetty - jetty-security - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-server - - - - - - org.eclipse.jetty - jetty-http - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-io - - - - - org.eclipse.jetty - jetty-io - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-util - - - - - - org.eclipse.jetty - jetty-servlet - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-security - - - - - - org.eclipse.jetty - jetty-util - 8.1.14.v20131031 - compile - - - - org.eclipse.jetty - jetty-websocket - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-util - - - org.eclipse.jetty - jetty-io - - - org.eclipse.jetty - jetty-http - - - + + org.eclipse.jetty + jetty-websocket + diff --git a/java/amqp-1-0-client/pom.xml b/java/amqp-1-0-client/pom.xml index 1d6a444d6d..2bee6b43d1 100644 --- a/java/amqp-1-0-client/pom.xml +++ b/java/amqp-1-0-client/pom.xml @@ -16,21 +16,24 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-amqp-1-0-client + 0.28-SNAPSHOT + Qpid AMQP 1.0 Client + AMQP 1.0 compliant client module org.apache.qpid qpid-amqp-1-0-common - 0.26-SNAPSHOT - compile + ${project.version} diff --git a/java/amqp-1-0-common/pom.xml b/java/amqp-1-0-common/pom.xml index d771d58ab1..7b01caa5b0 100644 --- a/java/amqp-1-0-common/pom.xml +++ b/java/amqp-1-0-common/pom.xml @@ -16,14 +16,18 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-amqp-1-0-common + 0.28-SNAPSHOT + Qpid AMQP 1.0 Common + AMQP 1.0 compliant common module diff --git a/java/bdbstore/jmx/pom.xml b/java/bdbstore/jmx/pom.xml index 82ece2c66f..626f80b9ab 100644 --- a/java/bdbstore/jmx/pom.xml +++ b/java/bdbstore/jmx/pom.xml @@ -16,49 +16,48 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-bdbstore-jmx + 0.28-SNAPSHOT + Qpid BDB Message Store JMX + BDB message store JMX implementation org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-broker-plugins-management-jmx - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-bdbstore - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile com.sleepycat je - 5.0.84 provided @@ -66,14 +65,14 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test diff --git a/java/bdbstore/pom.xml b/java/bdbstore/pom.xml index ec71547038..60abfcb5cd 100644 --- a/java/bdbstore/pom.xml +++ b/java/bdbstore/pom.xml @@ -16,41 +16,41 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-bdbstore + 0.28-SNAPSHOT + Qpid BDB Message Store + BDB message store implementation using Oracle Berkeley DB Java Edition org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-broker-plugins-amqp-0-8-protocol - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile com.sleepycat je - 5.0.97 provided @@ -58,14 +58,14 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test @@ -73,21 +73,20 @@ org.apache.qpid qpid-broker-plugins-amqp-0-10-protocol - 0.26-SNAPSHOT + ${project.version} test org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 test org.apache.qpid qpid-client - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/bdbstore/systests/pom.xml b/java/bdbstore/systests/pom.xml index e095765ea1..01b87448e2 100644 --- a/java/bdbstore/systests/pom.xml +++ b/java/bdbstore/systests/pom.xml @@ -16,15 +16,19 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-bdbstore-systests + 0.28-SNAPSHOT + Qpid BDB Store System Tests + BDB message store system tests target${file.separator}qpid-broker${file.separator}${project.version} @@ -37,57 +41,46 @@ junit junit - ${junit-version} compile org.apache.qpid qpid-systests - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided org.apache.qpid qpid-bdbstore - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-bdbstore-jmx - 0.26-SNAPSHOT - compile + ${project.version} com.sleepycat je - 5.0.84 - compile diff --git a/java/broker-core/pom.xml b/java/broker-core/pom.xml index 7254de0251..4621a8df75 100644 --- a/java/broker-core/pom.xml +++ b/java/broker-core/pom.xml @@ -16,14 +16,18 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-broker-core + 0.28-SNAPSHOT + Qpid Java Broker Core + Broker core functionality and initial configuration ${basedir}/src/main/java @@ -33,50 +37,38 @@ org.apache.qpid qpid-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-management-common - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.slf4j slf4j-log4j12 - ${slf4j-version} - compile commons-logging commons-logging - 1.1.1 - compile xalan xalan - 2.7.0 - compile xml-apis @@ -88,15 +80,11 @@ commons-beanutils commons-beanutils-core - 1.8.3 - compile commons-digester commons-digester - 1.8.1 - compile commons-beanutils @@ -108,50 +96,36 @@ commons-codec commons-codec - 1.6 - compile commons-lang commons-lang - 2.6 - compile commons-collections commons-collections - 3.2.1 - compile commons-configuration commons-configuration - 1.8 - compile org.codehaus.jackson jackson-core-asl - 1.9.0 - compile org.codehaus.jackson jackson-mapper-asl - 1.9.0 - compile org.apache.bcel bcel - 5.2 - compile @@ -165,7 +139,7 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test @@ -261,12 +235,12 @@ velocity velocity - 1.4 + ${velocity-version} velocity velocity-dep - 1.4 + ${velocity-version} diff --git a/java/broker-plugins/access-control/pom.xml b/java/broker-plugins/access-control/pom.xml index c63618380c..0c92129325 100644 --- a/java/broker-plugins/access-control/pom.xml +++ b/java/broker-plugins/access-control/pom.xml @@ -16,15 +16,19 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-access-control + 0.28-SNAPSHOT + Qpid Access Control Broker Plug-in + Access Control broker plug-in ${project.build.directory}/generated-sources/generated-logmessages @@ -34,29 +38,27 @@ org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided log4j log4j - ${log4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test diff --git a/java/broker-plugins/amqp-0-10-protocol/pom.xml b/java/broker-plugins/amqp-0-10-protocol/pom.xml index 6bfce64240..c2ef3bff33 100644 --- a/java/broker-plugins/amqp-0-10-protocol/pom.xml +++ b/java/broker-plugins/amqp-0-10-protocol/pom.xml @@ -16,50 +16,50 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-amqp-0-10-protocol + 0.28-SNAPSHOT + Qpid AMQP 0-10 Protocol Plug-in + AMQP 0-10 protocol broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test diff --git a/java/broker-plugins/amqp-0-8-protocol/pom.xml b/java/broker-plugins/amqp-0-8-protocol/pom.xml index 0d8c8be3dd..3bbd624e65 100644 --- a/java/broker-plugins/amqp-0-8-protocol/pom.xml +++ b/java/broker-plugins/amqp-0-8-protocol/pom.xml @@ -16,43 +16,45 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-amqp-0-8-protocol + 0.28-SNAPSHOT + Qpid AMQP 0-8 Protocol Broker Plug-in + AMQP 0-8, 0-9 and 0-9-1 protocol broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided log4j log4j - ${log4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test diff --git a/java/broker-plugins/amqp-1-0-protocol/pom.xml b/java/broker-plugins/amqp-1-0-protocol/pom.xml index 97db18011d..c3b28c5ee4 100644 --- a/java/broker-plugins/amqp-1-0-protocol/pom.xml +++ b/java/broker-plugins/amqp-1-0-protocol/pom.xml @@ -16,36 +16,37 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-amqp-1-0-protocol + 0.28-SNAPSHOT + Qpid AMQP 1-0 Protocol Broker Plug-in + AMQP 1-0 protocol broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-amqp-1-0-common - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile diff --git a/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml b/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml index 1403caee0a..38ff043377 100644 --- a/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml +++ b/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/pom.xml @@ -16,50 +16,50 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-amqp-msg-conv-0-10-to-1-0 + 0.28-SNAPSHOT + Qpid AMQP 0-10 to 1-0 Message Conversion Broker Plug-in + AMQP message conversion (0-10 to 1-0) broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-0-10-protocol - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-amqp-1-0-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-1-0-protocol - 0.26-SNAPSHOT - compile + ${project.version} diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml index deccb6cf22..016f777212 100644 --- a/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml +++ b/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/pom.xml @@ -16,43 +16,44 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10 + 0.28-SNAPSHOT + Qpid AMQP 0-8 to 0-10 Message Conversion Broker Plug-in + AMQP message conversion (0-8, 0-9 and 0-9-1 to 0-10) broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-0-8-protocol - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-0-10-protocol - 0.26-SNAPSHOT - compile + ${project.version} diff --git a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml index ec725e7e57..a01760f139 100644 --- a/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml +++ b/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/pom.xml @@ -16,50 +16,50 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-amqp-msg-conv-0-8-to-1-0 + 0.28-SNAPSHOT + Qpid AMQP 0-8 to 1-0 Message Conversion Broker Plug-in + AMQP message conversion (0-8, 0-9 and 0-9-1 to 1-0) broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-0-8-protocol - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-amqp-1-0-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-1-0-protocol - 0.26-SNAPSHOT - compile + ${project.version} diff --git a/java/broker-plugins/derby-store/pom.xml b/java/broker-plugins/derby-store/pom.xml index 9b55875aa8..58088d7a75 100644 --- a/java/broker-plugins/derby-store/pom.xml +++ b/java/broker-plugins/derby-store/pom.xml @@ -16,50 +16,49 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-derby-store + 0.28-SNAPSHOT + Qpid Derby Message Store + Apache Derby DB message store broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT - compile + ${project.version} org.apache.derby derby - 10.8.2.2 - compile log4j log4j - ${log4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test diff --git a/java/broker-plugins/jdbc-provider-bone/pom.xml b/java/broker-plugins/jdbc-provider-bone/pom.xml index 52631a44e3..af468af3af 100644 --- a/java/broker-plugins/jdbc-provider-bone/pom.xml +++ b/java/broker-plugins/jdbc-provider-bone/pom.xml @@ -16,29 +16,31 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-jdbc-provider-bone + 0.28-SNAPSHOT + Qpid JDBC Message Store Connection Pooling Plug-in + JDBC Message Store Connection Pooling broker plug-in using BoneCP org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided com.jolbox bonecp - 0.7.1.RELEASE - compile @@ -51,7 +53,6 @@ com.google.guava guava - 14.0.1 runtime diff --git a/java/broker-plugins/jdbc-store/pom.xml b/java/broker-plugins/jdbc-store/pom.xml index ca861a42f3..a2541ec913 100644 --- a/java/broker-plugins/jdbc-store/pom.xml +++ b/java/broker-plugins/jdbc-store/pom.xml @@ -16,43 +16,45 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-jdbc-store + 0.28-SNAPSHOT + Qpid JDBC Message Store Broker Plug-in + JDBC message store broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided log4j log4j - ${log4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar test @@ -60,7 +62,6 @@ org.apache.derby derby - 10.8.2.2 test diff --git a/java/broker-plugins/management-http/pom.xml b/java/broker-plugins/management-http/pom.xml index 57b2dd863b..d614bbf0b1 100644 --- a/java/broker-plugins/management-http/pom.xml +++ b/java/broker-plugins/management-http/pom.xml @@ -16,151 +16,51 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-management-http + 0.28-SNAPSHOT + Qpid HTTP Management Broker Plug-in + HTTP Management broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.geronimo.specs geronimo-servlet_3.0_spec - 1.0 - compile org.eclipse.jetty jetty-server - 8.1.14.v20131031 - compile - - - org.eclipse.jetty.orbit - javax.servlet - - - org.eclipse.jetty - jetty-continuation - - - org.eclipse.jetty - jetty-http - - - - - - org.eclipse.jetty - jetty-continuation - 8.1.14.v20131031 - compile - - - - org.eclipse.jetty - jetty-security - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-server - - - - - - org.eclipse.jetty - jetty-http - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-io - - - - - - org.eclipse.jetty - jetty-io - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-util - - org.eclipse.jetty jetty-servlet - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-security - - - - - - org.eclipse.jetty - jetty-util - 8.1.14.v20131031 - compile - - - - org.eclipse.jetty - jetty-websocket - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-util - - - org.eclipse.jetty - jetty-io - - - org.eclipse.jetty - jetty-http - - @@ -174,7 +74,7 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/broker-plugins/management-jmx/pom.xml b/java/broker-plugins/management-jmx/pom.xml index 8a06fcf185..78b7f327ec 100644 --- a/java/broker-plugins/management-jmx/pom.xml +++ b/java/broker-plugins/management-jmx/pom.xml @@ -16,43 +16,44 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-management-jmx + 0.28-SNAPSHOT + Qpid JMX Management Broker Plug-in + JMX management broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.qpid qpid-management-common - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/broker-plugins/memory-store/pom.xml b/java/broker-plugins/memory-store/pom.xml index 6bfd2a8d49..bdf503cd89 100644 --- a/java/broker-plugins/memory-store/pom.xml +++ b/java/broker-plugins/memory-store/pom.xml @@ -16,21 +16,25 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-memory-store + 0.28-SNAPSHOT + Qpid Memory Message Store Broker Plug-in + Memory message store broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided diff --git a/java/broker-plugins/websocket/pom.xml b/java/broker-plugins/websocket/pom.xml index fb55be05c8..f7d9c45638 100644 --- a/java/broker-plugins/websocket/pom.xml +++ b/java/broker-plugins/websocket/pom.xml @@ -15,142 +15,45 @@ See the License for the specific language governing permissions and limitations under the License. --> - + + 4.0.0 + - qpid-project org.apache.qpid - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-broker-plugins-websocket + 0.28-SNAPSHOT + Qpid WebSocket Broker Plug-in + WebSocket broker plug-in org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} provided org.apache.geronimo.specs geronimo-servlet_3.0_spec - 1.0 - compile org.eclipse.jetty jetty-server - 8.1.14.v20131031 - compile - - - org.eclipse.jetty.orbit - javax.servlet - - - org.eclipse.jetty - jetty-continuation - - - org.eclipse.jetty - jetty-http - - - - - - org.eclipse.jetty - jetty-continuation - 8.1.14.v20131031 - compile - - - - org.eclipse.jetty - jetty-security - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-server - - - - - - org.eclipse.jetty - jetty-http - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-io - - - - - - org.eclipse.jetty - jetty-io - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-util - - - - - - org.eclipse.jetty - jetty-servlet - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-security - - - - - - org.eclipse.jetty - jetty-util - 8.1.14.v20131031 - compile org.eclipse.jetty jetty-websocket - 8.1.14.v20131031 - compile - - - org.eclipse.jetty - jetty-util - - - org.eclipse.jetty - jetty-io - - - org.eclipse.jetty - jetty-http - - - + + diff --git a/java/broker/pom.xml b/java/broker/pom.xml index 6eb51a4b2e..9761819542 100644 --- a/java/broker/pom.xml +++ b/java/broker/pom.xml @@ -16,132 +16,130 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-broker + 0.28-SNAPSHOT + Qpid Java Broker + Broker configuration and executable org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-common - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile commons-cli commons-cli - 1.2 - compile org.apache.qpid qpid-broker-plugins-access-control - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-amqp-0-8-protocol - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-amqp-0-10-protocol - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-amqp-1-0-protocol - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10 - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-amqp-msg-conv-0-8-to-1-0 - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-amqp-msg-conv-0-10-to-1-0 - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-derby-store - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-jdbc-provider-bone - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-jdbc-store - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-management-http - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-management-jmx - 0.26-SNAPSHOT + ${project.version} runtime org.apache.qpid qpid-broker-plugins-memory-store - 0.26-SNAPSHOT + ${project.version} runtime @@ -149,7 +147,7 @@ org.apache.qpid qpid-bdbstore - 0.26-SNAPSHOT + ${project.version} runtime true @@ -157,7 +155,7 @@ org.apache.qpid qpid-bdbstore-jmx - 0.26-SNAPSHOT + ${project.version} runtime true @@ -166,7 +164,7 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/client/pom.xml b/java/client/pom.xml index 3d9508eb16..f5623ff53c 100644 --- a/java/client/pom.xml +++ b/java/client/pom.xml @@ -16,42 +16,42 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-client + 0.28-SNAPSHOT + Qpid AMQP 0.x JMS Client + JMS client supporting AMQP 0-8, 0-9, 0-9-1 and 0-10. org.apache.qpid qpid-common - 0.26-SNAPSHOT - compile + ${project.version} org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/common/pom.xml b/java/common/pom.xml index d931c799a3..7ea7f5907a 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -16,14 +16,18 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-common + 0.28-SNAPSHOT + Qpid Common + Common functionality @@ -37,15 +41,13 @@ org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test @@ -91,17 +93,17 @@ velocity velocity - 1.4 + ${velocity-version} velocity velocity-dep - 1.4 + ${velocity-version} org.python jython-standalone - 2.5.3 + ${jython-version} diff --git a/java/jca/pom.xml b/java/jca/pom.xml index c7a8de61fe..ecff05f95a 100644 --- a/java/jca/pom.xml +++ b/java/jca/pom.xml @@ -16,69 +16,65 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-jca + 0.28-SNAPSHOT + Qpid JCA + J2EE Connector Architecture code org.apache.qpid qpid-client - 0.26-SNAPSHOT + ${project.version} provided org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.geronimo.specs geronimo-j2ee-connector_1.5_spec - 2.0.0 provided org.apache.geronimo.specs geronimo-jta_1.1_spec - 1.1.1 provided org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided org.apache.geronimo.specs geronimo-ejb_3.0_spec - 1.0.1 provided org.apache.geronimo.specs geronimo-servlet_3.0_spec - 1.0 provided org.apache.geronimo.framework geronimo-kernel - 2.2.1 provided @@ -116,7 +112,7 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/jca/rar/pom.xml b/java/jca/rar/pom.xml index fca35dba14..1f1aa5e6e0 100644 --- a/java/jca/rar/pom.xml +++ b/java/jca/rar/pom.xml @@ -16,22 +16,26 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-ra + 0.28-SNAPSHOT rar + Qpid Resource Adaptor + Resource Adaptor org.apache.qpid qpid-client - 0.26-SNAPSHOT + ${project.version} runtime @@ -44,7 +48,7 @@ org.apache.qpid qpid-jca - 0.26-SNAPSHOT + ${project.version} runtime @@ -63,30 +67,4 @@ - diff --git a/java/management/common/pom.xml b/java/management/common/pom.xml index ad5d103165..e08338ff8e 100644 --- a/java/management/common/pom.xml +++ b/java/management/common/pom.xml @@ -16,15 +16,19 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-management-common + 0.28-SNAPSHOT + Qpid Management Common + Common management code diff --git a/java/management/example/pom.xml b/java/management/example/pom.xml index bdddc3d2dc..d70d16c647 100644 --- a/java/management/example/pom.xml +++ b/java/management/example/pom.xml @@ -16,22 +16,25 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-management-examples + 0.28-SNAPSHOT + Qpid Management Examples + Example management code org.apache.qpid qpid-management-common - 0.26-SNAPSHOT - compile + ${project.version} diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 2a1dbd4350..ce60b13703 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -16,14 +16,18 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-perftests + 0.28-SNAPSHOT + Qpid Performance Tests + Performance testing configuration, code and tests target${file.separator}qpid-broker${file.separator}${project.version} @@ -37,28 +41,22 @@ org.apache.qpid qpid-client - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided @@ -66,35 +64,26 @@ commons-lang commons-lang - 2.6 - compile commons-collections commons-collections - 3.2.1 - compile commons-beanutils commons-beanutils-core - 1.8.3 - compile com.google.code.gson gson - 2.0 - compile org.apache.derby derby - 10.8.2.2 runtime @@ -102,14 +91,14 @@ org.apache.qpid qpid-systests - 0.26-SNAPSHOT + ${project.version} test org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test diff --git a/java/perftests/visualisation-jfc/pom.xml b/java/perftests/visualisation-jfc/pom.xml index d677491a03..ea0979de39 100644 --- a/java/perftests/visualisation-jfc/pom.xml +++ b/java/perftests/visualisation-jfc/pom.xml @@ -16,35 +16,35 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT ../../pom.xml - 4.0.0 qpid-perftests-visualisation-jfc + 0.28-SNAPSHOT + Qpid Performance Tests Visualisation JFC + Performance testing visualisation using JFreeChart org.apache.qpid qpid-perftests - 0.26-SNAPSHOT - compile + ${project.version} org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.derby derby - 10.8.2.2 runtime @@ -52,14 +52,14 @@ net.sourceforge.csvjdbc csvjdbc - 1.0.8 + ${csvjdbc-version} provided jfree jfreechart - 1.0.13 + ${jfreechart-version} provided @@ -67,7 +67,7 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT + ${project.version} test @@ -109,4 +109,17 @@ + + + csvjdbc.releases + http://csvjdbc.sourceforge.net/maven2 + default + + true + + + false + + + diff --git a/java/pom.xml b/java/pom.xml index 54395ed4fe..3eb4cfb8c0 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -16,47 +16,56 @@ limitations under the License. --> + 4.0.0 + org.apache apache 12 - 4.0.0 org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT pom - + Qpid + http://qpid.apache.org + Apache Qpid™ makes messaging tools that speak AMQP and support many languages and platforms. AMQP is an open internet protocol for reliably sending and receiving messages. It makes it possible for everyone to build a diverse, coherent messaging ecosystem. + + + JIRA + https://issues.apache.org/jira/browse/QPID + + + + Jenkins + https://builds.apache.org/view/M-R/view/Qpid/ + + + + + Apache Qpid Users + users-subscribe@qpid.apache.org + users-unsubscribe@qpid.apache.org + users@qpid.apache.org + http://mail-archives.apache.org/mod_mbox/qpid-users/ + + + Apache Qpid Developers + dev-subscribe@qpid.apache.org + dev-unsubscribe@qpid.apache.org + dev@qpid.apache.org + http://mail-archives.apache.org/mod_mbox/qpid-dev/ + + + + + scm:svn:http://svn.apache.org/repos/asf/qpid/trunk/qpid/java + scm:svn:https://svn.apache.org/repos/asf/qpid/trunk/qpid/java + http://svn.apache.org/viewvc/qpid/trunk/qpid/java + + 3.0.0 @@ -64,12 +73,39 @@ 1.9.1 + 1.2.16 + 1.6.4 + 8.1.14.v20131031 + 1.9.1 + 5.0.97 + 2.0 + 14.0.1 + 0.7.1.RELEASE + 1.8.3 + 1.2 + 1.6 + 3.2.1 + 1.8 + 1.8.1 + 2.6 + 1.1.1 + 1.0 + 1.1.1 + 1.0 + 2.0.0 + 1.0.1 + 2.2.1 + 2.7.0 + 5.2 + 1.4 + 10.8.2.2 + 2.5.3 + 1.0.8 + 1.0.13 3.8.1 1.9.0 - 1.2.16 - 1.6.4 1.7 @@ -105,7 +141,7 @@ 1.6 - (java-mms.0-9-1|java-mms.0-10) + (java-mms.0-9-1|java-mms.0-10|java-bdb.0-10) ${basedir} @@ -120,30 +156,13 @@ ${profile.excludes.java-mms.0-10} - - - junit - junit - ${junit-version} - test - - - - org.mockito - mockito-all - ${mockito-version} - test - - - - + amqp-1-0-common amqp-1-0-client amqp-1-0-client-jms amqp-1-0-client-websocket - amqp-1-0-common - broker broker-core + broker broker-plugins/access-control broker-plugins/amqp-0-8-protocol broker-plugins/amqp-0-10-protocol @@ -177,32 +196,263 @@ bdbstore/systests - - - - oracle.releases - http://download.oracle.com/maven - default - - true - - - false - - - - - csvjdbc.releases - http://csvjdbc.sourceforge.net/maven2 - default - - true - - - false - - - + + + junit + junit + test + + + + org.mockito + mockito-all + test + + + + + + + + junit + junit + ${junit-version} + test + + + + org.mockito + mockito-all + ${mockito-version} + test + + + + log4j + log4j + ${log4j-version} + compile + + + + org.slf4j + slf4j-api + ${slf4j-version} + compile + + + + org.slf4j + slf4j-log4j12 + ${slf4j-version} + compile + + + + org.eclipse.jetty + jetty-server + ${jetty-version} + compile + + + + org.eclipse.jetty + jetty-websocket + ${jetty-version} + compile + + + + org.eclipse.jetty + jetty-servlet + ${jetty-version} + compile + + + + org.codehaus.jackson + jackson-core-asl + ${jackson-version} + compile + + + + org.codehaus.jackson + jackson-mapper-asl + ${jackson-version} + compile + + + + org.dojotoolkit + dojo + ${dojo-version} + compile + + + + com.sleepycat + je + ${bdb-version} + compile + + + + commons-beanutils + commons-beanutils-core + ${commons-beanutils-version} + compile + + + + commons-digester + commons-digester + ${commons-digester-version} + compile + + + + commons-lang + commons-lang + ${commons-lang-version} + compile + + + + commons-cli + commons-cli + ${commons-cli-version} + compile + + + + commons-codec + commons-codec + ${commons-codec-version} + compile + + + + commons-collections + commons-collections + ${commons-collections-version} + compile + + + + commons-configuration + commons-configuration + ${commons-configuration-version} + compile + + + + commons-logging + commons-logging + ${commons-logging-version} + compile + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + ${geronimo-jms-version} + provided + + + + org.apache.geronimo.specs + geronimo-jta_1.1_spec + ${geronimo-gta-version} + provided + + + + org.apache.geronimo.specs + geronimo-servlet_3.0_spec + ${geronimo-servlet-version} + compile + + + + org.apache.geronimo.specs + geronimo-j2ee-connector_1.5_spec + ${geronimo-j2ee-connector-version} + compile + + + + org.apache.geronimo.specs + geronimo-ejb_3.0_spec + ${geronimo-ejb-version} + provided + + + + org.apache.geronimo.framework + geronimo-kernel + ${geronimo-kernel-version} + provided + + + + xalan + xalan + ${xalan-version} + compile + + + + org.apache.bcel + bcel + ${bcel-version} + compile + + + + velocity + velocity + ${velocity-version} + compile + + + + velocity + velocity-dep + ${velocity-version} + compile + + + + org.apache.derby + derby + ${derby-version} + compile + + + + com.google.guava + guava + ${guava-version} + compile + + + + com.jolbox + bonecp + ${bonecp-version} + compile + + + + com.google.code.gson + gson + ${gson-version} + compile + + + + @@ -497,41 +747,6 @@ - - Apache Qpid™ makes messaging tools that speak AMQP and support many languages and platforms. - AMQP is an open internet protocol for reliably sending and receiving messages. It makes it possible for everyone to build a diverse, coherent messaging ecosystem. - - - http://qpid.apache.org - - - JIRA - https://issues.apache.org/jira/browse/QPID - - - - - Apache Qpid Users - users-subscribe@qpid.apache.org - users-unsubscribe@qpid.apache.org - users@qpid.apache.org - http://mail-archives.apache.org/mod_mbox/qpid-users/ - - - Apache Qpid Developers - dev-subscribe@qpid.apache.org - dev-unsubscribe@qpid.apache.org - dev@qpid.apache.org - http://mail-archives.apache.org/mod_mbox/qpid-dev/ - - - - - scm:svn:http://svn.apache.org/repos/asf/qpid/trunk/qpid - scm:svn:https://svn.apache.org/repos/asf/qpid/trunk/qpid - http://svn.apache.org/viewvc/qpid/ - - @@ -548,6 +763,21 @@ + + + + oracle.releases + http://download.oracle.com/maven + default + + true + + + false + + + + + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-test-utils + 0.28-SNAPSHOT + Qpid Test Utilities + Testing utilities junit junit - ${junit-version} compile log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.slf4j slf4j-log4j12 - ${slf4j-version} - compile org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided diff --git a/java/systests/pom.xml b/java/systests/pom.xml index 98086bbe9e..a556841240 100644 --- a/java/systests/pom.xml +++ b/java/systests/pom.xml @@ -16,14 +16,18 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-systests + 0.28-SNAPSHOT + Qpid System Tests + System testing configuration, code and tests target${file.separator}qpid-broker${file.separator}${project.version} @@ -51,36 +55,29 @@ org.apache.qpid qpid-test-utils - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.qpid qpid-client - 0.26-SNAPSHOT - compile + ${project.version} org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 provided @@ -88,21 +85,17 @@ org.apache.qpid qpid-jca - 0.26-SNAPSHOT - compile + ${project.version} org.apache.geronimo.specs geronimo-j2ee-connector_1.5_spec - 2.0.0 - compile org.apache.geronimo.specs geronimo-jta_1.1_spec - 1.1.1 runtime @@ -110,125 +103,117 @@ org.apache.qpid qpid-broker - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-core - 0.26-SNAPSHOT + ${project.version} test-jar - compile org.apache.qpid qpid-broker-plugins-access-control - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-0-8-protocol - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-0-10-protocol - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-1-0-protocol - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10 - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-msg-conv-0-8-to-1-0 - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-amqp-msg-conv-0-10-to-1-0 - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-derby-store - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-jdbc-provider-bone - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-jdbc-store - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-management-http - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-management-jmx - 0.26-SNAPSHOT - compile + ${project.version} org.apache.qpid qpid-broker-plugins-memory-store - 0.26-SNAPSHOT - compile + ${project.version} - + + diff --git a/java/tools/pom.xml b/java/tools/pom.xml index 0afaa1d274..9da9ca0443 100644 --- a/java/tools/pom.xml +++ b/java/tools/pom.xml @@ -16,42 +16,39 @@ limitations under the License. --> + 4.0.0 + org.apache.qpid - qpid-project - 0.26-SNAPSHOT + qpid-parent + 1.0-SNAPSHOT - 4.0.0 qpid-tools + 0.28-SNAPSHOT + Qpid Tools + Tools org.apache.qpid qpid-client - 0.26-SNAPSHOT - compile + ${project.version} log4j log4j - ${log4j-version} - compile org.slf4j slf4j-api - ${slf4j-version} - compile org.apache.geronimo.specs geronimo-jms_1.1_spec - 1.0 - compile diff --git a/tests/src/py/qpid_tests/broker_0_10/dtx.py b/tests/src/py/qpid_tests/broker_0_10/dtx.py index f8d6533a78..a9619bcdb8 100644 --- a/tests/src/py/qpid_tests/broker_0_10/dtx.py +++ b/tests/src/py/qpid_tests/broker_0_10/dtx.py @@ -594,9 +594,10 @@ class DtxTests(TestBase010): session.dtx_select() session.dtx_start(xid=tx) - self.assertEqual(0, session.dtx_get_timeout(xid=tx).timeout) - session.dtx_set_timeout(xid=tx, timeout=60) + # below test checks for default value of dtx-default-timeout broker option self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout) + session.dtx_set_timeout(xid=tx, timeout=200) + self.assertEqual(200, session.dtx_get_timeout(xid=tx).timeout) self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status) self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status) @@ -628,6 +629,21 @@ class DtxTests(TestBase010): self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status) self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status) + def test_set_timeout_too_high(self): + """ + Test the timeout can't be more than --dtx-max-timeout + broker option + """ + session = self.session + tx = self.xid("dummy") + + session.dtx_select() + session.dtx_start(xid=tx) + try: + session.dtx_set_timeout(xid=tx, timeout=3601) + except SessionException, e: + self.assertEquals(542, e.args[0].error_code) + def test_recover(self): diff --git a/tools/src/py/qls/efp.py b/tools/src/py/qls/efp.py index 3ad8104faa..abf289dc12 100644 --- a/tools/src/py/qls/efp.py +++ b/tools/src/py/qls/efp.py @@ -26,10 +26,11 @@ class EfpManager(object): Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the Empty File Pool (EFP). """ - def __init__(self, directory): + def __init__(self, directory, args): if not os.path.exists(directory): raise qls.err.InvalidQlsDirectoryNameError(directory) self.directory = directory + self.args = args self.partitions = [] def report(self): print 'Found', len(self.partitions), 'partition(s).' diff --git a/tools/src/py/qls/err.py b/tools/src/py/qls/err.py index d56d739915..702fbb9520 100644 --- a/tools/src/py/qls/err.py +++ b/tools/src/py/qls/err.py @@ -30,6 +30,16 @@ class QlsRecordError(QlsError): QlsError.__init__(self) self.file_header = file_header self.record = record + def get_expected_fro(self): + return self.file_header.first_record_offset + def get_file_number(self): + return self.file_header.file_num + def get_queue_name(self): + return self.file_header.queue_name + def get_record_id(self): + return self.record.record_id + def get_record_offset(self): + return self.record.file_offset def __str__(self): return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \ (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id) diff --git a/tools/src/py/qls/jrnl.py b/tools/src/py/qls/jrnl.py index ffd200ddba..5bce78bfad 100644 --- a/tools/src/py/qls/jrnl.py +++ b/tools/src/py/qls/jrnl.py @@ -23,6 +23,7 @@ import qls.err import string import struct from time import gmtime, strftime +import zlib class HighCounter(object): def __init__(self): @@ -39,10 +40,11 @@ class HighCounter(object): class JournalRecoveryManager(object): TPL_DIR_NAME = 'tpl' JRNL_DIR_NAME = 'jrnl' - def __init__(self, directory): + def __init__(self, directory, args): if not os.path.exists(directory): raise qls.err.InvalidQlsDirectoryNameError(directory) self.directory = directory + self.args = args self.tpl = None self.journals = {} self.high_rid_counter = HighCounter() @@ -54,20 +56,21 @@ class JournalRecoveryManager(object): def run(self, args): tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME) if os.path.exists(tpl_dir): - self.tpl = Journal(tpl_dir, None) + self.tpl = Journal(tpl_dir, None, self.args) self.tpl.recover(self.high_rid_counter) print jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME) - prepared_list = self.tpl.txn_map.get_prepared_list() + prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} if os.path.exists(jrnl_dir): for dir_entry in sorted(os.listdir(jrnl_dir)): - jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list) + jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args) jrnl.recover(self.high_rid_counter) self.journals[jrnl.get_queue_name()] = jrnl print self._reconcile_transactions(prepared_list, args.txn) def _reconcile_transactions(self, prepared_list, txn_flag): print 'Transaction reconciliation report:' + print '==================================' print len(prepared_list), 'open transaction(s) found in prepared transaction list:' for xid in prepared_list.keys(): commit_flag = prepared_list[xid] @@ -79,7 +82,7 @@ class JournalRecoveryManager(object): status = '[Prepared, but interrupted during abort phase]' print ' ', Utils.format_xid(xid), status if prepared_list[xid] is None: # Prepared, but not committed or aborted - enqueue_record = self.tpl.get_txn_map_record(xid) + enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \ self.tpl.current_journal_file, self.high_rid_counter.get_next(), \ enqueue_record.record_id, xid, None) @@ -141,7 +144,7 @@ class EnqueueMap(object): lock_str = '[LOCKED]' else: lock_str = '' - rstr += '\n %d:%s %s' % (journal_file.file_header.file_num, record, lock_str) + rstr += '\n 0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str) else: rstr += '.' return rstr @@ -245,7 +248,7 @@ class TransactionMap(object): for xid, op_list in self.txn_map.iteritems(): rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list)) for journal_file, record, _ in op_list: - rstr += '\n %d:%s' % (journal_file.file_header.file_num, record) + rstr += '\n 0x%x:%s' % (journal_file.file_header.file_num, record) else: rstr += '.' return rstr @@ -303,7 +306,7 @@ class Journal(object): """ Instance of a Qpid Linear Store (QLS) journal. """ - def __init__(self, directory, xid_prepared_list): + def __init__(self, directory, xid_prepared_list, args): self.directory = directory self.queue_name = os.path.basename(directory) self.files = {} @@ -315,6 +318,10 @@ class Journal(object): self.first_rec_flag = None self.statistics = JournalStatistics() self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only + self.args = args + self.last_record_offset = None # TODO: Move into JournalFile + self.num_filler_records_required = None # TODO: Move into JournalFile + self.fill_to_offset = None def add_record(self, record): if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord): if record.xid_size > 0: @@ -334,15 +341,18 @@ class Journal(object): def get_queue_name(self): return self.queue_name def recover(self, high_rid_counter): - print 'Recovering', self.queue_name #DEBUG + print 'Recovering %s' % self.queue_name self._analyze_files() try: while self._get_next_record(high_rid_counter): pass + self._check_alignment() except qls.err.NoMoreFilesInJournalError: - #print '[No more files in journal]' # DEBUG - #print #DEBUG - pass + print 'No more files in journal' + except qls.err.FirstRecordOffsetMismatchError as err: + print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \ + (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(), + err.get_expected_fro()) def reconcile_transactions(self, prepared_list, txn_flag): xid_list = self.txn_map.get_xid_list() if len(xid_list) > 0: @@ -363,11 +373,12 @@ class Journal(object): if txn_flag: self.txn_map.abort(xid) else: - print ' ', Utils.format_xid(xid), '- Aborting, not in prepared transaction list' + print ' ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list' if txn_flag: self.txn_map.abort(xid) def report(self, print_stats_flag): print 'Journal "%s":' % self.queue_name + print '=' * (11 + len(self.queue_name)) if print_stats_flag: print str(self.statistics) print self.enq_map.report_str(True, True) @@ -375,6 +386,11 @@ class Journal(object): JournalFile.report_header() for file_num in sorted(self.files.keys()): self.files[file_num].report() + #TODO: move this to JournalFile, append to file info + if self.num_filler_records_required is not None and self.fill_to_offset is not None: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) print #--- protected functions --- def _analyze_files(self): @@ -386,21 +402,35 @@ class Journal(object): args = Utils.load_args(file_handle, RecordHeader) file_hdr = FileHeader(*args) file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader)) - if not file_hdr.is_header_valid(file_hdr): - break - file_hdr.load(file_handle) - if not file_hdr.is_valid(): - break - Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) - self.files[file_hdr.file_num] = JournalFile(file_hdr) + if file_hdr.is_header_valid(file_hdr): + file_hdr.load(file_handle) + if file_hdr.is_valid(): + Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE) + self.files[file_hdr.file_num] = JournalFile(file_hdr) self.file_num_list = sorted(self.files.keys()) self.file_num_itr = iter(self.file_num_list) + def _check_alignment(self): # TODO: Move into JournalFile + remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE + if remaining_sblks == 0: + self.num_filler_records_required = 0 + else: + self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE + self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \ + (self.current_journal_file.file_header.file_num, self.last_record_offset, + self.num_filler_records_required, self.fill_to_offset) def _check_file(self): - if self.current_journal_file is not None and not self.current_journal_file.file_header.is_end_of_file(): - return - self._get_next_file() + if self.current_journal_file is not None: + if not self.current_journal_file.file_header.is_end_of_file(): + return True + if self.current_journal_file.file_header.is_end_of_file(): + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() + if not self._get_next_file(): + return False fhdr = self.current_journal_file.file_header fhdr.file_handle.seek(fhdr.first_record_offset) + return True def _get_next_file(self): if self.current_journal_file is not None: file_handle = self.current_journal_file.file_header.file_handle @@ -413,13 +443,16 @@ class Journal(object): except StopIteration: pass if file_num == 0: - raise qls.err.NoMoreFilesInJournalError(self.queue_name) + return False self.current_journal_file = self.files[file_num] self.first_rec_flag = True - print self.current_journal_file.file_header - #print '[file_num=0x%x]' % self.current_journal_file.file_num #DEBUG + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header) + return True def _get_next_record(self, high_rid_counter): - self._check_file() + if not self._check_file(): + return False + self.last_record_offset = self.current_journal_file.file_header.file_handle.tell() this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader) if not this_record.is_header_valid(self.current_journal_file.file_header): return False @@ -427,32 +460,42 @@ class Journal(object): if this_record.file_offset != self.current_journal_file.file_header.first_record_offset: raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record) self.first_rec_flag = False - high_rid_counter.check(this_record.record_id) self.statistics.total_record_count += 1 + start_journal_file = self.current_journal_file if isinstance(this_record, EnqueueRecord): - self._handle_enqueue_record(this_record) - print this_record + ok_flag = self._handle_enqueue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) elif isinstance(this_record, DequeueRecord): - self._handle_dequeue_record(this_record) - print this_record + ok_flag = self._handle_dequeue_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) elif isinstance(this_record, TransactionRecord): - self._handle_transaction_record(this_record) - print this_record + ok_flag = self._handle_transaction_record(this_record, start_journal_file) + high_rid_counter.check(this_record.record_id) + if self.args.show_recs or self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) else: self.statistics.filler_record_count += 1 + ok_flag = True + if self.args.show_all_recs: + print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record) Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE) - return True - def _handle_enqueue_record(self, enqueue_record): - start_journal_file = self.current_journal_file + return ok_flag + def _handle_enqueue_record(self, enqueue_record, start_journal_file): while enqueue_record.load(self.current_journal_file.file_header.file_handle): - self._get_next_file() - if not enqueue_record.is_valid(self.current_journal_file): - return + if not self._get_next_file(): + enqueue_record.truncated_flag = True + return False + if not enqueue_record.is_valid(start_journal_file): + return False if enqueue_record.is_external() and enqueue_record.data != None: raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record) if enqueue_record.is_transient(): self.statistics.transient_record_count += 1 - return + return True if enqueue_record.xid_size > 0: self.txn_map.add(start_journal_file, enqueue_record) self.statistics.transaction_operation_count += 1 @@ -462,13 +505,14 @@ class Journal(object): self.enq_map.add(start_journal_file, enqueue_record, False) start_journal_file.incr_enq_cnt() self.statistics.enqueue_count += 1 - #print enqueue_record, # DEBUG - def _handle_dequeue_record(self, dequeue_record): - start_journal_file = self.current_journal_file + return True + def _handle_dequeue_record(self, dequeue_record, start_journal_file): while dequeue_record.load(self.current_journal_file.file_header.file_handle): - self._get_next_file() - if not dequeue_record.is_valid(self.current_journal_file): - return + if not self._get_next_file(): + dequeue_record.truncated_flag = True + return False + if not dequeue_record.is_valid(start_journal_file): + return False if dequeue_record.xid_size > 0: if self.xid_prepared_list is None: # ie this is the TPL dequeue_record.transaction_prepared_list_flag = True @@ -484,12 +528,14 @@ class Journal(object): except qls.err.RecordIdNotFoundError: dequeue_record.warnings.append('NOT IN EMAP') self.statistics.dequeue_count += 1 - #print dequeue_record, # DEBUG - def _handle_transaction_record(self, transaction_record): + return True + def _handle_transaction_record(self, transaction_record, start_journal_file): while transaction_record.load(self.current_journal_file.file_header.file_handle): - self._get_next_file() - if not transaction_record.is_valid(self.current_journal_file): - return + if not self._get_next_file(): + transaction_record.truncated_flag = True + return False + if not transaction_record.is_valid(start_journal_file): + return False if transaction_record.magic[-1] == 'a': self.statistics.transaction_abort_count += 1 else: @@ -501,7 +547,7 @@ class Journal(object): # if transaction_record.magic[-1] == 'c': # commits only # self._txn_obj_list[hdr.xid] = hdr self.statistics.transaction_record_count += 1 - #print transaction_record, # DEBUG + return True def _load_data(self, record): while not record.is_complete: record.load(self.current_journal_file.file_handle) @@ -511,6 +557,7 @@ class JournalFile(object): self.file_header = file_header self.enq_cnt = 0 self.deq_cnt = 0 + self.num_filler_records_required = None def incr_enq_cnt(self): self.enq_cnt += 1 def decr_enq_cnt(self, record): @@ -527,7 +574,8 @@ class JournalFile(object): print '-------- ------- ---- ----- ------------' def report(self): comment = '' if self.file_header.file_num == 0 else '' - print '%8d %7d %4d %4dk %s %s' % (self.file_header.file_num, self.get_enq_cnt(), self.file_header.partition_num, + file_num_str = '0x%x' % self.file_header.file_num + print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num, self.file_header.efp_data_size_kb, os.path.basename(self.file_header.file_handle.name), comment) @@ -541,6 +589,9 @@ class RecordHeader(object): self.serial = serial self.record_id = record_id self.warnings = [] + self.truncated_flag = False + def checksum_encode(self): + return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id) def load(self, file_handle): pass @staticmethod @@ -560,8 +611,6 @@ class RecordHeader(object): if self.version != Utils.RECORD_VERSION: raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION) if self.serial != file_header.serial: - #print '[serial mismatch at 0x%x]' % self.file_offset #DEBUG - #print #DEBUG return False return True def _get_warnings(self): @@ -606,8 +655,8 @@ class RecordTail(object): return False self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \ self.serial == record.serial and \ - self.record_id == record.record_id - # TODO: When we can verify the checksum, add this here + self.record_id == record.record_id and \ + Utils.adler32(record.checksum_encode()) == self.checksum return self.valid_flag def __str__(self): """Return a string representation of the this RecordTail instance""" @@ -660,7 +709,7 @@ class FileHeader(RecordHeader): return strftime(fstr, time) def __str__(self): """Return a string representation of the this FileHeader instance""" - return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num, + return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num, self.first_record_offset, self.partition_num, self.efp_data_size_kb, self.timestamp_str(), self._get_warnings()) @@ -677,6 +726,9 @@ class EnqueueRecord(RecordHeader): self.data = None self.data_complete = False self.record_tail = None + def checksum_encode(self): + return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \ + self.xid + self.data def is_external(self): return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0 def is_transient(self): @@ -732,6 +784,9 @@ class EnqueueRecord(RecordHeader): return fstr def __str__(self): """Return a string representation of the this EnqueueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, self.data_size) if self.record_tail is None: record_tail_str = '' else: @@ -750,6 +805,9 @@ class DequeueRecord(RecordHeader): self.xid = None self.xid_complete = False self.record_tail = None + def checksum_encode(self): + return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \ + self.xid def is_transaction_complete_commit(self): return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0 def is_valid(self, journal_file): @@ -790,6 +848,10 @@ class DequeueRecord(RecordHeader): return '' def __str__(self): """Return a string representation of the this DequeueRecord instance""" + if self.truncated_flag: + return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self), + self.xid_size, + self.dequeue_record_id) if self.record_tail is None: record_tail_str = '' else: @@ -805,6 +867,8 @@ class TransactionRecord(RecordHeader): self.xid = None self.xid_complete = False self.record_tail = None + def checksum_encode(self): + return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid def is_valid(self, journal_file): if not RecordHeader.is_header_valid(self, journal_file.file_header): return False @@ -832,6 +896,8 @@ class TransactionRecord(RecordHeader): return False def __str__(self): """Return a string representation of the this TransactionRecord instance""" + if self.truncated_flag: + return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size) if self.record_tail is None: record_tail_str = '' else: @@ -845,6 +911,9 @@ class Utils(object): RECORD_VERSION = 2 SBLK_SIZE = 4096 @staticmethod + def adler32(data): + return zlib.adler32(data) & 0xffffffff + @staticmethod def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data): record_class = _CLASSES.get(magic[-1]) record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id) @@ -866,7 +935,7 @@ class Utils(object): record.data_complete = True record.record_tail = RecordTail(None) record.record_tail.xmagic = Utils.inv_str(magic) - record.record_tail.checksum = 0 # TODO: when we can calculate checksums, add this here + record.record_tail.checksum = Utils.adler32(record.checksum_encode()) record.record_tail.serial = record.serial record.record_tail.record_id = record.record_id return record @@ -878,7 +947,7 @@ class Utils(object): # << DEBUG >> begin = data.find('msg') end = data.find('\0', begin) - return 'data="%s"' % data[begin:end] + return 'data(%d)="%s"' % (dsize, data[begin:end]) # << END DEBUG if Utils._is_printable(data): datastr = Utils._split_str(data) @@ -941,7 +1010,8 @@ class Utils(object): @staticmethod def skip(file_handle, boundary): """Read and discard disk bytes until the next multiple of boundary""" - file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) + if not file_handle.closed: + file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary)) #--- protected functions --- @staticmethod def _hex_str(in_str, begin, end): diff --git a/tools/src/py/qpid-ha b/tools/src/py/qpid-ha index cf054409d8..daa73d3312 100755 --- a/tools/src/py/qpid-ha +++ b/tools/src/py/qpid-ha @@ -133,12 +133,10 @@ class SetCmd(Command): Command.__init__(self, "set", "Set HA configuration settings") self.add("--brokers-url", "", "string", "URL with address of each broker in the cluster. Used by brokers to connect to each other.") self.add("--public-url", "", "string", "URL advertised to clients to connect to the cluster. May be a list or a VIP.") - self.add("--backups", "", "int", "Expect backups to be running"), def do_execute(self, qmf_broker, ha_broker, opts, args): if (opts.brokers_url): qmf_broker._method("setBrokersUrl", {"url":opts.brokers_url}, HA_BROKER) if (opts.public_url): qmf_broker._method("setPublicUrl", {"url":opts.public_url}, HA_BROKER) - if (opts.backups): qmf_broker._method("setExpectedBackups", {"expectedBackups":opts.backups}, HA_BROKER) SetCmd() @@ -151,7 +149,6 @@ class QueryCmd(Command): for x in [("Status:", hb.status), ("Brokers URL:", hb.brokersUrl), ("Public URL:", hb.publicUrl), - ("Expected Backups:", hb.expectedBackups), ("Replicate: ", hb.replicateDefault) ]: print "%-20s %s"%(x[0], x[1]) diff --git a/tools/src/py/qpid_qls_analyze.py b/tools/src/py/qpid_qls_analyze.py index 165d41fe95..a540587547 100755 --- a/tools/src/py/qpid_qls_analyze.py +++ b/tools/src/py/qpid_qls_analyze.py @@ -37,8 +37,8 @@ class QqpdLinearStoreAnalyzer(object): self.args = None self._process_args() self.qls_dir = os.path.abspath(self.args.qls_dir) - self.efp_manager = efp.EfpManager(self.qls_dir) - self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir) + self.efp_manager = efp.EfpManager(self.qls_dir, self.args) + self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir, self.args) def _analyze_efp(self): self.efp_manager.run(self.args) def _analyze_journals(self): @@ -49,6 +49,10 @@ class QqpdLinearStoreAnalyzer(object): help='Qpid Linear Store (QLS) directory to be analyzed') parser.add_argument('--efp', action='store_true', help='Analyze the Emtpy File Pool (EFP) and show stats') + parser.add_argument('--show-recs', action='store_true', + help='Show material records found during recovery') + parser.add_argument('--show-all-recs', action='store_true', + help='Show all records (including fillers) found during recovery') parser.add_argument('--stats', action='store_true', help='Print journal record stats') parser.add_argument('--txn', action='store_true', -- cgit v1.2.1