summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/MessageHandlerImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/MessageHandlerImpl.cpp')
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp208
1 files changed, 0 insertions, 208 deletions
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
deleted file mode 100644
index 5e0e759dfb..0000000000
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/Exception.h"
-#include "qpid/log/Statement.h"
-#include "MessageHandlerImpl.h"
-#include "qpid/framing/FramingContent.h"
-#include "Connection.h"
-#include "Broker.h"
-#include "MessageDelivery.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "BrokerAdapter.h"
-
-#include <boost/format.hpp>
-#include <boost/cast.hpp>
-#include <boost/bind.hpp>
-
-namespace qpid {
-namespace broker {
-
-using namespace framing;
-
-MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
- HandlerImpl(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
- {}
-
-//
-// Message class method handlers
-//
-
-void
-MessageHandlerImpl::cancel(const string& destination )
-{
- state.cancel(destination);
-}
-
-void
-MessageHandlerImpl::open(const string& /*reference*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::close(const string& /*reference*/)
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::checkpoint(const string& /*reference*/,
- const string& /*identifier*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::resume(const string& /*reference*/,
- const string& /*identifier*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::offset(uint64_t /*value*/ )
-{
- throw NotImplementedException("References no longer supported");
-}
-
-void
-MessageHandlerImpl::get(uint16_t /*ticket*/,
- const string& /*queueName*/,
- const string& /*destination*/,
- bool /*noAck*/ )
-{
- throw NotImplementedException("get no longer supported");
-}
-
-void
-MessageHandlerImpl::empty()
-{
- throw NotImplementedException("empty no longer supported");
-}
-
-void
-MessageHandlerImpl::ok()
-{
- throw NotImplementedException("Message.Ok no longer supported");
-}
-
-void
-MessageHandlerImpl::qos(uint32_t prefetchSize,
- uint16_t prefetchCount,
- bool /*global*/ )
-{
- //TODO: handle global
- state.setPrefetchSize(prefetchSize);
- state.setPrefetchCount(prefetchCount);
-}
-
-void
-MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
- const string& queueName,
- const string& destination,
- bool noLocal,
- u_int8_t confirmMode,
- u_int8_t acquireMode,
- bool exclusive,
- const framing::FieldTable& filter )
-{
- Queue::shared_ptr queue = state.getQueue(queueName);
- if(!destination.empty() && state.exists(destination))
- throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
-
- string tag = destination;
- state.consume(MessageDelivery::getPreviewMessageDeliveryToken(destination, confirmMode, acquireMode),
- tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
-}
-
-void
-MessageHandlerImpl::recover(bool requeue)
-{
- state.recover(requeue);
-}
-
-void
-MessageHandlerImpl::reject(const SequenceNumberSet& transfers, uint16_t /*code*/, const string& /*text*/ )
-{
- transfers.processRanges(rejectOp);
-}
-
-void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
-{
- if (unit == 0) {
- //message
- state.addMessageCredit(destination, value);
- } else if (unit == 1) {
- //bytes
- state.addByteCredit(destination, value);
- } else {
- //unknown
- throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
- }
-
-}
-
-void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
-{
- if (mode == 0) {
- //credit
- state.setCreditMode(destination);
- } else if (mode == 1) {
- //window
- state.setWindowMode(destination);
- } else{
- throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
- }
-}
-
-void MessageHandlerImpl::flush(const std::string& destination)
-{
- state.flush(destination);
-}
-
-void MessageHandlerImpl::stop(const std::string& destination)
-{
- state.stop(destination);
-}
-
-void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
-{
- //TODO: implement mode
-
- SequenceNumberSet results;
- RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
- transfers.processRanges(op);
- results = results.condense();
- getProxy().getMessage().acquired(results);
-}
-
-void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
-{
- transfers.processRanges(releaseOp);
-}
-
-}} // namespace qpid::broker