summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp130
1 files changed, 49 insertions, 81 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 9a84db547c..5d96467bbf 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -29,7 +29,7 @@
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/SessionOutputException.h"
#include "qpid/broker/TxAccept.h"
-#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/SequenceSet.h"
@@ -65,9 +65,8 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
-SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
+SemanticState::SemanticState(SessionState& ss)
: session(ss),
- deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
@@ -89,7 +88,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
- recover(true);
+ requeue();
//now unsubscribe, which may trigger queue deletion and thus
//needs to occur after the requeueing of unacked messages
@@ -124,7 +123,7 @@ void SemanticState::consume(const string& tag,
resumeId, resumeTtl, arguments);
if (!c) // Create plain consumer
c = ConsumerImpl::shared_ptr(
- new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+ new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
@@ -281,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
- bool _acquire,
+ SubscriptionType type,
bool _exclusive,
const string& _tag,
const string& _resumeId,
@@ -289,11 +288,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const framing::FieldTable& _arguments
) :
- Consumer(_name, _acquire),
+Consumer(_name, type),
parent(_parent),
queue(_queue),
ackExpected(ack),
- acquire(_acquire),
+ acquire(type == CONSUMER),
blocked(true),
exclusive(_exclusive),
resumeId(_resumeId),
@@ -340,32 +339,42 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession()
return &(parent->session);
}
-bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+{
+ return deliver(cursor, msg, shared_from_this());
+}
+bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
assertClusterSafe();
- allocateCredit(msg.payload);
- DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(),
- shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0);
+ allocateCredit(msg);
+ DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(),
+ consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg));
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
if (sync) deliveryCount = 0;//reset
- parent->deliver(record, sync);
+ const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding());
+
+ record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+ ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
+ acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
+ msg.getAnnotations(),
+ sync));
if (credit.isWindowMode() || ackExpected || !acquire) {
parent->record(record);
}
if (acquire && !ackExpected) { // auto acquire && auto accept
- msg.queue->dequeue(0, msg);
+ queue->dequeue(0 /*ctxt*/, cursor);
record.setEnded();
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
}
-bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>)
+bool SemanticState::ConsumerImpl::filter(const Message&)
{
return true;
}
-bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
+bool SemanticState::ConsumerImpl::accept(const Message& msg)
{
assertClusterSafe();
// TODO aconway 2009-06-08: if we have byte & message credit but
@@ -389,21 +398,21 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
}
}
-void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
+void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
{
assertClusterSafe();
Credit original = credit;
- credit.consume(1, msg->getRequiredCredit());
+ credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << original << " now " << credit);
}
-bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
+bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
{
- bool enoughCredit = credit.check(1, msg->getRequiredCredit());
+ bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
- << " credit for message of " << msg->getRequiredCredit() << " bytes: "
+ << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
<< credit);
return enoughCredit;
}
@@ -421,7 +430,6 @@ void SemanticState::disable(ConsumerImpl::shared_ptr c)
session.getConnection().outputTasks.removeOutputTask(c.get());
}
-
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
disable(c);
@@ -435,49 +443,20 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
c->cancel();
}
-void SemanticState::handle(intrusive_ptr<Message> msg) {
- if (txBuffer.get()) {
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- route(msg, *deliverable);
- txBuffer->enlist(op);
- } else {
- DeliverableMessage deliverable(msg);
- route(msg, deliverable);
- if (msg->isContentReleaseRequested()) {
- // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
- // presence of these messages). Do not change these without also checking these tests.
- if (msg->isContentReleaseBlocked()) {
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked");
- } else {
- msg->releaseContent();
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released");
- }
- }
- }
-}
-
-namespace
+TxBuffer* SemanticState::getTxBuffer()
{
-const std::string nullstring;
+ return txBuffer.get();
}
-void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
- msg->computeExpiration(getSession().getBroker().getExpiryPolicy());
+void SemanticState::route(Message& msg, Deliverable& strategy) {
+ msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
- std::string exchangeName = msg->getExchangeName();
- if (!cacheExchange || cacheExchange->getName() != exchangeName
- || cacheExchange->isDestroyed())
- {
+ std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
+ if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
- }
- cacheExchange->setProperties(msg);
/* verify the userid if specified: */
- std::string id =
- msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
+ std::string id = msg.getUserId();
if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
@@ -487,9 +466,9 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
AclModule* acl = getSession().getBroker().getAcl();
if (acl && acl->doTransferAcl())
{
- if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() ))
+ if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg.getRoutingKey() ))
throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " <<
- exchangeName << " with routing-key " << msg->getRoutingKey()));
+ exchangeName << " with routing-key " << msg.getRoutingKey()));
}
cacheExchange->route(strategy);
@@ -501,9 +480,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy);
}
- if (!strategy.delivered) {
- msg->destroy();
- }
}
}
@@ -543,28 +519,20 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
}
}
-void SemanticState::recover(bool requeue)
+void SemanticState::requeue()
{
- if(requeue){
- //take copy and clear unacked as requeue may result in redelivery to this session
- //which will in turn result in additions to unacked
- DeliveryRecords copy = unacked;
- unacked.clear();
- for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
- }else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
- //unconfirmed messages re redelivered and therefore have their
- //id adjusted, confirmed messages are not and so the ordering
- //w.r.t id is lost
- sort(unacked.begin(), unacked.end());
- }
+ //take copy and clear unacked as requeue may result in redelivery to this session
+ //which will in turn result in additions to unacked
+ DeliveryRecords copy = unacked;
+ unacked.clear();
+ for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
getSession().setUnackedCount(unacked.size());
}
-void SemanticState::deliver(DeliveryRecord& msg, bool sync)
-{
- return deliveryAdapter.deliver(msg, sync);
-}
+
+SessionContext& SemanticState::getSession() { return session; }
+const SessionContext& SemanticState::getSession() const { return session; }
+
const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{