diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker')
33 files changed, 438 insertions, 238 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 7fbbf4e2c4..c709606c17 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -164,6 +164,12 @@ void Bridge::destroy() listener(this); } +bool Bridge::isSessionReady() const +{ + SessionHandler& sessionHandler = conn->getChannel(id); + return sessionHandler.ready(); +} + void Bridge::setPersistenceId(uint64_t pId) const { persistenceId = pId; diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index a846254c57..8b4559a871 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -59,6 +59,8 @@ public: void destroy(); bool isDurable() { return args.i_durable; } + bool isSessionReady() const; + management::ManagementObject* GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 14861ec3df..b02b9fa818 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -435,8 +435,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); - QPID_LOG (debug, "Broker::connect()"); string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; + QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport << + "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); return Manageable::STATUS_NOT_IMPLEMENTED; @@ -455,7 +456,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, QPID_LOG (debug, "Broker::queueMoveMessages()"); if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter)) status = Manageable::STATUS_OK; - else + else return Manageable::STATUS_PARAMETER_INVALID; break; } @@ -804,6 +805,7 @@ bool Broker::deferDeliveryImpl(const std::string& , void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { clusterTimer = t; queueCleaner.setTimer(clusterTimer.get()); + dtxManager.setTimer(*clusterTimer.get()); } const std::string Broker::TCP_TRANSPORT("tcp"); @@ -941,6 +943,9 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); } + if (name.empty()) { + throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange")); + } Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); @@ -968,6 +973,9 @@ void Broker::bind(const std::string& queueName, if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId)); } + if (exchangeName.empty()) { + throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange")); + } Queue::shared_ptr queue = queues.find(queueName); Exchange::shared_ptr exchange = exchanges.get(exchangeName); @@ -998,13 +1006,15 @@ void Broker::unbind(const std::string& queueName, if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId)); } - + if (exchangeName.empty()) { + throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange")); + } Queue::shared_ptr queue = queues.find(queueName); Exchange::shared_ptr exchange = exchanges.get(exchangeName); if (!queue) { - throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); + throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName)); } else if (!exchange) { - throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); + throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName)); } else { if (exchange->unbind(queue, key, 0)) { if (exchange->isDurable() && queue->isDurable()) { diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 8362a9782c..0b3059d26c 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -156,16 +156,7 @@ Connection::~Connection() void Connection::received(framing::AMQFrame& frame) { // Received frame on connection so delay timeout restartTimeout(); - - if (frame.getChannel() == 0 && frame.getMethod()) { - adapter.handle(frame); - } else { - if (adapter.isOpen()) - getChannel(frame.getChannel()).in(frame); - else - close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received."); - } - + adapter.handle(frame); if (isLink) //i.e. we are acting as the client to another broker recordFromServer(frame); else diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 270711705e..015002a70c 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -68,8 +68,15 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); Connection::ErrorListener* errorListener = handler->connection.getErrorListener(); try{ - if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) { + if (method && invoke( + static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), *method)) { + // This is a connection control frame, nothing more to do. + } else if (isOpen()) { handler->connection.getChannel(frame.getChannel()).in(frame); + } else { + handler->proxy.close( + connection::CLOSE_CODE_FRAMING_ERROR, + "Connection not yet open, invalid frame received."); } }catch(ConnectionException& e){ if (errorListener) errorListener->connectionError(e.what()); @@ -185,7 +192,7 @@ void ConnectionHandler::Handler::secureOk(const string& response) void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, uint16_t framemax, uint16_t heartbeat) { - connection.setFrameMax(framemax); + if (framemax) connection.setFrameMax(framemax); connection.setHeartbeatInterval(heartbeat); } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 17ecef60d9..0b8fe95d5e 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, ui { id = deliveryId; if (msg.payload->getRedelivered()){ - msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true); + msg.payload->setRedelivered(); } msg.payload->adjustTtl(); diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index 4465825d7f..5a331357be 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -90,7 +90,7 @@ class DeliveryRecord bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } - bool isRedundant() const { return ended && (!windowing || completed); } // msg no longer needed - can discard + bool isRedundant() const { return ended && (!windowing || completed || cancelled); } bool isCancelled() const { return cancelled; } bool isAccepted() const { return !acceptExpected; } bool isEnded() const { return ended; } diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 060f80f60d..5591539853 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -139,6 +139,9 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c if (mgmtExchange != 0) { mgmtExchange->dec_bindingCount(); } + if (bk.queues.empty()) { + bindings.erase(routingKey); + } } else { return false; } diff --git a/qpid/cpp/src/qpid/broker/DtxAck.cpp b/qpid/cpp/src/qpid/broker/DtxAck.cpp index bca3f90bbe..c558681d62 100644 --- a/qpid/cpp/src/qpid/broker/DtxAck.cpp +++ b/qpid/cpp/src/qpid/broker/DtxAck.cpp @@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::SequenceSet& acked, DeliveryRecords& unacked not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); } +DtxAck::DtxAck(DeliveryRecords& unacked) { + pending = unacked; +} + bool DtxAck::prepare(TransactionContext* ctxt) throw() { try{ diff --git a/qpid/cpp/src/qpid/broker/DtxAck.h b/qpid/cpp/src/qpid/broker/DtxAck.h index 166147e58d..16c3ff8ba0 100644 --- a/qpid/cpp/src/qpid/broker/DtxAck.h +++ b/qpid/cpp/src/qpid/broker/DtxAck.h @@ -1,3 +1,6 @@ +#ifndef QPID_BROKER_DTXACK_H +#define QPID_BROKER_DTXACK_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,9 +21,6 @@ * under the License. * */ -#ifndef _DtxAck_ -#define _DtxAck_ - #include <algorithm> #include <functional> #include <list> @@ -29,20 +29,21 @@ #include "qpid/broker/TxOp.h" namespace qpid { - namespace broker { - class DtxAck : public TxOp{ - DeliveryRecords pending; +namespace broker { +class DtxAck : public TxOp{ + DeliveryRecords pending; - public: - DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~DtxAck(){} - virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } - }; - } -} + public: + DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked); + DtxAck(DeliveryRecords& unacked); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~DtxAck(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + const DeliveryRecords& getPending() const { return pending; } +}; +}} // qpid::broker -#endif +#endif /*!QPID_BROKER_DTXACK_H*/ diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp index f1b8169cf7..13177d3b72 100644 --- a/qpid/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/qpid/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,8 +23,11 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer(const std::string& _xid) - : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {} +DtxBuffer::DtxBuffer( + const std::string& _xid, + bool ended_, bool suspended_, bool failed_, bool expired_) + : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_) +{} DtxBuffer::~DtxBuffer() {} @@ -34,7 +37,7 @@ void DtxBuffer::markEnded() ended = true; } -bool DtxBuffer::isEnded() +bool DtxBuffer::isEnded() const { Mutex::ScopedLock locker(lock); return ended; @@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSuspended) suspended = isSuspended; } -bool DtxBuffer::isSuspended() +bool DtxBuffer::isSuspended() const { return suspended; } @@ -58,13 +61,13 @@ void DtxBuffer::fail() ended = true; } -bool DtxBuffer::isRollbackOnly() +bool DtxBuffer::isRollbackOnly() const { Mutex::ScopedLock locker(lock); return failed; } -const std::string& DtxBuffer::getXid() +std::string DtxBuffer::getXid() const { return xid; } @@ -76,8 +79,13 @@ void DtxBuffer::timedout() fail(); } -bool DtxBuffer::isExpired() +bool DtxBuffer::isExpired() const { Mutex::ScopedLock locker(lock); return expired; } + +bool DtxBuffer::isFailed() const +{ + return failed; +} diff --git a/qpid/cpp/src/qpid/broker/DtxBuffer.h b/qpid/cpp/src/qpid/broker/DtxBuffer.h index 1511cb032f..cabd37647a 100644 --- a/qpid/cpp/src/qpid/broker/DtxBuffer.h +++ b/qpid/cpp/src/qpid/broker/DtxBuffer.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,31 +26,34 @@ #include "qpid/sys/Mutex.h" namespace qpid { - namespace broker { - class DtxBuffer : public TxBuffer{ - sys::Mutex lock; - const std::string xid; - bool ended; - bool suspended; - bool failed; - bool expired; +namespace broker { +class DtxBuffer : public TxBuffer{ + mutable sys::Mutex lock; + const std::string xid; + bool ended; + bool suspended; + bool failed; + bool expired; - public: - typedef boost::shared_ptr<DtxBuffer> shared_ptr; + public: + typedef boost::shared_ptr<DtxBuffer> shared_ptr; - QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = ""); - QPID_BROKER_EXTERN ~DtxBuffer(); - QPID_BROKER_EXTERN void markEnded(); - bool isEnded(); - void setSuspended(bool suspended); - bool isSuspended(); - void fail(); - bool isRollbackOnly(); - void timedout(); - bool isExpired(); - const std::string& getXid(); - }; - } + QPID_BROKER_EXTERN DtxBuffer( + const std::string& xid = "", + bool ended=false, bool suspended=false, bool failed=false, bool expired=false); + QPID_BROKER_EXTERN ~DtxBuffer(); + QPID_BROKER_EXTERN void markEnded(); + bool isEnded() const; + void setSuspended(bool suspended); + bool isSuspended() const; + void fail(); + bool isRollbackOnly() const; + void timedout(); + bool isExpired() const; + bool isFailed() const; + std::string getXid() const; +}; +} } diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index 3caa41c3f4..febd547478 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,7 +34,7 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {} +DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} DtxManager::~DtxManager() {} @@ -53,8 +53,8 @@ void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionCon createWork(xid)->recover(txn, ops); } -bool DtxManager::prepare(const std::string& xid) -{ +bool DtxManager::prepare(const std::string& xid) +{ QPID_LOG(debug, "preparing: " << xid); try { return getWork(xid)->prepare(); @@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::string& xid) } } -bool DtxManager::commit(const std::string& xid, bool onePhase) -{ +bool DtxManager::commit(const std::string& xid, bool onePhase) +{ QPID_LOG(debug, "committing: " << xid); try { bool result = getWork(xid)->commit(onePhase); @@ -77,8 +77,8 @@ bool DtxManager::commit(const std::string& xid, bool onePhase) } } -void DtxManager::rollback(const std::string& xid) -{ +void DtxManager::rollback(const std::string& xid) +{ QPID_LOG(debug, "rolling back: " << xid); try { getWork(xid)->rollback(); @@ -91,7 +91,7 @@ void DtxManager::rollback(const std::string& xid) DtxWorkRecord* DtxManager::getWork(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); @@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const std::string& xid) return ptr_map_ptr(i); } +bool DtxManager::exists(const std::string& xid) { + Mutex::ScopedLock locker(lock); + return work.find(xid) != work.end(); +} + void DtxManager::remove(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { throw NotFoundException(QPID_MSG("Unrecognised xid " << xid)); @@ -110,14 +115,15 @@ void DtxManager::remove(const std::string& xid) } } -DtxWorkRecord* DtxManager::createWork(std::string xid) +DtxWorkRecord* DtxManager::createWork(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); } else { - return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first); + std::string ncxid = xid; // Work around const correctness problems in ptr_map. + return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first); } } @@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs) } timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid)); record->setTimeout(timeout); - timer.add(timeout); + timer->add(timeout); } uint32_t DtxManager::getTimeout(const std::string& xid) @@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const std::string& xid) void DtxManager::timedout(const std::string& xid) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { QPID_LOG(warning, "Transaction timeout failed: no record for xid"); @@ -153,7 +159,7 @@ void DtxManager::timedout(const std::string& xid) } } -DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) +DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {} void DtxManager::DtxCleanup::fire() diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index 680b62eeb2..11895695a3 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,8 +26,8 @@ #include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/framing/amqp_types.h" -#include "qpid/sys/Timer.h" #include "qpid/sys/Mutex.h" +#include "qpid/ptr_map.h" namespace qpid { namespace broker { @@ -39,22 +39,21 @@ class DtxManager{ { DtxManager& mgr; const std::string& xid; - - DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); + + DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; - qpid::sys::Timer& timer; + qpid::sys::Timer* timer; void remove(const std::string& xid); - DtxWorkRecord* getWork(const std::string& xid); - DtxWorkRecord* createWork(std::string xid); + DtxWorkRecord* createWork(const std::string& xid); public: - DtxManager(qpid::sys::Timer&); + DtxManager(sys::Timer&); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); @@ -66,6 +65,15 @@ public: uint32_t getTimeout(const std::string& xid); void timedout(const std::string& xid); void setStore(TransactionalStore* store); + void setTimer(sys::Timer& t) { timer = &t; } + + // Used by cluster for replication. + template<class F> void each(F f) const { + for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i) + f(*ptr_map_ptr(i)); + } + DtxWorkRecord* getWork(const std::string& xid); + bool exists(const std::string& xid); }; } diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp index c4c52ec40a..58700846ef 100644 --- a/qpid/cpp/src/qpid/broker/DtxTimeout.cpp +++ b/qpid/cpp/src/qpid/broker/DtxTimeout.cpp @@ -25,7 +25,7 @@ using namespace qpid::broker; DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) - : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid) { } diff --git a/qpid/cpp/src/qpid/broker/DtxTimeout.h b/qpid/cpp/src/qpid/broker/DtxTimeout.h index 680a210e4f..1fcb4cee2a 100644 --- a/qpid/cpp/src/qpid/broker/DtxTimeout.h +++ b/qpid/cpp/src/qpid/broker/DtxTimeout.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,7 +29,9 @@ namespace broker { class DtxManager; -struct DtxTimeoutException : public Exception {}; +struct DtxTimeoutException : public Exception { + DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {} +}; struct DtxTimeout : public sys::TimerTask { @@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTask DtxManager& mgr; const std::string xid; - DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp index 9f33e698db..a413fe418d 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,19 +28,19 @@ using qpid::sys::Mutex; using namespace qpid::broker; using namespace qpid::framing; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} -DtxWorkRecord::~DtxWorkRecord() +DtxWorkRecord::~DtxWorkRecord() { - if (timeout.get()) { + if (timeout.get()) { timeout->cancel(); } } bool DtxWorkRecord::prepare() { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); if (check()) { txn = store->begin(xid); if (prepare(txn.get())) { @@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionContext* _txn) bool DtxWorkRecord::commit(bool onePhase) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); if (check()) { if (prepared) { //already prepared i.e. 2pc @@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase) store->commit(*txn); txn.reset(); - + std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); return true; } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); + throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); } std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase) void DtxWorkRecord::rollback() { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); check(); abort(); } void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); if (expired) { - throw DtxTimeoutException(); + throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out.")); } if (completed) { throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!")); @@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer void DtxWorkRecord::timedout() { - Mutex::ScopedLock locker(lock); + Mutex::ScopedLock locker(lock); expired = true; rolledback = true; if (!completed) { @@ -175,3 +175,17 @@ void DtxWorkRecord::timedout() } abort(); } + +size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) { + Work::iterator i = std::find(work.begin(), work.end(), buf); + if (i == work.end()) throw NotFoundException( + QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid())); + return i - work.begin(); +} + +DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const { + if (i > work.size()) + throw NotFoundException( + QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid)); + return work[i]; +} diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h index aec2d2aed4..331e42fefd 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h @@ -73,9 +73,19 @@ public: void timedout(); void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; } boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; } + std::string getXid() const { return xid; } + bool isCompleted() const { return completed; } + bool isRolledback() const { return rolledback; } + bool isPrepared() const { return prepared; } + bool isExpired() const { return expired; } + + // Used by cluster update; + size_t size() const { return work.size(); } + DtxBuffer::shared_ptr operator[](size_t i) const; + uint32_t getTimeout() const { return timeout? timeout->timeout : 0; } + size_t indexOf(const DtxBuffer::shared_ptr&); }; -} -} +}} // qpid::broker #endif diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index 622cc81002..d68845062d 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { if (parent->sequence){ parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo); } if (parent->ive) { parent->lastMsg = &( msg.getMessage()); @@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) } void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { - msg->getProperties<DeliveryProperties>()->setExchange(getName()); + msg->setExchange(getName()); } bool Exchange::routeWithAlternate(Deliverable& msg) diff --git a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp index a811a86492..3262e343a3 100644 --- a/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp +++ b/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp @@ -93,11 +93,7 @@ void LegacyLVQ::removeIf(Predicate p) //purging of an LVQ is not enabled if the broker is clustered //(expired messages will be removed on delivery and consolidated //by key as part of normal LVQ operation). - - //TODO: Is there a neater way to check whether broker is - //clustered? Here we assume that if the clustered timer is the - //same as the regular timer, we are not clustered: - if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer())) + if (!broker || !broker->isInCluster()) MessageMap::removeIf(p); } diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 9ab4379a69..8010bf43e7 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -248,6 +248,19 @@ void Link::ioThreadProcessing() return; QPID_LOG(debug, "Link::ioThreadProcessing()"); + // check for bridge session errors and recover + if (!active.empty()) { + Bridges::iterator removed = std::remove_if( + active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1)); + for (Bridges::iterator i = removed; i != active.end(); ++i) { + Bridge::shared_ptr bridge = *i; + bridge->closed(); + bridge->cancel(*connection); + created.push_back(bridge); + } + active.erase(removed, active.end()); + } + //process any pending creates and/or cancellations if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { @@ -296,7 +309,7 @@ void Link::maintenanceVisit () } } } - else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0) + else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index d694d1eafd..992a94f92e 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -30,6 +30,7 @@ #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include <time.h> @@ -51,18 +52,9 @@ Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), expiration(FAR_FUTURE), dequeueCallback(0), - inCallback(false), requiredCredit(0), isManagementMessage(false) + inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) {} -Message::Message(const Message& original) : - PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), dequeueCallback(0), - inCallback(false), requiredCredit(0) -{ - setExpiryPolicy(original.expiryPolicy); -} - Message::~Message() {} void Message::forcePersistent() @@ -288,6 +280,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) sys::Mutex::ScopedLock l(lock); Relay f(out); frames.map_if(f, TypeFilter<HEADER_BODY>()); + //as frame (and pointer to body) has now been passed to handler, + //subsequent modifications should use a copy + copyHeaderOnWrite = true; } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -342,11 +337,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const return false; } +class CloneHeaderBody +{ +public: + void operator()(AMQFrame& f) + { + f.cloneBody(); + } +}; + +AMQHeaderBody* Message::getHeaderBody() +{ + if (copyHeaderOnWrite) { + CloneHeaderBody f; + frames.map_if(f, TypeFilter<HEADER_BODY>()); + copyHeaderOnWrite = false; + } + return frames.getHeaders(); +} + void Message::addTraceId(const std::string& id) { sys::Mutex::ScopedLock l(lock); if (isA<MessageTransferBody>()) { - FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); + FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); @@ -360,7 +374,8 @@ void Message::addTraceId(const std::string& id) void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -382,9 +397,9 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) void Message::adjustTtl() { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { - sys::Mutex::ScopedLock l(lock); if (expiration < FAR_FUTURE) { sys::AbsTime current( expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); @@ -395,6 +410,42 @@ void Message::adjustTtl() } } +void Message::setRedelivered() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true); +} + +void Message::insertCustomProperty(const std::string& key, int64_t value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value); +} + +void Message::insertCustomProperty(const std::string& key, const std::string& value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value); +} + +void Message::removeCustomProperty(const std::string& key) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key); +} + +void Message::setExchange(const std::string& exchange) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<DeliveryProperties>()->setExchange(exchange); +} + +void Message::clearApplicationHeadersFlag() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag(); +} + void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } @@ -442,11 +493,6 @@ uint8_t Message::getPriority() const { return getAdapter().getPriority(frames); } -framing::FieldTable& Message::getOrInsertHeaders() -{ - return getProperties<MessageProperties>()->getApplicationHeaders(); -} - bool Message::getIsManagementMessage() const { return isManagementMessage; } void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index e1d6c60a80..2a23a25d06 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -29,13 +29,17 @@ #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" #include <boost/function.hpp> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <memory> #include <string> #include <vector> namespace qpid { namespace framing { +class AMQBody; +class AMQHeaderBody; class FieldTable; class SequenceNumber; } @@ -53,7 +57,6 @@ public: typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback; QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber()); - QPID_BROKER_EXTERN Message(const Message&); QPID_BROKER_EXTERN ~Message(); uint64_t getPersistenceId() const { return persistenceId; } @@ -75,7 +78,6 @@ public: bool isImmediate() const; QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const; QPID_BROKER_EXTERN std::string getAppId() const; - framing::FieldTable& getOrInsertHeaders(); QPID_BROKER_EXTERN bool isPersistent() const; bool requiresAccept(); @@ -85,18 +87,19 @@ public: sys::AbsTime getExpiration() const { return expiration; } void setExpiration(sys::AbsTime exp) { expiration = exp; } void adjustTtl(); + void setRedelivered(); + QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value); + QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value); + QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key); + void setExchange(const std::string&); + void clearApplicationHeadersFlag(); framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } - template <class T> T* getProperties() { - qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); - } - template <class T> const T* getProperties() const { const qpid::framing::AMQHeaderBody* p = frames.getHeaders(); - return p->get<T>(true); + return p->get<T>(); } template <class T> const T* hasProperties() const { @@ -156,9 +159,8 @@ public: bool isExcluded(const std::vector<std::string>& excludes) const; void addTraceId(const std::string& id); - void forcePersistent(); - bool isForcedPersistent(); - + void forcePersistent(); + bool isForcedPersistent(); /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); @@ -178,7 +180,7 @@ public: bool redelivered; bool loaded; bool staged; - bool forcePersistentPolicy; // used to force message as durable, via a broker policy + bool forcePersistentPolicy; // used to force message as durable, via a broker policy ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; @@ -194,6 +196,15 @@ public: uint32_t requiredCredit; bool isManagementMessage; + mutable bool copyHeaderOnWrite; + + /** + * Expects lock to be held + */ + template <class T> T* getModifiableProperties() { + return getHeaderBody()->get<T>(true); + } + qpid::framing::AMQHeaderBody* getHeaderBody(); }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 224446adb2..d4140f3505 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -702,7 +702,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); + if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); listeners.populate(copy); @@ -805,11 +805,6 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg } if (traceId.size()) { - //copy on write: take deep copy of message before modifying it - //as the frames may already be available for delivery on other - //threads - boost::intrusive_ptr<Message> copy(new Message(*msg)); - msg = copy; msg->addTraceId(traceId); } diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index acdb4934d4..12a13ccfe6 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -381,13 +381,17 @@ void CyrusAuthenticator::start(const string& mechanism, const string& response) const char *challenge; unsigned int challenge_len; - QPID_LOG(debug, "SASL: Starting authentication with mechanism: " << mechanism); + // This should be at same debug level as mech list in getMechanisms(). + QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism); int code = sasl_server_start(sasl_conn, mechanism.c_str(), - response.c_str(), response.length(), + response.size() ? response.c_str() : 0, response.length(), &challenge, &challenge_len); processAuthenticationStep(code, challenge, challenge_len); + qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); + if ( cnxMgmt ) + cnxMgmt->set_saslMechanism(mechanism); } void CyrusAuthenticator::step(const string& response) @@ -424,10 +428,12 @@ void CyrusAuthenticator::processAuthenticationStep(int code, const char *challen client.secure(challenge_str); } else { std::string uid; + //save error detail before trying to retrieve username as error in doing so will overwrite it + std::string errordetail = sasl_errdetail(sasl_conn); if (!getUsername(uid)) { - QPID_LOG(info, "SASL: Authentication failed (no username available):" << sasl_errdetail(sasl_conn)); + QPID_LOG(info, "SASL: Authentication failed (no username available yet):" << errordetail); } else { - QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << sasl_errdetail(sasl_conn)); + QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << errordetail); } // TODO: Change to more specific exceptions, when they are @@ -459,6 +465,9 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr if (ssf) { securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } + qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); + if ( cnxMgmt ) + cnxMgmt->set_saslSsf(ssf); return securityLayer; } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 52aad75748..94d0cc87f7 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -132,6 +132,10 @@ bool SemanticState::cancel(const string& tag) //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); + //can also remove any records that are now redundant + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1)); + unacked.erase(removed, unacked.end()); return true; } else { return false; @@ -177,8 +181,8 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) if (!dtxSelected) { throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + dtxBuffer.reset(new DtxBuffer(xid)); + txBuffer = dtxBuffer; if (join) { mgr.join(xid, dtxBuffer); } else { @@ -246,7 +250,7 @@ void SemanticState::resumeDtx(const std::string& xid) checkDtxTimeout(); dtxBuffer->setSuspended(false); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + txBuffer = dtxBuffer; } void SemanticState::checkDtxTimeout() @@ -284,6 +288,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, acquire(_acquire), blocked(true), windowing(true), + windowActive(false), exclusive(_exclusive), resumeId(_resumeId), tag(_tag), @@ -534,7 +539,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing) { + if (windowing && windowActive) { if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); } @@ -641,6 +646,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -650,6 +656,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -670,7 +677,8 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - stop(); + msgCredit = 0; + byteCredit = 0; } void SemanticState::ConsumerImpl::stop() @@ -678,6 +686,7 @@ void SemanticState::ConsumerImpl::stop() assertClusterSafe(); msgCredit = 0; byteCredit = 0; + windowActive = false; } Queue::shared_ptr SemanticState::getQueue(const string& name) const { @@ -711,6 +720,10 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver DeliveryRecords::reverse_iterator start(range.end); DeliveryRecords::reverse_iterator end(range.start); for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered)); + + DeliveryRecords::iterator removed = + remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1)); + unacked.erase(removed, range.end); } void SemanticState::reject(DeliveryId first, DeliveryId last) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 1cff46f369..12ccc75f11 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -65,7 +65,7 @@ class SessionContext; * * Message delivery is driven by ConsumerImpl::doOutput(), which is * called when a client's socket is ready to write data. - * + * */ class SemanticState : private boost::noncopyable { public: @@ -80,6 +80,7 @@ class SemanticState : private boost::noncopyable { const bool acquire; bool blocked; bool windowing; + bool windowActive; bool exclusive; std::string resumeId; const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command @@ -106,9 +107,9 @@ class SemanticState : private boost::noncopyable { uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); - bool deliver(QueuedMessage& msg); - bool filter(boost::intrusive_ptr<Message> msg); - bool accept(boost::intrusive_ptr<Message> msg); + bool deliver(QueuedMessage& msg); + bool filter(boost::intrusive_ptr<Message> msg); + bool accept(boost::intrusive_ptr<Message> msg); void disableNotify(); void enableNotify(); @@ -123,7 +124,7 @@ class SemanticState : private boost::noncopyable { void addMessageCredit(uint32_t value); void flush(); void stop(); - void complete(DeliveryRecord&); + void complete(DeliveryRecord&); boost::shared_ptr<Queue> getQueue() const { return queue; } bool isBlocked() const { return blocked; } bool setBlocked(bool set) { std::swap(set, blocked); return set; } @@ -148,9 +149,10 @@ class SemanticState : private boost::noncopyable { management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; + typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; + private: typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; - typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; SessionContext& session; DeliveryAdapter& deliveryAdapter; @@ -181,6 +183,7 @@ class SemanticState : private boost::noncopyable { void disable(ConsumerImpl::shared_ptr); public: + SemanticState(DeliveryAdapter&, SessionContext&); ~SemanticState(); @@ -189,7 +192,7 @@ class SemanticState : private boost::noncopyable { const ConsumerImpl::shared_ptr find(const std::string& destination) const; bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; - + /** * Get named queue, never returns 0. * @return: named queue @@ -197,11 +200,11 @@ class SemanticState : private boost::noncopyable { * @exception: ConnectionException if name="" and session has no default. */ boost::shared_ptr<Queue> getQueue(const std::string& name) const; - + bool exists(const std::string& consumerTag); - void consume(const std::string& destination, - boost::shared_ptr<Queue> queue, + void consume(const std::string& destination, + boost::shared_ptr<Queue> queue, bool ackRequired, bool acquire, bool exclusive, const std::string& resumeId=std::string(), uint64_t resumeTtl=0, const framing::FieldTable& = framing::FieldTable()); @@ -219,12 +222,13 @@ class SemanticState : private boost::noncopyable { void commit(MessageStore* const store); void rollback(); void selectDtx(); + bool getDtxSelected() const { return dtxSelected; } void startDtx(const std::string& xid, DtxManager& mgr, bool join); void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); void recover(bool requeue); - void deliver(DeliveryRecord& message, bool sync); + void deliver(DeliveryRecord& message, bool sync); void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); @@ -245,9 +249,12 @@ class SemanticState : private boost::noncopyable { DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } + DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; } void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; } + void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; } void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); + DtxBufferMap& getSuspendedXids() { return suspendedXids; } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index ee9646931e..742dbe9be8 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -445,8 +445,6 @@ void SessionState::addPendingExecutionSync() boost::intrusive_ptr<AsyncCompletion::Callback> SessionState::IncompleteIngressMsgXfer::clone() { - boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); - // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed. // If the client is pending the message.transfer completion, flush now to force immediate write to journal. if (requiresSync) @@ -457,7 +455,8 @@ SessionState::IncompleteIngressMsgXfer::clone() pending = true; completerContext->addPendingMessage(msg); } - return cb; + + return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this)); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index b43df0c0aa..506af85c47 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -256,7 +256,15 @@ class SessionState : public qpid::SessionState, requiresAccept(m->requiresAccept()), requiresSync(m->getFrames().getMethod()->isSync()), pending(false) {} - virtual ~IncompleteIngressMsgXfer() {}; + IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x ) + : AsyncCommandContext(x.session, x.msg->getCommandId()), + session(x.session), + msg(x.msg), + requiresAccept(x.requiresAccept), + requiresSync(x.requiresSync), + pending(x.pending) {} + + virtual ~IncompleteIngressMsgXfer() {}; virtual void completed(bool); virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone(); diff --git a/qpid/cpp/src/qpid/broker/TxBuffer.cpp b/qpid/cpp/src/qpid/broker/TxBuffer.cpp index b509778e89..d92e6ace48 100644 --- a/qpid/cpp/src/qpid/broker/TxBuffer.cpp +++ b/qpid/cpp/src/qpid/broker/TxBuffer.cpp @@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(TransactionalStore* const store) } void TxBuffer::accept(TxOpConstVisitor& v) const { - std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); + std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); } diff --git a/qpid/cpp/src/qpid/broker/TxPublish.h b/qpid/cpp/src/qpid/broker/TxPublish.h index f0b9c0a302..dba7878af2 100644 --- a/qpid/cpp/src/qpid/broker/TxPublish.h +++ b/qpid/cpp/src/qpid/broker/TxPublish.h @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,57 +34,58 @@ #include <boost/intrusive_ptr.hpp> namespace qpid { - namespace broker { - /** - * Defines the behaviour for publish operations on a - * transactional channel. Messages are routed through - * exchanges when received but are not at that stage delivered - * to the matching queues, rather the queues are held in an - * instance of this class. On prepare() the message is marked - * enqueued to the relevant queues in the MessagesStore. On - * commit() the messages will be passed to the queue for - * dispatch or to be added to the in-memory queue. - */ - class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{ +namespace broker { +/** + * Defines the behaviour for publish operations on a + * transactional channel. Messages are routed through + * exchanges when received but are not at that stage delivered + * to the matching queues, rather the queues are held in an + * instance of this class. On prepare() the message is marked + * enqueued to the relevant queues in the MessagesStore. On + * commit() the messages will be passed to the queue for + * dispatch or to be added to the in-memory queue. + */ +class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{ - class Commit{ - boost::intrusive_ptr<Message>& msg; - public: - Commit(boost::intrusive_ptr<Message>& msg); - void operator()(const boost::shared_ptr<Queue>& queue); - }; - class Rollback{ - boost::intrusive_ptr<Message>& msg; - public: - Rollback(boost::intrusive_ptr<Message>& msg); - void operator()(const boost::shared_ptr<Queue>& queue); - }; + class Commit{ + boost::intrusive_ptr<Message>& msg; + public: + Commit(boost::intrusive_ptr<Message>& msg); + void operator()(const boost::shared_ptr<Queue>& queue); + }; + class Rollback{ + boost::intrusive_ptr<Message>& msg; + public: + Rollback(boost::intrusive_ptr<Message>& msg); + void operator()(const boost::shared_ptr<Queue>& queue); + }; - boost::intrusive_ptr<Message> msg; - std::list<boost::shared_ptr<Queue> > queues; - std::list<boost::shared_ptr<Queue> > prepared; + boost::intrusive_ptr<Message> msg; + std::list<boost::shared_ptr<Queue> > queues; + std::list<boost::shared_ptr<Queue> > prepared; - void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); + void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); - public: - QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); - QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw(); - QPID_BROKER_EXTERN virtual void commit() throw(); - QPID_BROKER_EXTERN virtual void rollback() throw(); + public: + QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg); + QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw(); + QPID_BROKER_EXTERN virtual void commit() throw(); + QPID_BROKER_EXTERN virtual void rollback() throw(); - virtual Message& getMessage() { return *msg; }; - - QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue); + virtual Message& getMessage() { return *msg; }; - virtual ~TxPublish(){} - virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue); - QPID_BROKER_EXTERN uint64_t contentSize(); + virtual ~TxPublish(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } - boost::intrusive_ptr<Message> getMessage() const { return msg; } - const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; } - }; - } + QPID_BROKER_EXTERN uint64_t contentSize(); + + boost::intrusive_ptr<Message> getMessage() const { return msg; } + const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; } + const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; } +}; +} } diff --git a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp index 962877a471..d26b370632 100644 --- a/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp @@ -93,6 +93,7 @@ NullAuthenticator::~NullAuthenticator() {} void NullAuthenticator::getMechanisms(Array& mechanisms) { mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS"))); + mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN"))); } void NullAuthenticator::start(const string& mechanism, const string& response) diff --git a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index 676074a590..1dff1ddc8f 100644 --- a/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -27,10 +27,14 @@ #include "qpid/sys/AsynchIOHandler.h" #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/Socket.h" +#include "qpid/sys/SocketAddress.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/windows/SslAsynchIO.h" + #include <boost/bind.hpp> +#include <boost/ptr_container/ptr_vector.hpp> #include <memory> + // security.h needs to see this to distinguish from kernel use. #define SECURITY_WIN32 #include <security.h> @@ -68,9 +72,10 @@ struct SslServerOptions : qpid::Options }; class SslProtocolFactory : public qpid::sys::ProtocolFactory { - qpid::sys::Socket listener; const bool tcpNoDelay; - const uint16_t listeningPort; + boost::ptr_vector<Socket> listeners; + boost::ptr_vector<AsynchAcceptor> acceptors; + uint16_t listeningPort; std::string brokerHost; const bool clientAuthSelected; std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor; @@ -78,7 +83,7 @@ class SslProtocolFactory : public qpid::sys::ProtocolFactory { CredHandle credHandle; public: - SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay); + SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay); ~SslProtocolFactory(); void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*); void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port, @@ -114,6 +119,7 @@ static struct SslPlugin : public Plugin { try { const broker::Broker::Options& opts = broker->getOptions(); ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options, + "", boost::lexical_cast<std::string>(options.port), opts.connectionBacklog, opts.tcpNoDelay)); QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort()); @@ -126,12 +132,13 @@ static struct SslPlugin : public Plugin { } sslPlugin; SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, - int backlog, + const std::string& host, const std::string& port, int backlog, bool nodelay) : tcpNoDelay(nodelay), - listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)), clientAuthSelected(options.clientAuth) { + // Make sure that certificate store is good before listening to sockets + // to avoid having open and listening sockets when there is no cert store SecInvalidateHandle(&credHandle); // Get the certificate for this server. @@ -176,6 +183,23 @@ SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options, throw QPID_WINDOWS_ERROR(status); ::CertFreeCertificateContext(certContext); ::CertCloseStore(certStoreHandle, 0); + + // Listen to socket(s) + SocketAddress sa(host, port); + + // We must have at least one resolved address + QPID_LOG(info, "SSL Listening to: " << sa.asString()) + Socket* s = new Socket; + listeningPort = s->listen(sa, backlog); + listeners.push_back(s); + + // Try any other resolved addresses + while (sa.nextAddress()) { + QPID_LOG(info, "SSL Listening to: " << sa.asString()) + Socket* s = new Socket; + s->listen(sa, backlog); + listeners.push_back(s); + } } SslProtocolFactory::~SslProtocolFactory() { @@ -238,10 +262,12 @@ uint16_t SslProtocolFactory::getPort() const { void SslProtocolFactory::accept(sys::Poller::shared_ptr poller, sys::ConnectionCodec::Factory* fact) { - acceptor.reset( - AsynchAcceptor::create(listener, - boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); - acceptor->start(poller); + for (unsigned i = 0; i<listeners.size(); ++i) { + acceptors.push_back( + AsynchAcceptor::create(listeners[i], + boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false))); + acceptors[i].start(poller); + } } void SslProtocolFactory::connect(sys::Poller::shared_ptr poller, |