summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp41
1 files changed, 27 insertions, 14 deletions
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index cc02d9ec94..88cdf7e03a 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -21,6 +21,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DeliveryRecord.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
@@ -28,6 +29,7 @@
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -55,9 +57,8 @@ SessionState::SessionState(
const SessionState::Configuration& config, bool delayManagement)
: qpid::SessionState(id, config),
broker(b), handler(&h),
- semanticState(*this, *this),
+ semanticState(*this),
adapter(semanticState),
- msgBuilder(&broker.getStore()),
mgmtObject(0),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
@@ -208,7 +209,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
msgBuilder.start(id);
- intrusive_ptr<Message> msg(msgBuilder.getMessage());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
@@ -218,13 +219,16 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
header.setEof(false);
msg->getFrames().append(header);
}
+ DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
if (broker.isTimestamping())
- msg->setTimestamp();
- msg->setPublisher(&getConnection());
+ deliverable.getMessage().setTimestamp();
+ deliverable.getMessage().setPublisher(&getConnection());
+
+
+ IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().begin();
- semanticState.handle(msg);
+ semanticState.route(deliverable.getMessage(), deliverable);
msgBuilder.end();
- IncompleteIngressMsgXfer xfer(this, msg);
msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
}
@@ -294,18 +298,28 @@ void SessionState::handleOut(AMQFrame& frame) {
handler->out(frame);
}
-void SessionState::deliver(DeliveryRecord& msg, bool sync)
+DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+ const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+ qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
+ const qpid::types::Variant::Map& annotations, bool sync)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
assert(senderGetCommandPoint().offset == 0);
SequenceNumber commandId = senderGetCommandPoint().command;
- msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
+
+ framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
+ method.setEof(false);
+ getProxy().getHandler().handle(method);
+ message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+ message.sendContent(getProxy().getHandler(), maxFrameSize);
+
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
if (sync) {
AMQP_ClientProxy::Execution& p(getProxy().getExecution());
Proxy::ScopedSync s(p);
p.sync();
}
+ return commandId;
}
void SessionState::sendCompletion() {
@@ -349,7 +363,6 @@ void SessionState::addPendingExecutionSync()
}
}
-
/** factory for creating a reference-counted IncompleteIngressMsgXfer object
* which will be attached to a message that will be completed asynchronously.
*/
@@ -408,10 +421,10 @@ void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCom
/** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
- std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+ std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
bool unique = pendingMsgs.insert(item).second;
if (!unique) {
assert(false);
@@ -430,13 +443,13 @@ void SessionState::AsyncCommandCompleter::deletePendingMessage(SequenceNumber id
/** done when an execution.sync arrives */
void SessionState::AsyncCommandCompleter::flushPendingMessages()
{
- std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+ std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
pendingMsgs.swap(copy); // we've only tracked these in case a flush is needed, so nuke 'em now.
}
// drop lock, so it is safe to call "flush()"
- for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+ for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
i != copy.end(); ++i) {
i->second->flush();
}