diff options
Diffstat (limited to 'qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp new file mode 100644 index 0000000000..a28509b0b1 --- /dev/null +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/amqp/ReceiverContext.h" +#include "qpid/messaging/AddressImpl.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/log/Statement.h" +extern "C" { +#include <proton/engine.h> +} + +namespace qpid { +namespace messaging { +namespace amqp { +//TODO: proper conversion to wide string for address +ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) + : name(n), + address(a), + helper(address), + receiver(pn_receiver(session, name.c_str())), + capacity(0), used(0) {} + +ReceiverContext::~ReceiverContext() +{ + if (receiver) pn_link_free(receiver); +} + +void ReceiverContext::setCapacity(uint32_t c) +{ + if (c != capacity) { + //stop + capacity = c; + //reissue credit + } +} + +uint32_t ReceiverContext::getCapacity() +{ + return capacity; +} + +uint32_t ReceiverContext::getAvailable() +{ + return pn_link_queued(receiver); +} + +uint32_t ReceiverContext::getUnsettled() +{ + assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver)); + return pn_link_unsettled(receiver) - pn_link_queued(receiver); +} + +void ReceiverContext::close() +{ + if (receiver) pn_link_close(receiver); +} + +const std::string& ReceiverContext::getName() const +{ + return name; +} + +const std::string& ReceiverContext::getSource() const +{ + return address.getName(); +} +void ReceiverContext::verify() +{ + pn_terminus_t* source = pn_link_remote_source(receiver); + if (!pn_terminus_get_address(source)) { + std::string msg("No such source : "); + msg += getSource(); + QPID_LOG(debug, msg); + throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(address)) { + address.setName(pn_terminus_get_address(source)); + QPID_LOG(debug, "Dynamic source name set to " << address.getName()); + } + helper.checkAssertion(source, AddressHelper::FOR_RECEIVER); +} +void ReceiverContext::configure() +{ + if (receiver) configure(pn_link_source(receiver)); +} +void ReceiverContext::configure(pn_terminus_t* source) +{ + helper.configure(receiver, source, AddressHelper::FOR_RECEIVER); + std::string option; + if (helper.getLinkTarget(option)) { + pn_terminus_set_address(pn_link_target(receiver), option.c_str()); + } else { + pn_terminus_set_address(pn_link_target(receiver), pn_terminus_get_address(pn_link_source(receiver))); + } +} + +Address ReceiverContext::getAddress() const +{ + return address; +} + +void ReceiverContext::reset(pn_session_t* session) +{ + receiver = session ? pn_receiver(session, name.c_str()) : 0; + if (receiver) configure(); +} + +bool ReceiverContext::hasCurrent() +{ + return receiver && pn_link_current(receiver); +} + +bool ReceiverContext::wakeupToIssueCredit() +{ + if (++used >= (capacity/2)) { + used = 0; + return true; + } else { + return false; + } +} + +}}} // namespace qpid::messaging::amqp |