summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-04-13 20:58:27 +0000
committerAlan Conway <aconway@apache.org>2007-04-13 20:58:27 +0000
commitca3a7cd64822e874076bd23e9981af077eb47b03 (patch)
tree677b7d1a4940d10bbb7874a5138c9c2dd45429a7 /cpp/src/qpid/broker/RecoveryManagerImpl.cpp
parentee865f87027fb559d8884cca3f672a8cbdd44ae0 (diff)
downloadqpid-python-ca3a7cd64822e874076bd23e9981af077eb47b03.tar.gz
Moved src/ source code to src/qpid directory:
- allows rhm package to build consistently against checked-out or installed qpid. - consistent correspondence between source paths and C++ namespaces. - consistent use of #include <qpid/foo> in source and by users. - allows header files to split over multiple directories, e.g. separating generated code, separating public API from private files. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@528668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/RecoveryManagerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp143
1 files changed, 143 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
new file mode 100644
index 0000000000..f82399f95c
--- /dev/null
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RecoveryManagerImpl.h"
+
+#include "BrokerMessage.h"
+#include "BrokerMessageMessage.h"
+#include "BrokerQueue.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using boost::dynamic_pointer_cast;
+
+
+static const uint8_t BASIC = 1;
+static const uint8_t MESSAGE = 2;
+
+RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold)
+ : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {}
+
+RecoveryManagerImpl::~RecoveryManagerImpl() {}
+
+class RecoverableMessageImpl : public RecoverableMessage
+{
+ Message::shared_ptr msg;
+ const uint64_t stagingThreshold;
+public:
+ RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold)
+ : msg(_msg), stagingThreshold(_stagingThreshold) {}
+ ~RecoverableMessageImpl() {};
+ void setPersistenceId(uint64_t id);
+ bool loadContent(uint64_t available);
+ void decodeContent(framing::Buffer& buffer);
+ void recover(Queue::shared_ptr queue);
+};
+
+class RecoverableQueueImpl : public RecoverableQueue
+{
+ Queue::shared_ptr queue;
+public:
+ RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {}
+ ~RecoverableQueueImpl() {};
+ void setPersistenceId(uint64_t id);
+ void recover(RecoverableMessage::shared_ptr msg);
+};
+
+void RecoveryManagerImpl::recoverExchange(framing::Buffer&)
+{
+ //TODO
+}
+
+RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer)
+{
+ Queue::shared_ptr queue = Queue::decode(queues, buffer);
+ try {
+ Exchange::shared_ptr exchange = exchanges.getDefault();
+ if (exchange) {
+ exchange->bind(queue, queue->getName(), 0);
+ }
+ } catch (ChannelException& e) {
+ //assume no default exchange has been declared
+ }
+ return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue));
+}
+
+RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
+{
+ buffer.record();
+ //peek at type:
+ Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ?
+ ((Message*) new MessageMessage()) :
+ ((Message*) new BasicMessage()));
+ buffer.restore();
+ message->decodeHeader(buffer);
+ return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));
+}
+
+void RecoveryManagerImpl::recoveryComplete()
+{
+ //TODO (finalise binding setup etc)
+}
+
+uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer)
+{
+ return buffer.getOctet();
+}
+
+void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer)
+{
+ buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC);
+}
+
+uint32_t RecoveryManagerImpl::encodedMessageTypeSize()
+{
+ return 1;
+}
+
+bool RecoverableMessageImpl::loadContent(uint64_t available)
+{
+ return !stagingThreshold || available < stagingThreshold;
+}
+
+void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer)
+{
+ msg->decodeContent(buffer);
+}
+
+void RecoverableMessageImpl::recover(Queue::shared_ptr queue)
+{
+ queue->recover(msg);
+}
+
+void RecoverableMessageImpl::setPersistenceId(uint64_t id)
+{
+ msg->setPersistenceId(id);
+}
+
+void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
+{
+ dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
+}
+
+void RecoverableQueueImpl::setPersistenceId(uint64_t id)
+{
+ queue->setPersistenceId(id);
+}