diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-13 17:29:16 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-13 17:29:16 +0000 |
commit | 0a1b3430450f274aee273a9f792a2d43f771b85f (patch) | |
tree | 71be3bc1a920a568c0680f8e8a5e802c1c3bee8d /cpp/src | |
parent | e00a1cfa3881e3bb8aadfecdf502f17903e319b1 (diff) | |
download | qpid-python-0a1b3430450f274aee273a9f792a2d43f771b85f.tar.gz |
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
27 files changed, 224 insertions, 179 deletions
diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index d96622cd4f..29e2256b56 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -126,6 +126,7 @@ bool Queue::acquire(const QueuedMessage& msg) { void Queue::requestDispatch(Consumer* c, bool sync){ if (!c || c->preAcquires()) { if (sync) { + Mutex::ScopedLock locker(messageLock); dispatch(); } else { serializer.execute(dispatchCallback); @@ -153,7 +154,9 @@ bool Queue::dispatch(QueuedMessage& msg){ int start = next; while(c){ next++; - if(c->deliver(msg)) return true; + if(c->deliver(msg)) { + return true; + } next = next % acquirers.size(); c = next == start ? 0 : acquirers[next]; } diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index a8a0745104..619d59f710 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -30,12 +30,15 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, const string _consumerTag, const DeliveryId _id, - bool _acquired) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - id(_id), - acquired(_acquired), - pull(false){} + bool _acquired, bool _confirmed) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + id(_id), + acquired(_acquired), + confirmed(_confirmed), + pull(false) +{ +} DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, @@ -44,11 +47,12 @@ DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, consumerTag(""), id(_id), acquired(true), + confirmed(false), pull(true){} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - if (acquired) { + if (acquired && !confirmed) { queue->dequeue(ctxt, msg.payload); } } @@ -70,24 +74,30 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const } void DeliveryRecord::redeliver(Session* const session) const{ - if(pull){ - //if message was originally sent as response to get, we must requeue it - requeue(); - }else{ - session->deliver(msg.payload, consumerTag, id); + if (!confirmed) { + if(pull){ + //if message was originally sent as response to get, we must requeue it + requeue(); + }else{ + session->deliver(msg.payload, consumerTag, id); + } } } void DeliveryRecord::requeue() const { - msg.payload->redeliver(); - queue->requeue(msg); + if (!confirmed) { + msg.payload->redeliver(); + queue->requeue(msg); + } } void DeliveryRecord::release() { - queue->requeue(msg); - acquired = false; + if (!confirmed) { + queue->requeue(msg); + acquired = false; + } } void DeliveryRecord::reject() diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 3caac6bf40..4d98b0c5da 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -45,11 +45,12 @@ class DeliveryRecord{ const std::string consumerTag; const DeliveryId id; bool acquired; + const bool confirmed; const bool pull; public: DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, - const DeliveryId id, bool acquired); + const DeliveryId id, bool acquired, bool confirmed = false); DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); void dequeue(TransactionContext* ctxt = 0) const; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 84d3478173..39f9f85c13 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -144,7 +144,7 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t if (isContentReleased()) { //load content from store in chunks of maxContentSize uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load? + uint64_t expectedSize(frames.getHeaders()->getContentLength()); for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) { uint64_t remaining = expectedSize - offset; @@ -153,11 +153,22 @@ void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t store->loadContent(*this, data, offset, remaining > maxContentSize ? maxContentSize : remaining); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } out.handle(frame); } } else { - SendContent f(out, channel, maxFrameSize); + Count c; + frames.map_if(c, TypeFilter(CONTENT_BODY)); + + SendContent f(out, channel, maxFrameSize, c.getCount()); frames.map_if(f, TypeFilter(CONTENT_BODY)); } } diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index b259aa6b8f..6471245ed9 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -39,7 +39,7 @@ namespace broker{ struct BaseToken : DeliveryToken { virtual ~BaseToken() {} - virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0; + virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0; }; struct BasicGetToken : BaseToken @@ -50,12 +50,11 @@ struct BasicGetToken : BaseToken BasicGetToken(Queue::shared_ptr q) : queue(q) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) { - channel.send(BasicGetOkBody( - channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), + return AMQFrame(0, BasicGetOkBody( + ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey(), queue->getMessageCount())); - } }; @@ -67,10 +66,10 @@ struct BasicConsumeToken : BaseToken BasicConsumeToken(const string c) : consumer(c) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) { - channel.send(BasicDeliverBody( - channel.getVersion(), consumer, id.getValue(), + return AMQFrame(0, BasicDeliverBody( + ProtocolVersion(), consumer, id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); } @@ -85,16 +84,13 @@ struct MessageDeliveryToken : BaseToken MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) : destination(d), confirmMode(c), acquireMode(a) {} - void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/) + AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/) { - //TODO; need to figure out how the acquire mode gets - //communicated (this is just a temporary solution) - channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode)); - //may need to set the redelivered flag: if (msg->getRedelivered()){ msg->getProperties<DeliveryProperties>()->setRedelivered(true); } + return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode)); } }; @@ -127,11 +123,15 @@ void MessageDelivery::deliver(Message::shared_ptr msg, //another may well have the wrong headers; however we will only //have one content class for 0-10 proper + FrameHandler& handler = channel.getHandlers().out; + //send method boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token); - t->sendMethod(msg, channel, id); + AMQFrame method = t->sendMethod(msg, id); + method.setEof(false); + method.setChannel(channel.getId()); + handler.handle(method); - FrameHandler& handler = channel.getHandlers().out; msg->sendHeader(handler, channel.getId(), framesize); msg->sendContent(handler, channel.getId(), framesize); } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f1bdc68899..ead2fad379 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -172,10 +172,11 @@ bool SemanticHandler::isOpen() const { DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) { Mutex::ScopedLock l(outLock); - SequenceNumber copy(outgoing.hwm); - ++copy; - MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax()); - return outgoing.hwm.getValue(); + //SequenceNumber copy(outgoing.hwm); + //++copy; + MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax()); + return outgoing.hwm; + //return outgoing.hwm.getValue(); } void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) diff --git a/cpp/src/qpid/broker/Session.cpp b/cpp/src/qpid/broker/Session.cpp index c98fdd6291..d3f82655d0 100644 --- a/cpp/src/qpid/broker/Session.cpp +++ b/cpp/src/qpid/broker/Session.cpp @@ -268,8 +268,8 @@ bool Session::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter->deliver(msg.payload, token); - if (ackExpected) { - parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire)); + if (windowing || ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected)); } } return !blocked; @@ -565,12 +565,14 @@ AckRange Session::findRange(DeliveryId first, DeliveryId last) ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); ack_iterator end = start; - if (first == last) { - //just acked single element (move end past it) - ++end; - } else { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + if (start != unacked.end()) { + if (first == last) { + //just acked single element (move end past it) + ++end; + } else { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } } return AckRange(start, end); } diff --git a/cpp/src/qpid/client/AckMode.h b/cpp/src/qpid/client/AckMode.h index 9ad5ef925c..f565c1d36b 100644 --- a/cpp/src/qpid/client/AckMode.h +++ b/cpp/src/qpid/client/AckMode.h @@ -1,72 +1,25 @@ #ifndef _client_AckMode_h #define _client_AckMode_h -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace client { - -/** - * The available acknowledgements modes. - * - * \ingroup clientapi - */ -enum AckMode { - /** No acknowledgement will be sent, broker can - discard messages as soon as they are delivered - to a consumer using this mode. **/ - NO_ACK = 0, - /** Each message will be automatically - acknowledged as soon as it is delivered to the - application **/ - AUTO_ACK = 1, - /** Acknowledgements will be sent automatically, - but not for each message. **/ - LAZY_ACK = 2, - /** The application is responsible for explicitly - acknowledging messages. **/ - CLIENT_ACK = 3 -}; - -}} // namespace qpid::client - - - -#endif /*!_client_AckMode_h*/ -#ifndef _client_AckMode_h -#define _client_AckMode_h /* * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ @@ -97,6 +50,4 @@ enum AckMode { }} // namespace qpid::client - - -#endif /*!_client_AckMode_h*/ +#endif diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index c9b7a68f38..49e7285a47 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -58,7 +58,7 @@ void ChannelHandler::incoming(AMQFrame& frame) if (body->getMethod()) handleMethod(body->getMethod()); else - throw new ConnectionException(504, "Channel not open."); + throw ConnectionException(504, "Channel not open for content."); } } @@ -68,7 +68,7 @@ void ChannelHandler::outgoing(AMQFrame& frame) frame.setChannel(id); out(frame); } else if (getState() == CLOSED) { - throw Exception("Channel not open"); + throw Exception(QPID_MSG("Channel not open, can't send " << frame)); } else if (getState() == CLOSED_BY_PEER) { throw ChannelException(code, text); } @@ -120,7 +120,7 @@ void ChannelHandler::handleMethod(AMQMethodBody* method) } //else just ignore it break; case CLOSED: - throw ConnectionException(504, "Channel not opened."); + throw ConnectionException(504, "Channel is closed."); default: throw Exception("Unexpected state encountered in ChannelHandler!"); } diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 87062e1470..0a85b8e4f0 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -69,7 +69,7 @@ Channel::~Channel() void Channel::open(const Session& s) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(stopLock); if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); active = true; @@ -80,7 +80,7 @@ void Channel::open(const Session& s) } bool Channel::isOpen() const { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(stopLock); return active; } @@ -146,7 +146,7 @@ void Channel::consume( Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; - c.lastDeliveryTag = 0; + c.count = 0; } uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1; ScopedSync s(session, synch); @@ -205,7 +205,7 @@ void Channel::close() { session.close(); { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(stopLock); active = false; } stop(); @@ -231,20 +231,18 @@ void Channel::join() { void Channel::dispatch(FrameSet& content, const std::string& destination) { - MessageListener* listener(0); - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(destination); - if (i != consumers.end()) { - Message msg; - msg.populate(content); - listener = i->second.listener; - } - } - if (listener) { + ConsumerMap::iterator i = consumers.find(destination); + if (i != consumers.end()) { Message msg; msg.populate(content); + MessageListener* listener = i->second.listener; listener->received(msg); + if (isOpen() && i->second.ackMode != CLIENT_ACK) { + bool send = i->second.ackMode == AUTO_ACK + || (prefetch && ++(i->second.count) > (prefetch / 2)); + if (send) i->second.count = 0; + session.execution().completed(content.getId(), true, send); + } } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination); } diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index b33af65d21..527f5d418f 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -63,8 +63,7 @@ class Channel : private sys::Runnable struct Consumer{ MessageListener* listener; AckMode ackMode; - int count; - u_int64_t lastDeliveryTag; + uint32_t count; }; typedef std::map<std::string, Consumer> ConsumerMap; @@ -75,7 +74,7 @@ class Channel : private sys::Runnable const bool transactional; framing::ProtocolVersion version; - sys::Mutex stopLock; + mutable sys::Mutex stopLock; bool running; ConsumerMap consumers; diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp index 9ef6857957..f30c92b992 100644 --- a/cpp/src/qpid/client/Correlator.cpp +++ b/cpp/src/qpid/client/Correlator.cpp @@ -25,14 +25,15 @@ using qpid::client::Correlator; using namespace qpid::framing; using namespace boost; -void Correlator::receive(AMQMethodBody* response) +bool Correlator::receive(const AMQMethodBody* response) { if (listeners.empty()) { - throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name + return false; } else { Listener l = listeners.front(); if (l) l(response); listeners.pop(); + return true; } } diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h index d93e7b66cd..45b22fb2fe 100644 --- a/cpp/src/qpid/client/Correlator.h +++ b/cpp/src/qpid/client/Correlator.h @@ -36,9 +36,9 @@ namespace client { class Correlator { public: - typedef boost::function<void(framing::AMQMethodBody*)> Listener; + typedef boost::function<void(const framing::AMQMethodBody*)> Listener; - void receive(framing::AMQMethodBody*); + bool receive(const framing::AMQMethodBody*); void listen(Listener l); private: diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 8ea2cc64e6..95cdc7032a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -62,19 +62,16 @@ void ExecutionHandler::handle(AMQFrame& frame) { AMQBody* body = frame.getBody(); if (!invoke(body, this)) { - if (isContentFrame(frame)) { - if (!arriving) { - arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); - } - arriving->append(frame); - if (arriving->isComplete()) { + if (!arriving) { + arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); + } + arriving->append(frame); + if (arriving->isComplete()) { + if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) { demux.handle(arriving); - arriving.reset(); - } - } else { - ++incoming.hwm; - correlation.receive(body->getMethod()); - } + } + arriving.reset(); + } } } @@ -168,11 +165,19 @@ void ExecutionHandler::sendCompletion() SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l) { + return send(command, l, false); +} + +SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent) +{ SequenceNumber id = ++outgoing.hwm; if(l) { completion.listenForResult(id, l); } AMQFrame frame(0/*channel will be filled in be channel handler*/, command); + if (hasContent) { + frame.setEof(false); + } out(frame); return id; } @@ -180,7 +185,7 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker: SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content, CompletionTracker::ResultListener l) { - SequenceNumber id = send(command, l); + SequenceNumber id = send(command, l, true); sendContent(content); return id; } @@ -188,14 +193,16 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten void ExecutionHandler::sendContent(const MethodContent& content) { AMQFrame header(0, content.getHeader()); - out(header); - + header.setBof(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ + header.setEof(false); + out(header); //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ AMQFrame frame(0, AMQContentBody(content.getData())); + frame.setBof(false); out(frame); }else{ u_int32_t offset = 0; @@ -204,10 +211,20 @@ void ExecutionHandler::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(0, AMQContentBody(frag)); - out(frame); + frame.setBof(false); + if (offset > 0) { + frame.setBos(false); + } offset += length; remaining = data_length - offset; + if (remaining) { + frame.setEos(false); + frame.setEof(false); + } + out(frame); } } + } else { + out(header); } } diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index 88424b555a..5f9cdff9d2 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -59,6 +59,7 @@ class ExecutionHandler : void sendCompletion(); + framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent); void sendContent(const framing::MethodContent&); public: diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index afdd35c5eb..73b7c3a7a6 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -35,7 +35,7 @@ AMQMethodBody* FutureResponse::getResponse(SessionCore& session) return response.get(); } -void FutureResponse::received(AMQMethodBody* r) +void FutureResponse::received(const AMQMethodBody* r) { Monitor::ScopedLock l(lock); response = *r; diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h index 1e8a7eb456..df3b7c6f30 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/FutureResponse.h @@ -36,7 +36,7 @@ class FutureResponse : public FutureCompletion framing::MethodHolder response; public: framing::AMQMethodBody* getResponse(SessionCore& session); - void received(framing::AMQMethodBody* response); + void received(const framing::AMQMethodBody* response); }; }} diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 52425f28b7..736c3f08ef 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -156,7 +156,9 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { - return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody() + return out << "Frame[" + //<< "B=" << f.getBof() << "E=" << f.getEof() << "b=" << f.getBos() << "e=" << f.getEos() << "; " + << "channel=" << f.getChannel() << "; " << *f.getBody() << "]"; } diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index a96b0483b7..f4aec72e4c 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -74,6 +74,17 @@ class AMQFrame : public AMQDataBlock void encode(Buffer& buffer) const; bool decode(Buffer& buffer); uint32_t size() const; + + bool getBof() const { return bof; } + void setBof(bool isBof) { bof = isBof; } + bool getEof() const { return eof; } + void setEof(bool isEof) { eof = isEof; } + + bool getBos() const { return bos; } + void setBos(bool isBos) { bos = isBos; } + bool getEos() const { return eos; } + void setEos(bool isEos) { eos = isEos; } + static uint32_t frameOverhead(); private: diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 12579f53cb..129219e0a1 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -38,20 +38,13 @@ void FrameSet::append(AMQFrame& part) bool FrameSet::isComplete() const { - //TODO: should eventually use the 0-10 frame header flags when available + return !parts.empty() && parts.back().getEof(); +} + +bool FrameSet::isContentBearing() const +{ const AMQMethodBody* method = getMethod(); - if (!method) { - return false; - } else if (method->isContentBearing()) { - const AMQHeaderBody* header = getHeaders(); - if (header) { - return header->getContentLength() == getContentSize(); - } else { - return false; - } - } else { - return true; - } + return method && method->isContentBearing(); } const AMQMethodBody* FrameSet::getMethod() const diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index 9a9512a6d4..8ba22f07cb 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -50,6 +50,8 @@ public: void getContent(std::string&) const; std::string getContent() const; + bool isContentBearing() const; + const AMQMethodBody* getMethod() const; const AMQHeaderBody* getHeaders() const; AMQHeaderBody* getHeaders(); diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp index 568cc01665..573ebca9e2 100644 --- a/cpp/src/qpid/framing/SendContent.cpp +++ b/cpp/src/qpid/framing/SendContent.cpp @@ -21,31 +21,47 @@ #include "SendContent.h" -qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {} +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c), + maxFrameSize(mfs), + expectedFrameCount(efc), frameCount(0) {} -void qpid::framing::SendContent::operator()(AMQFrame& f) const +void qpid::framing::SendContent::operator()(const AMQFrame& f) { + bool first = frameCount == 0; + bool last = ++frameCount == expectedFrameCount; + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); const AMQContentBody* body(f.castBody<AMQContentBody>()); if (body->size() > maxContentSize) { uint32_t offset = 0; for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) { - sendFragment(*body, offset, maxContentSize); + sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->size()); offset += maxContentSize; } uint32_t remainder = body->size() % maxContentSize; if (remainder) { - sendFragment(*body, offset, remainder); + sendFragment(*body, offset, remainder, first && offset == 0, last); } } else { AMQFrame copy(f); + setFlags(copy, first, last); copy.setChannel(channel); handler.handle(copy); } } -void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const +void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const { AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size))); + setFlags(fragment, first, last); handler.handle(fragment); } + +void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const +{ + f.setBof(false); + f.setBos(first); + f.setEof(last); + f.setEos(last); +} + diff --git a/cpp/src/qpid/framing/SendContent.h b/cpp/src/qpid/framing/SendContent.h index a88319e2f9..05b5838c62 100644 --- a/cpp/src/qpid/framing/SendContent.h +++ b/cpp/src/qpid/framing/SendContent.h @@ -39,11 +39,14 @@ class SendContent mutable FrameHandler& handler; const uint16_t channel; const uint16_t maxFrameSize; + uint expectedFrameCount; + uint frameCount; - void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const; + void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const; + void setFlags(AMQFrame& f, bool first, bool last) const; public: - SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize); - void operator()(AMQFrame& f) const; + SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount); + void operator()(const AMQFrame& f); }; } diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 4c2d06ae42..e0372b2f68 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -63,7 +63,10 @@ DeliveryProperties& TransferContent::getDeliveryProperties() void TransferContent::populate(const FrameSet& frameset) { - header = *frameset.getHeaders(); + const AMQHeaderBody* h = frameset.getHeaders(); + if (h) { + header = *h; + } frameset.getContent(data); } diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h index 3112da8e24..7b7e24b2b3 100644 --- a/cpp/src/qpid/framing/frame_functors.h +++ b/cpp/src/qpid/framing/frame_functors.h @@ -49,6 +49,15 @@ public: uint64_t getSize() { return size; } }; +class Count +{ + uint count; +public: + Count() : count(0) {} + void operator()(const AMQFrame&) { count++; } + uint getCount() { return count; } +}; + class EncodeFrame { Buffer& buffer; diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index 341fdf56f5..98f5bd92a8 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -126,6 +126,10 @@ class MessageBuilderTest : public CppUnit::TestCase AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0)); AMQFrame header(0, AMQHeaderBody()); AMQFrame content(0, AMQContentBody(data)); + method.setEof(false); + header.setBof(false); + header.setEof(false); + content.setBof(false); header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size()); header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); @@ -156,6 +160,12 @@ class MessageBuilderTest : public CppUnit::TestCase AMQFrame header(0, AMQHeaderBody()); AMQFrame content1(0, AMQContentBody(data1)); AMQFrame content2(0, AMQContentBody(data2)); + method.setEof(false); + header.setBof(false); + header.setEof(false); + content1.setBof(false); + content1.setEof(false); + content2.setBof(false); header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size()); header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key); diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index cddf3cb92a..38f5c76f54 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -114,6 +114,7 @@ int main(int argc, char** argv){ channel.consume(control, "c1", &listener, AckMode(args.ackmode)); cout << "topic_listener: Consuming." << endl; channel.run(); + cout << "topic_listener: run returned, closing connection" << endl; connection.close(); cout << "topic_listener: normal exit" << endl; } |