summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-23 12:29:17 +0000
committerGordon Sim <gsim@apache.org>2007-07-23 12:29:17 +0000
commit0db1af31320aa010c8e97da80000f7548d889068 (patch)
treece2cd8dba8cf46b685dcb626b31e25c17702c1a0 /cpp/src/qpid/client/ClientChannel.cpp
parent747ac26509e78ac9aa9120be02cd446ac99d21cd (diff)
downloadqpid-python-0db1af31320aa010c8e97da80000f7548d889068.tar.gz
Added initial 'execution-layer' to try out methods form the 0-10 execution class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558700 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp53
1 files changed, 47 insertions, 6 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index ab6b9a41c3..816ff05e85 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -92,10 +92,10 @@ void Channel::protocolInit(
connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response)));
**/
- send(new ConnectionTuneOkBody(
+ sendCommand(make_shared_ptr(new ConnectionTuneOkBody(
version, proposal->getRequestId(),
proposal->getChannelMax(), connection->getMaxFrameSize(),
- proposal->getHeartbeat()));
+ proposal->getHeartbeat())));
uint16_t heartbeat = proposal->getHeartbeat();
connection->connector->setReadTimeout(heartbeat * 2);
@@ -104,7 +104,7 @@ void Channel::protocolInit(
// Send connection open.
std::string capabilities;
responses.expect();
- send(new ConnectionOpenBody(version, vhost, capabilities, true));
+ sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true)));
//receive connection.open-ok (or redirect, but ignore that for now
//esp. as using force=true).
AMQMethodBody::shared_ptr openResponse = responses.receive();
@@ -210,6 +210,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
+ case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break;
default: throw UnknownMethod();
}
}
@@ -223,7 +224,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
- send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
+ sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId())));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
@@ -241,6 +242,18 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) {
throw UnknownMethod();
}
+void Channel::handleExecution(AMQMethodBody::shared_ptr method) {
+ if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) {
+ Monitor::ScopedLock l(outgoingMonitor);
+ //record the completion mark:
+ outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark();
+ //TODO: notify anyone waiting for completion notification:
+ outgoingMonitor.notifyAll();
+ } else{
+ throw UnknownMethod();
+ }
+}
+
void Channel::handleHeader(AMQHeaderBody::shared_ptr body){
messaging->handle(body);
}
@@ -315,7 +328,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive(
AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m)
{
responses.expect();
- send(toSend);
+ sendCommand(toSend);
return responses.receive(c, m);
}
@@ -325,7 +338,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync(
if(sync)
return sendAndReceive(body, c, m);
else {
- send(body);
+ sendCommand(body);
return AMQMethodBody::shared_ptr();
}
}
@@ -362,3 +375,31 @@ void Channel::run() {
messaging->run();
}
+void Channel::sendCommand(AMQBody::shared_ptr body)
+{
+ ++(outgoing.hwm);
+ send(body);
+}
+
+bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout)
+{
+ AbsTime end;
+ if (timeout == 0) {
+ end = AbsTime::FarFuture();
+ } else {
+ end = AbsTime(AbsTime::now(), timeout);
+ }
+
+ Monitor::ScopedLock l(outgoingMonitor);
+ while (end > AbsTime::now() && outgoing.lwm < poi) {
+ outgoingMonitor.wait(end);
+ }
+ return !(outgoing.lwm < poi);
+}
+
+bool Channel::synchWithServer(Duration timeout)
+{
+ send(make_shared_ptr(new ExecutionFlushBody(version)));
+ return waitForCompletion(outgoing.hwm, timeout);
+}
+