summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp50
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h1
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp8
-rw-r--r--cpp/src/qpid/broker/BrokerMessageMessage.cpp12
-rw-r--r--cpp/src/qpid/broker/ConnectionAdapter.h1
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp30
-rw-r--r--cpp/src/qpid/broker/InMemoryContent.cpp4
-rw-r--r--cpp/src/qpid/broker/LazyLoadedContent.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp27
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp35
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.h16
11 files changed, 112 insertions, 76 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index be43dacb27..9bf148bcf0 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -56,12 +56,12 @@ ProtocolVersion BrokerAdapter::getVersion() const {
void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){
channel.open();
// FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9
- client.openOk(std::string()/* ID */);//GRS, context.getRequestId());
+ client.openOk(std::string()/* ID */);
}
void BrokerAdapter::ChannelHandlerImpl::flow(bool active){
channel.flow(active);
- client.flowOk(active);//GRS, context.getRequestId());
+ client.flowOk(active);
}
void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){}
@@ -70,7 +70,7 @@ void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/,
const string& /*replyText*/,
uint16_t /*classId*/, uint16_t /*methodId*/)
{
- client.closeOk();//GRS context.getRequestId());
+ client.closeOk();
// FIXME aconway 2007-01-18: Following line will "delete this". Ugly.
connection.closeChannel(channel.getId());
}
@@ -104,7 +104,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
}
}
if(!nowait){
- client.declareOk();//GRS context.getRequestId());
+ client.declareOk();
}
}
@@ -114,16 +114,16 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/,
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
if (exchange->isDurable()) broker.getStore().destroy(*exchange);
broker.getExchanges().destroy(name);
- if(!nowait) client.deleteOk();//GRS context.getRequestId());
+ if(!nowait) client.deleteOk();
}
void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(broker.getExchanges().get(name));
- client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId());
+ client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId());
+ client.queryOk("", false, true, FieldTable());
}
}
@@ -144,18 +144,18 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
}
if (!exchange) {
- client.queryOk(true, false, false, false, false);//GRS, context.getRequestId());
+ client.queryOk(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- client.queryOk(false, true, false, false, false);//GRS, context.getRequestId());
+ client.queryOk(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- client.queryOk(false, false, false, false, false);//GRS, context.getRequestId());
+ client.queryOk(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId());
+ client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
@@ -196,7 +196,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string&
if (!nowait) {
string queueName = queue->getName();
client.declareOk(
- queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId());
+ queueName, queue->getMessageCount(), queue->getConsumerCount());
}
}
@@ -214,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu
broker.getStore().bind(*exchange, *queue, routingKey, arguments);
}
}
- if(!nowait) client.bindOk();//GRS context.getRequestId());
+ if(!nowait) client.bindOk();
}else{
throw ChannelException(
404, "Bind failed. No such exchange: " + exchangeName);
@@ -238,14 +238,14 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/,
broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
}
- client.unbindOk();//GRS context.getRequestId());
+ client.unbindOk();
}
void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){
Queue::shared_ptr queue = getQueue(queueName);
int count = queue->purge();
- if(!nowait) client.purgeOk( count);//GRS, context.getRequestId());
+ if(!nowait) client.purgeOk( count);
}
void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue,
@@ -270,7 +270,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string&
}
if(!nowait)
- client.deleteOk(count);//GRS, context.getRequestId());
+ client.deleteOk(count);
}
@@ -280,7 +280,7 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.qosOk();//GRS context.getRequestId());
+ client.qosOk();
}
void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
@@ -300,7 +300,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())),
newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields);
- if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId());
+ if(!nowait) client.consumeOk(newTag);
//allow messages to be dispatched if required as there is now a consumer:
queue->requestDispatch();
@@ -309,7 +309,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){
channel.cancel(consumerTag);
- if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId());
+ if(!nowait) client.cancelOk(consumerTag);
}
void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/,
@@ -333,7 +333,7 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que
if(!channel.get(out, queue, !noAck)){
string clusterId;//not used, part of an imatix hack
- client.getEmpty(clusterId);//GRS, context.getRequestId());
+ client.getEmpty(clusterId);
}
}
@@ -351,19 +351,19 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
void BrokerAdapter::TxHandlerImpl::select()
{
channel.startTx();
- client.selectOk();//GRS context.getRequestId());
+ client.selectOk();
}
void BrokerAdapter::TxHandlerImpl::commit()
{
channel.commit();
- client.commitOk();//GRS context.getRequestId());
+ client.commitOk();
}
void BrokerAdapter::TxHandlerImpl::rollback()
{
channel.rollback();
- client.rollbackOk();//GRS context.getRequestId());
+ client.rollbackOk();
channel.recover(false);
}
@@ -378,7 +378,7 @@ void BrokerAdapter::ChannelHandlerImpl::ok()
//
void BrokerAdapter::ChannelHandlerImpl::ping()
{
- client.ok();//GRS context.getRequestId());
+ client.ok();
client.pong();
}
@@ -386,7 +386,7 @@ void BrokerAdapter::ChannelHandlerImpl::ping()
void
BrokerAdapter::ChannelHandlerImpl::pong()
{
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/)
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 41013e8bf6..a7e27a0ee6 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -79,6 +79,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+ ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); }
ConnectionHandler* getConnectionHandler() {
throw ConnectionException(503, "Can't access connection class on non-zero channel!");
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index 21b56869d4..d192b09a63 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -78,10 +78,10 @@ void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, uint64_t deliveryTag,
uint32_t framesize)
{
- channel.send(
+ channel.send(make_shared_ptr(
new BasicDeliverBody(
channel.getVersion(), consumerTag, deliveryTag,
- getRedelivered(), getExchange(), getRoutingKey()));
+ getRedelivered(), getExchange(), getRoutingKey())));
sendContent(channel, framesize);
}
@@ -92,12 +92,12 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel,
uint64_t deliveryTag,
uint32_t framesize)
{
- channel.send(
+ channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
responseTo,
deliveryTag, getRedelivered(), getExchange(),
- getRoutingKey(), messageCount));
+ getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
index b23ebaf50b..01f8250b84 100644
--- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp
@@ -85,7 +85,7 @@ void MessageMessage::transferMessage(
if (ref){
// Open
- channel.send(new MessageOpenBody(channel.getVersion(), ref->getId()));
+ channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId())));
// Appends
for(Reference::Appends::const_iterator a = ref->getAppends().begin();
a != ref->getAppends().end();
@@ -98,8 +98,8 @@ void MessageMessage::transferMessage(
string::size_type contentStart = 0;
while (sizeleft) {
string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead;
- channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(),
- string(content, contentStart, contentSize)));
+ channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(),
+ string(content, contentStart, contentSize))));
sizeleft -= contentSize;
contentStart += contentSize;
}
@@ -108,7 +108,7 @@ void MessageMessage::transferMessage(
// The transfer
if ( transfer->size()<=framesize ) {
- channel.send(
+ channel.send(make_shared_ptr(
new MessageTransferBody(channel.getVersion(),
transfer->getTicket(),
consumerTag,
@@ -132,7 +132,7 @@ void MessageMessage::transferMessage(
transfer->getSecurityToken(),
transfer->getApplicationHeaders(),
body,
- transfer->getMandatory()));
+ transfer->getMandatory())));
} else {
// Thing to do here is to construct a simple reference message then deliver that instead
// fragmentation will be taken care of in the delivery if necessary;
@@ -172,7 +172,7 @@ void MessageMessage::transferMessage(
}
// Close any reference data
if (ref)
- channel.send(new MessageCloseBody(channel.getVersion(), ref->getId()));
+ channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId())));
}
void MessageMessage::deliver(
diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h
index 2e27abd333..b624102cd2 100644
--- a/cpp/src/qpid/broker/ConnectionAdapter.h
+++ b/cpp/src/qpid/broker/ConnectionAdapter.h
@@ -71,6 +71,7 @@ public:
TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
+ ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); }
framing::ProtocolVersion getVersion() const;
};
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 2aa53e66bd..d1f925e40c 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -52,7 +52,7 @@ const int XA_OK(8);
void DtxHandlerImpl::select()
{
channel.selectDtx();
- dClient.selectOk();//GRS context.getRequestId());
+ dClient.selectOk();
}
void DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -66,7 +66,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (suspend) {
throw ConnectionException(503, "End and suspend cannot both be set.");
} else {
- dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId());
+ dClient.endOk(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -74,10 +74,10 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/,
} else {
channel.endDtx(xid, false);
}
- dClient.endOk(XA_OK);//GRS, context.getRequestId());
+ dClient.endOk(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ dClient.endOk(XA_RBTIMEOUT);
}
}
@@ -95,9 +95,9 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/,
} else {
channel.startDtx(xid, broker.getDtxManager(), join);
}
- dClient.startOk(XA_OK);//GRS, context.getRequestId());
+ dClient.startOk(XA_OK);
} catch (const DtxTimeoutException& e) {
- dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ dClient.startOk(XA_RBTIMEOUT);
}
}
@@ -108,9 +108,9 @@ void DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
{
try {
bool ok = broker.getDtxManager().prepare(xid);
- cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId());
+ cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ cClient.prepareOk(XA_RBTIMEOUT);
}
}
@@ -120,9 +120,9 @@ void DtxHandlerImpl::commit(u_int16_t /*ticket*/,
{
try {
bool ok = broker.getDtxManager().commit(xid, onePhase);
- cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId());
+ cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ cClient.commitOk(XA_RBTIMEOUT);
}
}
@@ -132,9 +132,9 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
{
try {
broker.getDtxManager().rollback(xid);
- cClient.rollbackOk(XA_OK);//GRS, context.getRequestId());
+ cClient.rollbackOk(XA_OK);
} catch (const DtxTimeoutException& e) {
- cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId());
+ cClient.rollbackOk(XA_RBTIMEOUT);
}
}
@@ -171,7 +171,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/,
FieldTable response;
response.setString("xids", data);
- cClient.recoverOk(response);//GRS, context.getRequestId());
+ cClient.recoverOk(response);
}
void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -184,7 +184,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
void DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = broker.getDtxManager().getTimeout(xid);
- cClient.getTimeoutOk(timeout);//GRS, context.getRequestId());
+ cClient.getTimeoutOk(timeout);
}
@@ -193,7 +193,7 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/,
u_int32_t timeout)
{
broker.getDtxManager().setTimeout(xid, timeout);
- cClient.setTimeoutOk();//GRS context.getRequestId());
+ cClient.setTimeoutOk();
}
void DtxHandlerImpl::setResponseTo(framing::RequestId r)
diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp
index 96563b0329..a6ce820f7e 100644
--- a/cpp/src/qpid/broker/InMemoryContent.cpp
+++ b/cpp/src/qpid/broker/InMemoryContent.cpp
@@ -47,13 +47,13 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize)
uint32_t offset = 0;
for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
string data = (*i)->getData().substr(offset, framesize);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
offset += framesize;
}
uint32_t remainder = (*i)->size() % framesize;
if (remainder) {
string data = (*i)->getData().substr(offset, remainder);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}
} else {
AMQBody::shared_ptr contentBody =
diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp
index c11d049317..80d06ebf2b 100644
--- a/cpp/src/qpid/broker/LazyLoadedContent.cpp
+++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp
@@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize)
string data;
store->loadContent(*msg, data, offset,
remaining > framesize ? framesize : remaining);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}
} else {
string data;
store->loadContent(*msg, data, 0, expectedSize);
- channel.send(new AMQContentBody(data));
+ channel.send(make_shared_ptr(new AMQContentBody(data)));
}
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index c9fbc2b95d..de32368158 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -45,14 +45,14 @@ void
MessageHandlerImpl::cancel(const string& destination )
{
channel.cancel(destination);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
MessageHandlerImpl::open(const string& reference)
{
references.open(reference);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
@@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context)
{
MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody));
references.get(body->getReference())->append(body);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
MessageHandlerImpl::close(const string& reference)
{
Reference::shared_ptr ref = references.get(reference);
- client.ok();//GRS context.getRequestId());
+ client.ok();
// Send any transfer messages to their correct exchanges and okay them
const Reference::Messages& msgs = ref->getMessages();
@@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/,
{
// Initial implementation (which is conforming) is to do nothing here
// and return offset zero for the resume
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
@@ -95,7 +95,7 @@ MessageHandlerImpl::resume(const string& reference,
// Initial (null) implementation
// open reference and return 0 offset
references.open(reference);
- client.offset(0);//GRS, );//GRS, context.getRequestId());
+ client.offset(0);
}
void
@@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/,
channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())),
tag, queue, !noAck, exclusive,
noLocal ? &connection : 0, &filter);
- client.ok();//GRS context.getRequestId());
+ client.ok();
// Dispatch messages as there is now a consumer.
queue->requestDispatch();
}
@@ -138,9 +138,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
GetAdapter out(adapter, queue, destination, connection.getFrameMax());
if(channel.get(out, queue, !noAck))
- client.ok();//GRS context.getRequestId());
+ client.ok();
else
- client.empty();//GRS context.getRequestId());
+ client.empty();
}
void
@@ -166,22 +166,19 @@ MessageHandlerImpl::qos(uint32_t prefetchSize,
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
MessageHandlerImpl::recover(bool requeue)
{
channel.recover(requeue);
- client.ok();//GRS context.getRequestId());
+ client.ok();
}
void
-MessageHandlerImpl::reject(uint16_t /*code*/,
- const string& /*text*/ )
+MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ )
{
- //channel.ack();
- // channel.requeue();
}
void
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index a96a8c5cde..f616ec2db8 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -36,7 +36,7 @@ SemanticHandler::SemanticHandler(ChannelId id, Connection& c) :
void SemanticHandler::handle(framing::AMQFrame& frame)
-{
+{
//TODO: assembly etc when move to 0-10 framing
//
//have potentially three separate tracks at this point:
@@ -56,16 +56,41 @@ void SemanticHandler::handle(framing::AMQFrame& frame)
//if ready to execute (i.e. if segment is complete or frame is
//message content):
handleBody(frame.getBody());
- //if the frameset is complete, we can move the execution-mark
- //forward (not for execution controls)
- //note: need to be more sophisticated than this if we execute
- //commands that arrive within an active message frameset
}
//ChannelAdapter virtual methods:
void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
const qpid::framing::MethodContext& context)
{
+ if (!method->invoke(this)) {
+ //else do the usual:
+ handleL4(method, context);
+ //(if the frameset is complete) we can move the execution-mark
+ //forward
+ ++(incoming.hwm);
+
+ //note: need to be more sophisticated than this if we execute
+ //commands that arrive within an active message frameset (that
+ //can't happen until 0-10 framing is implemented)
+ }
+}
+
+void SemanticHandler::complete(u_int32_t mark)
+{
+ //just record it for now (will eventually need to use it to ack messages):
+ outgoing.lwm = SequenceNumber(mark);
+}
+
+void SemanticHandler::flush()
+{
+ //flush doubles as a sync to begin with - send an execution.complete
+ incoming.lwm = incoming.hwm;
+ send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue())));
+}
+
+void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context)
+{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
if (!method->isA<ChannelCloseOkBody>()) {
diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h
index eecf61466b..6003bbec0c 100644
--- a/cpp/src/qpid/broker/SemanticHandler.h
+++ b/cpp/src/qpid/broker/SemanticHandler.h
@@ -25,6 +25,7 @@
#include "BrokerChannel.h"
#include "Connection.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceNumber.h"
@@ -34,11 +35,18 @@ namespace broker {
class BrokerAdapter;
class framing::ChannelAdapter;
-class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler {
+class SemanticHandler : private framing::ChannelAdapter,
+ public framing::FrameHandler,
+ public framing::AMQP_ServerOperations::ExecutionHandler
+{
Connection& connection;
Channel channel;
std::auto_ptr<BrokerAdapter> adapter;
- framing::SequenceNumber executionMark;
+ framing::Window incoming;
+ framing::Window outgoing;
+
+ void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ const qpid::framing::MethodContext& context);
//ChannelAdapter virtual methods:
void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method,
@@ -50,6 +58,10 @@ class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHa
public:
SemanticHandler(framing::ChannelId id, Connection& c);
void handle(framing::AMQFrame& frame);
+
+ //execution class method handlers:
+ void complete(u_int32_t cumulativeExecutionMark);
+ void flush();
};
}}