summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
committerAlan Conway <aconway@apache.org>2006-12-01 05:11:45 +0000
commitfb9ad93a3d422c1e83c998f44c4782f7bf1d1a66 (patch)
treea2ebf932750bf13bf3db271f92df390335b0e844 /cpp/lib/broker/BrokerChannel.cpp
parent33c04c7e619a65e2d92ac231805e8ad27f4a29c2 (diff)
downloadqpid-python-fb9ad93a3d422c1e83c998f44c4782f7bf1d1a66.tar.gz
2006-12-01 Jim Meyering <meyering@redhat.com>
This delta imposes two major changes on the C++ hierarchy: - adds autoconf, automake, libtool support - makes the hierarchy flatter and renames a few files (e.g., Queue.h, Queue.cpp) that appeared twice, once under client/ and again under broker/. In the process, I've changed many #include directives, mostly to remove a qpid/ or qpid/framing/ prefix from the file name argument. Although most changes were to .cpp and .h files under qpid/cpp/, there were also several to template files under qpid/gentools, and even one to CppGenerator.java. Nearly all files are moved to a new position in the hierarchy. The new hierarchy looks like this: src # this is the new home of qpidd.cpp tests # all tests are here. See Makefile.am. gen # As before, all generated files go here. lib # This is just a container for the 3 lib dirs: lib/client lib/broker lib/common lib/common/framing lib/common/sys lib/common/sys/posix lib/common/sys/apr build-aux m4 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@481159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp257
1 files changed, 257 insertions, 0 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
new file mode 100644
index 0000000000..42e45dd291
--- /dev/null
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerChannel.h>
+#include <QpidError.h>
+#include <iostream>
+#include <sstream>
+#include <assert.h>
+
+using std::mem_fun_ref;
+using std::bind2nd;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
+ id(_id),
+ out(_out),
+ currentDeliveryTag(1),
+ transactional(false),
+ prefetchSize(0),
+ prefetchCount(0),
+ framesize(_framesize),
+ tagGenerator("sgen"),
+ store(0),
+ messageBuilder(this){
+
+ outstanding.reset();
+}
+
+Channel::~Channel(){
+}
+
+bool Channel::exists(const string& consumerTag){
+ return consumers.find(consumerTag) != consumers.end();
+}
+
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
+ if(tag.empty()) tag = tagGenerator.generate();
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
+ try{
+ queue->consume(c, exclusive);//may throw exception
+ consumers[tag] = c;
+ }catch(ExclusiveAccessException& e){
+ delete c;
+ throw e;
+ }
+}
+
+void Channel::cancel(consumer_iterator i){
+ ConsumerImpl* c = i->second;
+ consumers.erase(i);
+ if(c){
+ c->cancel();
+ delete c;
+ }
+}
+
+void Channel::cancel(const string& tag){
+ consumer_iterator i = consumers.find(tag);
+ if(i != consumers.end()){
+ cancel(i);
+ }
+}
+
+void Channel::close(){
+ //cancel all consumers
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+ cancel(i);
+ }
+ //requeue:
+ recover(true);
+}
+
+void Channel::begin(){
+ transactional = true;
+}
+
+void Channel::commit(){
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
+}
+
+void Channel::rollback(){
+ txBuffer.rollback();
+ accumulatedAck.clear();
+}
+
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+ Mutex::ScopedLock locker(deliveryLock);
+
+ u_int64_t deliveryTag = currentDeliveryTag++;
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
+ }
+ //send deliver method, header and content(s)
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
+}
+
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
+ return countOk && sizeOk;
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
+ Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection),
+ ackExpected(ack),
+ blocked(false){
+}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+ if(!connection || connection != msg->getPublisher()){//check for no_local
+ if(ackExpected && !parent->checkPrefetch(msg)){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
+ }
+ return false;
+}
+
+void Channel::ConsumerImpl::cancel(){
+ if(queue) queue->cancel(this);
+}
+
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked) queue->dispatch();
+}
+
+void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
+ Message::shared_ptr message(_message);
+ exchange = _exchange;
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+ //at this point, decide based on the size of the message whether we want
+ //to stage it by saving content directly to disk as it arrives
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
+}
+
+void Channel::complete(Message::shared_ptr& msg){
+ if(exchange){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }
+ exchange.reset();
+ }else{
+ std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
+ }
+}
+
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if(transactional){
+ accumulatedAck.update(deliveryTag, multiple);
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
+ }else{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ if(i == unacked.end()){
+ throw InvalidAckException();
+ }else if(multiple){
+ ack_iterator end = ++i;
+ for_each(unacked.begin(), end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
+ }
+}
+
+void Channel::recover(bool requeue){
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ if(requeue){
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
+ }else{
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
+ }
+}
+
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Mutex::ScopedLock locker(deliveryLock);
+ u_int64_t myDeliveryTag = currentDeliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ if(ackExpected){
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
+}