summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp25
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h19
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp12
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp45
-rw-r--r--cpp/src/qpid/broker/SessionState.h6
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp5
-rwxr-xr-xcpp/src/tests/python_tests2
9 files changed, 88 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 6e92f89706..3314ec6be3 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -83,6 +83,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Execution010Handler* getExecution010Handler() { throw framing::NotImplementedException("Class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 15c29ed482..5ab0dd84a9 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -37,7 +37,8 @@ SessionAdapter::SessionAdapter(SemanticState& s) :
HandlerImpl(s),
exchangeImpl(s),
queueImpl(s),
- messageImpl(s)
+ messageImpl(s),
+ executionImpl(s)
{}
@@ -377,6 +378,28 @@ void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
}
*/
+
+void SessionAdapter::ExecutionHandlerImpl::sync()
+{
+ //TODO
+}
+
+void SessionAdapter::ExecutionHandlerImpl::result(uint32_t /*commandId*/, const string& /*value*/)
+{
+ //TODO
+}
+
+void SessionAdapter::ExecutionHandlerImpl::exception(uint16_t /*errorCode*/,
+ uint32_t /*commandId*/,
+ uint8_t /*classCode*/,
+ uint8_t /*commandCode*/,
+ uint8_t /*fieldIndex*/,
+ const std::string& /*description*/,
+ const framing::FieldTable& /*errorInfo*/)
+{
+ //TODO
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 0dd3529359..c2d61392d7 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -55,6 +55,7 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
Message010Handler* getMessage010Handler(){ return &messageImpl; }
Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
Queue010Handler* getQueue010Handler(){ return &queueImpl; }
+ Execution010Handler* getExecution010Handler(){ return &executionImpl; }
BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
@@ -172,9 +173,27 @@ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
};
+ class ExecutionHandlerImpl : public Execution010Handler, public HandlerImpl
+ {
+ public:
+ ExecutionHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+
+ void sync();
+ void result(uint32_t commandId, const string& value);
+ void exception(uint16_t errorCode,
+ uint32_t commandId,
+ uint8_t classCode,
+ uint8_t commandCode,
+ uint8_t fieldIndex,
+ const std::string& description,
+ const framing::FieldTable& errorInfo);
+
+ };
+
ExchangeHandlerImpl exchangeImpl;
QueueHandlerImpl queueImpl;
MessageHandlerImpl messageImpl;
+ ExecutionHandlerImpl executionImpl;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 919a3e6ee8..3baa3a89a7 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -69,12 +69,6 @@ void SessionHandler::handleIn(AMQFrame& f) {
QPID_MSG("Channel " << channel.get() << " is not open"));
}
}
- } catch(const ChannelException& e) {
- ignoring=true; // Ignore trailing frames sent by client.
- session->detach();
- session.reset();
- //TODO: implement new exception handling mechanism
- //peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -83,6 +77,12 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
+void SessionHandler::destroy() {
+ ignoring=true; // Ignore trailing frames sent by client.
+ session->detach();
+ session.reset();
+}
+
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 4b031f2951..fa013a1c15 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -70,6 +70,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void localSuspend();
void detach() { localSuspend(); }
void sendCompletion();
+ void destroy();
protected:
void handleIn(framing::AMQFrame&);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 571b8848ae..ceaa70db18 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -168,16 +168,16 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
return status;
}
-void SessionState::handleCommand(framing::AMQMethodBody* method)
+void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& id)
{
- SequenceNumber id = nextIn++;
+ id = nextIn++;
Invoker::Result invocation = invoke(adapter, *method);
completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
} else if (invocation.hasResult()) {
- getProxy().getExecution().result(id.getValue(), invocation.getResult());
+ getProxy().getExecution010().result(id, invocation.getResult());
}
if (method->isSync()) {
sendCompletion();
@@ -185,16 +185,18 @@ void SessionState::handleCommand(framing::AMQMethodBody* method)
//TODO: if window gets too large send unsolicited completion
}
-void SessionState::handleContent(AMQFrame& frame)
+void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
- if (!msg) {//start of frameset will be indicated by frame flags
- SequenceNumber id = nextIn++;
+ if (frame.getBof() && frame.getBos()) {//start of frameset
+ id = nextIn++;
msgBuilder.start(id);
msg = msgBuilder.getMessage();
+ } else {
+ id = msg->getCommandId();
}
msgBuilder.handle(frame);
- if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
+ if (frame.getEof() && frame.getEos()) {//end of frameset
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
@@ -210,16 +212,27 @@ void SessionState::handle(AMQFrame& frame)
{
received(frame);
- //TODO: make command handling more uniform, regardless of whether
- //commands carry content. (For now, assume all single frame
- //assmblies are non-content bearing and all content-bearing
- //assmeblies will have more than one frame):
- if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod());
- } else {
- handleContent(frame);
+ SequenceNumber commandId;
+ try {
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content. (For now, assume all single frame
+ //assemblies are non-content bearing and all content-bearing
+ //assemblies will have more than one frame):
+ if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ handleContent(frame, commandId);
+ }
+ } catch(const ChannelException& e) {
+ //TODO: better implementation of new exception handling mechanism
+ AMQMethodBody* m = frame.getMethod();
+ if (m) {
+ getProxy().getExecution010().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
+ } else {
+ getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
+ }
+ handler->destroy();
}
-
}
DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr token)
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c936edee21..ecf0b41a7a 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -93,8 +93,6 @@ class SessionState : public framing::SessionState,
void activateOutput();
void handle(framing::AMQFrame& frame);
- void handleCommand(framing::AMQMethodBody* method);
- void handleContent(framing::AMQFrame& frame);
void complete(const framing::SequenceSet& ranges);
void sendCompletion();
@@ -138,8 +136,10 @@ class SessionState : public framing::SessionState,
RangedOperation ackOp;
management::Session::shared_ptr mgmtObject;
+ void handleCommand(framing::AMQMethodBody* method, framing::SequenceNumber& id);
+ void handleContent(framing::AMQFrame& frame, framing::SequenceNumber& id);
- friend class SessionManager;
+ friend class SessionManager;
};
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 3750f4a9a8..eeb658600d 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -49,12 +49,15 @@ uint32_t AMQFrame::frameOverhead() {
void AMQFrame::encode(Buffer& buffer) const
{
+ //set track first (controls on track 0, everything else on 1):
+ uint8_t track = getBody()->type() ? 1 : 0;
+
uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0);
buffer.putOctet(flags);
buffer.putOctet(getBody()->type());
buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself)
buffer.putOctet(0);
- buffer.putOctet(0x0f & subchannel);
+ buffer.putOctet(0x0f & track);
buffer.putShort(channel);
buffer.putLong(0);
body->encode(buffer);
diff --git a/cpp/src/tests/python_tests b/cpp/src/tests/python_tests
index d9754ed0fb..73b3b970c1 100755
--- a/cpp/src/tests/python_tests
+++ b/cpp/src/tests/python_tests
@@ -1,7 +1,7 @@
#!/bin/sh
# Run the python tests.
if test -d ../../../python ; then
- cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS
+ cd ../../../python && ./run-tests -v -s ../specs/amqp.0-10-preview.xml -I cpp_failing_0-10_preview.txt -b localhost:$QPID_PORT $PYTHON_TESTS && ./run-tests --skip-self-test -v -s "0-10" -I cpp_failing_0-10.txt -b localhost:$QPID_PORT $PYTHON_TESTS
else
echo Warning: python tests not found.
fi