diff options
author | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-03-03 14:49:06 +0000 |
commit | 0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e (patch) | |
tree | 8fe7333962fbea735455340424657a540c6ef9a9 /qpid/cpp/src | |
parent | c8ad468141a96e5fdf4534552fe72e84399d5d5d (diff) | |
download | qpid-python-0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e.tar.gz |
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema.
The preview codepath (99-0) remains unaltered.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
24 files changed, 624 insertions, 194 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 48851085c6..080260be02 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -14,7 +14,7 @@ force: if GENERATE # AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac -amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml +amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml amqp_0_10_xml=@AMQP_FINAL_XML@ specs=$(amqp_99_0_xml) $(amqp_0_10_xml) @@ -130,6 +130,7 @@ libqpidcommon_la_SOURCES = \ qpid/framing/SendContent.cpp \ qpid/framing/SequenceNumber.cpp \ qpid/framing/SequenceNumberSet.cpp \ + qpid/framing/SequenceSet.cpp \ qpid/framing/Proxy.cpp \ qpid/framing/Uuid.cpp \ qpid/framing/AMQP_HighestVersion.h \ @@ -412,6 +413,7 @@ nobase_include_HEADERS = \ qpid/framing/SessionState.h \ qpid/framing/SendContent.h \ qpid/framing/SequenceNumber.h \ + qpid/framing/SequenceSet.h \ qpid/framing/SequenceNumberSet.h \ qpid/framing/SerializeHandler.h \ qpid/framing/StructHelper.h \ diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h index ef2c51bb8d..5237087dc8 100644 --- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h +++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h @@ -85,6 +85,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } + Connection010Handler* getConnection010Handler() { BADHANDLER(); } + Session010Handler* getSession010Handler() { BADHANDLER(); } #undef BADHANDLER private: diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index e296d52214..126e1b2723 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -23,6 +23,7 @@ #include "ConnectionHandler.h" #include "Connection.h" #include "qpid/framing/ConnectionStartBody.h" +#include "qpid/framing/Connection010StartBody.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" @@ -38,11 +39,14 @@ const std::string en_US = "en_US"; } void ConnectionHandler::init(const framing::ProtocolInitiation& header) { + //need to send out a protocol header back to the client + handler->connection.getOutput().initiated(header); + FieldTable properties; string mechanisms(PLAIN); string locales(en_US); - handler->serverMode = true; - handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); + handler->serverMode = true; + handler->client.start(properties, mechanisms, locales); } void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) @@ -55,7 +59,7 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); try{ if (handler->serverMode) { - if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method)) + if (!invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method)) throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); } else { if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method)) diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 2a581d5675..44e2ce05fa 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -41,10 +41,10 @@ class Connection; // TODO aconway 2007-09-18: Rename to ConnectionHandler class ConnectionHandler : public framing::FrameHandler { - struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, + struct Handler : public framing::AMQP_ServerOperations::Connection010Handler, public framing::AMQP_ClientOperations::ConnectionHandler { - framing::AMQP_ClientProxy::Connection client; + framing::AMQP_ClientProxy::Connection010 client; framing::AMQP_ServerProxy::Connection server; Connection& connection; bool serverMode; @@ -55,6 +55,7 @@ class ConnectionHandler : public framing::FrameHandler const std::string& locale); void secureOk(const std::string& response); void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat); + void heartbeat() {} void open(const std::string& virtualHost, const std::string& capabilities, bool insist); void close(uint16_t replyCode, const std::string& replyText, diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 9b44f31e14..e012d693fb 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) ++end; } - for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1)); + for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1)); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c) } } -void SemanticState::acknowledged(const DeliveryRecord& delivery) +void SemanticState::adjustFlow(const DeliveryRecord& delivery) { delivery.subtractFrom(outstanding); ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { - get_pointer(i)->acknowledged(delivery); + get_pointer(i)->adjustFlow(delivery); } } -void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery) { if (windowing) { if (msgCredit != 0xFFFFFFFF) msgCredit++; @@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify() parent->outputTasks.activateOutput(); } + +void SemanticState::accepted(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + if (txBuffer.get()) { + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { + for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(range.start, range.end); + } +} + +void SemanticState::completed(DeliveryId first, DeliveryId last) +{ + AckRange range = findRange(first, last); + for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1)); + requestDispatch(); +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index cc9c0e1e9b..88a2fcab5c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains, void addMessageCredit(uint32_t value); void flush(); void stop(); - void acknowledged(const DeliveryRecord&); + void adjustFlow(const DeliveryRecord&); Queue::shared_ptr getQueue() { return queue; } bool isBlocked() const { return blocked; } @@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains, void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); - void acknowledged(const DeliveryRecord&); + void adjustFlow(const DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); void requestDispatch(ConsumerImpl&); @@ -171,8 +171,6 @@ class SemanticState : public framing::FrameHandler::Chains, void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); void recover(bool requeue); void flow(bool active); DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); @@ -180,8 +178,15 @@ class SemanticState : public framing::FrameHandler::Chains, void release(DeliveryId first, DeliveryId last); void reject(DeliveryId first, DeliveryId last); void handle(intrusive_ptr<Message> msg); - bool doOutput() { return outputTasks.doOutput(); } + + //preview only (completed == ack): + void ackCumulative(DeliveryId deliveryTag); + void ackRange(DeliveryId deliveryTag, DeliveryId endTag); + + //final 0-10 spec (completed and accepted are distinct): + void completed(DeliveryId deliveryTag, DeliveryId endTag); + void accepted(DeliveryId deliveryTag, DeliveryId endTag); }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 0e3c9928d1..de96ae3f12 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) { AMQMethodBody* m = f.getBody()->getMethod(); try { if (!ignoring) { - if (m && - (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) || - invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) { + if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) { return; } else if (session.get()) { - boost::optional<SequenceNumber> ack=session->received(f); session->handle(f); - if (ack) - peerSession.ack(*ack, SequenceNumberSet()); - } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) { - return; } else { throw ChannelErrorException( QPID_MSG("Channel " << channel.get() << " is not open")); @@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) { ignoring=true; // Ignore trailing frames sent by client. session->detach(); session.reset(); - peerSession.closed(e.code, e.what()); + //TODO: implement new exception handling mechanism + //peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) { void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) - peerSession.solicitAck(); + peerSession.flush(false, false, true); } void SessionHandler::assertAttached(const char* method) const { @@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const { << " is already open.")); } -void SessionHandler::open(uint32_t detachedLifetime) { - assertClosed("open"); - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); - peerSession.attached(session->getId(), session->getTimeout()); +void SessionHandler::localSuspend() { + if (session.get() && session->isAttached()) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + session.reset(); + } } -void SessionHandler::resume(const Uuid& id) { - assertClosed("resume"); - session = connection.broker.getSessionManager().resume(id); - session->attach(*this); - SequenceNumber seq = session->resuming(); - peerSession.attached(session->getId(), session->getTimeout()); - proxy.getSession().ack(seq, SequenceNumberSet()); -} -void SessionHandler::flow(bool /*active*/) { - assertAttached("flow"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flow"); +ConnectionState& SessionHandler::getConnection() { return connection; } +const ConnectionState& SessionHandler::getConnection() const { return connection; } + +//new methods: +void SessionHandler::attach(const std::string& name, bool /*force*/) +{ + //TODO: need to revise session manager to support resume as well + assertClosed("attach"); + std::auto_ptr<SessionState> state( + connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); + peerSession.attached(name); } -void SessionHandler::flowOk(bool /*active*/) { - assertAttached("flowOk"); - // TODO aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException("session.flowOk"); +void SessionHandler::attached(const std::string& /*name*/) +{ + std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0)); + session.reset(state.release()); } -void SessionHandler::close() { - assertAttached("close"); - QPID_LOG(info, "Received session.close"); - ignoring=false; - session->detach(); - session.reset(); - peerSession.closed(REPLY_SUCCESS, "ok"); +void SessionHandler::detach(const std::string& name) +{ + assertAttached("detach"); + localSuspend(); + peerSession.detached(name, 0); assert(&connection.getChannel(channel.get()) == this); connection.closeChannel(channel.get()); } -void SessionHandler::closed(uint16_t replyCode, const string& replyText) { - QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText); +void SessionHandler::detached(const std::string& name, uint8_t code) +{ ignoring=false; session->detach(); session.reset(); -} - -void SessionHandler::localSuspend() { - if (session.get() && session->isAttached()) { - session->detach(); - connection.broker.getSessionManager().suspend(session); - session.reset(); + if (code) { + //no error + } else { + //error occured + QPID_LOG(warning, "Received session.closed: "<< name << " " << code); } } -void SessionHandler::suspend() { - assertAttached("suspend"); - localSuspend(); - peerSession.detached(); - assert(&connection.getChannel(channel.get()) == this); - connection.closeChannel(channel.get()); -} - -void SessionHandler::ack(uint32_t cumulativeSeenMark, - const SequenceNumberSet& /*seenFrameSet*/) +void SessionHandler::requestTimeout(uint32_t t) { - assertAttached("ack"); - if (session->getState() == SessionState::RESUMING) { - session->receivedAck(cumulativeSeenMark); - framing::SessionState::Replay replay=session->replay(); - std::for_each(replay.begin(), replay.end(), - boost::bind(&SessionHandler::handleOut, this, _1)); - } - else - session->receivedAck(cumulativeSeenMark); + session->setTimeout(t); + //proxy.timeout(t); } -void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - // TODO aconway 2007-10-02: may be removed from spec. - assert(0); throw NotImplementedException("session.high-water-mark"); +void SessionHandler::timeout(uint32_t) +{ + //not sure what we need to do on the server for this... } -void SessionHandler::solicitAck() { - assertAttached("solicit-ack"); - peerSession.ack(session->sendingAck(), SequenceNumberSet()); +void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) +{ + if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point"); + + session->next = id; } -void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime) +void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments) { - std::auto_ptr<SessionState> state( - connection.broker.getSessionManager().open(*this, detachedLifetime)); - session.reset(state.release()); + if (!commands.empty() || fragments.size()) { + throw NotImplementedException("Session resumption not yet supported"); + } } -void SessionHandler::detached() +void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/) { - connection.broker.getSessionManager().suspend(session); - session.reset(); + //don't really care too much about this yet } - -ConnectionState& SessionHandler::getConnection() { return connection; } -const ConnectionState& SessionHandler::getConnection() const { return connection; } - -void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) +void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply) { - assertAttached("complete"); - session->complete(cumulative, range); + session->complete(commands); + if (timelyReply) { + peerSession.knownCompleted(session->knownCompleted); + session->knownCompleted.clear(); + } } -void SessionHandler::flush() +void SessionHandler::knownCompleted(const framing::SequenceSet& commands) { - assertAttached("flush"); - session->flush(); + session->completed.remove(commands); } -void SessionHandler::sync() + +void SessionHandler::flush(bool expected, bool confirmed, bool completed) { - assertAttached("sync"); - session->sync(); + if (expected) { + peerSession.expected(SequenceSet(session->next), Array()); + } + if (confirmed) { + peerSession.confirmed(session->completed, Array()); + } + if (completed) { + peerSession.completed(session->completed, true); + } } -void SessionHandler::noop() + +void SessionHandler::sendCompletion() { - assertAttached("noop"); - session->noop(); + peerSession.completed(session->completed, true); } -void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/) +void SessionHandler::gap(const framing::SequenceSet& /*commands*/) { - //never actually sent by client at present + throw NotImplementedException("gap not yet supported"); } - + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index e6bc463a82..4b031f2951 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -27,8 +27,10 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/Array.h" #include "qpid/framing/ChannelHandler.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" #include <boost/noncopyable.hpp> @@ -44,9 +46,7 @@ class SessionState; * receives incoming frames, handles session controls and manages the * association between the channel and a session. */ -class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, - public framing::AMQP_ClientOperations::SessionHandler, - public framing::AMQP_ServerOperations::ExecutionHandler, +class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, public framing::FrameHandler::InOutHandler, private boost::noncopyable { @@ -69,35 +69,32 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, // Called by closing connection. void localSuspend(); void detach() { localSuspend(); } + void sendCompletion(); protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); private: - /// Session methods - void open(uint32_t detachedLifetime); - void flow(bool active); - void flowOk(bool active); - void close(); - void closed(uint16_t replyCode, const std::string& replyText); - void resume(const framing::Uuid& sessionId); - void suspend(); - void ack(uint32_t cumulativeSeenMark, - const framing::SequenceNumberSet& seenFrameSet); - void highWaterMark(uint32_t lastSentMark); - void solicitAck(); - - //extra methods required for assuming client role - void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); - void detached(); - - //Execution methods: - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void result(uint32_t command, const std::string& data); - void sync(); + //new methods: + void attach(const std::string& name, bool force); + void attached(const std::string& name); + void detach(const std::string& name); + void detached(const std::string& name, uint8_t code); + + void requestTimeout(uint32_t t); + void timeout(uint32_t t); + + void commandPoint(const framing::SequenceNumber& id, uint64_t offset); + void expected(const framing::SequenceSet& commands, const framing::Array& fragments); + void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments); + void completed(const framing::SequenceSet& commands, bool timelyReply); + void knownCompleted(const framing::SequenceSet& commands); + void flush(bool expected, bool confirmed, bool completed); + void gap(const framing::SequenceSet& commands); + + //hacks for old generator: + void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); } void assertAttached(const char* method) const; void assertActive(const char* method) const; @@ -106,7 +103,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler, Connection& connection; framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; - framing::AMQP_ClientProxy::Session peerSession; + framing::AMQP_ClientProxy::Session010 peerSession; bool ignoring; std::auto_ptr<SessionState> session; }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 573a567da6..5f04136444 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -49,7 +49,7 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), - ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2)) + ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2)) { getConnection().outputTasks.addOutputTask(&semanticState); @@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method) { - SequenceNumber id = incoming.next(); + SequenceNumber id = next++; Invoker::Result invocation = invoke(adapter, *method); - incoming.complete(id); + completed.add(id); if (!invocation.wasHandled()) { throw NotImplementedException("Not implemented"); @@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method) getProxy().getExecution().result(id.getValue(), invocation.getResult()); } if (method->isSync()) { - incoming.sync(id); sendCompletion(); } //TODO: if window gets too large send unsolicited completion @@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame) { intrusive_ptr<Message> msg(msgBuilder.getMessage()); if (!msg) {//start of frameset will be indicated by frame flags - msgBuilder.start(incoming.next()); + SequenceNumber id = next++; + msgBuilder.start(id); msg = msgBuilder.getMessage(); } msgBuilder.handle(frame); @@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame) msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); - incoming.track(msg); + //TODO: may want to hold up execution until async enqueue is complete + completed.add(msg->getCommandId()); if (msg->getFrames().getMethod()->isSync()) { - incoming.sync(msg->getCommandId()); sendCompletion(); } } @@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame) void SessionState::handle(AMQFrame& frame) { + received(frame); + //TODO: make command handling more uniform, regardless of whether //commands carry content. (For now, assume all single frame //assmblies are non-content bearing and all content-bearing @@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t void SessionState::sendCompletion() { - SequenceNumber mark = incoming.getMark(); - SequenceNumberSet range = incoming.getRange(); - getProxy().getExecution().complete(mark.getValue(), range); -} - -void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range) -{ - //record: - SequenceNumber mark(cumulative); - if (outgoing.lwm < mark) { - outgoing.lwm = mark; - //ack messages: - semanticState.ackCumulative(mark.getValue()); - } - range.processRanges(ackOp); -} - -void SessionState::flush() -{ - incoming.flush(); - sendCompletion(); -} - -void SessionState::sync() -{ - incoming.sync(); - sendCompletion(); + handler->sendCompletion(); } -void SessionState::noop() +void SessionState::complete(const SequenceSet& commands) { - incoming.noop(); + knownCompleted.add(commands); + commands.for_each(ackOp); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 98c21a8ab5..fa6bd14ef3 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -25,6 +25,7 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Time.h" @@ -83,6 +84,8 @@ class SessionState : public framing::SessionState, ConnectionState& getConnection(); uint32_t getTimeout() const { return timeout; } + void setTimeout(uint32_t t) { timeout = t; } + Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } @@ -93,10 +96,7 @@ class SessionState : public framing::SessionState, void handleCommand(framing::AMQMethodBody* method); void handleContent(framing::AMQFrame& frame); - void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range); - void flush(); - void noop(); - void sync(); + void complete(const framing::SequenceSet& ranges); void sendCompletion(); //delivery adapter methods: @@ -114,6 +114,10 @@ class SessionState : public framing::SessionState, uint32_t ackInterval); + framing::SequenceSet completed; + framing::SequenceSet knownCompleted; + framing::SequenceNumber next; + private: typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; @@ -130,8 +134,6 @@ class SessionState : public framing::SessionState, BrokerAdapter adapter; MessageBuilder msgBuilder; - //execution state - IncomingExecutionContext incoming; framing::Window outgoing; RangedOperation ackOp; diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp index bf53bf0cd6..2d3ecf3f6a 100644 --- a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp +++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp @@ -83,7 +83,7 @@ void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){ void AccumulatedAck::consolidate(){} void AccumulatedAck::clear(){ - mark = 0;//not sure that this is valid when wraparound is a possibility + mark = SequenceNumber(0);//not sure that this is valid when wraparound is a possibility ranges.clear(); } diff --git a/qpid/cpp/src/qpid/framing/BodyHolder.cpp b/qpid/cpp/src/qpid/framing/BodyHolder.cpp index f66f29d36a..de971b5b28 100644 --- a/qpid/cpp/src/qpid/framing/BodyHolder.cpp +++ b/qpid/cpp/src/qpid/framing/BodyHolder.cpp @@ -48,6 +48,7 @@ void BodyHolder::encode(Buffer& b) const { void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) { switch(type) { + case 0://CONTROL case METHOD_BODY: { ClassId c = buffer.getOctet(); MethodId m = buffer.getOctet(); diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp index c0cd210042..60d67f1b07 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.cpp +++ b/qpid/cpp/src/qpid/framing/Buffer.cpp @@ -194,6 +194,13 @@ void Buffer::putShortString(const string& s){ position += len; } +void Buffer::putMediumString(const string& s){ + uint16_t len = s.length(); + putShort(len); + s.copy(data + position, len); + position += len; +} + void Buffer::putLongString(const string& s){ uint32_t len = s.length(); putLong(len); @@ -208,6 +215,13 @@ void Buffer::getShortString(string& s){ position += len; } +void Buffer::getMediumString(string& s){ + uint16_t len = getShort(); + checkAvailable(len); + s.assign(data + position, len); + position += len; +} + void Buffer::getLongString(string& s){ uint32_t len = getLong(); checkAvailable(len); diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h index 9c0d403462..585379b09a 100644 --- a/qpid/cpp/src/qpid/framing/Buffer.h +++ b/qpid/cpp/src/qpid/framing/Buffer.h @@ -97,8 +97,10 @@ class Buffer void putUInt(uint64_t); void putShortString(const string& s); + void putMediumString(const string& s); void putLongString(const string& s); void getShortString(string& s); + void getMediumString(string& s); void getLongString(string& s); void getBin128(uint8_t* b); diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp index 3172246cc2..1b62d296c6 100644 --- a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp @@ -51,9 +51,10 @@ const SequenceNumber SequenceNumber::operator++(int) return old; } -SequenceNumber SequenceNumber::operator+(uint32_t i) const +SequenceNumber& SequenceNumber::operator--() { - return SequenceNumber(value + i); + value = value - 1; + return *this; } bool SequenceNumber::operator<(const SequenceNumber& other) const diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h index b2594452d0..0ed591b804 100644 --- a/qpid/cpp/src/qpid/framing/SequenceNumber.h +++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h @@ -39,7 +39,7 @@ class SequenceNumber SequenceNumber& operator++();//prefix ++ const SequenceNumber operator++(int);//postfix ++ - SequenceNumber operator+(uint32_t) const; + SequenceNumber& operator--();//prefix ++ bool operator==(const SequenceNumber& other) const; bool operator!=(const SequenceNumber& other) const; bool operator<(const SequenceNumber& other) const; diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp new file mode 100644 index 0000000000..e3461e233b --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -0,0 +1,222 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SequenceSet.h" + +using namespace qpid::framing; +using std::max; +using std::min; + +namespace { +//each range contains 2 numbers, 4 bytes each +uint16_t RANGE_SIZE = 2 * 4; +} + +void SequenceSet::encode(Buffer& buffer) const +{ + buffer.putShort(ranges.size() * RANGE_SIZE); + for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + i->encode(buffer); + } +} + +void SequenceSet::decode(Buffer& buffer) +{ + uint16_t size = buffer.getShort(); + uint16_t count = size / RANGE_SIZE;//number of ranges + if (size % RANGE_SIZE) throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size)); + + for (uint16_t i = 0; i < count; i++) { + add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); + } +} + +uint32_t SequenceSet::size() const +{ + return 2 /*size field*/ + (ranges.size() * RANGE_SIZE); +} + +bool SequenceSet::contains(const SequenceNumber& point) const +{ + for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(point)) return true; + } + return false; +} + +void SequenceSet::add(const SequenceNumber& s) +{ + add(s, s); +} + +void SequenceSet::add(const SequenceNumber& start, const SequenceNumber& end) +{ + if (start > end) { + add(end, start); + } else { + Range r(start, end); + bool merged = false; + Ranges::iterator i = ranges.begin(); + while (i != ranges.end() && !merged && i->start < start) { + if (i->merge(r)) merged = true; + i++; + } + if (!merged) { + ranges.insert(i, r); + } + } +} + +void SequenceSet::add(const SequenceSet& set) +{ + for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) { + add(i->start, i->end); + } +} + +void SequenceSet::remove(const SequenceSet& set) +{ + for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) { + remove(i->start, i->end); + } +} + +void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& end) +{ + if (start > end) { + remove(end, start); + } else { + Ranges::iterator i = ranges.begin(); + while (i != ranges.end() && i->start < start) { + if (start <= i->end) { + if (end > i->end) { + //i.e. start is within the range pointed to by i, but end is not + i->end = (uint32_t)start - 1; + } else { + //whole of range to be deleted is contained within that pointed to be i + if (end == i->end) { + //just shrink range pointed to by i + i->end = (uint32_t)start - 1; + } else { + //need to split the range pointed to by i + Range r(i->start, (uint32_t)start - 1); + i->start = end + 1; + ranges.insert(i, r); + } + return;//no need to go any further + } + } + i++; + } + Ranges::iterator j = i; + while (j != ranges.end() && j->end < end) { + j++; + } + if (j->start <= end){ + j->start = end + 1; + } + ranges.erase(i, j); + } +} + +void SequenceSet::remove(const SequenceNumber& s) +{ + for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) { + if (i->start == s) { + if (i->start == i->end) { + ranges.erase(i); + } else { + ++(i->start); + } + } else if (i->end == s) { + --(i->end); + } else if (i->contains(s)) { + //need to split range pointed to by i + Range r(i->start, (uint32_t)s - 1); + i->start = s + 1; + ranges.insert(i, r); + } + } +} + +bool SequenceSet::empty() const +{ + return ranges.empty(); +} + +void SequenceSet::clear() +{ + return ranges.clear(); +} + +bool SequenceSet::Range::contains(SequenceNumber i) const +{ + return i >= start && i <= end; +} + +bool SequenceSet::Range::intersects(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); +} + +bool SequenceSet::Range::merge(const Range& r) +{ + if (intersects(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool SequenceSet::Range::mergeable(const SequenceNumber& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +void SequenceSet::Range::encode(Buffer& buffer) const +{ + buffer.putLong(start); + buffer.putLong(end); +} + +SequenceSet::Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {} + +namespace qpid{ +namespace framing{ + +std::ostream& operator<<(std::ostream& out, const SequenceSet& set) { + out << "{"; + for (SequenceSet::Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) { + if (i != set.ranges.begin()) out << ", "; + out << i->start.getValue() << "-" << i->end.getValue(); + } + out << "}"; + return out; +} + +} +} diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h new file mode 100644 index 0000000000..2f34cb5cba --- /dev/null +++ b/qpid/cpp/src/qpid/framing/SequenceSet.h @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _framing_SequenceSet_h +#define _framing_SequenceSet_h + +#include <ostream> +#include <list> +#include "amqp_types.h" +#include "Buffer.h" +#include "SequenceNumber.h" +#include "qpid/framing/reply_exceptions.h" + +namespace qpid { +namespace framing { + +class SequenceSet +{ + struct Range + { + SequenceNumber start; + SequenceNumber end; + + Range(SequenceNumber s, SequenceNumber e); + bool contains(SequenceNumber i) const; + bool intersects(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const SequenceNumber& r) const; + void encode(Buffer& buffer) const; + }; + + typedef std::list<Range> Ranges; + Ranges ranges; + +public: + SequenceSet() {} + SequenceSet(const SequenceNumber& s) { add(s); } + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + uint32_t size() const; + + bool contains(const SequenceNumber& s) const; + void add(const SequenceNumber& s); + void add(const SequenceNumber& start, const SequenceNumber& end); + void add(const SequenceSet& set); + void remove(const SequenceNumber& s); + void remove(const SequenceNumber& start, const SequenceNumber& end); + void remove(const SequenceSet& set); + + void clear(); + bool empty() const; + + template <class T> + void for_each(T& t) const + { + for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + t(i->start, i->end); + } + } + + friend std::ostream& operator<<(std::ostream&, const SequenceSet&); +}; + + +}} // namespace qpid::framing + + +#endif diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h index 94442aa357..943970cc56 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types.h +++ b/qpid/cpp/src/qpid/framing/amqp_types.h @@ -65,6 +65,7 @@ const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); class FramingContent; class FieldTable; class SequenceNumberSet; +class SequenceSet; class Uuid; }} // namespace qpid::framing diff --git a/qpid/cpp/src/qpid/framing/amqp_types_full.h b/qpid/cpp/src/qpid/framing/amqp_types_full.h index f1ed44ec05..da7bdc876d 100644 --- a/qpid/cpp/src/qpid/framing/amqp_types_full.h +++ b/qpid/cpp/src/qpid/framing/amqp_types_full.h @@ -34,6 +34,7 @@ #include "FramingContent.h" #include "FieldTable.h" #include "SequenceNumberSet.h" +#include "SequenceSet.h" #include "Uuid.h" #endif /*!_framing_amqp_types_decl_h*/ diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 0586eb9d36..c24205f53e 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -94,7 +94,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { std::queue<framing::AMQFrame> frameQueue; Mutex frameQueueLock; bool frameQueueClosed; - bool initiated; + bool isInitiated; bool readError; std::string identifier; bool isClient; @@ -105,7 +105,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { AsynchIOHandler() : inputHandler(0), frameQueueClosed(false), - initiated(false), + isInitiated(false), readError(false), isClient(false) {} @@ -128,6 +128,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler { void send(framing::AMQFrame&); void close(); void activateOutput(); + void initiated(const framing::ProtocolInitiation&); + // Input side void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff); @@ -259,13 +261,18 @@ void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } +void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi) +{ + write(pi); +} + // Input side void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (readError) { return; } framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount); - if(initiated){ + if(isInitiated){ framing::AMQFrame frame; try{ while(frame.decode(in)) { @@ -282,7 +289,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if(protocolInit.decode(in)){ QPID_LOG(debug, "INIT [" << identifier << "]"); inputHandler->initiated(protocolInit); - initiated = true; + isInitiated = true; } } // TODO: unreading needs to go away, and when we can cope @@ -324,10 +331,10 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { } void AsynchIOHandler::idle(AsynchIO&){ - if (isClient && !initiated) { + if (isClient && !isInitiated) { //get & write protocol header from upper layers write(inputHandler->getInitiation()); - initiated = true; + isInitiated = true; return; } ScopedLock<Mutex> l(frameQueueLock); diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h index 5a60ae4998..13407d9b9d 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h +++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h @@ -22,6 +22,7 @@ #define _ConnectionOutputHandler_ #include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" #include "OutputControl.h" namespace qpid { @@ -30,7 +31,7 @@ namespace sys { /** * Provides the output handler associated with a connection. */ -class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl +class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler { public: virtual void close() = 0; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index d25378a519..0baf1a2763 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -38,6 +38,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ InlineVector.cpp \ ISList.cpp IList.cpp \ ClientSessionTest.cpp \ + SequenceSet.cpp \ serialize.cpp \ ProxyTemplate.cpp apply.cpp # FIXME aconway 2008-02-20: removed RefCountedMap.cpp due to valgrind error. diff --git a/qpid/cpp/src/tests/SequenceSet.cpp b/qpid/cpp/src/tests/SequenceSet.cpp new file mode 100644 index 0000000000..bffeed648e --- /dev/null +++ b/qpid/cpp/src/tests/SequenceSet.cpp @@ -0,0 +1,93 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/framing/SequenceSet.h" +#include "unit_test.h" + +QPID_AUTO_TEST_SUITE(SequenceSetTestSuite) + +using namespace qpid::framing; + +BOOST_AUTO_TEST_CASE(testAdd) { + SequenceSet s; + s.add(2); + s.add(8,8); + s.add(3,5); + + for (uint32_t i = 0; i <= 1; i++) //0, 1 + BOOST_CHECK(!s.contains(i)); + + for (uint32_t i = 2; i <= 5; i++) //2, 3, 4 & 5 + BOOST_CHECK(s.contains(i)); + + for (uint32_t i = 0; i <= 1; i++) //6, 7 + BOOST_CHECK(!s.contains(i)); + + BOOST_CHECK(s.contains(8));//8 + + SequenceSet t; + t.add(6, 10); + t.add(s); + + for (uint32_t i = 0; i <= 1; i++) + BOOST_CHECK(!t.contains(i)); + + for (uint32_t i = 2; i <= 10; i++) + BOOST_CHECK(t.contains(i)); +} + +BOOST_AUTO_TEST_CASE(testRemove) { + SequenceSet s; + SequenceSet t; + s.add(0, 10); + t.add(0, 10); + + s.remove(7); + s.remove(3, 5); + s.remove(9, 10); + + t.remove(s); + + for (uint32_t i = 0; i <= 2; i++) { + BOOST_CHECK(s.contains(i)); + BOOST_CHECK(!t.contains(i)); + } + + for (uint32_t i = 3; i <= 5; i++) { + BOOST_CHECK(!s.contains(i)); + BOOST_CHECK(t.contains(i)); + } + + BOOST_CHECK(s.contains(6)); + BOOST_CHECK(!t.contains(6)); + + BOOST_CHECK(!s.contains(7)); + BOOST_CHECK(t.contains(7)); + + BOOST_CHECK(s.contains(8)); + BOOST_CHECK(!t.contains(8)); + + for (uint32_t i = 9; i <= 10; i++) { + BOOST_CHECK(!s.contains(i)); + BOOST_CHECK(t.contains(i)); + } +} + +QPID_AUTO_TEST_SUITE_END() + + |