/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Core.h" #include "MessageHandler.h" #include "BrokerContext.h" #include "EventHandler.h" #include "qpid/broker/Message.h" #include "qpid/broker/Broker.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Queue.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/Buffer.h" #include "qpid/sys/Thread.h" #include "qpid/log/Statement.h" #include namespace qpid { namespace cluster { using namespace broker; MessageHandler::MessageHandler(EventHandler& e) : HandlerBase(e), broker(e.getCore().getBroker()) {} bool MessageHandler::invoke(const framing::AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } void MessageHandler::routing(RoutingId routingId, const std::string& message) { if (sender() == self()) return; // Already in getCore().getRoutingMap() boost::intrusive_ptr msg = new Message; // FIXME aconway 2010-10-28: decode message in bounded-size buffers. framing::Buffer buf(const_cast(&message[0]), message.size()); msg->decodeHeader(buf); msg->decodeContent(buf); memberMap[sender()].routingMap[routingId] = msg; } boost::shared_ptr MessageHandler::findQueue( const std::string& q, const char* msg) { boost::shared_ptr queue = broker.getQueues().find(q); if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q)); return queue; } void MessageHandler::enqueue(RoutingId routingId, const std::string& q) { boost::shared_ptr queue = findQueue(q, "Cluster enqueue failed"); boost::intrusive_ptr msg; if (sender() == self()) msg = eventHandler.getCore().getRoutingMap().get(routingId); else msg = memberMap[sender()].routingMap[routingId]; if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q << " failed: unknown message")); BrokerContext::ScopedSuppressReplication ssr; queue->deliver(msg); } void MessageHandler::routed(RoutingId routingId) { if (sender() == self()) eventHandler.getCore().getRoutingMap().erase(routingId); else memberMap[sender()].routingMap.erase(routingId); } void MessageHandler::acquire(const std::string& q, uint32_t position) { // Note acquires from other members. My own acquires were executed in // the connection thread if (sender() != self()) { // FIXME aconway 2010-10-28: need to store acquired messages on QueueContext // by broker for possible re-queuing if a broker leaves. boost::shared_ptr queue = findQueue(q, "Cluster dequeue failed"); QueuedMessage qm; BrokerContext::ScopedSuppressReplication ssr; bool ok = queue->acquireMessageAt(position, qm); (void)ok; // Avoid unused variable warnings. assert(ok); // FIXME aconway 2011-08-04: failing this assertion. assert(qm.position.getValue() == position); assert(qm.payload); } } void MessageHandler::dequeue(const std::string& q, uint32_t /*position*/) { if (sender() == self()) { // FIXME aconway 2010-10-28: we should complete the ack that initiated // the dequeue at this point, see BrokerContext::dequeue return; } boost::shared_ptr queue = findQueue(q, "Cluster dequeue failed"); BrokerContext::ScopedSuppressReplication ssr; // FIXME aconway 2011-05-12: Remove the acquired message from QueueContext. // Do we need to call this? Review with gsim. // QueuedMessage qm; // Get qm from QueueContext? // queue->dequeue(0, qm); } void MessageHandler::release(const std::string& /*queue*/ , uint32_t /*position*/) { // FIXME aconway 2011-05-24: } }} // namespace qpid::cluster