diff options
author | Alan Conway <aconway@apache.org> | 2007-02-06 15:01:45 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-02-06 15:01:45 +0000 |
commit | fbd97f554b04a109c95c01fe6ad538c5f50161af (patch) | |
tree | 0324d02ee4f8d6ca2387d1d3ff85bcd61a123a34 /cpp/lib | |
parent | 80b1b0b5f443bfb3c9d62a80e1419c224d0229d8 (diff) | |
download | qpid-python-fbd97f554b04a109c95c01fe6ad538c5f50161af.tar.gz |
* broker/Reference, tests/ReferenceTest: class representing a reference.
* broker/BrokerChannel.cpp (complete): get destination exchange from Message,
don't assume only one message in progress (could have multiple
references open.)
* broker/BrokerMessageMessage.cpp,.h: Contains transfer body and
vector of append bodies. Construct from Reference.
* broker/CompletionHandler.h: Extracted from BrokerMessage, used for
MessageMessage also.
* broker/ExchangeRegistry.cpp: Moved throw for missing exchanges to
registry.
* cpp/tests/start_broker: Increased wait time to 5 secs.
* cpp/tests/*: renamed DummyChannel as MockChannel.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@504172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib')
-rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 240 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 30 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 8 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.cpp | 51 | ||||
-rw-r--r-- | cpp/lib/broker/BrokerMessageMessage.h | 33 | ||||
-rw-r--r-- | cpp/lib/broker/CompletionHandler.h | 39 | ||||
-rw-r--r-- | cpp/lib/broker/ExchangeRegistry.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/broker/Makefile.am | 2 | ||||
-rw-r--r-- | cpp/lib/broker/MessageBuilder.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/broker/MessageBuilder.h | 11 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 88 | ||||
-rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 23 | ||||
-rw-r--r-- | cpp/lib/broker/Reference.cpp | 56 | ||||
-rw-r--r-- | cpp/lib/broker/Reference.h | 111 | ||||
-rw-r--r-- | cpp/lib/common/framing/MethodContext.h | 5 |
15 files changed, 363 insertions, 344 deletions
diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index fa25221bbd..6f55f32d47 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -355,245 +355,5 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( assert(0); // FIXME aconway 2007-01-04: 0-9 feature } - -// -// Message class method handlers -// -void -BrokerAdapter::MessageHandlerImpl::append( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*bytes*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - - -void -BrokerAdapter::MessageHandlerImpl::cancel( u_int16_t channel, - const string& destination ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - connection.getChannel(channel).cancel(destination); - - connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::checkpoint( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::close( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::consume( u_int16_t channelId, - u_int16_t /*ticket*/, - const string& queueName, - const string& destination, - bool noLocal, - bool noAck, - bool exclusive, - const qpid::framing::FieldTable& filter ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); - if(!destination.empty() && channel.exists(destination)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - connection.client->getMessageHandler()->ok(channelId); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - - connection.getChannel(channel).cancel(destination); - - connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::empty( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::get( u_int16_t channelId, - u_int16_t /*ticket*/, - const string& queueName, - const string& /*destination*/, - bool noAck ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - - // FIXME: get is probably Basic specific - if(!connection.getChannel(channelId).get(queue, !noAck)){ - - connection.client->getMessageHandler()->empty(channelId); - } - -} - -void -BrokerAdapter::MessageHandlerImpl::offset( u_int16_t /*channel*/, - u_int64_t /*value*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); - if(!destination.empty() && channel.exists(destination)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - connection.client->getMessageHandler()->ok(channelId); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } - - connection.getChannel(channel).cancel(destination); - - connection.client->getMessageHandler()->ok(channel); -} - -void -BrokerAdapter::MessageHandlerImpl::ok( u_int16_t /*channel*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::open( u_int16_t /*channel*/, - const string& /*reference*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::qos( u_int16_t channel, - u_int32_t prefetchSize, - u_int16_t prefetchCount, - bool /*global*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - //TODO: handle global - connection.getChannel(channel).setPrefetchSize(prefetchSize); - connection.getChannel(channel).setPrefetchCount(prefetchCount); - - connection.client->getMessageHandler()->ok(channel); - - Queue::shared_ptr queue = connection.getQueue(queueName, channelId); - Channel& channel = connection.getChannel(channelId); - if(!destination.empty() && channel.exists(destination)){ - throw ConnectionException(530, "Consumer tags must be unique"); - } - - try{ - string newTag = destination; - channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - - connection.client->getMessageHandler()->ok(channelId); - - //allow messages to be dispatched if required as there is now a consumer: - queue->dispatch(); - }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); - } -} - -void -BrokerAdapter::MessageHandlerImpl::recover( u_int16_t channel, - bool requeue ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - connection.getChannel(channel).recover(requeue); - -} - -void -BrokerAdapter::MessageHandlerImpl::reject( u_int16_t /*channel*/, - u_int16_t /*code*/, - const string& /*text*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::resume( u_int16_t /*channel*/, - const string& /*reference*/, - const string& /*identifier*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature -} - -void -BrokerAdapter::MessageHandlerImpl::transfer( u_int16_t channel, - u_int16_t /*ticket*/, - const string& /*destination*/, - bool /*redelivered*/, - bool immediate, - u_int64_t /*ttl*/, - u_int8_t /*priority*/, - u_int64_t /*timestamp*/, - u_int8_t /*deliveryMode*/, - u_int64_t /*expiration*/, - const string& exchangeName, - const string& routingKey, - const string& /*messageId*/, - const string& /*correlationId*/, - const string& /*replyTo*/, - const string& /*contentType*/, - const string& /*contentEncoding*/, - const string& /*userId*/, - const string& /*appId*/, - const string& /*transactionId*/, - const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content /*body*/ ) -{ - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Exchange::shared_ptr exchange = exchangeName.empty() ? - connection.broker.getExchanges().getDefault() : connection.broker.getExchanges().get(exchangeName); - if(exchange){ - Message* msg = new Message(&connection, exchangeName, routingKey, false /*mandatory?*/, immediate); - connection.getChannel(channel).handlePublish(msg, exchange); - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); - } -} - }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 96215a60ed..c0250815e8 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -78,7 +78,7 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); + if(tag.empty()) tag = tagGenerator.generate(); ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception @@ -187,6 +187,8 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } +// FIXME aconway 2007-02-05: Drop exchange member, calculate from +// message in ::complete(). void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ Message::shared_ptr message(_message); exchange = _exchange; @@ -207,19 +209,19 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { // TODO aconway 2007-01-17: Implement heartbeating. } -void Channel::complete(Message::shared_ptr& msg){ - if(exchange){ - if(transactional){ - TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - txBuffer.enlist(new DeletingTxOp(deliverable)); - }else{ - DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); - } - exchange.reset(); - }else{ - std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; +void Channel::complete(Message::shared_ptr msg) { + Exchange::shared_ptr exchange = + connection.broker.getExchanges().get(msg->getExchange()); + assert(exchange.get()); + if(transactional) { + std::auto_ptr<TxPublish> deliverable(new TxPublish(msg)); + exchange->route(*deliverable, msg->getRoutingKey(), + &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable.release())); + } else { + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), + &(msg->getHeaderProperties()->getHeaders())); } } diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index dd95e944bb..484a4d64e3 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -36,6 +36,7 @@ #include <Prefetch.h> #include <TxBuffer.h> #include "framing/ChannelAdapter.h" +#include "CompletionHandler.h" namespace qpid { namespace broker { @@ -51,9 +52,8 @@ using framing::string; * Maintains state for an AMQP channel. Handles incoming and * outgoing messages for that channel. */ -class Channel : - public framing::ChannelAdapter, - private MessageBuilder::CompletionHandler +class Channel : public framing::ChannelAdapter, + public CompletionHandler { class ConsumerImpl : public virtual Consumer { @@ -96,7 +96,7 @@ class Channel : boost::scoped_ptr<BrokerAdapter> adapter; - virtual void complete(Message::shared_ptr& msg); + virtual void complete(Message::shared_ptr msg); void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); void cancel(consumer_iterator consumer); bool checkPrefetch(Message::shared_ptr& msg); diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index 4168ff639c..e2c4b94811 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -18,25 +18,39 @@ * under the License. * */ +#include <iostream> #include "BrokerMessageMessage.h" +#include "MessageTransferBody.h" +#include "MessageAppendBody.h" +#include "Reference.h" +using namespace std; using namespace qpid::broker; -MessageMessage::MessageMessage( - const qpid::framing::AMQMethodBody::shared_ptr _methodBody, - const std::string& _exchange, const std::string& _routingKey, - bool _mandatory, bool _immediate) : - Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody), - methodBody(_methodBody) -{ -} +MessageMessage::MessageMessage(TransferPtr transfer_) + : Message(transfer_->getExchange(), transfer_->getRoutingKey(), + transfer_->getMandatory(), transfer_->getImmediate(), + transfer_), + transfer(transfer_) +{} + +MessageMessage::MessageMessage(TransferPtr transfer_, const Reference& ref) + : Message(transfer_->getExchange(), transfer_->getRoutingKey(), + transfer_->getMandatory(), transfer_->getImmediate(), + transfer_), + transfer(transfer_), + appends(ref.getAppends()) +{} void MessageMessage::deliver( - framing::ChannelAdapter& /*out*/, + framing::ChannelAdapter& /*channel*/, const std::string& /*consumerTag*/, u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { + // FIXME aconway 2007-02-05: + cout << "MessageMessage::deliver" << *transfer << " + " << appends.size() + << " appends." << endl; } void MessageMessage::sendGetOk( @@ -45,49 +59,50 @@ void MessageMessage::sendGetOk( u_int64_t /*deliveryTag*/, u_int32_t /*framesize*/) { + // FIXME aconway 2007-02-05: } bool MessageMessage::isComplete() { - return true; + return true; // FIXME aconway 2007-02-05: } u_int64_t MessageMessage::contentSize() const { - return 0; + return 0; // FIXME aconway 2007-02-05: } qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties() { - return 0; + return 0; // FIXME aconway 2007-02-05: } bool MessageMessage::isPersistent() { - return false; + return false; // FIXME aconway 2007-02-05: } const ConnectionToken* const MessageMessage::getPublisher() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedHeaderSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int32_t MessageMessage::encodedContentSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } u_int64_t MessageMessage::expectedContentSize() { - return 0; + return 0; // FIXME aconway 2007-02-05: } diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index cad5cf15b0..aa136863a1 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -21,23 +21,28 @@ * under the License. * */ - +#include <vector> #include "BrokerMessageBase.h" +#include "Reference.h" namespace qpid { + namespace framing { -class AMQMethodBody; +class MessageTransferBody; +class MessageApppendBody; } namespace broker { -class MessageMessage: public Message{ - const qpid::framing::AMQMethodBody::shared_ptr methodBody; +class Reference; +class MessageMessage: public Message{ public: - MessageMessage( - const framing::AMQMethodBody::shared_ptr methodBody, - const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); + typedef Reference::TransferPtr TransferPtr; + typedef Reference::AppendPtr AppendPtr; + typedef Reference::Appends Appends; + + MessageMessage(TransferPtr transfer); + MessageMessage(TransferPtr transfer, const Reference&); // Default destructor okay @@ -52,7 +57,7 @@ class MessageMessage: public Message{ u_int32_t framesize); bool isComplete(); - + u_int64_t contentSize() const; qpid::framing::BasicHeaderProperties* getHeaderProperties(); bool isPersistent(); @@ -62,10 +67,16 @@ class MessageMessage: public Message{ u_int32_t encodedHeaderSize(); u_int32_t encodedContentSize(); u_int64_t expectedContentSize(); + + TransferPtr getTransfer() { return transfer; } + const Appends& getAppends() { return appends; } + private: + + const TransferPtr transfer; + const Appends appends; }; -} -} +}} #endif /*!_broker_BrokerMessage_h*/ diff --git a/cpp/lib/broker/CompletionHandler.h b/cpp/lib/broker/CompletionHandler.h new file mode 100644 index 0000000000..9d51656282 --- /dev/null +++ b/cpp/lib/broker/CompletionHandler.h @@ -0,0 +1,39 @@ +#ifndef _broker_CompletionHandler_h +#define _broker_CompletionHandler_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +namespace qpid { +namespace broker { + +/** + * Callback interface to handle completion of a message. + */ +class CompletionHandler +{ + public: + virtual ~CompletionHandler(){} + virtual void complete(Message::shared_ptr) = 0; +}; + +}} // namespace qpid::broker + + + +#endif /*!_broker_CompletionHandler_h*/ diff --git a/cpp/lib/broker/ExchangeRegistry.cpp b/cpp/lib/broker/ExchangeRegistry.cpp index 7bf96c4544..3e5ed89b54 100644 --- a/cpp/lib/broker/ExchangeRegistry.cpp +++ b/cpp/lib/broker/ExchangeRegistry.cpp @@ -59,7 +59,10 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ Mutex::ScopedLock locker(lock); - return exchanges[name]; + Exchange::shared_ptr exchange =exchanges[name]; + if (!exchange) + throw ChannelException(404, "Exchange not found:" + name); + return exchange; } namespace diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index 064b592124..760c6d61e2 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -69,6 +69,8 @@ libqpidbroker_la_SOURCES = \ QueueRegistry.h \ RecoveryManager.cpp \ RecoveryManager.h \ + Reference.cpp \ + Reference.h \ ConnectionFactory.cpp \ ConnectionFactory.h \ Connection.cpp \ diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp index 41bf812d2d..69e771c793 100644 --- a/cpp/lib/broker/MessageBuilder.cpp +++ b/cpp/lib/broker/MessageBuilder.cpp @@ -27,7 +27,10 @@ using namespace qpid::broker; using namespace qpid::framing; using std::auto_ptr; -MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : +MessageBuilder::MessageBuilder(CompletionHandler* _handler, + MessageStore* const _store, + u_int64_t _stagingThreshold +) : handler(_handler), store(_store), stagingThreshold(_stagingThreshold) diff --git a/cpp/lib/broker/MessageBuilder.h b/cpp/lib/broker/MessageBuilder.h index 5b8516be42..f0b90a86cd 100644 --- a/cpp/lib/broker/MessageBuilder.h +++ b/cpp/lib/broker/MessageBuilder.h @@ -29,22 +29,19 @@ #include <AMQContentBody.h> #include <AMQHeaderBody.h> #include <BasicPublishBody.h> +#include "CompletionHandler.h" namespace qpid { namespace broker { class MessageBuilder{ public: - class CompletionHandler{ - public: - virtual void complete(Message::shared_ptr&) = 0; - virtual ~CompletionHandler(){} - }; MessageBuilder(CompletionHandler* _handler, MessageStore* const store = 0, u_int64_t stagingThreshold = 0); void initialise(Message::shared_ptr& msg); - void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header); - void addContent(qpid::framing::AMQContentBody::shared_ptr& content); + void setHeader(framing::AMQHeaderBody::shared_ptr& header); + void addContent(framing::AMQContentBody::shared_ptr& content); + Message::shared_ptr getMessage() { return message; } private: Message::shared_ptr message; CompletionHandler* handler; diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 71100996e7..30b69e4654 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -23,6 +23,8 @@ #include "Connection.h" #include "Broker.h" #include "BrokerMessageMessage.h" +#include "MessageAppendBody.h" +#include "MessageTransferBody.h" namespace qpid { namespace broker { @@ -33,23 +35,23 @@ using namespace framing; // Message class method handlers // void -MessageHandlerImpl::append(const MethodContext&, - const string& /*reference*/, +MessageHandlerImpl::append(const MethodContext& context, + const string& reference, const string& /*bytes*/ ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).append( + boost::shared_polymorphic_downcast<MessageAppendBody>( + context.methodBody)); + sendOk(context); } void -MessageHandlerImpl::cancel( const MethodContext& context, - const string& destination ) +MessageHandlerImpl::cancel(const MethodContext& context, + const string& destination ) { - //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - channel.cancel(destination); - - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -61,10 +63,11 @@ MessageHandlerImpl::checkpoint(const MethodContext&, } void -MessageHandlerImpl::close(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::close(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.get(reference).close(); + sendOk(context); } void @@ -88,13 +91,16 @@ MessageHandlerImpl::consume(const MethodContext& context, string newTag = destination; channel.consume(newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - connection.client->getMessageHandler()->ok(context); + sendOk(context); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); }catch(ExclusiveAccessException& e){ - if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted"); - else throw ChannelException(403, "Access would violate previously granted exclusivity"); + if(exclusive) + throw ChannelException(403, "Exclusive access cannot be granted"); + else + throw ChannelException( + 403, "Access would violate previously granted exclusivity"); } } @@ -133,14 +139,15 @@ MessageHandlerImpl::offset(const MethodContext&, void MessageHandlerImpl::ok( const MethodContext& ) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + // TODO aconway 2007-02-05: For HA, we can drop acked messages here. } void -MessageHandlerImpl::open(const MethodContext&, - const string& /*reference*/ ) +MessageHandlerImpl::open(const MethodContext& context, + const string& reference) { - assert(0); // FIXME astitcher 2007-01-11: 0-9 feature + references.open(reference); + sendOk(context); } void @@ -155,7 +162,7 @@ MessageHandlerImpl::qos(const MethodContext& context, channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getMessageHandler()->ok(context); + sendOk(context); } void @@ -189,14 +196,14 @@ MessageHandlerImpl::transfer(const MethodContext& context, u_int16_t /*ticket*/, const string& /*destination*/, bool /*redelivered*/, - bool immediate, + bool /* immediate */, u_int64_t /*ttl*/, u_int8_t /*priority*/, u_int64_t /*timestamp*/, u_int8_t /*deliveryMode*/, u_int64_t /*expiration*/, const string& exchangeName, - const string& routingKey, + const string& /* routingKey */, const string& /*messageId*/, const string& /*correlationId*/, const string& /*replyTo*/, @@ -208,27 +215,28 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*securityToken*/, const qpid::framing::FieldTable& /*applicationHeaders*/, qpid::framing::Content body, - bool mandatory ) + bool /* mandatory */ ) { //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature - - Exchange::shared_ptr exchange = exchangeName.empty() ? - broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); - if(exchange){ - if (body.isInline()) { - MessageMessage* msg = - new MessageMessage(context.methodBody, exchangeName, - routingKey, mandatory, immediate); - channel.handlePublish(msg, exchange); - - connection.client->getMessageHandler()->ok(context); - } else { - // Don't handle reference content yet - assert(body.isInline()); - } - }else{ - throw ChannelException(404, "Exchange not found '" + exchangeName + "'"); + MessageTransferBody::shared_ptr transfer = + boost::shared_polymorphic_downcast<MessageTransferBody>( + context.methodBody); + // Verify the exchange exists, will throw if not. + broker.getExchanges().get(exchangeName); + if (body.isInline()) { + MessageMessage* msg = new MessageMessage(transfer); + // FIXME aconway 2007-02-05: Remove exchange parameter. + // use shared_ptr for message. + channel.handlePublish(msg, Exchange::shared_ptr()); + sendOk(context); + } else { + references.get(body.getValue()).transfer(transfer); } } + +void MessageHandlerImpl::sendOk(const MethodContext& context) { + connection.client->getMessageHandler()->ok(context); +} + }} // namespace qpid::broker diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 985efe3847..886ca5fb54 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -19,23 +19,25 @@ * */ +#include <memory> + #include "AMQP_ServerOperations.h" +#include "Reference.h" +#include "BrokerChannel.h" namespace qpid { namespace broker { -class Channel; class Connection; class Broker; +class MessageMessage; -class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageHandler { - Channel& channel; - Connection& connection; - Broker& broker; - +class MessageHandlerImpl : + public framing::AMQP_ServerOperations::MessageHandler +{ public: MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} + : channel(ch), connection(c), broker(b), references(ch) {} void append(const framing::MethodContext&, const std::string& reference, @@ -116,6 +118,13 @@ class MessageHandlerImpl : public qpid::framing::AMQP_ServerOperations::MessageH const framing::FieldTable& applicationHeaders, framing::Content body, bool mandatory ); + private: + void sendOk(const framing::MethodContext&); + + Channel& channel; + Connection& connection; + Broker& broker; + ReferenceRegistry references; }; }} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.cpp b/cpp/lib/broker/Reference.cpp new file mode 100644 index 0000000000..a5e734d77a --- /dev/null +++ b/cpp/lib/broker/Reference.cpp @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/bind.hpp> +#include "Reference.h" +#include "BrokerMessageMessage.h" +#include "QpidError.h" +#include "CompletionHandler.h" + +namespace qpid { +namespace broker { + +Reference& ReferenceRegistry::open(const Reference::Id& id) { + ReferenceMap::iterator i = references.find(id); + // TODO aconway 2007-02-05: should we throw Channel or Connection + // exceptions here? + if (i != references.end()) + THROW_QPID_ERROR(CLIENT_ERROR, "Attempt to re-open reference " +id); + return references[id] = Reference(id, this); +} + +Reference& ReferenceRegistry::get(const Reference::Id& id) { + ReferenceMap::iterator i = references.find(id); + if (i == references.end()) + THROW_QPID_ERROR( + CLIENT_ERROR, "Attempt to use non-existent reference "+id); + return i->second; +} + +void Reference::close() { + for_each(transfers.begin(), transfers.end(), + boost::bind(&Reference::complete, this, _1)); + registry->references.erase(getId()); +} + +void Reference::complete(TransferPtr transfer) { + MessageMessage::shared_ptr msg(new MessageMessage(transfer, *this)); + registry->handler.complete(msg); +} + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/Reference.h b/cpp/lib/broker/Reference.h new file mode 100644 index 0000000000..ecaca3de41 --- /dev/null +++ b/cpp/lib/broker/Reference.h @@ -0,0 +1,111 @@ +#ifndef _broker_Reference_h +#define _broker_Reference_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <string> +#include <vector> +#include <map> +#include <boost/shared_ptr.hpp> +#include <boost/range.hpp> + +namespace qpid { + +namespace framing { +class MessageTransferBody; +class MessageAppendBody; +} + +namespace broker { + +class CompletionHandler; +class ReferenceRegistry; + +/** + * A reference is an accumulation point for data in a multi-frame + * message. A reference can be used by multiple transfer commands, so + * the reference tracks which commands are using it. When the reference + * is closed, all the associated transfers are completed. + * + * THREAD UNSAFE: per-channel resource, access to channels is + * serialized. + */ +class Reference +{ + public: + typedef std::string Id; + typedef boost::shared_ptr<framing::MessageTransferBody> TransferPtr; + typedef std::vector<TransferPtr> Transfers; + typedef boost::shared_ptr<framing::MessageAppendBody> AppendPtr; + typedef std::vector<AppendPtr> Appends; + + Reference(const Id& id_=Id(), ReferenceRegistry* reg=0) + : id(id_), registry(reg) {} + + const std::string& getId() const { return id; } + + /** Add a transfer to be completed with this reference */ + void transfer(TransferPtr transfer) { transfers.push_back(transfer); } + + /** Append more data to the reference */ + void append(AppendPtr ptr) { appends.push_back(ptr); } + + /** Close the reference, complete each associated transfer */ + void close(); + + const Appends& getAppends() const { return appends; } + const Transfers& getTransfers() const { return transfers; } + + private: + void complete(TransferPtr transfer); + + Id id; + ReferenceRegistry* registry; + Transfers transfers; + Appends appends; +}; + + +/** + * A registry/factory for references. + * + * THREAD UNSAFE: per-channel resource, access to channels is + * serialized. + */ +class ReferenceRegistry { + public: + ReferenceRegistry(CompletionHandler& handler_) : handler(handler_) {}; + Reference& open(const Reference::Id& id); + Reference& get(const Reference::Id& id); + + private: + typedef std::map<Reference::Id, Reference> ReferenceMap; + CompletionHandler& handler; + ReferenceMap references; + + // Reference calls references.erase() and uses handler. + friend class Reference; +}; + + +}} // namespace qpid::broker + + + +#endif /*!_broker_Reference_h*/ diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index d9717d90a0..afb499023d 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -64,7 +64,10 @@ struct MethodContext }; // FIXME aconway 2007-02-01: Method context only required on Handler -// functions, not on Proxy functions. +// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*) +// on AMQBody and set it during decodeing then we could get rid of the context. + + }} // namespace qpid::framing |