diff options
Diffstat (limited to 'cpp/src')
37 files changed, 1914 insertions, 1082 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d4293b70fd..0f8ec224cf 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -229,12 +229,21 @@ libqpidclient_la_SOURCES = \ qpid/client/ClientChannel.cpp \ qpid/client/ClientExchange.cpp \ qpid/client/ClientQueue.cpp \ - qpid/client/BasicMessageChannel.cpp \ qpid/client/Connector.cpp \ - qpid/client/IncomingMessage.cpp \ qpid/client/MessageListener.cpp \ qpid/client/ResponseHandler.cpp \ - qpid/client/ReturnedMessageHandler.cpp + qpid/client/ReturnedMessageHandler.cpp \ + qpid/client/Correlator.cpp \ + qpid/client/CompletionTracker.cpp \ + qpid/client/ChannelHandler.cpp \ + qpid/client/ConnectionHandler.cpp \ + qpid/client/ExecutionHandler.cpp \ + qpid/client/FutureCompletion.cpp \ + qpid/client/FutureResponse.cpp \ + qpid/client/FutureFactory.cpp \ + qpid/client/ReceivedContent.cpp \ + qpid/client/StateManager.cpp + nobase_include_HEADERS = \ $(platform_hdr) \ @@ -306,19 +315,29 @@ nobase_include_HEADERS = \ qpid/broker/TransactionalStore.h \ qpid/broker/TxAck.h \ qpid/client/AckMode.h \ - qpid/client/BasicMessageChannel.h \ qpid/client/ClientChannel.h \ qpid/client/ClientExchange.h \ qpid/client/ClientMessage.h \ qpid/client/ClientQueue.h \ qpid/client/Connection.h \ qpid/client/Connector.h \ - qpid/client/IncomingMessage.h \ qpid/client/MessageChannel.h \ qpid/client/MessageListener.h \ qpid/client/MethodBodyInstances.h \ qpid/client/ResponseHandler.h \ qpid/client/ReturnedMessageHandler.h \ + qpid/client/BlockingQueue.h \ + qpid/client/Correlator.h \ + qpid/client/CompletionTracker.h \ + qpid/client/ChannelHandler.h \ + qpid/client/ChainableFrameHandler.h \ + qpid/client/ConnectionHandler.h \ + qpid/client/ExecutionHandler.h \ + qpid/client/FutureCompletion.h \ + qpid/client/FutureResponse.h \ + qpid/client/FutureFactory.h \ + qpid/client/ReceivedContent.h \ + qpid/client/StateManager.h \ qpid/framing/AMQBody.h \ qpid/framing/AMQContentBody.h \ qpid/framing/AMQDataBlock.h \ diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index e9ec698400..27f484cfcb 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -68,7 +68,11 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ handleL4(method, context); //(if the frameset is complete) we can move the execution-mark //forward - ++(incoming.hwm); + + //temporary hack until channel management is moved to its own handler: + if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { + ++(incoming.hwm); + } //note: need to be more sophisticated than this if we execute //commands that arrive within an active message frameset (that @@ -175,8 +179,11 @@ RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action act Mutex::ScopedLock l(outLock); uint8_t type(body->type()); if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) { - ++outgoing.hwm; - //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; + //temporary hack until channel management is moved to its own handler: + if (dynamic_pointer_cast<AMQMethodBody>(body)->amqpClassId() != ChannelOpenBody::CLASS_ID) { + ++outgoing.hwm; + //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; + } } return ChannelAdapter::send(body, action); } diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp deleted file mode 100644 index 70cb473426..0000000000 --- a/cpp/src/qpid/client/BasicMessageChannel.cpp +++ /dev/null @@ -1,335 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/log/Statement.h" -#include "BasicMessageChannel.h" -#include "qpid/framing/AMQMethodBody.h" -#include "ClientChannel.h" -#include "ReturnedMessageHandler.h" -#include "MessageListener.h" -#include "qpid/framing/FieldTable.h" -#include "Connection.h" -#include <queue> -#include <iostream> -#include <boost/format.hpp> -#include <boost/variant.hpp> - -namespace qpid { -namespace client { - -using namespace std; -using namespace sys; -using namespace framing; -using boost::format; - -namespace { - -// Destination name constants -const std::string BASIC_GET("__basic_get__"); -const std::string BASIC_RETURN("__basic_return__"); - -// Reference name constant -const std::string BASIC_REF("__basic_reference__"); -} - -BasicMessageChannel::BasicMessageChannel(Channel& ch) - : channel(ch), returnsHandler(0) -{ - incoming.addDestination(BASIC_RETURN, destDispatch); -} - -void BasicMessageChannel::consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) -{ - { - // Note we create a consumer even if tag="". In that case - // It will be renamed when we handle BasicConsumeOkBody. - // - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i != consumers.end()) - THROW_QPID_ERROR(CLIENT_ERROR, - "Consumer already exists with tag="+tag); - Consumer& c = consumers[tag]; - c.listener = listener; - c.ackMode = ackMode; - c.lastDeliveryTag = 0; - } - - // FIXME aconway 2007-03-23: get processed in both. - - // BasicConsumeOkBody is really processed in handle(), here - // we just pick up the tag to return to the user. - // - // We can't process it here because messages for the consumer may - // already be arriving. - // - BasicConsumeOkBody::shared_ptr ok = - channel.sendAndReceiveSync<BasicConsumeOkBody>( - synch, - make_shared_ptr(new BasicConsumeBody( - channel.version, 0, queue.getName(), tag, noLocal, - ackMode == NO_ACK, false, !synch, - fields ? *fields : FieldTable()))); - tag = ok->getConsumerTag(); -} - - -void BasicMessageChannel::cancel(const std::string& tag, bool synch) { - Consumer c; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find(tag); - if (i == consumers.end()) - return; - c = i->second; - consumers.erase(i); - } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { - channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true))); - } - channel.sendAndReceiveSync<BasicCancelOkBody>( - synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); -} - -void BasicMessageChannel::close(){ - destGet.shutdown(); - destDispatch.shutdown(); -} - -void BasicMessageChannel::cancelAll(){ - Mutex::ScopedLock l(lock); - for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++) - { - Consumer& c = i->second; - if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) - { - channel.send(make_shared_ptr(new BasicAckBody(channel.version, c.lastDeliveryTag, true))); - } - channel.send(make_shared_ptr(new BasicCancelBody(channel.version, i->first, true))); - } - consumers.clear(); -} - -bool BasicMessageChannel::get( - Message& msg, const Queue& queue, AckMode ackMode) -{ - // Prepare for incoming response - incoming.addDestination(BASIC_GET, destGet); - channel.send(make_shared_ptr(new BasicGetBody(channel.version, 0, queue.getName(), ackMode))); - bool got = destGet.wait(msg); - return got; -} - -void BasicMessageChannel::publish( - const Message& msg, const Exchange& exchange, - const std::string& routingKey, bool mandatory, bool immediate) -{ - const string e = exchange.getName(); - string key = routingKey; - - // Make a header for the message - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy( - *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); - header->setContentSize(msg.getData().size()); - - channel.send(make_shared_ptr(new BasicPublishBody(channel.version, 0, e, key, mandatory, immediate))); - channel.send(header); - string data = msg.getData(); - u_int64_t data_length = data.length(); - if(data_length > 0){ - //frame itself uses 8 bytes - u_int32_t frag_size = channel.connection->getMaxFrameSize() - 8; - if(data_length < frag_size){ - channel.send(make_shared_ptr(new AMQContentBody(data))); - }else{ - u_int32_t offset = 0; - u_int32_t remaining = data_length - offset; - while (remaining > 0) { - u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); - channel.send(make_shared_ptr(new AMQContentBody(frag))); - - offset += length; - remaining = data_length - offset; - } - } - } -} - -void BasicMessageChannel::handle(boost::shared_ptr<AMQMethodBody> method) { - assert(method->amqpClassId() ==BasicGetBody::CLASS_ID); - switch(method->amqpMethodId()) { - case BasicGetOkBody::METHOD_ID: { - incoming.openReference(BASIC_REF); - incoming.createMessage(BASIC_GET, BASIC_REF); - return; - } - case BasicGetEmptyBody::METHOD_ID: { - incoming.getDestination(BASIC_GET).empty(); - incoming.removeDestination(BASIC_GET); - return; - } - case BasicDeliverBody::METHOD_ID: { - BasicDeliverBody::shared_ptr deliver= - boost::shared_polymorphic_downcast<BasicDeliverBody>(method); - incoming.openReference(BASIC_REF); - Message& msg = incoming.createMessage( - deliver->getConsumerTag(), BASIC_REF); - msg.setDestination(deliver->getConsumerTag()); - msg.setDeliveryTag(deliver->getDeliveryTag()); - msg.setRedelivered(deliver->getRedelivered()); - return; - } - case BasicConsumeOkBody::METHOD_ID: { - Mutex::ScopedLock l(lock); - BasicConsumeOkBody::shared_ptr consumeOk = - boost::shared_polymorphic_downcast<BasicConsumeOkBody>(method); - std::string tag = consumeOk->getConsumerTag(); - ConsumerMap::iterator i = consumers.find(std::string()); - if (i != consumers.end()) { - // Need to rename the un-named consumer. - if (consumers.find(tag) == consumers.end()) { - consumers[tag] = i->second; - consumers.erase(i); - } - else // Tag already exists. - throw ChannelException(404, "Tag already exists: "+tag); - } - // FIXME aconway 2007-03-23: Integrate consumer & destination - // maps. - incoming.addDestination(tag, destDispatch); - return; - } - } - throw Channel::UnknownMethod(); -} - -void BasicMessageChannel::handle(AMQHeaderBody::shared_ptr header) { - BasicHeaderProperties* props = - boost::polymorphic_downcast<BasicHeaderProperties*>( - header->getProperties()); - IncomingMessage::Reference& ref = incoming.getReference(BASIC_REF); - assert (ref.messages.size() == 1); - ref.messages.front().BasicHeaderProperties::operator=(*props); - incoming_size = header->getContentSize(); - if (incoming_size==0) - incoming.closeReference(BASIC_REF); -} - -void BasicMessageChannel::handle(AMQContentBody::shared_ptr content){ - incoming.appendReference(BASIC_REF, content->getData()); - size_t size = incoming.getReference(BASIC_REF).data.size(); - if (size >= incoming_size) { - incoming.closeReference(BASIC_REF); - if (size > incoming_size) - throw ChannelException(502, "Content exceeded declared size"); - } -} - -void BasicMessageChannel::deliver(Consumer& consumer, Message& msg){ - //record delivery tag: - consumer.lastDeliveryTag = msg.getDeliveryTag(); - - //allow registered listener to handle the message - consumer.listener->received(msg); - - if(channel.isOpen()){ - bool multiple(false); - switch(consumer.ackMode){ - case LAZY_ACK: - multiple = true; - if(++(consumer.count) < channel.getPrefetch()) - break; - //else drop-through - case AUTO_ACK: - consumer.lastDeliveryTag = 0; - channel.send(make_shared_ptr( - new BasicAckBody( - channel.version, - msg.getDeliveryTag(), - multiple))); - case NO_ACK: // Nothing to do - case CLIENT_ACK: // User code must ack. - break; - // TODO aconway 2007-02-22: Provide a way for user - // to ack! - } - } - - //as it stands, transactionality is entirely orthogonal to ack - //mode, though the acks will not be processed by the broker under - //a transaction until it commits. -} - - -void BasicMessageChannel::run() { - while(channel.isOpen()) { - try { - Message msg; - bool gotMessge = destDispatch.wait(msg); - if (gotMessge) { - if(msg.getDestination() == BASIC_RETURN) { - ReturnedMessageHandler* handler=0; - { - Mutex::ScopedLock l(lock); - handler=returnsHandler; - } - if(handler != 0) - handler->returned(msg); - } - else { - Consumer consumer; - { - Mutex::ScopedLock l(lock); - ConsumerMap::iterator i = consumers.find( - msg.getDestination()); - if(i == consumers.end()) - THROW_QPID_ERROR(PROTOCOL_ERROR+504, - "Unknown consumer tag=" + - msg.getDestination()); - consumer = i->second; - } - deliver(consumer, msg); - } - } - } - catch (const ShutdownException&) { - // Orderly shutdown. - } - catch (const Exception& e) { - std::cout << "Error caught by dispatch thread: " << e.what() << std::endl; - // FIXME aconway 2007-02-20: Report exception to user. - QPID_LOG(error, e.what()); - } - } -} - -void BasicMessageChannel::setReturnedMessageHandler(ReturnedMessageHandler* handler){ - Mutex::ScopedLock l(lock); - returnsHandler = handler; -} - -void BasicMessageChannel::setQos(){ - channel.send(make_shared_ptr(new BasicQosBody(channel.version, 0, channel.getPrefetch(), false))); - if(channel.isTransactional()) - channel.send(make_shared_ptr(new TxSelectBody(channel.version))); -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/BasicMessageChannel.h b/cpp/src/qpid/client/BasicMessageChannel.h deleted file mode 100644 index 99838321ae..0000000000 --- a/cpp/src/qpid/client/BasicMessageChannel.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef _client_BasicMessageChannel_h -#define _client_BasicMessageChannel_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "MessageChannel.h" -#include "IncomingMessage.h" -#include <boost/scoped_ptr.hpp> - -namespace qpid { -namespace client { -/** - * Messaging implementation using AMQP 0-8 BasicMessageChannel class - * to send and receiving messages. - */ -class BasicMessageChannel : public MessageChannel -{ - public: - BasicMessageChannel(Channel& parent); - - void consume( - Queue& queue, std::string& tag, MessageListener* listener, - AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, - const framing::FieldTable* fields = 0); - - void cancel(const std::string& tag, bool synch = true); - - bool get(Message& msg, const Queue& queue, AckMode ackMode = NO_ACK); - - void publish(const Message& msg, const Exchange& exchange, - const std::string& routingKey, - bool mandatory = false, bool immediate = false); - - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - void run(); - - void handle(boost::shared_ptr<framing::AMQMethodBody>); - - void handle(shared_ptr<framing::AMQHeaderBody>); - - void handle(shared_ptr<framing::AMQContentBody>); - - void setQos(); - - void close(); - - void cancelAll(); - - private: - - struct Consumer{ - MessageListener* listener; - AckMode ackMode; - int count; - u_int64_t lastDeliveryTag; - }; - typedef std::map<std::string, Consumer> ConsumerMap; - - void deliver(Consumer& consumer, Message& msg); - - sys::Mutex lock; - Channel& channel; - IncomingMessage incoming; - uint64_t incoming_size; - ConsumerMap consumers ; - ReturnedMessageHandler* returnsHandler; - IncomingMessage::WaitableDestination destGet; - IncomingMessage::WaitableDestination destDispatch; -}; - -}} // namespace qpid::client - - - -#endif /*!_client_BasicMessageChannel_h*/ diff --git a/cpp/src/qpid/client/BlockingQueue.h b/cpp/src/qpid/client/BlockingQueue.h new file mode 100644 index 0000000000..7081b76b68 --- /dev/null +++ b/cpp/src/qpid/client/BlockingQueue.h @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _BlockingQueue_ +#define _BlockingQueue_ + +#include <queue> +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace client { + +struct QueueClosed {}; + +template <class T> +class BlockingQueue +{ + sys::Monitor lock; + std::queue<T> queue; + bool closed; + +public: + + BlockingQueue() : closed(false) {} + + void reset() + { + sys::Monitor::ScopedLock l(lock); + closed = true; + } + + T pop() + { + sys::Monitor::ScopedLock l(lock); + while (!closed && queue.empty()) { + lock.wait(); + } + if (closed) { + throw QueueClosed(); + } else { + T t = queue.front(); + queue.pop(); + return t; + } + } + + void push(T t) + { + sys::Monitor::ScopedLock l(lock); + bool wasEmpty = queue.empty(); + queue.push(t); + if (wasEmpty) { + lock.notifyAll(); + } + } + + void close() + { + sys::Monitor::ScopedLock l(lock); + closed = true; + lock.notifyAll(); + } +}; + +}} + + + +#endif diff --git a/cpp/src/qpid/client/ChainableFrameHandler.h b/cpp/src/qpid/client/ChainableFrameHandler.h new file mode 100644 index 0000000000..29e16d53dc --- /dev/null +++ b/cpp/src/qpid/client/ChainableFrameHandler.h @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _ChainableFrameHandler_ +#define _ChainableFrameHandler_ + +#include <boost/function.hpp> +#include "qpid/framing/AMQFrame.h" + +namespace qpid { +namespace client { + +struct ChainableFrameHandler +{ + typedef boost::function<void(framing::AMQFrame&)> FrameDelegate; + + FrameDelegate in; + FrameDelegate out; + + ChainableFrameHandler() {} + ChainableFrameHandler(FrameDelegate i, FrameDelegate o): in(i), out(o) {} + virtual ~ChainableFrameHandler() {} +}; + +}} + + + +#endif diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp new file mode 100644 index 0000000000..a6aea438f0 --- /dev/null +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ChannelHandler.h" +#include "qpid/framing/amqp_framing.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace boost; + +ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {} + +void ChannelHandler::incoming(AMQFrame& frame) +{ + AMQBody::shared_ptr body = frame.getBody(); + if (getState() == OPEN) { + if (isA<ChannelCloseBody>(body)) { + ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body)); + setState(CLOSED); + if (onClose) { + onClose(method->getReplyCode(), method->getReplyText()); + } + } else { + try { + in(frame); + }catch(ChannelException& e){ + if (body->type() == METHOD_BODY) { + AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body)); + close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + } else { + close(e.code, e.toString(), 0, 0); + } + } + } + } else { + if (body->type() == METHOD_BODY) { + handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); + } else { + throw new ConnectionException(504, "Channel not open."); + } + + } +} + +void ChannelHandler::outgoing(AMQFrame& frame) +{ + if (getState() == OPEN) { + frame.channel = id; + out(frame); + } else { + throw Exception("Channel not open"); + } +} + +void ChannelHandler::open(uint16_t _id) +{ + id = _id; + + setState(OPENING); + AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version))); + out(f); + + std::set<int> states; + states.insert(OPEN); + states.insert(CLOSED); + waitFor(states); + if (getState() != OPEN) { + throw Exception("Failed to open channel."); + } +} + +void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) +{ + setState(CLOSING); + AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId))); + out(f); +} + +void ChannelHandler::close() +{ + close(200, "OK", 0, 0); + waitFor(CLOSED); +} + +void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method) +{ + switch (getState()) { + case OPENING: + if (method->isA<ChannelOpenOkBody>()) { + setState(OPEN); + } else { + throw ConnectionException(504, "Channel not opened."); + } + break; + case CLOSING: + if (method->isA<ChannelCloseOkBody>()) { + setState(CLOSED); + } //else just ignore it + break; + case CLOSED: + throw ConnectionException(504, "Channel not opened."); + default: + throw Exception("Unexpected state encountered in ChannelHandler!"); + } +} diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h new file mode 100644 index 0000000000..eaa7e7cc72 --- /dev/null +++ b/cpp/src/qpid/client/ChannelHandler.h @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ChannelHandler_ +#define _ChannelHandler_ + +#include "StateManager.h" +#include "ChainableFrameHandler.h" +#include "qpid/framing/amqp_framing.h" + +namespace qpid { +namespace client { + +class ChannelHandler : private StateManager, public ChainableFrameHandler +{ + enum STATES {OPENING, OPEN, CLOSING, CLOSED}; + framing::ProtocolVersion version; + uint16_t id; + + void handleMethod(framing::AMQMethodBody::shared_ptr method); + + template <class T> bool isA(framing::AMQBody::shared_ptr body) { + return body->type() == framing::METHOD_BODY && + boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>(); + } + + + void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId); + + +public: + typedef boost::function<void(uint16_t, const std::string&)> CloseListener; + + ChannelHandler(); + + void incoming(framing::AMQFrame& frame); + void outgoing(framing::AMQFrame& frame); + + void open(uint16_t id); + void close(); + + CloseListener onClose; +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 19b4726a72..8b85017ba0 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -24,9 +24,12 @@ #include "qpid/sys/Monitor.h" #include "ClientMessage.h" #include "qpid/QpidError.h" -#include "MethodBodyInstances.h" #include "Connection.h" -#include "BasicMessageChannel.h" +#include "ConnectionHandler.h" +#include "FutureResponse.h" +#include "MessageListener.h" +#include <boost/format.hpp> +#include <boost/bind.hpp> // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -45,18 +48,13 @@ const std::string empty; }} -Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) : +Channel::Channel(bool _transactional, u_int16_t _prefetch) : connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false) { - switch (mode) { - case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break; - default: assert(0); QPID_ERROR(INTERNAL_ERROR, "Invalid interop-mode."); - } } Channel::~Channel(){ closeInternal(); - stop(); } void Channel::open(ChannelId id, Connection& con) @@ -64,65 +62,15 @@ void Channel::open(ChannelId id, Connection& con) if (isOpen()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel "+id); connection = &con; - init(id, con, con.getVersion()); // ChannelAdapter initialization. - string oob; - if (id != 0) - sendAndReceive<ChannelOpenOkBody>(make_shared_ptr(new ChannelOpenBody(version, oob))); -} + channelId = id; + //link up handlers: + channelHandler.out = boost::bind(&ConnectionHandler::outgoing, &(connection->handler), _1); + channelHandler.in = boost::bind(&ExecutionHandler::handle, &executionHandler, _1); + executionHandler.out = boost::bind(&ChannelHandler::outgoing, &channelHandler, _1); + //set up close notification: + channelHandler.onClose = boost::bind(&Channel::peerClose, this, _1, _2); -void Channel::protocolInit( - const std::string& uid, const std::string& pwd, const std::string& vhost) { - assert(connection); - responses.expect(); - connection->connector->init(); // Send ProtocolInit block. - ConnectionStartBody::shared_ptr connectionStart = - responses.receive<ConnectionStartBody>(); - - FieldTable props; - string mechanism("PLAIN"); - string response = ((char)0) + uid + ((char)0) + pwd; - string locale("en_US"); - ConnectionTuneBody::shared_ptr proposal = - sendAndReceive<ConnectionTuneBody>( - make_shared_ptr(new ConnectionStartOkBody( - version, //connectionStart->getRequestId(), - props, mechanism, - response, locale))); - - /** - * Assume for now that further challenges will not be required - //receive connection.secure - responses.receive(connection_secure)); - //send connection.secure-ok - connection->send(new AMQFrame(0, new ConnectionSecureOkBody(response))); - **/ - - sendCommand(make_shared_ptr(new ConnectionTuneOkBody( - version, //proposal->getRequestId(), - proposal->getChannelMax(), connection->getMaxFrameSize(), - proposal->getHeartbeat()))); - - uint16_t heartbeat = proposal->getHeartbeat(); - connection->connector->setReadTimeout(heartbeat * 2); - connection->connector->setWriteTimeout(heartbeat); - - // Send connection open. - std::string capabilities; - responses.expect(); - sendCommand(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, true))); - //receive connection.open-ok (or redirect, but ignore that for now - //esp. as using force=true). - AMQMethodBody::shared_ptr openResponse = responses.receive(); - if(openResponse->isA<ConnectionOpenOkBody>()) { - //ok - }else if(openResponse->isA<ConnectionRedirectBody>()){ - //ignore for now - ConnectionRedirectBody::shared_ptr redirect( - shared_polymorphic_downcast<ConnectionRedirectBody>(openResponse)); - QPID_LOG(error, "Ignoring redirect to " << redirect->getHost()); - } else { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Bad response to Connection.open"); - } + channelHandler.open(id); } bool Channel::isOpen() const { @@ -131,7 +79,11 @@ bool Channel::isOpen() const { } void Channel::setQos() { - messaging->setQos(); + executionHandler.send(make_shared_ptr(new BasicQosBody(version, 0, getPrefetch(), false))); + if(isTransactional()) { + //I think this is wrong! should only send TxSelect once... + executionHandler.send(make_shared_ptr(new TxSelectBody(version))); + } } void Channel::setPrefetch(uint16_t _prefetch){ @@ -143,14 +95,12 @@ void Channel::declareExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); string type = exchange.getType(); FieldTable args; - send(make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); - if (synch) synchWithServer(); + sendSync(synch, make_shared_ptr(new ExchangeDeclareBody(version, 0, name, type, empty, false, false, false, args))); } void Channel::deleteExchange(Exchange& exchange, bool synch){ string name = exchange.getName(); - send(make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); - if (synch) synchWithServer(); + sendSync(synch, make_shared_ptr(new ExchangeDeleteBody(version, 0, name, false))); } void Channel::declareQueue(Queue& queue, bool synch){ @@ -179,131 +129,41 @@ void Channel::deleteQueue(Queue& queue, bool ifunused, bool ifempty, bool synch) void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ string e = exchange.getName(); string q = queue.getName(); - send(make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); - if (synch) synchWithServer(); + sendSync(synch, make_shared_ptr(new QueueBindBody(version, 0, q, e, key, args))); } void Channel::commit(){ - send(make_shared_ptr(new TxCommitBody(version))); + executionHandler.send(make_shared_ptr(new TxCommitBody(version))); } void Channel::rollback(){ - send(make_shared_ptr(new TxRollbackBody(version))); + executionHandler.send(make_shared_ptr(new TxRollbackBody(version))); } -void Channel::handleMethodInContext( -AMQMethodBody::shared_ptr method, const MethodContext& ctxt) +void Channel::close() { - // Special case for consume OK as it is both an expected response - // and needs handling in this thread. - if (method->isA<BasicConsumeOkBody>()) { - messaging->handle(method); - responses.signalResponse(method); - return; - } - if(responses.isWaiting()) { - responses.signalResponse(method); - return; - } - try { - switch (method->amqpClassId()) { - case MessageTransferBody::CLASS_ID: - case BasicGetOkBody::CLASS_ID: messaging->handle(method); break; - case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break; - case ConnectionCloseBody::CLASS_ID: handleConnection(method); break; - case ExecutionCompleteBody::CLASS_ID: handleExecution(method); break; - default: throw UnknownMethod(); - } - } - catch (const UnknownMethod&) { - connection->close( - 504, "Unknown method", - method->amqpClassId(), method->amqpMethodId()); - } - } - -void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& /*ctxt*/) { - switch (method->amqpMethodId()) { - case ChannelCloseBody::METHOD_ID: - sendCommand(make_shared_ptr(new ChannelCloseOkBody(version/*, ctxt.getRequestId()*/))); - peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method)); - return; - case ChannelFlowBody::METHOD_ID: - // FIXME aconway 2007-02-22: Not yet implemented. - return; - } - throw UnknownMethod(); -} - -void Channel::handleConnection(AMQMethodBody::shared_ptr method) { - if (method->amqpMethodId() == ConnectionCloseBody::METHOD_ID) { - connection->close(); - return; - } - throw UnknownMethod(); -} - -void Channel::handleExecution(AMQMethodBody::shared_ptr method) { - if (method->amqpMethodId() == ExecutionCompleteBody::METHOD_ID) { - Monitor::ScopedLock l(outgoingMonitor); - //record the completion mark: - outgoing.lwm = shared_polymorphic_downcast<ExecutionCompleteBody>(method)->getCumulativeExecutionMark(); - //TODO: notify anyone waiting for completion notification: - outgoingMonitor.notifyAll(); - } else{ - throw UnknownMethod(); - } -} - -void Channel::handleHeader(AMQHeaderBody::shared_ptr body){ - messaging->handle(body); -} - -void Channel::handleContent(AMQContentBody::shared_ptr body){ - messaging->handle(body); -} - -void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Channel received heartbeat"); -} - -void Channel::start(){ - running = true; - dispatcher = Thread(*messaging); -} - -// Close called by local application. -void Channel::close( - uint16_t code, const std::string& text, - ClassId classId, MethodId methodId) -{ - if (isOpen()) { - try { - if (getId() != 0) { - if (code == 200) messaging->cancelAll(); - - sendAndReceive<ChannelCloseOkBody>( - make_shared_ptr(new ChannelCloseBody( - version, code, text, classId, methodId))); - } - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - } catch (...) { - static_cast<ConnectionForChannel*>(connection)->erase(getId()); - closeInternal(); - throw; + channelHandler.close(); + { + Mutex::ScopedLock l(lock); + if (connection); + { + connection->erase(channelId); + connection = 0; } } stop(); } + // Channel closed by peer. -void Channel::peerClose(ChannelCloseBody::shared_ptr reason) { +void Channel::peerClose(uint16_t code, const std::string& message) { assert(isOpen()); //record reason: - errorCode = reason->getReplyCode(); - errorText = reason->getReplyText(); + errorCode = code; + errorText = message; closeInternal(); + stop(); + futures.close(code, message); } void Channel::closeInternal() { @@ -311,26 +171,26 @@ void Channel::closeInternal() { if (connection); { connection = 0; - messaging->close(); - // A 0 response means we are closed. - responses.signalResponse(AMQMethodBody::shared_ptr()); } } -void Channel::stop() { - Mutex::ScopedLock l(stopLock); - if(running) { - dispatcher.join(); - running = false; - } +AMQMethodBody::shared_ptr Channel::sendAndReceive(AMQMethodBody::shared_ptr toSend, ClassId /*c*/, MethodId /*m*/) +{ + + boost::shared_ptr<FutureResponse> fr(futures.createResponse()); + executionHandler.send(toSend, boost::bind(&FutureResponse::completed, fr), boost::bind(&FutureResponse::received, fr, _1)); + return fr->getResponse(); } -AMQMethodBody::shared_ptr Channel::sendAndReceive( - AMQMethodBody::shared_ptr toSend, ClassId c, MethodId m) +void Channel::sendSync(bool sync, AMQMethodBody::shared_ptr command) { - responses.expect(); - sendCommand(toSend); - return responses.receive(c, m); + if(sync) { + boost::shared_ptr<FutureCompletion> fc(futures.createCompletion()); + executionHandler.send(command, boost::bind(&FutureCompletion::completed, fc)); + fc->waitForCompletion(); + } else { + executionHandler.send(command); + } } AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( @@ -339,68 +199,138 @@ AMQMethodBody::shared_ptr Channel::sendAndReceiveSync( if(sync) return sendAndReceive(body, c, m); else { - sendCommand(body); + executionHandler.send(body); return AMQMethodBody::shared_ptr(); } } void Channel::consume( - Queue& queue, std::string& tag, MessageListener* listener, + Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode, bool noLocal, bool synch, const FieldTable* fields) { - messaging->consume(queue, tag, listener, ackMode, noLocal, synch, fields); + + if (tag.empty()) { + throw Exception("A tag must be specified for a consumer."); + } + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + Consumer& c = consumers[tag]; + c.listener = listener; + c.ackMode = ackMode; + c.lastDeliveryTag = 0; + } + sendAndReceiveSync<BasicConsumeOkBody>( + synch, + make_shared_ptr(new BasicConsumeBody( + version, 0, queue.getName(), tag, noLocal, + ackMode == NO_ACK, false, !synch, + fields ? *fields : FieldTable()))); } void Channel::cancel(const std::string& tag, bool synch) { - messaging->cancel(tag, synch); + Consumer c; + { + Mutex::ScopedLock l(lock); + ConsumerMap::iterator i = consumers.find(tag); + if (i == consumers.end()) + return; + c = i->second; + consumers.erase(i); + } + sendAndReceiveSync<BasicCancelOkBody>( + synch, make_shared_ptr(new BasicCancelBody(version, tag, !synch))); } bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { - bool result = messaging->get(msg, queue, ackMode); - if (!isOpen()) { - throw ChannelException(errorCode, errorText); + + AMQMethodBody::shared_ptr request(new BasicGetBody(version, 0, queue.getName(), ackMode)); + AMQMethodBody::shared_ptr response = sendAndReceive(request); + if (response && response->isA<BasicGetEmptyBody>()) { + return false; + } else { + ReceivedContent::shared_ptr content = gets.pop(); + content->populate(msg); + return true; } - return result; } void Channel::publish(const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) { - messaging->publish(msg, exchange, routingKey, mandatory, immediate); -} - -void Channel::setReturnedMessageHandler(ReturnedMessageHandler* handler) { - messaging->setReturnedMessageHandler(handler); -} -void Channel::run() { - messaging->run(); + const string e = exchange.getName(); + string key = routingKey; + + executionHandler.sendContent(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate)), + msg, msg.getData(), connection->getMaxFrameSize());//sending framesize here is horrible, fix this! + /* + // Make a header for the message + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + BasicHeaderProperties::copy( + *static_cast<BasicHeaderProperties*>(header->getProperties()), msg); + header->setContentSize(msg.getData().size()); + + executionHandler.send(make_shared_ptr(new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + executionHandler.sendContent(header); + string data = msg.getData(); + u_int64_t data_length = data.length(); + if(data_length > 0){ + //frame itself uses 8 bytes + u_int32_t frag_size = connection->getMaxFrameSize() - 8; + if(data_length < frag_size){ + executionHandler.sendContent(make_shared_ptr(new AMQContentBody(data))); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(data.substr(offset, length)); + executionHandler.sendContent(make_shared_ptr(new AMQContentBody(frag))); + + offset += length; + remaining = data_length - offset; + } + } + } + */ } -void Channel::sendCommand(AMQBody::shared_ptr body) -{ - ++(outgoing.hwm); - send(body); +void Channel::start(){ + running = true; + dispatcher = Thread(*this); } -bool Channel::waitForCompletion(SequenceNumber poi, Duration timeout) -{ - AbsTime end; - if (timeout == 0) { - end = AbsTime::FarFuture(); - } else { - end = AbsTime(AbsTime::now(), timeout); - } - - Monitor::ScopedLock l(outgoingMonitor); - while (end > AbsTime::now() && outgoing.lwm < poi) { - outgoingMonitor.wait(end); +void Channel::stop() { + executionHandler.received.close(); + gets.close(); + Mutex::ScopedLock l(stopLock); + if(running) { + dispatcher.join(); + running = false; } - return !(outgoing.lwm < poi); } -bool Channel::synchWithServer(Duration timeout) -{ - send(make_shared_ptr(new ExecutionFlushBody(version))); - return waitForCompletion(outgoing.hwm, timeout); +void Channel::run() { + try { + while (true) { + ReceivedContent::shared_ptr content = executionHandler.received.pop(); + //need to dispatch this to the relevant listener: + if (content->isA<BasicDeliverBody>()) { + ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag()); + if (i != consumers.end()) { + Message msg; + content->populate(msg); + i->second.listener->received(msg); + } else { + QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); + } + } else if (content->isA<BasicGetOkBody>()) { + gets.push(content); + } else { + QPID_LOG(warning, "Dropping unsupported message type: " << content->getMethod()); + } + } + } catch (const QueueClosed&) {} } - diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index fc82fb41ff..4853603281 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -26,10 +26,12 @@ #include "ClientExchange.h" #include "ClientMessage.h" #include "ClientQueue.h" -#include "ResponseHandler.h" +#include "ChannelHandler.h" +#include "ExecutionHandler.h" +#include "FutureFactory.h" #include "qpid/Exception.h" -#include "qpid/framing/ChannelAdapter.h" -#include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "AckMode.h" @@ -54,19 +56,23 @@ class ReturnedMessageHandler; * * \ingroup clientapi */ -class Channel : public framing::ChannelAdapter +class Channel : private sys::Runnable { private: struct UnknownMethod {}; typedef shared_ptr<framing::AMQMethodBody> MethodPtr; + + struct Consumer{ + MessageListener* listener; + AckMode ackMode; + int count; + u_int64_t lastDeliveryTag; + }; + typedef std::map<std::string, Consumer> ConsumerMap; mutable sys::Mutex lock; - boost::scoped_ptr<MessageChannel> messaging; Connection* connection; sys::Thread dispatcher; - ResponseHandler responses; - sys::Monitor outgoingMonitor; - framing::Window outgoing; uint16_t prefetch; const bool transactional; @@ -78,32 +84,29 @@ class Channel : public framing::ChannelAdapter sys::Mutex stopLock; bool running; - void stop(); + ConsumerMap consumers; + ExecutionHandler executionHandler; + ChannelHandler channelHandler; + framing::ChannelId channelId; + BlockingQueue<ReceivedContent::shared_ptr> gets; + FutureFactory futures; - void handleHeader(framing::AMQHeaderBody::shared_ptr body); - void handleContent(framing::AMQContentBody::shared_ptr body); - void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body); - void handleMethodInContext( - framing::AMQMethodBody::shared_ptr, const framing::MethodContext&); - void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt); - void handleConnection(framing::AMQMethodBody::shared_ptr method); - void handleExecution(framing::AMQMethodBody::shared_ptr method); + void stop(); void setQos(); - - void protocolInit( - const std::string& uid, const std::string& pwd, - const std::string& vhost); framing::AMQMethodBody::shared_ptr sendAndReceive( framing::AMQMethodBody::shared_ptr, - framing::ClassId, framing::MethodId); + framing::ClassId = 0, framing::MethodId = 0); framing::AMQMethodBody::shared_ptr sendAndReceiveSync( bool sync, framing::AMQMethodBody::shared_ptr, framing::ClassId, framing::MethodId); + void sendSync(bool sync, framing::AMQMethodBody::shared_ptr body); + + template <class BodyType> boost::shared_ptr<BodyType> sendAndReceive(framing::AMQMethodBody::shared_ptr body) { return boost::shared_polymorphic_downcast<BodyType>( @@ -118,21 +121,16 @@ class Channel : public framing::ChannelAdapter sync, body, BodyType::CLASS_ID, BodyType::METHOD_ID)); } - void sendCommand(framing::AMQBody::shared_ptr body); - void open(framing::ChannelId, Connection&); void closeInternal(); - void peerClose(boost::shared_ptr<framing::ChannelCloseBody>); - bool waitForCompletion(framing::SequenceNumber, sys::Duration); - + void peerClose(uint16_t, const std::string&); + // FIXME aconway 2007-02-23: Get rid of friendships. - friend class Connection; - friend class BasicMessageChannel; // for sendAndReceive. - friend class MessageMessageChannel; // for sendAndReceive. + friend class Connection; + friend class BasicMessageChannel; // for sendAndReceive. + friend class MessageMessageChannel; // for sendAndReceive. public: - enum InteropMode { AMQP_08, AMQP_09 }; - /** * Creates a channel object. * @@ -143,16 +141,10 @@ class Channel : public framing::ChannelAdapter * @param prefetch specifies the number of unacknowledged * messages the channel is willing to have sent to it * asynchronously - * - * @param messageImpl Alternate messaging implementation class to - * allow alternate protocol implementations of messaging - * operations. Takes ownership. */ - Channel( - bool transactional = false, u_int16_t prefetch = 500, - InteropMode=AMQP_08); + Channel(bool transactional = false, u_int16_t prefetch = 500); - ~Channel(); + ~Channel(); /** * Declares an exchange. @@ -254,12 +246,10 @@ class Channel : public framing::ChannelAdapter void start(); /** - * Close the channel with optional error information. - * Closing a channel that is not open has no effect. + * Close the channel. Closing a channel that is not open has no + * effect. */ - void close( - framing::ReplyCode = 200, const std::string& ="OK", - framing::ClassId = 0, framing::MethodId = 0); + void close(); /** True if the channel is transactional */ bool isTransactional() { return transactional; } @@ -301,7 +291,7 @@ class Channel : public framing::ChannelAdapter * is received from the broker */ void consume( - Queue& queue, std::string& tag, MessageListener* listener, + Queue& queue, const std::string& tag, MessageListener* listener, AckMode ackMode = NO_ACK, bool noLocal = false, bool synch = true, const framing::FieldTable* fields = 0); @@ -353,22 +343,9 @@ class Channel : public framing::ChannelAdapter bool mandatory = false, bool immediate = false); /** - * Set a handler for this channel that will process any - * returned messages - * - * @see publish() - */ - void setReturnedMessageHandler(ReturnedMessageHandler* handler); - - /** - * Deliver messages from the broker to the appropriate MessageListener. + * Deliver incoming messages to the appropriate MessageListener. */ void run(); - - /** - * TESTING ONLY FOR NOW! - */ - bool synchWithServer(sys::Duration timeout = 0); }; }} diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index 4b8f32a26f..c998ec30df 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -46,11 +46,13 @@ const std::string Connection::OK("OK"); Connection::Connection( bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version -) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), + ) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), defaultConnector(version, _debug, _max_frame_size), isOpen(false), debug(_debug) { setConnector(defaultConnector); + + handler.maxFrameSize = _max_frame_size; } Connection::~Connection(){} @@ -58,7 +60,7 @@ Connection::~Connection(){} void Connection::setConnector(Connector& con) { connector = &con; - connector->setInputHandler(this); + connector->setInputHandler(&handler); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); @@ -70,10 +72,19 @@ void Connection::open( { if (isOpen) THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + + //wire up the handler: + handler.in = boost::bind(&Connection::received, this, _1); + handler.out = boost::bind(&Connector::send, connector, _1); + handler.onClose = boost::bind(&Connection::closeChannels, this); + + handler.uid = uid; + handler.pwd = pwd; + handler.vhost = vhost; + connector->connect(host, port); - channels[0] = &channel0; - channel0.open(0, *this); - channel0.protocolInit(uid, pwd, vhost); + connector->init(); + handler.waitForOpen(); isOpen = true; } @@ -87,14 +98,12 @@ void Connection::shutdown() { } void Connection::close( - ReplyCode code, const string& msg, ClassId classId, MethodId methodId + ReplyCode /*code*/, const string& /*msg*/, ClassId /*classId*/, MethodId /*methodId*/ ) { if(markClosed()) { try { - channel0.sendAndReceive<ConnectionCloseOkBody>( - make_shared_ptr(new ConnectionCloseBody( - getVersion(), code, msg, classId, methodId))); + handler.close(); } catch (const std::exception& e) { QPID_LOG(error, "Exception closing channel: " << e.what()); } @@ -138,35 +147,16 @@ void Connection::erase(ChannelId id) { void Connection::received(AMQFrame& frame){ ChannelId id = frame.getChannel(); Channel* channel = channels[id]; - if (channel == 0) - THROW_QPID_ERROR( - PROTOCOL_ERROR+504, - (boost::format("Invalid channel number %g") % id).str()); - try{ - channel->getHandlers().in->handle(frame); - }catch(const qpid::QpidError& e){ - std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl; - channelException( - *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e); + if (channel == 0) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); } + channel->channelHandler.incoming(frame); } void Connection::send(AMQFrame& frame) { out->send(frame); } -void Connection::channelException( - Channel& channel, AMQMethodBody* method, const QpidError& e) -{ - int code = (e.code >= PROTOCOL_ERROR) ? e.code - PROTOCOL_ERROR : 500; - string msg = e.msg; - if(method == 0) - channel.close(code, msg); - else - channel.close( - code, msg, method->amqpClassId(), method->amqpMethodId()); -} - void Connection::idleIn(){ connector->close(); } diff --git a/cpp/src/qpid/client/CompletionTracker.cpp b/cpp/src/qpid/client/CompletionTracker.cpp new file mode 100644 index 0000000000..996971dbd2 --- /dev/null +++ b/cpp/src/qpid/client/CompletionTracker.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "CompletionTracker.h" + +using qpid::client::CompletionTracker; +using namespace qpid::framing; +using namespace boost; + +CompletionTracker::CompletionTracker() {} +CompletionTracker::CompletionTracker(const SequenceNumber& m) : mark(m) {} + + +void CompletionTracker::completed(const SequenceNumber& _mark) +{ + sys::Mutex::ScopedLock l(lock); + mark = _mark; + while (!listeners.empty() && !(listeners.front().first > mark)) { + Listener f(listeners.front().second); + { + sys::Mutex::ScopedUnlock u(lock); + f(); + } + listeners.pop(); + } +} + +void CompletionTracker::listen(const SequenceNumber& point, Listener listener) +{ + if (!add(point, listener)) { + listener(); + } +} + +bool CompletionTracker::add(const SequenceNumber& point, Listener listener) +{ + sys::Mutex::ScopedLock l(lock); + if (point < mark) { + return false; + } else { + listeners.push(make_pair(point, listener)); + return true; + } +} + + diff --git a/cpp/src/qpid/client/CompletionTracker.h b/cpp/src/qpid/client/CompletionTracker.h new file mode 100644 index 0000000000..30999b4184 --- /dev/null +++ b/cpp/src/qpid/client/CompletionTracker.h @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <queue> +#include <boost/function.hpp> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Mutex.h" + +#ifndef _CompletionTracker_ +#define _CompletionTracker_ + +namespace qpid { +namespace client { + +class CompletionTracker +{ +public: + typedef boost::function<void()> Listener; + + CompletionTracker(); + CompletionTracker(const framing::SequenceNumber& mark); + void completed(const framing::SequenceNumber& mark); + void listen(const framing::SequenceNumber& point, Listener l); + +private: + sys::Mutex lock; + framing::SequenceNumber mark; + std::queue< std::pair<framing::SequenceNumber, Listener> > listeners; + + bool add(const framing::SequenceNumber& point, Listener l); +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 51434fcefd..4d32456c40 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -26,6 +26,7 @@ #include "qpid/QpidError.h" #include "ClientChannel.h" #include "Connector.h" +#include "ConnectionHandler.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -79,17 +80,15 @@ class Connection : public ConnectionForChannel framing::ProtocolVersion version; const uint32_t max_frame_size; ChannelMap channels; + ConnectionHandler handler; Connector defaultConnector; Connector* connector; framing::OutputHandler* out; bool isOpen; sys::Mutex shutdownLock; - Channel channel0; bool debug; void erase(framing::ChannelId); - void channelException( - Channel&, framing::AMQMethodBody*, const QpidError&); void closeChannels(); bool markClosed(); @@ -174,7 +173,7 @@ class Connection : public ConnectionForChannel inline uint32_t getMaxFrameSize(){ return max_frame_size; } /** @return protocol version in use on this connection. */ - framing::ProtocolVersion getVersion() const { return version; } + //framing::ProtocolVersion getVersion() const { return version; } }; }} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp new file mode 100644 index 0000000000..ada3fa4fb0 --- /dev/null +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionHandler.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/amqp_framing.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace boost; + +namespace { +const std::string OK("OK"); +} + +ConnectionHandler::ConnectionHandler() + : StateManager(NOT_STARTED) +{ + + mechanism = "PLAIN"; + locale = "en_US"; + heartbeat = 0; + maxChannels = 32767; + maxFrameSize = 65536; + insist = true; + version = framing::highestProtocolVersion; + + ESTABLISHED.insert(FAILED); + ESTABLISHED.insert(OPEN); +} + +void ConnectionHandler::incoming(AMQFrame& frame) +{ + if (getState() == CLOSED) { + throw Exception("Connection is closed."); + } + + AMQBody::shared_ptr body = frame.getBody(); + if (frame.getChannel() == 0) { + if (body->type() == METHOD_BODY) { + handle(shared_polymorphic_cast<AMQMethodBody>(body)); + } else { + error(503, "Cannot send content on channel zero."); + } + } else { + switch(getState()) { + case OPEN: + try { + in(frame); + }catch(ConnectionException& e){ + error(e.code, e.toString(), body); + }catch(std::exception& e){ + error(541/*internal error*/, e.what(), body); + } + break; + case CLOSING: + QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); + break; + default: + //must be in connection initialisation: + fail("Cannot receive frames on non-zero channel until connection is established."); + } + } +} + +void ConnectionHandler::outgoing(AMQFrame& frame) +{ + if (getState() == OPEN) { + out(frame); + } else { + throw Exception("Connection is not open."); + } +} + +void ConnectionHandler::waitForOpen() +{ + waitFor(ESTABLISHED); + if (getState() == FAILED) { + throw Exception("Failed to establish connection."); + } +} + +void ConnectionHandler::close() +{ + setState(CLOSING); + send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0))); + + waitFor(CLOSED); +} + +void ConnectionHandler::send(framing::AMQBody::shared_ptr body) +{ + AMQFrame f; + f.setBody(body); + out(f); +} + +void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) +{ + setState(CLOSING); + send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId))); +} + +void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body) +{ + if (body->type() == METHOD_BODY) { + AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body)); + error(code, message, method->amqpClassId(), method->amqpMethodId()); + } else { + error(code, message); + } +} + + +void ConnectionHandler::fail(const std::string& message) +{ + QPID_LOG(error, message); + setState(FAILED); +} + +void ConnectionHandler::handle(AMQMethodBody::shared_ptr method) +{ + switch (getState()) { + case NOT_STARTED: + if (method->isA<ConnectionStartBody>()) { + setState(NEGOTIATING); + string response = ((char)0) + uid + ((char)0) + pwd; + send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale))); + } else { + fail("Bad method sequence, expected connection-start."); + } + break; + case NEGOTIATING: + if (method->isA<ConnectionTuneBody>()) { + ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method)); + heartbeat = proposal->getHeartbeat(); + maxChannels = proposal->getChannelMax(); + send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat))); + setState(OPENING); + send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist))); + //TODO: support for further security challenges + //} else if (method->isA<ConnectionSecureBody>()) { + } else { + fail("Unexpected method sequence, expected connection-tune."); + } + break; + case OPENING: + if (method->isA<ConnectionOpenOkBody>()) { + setState(OPEN); + //TODO: support for redirection + //} else if (method->isA<ConnectionRedirectBody>()) { + } else { + fail("Unexpected method sequence, expected connection-open-ok."); + } + break; + case OPEN: + if (method->isA<ConnectionCloseBody>()) { + send(make_shared_ptr(new ConnectionCloseOkBody(version))); + setState(CLOSED); + if (onClose) { + onClose(); + } + } else { + error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); + } + break; + case CLOSING: + if (method->isA<ConnectionCloseOkBody>()) { + setState(CLOSED); + if (onClose) { + onClose(); + } + } else { + QPID_LOG(warning, "Received frame on channel zero while closing connection; frame ignored."); + } + break; + } +} diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h new file mode 100644 index 0000000000..50618b50b1 --- /dev/null +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ConnectionHandler_ +#define _ConnectionHandler_ + +#include "Connector.h" +#include "StateManager.h" +#include "ChainableFrameHandler.h" +#include "qpid/framing/InputHandler.h" + +namespace qpid { +namespace client { + +struct ConnectionProperties +{ + std::string uid; + std::string pwd; + std::string vhost; + framing::FieldTable properties; + std::string mechanism; + std::string locale; + std::string capabilities; + uint16_t heartbeat; + uint16_t maxChannels; + uint64_t maxFrameSize; + bool insist; + framing::ProtocolVersion version; +}; + +class ConnectionHandler : private StateManager, + public ConnectionProperties, + public ChainableFrameHandler, + public framing::InputHandler +{ + enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; + std::set<int> ESTABLISHED; + + void handle(framing::AMQMethodBody::shared_ptr method); + void send(framing::AMQBody::shared_ptr body); + void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); + void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body); + void fail(const std::string& message); + +public: + typedef boost::function<void()> CloseListener; + + ConnectionHandler(); + + void received(framing::AMQFrame& f) { incoming(f); } + + void incoming(framing::AMQFrame& frame); + void outgoing(framing::AMQFrame& frame); + + void waitForOpen(); + void close(); + + CloseListener onClose; +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp new file mode 100644 index 0000000000..edb16bbcee --- /dev/null +++ b/cpp/src/qpid/client/Correlator.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Correlator.h" + +using qpid::client::Correlator; +using namespace qpid::framing; +using namespace boost; + +void Correlator::receive(AMQMethodBody::shared_ptr response) +{ + if (listeners.empty()) { + throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name + } else { + Listener l = listeners.front(); + if (l) l(response); + listeners.pop(); + } +} + +void Correlator::listen(Listener l) +{ + listeners.push(l); +} + + diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h new file mode 100644 index 0000000000..339c9bd0c4 --- /dev/null +++ b/cpp/src/qpid/client/Correlator.h @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <memory> +#include <queue> +#include <set> +#include <boost/function.hpp> +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/sys/Monitor.h" + +#ifndef _Correlator_ +#define _Correlator_ + +namespace qpid { +namespace client { + + +class Correlator +{ +public: + typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener; + + void receive(framing::AMQMethodBody::shared_ptr); + void listen(Listener l); + +private: + std::queue<Listener> listeners; +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp new file mode 100644 index 0000000000..e4270f4e98 --- /dev/null +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -0,0 +1,159 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ExecutionHandler.h" +#include "qpid/Exception.h" +#include "qpid/framing/BasicDeliverBody.h" +#include "qpid/framing/MessageTransferBody.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace boost; + +bool isMessageMethod(AMQMethodBody::shared_ptr method) +{ + return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>(); +} + +bool isMessageMethod(AMQBody::shared_ptr body) +{ + return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body)); +} + +bool isContentFrame(AMQFrame& frame) +{ + AMQBody::shared_ptr body = frame.getBody(); + uint8_t type = body->type(); + return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); +} + +bool invoke(AMQBody::shared_ptr body, Invocable* target) +{ + return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target); +} + +ExecutionHandler::ExecutionHandler() : version(framing::highestProtocolVersion) {} + +//incoming: +void ExecutionHandler::handle(AMQFrame& frame) +{ + AMQBody::shared_ptr body = frame.getBody(); + if (!invoke(body, this)) { + if (isContentFrame(frame)) { + if (!arriving) { + arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm)); + } + arriving->append(body); + if (arriving->isComplete()) { + received.push(arriving); + arriving.reset(); + } + } else { + ++incoming.hwm; + correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body)); + } + } +} + +void ExecutionHandler::complete(uint32_t cumulative, SequenceNumberSet range) +{ + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + completion.completed(outgoing.lwm); + } + if (range.size() % 2) { //must be even number + throw ConnectionException(530, "Received odd number of elements in ranged mark"); + } else { + //TODO: need to manage (record and accumulate) ranges such + //that we can implictly move the mark when appropriate + + //TODO: signal listeners of early notification? + } +} + +void ExecutionHandler::flush() +{ + //send completion + incoming.lwm = incoming.hwm; + //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); +} + +void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g) +{ + //allocate id: + ++outgoing.hwm; + //register listeners if necessary: + if (f) { + completion.listen(outgoing.hwm, f); + } + if (g) { + correlation.listen(g); + } + + AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command); + out(frame); + + if (f) { + AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version))); + out(frame); + } +} + +void ExecutionHandler::sendContent(framing::AMQBody::shared_ptr content) +{ + AMQFrame frame(version, 0/*id will be filled in be channel handler*/, content); + out(frame); +} + +void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data, + uint64_t frameSize, + CompletionTracker::Listener f, Correlator::Listener g) +{ + send(command, f, g); + + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers); + header->setContentSize(data.size()); + AMQFrame h(version, 0, header); + out(h); + + u_int64_t data_length = data.length(); + if(data_length > 0){ + //frame itself uses 8 bytes + u_int32_t frag_size = frameSize - 8; + if(data_length < frag_size){ + AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data))); + out(frame); + }else{ + u_int32_t offset = 0; + u_int32_t remaining = data_length - offset; + while (remaining > 0) { + u_int32_t length = remaining > frag_size ? frag_size : remaining; + string frag(data.substr(offset, length)); + AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag))); + out(frame); + offset += length; + remaining = data_length - offset; + } + } + } +} diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h new file mode 100644 index 0000000000..99b0f4b915 --- /dev/null +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ExecutionHandler_ +#define _ExecutionHandler_ + +#include <queue> +#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/SequenceNumber.h" +#include "BlockingQueue.h" +#include "ChainableFrameHandler.h" +#include "CompletionTracker.h" +#include "Correlator.h" +#include "ReceivedContent.h" + +namespace qpid { +namespace client { + +class ExecutionHandler : + private framing::AMQP_ServerOperations::ExecutionHandler, + public ChainableFrameHandler +{ + framing::Window incoming; + framing::Window outgoing; + ReceivedContent::shared_ptr arriving; + Correlator correlation; + CompletionTracker completion; + framing::ProtocolVersion version; + + void complete(uint32_t mark, framing::SequenceNumberSet range); + void flush(); + +public: + BlockingQueue<ReceivedContent::shared_ptr> received; + + ExecutionHandler(); + + void handle(framing::AMQFrame& frame); + void send(framing::AMQBody::shared_ptr command, + CompletionTracker::Listener f = CompletionTracker::Listener(), + Correlator::Listener g = Correlator::Listener()); + void sendContent(framing::AMQBody::shared_ptr command, + const framing::BasicHeaderProperties& headers, const std::string& data, + uint64_t frameSize, + CompletionTracker::Listener f = CompletionTracker::Listener(), + Correlator::Listener g = Correlator::Listener()); + + void sendContent(framing::AMQBody::shared_ptr content); +}; + +}} + +#endif diff --git a/cpp/src/qpid/client/FutureCompletion.cpp b/cpp/src/qpid/client/FutureCompletion.cpp new file mode 100644 index 0000000000..6fc3d5f088 --- /dev/null +++ b/cpp/src/qpid/client/FutureCompletion.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FutureCompletion.h" + +using namespace qpid::client; +using namespace qpid::sys; + +FutureCompletion::FutureCompletion() : complete(false), closed(false), code(0) {} + +bool FutureCompletion::isComplete() +{ + Monitor::ScopedLock l(lock); + return complete; +} + +void FutureCompletion::completed() +{ + Monitor::ScopedLock l(lock); + complete = true; + lock.notifyAll(); +} + +void FutureCompletion::waitForCompletion() +{ + Monitor::ScopedLock l(lock); + while (!complete && !closed) { + lock.wait(); + } + if (closed) { + throw ChannelException(code, text); + } +} + +void FutureCompletion::close(uint16_t _code, const std::string& _text) +{ + Monitor::ScopedLock l(lock); + complete = true; + closed = true; + code = _code; + text = _text; + lock.notifyAll(); +} diff --git a/cpp/src/qpid/client/FutureCompletion.h b/cpp/src/qpid/client/FutureCompletion.h new file mode 100644 index 0000000000..3487a0910a --- /dev/null +++ b/cpp/src/qpid/client/FutureCompletion.h @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _FutureCompletion_ +#define _FutureCompletion_ + +#include "qpid/framing/amqp_framing.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace client { + +class FutureCompletion +{ +protected: + sys::Monitor lock; + bool complete; + bool closed; + uint16_t code; + std::string text; + +public: + FutureCompletion(); + virtual ~FutureCompletion(){} + bool isComplete(); + void waitForCompletion(); + void completed(); + void close(uint16_t code, const std::string& text); +}; + +}} + + +#endif diff --git a/cpp/src/qpid/client/FutureFactory.cpp b/cpp/src/qpid/client/FutureFactory.cpp new file mode 100644 index 0000000000..7f9d51e77f --- /dev/null +++ b/cpp/src/qpid/client/FutureFactory.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FutureFactory.h" + +using namespace qpid::client; +using namespace boost; + +shared_ptr<FutureCompletion> FutureFactory::createCompletion() +{ + shared_ptr<FutureCompletion> f(new FutureCompletion()); + weak_ptr<FutureCompletion> w(f); + set.push_back(w); + return f; +} + +shared_ptr<FutureResponse> FutureFactory::createResponse() +{ + shared_ptr<FutureResponse> f(new FutureResponse()); + weak_ptr<FutureCompletion> w(static_pointer_cast<FutureCompletion>(f)); + set.push_back(w); + return f; +} + +void FutureFactory::close(uint16_t code, const std::string& text) +{ + for (WeakPtrSet::iterator i = set.begin(); i != set.end(); i++) { + shared_ptr<FutureCompletion> p = i->lock(); + if (p) { + p->close(code, text); + } + } +} diff --git a/cpp/src/qpid/client/FutureFactory.h b/cpp/src/qpid/client/FutureFactory.h new file mode 100644 index 0000000000..b126e296fd --- /dev/null +++ b/cpp/src/qpid/client/FutureFactory.h @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _FutureFactory_ +#define _FutureFactory_ + +#include <vector> +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> +#include "FutureCompletion.h" +#include "FutureResponse.h" + +namespace qpid { +namespace client { + +class FutureFactory +{ + typedef std::vector< boost::weak_ptr<FutureCompletion> > WeakPtrSet; + WeakPtrSet set; + +public: + boost::shared_ptr<FutureCompletion> createCompletion(); + boost::shared_ptr<FutureResponse> createResponse(); + void close(uint16_t code, const std::string& text); +}; + +}} + + +#endif diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp new file mode 100644 index 0000000000..6b1246a449 --- /dev/null +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FutureResponse.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::sys; + + +AMQMethodBody::shared_ptr FutureResponse::getResponse() +{ + waitForCompletion(); + return response; +} + +void FutureResponse::received(AMQMethodBody::shared_ptr r) +{ + Monitor::ScopedLock l(lock); + response = r; + complete = true; + lock.notifyAll(); +} + diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h new file mode 100644 index 0000000000..ccc6fb5894 --- /dev/null +++ b/cpp/src/qpid/client/FutureResponse.h @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef _FutureResponse_ +#define _FutureResponse_ + +#include "qpid/framing/amqp_framing.h" +#include "FutureCompletion.h" + +namespace qpid { +namespace client { + +class FutureResponse : public FutureCompletion +{ + framing::AMQMethodBody::shared_ptr response; + +public: + framing::AMQMethodBody::shared_ptr getResponse(); + void received(framing::AMQMethodBody::shared_ptr response); +}; + +}} + + + +#endif diff --git a/cpp/src/qpid/client/IncomingMessage.cpp b/cpp/src/qpid/client/IncomingMessage.cpp deleted file mode 100644 index 059e644464..0000000000 --- a/cpp/src/qpid/client/IncomingMessage.cpp +++ /dev/null @@ -1,168 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "IncomingMessage.h" -#include "qpid/Exception.h" -#include "ClientMessage.h" -#include <boost/format.hpp> - -namespace qpid { -namespace client { - -using boost::format; -using sys::Mutex; - -IncomingMessage::Destination::~Destination() {} - - -IncomingMessage::WaitableDestination::WaitableDestination() - : shutdownFlag(false) {} - -void IncomingMessage::WaitableDestination::message(const Message& msg) { - Mutex::ScopedLock l(monitor); - queue.push(msg); - monitor.notify(); -} - -void IncomingMessage::WaitableDestination::empty() { - Mutex::ScopedLock l(monitor); - queue.push(Empty()); - monitor.notify(); -} - -bool IncomingMessage::WaitableDestination::wait(Message& msgOut) { - Mutex::ScopedLock l(monitor); - while (queue.empty() && !shutdownFlag) - monitor.wait(); - if (shutdownFlag) - return false; - Message* msg = boost::get<Message>(&queue.front()); - bool success = msg; - if (success) - msgOut=*msg; - queue.pop(); - if (!queue.empty()) - monitor.notify(); // Wake another waiter. - return success; -} - -void IncomingMessage::WaitableDestination::shutdown() { - Mutex::ScopedLock l(monitor); - shutdownFlag = true; - monitor.notifyAll(); -} - -void IncomingMessage::openReference(const std::string& name) { - Mutex::ScopedLock l(lock); - if (references.find(name) != references.end()) - throw ConnectionException( - 503, format("Attempt to open existing reference %s.") % name); - references[name]; - return; -} - -void IncomingMessage::appendReference( - const std::string& name, const std::string& data) -{ - Mutex::ScopedLock l(lock); - getRefUnlocked(name).data += data; -} - -Message& IncomingMessage::createMessage( - const std::string& destination, const std::string& reference) -{ - Mutex::ScopedLock l(lock); - getDestUnlocked(destination); // Verify destination. - Reference& ref = getRefUnlocked(reference); - ref.messages.resize(ref.messages.size() +1); - ref.messages.back().setDestination(destination); - return ref.messages.back(); -} - -void IncomingMessage::closeReference(const std::string& name) { - Reference refCopy; - { - Mutex::ScopedLock l(lock); - refCopy = getRefUnlocked(name); - references.erase(name); - } - for (std::vector<Message>::iterator i = refCopy.messages.begin(); - i != refCopy.messages.end(); - ++i) - { - i->setData(refCopy.data); - // TODO aconway 2007-03-23: Thread safety, - // can a destination be removed while we're doing this? - getDestination(i->getDestination()).message(*i); - } -} - - -void IncomingMessage::addDestination(std::string name, Destination& dest) { - Mutex::ScopedLock l(lock); - DestinationMap::iterator i = destinations.find(name); - if (i == destinations.end()) - destinations[name]=&dest; - else if (i->second != &dest) - throw ConnectionException( - 503, format("Destination already exists: %s.") % name); -} - -void IncomingMessage::removeDestination(std::string name) { - Mutex::ScopedLock l(lock); - DestinationMap::iterator i = destinations.find(name); - if (i == destinations.end()) - throw ConnectionException( - 503, format("No such destination: %s.") % name); - destinations.erase(i); -} - -IncomingMessage::Destination& IncomingMessage::getDestination( - const std::string& name) { - return getDestUnlocked(name); -} - -IncomingMessage::Reference& IncomingMessage::getReference( - const std::string& name) { - return getRefUnlocked(name); -} - -IncomingMessage::Reference& IncomingMessage::getRefUnlocked( - const std::string& name) { - Mutex::ScopedLock l(lock); - ReferenceMap::iterator i = references.find(name); - if (i == references.end()) - throw ConnectionException( - 503, format("No such reference: %s.") % name); - return i->second; -} - -IncomingMessage::Destination& IncomingMessage::getDestUnlocked( - const std::string& name) { - Mutex::ScopedLock l(lock); - DestinationMap::iterator i = destinations.find(name); - if (i == destinations.end()) - throw ConnectionException( - 503, format("No such destination: %s.") % name); - return *i->second; -} - -}} // namespace qpid::client diff --git a/cpp/src/qpid/client/IncomingMessage.h b/cpp/src/qpid/client/IncomingMessage.h deleted file mode 100644 index 7aa8e33df2..0000000000 --- a/cpp/src/qpid/client/IncomingMessage.h +++ /dev/null @@ -1,136 +0,0 @@ -#ifndef _IncomingMessage_ -#define _IncomingMessage_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/sys/Monitor.h" -#include <map> -#include <queue> -#include <vector> -#include <boost/variant.hpp> - -namespace qpid { -namespace client { - -class Message; - -/** - * Manage incoming messages. - * - * Uses reference and destination concepts from 0-9 Messsage class. - * - * Basic messages use special destination and reference names to indicate - * get-ok, return etc. messages. - * - */ -class IncomingMessage { - public: - /** Accumulate data associated with a set of messages. */ - struct Reference { - std::string data; - std::vector<Message> messages; - }; - - /** Interface to a destination for messages. */ - class Destination { - public: - virtual ~Destination(); - - /** Pass a message to the destination */ - virtual void message(const Message&) = 0; - - /** Notify destination of queue-empty contition */ - virtual void empty() = 0; - }; - - - /** A destination that a thread can wait on till a message arrives. */ - class WaitableDestination : public Destination - { - public: - WaitableDestination(); - void message(const Message& msg); - void empty(); - /** Wait till message() or empty() is called. True for message() */ - bool wait(Message& msgOut); - void shutdown(); - - private: - struct Empty {}; - typedef boost::variant<Message,Empty> Item; - sys::Monitor monitor; - std::queue<Item> queue; - bool shutdownFlag; - }; - - - - /** Add a reference. Throws if already open. */ - void openReference(const std::string& name); - - /** Get a reference. Throws if not already open. */ - void appendReference(const std::string& name, - const std::string& data); - - /** Create a message to destination associated with reference - *@exception if destination or reference non-existent. - */ - Message& createMessage(const std::string& destination, - const std::string& reference); - - /** Get a reference. - *@exception if non-existent. - */ - Reference& getReference(const std::string& name); - - /** Close a reference and deliver all its messages. - * Throws if not open or a message has an invalid destination. - */ - void closeReference(const std::string& name); - - /** Add a destination. - *@exception if a different Destination is already registered - * under name. - */ - void addDestination(std::string name, Destination&); - - /** Remove a destination. Throws if does not exist */ - void removeDestination(std::string name); - - /** Get a destination. Throws if does not exist */ - Destination& getDestination(const std::string& name); - private: - - typedef std::map<std::string, Reference> ReferenceMap; - typedef std::map<std::string, Destination*> DestinationMap; - - Reference& getRefUnlocked(const std::string& name); - Destination& getDestUnlocked(const std::string& name); - - mutable sys::Mutex lock; - ReferenceMap references; - DestinationMap destinations; -}; - -}} - - -#endif diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp new file mode 100644 index 0000000000..9cfee21c3c --- /dev/null +++ b/cpp/src/qpid/client/ReceivedContent.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReceivedContent.h" + +using qpid::client::ReceivedContent; +using namespace qpid::framing; +using namespace boost; + +ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {} + +void ReceivedContent::append(AMQBody::shared_ptr part) +{ + parts.push_back(part); +} + +bool ReceivedContent::isComplete() const +{ + if (parts.empty()) { + return false; + } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { + AMQHeaderBody::shared_ptr headers(getHeaders()); + return headers && headers->getContentSize() == getContentSize(); + } else if (isA<MessageTransferBody>()) { + //no longer support references, headers and data are still method fields + return true; + } else { + throw Exception("Unknown content class"); + } +} + + +AMQMethodBody::shared_ptr ReceivedContent::getMethod() const +{ + return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]); +} + +AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const +{ + return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]); +} + +uint64_t ReceivedContent::getContentSize() const +{ + if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { + uint64_t size(0); + for (uint i = 2; i < parts.size(); i++) { + size += parts[i]->size(); + } + return size; + } else if (isA<MessageTransferBody>()) { + return as<MessageTransferBody>()->getBody().getValue().size(); + } else { + throw Exception("Unknown content class"); + } +} + +std::string ReceivedContent::getContent() const +{ + if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { + string data; + for (uint i = 2; i < parts.size(); i++) { + data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData(); + } + return data; + } else if (isA<MessageTransferBody>()) { + return as<MessageTransferBody>()->getBody().getValue(); + } else { + throw Exception("Unknown content class"); + } +} + +void ReceivedContent::populate(Message& msg) +{ + if (!isComplete()) throw Exception("Incomplete message"); + + if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) { + BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties()); + BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties); + msg.setData(getContent()); + } else if (isA<MessageTransferBody>()) { + throw Exception("Transfer not yet supported"); + } else { + throw Exception("Unknown content class"); + } +} diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h new file mode 100644 index 0000000000..1886034f9b --- /dev/null +++ b/cpp/src/qpid/client/ReceivedContent.h @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <vector> +#include <boost/shared_ptr.hpp> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/SequenceNumber.h" +#include "ClientMessage.h" + +#ifndef _ReceivedContent_ +#define _ReceivedContent_ + +namespace qpid { +namespace client { + +/** + * Collects the frames representing some received 'content'. This + * provides a raw interface to 'message' data and attributes. + */ +class ReceivedContent +{ + const framing::SequenceNumber id; + std::vector<framing::AMQBody::shared_ptr> parts; + +public: + typedef boost::shared_ptr<ReceivedContent> shared_ptr; + + ReceivedContent(const framing::SequenceNumber& id); + void append(framing::AMQBody::shared_ptr part); + bool isComplete() const; + + uint64_t getContentSize() const; + std::string getContent() const; + + framing::AMQMethodBody::shared_ptr getMethod() const; + framing::AMQHeaderBody::shared_ptr getHeaders() const; + + template <class T> bool isA() const { + framing::AMQMethodBody::shared_ptr method = getMethod(); + if (!method) { + return false; + } else { + return method->isA<T>(); + } + } + + template <class T> boost::shared_ptr<T> as() const { + framing::AMQMethodBody::shared_ptr method = getMethod(); + if (method && method->isA<T>()) { + return boost::dynamic_pointer_cast<T>(method); + } else { + return boost::shared_ptr<T>(); + } + } + + const framing::SequenceNumber& getId() const { return id; } + + void populate(Message& msg); +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp new file mode 100644 index 0000000000..b72967c098 --- /dev/null +++ b/cpp/src/qpid/client/StateManager.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "StateManager.h" +#include "qpid/framing/amqp_framing.h" + +using namespace qpid::client; +using namespace qpid::framing; +using namespace qpid::sys; + +StateManager::StateManager(int s) : state(s) {} + +void StateManager::waitForStateChange(int current) +{ + Monitor::ScopedLock l(stateLock); + while (state == current) { + stateLock.wait(); + } +} + +void StateManager::waitFor(int desired) +{ + Monitor::ScopedLock l(stateLock); + while (state != desired) { + stateLock.wait(); + } +} + +void StateManager::waitFor(std::set<int> desired) +{ + Monitor::ScopedLock l(stateLock); + while (desired.find(state) == desired.end()) { + stateLock.wait(); + } +} + + +void StateManager::setState(int s) +{ + Monitor::ScopedLock l(stateLock); + state = s; + stateLock.notifyAll(); +} + +int StateManager::getState() +{ + Monitor::ScopedLock l(stateLock); + return state; +} + diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h new file mode 100644 index 0000000000..fd0c1b7f86 --- /dev/null +++ b/cpp/src/qpid/client/StateManager.h @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _StateManager_ +#define _StateManager_ + +#include <set> +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace client { + +class StateManager +{ + int state; + sys::Monitor stateLock; + +public: + StateManager(int initial); + void setState(int state); + int getState(); + void waitForStateChange(int current); + void waitFor(std::set<int> states); + void waitFor(int state); +}; + +}} + +#endif diff --git a/cpp/src/tests/FramingTest.cpp b/cpp/src/tests/FramingTest.cpp index 98f89b59be..582c7d6e55 100644 --- a/cpp/src/tests/FramingTest.cpp +++ b/cpp/src/tests/FramingTest.cpp @@ -410,12 +410,14 @@ class FramingTest : public CppUnit::TestCase ASSERT_FRAME("BROKER: Frame[channel=1; ChannelOpenOk: ]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; ExchangeDeclare: ticket=0; exchange=MyExchange; type=topic; alternateExchange=; passive=0; durable=0; autoDelete=0; arguments={}]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=1; rangedExecutionSet={}]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; QueueDeclare: ticket=0; queue=MyQueue; alternateExchange=; passive=0; durable=0; exclusive=1; autoDelete=1; nowait=0; arguments={}]", *i++); ASSERT_FRAME("BROKER: Frame[channel=1; QueueDeclareOk: queue=MyQueue; messageCount=0; consumerCount=0]", *i++); + ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=2; rangedExecutionSet={}]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; QueueBind: ticket=0; queue=MyQueue; exchange=MyExchange; routingKey=MyTopic; arguments={}]", *i++); ASSERT_FRAME("CLIENT: Frame[channel=1; ExecutionFlush: ]", *i++); - ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=4; rangedExecutionSet={}]", *i++); + ASSERT_FRAME("BROKER: Frame[channel=1; ExecutionComplete: cumulativeExecutionMark=3; rangedExecutionSet={}]", *i++); } }; diff --git a/cpp/src/tests/Serializer.cpp b/cpp/src/tests/Serializer.cpp index 8c0ee7b85c..d7345acf06 100644 --- a/cpp/src/tests/Serializer.cpp +++ b/cpp/src/tests/Serializer.cpp @@ -61,12 +61,17 @@ struct Tester { } }; +void execute(Serializer& s, Serializer::Task t) +{ + s.execute(t); +} + BOOST_AUTO_TEST_CASE(testSingleThread) { // Verify that we call in the same thread by default. Tester tester; Serializer s; for (int i = 0; i < 100; ++i) - s.execute(boost::bind(&Tester::test, &tester)); + execute(s, boost::bind(&Tester::test, &tester)); // All should be executed in this thread. BOOST_CHECK_EQUAL(0u, tester.collisions); BOOST_CHECK_EQUAL(100u, tester.count); @@ -80,7 +85,7 @@ BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) { Tester tester; Serializer s(false); for (int i = 0; i < 100; ++i) - s.execute(boost::bind(&Tester::test, &tester)); + execute(s, boost::bind(&Tester::test, &tester)); { // Wait for dispatch thread to complete. Mutex::ScopedLock l(tester.lock); @@ -95,7 +100,7 @@ BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) { struct Caller : public Runnable, public Tester { Caller(Serializer& s) : serializer(s) {} - void run() { serializer.execute(boost::bind(&Tester::test, this)); } + void run() { execute(serializer, boost::bind(&Tester::test, this)); } Serializer& serializer; }; @@ -134,7 +139,7 @@ BOOST_AUTO_TEST_CASE(testExternalDispatch) { serializer.reset(new Serializer(false, ¬ifyDispatch)); Tester tester; for (int i = 0; i < 100; ++i) - serializer->execute(boost::bind(&Tester::test, &tester)); + execute(*serializer, boost::bind(&Tester::test, &tester)); { // Wait for dispatch thread to complete. Mutex::ScopedLock l(tester.lock); diff --git a/cpp/src/tests/client_test.cpp b/cpp/src/tests/client_test.cpp index cefc4338eb..4903312cd7 100644 --- a/cpp/src/tests/client_test.cpp +++ b/cpp/src/tests/client_test.cpp @@ -41,7 +41,6 @@ using namespace qpid::client; using namespace qpid::sys; using std::string; -bool verbose = false; /** * A simple message listener implementation that prints out the @@ -50,9 +49,10 @@ bool verbose = false; */ class SimpleListener : public virtual MessageListener{ Monitor* monitor; + bool verbose; public: - inline SimpleListener(Monitor* _monitor) : monitor(_monitor){} + inline SimpleListener(Monitor* _monitor, bool debug) : monitor(_monitor), verbose(debug) {} inline virtual void received(Message& msg){ if (verbose) @@ -101,7 +101,7 @@ int main(int argc, char** argv) //montior to use to notify the main thread when that message //is received. Monitor monitor; - SimpleListener listener(&monitor); + SimpleListener listener(&monitor, opts.trace); string tag("MyTag"); channel.consume(queue, tag, &listener); if (opts.trace) std::cout << "Registered consumer." << std::endl; @@ -118,11 +118,6 @@ int main(int argc, char** argv) msg.setData(data); channel.publish(msg, exchange, "MyTopic"); if (opts.trace) std::cout << "Published message: " << data << std::endl; - if (opts.trace) { - std::cout << "Publication " - << (channel.synchWithServer(qpid::sys::TIME_SEC * 1) ? " DID " : " did NOT ") - << "complete" << std::endl; - } { Monitor::ScopedLock l(monitor); diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index cb6bafcd8e..cddf3cb92a 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -111,8 +111,7 @@ int main(int argc, char** argv){ channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, control, "topic_control", bindArgs); //set up listener Listener listener(&channel, response.getName(), args.transactional); - string tag; - channel.consume(control, tag, &listener, AckMode(args.ackmode)); + channel.consume(control, "c1", &listener, AckMode(args.ackmode)); cout << "topic_listener: Consuming." << endl; channel.run(); connection.close(); diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index f792540c09..5800f9225d 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -121,8 +121,7 @@ int main(int argc, char** argv) { //set up listener Publisher publisher(&channel, "topic_control", args.transactional); - string tag("mytag"); - channel.consume(response, tag, &publisher, AckMode(args.ackmode)); + channel.consume(response, "mytag", &publisher, AckMode(args.ackmode)); channel.start(); int batchSize(args.batches); |