diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 41 |
1 files changed, 27 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index cc02d9ec94..88cdf7e03a 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -21,6 +21,7 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/Broker.h" #include "qpid/broker/ConnectionState.h" +#include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/DeliveryRecord.h" #include "qpid/broker/SessionManager.h" #include "qpid/broker/SessionHandler.h" @@ -28,6 +29,7 @@ #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" @@ -55,9 +57,8 @@ SessionState::SessionState( const SessionState::Configuration& config, bool delayManagement) : qpid::SessionState(id, config), broker(b), handler(&h), - semanticState(*this, *this), + semanticState(*this), adapter(semanticState), - msgBuilder(&broker.getStore()), mgmtObject(0), asyncCommandCompleter(new AsyncCommandCompleter(this)) { @@ -208,7 +209,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { if (frame.getBof() && frame.getBos()) //start of frameset msgBuilder.start(id); - intrusive_ptr<Message> msg(msgBuilder.getMessage()); + intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage()); msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset if (frame.getBof()) { @@ -218,13 +219,16 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) header.setEof(false); msg->getFrames().append(header); } + DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer()); if (broker.isTimestamping()) - msg->setTimestamp(); - msg->setPublisher(&getConnection()); + deliverable.getMessage().setTimestamp(); + deliverable.getMessage().setPublisher(&getConnection()); + + + IncompleteIngressMsgXfer xfer(this, msg); msg->getIngressCompletion().begin(); - semanticState.handle(msg); + semanticState.route(deliverable.getMessage(), deliverable); msgBuilder.end(); - IncompleteIngressMsgXfer xfer(this, msg); msg->getIngressCompletion().end(xfer); // allows msg to complete xfer } } @@ -294,18 +298,28 @@ void SessionState::handleOut(AMQFrame& frame) { handler->out(frame); } -void SessionState::deliver(DeliveryRecord& msg, bool sync) +DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message, + const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp, + qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode, + const qpid::types::Variant::Map& annotations, bool sync) { uint32_t maxFrameSize = getConnection().getFrameMax(); assert(senderGetCommandPoint().offset == 0); SequenceNumber commandId = senderGetCommandPoint().command; - msg.deliver(getProxy().getHandler(), commandId, maxFrameSize); + + framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode))); + method.setEof(false); + getProxy().getHandler().handle(method); + message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations); + message.sendContent(getProxy().getHandler(), maxFrameSize); + assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint. if (sync) { AMQP_ClientProxy::Execution& p(getProxy().getExecution()); Proxy::ScopedSync s(p); p.sync(); } + return commandId; } void SessionState::sendCompletion() { @@ -349,7 +363,6 @@ void SessionState::addPendingExecutionSync() } } - /** factory for creating a reference-counted IncompleteIngressMsgXfer object * which will be attached to a message that will be completed asynchronously. */ @@ -408,10 +421,10 @@ void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCom /** Track an ingress message that is pending completion */ -void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg) +void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); - std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg); + std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg); bool unique = pendingMsgs.insert(item).second; if (!unique) { assert(false); @@ -430,13 +443,13 @@ void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id /** done when an execution.sync arrives */ void SessionState::AsyncCommandCompleter::flushPendingMessages() { - std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy; + std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy; { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now. } // drop lock, so it is safe to call "flush()" - for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin(); + for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin(); i != copy.end(); ++i) { i->second->flush(); } |