diff options
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/SessionContext.cpp')
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SessionContext.cpp | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/cpp/src/qpid/messaging/amqp/SessionContext.cpp new file mode 100644 index 0000000000..9bdc658bc7 --- /dev/null +++ b/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -0,0 +1,156 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionContext.h" +#include "SenderContext.h" +#include "ReceiverContext.h" +#include <boost/format.hpp> +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/exceptions.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { + +SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} +SessionContext::~SessionContext() +{ + senders.clear(); receivers.clear(); + pn_session_free(session); +} + +boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address) +{ + std::string name = address.getName(); + + int count = 1; + for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { + name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); + } + boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address)); + senders[name] = s; + return s; +} + +boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) +{ + std::string name = address.getName(); + + int count = 1; + for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { + name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); + } + boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); + receivers[name] = r; + return r; +} + +boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const +{ + SenderMap::const_iterator i = senders.find(name); + if (i == senders.end()) { + throw qpid::messaging::KeyError(std::string("No such sender") + name); + } else { + return i->second; + } +} + +boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const +{ + ReceiverMap::const_iterator i = receivers.find(name); + if (i == receivers.end()) { + throw qpid::messaging::KeyError(std::string("No such receiver") + name); + } else { + return i->second; + } +} + +void SessionContext::closeReceiver(const std::string&) +{ + +} + +void SessionContext::closeSender(const std::string&) +{ + +} + +boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) +{ + return boost::shared_ptr<ReceiverContext>(); +} + +uint32_t SessionContext::getReceivable() +{ + return 0;//TODO +} + +uint32_t SessionContext::getUnsettledAcks() +{ + return 0;//TODO +} + +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) +{ + qpid::framing::SequenceNumber id = next++; + unacked[id] = delivery; + QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); + return id; +} + +void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) +{ + for (DeliveryMap::iterator i = begin; i != end; ++i) { + QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_delivery_update(i->second, PN_ACCEPTED); + pn_delivery_settle(i->second);//TODO: different settlement modes? + } + unacked.erase(begin, end); +} + +void SessionContext::acknowledge() +{ + QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); + acknowledge(unacked.begin(), unacked.end()); +} + +void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) +{ + DeliveryMap::iterator i = unacked.find(id); + if (i != unacked.end()) { + acknowledge(cumulative ? unacked.begin() : i, ++i); + } +} + +bool SessionContext::settled() +{ + bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + if (!i->second->settled()) result = false; + } + return result; +} + +}}} // namespace qpid::messaging::amqp |