/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "qpid/client/amqp0_10/IncomingMessages.h"
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/client/SessionImpl.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/types/Variant.h"
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/enum.h"
#include <algorithm>

namespace qpid {
namespace client {
namespace amqp0_10 {

using namespace qpid::framing;
using namespace qpid::framing::message;
using namespace qpid::amqp_0_10;
using qpid::sys::AbsTime;
using qpid::sys::Duration;
using qpid::messaging::MessageImplAccess;
using qpid::types::Variant;

namespace {
const std::string EMPTY_STRING;

/**
 * Handler that neither accepts nor expires any transfer.
 */
struct GetNone : IncomingMessages::Handler
{
    bool accept(IncomingMessages::MessageTransfer&) { return false; }
    bool expire(IncomingMessages::MessageTransfer&) { return false; }
};

/**
 * Handler that accepts every transfer, retrieving it without
 * converting it to a message (a null message pointer is passed).
 */
struct GetAny : IncomingMessages::Handler
{
    bool accept(IncomingMessages::MessageTransfer& transfer)
    {
        transfer.retrieve(0);
        return true;
    }
    bool expire(IncomingMessages::MessageTransfer&) { return false; }
};

/**
 * Predicate that matches transfers for a given destination and
 * records the command id of every match in 'ids'.
 */
struct MatchAndTrack
{
    const std::string destination;
    SequenceSet ids;

    MatchAndTrack(const std::string& d) : destination(d) {}

    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
    {
        if (command->as<MessageTransferBody>()->getDestination() == destination) {
            ids.add(command->getId());
            return true;
        } else {
            return false;
        }
    }
};

/**
 * Predicate that matches transfers for a given destination and
 * counts the matches in 'matched'.
 */
struct Match
{
    const std::string destination;
    uint32_t matched;

    Match(const std::string& d) : destination(d), matched(0) {}

    bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
    {
        if (command->as<MessageTransferBody>()->getDestination() == destination) {
            ++matched;
            return true;
        } else {
            return false;
        }
    }
};

/**
 * Scope guard: on destruction takes the monitor's lock, resets the
 * referenced flag to false and notifies all threads waiting on the
 * monitor.
 */
struct ScopedRelease
{
    bool& flag;
    qpid::sys::Monitor& lock;

    ScopedRelease(bool& f, qpid::sys::Monitor& l) : flag(f), lock(l) {}
    ~ScopedRelease()
    {
        sys::Monitor::ScopedLock l(lock);
        flag = false;
        lock.notifyAll();
    }
};
}

IncomingMessages::IncomingMessages() : inUse(false) {}

/**
 * Associates this object with the given session: stores the session,
 * takes the default queue from the session's demux as the incoming
 * queue and resets the accept tracker.
 */
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
    sys::Mutex::ScopedLock l(lock);
    session = s;
    incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
    acceptTracker.reset();
}

namespace {
/**
 * Returns the time remaining until 'deadline', clamped so that it is
 * never negative. An infinite 'timeout' is passed through unchanged
 * rather than being recomputed from the deadline.
 */
qpid::sys::Duration get_duration(qpid::sys::Duration timeout, qpid::sys::AbsTime deadline)
{
    if (timeout == qpid::sys::TIME_INFINITE) {
        return qpid::sys::TIME_INFINITE;
    } else {
        return std::max(qpid::sys::Duration(0), qpid::sys::Duration(AbsTime::now(), deadline));
    }
}
}

/**
 * Tries to find a transfer that 'handler' accepts, first among the
 * already received transfers and then by processing the incoming
 * queue, until 'timeout' has elapsed.
 *
 * @return true if the handler accepted a transfer; false if the
 * deadline passed or process() reported CLOSED.
 * @throws qpid::messaging::ReceiverError if handler.isClosed() is
 * true after an unsuccessful pass.
 */
bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
{
    sys::Mutex::ScopedLock l(lock);
    AbsTime deadline(AbsTime::now(), timeout);
    do {
        //search through received list for any transfer of interest:
        for (FrameSetQueue::iterator i = received.begin(); i != received.end();) {
            MessageTransfer transfer(*i, *this);
            if (transfer.checkExpired() && handler.expire(transfer)) {
                //expired and claimed by the handler: drop it and carry on
                i = received.erase(i);
            } else if (handler.accept(transfer)) {
                received.erase(i);
                return true;
            } else {
                ++i;
            }
        }
        if (inUse) {
            //someone is already waiting on the incoming session queue, wait for them to finish
            lock.wait(deadline);
        } else {
            inUse = true;
            //'release' is declared before the unlock guard, so the unlock
            //guard is destroyed first and 'release' clears inUse afterwards
            ScopedRelease release(inUse, lock);
            sys::Mutex::ScopedUnlock l(lock);
            //wait for suitable new message to arrive
            switch (process(&handler, get_duration(timeout, deadline))) {
              case OK:
                return true;
              case CLOSED:
                return false;
              case EMPTY:
                break;
            }
        }
        if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
    } while (AbsTime::now() < deadline);
    return false;
}

namespace {
/** Exception type used by wakeup() to interrupt a pop() on the incoming queue. */
struct Wakeup : public qpid::types::Exception {};
}

/**
 * Interrupts any thread blocked on the incoming queue by closing it
 * with a Wakeup exception (pop() catches this and reopens the queue),
 * and notifies all threads waiting on the lock.
 */
void IncomingMessages::wakeup()
{
    sys::Mutex::ScopedLock l(lock);
    incoming->close(qpid::sys::ExceptionHolder(new Wakeup()));
    lock.notifyAll();
}

/**
 * Waits up to 'timeout' for the received list to become non-empty.
 *
 * @param destination set to the destination of the first received
 * transfer when one is available; left untouched otherwise.
 * @return true if 'destination' was set, false otherwise.
 */
bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout)
{
    sys::Mutex::ScopedLock l(lock);
    AbsTime deadline(AbsTime::now(), timeout);
    while (received.empty()) {
        if (inUse) {
            //someone is already waiting on the sessions incoming queue
            lock.wait(deadline);
        } else {
            inUse = true;
            ScopedRelease release(inUse, lock);
            sys::Mutex::ScopedUnlock l(lock);
            //wait for an incoming message
            wait(get_duration(timeout, deadline));
        }
        if (!(AbsTime::now() < deadline)) break;
    }
    if (!received.empty()) {
        destination = received.front()->as<MessageTransferBody>()->getDestination();
        return true;
    } else {
        return false;
    }
}

/** Forwards to the accept tracker's accept() for the current session, under the lock. */
void IncomingMessages::accept()
{
    sys::Mutex::ScopedLock l(lock);
    acceptTracker.accept(session);
}

/** Forwards the given id and 'cumulative' flag to the accept tracker, under the lock. */
void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
    sys::Mutex::ScopedLock l(lock);
    acceptTracker.accept(id, session, cumulative);
}

/**
 * Retrieves every transfer currently held (both in the received list
 * and whatever is immediately available on the incoming queue) and
 * then asks the accept tracker to release them.
 */
void IncomingMessages::releaseAll()
{
    {
        //first process any received messages...
        sys::Mutex::ScopedLock l(lock);
        while (!received.empty()) {
            retrieve(received.front(), 0);
            received.pop_front();
        }
    }
    //then pump out any available messages from incoming queue...
    GetAny handler;
    while (process(&handler, 0) == OK) ;
    //now release all messages
    sys::Mutex::ScopedLock l(lock);
    acceptTracker.release(session);
}

/**
 * Removes all not-yet-retrieved transfers for 'destination' from the
 * received list and issues a message-release for their ids.
 */
void IncomingMessages::releasePending(const std::string& destination)
{
    //first pump all available messages from incoming to received...
    while (process(0, 0) == OK) ;

    //now remove all messages for this destination from received list, recording their ids...
    sys::Mutex::ScopedLock l(lock);
    MatchAndTrack match(destination);
    for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ;
    //now release those messages
    session.messageRelease(match.ids);
}

/**
 * Pops the next frameset from the incoming queue, waiting up to
 * 'timeout'. If the wait is interrupted by a Wakeup exception (see
 * wakeup()), the incoming queue is reopened and false is returned.
 */
bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout)
{
    try {
        return incoming->pop(content, timeout);
    } catch (const Wakeup&) {
        incoming->open();
        return false;
    }
}

/**
 * Get a frameset that is accepted by the specified handler from
 * session queue, waiting for up to the specified duration. Messages
 * that are not accepted by the handler are pushed onto received queue
 * for later retrieval.
 *
 * @param handler may be null, in which case every message transfer is
 * pushed onto the received queue.
 * @return OK if the handler accepted a transfer, CLOSED if a
 * qpid::ClosedException was caught, EMPTY if pop() returned false
 * before any transfer was accepted.
 */
IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
    AbsTime deadline(AbsTime::now(), duration);
    FrameSet::shared_ptr content;
    try {
        //after the first pop, only wait for whatever time remains until the deadline
        for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
            if (content->isA<MessageTransferBody>()) {
                MessageTransfer transfer(content, *this);
                if (handler && transfer.checkExpired() && handler->expire(transfer)) {
                    QPID_LOG(debug, "Expired received transfer: " << *content->getMethod());
                } else if (handler && handler->accept(transfer)) {
                    QPID_LOG(debug, "Delivered " << *content->getMethod() << " " << *content->getHeaders());
                    return OK;
                } else {
                    //received message for another destination, keep for later
                    QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
                    sys::Mutex::ScopedLock l(lock);
                    received.push_back(content);
                    lock.notifyAll();
                }
            } else {
                //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
            }
        }
    }
    catch (const qpid::ClosedException&) {
        return CLOSED;
    }
    return EMPTY;
}

/**
 * Waits up to 'duration' for a message transfer to arrive on the
 * incoming queue; the first one that arrives is pushed onto the
 * received queue and waiters on the lock are notified.
 *
 * @return true if a transfer was pushed, false if pop() returned
 * false first.
 */
bool IncomingMessages::wait(qpid::sys::Duration duration)
{
    AbsTime deadline(AbsTime::now(), duration);
    FrameSet::shared_ptr content;
    for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
        if (content->isA<MessageTransferBody>()) {
            QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
            sys::Mutex::ScopedLock l(lock);
            received.push_back(content);
            lock.notifyAll();
            return true;
        } else {
            //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
        }
    }
    return false;
}

/** Returns the accept tracker's acceptsPending() count, read under the lock. */
uint32_t IncomingMessages::pendingAccept()
{
    sys::Mutex::ScopedLock l(lock);
    return acceptTracker.acceptsPending();
}

/** Returns the accept tracker's acceptsPending() count for 'destination', read under the lock. */
uint32_t IncomingMessages::pendingAccept(const std::string& destination)
{
    sys::Mutex::ScopedLock l(lock);
    return acceptTracker.acceptsPending(destination);
}

/**
 * Returns the number of transfers in the received list after first
 * draining whatever is immediately available on the incoming queue.
 */
uint32_t IncomingMessages::available()
{
    //first pump all available messages from incoming to received...
    while (process(0, 0) == OK) {}
    //return the count of received messages
    sys::Mutex::ScopedLock l(lock);
    return received.size();
}

/**
 * Returns the number of transfers in the received list whose
 * destination equals 'destination', after first draining whatever is
 * immediately available on the incoming queue.
 */
uint32_t IncomingMessages::available(const std::string& destination)
{
    //first pump all available messages from incoming to received...
    while (process(0, 0) == OK) {}

    //count all messages for this destination from received list
    sys::Mutex::ScopedLock l(lock);
    //for_each returns the functor it was given, which carries the final count
    return std::for_each(received.begin(), received.end(), Match(destination)).matched;
}

void populate(qpid::messaging::Message& message, FrameSet& command);

/**
 * Called when message is retrieved; records retrieval for subsequent
 * acceptance, marks the command as completed and converts command to
 * message if message is required
 *
 * @param command the transfer being retrieved.
 * @param message if non-null, populated from 'command'.
 */
void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* message)
{
    if (message) {
        populate(*message, *command);
    }
    const MessageTransferBody* transfer = command->as<MessageTransferBody>();
    //only transfers in explicit accept mode are recorded with the accept tracker
    if (transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
        sys::Mutex::ScopedLock l(lock);
        acceptTracker.delivered(transfer->getDestination(), command->getId());
    }
    session.markCompleted(command->getId(), false, false);
}

IncomingMessages::MessageTransfer::MessageTransfer(FrameSetPtr c, IncomingMessages& p) : content(c), parent(p) {}

/** Returns the destination named in the underlying message-transfer command. */
const std::string& IncomingMessages::MessageTransfer::getDestination()
{
    return content->as<MessageTransferBody>()->getDestination();
}

/** Retrieves this transfer through the owning IncomingMessages; 'message' may be null. */
void IncomingMessages::MessageTransfer::retrieve(qpid::messaging::Message* message)
{
    parent.retrieve(content, message);
}

/**
 * If the underlying content has expired, retrieves it (without
 * building a message), accepts it non-cumulatively and returns true;
 * otherwise returns false and does nothing.
 */
bool IncomingMessages::MessageTransfer::checkExpired()
{
    if (content->hasExpired()) {
        retrieve(0);
        parent.accept(content->getId(), false);
        return true;
    } else {
        return false;
    }
}

namespace {
//TODO: unify conversion to and from 0-10 message that is currently
//split between IncomingMessages and OutgoingMessage
const std::string SUBJECT("qpid.subject");
const std::string X_APP_ID("x-amqp-0-10.app-id");
const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
const std::string X_TIMESTAMP("x-amqp-0-10.timestamp");
}

/**
 * Copies header information from the 0-10 delivery and message
 * properties onto 'message'. Either properties pointer may be null,
 * in which case the corresponding fields are left untouched.
 *
 * Note that the routing-key and timestamp properties, although read
 * from the delivery properties, are only exposed when message
 * properties are also present.
 */
void populateHeaders(qpid::messaging::Message& message,
                     const DeliveryProperties* deliveryProperties,
                     const MessageProperties* messageProperties)
{
    if (deliveryProperties) {
        message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl()));
        message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT);
        message.setPriority(deliveryProperties->getPriority());
        message.setRedelivered(deliveryProperties->getRedelivered());
    }
    if (messageProperties) {
        message.setContentType(messageProperties->getContentType());
        if (messageProperties->hasReplyTo()) {
            message.setReplyTo(AddressResolution::convert(messageProperties->getReplyTo()));
        }
        message.setSubject(messageProperties->getApplicationHeaders().getAsString(SUBJECT));
        //replace any existing properties with the application headers
        message.getProperties().clear();
        translate(messageProperties->getApplicationHeaders(), message.getProperties());
        message.setCorrelationId(messageProperties->getCorrelationId());
        message.setUserId(messageProperties->getUserId());
        if (messageProperties->hasMessageId()) {
            message.setMessageId(messageProperties->getMessageId().str());
        }
        //expose 0-10 specific items through special properties:
        //    app-id, content-encoding
        if (messageProperties->hasAppId()) {
            message.getProperties()[X_APP_ID] = messageProperties->getAppId();
        }
        if (messageProperties->hasContentEncoding()) {
            message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
        }
        //    routing-key, timestamp, others?
        if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
            message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
        }
        if (deliveryProperties && deliveryProperties->hasTimestamp()) {
            message.getProperties()[X_TIMESTAMP] = deliveryProperties->getTimestamp();
        }
    }
}

/** Extracts the delivery and message properties from 'headers' and applies them to 'message'. */
void populateHeaders(qpid::messaging::Message& message, const AMQHeaderBody* headers)
{
    populateHeaders(message, headers->get<DeliveryProperties>(), headers->get<MessageProperties>());
}

/**
 * Fills 'message' from the given command: records the command id as
 * the message's internal id, copies the content and populates the
 * headers.
 */
void populate(qpid::messaging::Message& message, FrameSet& command)
{
    //need to be able to link the message back to the transfer it was delivered by
    //e.g. for rejecting.
    MessageImplAccess::get(message).setInternalId(command.getId());

    message.setContent(command.getContent());

    populateHeaders(message, command.getHeaders());
}

}}} // namespace qpid::client::amqp0_10