diff options
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 50 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerMessage.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerMessageMessage.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/broker/ConnectionAdapter.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 30 | ||||
-rw-r--r-- | cpp/src/qpid/broker/InMemoryContent.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/LazyLoadedContent.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 27 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 16 |
11 files changed, 112 insertions, 76 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index be43dacb27..9bf148bcf0 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -56,12 +56,12 @@ ProtocolVersion BrokerAdapter::getVersion() const { void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk(std::string()/* ID */);//GRS, context.getRequestId()); + client.openOk(std::string()/* ID */); } void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ channel.flow(active); - client.flowOk(active);//GRS, context.getRequestId()); + client.flowOk(active); } void BrokerAdapter::ChannelHandlerImpl::flowOk(bool /*active*/){} @@ -70,7 +70,7 @@ void BrokerAdapter::ChannelHandlerImpl::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk();//GRS context.getRequestId()); + client.closeOk(); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } @@ -104,7 +104,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri } } if(!nowait){ - client.declareOk();//GRS context.getRequestId()); + client.declareOk(); } } @@ -114,16 +114,16 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, Exchange::shared_ptr exchange(broker.getExchanges().get(name)); if (exchange->isDurable()) broker.getStore().destroy(*exchange); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk();//GRS context.getRequestId()); + if(!nowait) client.deleteOk(); } void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(broker.getExchanges().get(name)); - client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());//GRS, context.getRequestId()); + client.queryOk(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - client.queryOk("", false, true, FieldTable());//GRS, context.getRequestId()); + client.queryOk("", false, true, FieldTable()); } } @@ -144,18 +144,18 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } if (!exchange) { - client.queryOk(true, false, false, false, false);//GRS, context.getRequestId()); + client.queryOk(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - client.queryOk(false, true, false, false, false);//GRS, context.getRequestId()); + client.queryOk(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - client.queryOk(false, false, false, false, false);//GRS, context.getRequestId()); + client.queryOk(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched);//GRS, context.getRequestId()); + client.queryOk(false, false, !queueMatched, !keyMatched, !argsMatched); } } @@ -196,7 +196,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& if (!nowait) { string queueName = queue->getName(); client.declareOk( - queueName, queue->getMessageCount(), queue->getConsumerCount());//GRS, context.getRequestId()); + queueName, queue->getMessageCount(), queue->getConsumerCount()); } } @@ -214,7 +214,7 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk();//GRS context.getRequestId()); + if(!nowait) client.bindOk(); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -238,14 +238,14 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk();//GRS context.getRequestId()); + client.unbindOk(); } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = getQueue(queueName); int count = queue->purge(); - if(!nowait) client.purgeOk( count);//GRS, context.getRequestId()); + if(!nowait) client.purgeOk( count); } void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, @@ -270,7 +270,7 @@ void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& } if(!nowait) - client.deleteOk(count);//GRS, context.getRequestId()); + client.deleteOk(count); } @@ -280,7 +280,7 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk();//GRS context.getRequestId()); + client.qosOk(); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -300,7 +300,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) client.consumeOk(newTag);//GRS, context.getRequestId()); + if(!nowait) client.consumeOk(newTag); //allow messages to be dispatched if required as there is now a consumer: queue->requestDispatch(); @@ -309,7 +309,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) client.cancelOk(consumerTag);//GRS, context.getRequestId()); + if(!nowait) client.cancelOk(consumerTag); } void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, @@ -333,7 +333,7 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que if(!channel.get(out, queue, !noAck)){ string clusterId;//not used, part of an imatix hack - client.getEmpty(clusterId);//GRS, context.getRequestId()); + client.getEmpty(clusterId); } } @@ -351,19 +351,19 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) void BrokerAdapter::TxHandlerImpl::select() { channel.startTx(); - client.selectOk();//GRS context.getRequestId()); + client.selectOk(); } void BrokerAdapter::TxHandlerImpl::commit() { channel.commit(); - client.commitOk();//GRS context.getRequestId()); + client.commitOk(); } void BrokerAdapter::TxHandlerImpl::rollback() { channel.rollback(); - client.rollbackOk();//GRS context.getRequestId()); + client.rollbackOk(); channel.recover(false); } @@ -378,7 +378,7 @@ void BrokerAdapter::ChannelHandlerImpl::ok() // void BrokerAdapter::ChannelHandlerImpl::ping() { - client.ok();//GRS context.getRequestId()); + client.ok(); client.pong(); } @@ -386,7 +386,7 @@ void BrokerAdapter::ChannelHandlerImpl::ping() void BrokerAdapter::ChannelHandlerImpl::pong() { - client.ok();//GRS context.getRequestId()); + client.ok(); } void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 41013e8bf6..a7e27a0ee6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -79,6 +79,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } + ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); } ConnectionHandler* getConnectionHandler() { throw ConnectionException(503, "Can't access connection class on non-zero channel!"); diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index 21b56869d4..d192b09a63 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -78,10 +78,10 @@ void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize) { - channel.send( + channel.send(make_shared_ptr( new BasicDeliverBody( channel.getVersion(), consumerTag, deliveryTag, - getRedelivered(), getExchange(), getRoutingKey())); + getRedelivered(), getExchange(), getRoutingKey()))); sendContent(channel, framesize); } @@ -92,12 +92,12 @@ void BasicMessage::sendGetOk(ChannelAdapter& channel, uint64_t deliveryTag, uint32_t framesize) { - channel.send( + channel.send(make_shared_ptr( new BasicGetOkBody( channel.getVersion(), responseTo, deliveryTag, getRedelivered(), getExchange(), - getRoutingKey(), messageCount)); + getRoutingKey(), messageCount))); sendContent(channel, framesize); } diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index b23ebaf50b..01f8250b84 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -85,7 +85,7 @@ void MessageMessage::transferMessage( if (ref){ // Open - channel.send(new MessageOpenBody(channel.getVersion(), ref->getId())); + channel.send(make_shared_ptr(new MessageOpenBody(channel.getVersion(), ref->getId()))); // Appends for(Reference::Appends::const_iterator a = ref->getAppends().begin(); a != ref->getAppends().end(); @@ -98,8 +98,8 @@ void MessageMessage::transferMessage( string::size_type contentStart = 0; while (sizeleft) { string::size_type contentSize = sizeleft <= framesize ? sizeleft : framesize-overhead; - channel.send(new MessageAppendBody(channel.getVersion(), ref->getId(), - string(content, contentStart, contentSize))); + channel.send(make_shared_ptr(new MessageAppendBody(channel.getVersion(), ref->getId(), + string(content, contentStart, contentSize)))); sizeleft -= contentSize; contentStart += contentSize; } @@ -108,7 +108,7 @@ void MessageMessage::transferMessage( // The transfer if ( transfer->size()<=framesize ) { - channel.send( + channel.send(make_shared_ptr( new MessageTransferBody(channel.getVersion(), transfer->getTicket(), consumerTag, @@ -132,7 +132,7 @@ void MessageMessage::transferMessage( transfer->getSecurityToken(), transfer->getApplicationHeaders(), body, - transfer->getMandatory())); + transfer->getMandatory()))); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; @@ -172,7 +172,7 @@ void MessageMessage::transferMessage( } // Close any reference data if (ref) - channel.send(new MessageCloseBody(channel.getVersion(), ref->getId())); + channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); } void MessageMessage::deliver( diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index 2e27abd333..b624102cd2 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -71,6 +71,7 @@ public: TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } framing::ProtocolVersion getVersion() const; }; diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 2aa53e66bd..d1f925e40c 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -52,7 +52,7 @@ const int XA_OK(8); void DtxHandlerImpl::select() { channel.selectDtx(); - dClient.selectOk();//GRS context.getRequestId()); + dClient.selectOk(); } void DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -66,7 +66,7 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/, if (suspend) { throw ConnectionException(503, "End and suspend cannot both be set."); } else { - dClient.endOk(XA_RBROLLBACK);//GRS, context.getRequestId()); + dClient.endOk(XA_RBROLLBACK); } } else { if (suspend) { @@ -74,10 +74,10 @@ void DtxHandlerImpl::end(u_int16_t /*ticket*/, } else { channel.endDtx(xid, false); } - dClient.endOk(XA_OK);//GRS, context.getRequestId()); + dClient.endOk(XA_OK); } } catch (const DtxTimeoutException& e) { - dClient.endOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + dClient.endOk(XA_RBTIMEOUT); } } @@ -95,9 +95,9 @@ void DtxHandlerImpl::start(u_int16_t /*ticket*/, } else { channel.startDtx(xid, broker.getDtxManager(), join); } - dClient.startOk(XA_OK);//GRS, context.getRequestId()); + dClient.startOk(XA_OK); } catch (const DtxTimeoutException& e) { - dClient.startOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + dClient.startOk(XA_RBTIMEOUT); } } @@ -108,9 +108,9 @@ void DtxHandlerImpl::prepare(u_int16_t /*ticket*/, { try { bool ok = broker.getDtxManager().prepare(xid); - cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId()); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - cClient.prepareOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + cClient.prepareOk(XA_RBTIMEOUT); } } @@ -120,9 +120,9 @@ void DtxHandlerImpl::commit(u_int16_t /*ticket*/, { try { bool ok = broker.getDtxManager().commit(xid, onePhase); - cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK);//GRS, context.getRequestId()); + cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - cClient.commitOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + cClient.commitOk(XA_RBTIMEOUT); } } @@ -132,9 +132,9 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/, { try { broker.getDtxManager().rollback(xid); - cClient.rollbackOk(XA_OK);//GRS, context.getRequestId()); + cClient.rollbackOk(XA_OK); } catch (const DtxTimeoutException& e) { - cClient.rollbackOk(XA_RBTIMEOUT);//GRS, context.getRequestId()); + cClient.rollbackOk(XA_RBTIMEOUT); } } @@ -171,7 +171,7 @@ void DtxHandlerImpl::recover(u_int16_t /*ticket*/, FieldTable response; response.setString("xids", data); - cClient.recoverOk(response);//GRS, context.getRequestId()); + cClient.recoverOk(response); } void DtxHandlerImpl::forget(u_int16_t /*ticket*/, @@ -184,7 +184,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, void DtxHandlerImpl::getTimeout(const string& xid) { uint32_t timeout = broker.getDtxManager().getTimeout(xid); - cClient.getTimeoutOk(timeout);//GRS, context.getRequestId()); + cClient.getTimeoutOk(timeout); } @@ -193,7 +193,7 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, u_int32_t timeout) { broker.getDtxManager().setTimeout(xid, timeout); - cClient.setTimeoutOk();//GRS context.getRequestId()); + cClient.setTimeoutOk(); } void DtxHandlerImpl::setResponseTo(framing::RequestId r) diff --git a/cpp/src/qpid/broker/InMemoryContent.cpp b/cpp/src/qpid/broker/InMemoryContent.cpp index 96563b0329..a6ce820f7e 100644 --- a/cpp/src/qpid/broker/InMemoryContent.cpp +++ b/cpp/src/qpid/broker/InMemoryContent.cpp @@ -47,13 +47,13 @@ void InMemoryContent::send(ChannelAdapter& channel, uint32_t framesize) uint32_t offset = 0; for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) { string data = (*i)->getData().substr(offset, framesize); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); offset += framesize; } uint32_t remainder = (*i)->size() % framesize; if (remainder) { string data = (*i)->getData().substr(offset, remainder); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); } } else { AMQBody::shared_ptr contentBody = diff --git a/cpp/src/qpid/broker/LazyLoadedContent.cpp b/cpp/src/qpid/broker/LazyLoadedContent.cpp index c11d049317..80d06ebf2b 100644 --- a/cpp/src/qpid/broker/LazyLoadedContent.cpp +++ b/cpp/src/qpid/broker/LazyLoadedContent.cpp @@ -52,12 +52,12 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) string data; store->loadContent(*msg, data, offset, remaining > framesize ? framesize : remaining); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); } } else { string data; store->loadContent(*msg, data, 0, expectedSize); - channel.send(new AMQContentBody(data)); + channel.send(make_shared_ptr(new AMQContentBody(data))); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index c9fbc2b95d..de32368158 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -45,14 +45,14 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - client.ok();//GRS context.getRequestId()); + client.ok(); } void MessageHandlerImpl::open(const string& reference) { references.open(reference); - client.ok();//GRS context.getRequestId()); + client.ok(); } void @@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context) { MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); references.get(body->getReference())->append(body); - client.ok();//GRS context.getRequestId()); + client.ok(); } void MessageHandlerImpl::close(const string& reference) { Reference::shared_ptr ref = references.get(reference); - client.ok();//GRS context.getRequestId()); + client.ok(); // Send any transfer messages to their correct exchanges and okay them const Reference::Messages& msgs = ref->getMessages(); @@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/, { // Initial implementation (which is conforming) is to do nothing here // and return offset zero for the resume - client.ok();//GRS context.getRequestId()); + client.ok(); } void @@ -95,7 +95,7 @@ MessageHandlerImpl::resume(const string& reference, // Initial (null) implementation // open reference and return 0 offset references.open(reference); - client.offset(0);//GRS, );//GRS, context.getRequestId()); + client.offset(0); } void @@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok();//GRS context.getRequestId()); + client.ok(); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -138,9 +138,9 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, GetAdapter out(adapter, queue, destination, connection.getFrameMax()); if(channel.get(out, queue, !noAck)) - client.ok();//GRS context.getRequestId()); + client.ok(); else - client.empty();//GRS context.getRequestId()); + client.empty(); } void @@ -166,22 +166,19 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok();//GRS context.getRequestId()); + client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - client.ok();//GRS context.getRequestId()); + client.ok(); } void -MessageHandlerImpl::reject(uint16_t /*code*/, - const string& /*text*/ ) +MessageHandlerImpl::reject(uint16_t /*code*/, const string& /*text*/ ) { - //channel.ack(); - // channel.requeue(); } void diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index a96a8c5cde..f616ec2db8 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -36,7 +36,7 @@ SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : void SemanticHandler::handle(framing::AMQFrame& frame) -{ +{ //TODO: assembly etc when move to 0-10 framing // //have potentially three separate tracks at this point: @@ -56,16 +56,41 @@ void SemanticHandler::handle(framing::AMQFrame& frame) //if ready to execute (i.e. if segment is complete or frame is //message content): handleBody(frame.getBody()); - //if the frameset is complete, we can move the execution-mark - //forward (not for execution controls) - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset } //ChannelAdapter virtual methods: void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, const qpid::framing::MethodContext& context) { + if (!method->invoke(this)) { + //else do the usual: + handleL4(method, context); + //(if the frameset is complete) we can move the execution-mark + //forward + ++(incoming.hwm); + + //note: need to be more sophisticated than this if we execute + //commands that arrive within an active message frameset (that + //can't happen until 0-10 framing is implemented) + } +} + +void SemanticHandler::complete(u_int32_t mark) +{ + //just record it for now (will eventually need to use it to ack messages): + outgoing.lwm = SequenceNumber(mark); +} + +void SemanticHandler::flush() +{ + //flush doubles as a sync to begin with - send an execution.complete + incoming.lwm = incoming.hwm; + send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue()))); +} + +void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context) +{ try{ if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { if (!method->isA<ChannelCloseOkBody>()) { diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index eecf61466b..6003bbec0c 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -25,6 +25,7 @@ #include "BrokerChannel.h" #include "Connection.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceNumber.h" @@ -34,11 +35,18 @@ namespace broker { class BrokerAdapter; class framing::ChannelAdapter; -class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHandler { +class SemanticHandler : private framing::ChannelAdapter, + public framing::FrameHandler, + public framing::AMQP_ServerOperations::ExecutionHandler +{ Connection& connection; Channel channel; std::auto_ptr<BrokerAdapter> adapter; - framing::SequenceNumber executionMark; + framing::Window incoming; + framing::Window outgoing; + + void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, + const qpid::framing::MethodContext& context); //ChannelAdapter virtual methods: void handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, @@ -50,6 +58,10 @@ class SemanticHandler : private framing::ChannelAdapter, public framing::FrameHa public: SemanticHandler(framing::ChannelId id, Connection& c); void handle(framing::AMQFrame& frame); + + //execution class method handlers: + void complete(u_int32_t cumulativeExecutionMark); + void flush(); }; }} |