summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp167
1 files changed, 113 insertions, 54 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5fc9a1a932..751b3ff709 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -20,6 +20,8 @@
*/
#include "qpid/broker/SessionState.h"
+
+#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DtxAck.h"
@@ -35,12 +37,14 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
-#include "qpid/sys/ClusterSafe.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/FedOps.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <iostream>
#include <sstream>
@@ -49,6 +53,11 @@
#include <assert.h>
+namespace {
+const std::string X_SCOPE("x-scope");
+const std::string SESSION("session");
+}
+
namespace qpid {
namespace broker {
@@ -88,6 +97,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
+ unbindSessionBindings();
requeue();
//now unsubscribe, which may trigger queue deletion and thus
@@ -277,7 +287,7 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
@@ -303,7 +313,7 @@ Consumer(_name, type),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
deliveryCount(0),
- mgmtObject(0)
+ protocols(parent->getSession().getBroker().getProtocolRegistry())
{
if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
@@ -312,20 +322,20 @@ Consumer(_name, type),
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
- !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
+ mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
+ !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
}
}
}
-ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const
+ManagementObject::shared_ptr SemanticStateConsumerImpl::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
-Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
+Manageable::status_t SemanticStateConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -335,24 +345,23 @@ Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t met
}
-OwnershipToken* SemanticState::ConsumerImpl::getSession()
+OwnershipToken* SemanticStateConsumerImpl::getSession()
{
return &(parent->session);
}
-bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
{
return deliver(cursor, msg, shared_from_this());
}
-bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
+bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
- assertClusterSafe();
allocateCredit(msg);
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
- consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit());
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
@@ -370,27 +379,26 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa
return true;
}
-bool SemanticState::ConsumerImpl::filter(const Message&)
+bool SemanticStateConsumerImpl::filter(const Message&)
{
return true;
}
-bool SemanticState::ConsumerImpl::accept(const Message& msg)
+bool SemanticStateConsumerImpl::accept(const Message& msg)
{
- assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
//
- blocked = !(filter(msg) && checkCredit(msg));
+ blocked = !checkCredit(msg);
return !blocked;
}
namespace {
struct ConsumerName {
- const SemanticState::ConsumerImpl& consumer;
- ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+ const SemanticStateConsumerImpl& consumer;
+ ConsumerName(const SemanticStateConsumerImpl& ci) : consumer(ci) {}
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
@@ -399,26 +407,27 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
}
}
-void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
+void SemanticStateConsumerImpl::allocateCredit(const Message& msg)
{
- assertClusterSafe();
Credit original = credit;
- credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
+ credit.consume(1, transfer->getRequiredCredit());
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << original << " now " << credit);
}
-bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
+bool SemanticStateConsumerImpl::checkCredit(const Message& msg)
{
- bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
+ bool enoughCredit = credit.check(1, transfer->getRequiredCredit());
QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
- << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
+ << " credit for message of " << transfer->getRequiredCredit() << " bytes: "
<< credit);
return enoughCredit;
}
-SemanticState::ConsumerImpl::~ConsumerImpl()
+SemanticStateConsumerImpl::~SemanticStateConsumerImpl()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -437,9 +446,9 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+ // Only run auto-delete for counted consumers.
+ if (c->isCounted() && queue->canAutoDelete() && !queue->hasExclusiveOwner())
Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID);
- }
}
c->cancel();
}
@@ -491,9 +500,8 @@ void SemanticState::requestDispatch()
i->second->requestDispatch();
}
-void SemanticState::ConsumerImpl::requestDispatch()
+void SemanticStateConsumerImpl::requestDispatch()
{
- assertClusterSafe();
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -510,7 +518,7 @@ bool SemanticState::complete(DeliveryRecord& delivery)
return delivery.isRedundant();
}
-void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
+void SemanticStateConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
@@ -535,7 +543,7 @@ SessionContext& SemanticState::getSession() { return session; }
const SessionContext& SemanticState::getSession() const { return session; }
-const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
+const SemanticStateConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
ConsumerImpl::shared_ptr consumer;
if (!find(destination, consumer)) {
@@ -592,37 +600,33 @@ void SemanticState::stop(const std::string& destination)
find(destination)->stop();
}
-void SemanticState::ConsumerImpl::setWindowMode()
+void SemanticStateConsumerImpl::setWindowMode()
{
- assertClusterSafe();
credit.setWindowMode(true);
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
}
}
-void SemanticState::ConsumerImpl::setCreditMode()
+void SemanticStateConsumerImpl::setCreditMode()
{
- assertClusterSafe();
credit.setWindowMode(false);
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
}
}
-void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
+void SemanticStateConsumerImpl::addByteCredit(uint32_t value)
{
- assertClusterSafe();
credit.addByteCredit(value);
}
-void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
+void SemanticStateConsumerImpl::addMessageCredit(uint32_t value)
{
- assertClusterSafe();
credit.addMessageCredit(value);
}
-bool SemanticState::ConsumerImpl::haveCredit()
+bool SemanticStateConsumerImpl::haveCredit()
{
if (credit) {
return true;
@@ -632,21 +636,20 @@ bool SemanticState::ConsumerImpl::haveCredit()
}
}
-bool SemanticState::ConsumerImpl::doDispatch()
+bool SemanticStateConsumerImpl::doDispatch()
{
return queue->dispatch(shared_from_this());
}
-void SemanticState::ConsumerImpl::flush()
+void SemanticStateConsumerImpl::flush()
{
while(haveCredit() && doDispatch())
;
credit.cancel();
}
-void SemanticState::ConsumerImpl::stop()
+void SemanticStateConsumerImpl::stop()
{
- assertClusterSafe();
credit.cancel();
}
@@ -700,7 +703,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
getSession().setUnackedCount(unacked.size());
}
-bool SemanticState::ConsumerImpl::doOutput()
+bool SemanticStateConsumerImpl::doOutput()
{
try {
return haveCredit() && doDispatch();
@@ -709,28 +712,26 @@ bool SemanticState::ConsumerImpl::doOutput()
}
}
-void SemanticState::ConsumerImpl::enableNotify()
+void SemanticStateConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
- assertClusterSafe();
notifyEnabled = true;
}
-void SemanticState::ConsumerImpl::disableNotify()
+void SemanticStateConsumerImpl::disableNotify()
{
Mutex::ScopedLock l(lock);
notifyEnabled = false;
}
-bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
+bool SemanticStateConsumerImpl::isNotifyEnabled() const {
Mutex::ScopedLock l(lock);
return notifyEnabled;
}
-void SemanticState::ConsumerImpl::notify()
+void SemanticStateConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
- assertClusterSafe();
if (notifyEnabled) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -755,7 +756,6 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) {
}
void SemanticState::accepted(const SequenceSet& commands) {
- assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
@@ -815,4 +815,63 @@ void SemanticState::detached()
}
}
+void SemanticState::addBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey, const framing::FieldTable& arguments)
+{
+ QPID_LOG (debug, "SemanticState::addBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey << ", "
+ << "args=" << arguments << "]");
+ std::string fedOp = arguments.getAsString(qpidFedOp);
+ if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) {
+ fedOp = fedOpBind;
+ }
+ std::string fedOrigin = arguments.getAsString(qpidFedOrigin);
+ if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) {
+ bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin));
+ }
+ else if (fedOp == fedOpUnbind) {
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin));
+ }
+}
+
+void SemanticState::removeBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey)
+{
+ QPID_LOG (debug, "SemanticState::removeBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey)
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, ""));
+}
+
+void SemanticState::unbindSessionBindings()
+{
+ //unbind session-scoped bindings
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) {
+ QPID_LOG (debug, "SemanticState::unbindSessionBindings ["
+ << "queue=" << i->get<0>() << ", "
+ << "exchange=" << i->get<1>()<< ", "
+ << "key=" << i->get<2>() << ", "
+ << "fedOrigin=" << i->get<3>() << "]");
+ try {
+ std::string fedOrigin = i->get<3>();
+ if (!fedOrigin.empty()) {
+ framing::FieldTable fedArguments;
+ fedArguments.setString(qpidFedOp, fedOpUnbind);
+ fedArguments.setString(qpidFedOrigin, fedOrigin);
+ session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments,
+ userID, connectionId);
+ } else {
+ session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(),
+ userID, connectionId);
+ }
+ }
+ catch (...) {
+ }
+ }
+ bindings.clear();
+}
+
}} // namespace qpid::broker