summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-03 14:49:06 +0000
committerGordon Sim <gsim@apache.org>2008-03-03 14:49:06 +0000
commit0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e (patch)
tree8fe7333962fbea735455340424657a540c6ef9a9 /qpid/cpp/src
parentc8ad468141a96e5fdf4534552fe72e84399d5d5d (diff)
downloadqpid-python-0fb2f5356f1ea96ea0f3ccbc3de54cbd556fc57e.tar.gz
A further step to final 0-10 spec.
The extra.xml fragment adds class defs for connection in session that are in line with latest spec but use old schema. The preview codepath (99-0) remains unaltered. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@633108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am4
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h5
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp37
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h15
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp179
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h51
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp49
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h14
-rw-r--r--qpid/cpp/src/qpid/framing/AccumulatedAck.cpp2
-rw-r--r--qpid/cpp/src/qpid/framing/BodyHolder.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.cpp14
-rw-r--r--qpid/cpp/src/qpid/framing/Buffer.h2
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumber.cpp5
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceNumber.h2
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceSet.cpp222
-rw-r--r--qpid/cpp/src/qpid/framing/SequenceSet.h86
-rw-r--r--qpid/cpp/src/qpid/framing/amqp_types.h1
-rw-r--r--qpid/cpp/src/qpid/framing/amqp_types_full.h1
-rw-r--r--qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp19
-rw-r--r--qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h3
-rw-r--r--qpid/cpp/src/tests/Makefile.am1
-rw-r--r--qpid/cpp/src/tests/SequenceSet.cpp93
24 files changed, 624 insertions, 194 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 48851085c6..080260be02 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -14,7 +14,7 @@ force:
if GENERATE
# AMQP_PREVIEW_XML and AMQP_FINAL_XML are defined in ../configure.ac
-amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/cluster.xml
+amqp_99_0_xml=@AMQP_PREVIEW_XML@ $(top_srcdir)/xml/extra.xml $(top_srcdir)/xml/cluster.xml
amqp_0_10_xml=@AMQP_FINAL_XML@
specs=$(amqp_99_0_xml) $(amqp_0_10_xml)
@@ -130,6 +130,7 @@ libqpidcommon_la_SOURCES = \
qpid/framing/SendContent.cpp \
qpid/framing/SequenceNumber.cpp \
qpid/framing/SequenceNumberSet.cpp \
+ qpid/framing/SequenceSet.cpp \
qpid/framing/Proxy.cpp \
qpid/framing/Uuid.cpp \
qpid/framing/AMQP_HighestVersion.h \
@@ -412,6 +413,7 @@ nobase_include_HEADERS = \
qpid/framing/SessionState.h \
qpid/framing/SendContent.h \
qpid/framing/SequenceNumber.h \
+ qpid/framing/SequenceSet.h \
qpid/framing/SequenceNumberSet.h \
qpid/framing/SerializeHandler.h \
qpid/framing/StructHelper.h \
diff --git a/qpid/cpp/src/qpid/broker/BrokerAdapter.h b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
index ef2c51bb8d..5237087dc8 100644
--- a/qpid/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/qpid/cpp/src/qpid/broker/BrokerAdapter.h
@@ -85,6 +85,8 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
+ Connection010Handler* getConnection010Handler() { BADHANDLER(); }
+ Session010Handler* getSession010Handler() { BADHANDLER(); }
#undef BADHANDLER
private:
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index e296d52214..126e1b2723 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -23,6 +23,7 @@
#include "ConnectionHandler.h"
#include "Connection.h"
#include "qpid/framing/ConnectionStartBody.h"
+#include "qpid/framing/Connection010StartBody.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
@@ -38,11 +39,14 @@ const std::string en_US = "en_US";
}
void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
+ //need to send out a protocol header back to the client
+ handler->connection.getOutput().initiated(header);
+
FieldTable properties;
string mechanisms(PLAIN);
string locales(en_US);
- handler->serverMode = true;
- handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
+ handler->serverMode = true;
+ handler->client.start(properties, mechanisms, locales);
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
@@ -55,7 +59,7 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
AMQMethodBody* method=frame.getBody()->getMethod();
try{
if (handler->serverMode) {
- if (!invoke(static_cast<AMQP_ServerOperations::ConnectionHandler&>(*handler.get()), *method))
+ if (!invoke(static_cast<AMQP_ServerOperations::Connection010Handler&>(*handler.get()), *method))
throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
} else {
if (!invoke(static_cast<AMQP_ClientOperations::ConnectionHandler&>(*handler.get()), *method))
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index 2a581d5675..44e2ce05fa 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -41,10 +41,10 @@ class Connection;
// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
- struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler,
+ struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
public framing::AMQP_ClientOperations::ConnectionHandler
{
- framing::AMQP_ClientProxy::Connection client;
+ framing::AMQP_ClientProxy::Connection010 client;
framing::AMQP_ServerProxy::Connection server;
Connection& connection;
bool serverMode;
@@ -55,6 +55,7 @@ class ConnectionHandler : public framing::FrameHandler
const std::string& locale);
void secureOk(const std::string& response);
void tuneOk(uint16_t channelMax, uint32_t frameMax, uint16_t heartbeat);
+ void heartbeat() {}
void open(const std::string& virtualHost,
const std::string& capabilities, bool insist);
void close(uint16_t replyCode, const std::string& replyText,
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 9b44f31e14..e012d693fb 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -387,7 +387,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&SemanticState::acknowledged, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -427,16 +427,16 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
}
}
-void SemanticState::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::adjustFlow(const DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->acknowledged(delivery);
+ get_pointer(i)->adjustFlow(delivery);
}
}
-void SemanticState::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
{
if (windowing) {
if (msgCredit != 0xFFFFFFFF) msgCredit++;
@@ -639,4 +639,33 @@ void SemanticState::ConsumerImpl::notify()
parent->outputTasks.activateOutput();
}
+
+void SemanticState::accepted(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ if (txBuffer.get()) {
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+
+ if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ }
+ } else {
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(range.start, range.end);
+ }
+}
+
+void SemanticState::completed(DeliveryId first, DeliveryId last)
+{
+ AckRange range = findRange(first, last);
+ for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ requestDispatch();
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index cc9c0e1e9b..88a2fcab5c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void acknowledged(const DeliveryRecord&);
+ void adjustFlow(const DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
@@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
- void acknowledged(const DeliveryRecord&);
+ void adjustFlow(const DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);
@@ -171,8 +171,6 @@ class SemanticState : public framing::FrameHandler::Chains,
void endDtx(const std::string& xid, bool fail);
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
void recover(bool requeue);
void flow(bool active);
DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
@@ -180,8 +178,15 @@ class SemanticState : public framing::FrameHandler::Chains,
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
void handle(intrusive_ptr<Message> msg);
-
bool doOutput() { return outputTasks.doOutput(); }
+
+ //preview only (completed == ack):
+ void ackCumulative(DeliveryId deliveryTag);
+ void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
+
+ //final 0-10 spec (completed and accepted are distinct):
+ void completed(DeliveryId deliveryTag, DeliveryId endTag);
+ void accepted(DeliveryId deliveryTag, DeliveryId endTag);
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 0e3c9928d1..de96ae3f12 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -60,17 +60,10 @@ void SessionHandler::handleIn(AMQFrame& f) {
AMQMethodBody* m = f.getBody()->getMethod();
try {
if (!ignoring) {
- if (m &&
- (invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m) ||
- invoke(static_cast<AMQP_ServerOperations::ExecutionHandler&>(*this), *m))) {
+ if (m && invoke(static_cast<AMQP_ServerOperations::Session010Handler&>(*this), *m)) {
return;
} else if (session.get()) {
- boost::optional<SequenceNumber> ack=session->received(f);
session->handle(f);
- if (ack)
- peerSession.ack(*ack, SequenceNumberSet());
- } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
- return;
} else {
throw ChannelErrorException(
QPID_MSG("Channel " << channel.get() << " is not open"));
@@ -80,7 +73,8 @@ void SessionHandler::handleIn(AMQFrame& f) {
ignoring=true; // Ignore trailing frames sent by client.
session->detach();
session.reset();
- peerSession.closed(e.code, e.what());
+ //TODO: implement new exception handling mechanism
+ //peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -92,7 +86,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
- peerSession.solicitAck();
+ peerSession.flush(false, false, true);
}
void SessionHandler::assertAttached(const char* method) const {
@@ -111,136 +105,123 @@ void SessionHandler::assertClosed(const char* method) const {
<< " is already open."));
}
-void SessionHandler::open(uint32_t detachedLifetime) {
- assertClosed("open");
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
- peerSession.attached(session->getId(), session->getTimeout());
+void SessionHandler::localSuspend() {
+ if (session.get() && session->isAttached()) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ session.reset();
+ }
}
-void SessionHandler::resume(const Uuid& id) {
- assertClosed("resume");
- session = connection.broker.getSessionManager().resume(id);
- session->attach(*this);
- SequenceNumber seq = session->resuming();
- peerSession.attached(session->getId(), session->getTimeout());
- proxy.getSession().ack(seq, SequenceNumberSet());
-}
-void SessionHandler::flow(bool /*active*/) {
- assertAttached("flow");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flow");
+ConnectionState& SessionHandler::getConnection() { return connection; }
+const ConnectionState& SessionHandler::getConnection() const { return connection; }
+
+//new methods:
+void SessionHandler::attach(const std::string& name, bool /*force*/)
+{
+ //TODO: need to revise session manager to support resume as well
+ assertClosed("attach");
+ std::auto_ptr<SessionState> state(
+ connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
+ peerSession.attached(name);
}
-void SessionHandler::flowOk(bool /*active*/) {
- assertAttached("flowOk");
- // TODO aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException("session.flowOk");
+void SessionHandler::attached(const std::string& /*name*/)
+{
+ std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this, 0));
+ session.reset(state.release());
}
-void SessionHandler::close() {
- assertAttached("close");
- QPID_LOG(info, "Received session.close");
- ignoring=false;
- session->detach();
- session.reset();
- peerSession.closed(REPLY_SUCCESS, "ok");
+void SessionHandler::detach(const std::string& name)
+{
+ assertAttached("detach");
+ localSuspend();
+ peerSession.detached(name, 0);
assert(&connection.getChannel(channel.get()) == this);
connection.closeChannel(channel.get());
}
-void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
- QPID_LOG(warning, "Received session.closed: "<<replyCode<<" "<<replyText);
+void SessionHandler::detached(const std::string& name, uint8_t code)
+{
ignoring=false;
session->detach();
session.reset();
-}
-
-void SessionHandler::localSuspend() {
- if (session.get() && session->isAttached()) {
- session->detach();
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ if (code) {
+ //no error
+ } else {
+ //error occured
+ QPID_LOG(warning, "Received session.closed: "<< name << " " << code);
}
}
-void SessionHandler::suspend() {
- assertAttached("suspend");
- localSuspend();
- peerSession.detached();
- assert(&connection.getChannel(channel.get()) == this);
- connection.closeChannel(channel.get());
-}
-
-void SessionHandler::ack(uint32_t cumulativeSeenMark,
- const SequenceNumberSet& /*seenFrameSet*/)
+void SessionHandler::requestTimeout(uint32_t t)
{
- assertAttached("ack");
- if (session->getState() == SessionState::RESUMING) {
- session->receivedAck(cumulativeSeenMark);
- framing::SessionState::Replay replay=session->replay();
- std::for_each(replay.begin(), replay.end(),
- boost::bind(&SessionHandler::handleOut, this, _1));
- }
- else
- session->receivedAck(cumulativeSeenMark);
+ session->setTimeout(t);
+ //proxy.timeout(t);
}
-void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- // TODO aconway 2007-10-02: may be removed from spec.
- assert(0); throw NotImplementedException("session.high-water-mark");
+void SessionHandler::timeout(uint32_t)
+{
+ //not sure what we need to do on the server for this...
}
-void SessionHandler::solicitAck() {
- assertAttached("solicit-ack");
- peerSession.ack(session->sendingAck(), SequenceNumberSet());
+void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
+{
+ if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
+
+ session->next = id;
}
-void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+void SessionHandler::expected(const framing::SequenceSet& commands, const framing::Array& fragments)
{
- std::auto_ptr<SessionState> state(
- connection.broker.getSessionManager().open(*this, detachedLifetime));
- session.reset(state.release());
+ if (!commands.empty() || fragments.size()) {
+ throw NotImplementedException("Session resumption not yet supported");
+ }
}
-void SessionHandler::detached()
+void SessionHandler::confirmed(const framing::SequenceSet& /*commands*/, const framing::Array& /*fragments*/)
{
- connection.broker.getSessionManager().suspend(session);
- session.reset();
+ //don't really care too much about this yet
}
-
-ConnectionState& SessionHandler::getConnection() { return connection; }
-const ConnectionState& SessionHandler::getConnection() const { return connection; }
-
-void SessionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
+void SessionHandler::completed(const framing::SequenceSet& commands, bool timelyReply)
{
- assertAttached("complete");
- session->complete(cumulative, range);
+ session->complete(commands);
+ if (timelyReply) {
+ peerSession.knownCompleted(session->knownCompleted);
+ session->knownCompleted.clear();
+ }
}
-void SessionHandler::flush()
+void SessionHandler::knownCompleted(const framing::SequenceSet& commands)
{
- assertAttached("flush");
- session->flush();
+ session->completed.remove(commands);
}
-void SessionHandler::sync()
+
+void SessionHandler::flush(bool expected, bool confirmed, bool completed)
{
- assertAttached("sync");
- session->sync();
+ if (expected) {
+ peerSession.expected(SequenceSet(session->next), Array());
+ }
+ if (confirmed) {
+ peerSession.confirmed(session->completed, Array());
+ }
+ if (completed) {
+ peerSession.completed(session->completed, true);
+ }
}
-void SessionHandler::noop()
+
+void SessionHandler::sendCompletion()
{
- assertAttached("noop");
- session->noop();
+ peerSession.completed(session->completed, true);
}
-void SessionHandler::result(uint32_t /*command*/, const std::string& /*data*/)
+void SessionHandler::gap(const framing::SequenceSet& /*commands*/)
{
- //never actually sent by client at present
+ throw NotImplementedException("gap not yet supported");
}
-
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index e6bc463a82..4b031f2951 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -27,8 +27,10 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/Array.h"
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
#include <boost/noncopyable.hpp>
@@ -44,9 +46,7 @@ class SessionState;
* receives incoming frames, handles session controls and manages the
* association between the channel and a session.
*/
-class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
- public framing::AMQP_ClientOperations::SessionHandler,
- public framing::AMQP_ServerOperations::ExecutionHandler,
+class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
public framing::FrameHandler::InOutHandler,
private boost::noncopyable
{
@@ -69,35 +69,32 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
// Called by closing connection.
void localSuspend();
void detach() { localSuspend(); }
+ void sendCompletion();
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
private:
- /// Session methods
- void open(uint32_t detachedLifetime);
- void flow(bool active);
- void flowOk(bool active);
- void close();
- void closed(uint16_t replyCode, const std::string& replyText);
- void resume(const framing::Uuid& sessionId);
- void suspend();
- void ack(uint32_t cumulativeSeenMark,
- const framing::SequenceNumberSet& seenFrameSet);
- void highWaterMark(uint32_t lastSentMark);
- void solicitAck();
-
- //extra methods required for assuming client role
- void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
- void detached();
-
- //Execution methods:
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void result(uint32_t command, const std::string& data);
- void sync();
+ //new methods:
+ void attach(const std::string& name, bool force);
+ void attached(const std::string& name);
+ void detach(const std::string& name);
+ void detached(const std::string& name, uint8_t code);
+
+ void requestTimeout(uint32_t t);
+ void timeout(uint32_t t);
+
+ void commandPoint(const framing::SequenceNumber& id, uint64_t offset);
+ void expected(const framing::SequenceSet& commands, const framing::Array& fragments);
+ void confirmed(const framing::SequenceSet& commands,const framing::Array& fragments);
+ void completed(const framing::SequenceSet& commands, bool timelyReply);
+ void knownCompleted(const framing::SequenceSet& commands);
+ void flush(bool expected, bool confirmed, bool completed);
+ void gap(const framing::SequenceSet& commands);
+
+ //hacks for old generator:
+ void commandPoint(uint32_t id, uint64_t offset) { commandPoint(framing::SequenceNumber(id), offset); }
void assertAttached(const char* method) const;
void assertActive(const char* method) const;
@@ -106,7 +103,7 @@ class SessionHandler : public framing::AMQP_ServerOperations::SessionHandler,
Connection& connection;
framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
- framing::AMQP_ClientProxy::Session peerSession;
+ framing::AMQP_ClientProxy::Session010 peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
};
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 573a567da6..5f04136444 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -49,7 +49,7 @@ SessionState::SessionState(
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
- ackOp(boost::bind(&SemanticState::ackRange, &semanticState, _1, _2))
+ ackOp(boost::bind(&SemanticState::completed, &semanticState, _1, _2))
{
getConnection().outputTasks.addOutputTask(&semanticState);
@@ -170,9 +170,9 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method)
{
- SequenceNumber id = incoming.next();
+ SequenceNumber id = next++;
Invoker::Result invocation = invoke(adapter, *method);
- incoming.complete(id);
+ completed.add(id);
if (!invocation.wasHandled()) {
throw NotImplementedException("Not implemented");
@@ -180,7 +180,6 @@ void SessionState::handleCommand(framing::AMQMethodBody* method)
getProxy().getExecution().result(id.getValue(), invocation.getResult());
}
if (method->isSync()) {
- incoming.sync(id);
sendCompletion();
}
//TODO: if window gets too large send unsolicited completion
@@ -190,7 +189,8 @@ void SessionState::handleContent(AMQFrame& frame)
{
intrusive_ptr<Message> msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(incoming.next());
+ SequenceNumber id = next++;
+ msgBuilder.start(id);
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
@@ -198,9 +198,9 @@ void SessionState::handleContent(AMQFrame& frame)
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
- incoming.track(msg);
+ //TODO: may want to hold up execution until async enqueue is complete
+ completed.add(msg->getCommandId());
if (msg->getFrames().getMethod()->isSync()) {
- incoming.sync(msg->getCommandId());
sendCompletion();
}
}
@@ -208,6 +208,8 @@ void SessionState::handleContent(AMQFrame& frame)
void SessionState::handle(AMQFrame& frame)
{
+ received(frame);
+
//TODO: make command handling more uniform, regardless of whether
//commands carry content. (For now, assume all single frame
//assmblies are non-content bearing and all content-bearing
@@ -229,38 +231,13 @@ DeliveryId SessionState::deliver(QueuedMessage& msg, DeliveryToken::shared_ptr t
void SessionState::sendCompletion()
{
- SequenceNumber mark = incoming.getMark();
- SequenceNumberSet range = incoming.getRange();
- getProxy().getExecution().complete(mark.getValue(), range);
-}
-
-void SessionState::complete(uint32_t cumulative, const SequenceNumberSet& range)
-{
- //record:
- SequenceNumber mark(cumulative);
- if (outgoing.lwm < mark) {
- outgoing.lwm = mark;
- //ack messages:
- semanticState.ackCumulative(mark.getValue());
- }
- range.processRanges(ackOp);
-}
-
-void SessionState::flush()
-{
- incoming.flush();
- sendCompletion();
-}
-
-void SessionState::sync()
-{
- incoming.sync();
- sendCompletion();
+ handler->sendCompletion();
}
-void SessionState::noop()
+void SessionState::complete(const SequenceSet& commands)
{
- incoming.noop();
+ knownCompleted.add(commands);
+ commands.for_each(ackOp);
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 98c21a8ab5..fa6bd14ef3 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -25,6 +25,7 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Time.h"
@@ -83,6 +84,8 @@ class SessionState : public framing::SessionState,
ConnectionState& getConnection();
uint32_t getTimeout() const { return timeout; }
+ void setTimeout(uint32_t t) { timeout = t; }
+
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
@@ -93,10 +96,7 @@ class SessionState : public framing::SessionState,
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
- void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);
- void flush();
- void noop();
- void sync();
+ void complete(const framing::SequenceSet& ranges);
void sendCompletion();
//delivery adapter methods:
@@ -114,6 +114,10 @@ class SessionState : public framing::SessionState,
uint32_t ackInterval);
+ framing::SequenceSet completed;
+ framing::SequenceSet knownCompleted;
+ framing::SequenceNumber next;
+
private:
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
@@ -130,8 +134,6 @@ class SessionState : public framing::SessionState,
BrokerAdapter adapter;
MessageBuilder msgBuilder;
- //execution state
- IncomingExecutionContext incoming;
framing::Window outgoing;
RangedOperation ackOp;
diff --git a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
index bf53bf0cd6..2d3ecf3f6a 100644
--- a/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
+++ b/qpid/cpp/src/qpid/framing/AccumulatedAck.cpp
@@ -83,7 +83,7 @@ void AccumulatedAck::update(SequenceNumber first, SequenceNumber last){
void AccumulatedAck::consolidate(){}
void AccumulatedAck::clear(){
- mark = 0;//not sure that this is valid when wraparound is a possibility
+ mark = SequenceNumber(0);//not sure that this is valid when wraparound is a possibility
ranges.clear();
}
diff --git a/qpid/cpp/src/qpid/framing/BodyHolder.cpp b/qpid/cpp/src/qpid/framing/BodyHolder.cpp
index f66f29d36a..de971b5b28 100644
--- a/qpid/cpp/src/qpid/framing/BodyHolder.cpp
+++ b/qpid/cpp/src/qpid/framing/BodyHolder.cpp
@@ -48,6 +48,7 @@ void BodyHolder::encode(Buffer& b) const {
void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) {
switch(type)
{
+ case 0://CONTROL
case METHOD_BODY: {
ClassId c = buffer.getOctet();
MethodId m = buffer.getOctet();
diff --git a/qpid/cpp/src/qpid/framing/Buffer.cpp b/qpid/cpp/src/qpid/framing/Buffer.cpp
index c0cd210042..60d67f1b07 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.cpp
+++ b/qpid/cpp/src/qpid/framing/Buffer.cpp
@@ -194,6 +194,13 @@ void Buffer::putShortString(const string& s){
position += len;
}
+void Buffer::putMediumString(const string& s){
+ uint16_t len = s.length();
+ putShort(len);
+ s.copy(data + position, len);
+ position += len;
+}
+
void Buffer::putLongString(const string& s){
uint32_t len = s.length();
putLong(len);
@@ -208,6 +215,13 @@ void Buffer::getShortString(string& s){
position += len;
}
+void Buffer::getMediumString(string& s){
+ uint16_t len = getShort();
+ checkAvailable(len);
+ s.assign(data + position, len);
+ position += len;
+}
+
void Buffer::getLongString(string& s){
uint32_t len = getLong();
checkAvailable(len);
diff --git a/qpid/cpp/src/qpid/framing/Buffer.h b/qpid/cpp/src/qpid/framing/Buffer.h
index 9c0d403462..585379b09a 100644
--- a/qpid/cpp/src/qpid/framing/Buffer.h
+++ b/qpid/cpp/src/qpid/framing/Buffer.h
@@ -97,8 +97,10 @@ class Buffer
void putUInt(uint64_t);
void putShortString(const string& s);
+ void putMediumString(const string& s);
void putLongString(const string& s);
void getShortString(string& s);
+ void getMediumString(string& s);
void getLongString(string& s);
void getBin128(uint8_t* b);
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
index 3172246cc2..1b62d296c6 100644
--- a/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
+++ b/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
@@ -51,9 +51,10 @@ const SequenceNumber SequenceNumber::operator++(int)
return old;
}
-SequenceNumber SequenceNumber::operator+(uint32_t i) const
+SequenceNumber& SequenceNumber::operator--()
{
- return SequenceNumber(value + i);
+ value = value - 1;
+ return *this;
}
bool SequenceNumber::operator<(const SequenceNumber& other) const
diff --git a/qpid/cpp/src/qpid/framing/SequenceNumber.h b/qpid/cpp/src/qpid/framing/SequenceNumber.h
index b2594452d0..0ed591b804 100644
--- a/qpid/cpp/src/qpid/framing/SequenceNumber.h
+++ b/qpid/cpp/src/qpid/framing/SequenceNumber.h
@@ -39,7 +39,7 @@ class SequenceNumber
SequenceNumber& operator++();//prefix ++
const SequenceNumber operator++(int);//postfix ++
- SequenceNumber operator+(uint32_t) const;
+ SequenceNumber& operator--();//prefix ++
bool operator==(const SequenceNumber& other) const;
bool operator!=(const SequenceNumber& other) const;
bool operator<(const SequenceNumber& other) const;
diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
new file mode 100644
index 0000000000..e3461e233b
--- /dev/null
+++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SequenceSet.h"
+
+using namespace qpid::framing;
+using std::max;
+using std::min;
+
+namespace {
+//each range contains 2 numbers, 4 bytes each
+uint16_t RANGE_SIZE = 2 * 4;
+}
+
+void SequenceSet::encode(Buffer& buffer) const
+{
+ buffer.putShort(ranges.size() * RANGE_SIZE);
+ for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ i->encode(buffer);
+ }
+}
+
+void SequenceSet::decode(Buffer& buffer)
+{
+ uint16_t size = buffer.getShort();
+ uint16_t count = size / RANGE_SIZE;//number of ranges
+ if (size % RANGE_SIZE) throw FrameErrorException(QPID_MSG("Invalid size for sequence set: " << size));
+
+ for (uint16_t i = 0; i < count; i++) {
+ add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong()));
+ }
+}
+
+uint32_t SequenceSet::size() const
+{
+ return 2 /*size field*/ + (ranges.size() * RANGE_SIZE);
+}
+
+bool SequenceSet::contains(const SequenceNumber& point) const
+{
+ for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ if (i->contains(point)) return true;
+ }
+ return false;
+}
+
+void SequenceSet::add(const SequenceNumber& s)
+{
+ add(s, s);
+}
+
+void SequenceSet::add(const SequenceNumber& start, const SequenceNumber& end)
+{
+ if (start > end) {
+ add(end, start);
+ } else {
+ Range r(start, end);
+ bool merged = false;
+ Ranges::iterator i = ranges.begin();
+ while (i != ranges.end() && !merged && i->start < start) {
+ if (i->merge(r)) merged = true;
+ i++;
+ }
+ if (!merged) {
+ ranges.insert(i, r);
+ }
+ }
+}
+
+void SequenceSet::add(const SequenceSet& set)
+{
+ for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) {
+ add(i->start, i->end);
+ }
+}
+
+void SequenceSet::remove(const SequenceSet& set)
+{
+ for (Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) {
+ remove(i->start, i->end);
+ }
+}
+
+void SequenceSet::remove(const SequenceNumber& start, const SequenceNumber& end)
+{
+ if (start > end) {
+ remove(end, start);
+ } else {
+ Ranges::iterator i = ranges.begin();
+ while (i != ranges.end() && i->start < start) {
+ if (start <= i->end) {
+ if (end > i->end) {
+ //i.e. start is within the range pointed to by i, but end is not
+ i->end = (uint32_t)start - 1;
+ } else {
+ //whole of range to be deleted is contained within that pointed to be i
+ if (end == i->end) {
+ //just shrink range pointed to by i
+ i->end = (uint32_t)start - 1;
+ } else {
+ //need to split the range pointed to by i
+ Range r(i->start, (uint32_t)start - 1);
+ i->start = end + 1;
+ ranges.insert(i, r);
+ }
+ return;//no need to go any further
+ }
+ }
+ i++;
+ }
+ Ranges::iterator j = i;
+ while (j != ranges.end() && j->end < end) {
+ j++;
+ }
+ if (j->start <= end){
+ j->start = end + 1;
+ }
+ ranges.erase(i, j);
+ }
+}
+
+void SequenceSet::remove(const SequenceNumber& s)
+{
+ for (Ranges::iterator i = ranges.begin(); i != ranges.end() && s >= i->start; i++) {
+ if (i->start == s) {
+ if (i->start == i->end) {
+ ranges.erase(i);
+ } else {
+ ++(i->start);
+ }
+ } else if (i->end == s) {
+ --(i->end);
+ } else if (i->contains(s)) {
+ //need to split range pointed to by i
+ Range r(i->start, (uint32_t)s - 1);
+ i->start = s + 1;
+ ranges.insert(i, r);
+ }
+ }
+}
+
+bool SequenceSet::empty() const
+{
+ return ranges.empty();
+}
+
+void SequenceSet::clear()
+{
+ return ranges.clear();
+}
+
+bool SequenceSet::Range::contains(SequenceNumber i) const
+{
+ return i >= start && i <= end;
+}
+
+bool SequenceSet::Range::intersects(const Range& r) const
+{
+ return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end);
+}
+
+bool SequenceSet::Range::merge(const Range& r)
+{
+ if (intersects(r) || mergeable(r.end) || r.mergeable(end)) {
+ start = min(start, r.start);
+ end = max(end, r.end);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool SequenceSet::Range::mergeable(const SequenceNumber& s) const
+{
+ if (contains(s) || start - s == 1) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void SequenceSet::Range::encode(Buffer& buffer) const
+{
+ buffer.putLong(start);
+ buffer.putLong(end);
+}
+
+SequenceSet::Range::Range(SequenceNumber s, SequenceNumber e) : start(s), end(e) {}
+
+namespace qpid{
+namespace framing{
+
+std::ostream& operator<<(std::ostream& out, const SequenceSet& set) {
+ out << "{";
+ for (SequenceSet::Ranges::const_iterator i = set.ranges.begin(); i != set.ranges.end(); i++) {
+ if (i != set.ranges.begin()) out << ", ";
+ out << i->start.getValue() << "-" << i->end.getValue();
+ }
+ out << "}";
+ return out;
+}
+
+}
+}
diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.h b/qpid/cpp/src/qpid/framing/SequenceSet.h
new file mode 100644
index 0000000000..2f34cb5cba
--- /dev/null
+++ b/qpid/cpp/src/qpid/framing/SequenceSet.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _framing_SequenceSet_h
+#define _framing_SequenceSet_h
+
+#include <ostream>
+#include <list>
+#include "amqp_types.h"
+#include "Buffer.h"
+#include "SequenceNumber.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace framing {
+
+class SequenceSet
+{
+ struct Range
+ {
+ SequenceNumber start;
+ SequenceNumber end;
+
+ Range(SequenceNumber s, SequenceNumber e);
+ bool contains(SequenceNumber i) const;
+ bool intersects(const Range& r) const;
+ bool merge(const Range& r);
+ bool mergeable(const SequenceNumber& r) const;
+ void encode(Buffer& buffer) const;
+ };
+
+ typedef std::list<Range> Ranges;
+ Ranges ranges;
+
+public:
+ SequenceSet() {}
+ SequenceSet(const SequenceNumber& s) { add(s); }
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+ uint32_t size() const;
+
+ bool contains(const SequenceNumber& s) const;
+ void add(const SequenceNumber& s);
+ void add(const SequenceNumber& start, const SequenceNumber& end);
+ void add(const SequenceSet& set);
+ void remove(const SequenceNumber& s);
+ void remove(const SequenceNumber& start, const SequenceNumber& end);
+ void remove(const SequenceSet& set);
+
+ void clear();
+ bool empty() const;
+
+ template <class T>
+ void for_each(T& t) const
+ {
+ for (Ranges::const_iterator i = ranges.begin(); i != ranges.end(); i++) {
+ t(i->start, i->end);
+ }
+ }
+
+ friend std::ostream& operator<<(std::ostream&, const SequenceSet&);
+};
+
+
+}} // namespace qpid::framing
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/framing/amqp_types.h b/qpid/cpp/src/qpid/framing/amqp_types.h
index 94442aa357..943970cc56 100644
--- a/qpid/cpp/src/qpid/framing/amqp_types.h
+++ b/qpid/cpp/src/qpid/framing/amqp_types.h
@@ -65,6 +65,7 @@ const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX);
class FramingContent;
class FieldTable;
class SequenceNumberSet;
+class SequenceSet;
class Uuid;
}} // namespace qpid::framing
diff --git a/qpid/cpp/src/qpid/framing/amqp_types_full.h b/qpid/cpp/src/qpid/framing/amqp_types_full.h
index f1ed44ec05..da7bdc876d 100644
--- a/qpid/cpp/src/qpid/framing/amqp_types_full.h
+++ b/qpid/cpp/src/qpid/framing/amqp_types_full.h
@@ -34,6 +34,7 @@
#include "FramingContent.h"
#include "FieldTable.h"
#include "SequenceNumberSet.h"
+#include "SequenceSet.h"
#include "Uuid.h"
#endif /*!_framing_amqp_types_decl_h*/
diff --git a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 0586eb9d36..c24205f53e 100644
--- a/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -94,7 +94,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
std::queue<framing::AMQFrame> frameQueue;
Mutex frameQueueLock;
bool frameQueueClosed;
- bool initiated;
+ bool isInitiated;
bool readError;
std::string identifier;
bool isClient;
@@ -105,7 +105,7 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
AsynchIOHandler() :
inputHandler(0),
frameQueueClosed(false),
- initiated(false),
+ isInitiated(false),
readError(false),
isClient(false)
{}
@@ -128,6 +128,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
void send(framing::AMQFrame&);
void close();
void activateOutput();
+ void initiated(const framing::ProtocolInitiation&);
+
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -259,13 +261,18 @@ void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
+void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi)
+{
+ write(pi);
+}
+
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
return;
}
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- if(initiated){
+ if(isInitiated){
framing::AMQFrame frame;
try{
while(frame.decode(in)) {
@@ -282,7 +289,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if(protocolInit.decode(in)){
QPID_LOG(debug, "INIT [" << identifier << "]");
inputHandler->initiated(protocolInit);
- initiated = true;
+ isInitiated = true;
}
}
// TODO: unreading needs to go away, and when we can cope
@@ -324,10 +331,10 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
- if (isClient && !initiated) {
+ if (isClient && !isInitiated) {
//get & write protocol header from upper layers
write(inputHandler->getInitiation());
- initiated = true;
+ isInitiated = true;
return;
}
ScopedLock<Mutex> l(frameQueueLock);
diff --git a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
index 5a60ae4998..13407d9b9d 100644
--- a/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
+++ b/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
@@ -22,6 +22,7 @@
#define _ConnectionOutputHandler_
#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
#include "OutputControl.h"
namespace qpid {
@@ -30,7 +31,7 @@ namespace sys {
/**
* Provides the output handler associated with a connection.
*/
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler
{
public:
virtual void close() = 0;
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index d25378a519..0baf1a2763 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -38,6 +38,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
InlineVector.cpp \
ISList.cpp IList.cpp \
ClientSessionTest.cpp \
+ SequenceSet.cpp \
serialize.cpp \
ProxyTemplate.cpp apply.cpp
# FIXME aconway 2008-02-20: removed RefCountedMap.cpp due to valgrind error.
diff --git a/qpid/cpp/src/tests/SequenceSet.cpp b/qpid/cpp/src/tests/SequenceSet.cpp
new file mode 100644
index 0000000000..bffeed648e
--- /dev/null
+++ b/qpid/cpp/src/tests/SequenceSet.cpp
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceSet.h"
+#include "unit_test.h"
+
+QPID_AUTO_TEST_SUITE(SequenceSetTestSuite)
+
+using namespace qpid::framing;
+
+BOOST_AUTO_TEST_CASE(testAdd) {
+ SequenceSet s;
+ s.add(2);
+ s.add(8,8);
+ s.add(3,5);
+
+ for (uint32_t i = 0; i <= 1; i++) //0, 1
+ BOOST_CHECK(!s.contains(i));
+
+ for (uint32_t i = 2; i <= 5; i++) //2, 3, 4 & 5
+ BOOST_CHECK(s.contains(i));
+
+ for (uint32_t i = 0; i <= 1; i++) //6, 7
+ BOOST_CHECK(!s.contains(i));
+
+ BOOST_CHECK(s.contains(8));//8
+
+ SequenceSet t;
+ t.add(6, 10);
+ t.add(s);
+
+ for (uint32_t i = 0; i <= 1; i++)
+ BOOST_CHECK(!t.contains(i));
+
+ for (uint32_t i = 2; i <= 10; i++)
+ BOOST_CHECK(t.contains(i));
+}
+
+BOOST_AUTO_TEST_CASE(testRemove) {
+ SequenceSet s;
+ SequenceSet t;
+ s.add(0, 10);
+ t.add(0, 10);
+
+ s.remove(7);
+ s.remove(3, 5);
+ s.remove(9, 10);
+
+ t.remove(s);
+
+ for (uint32_t i = 0; i <= 2; i++) {
+ BOOST_CHECK(s.contains(i));
+ BOOST_CHECK(!t.contains(i));
+ }
+
+ for (uint32_t i = 3; i <= 5; i++) {
+ BOOST_CHECK(!s.contains(i));
+ BOOST_CHECK(t.contains(i));
+ }
+
+ BOOST_CHECK(s.contains(6));
+ BOOST_CHECK(!t.contains(6));
+
+ BOOST_CHECK(!s.contains(7));
+ BOOST_CHECK(t.contains(7));
+
+ BOOST_CHECK(s.contains(8));
+ BOOST_CHECK(!t.contains(8));
+
+ for (uint32_t i = 9; i <= 10; i++) {
+ BOOST_CHECK(!s.contains(i));
+ BOOST_CHECK(t.contains(i));
+ }
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+