diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-23 12:29:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-23 12:29:17 +0000 |
commit | 0db1af31320aa010c8e97da80000f7548d889068 (patch) | |
tree | ce2cd8dba8cf46b685dcb626b31e25c17702c1a0 /cpp/src/qpid/client/ClientChannel.cpp | |
parent | 747ac26509e78ac9aa9120be02cd446ac99d21cd (diff) | |
download | qpid-python-0db1af31320aa010c8e97da80000f7548d889068.tar.gz |
Added initial 'execution-layer' to try out methods form the 0-10 execution class.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@558700 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 53 |
1 files changed, 47 insertions, 6 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index ab6b9a41c3..816ff05e85 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -92,10 +92,10 @@ void Channel::protocolInit( connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); **/ - send(new ConnectionTuneOkBody( + sendCommand(make_shared_ptr(new ConnectionTuneOkBody( version, proposal->getRequestId(), proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat())); + proposal->getHeartbeat()))); uint16_t heartbeat = proposal->getHeartbeat(); connection->connector->setReadTimeout(heartbeat * 2); @@ -104,7 +104,7 @@ void Channel::protocolInit( // Send connection open. std::string capabilities; responses.expect(); - send(new ConnectionOpenBody(version, vhost, capabilities, true)); + sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true))); //receive connection.open-ok (or redirect, but ignore that for now //esp. as using force=true). AMQMethodBody::shared_ptr openResponse = responses.receive(); @@ -210,6 +210,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt) case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break; case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; + case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break; default: throw UnknownMethod(); } } @@ -223,7 +224,7 @@ AMQMethodBody::shared_ptr method, const MethodContext& ctxt) void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) { switch (method->amqpMethodId()) { case ChannelCloseBody::METHOD_ID: - send(new ChannelCloseOkBody(version, ctxt.getRequestId())); + sendCommand(make_shared_ptr(new ChannelCloseOkBody(version, ctxt.getRequestId()))); peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); return; case ChannelFlowBody::METHOD_ID: @@ -241,6 +242,18 @@ void Channel::handleConnection(AMQMethodBody::shared_ptr method) { throw UnknownMethod(); } +void Channel::handleExecution(AMQMethodBody::shared_ptr method) { + if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) { + Monitor::ScopedLock l(outgoingMonitor); + //record the completion mark: + outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark(); + //TODO: notify anyone waiting for completion notification: + outgoingMonitor.notifyAll(); + } else{ + throw UnknownMethod(); + } +} + void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ messaging->handle(body); } @@ -315,7 +328,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceive( AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) { responses.expect(); - send(toSend); + sendCommand(toSend); return responses.receive(c, m); } @@ -325,7 +338,7 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( if(sync) return sendAndReceive(body, c, m); else { - send(body); + sendCommand(body); return AMQMethodBody::shared_ptr(); } } @@ -362,3 +375,31 @@ void Channel::run() { messaging->run(); } +void Channel::sendCommand(AMQBody::shared_ptr body) +{ + ++(outgoing.hwm); + send(body); +} + +bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout) +{ + AbsTime end; + if (timeout == 0) { + end = AbsTime::FarFuture(); + } else { + end = AbsTime(AbsTime::now(), timeout); + } + + Monitor::ScopedLock l(outgoingMonitor); + while (end > AbsTime::now() && outgoing.lwm < poi) { + outgoingMonitor.wait(end); + } + return !(outgoing.lwm < poi); +} + +bool Channel::synchWithServer(Duration timeout) +{ + send(make_shared_ptr(new ExecutionFlushBody(version))); + return waitForCompletion(outgoing.hwm, timeout); +} + |