summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-04 20:42:19 +0000
committerGordon Sim <gsim@apache.org>2008-03-04 20:42:19 +0000
commitd47950ff7a88e4684d1e07e334e705776ed569a7 (patch)
tree23f00efd6c9009f158c0b9643746dd16fd8adbf6 /cpp/src
parentcf9a1617599bb680deef2bab76fc6022f7dad50b (diff)
downloadqpid-python-d47950ff7a88e4684d1e07e334e705776ed569a7.tar.gz
Further updates to support final 0-10 spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@633627 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h4
-rw-r--r--cpp/src/qpid/broker/Connection.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp3
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp8
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h8
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp18
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp382
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h183
-rw-r--r--cpp/src/qpid/broker/SessionState.h4
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp2
-rw-r--r--cpp/src/qpid/framing/FrameSet.h2
-rw-r--r--cpp/src/qpid/framing/ModelMethod.h6
14 files changed, 613 insertions, 12 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index f15f8d7a91..8ec15619c1 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -164,6 +164,7 @@ libqpidbroker_la_SOURCES = \
$(mgen_broker_cpp) \
qpid/broker/Broker.cpp \
qpid/broker/BrokerAdapter.cpp \
+ qpid/broker/SessionAdapter.cpp \
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
@@ -273,6 +274,7 @@ nobase_include_HEADERS = \
qpid/shared_ptr.h \
qpid/broker/Broker.h \
qpid/broker/BrokerAdapter.h \
+ qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
qpid/broker/BrokerSingleton.h \
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 5237087dc8..6e92f89706 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -80,6 +80,10 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
TunnelHandler* getTunnelHandler() {
throw framing::NotImplementedException("Tunnel class not implemented"); }
+ Exchange010Handler* getExchange010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Queue010Handler* getQueue010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Message010Handler* getMessage010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+
// Handlers no longer implemented in BrokerAdapter:
#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 822890ae76..8be4f7756e 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -117,7 +117,7 @@ void Connection::received(framing::AMQFrame& frame){
if (mgmtClosing)
close (403, "Closed by Management Request", 0, 0);
- if (frame.getChannel() == 0) {
+ if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 1730c01099..a2c78daa7c 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -35,6 +35,7 @@ using namespace qpid::framing;
using std::string;
TransferAdapter Message::TRANSFER;
+PreviewAdapter Message::TRANSFER_99_0;
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), publisher(0), adapter(0) {}
@@ -219,6 +220,8 @@ MessageAdapter& Message::getAdapter() const
{
if (!adapter) {
if(frames.isA<MessageTransferBody>()) {
+ adapter = &TRANSFER_99_0;
+ } else if(frames.isA<Message010TransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 8a30c5770b..801834d519 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -133,6 +133,7 @@ public:
mutable MessageAdapter* adapter;
static TransferAdapter TRANSFER;
+ static PreviewAdapter TRANSFER_99_0;
MessageAdapter& getAdapter() const;
};
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index 2c29aa5444..a70de72646 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -36,12 +36,12 @@ namespace broker{
std::string TransferAdapter::getExchange(const framing::FrameSet& f)
{
- return f.as<framing::MessageTransferBody>()->getDestination();
+ return f.as<framing::Message010TransferBody>()->getDestination();
}
bool TransferAdapter::isImmediate(const framing::FrameSet&)
{
- //TODO: we seem to have lost the immediate flag
+ //TODO: delete this, immediate is no longer part of the spec
return false;
}
@@ -57,4 +57,8 @@ namespace broker{
return p && p->getDeliveryMode() == 2;
}
+ std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
+ {
+ return f.as<framing::MessageTransferBody>()->getDestination();
+ }
}}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 3220f304cc..de488b295e 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -29,6 +29,7 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/Message010TransferBody.h"
namespace qpid {
namespace broker {
@@ -48,12 +49,17 @@ struct MessageAdapter
struct TransferAdapter : MessageAdapter
{
std::string getRoutingKey(const framing::FrameSet& f);
- std::string getExchange(const framing::FrameSet& f);
+ virtual std::string getExchange(const framing::FrameSet& f);
bool isImmediate(const framing::FrameSet&);
const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
bool isPersistent(const framing::FrameSet& f);
};
+struct PreviewAdapter : TransferAdapter
+{
+ std::string getExchange(const framing::FrameSet& f);
+};
+
}}
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 376a321d2d..269fc2d423 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -28,6 +28,10 @@
using namespace qpid::broker;
using namespace qpid::framing;
+namespace
+{
+ std::string type_str(uint8_t type);
+}
MessageBuilder::MessageBuilder(MessageStore* const _store, uint64_t _stagingThreshold) :
state(DORMANT), store(_store), stagingThreshold(_stagingThreshold), staging(false) {}
@@ -39,7 +43,19 @@ void MessageBuilder::handle(AMQFrame& frame)
state = HEADER;
break;
case HEADER:
- checkType(HEADER_BODY, frame.getBody()->type());
+ switch (frame.getBody()->type()) {
+ case CONTENT_BODY:
+ //TODO: rethink how to handle non-existent headers...
+ //didn't get a header: add in a dummy
+ message->getFrames().append(AMQFrame(AMQHeaderBody()));
+ break;
+ case HEADER_BODY:
+ break;
+ default:
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame sequence for message, expected header or content got "
+ << type_str(frame.getBody()->type()) << ")"));
+ }
state = CONTENT;
break;
case CONTENT:
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
new file mode 100644
index 0000000000..15c29ed482
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -0,0 +1,382 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "SessionAdapter.h"
+#include "Connection.h"
+#include "DeliveryToken.h"
+#include "MessageDelivery.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
+#include <boost/format.hpp>
+#include <boost/cast.hpp>
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+using namespace qpid;
+using namespace qpid::framing;
+
+typedef std::vector<Queue::shared_ptr> QueueVector;
+
+SessionAdapter::SessionAdapter(SemanticState& s) :
+ HandlerImpl(s),
+ exchangeImpl(s),
+ queueImpl(s),
+ messageImpl(s)
+{}
+
+
+void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const string& type,
+ const string& alternateExchange,
+ bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
+
+ //TODO: implement autoDelete
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = getBroker().getExchanges().get(alternateExchange);
+ }
+ if(passive){
+ Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
+ checkType(actual, type);
+ checkAlternate(actual, alternate);
+ }else{
+ try{
+ std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
+ if (response.second) {
+ if (durable) {
+ getBroker().getStore().create(*response.first);
+ }
+ if (alternate) {
+ response.first->setAlternate(alternate);
+ alternate->incAlternateUsers();
+ }
+ } else {
+ checkType(response.first, type);
+ checkAlternate(response.first, alternate);
+ }
+ }catch(UnknownExchangeTypeException& e){
+ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
+ }
+ }
+}
+
+void SessionAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
+{
+ if (!type.empty() && exchange->getType() != type) {
+ throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
+ }
+}
+
+void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
+{
+ if (alternate && alternate != exchange->getAlternate())
+ throw NotAllowedException(
+ QPID_MSG("Exchange declared with alternate-exchange "
+ << exchange->getAlternate()->getName() << ", requested "
+ << alternate->getName()));
+}
+
+void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/){
+ //TODO: implement unused
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
+ if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
+ if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
+ if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
+ getBroker().getExchanges().destroy(name);
+}
+
+Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
+{
+ try {
+ Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
+ return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ } catch (const ChannelException& e) {
+ return Exchange010QueryResult("", false, true, FieldTable());
+ }
+}
+void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
+ const string& exchangeName, const string& routingKey,
+ const FieldTable& arguments){
+
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+ if(exchange){
+ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
+ if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
+ queue->bound(exchangeName, routingKey, arguments);
+ if (exchange->isDurable() && queue->isDurable()) {
+ getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
+ }
+ }
+ }else{
+ throw NotFoundException(
+ "Bind failed. No such exchange: " + exchangeName);
+ }
+}
+
+void
+SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
+ const string& exchangeName,
+ const string& routingKey)
+{
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
+
+ Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
+ if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
+
+ //TODO: revise unbind to rely solely on binding key (not args)
+ if (exchange->unbind(queue, routingKey, 0) && exchange->isDurable() && queue->isDurable()) {
+ getBroker().getStore().unbind(*exchange, *queue, routingKey, FieldTable());
+ }
+
+}
+
+Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
+ const std::string& queueName,
+ const std::string& key,
+ const framing::FieldTable& args)
+{
+ Exchange::shared_ptr exchange;
+ try {
+ exchange = getBroker().getExchanges().get(exchangeName);
+ } catch (const ChannelException&) {}
+
+ Queue::shared_ptr queue;
+ if (!queueName.empty()) {
+ queue = getBroker().getQueues().find(queueName);
+ }
+
+ if (!exchange) {
+ return Exchange010BoundResult(true, false, false, false, false);
+ } else if (!queueName.empty() && !queue) {
+ return Exchange010BoundResult(false, true, false, false, false);
+ } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
+ return Exchange010BoundResult(false, false, false, false, false);
+ } else {
+ //need to test each specified option individually
+ bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
+ bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
+ bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
+
+ return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+ }
+}
+
+Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
+{
+ Queue::shared_ptr queue = state.getQueue(name);
+ Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
+
+ return Queue010QueryResult(queue->getName(),
+ alternateExchange ? alternateExchange->getName() : "",
+ queue->isDurable(),
+ queue->hasExclusiveOwner(),
+ queue->isAutoDelete(),
+ queue->getSettings(),
+ queue->getMessageCount(),
+ queue->getConsumerCount());
+}
+
+void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& alternateExchange,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, const qpid::framing::FieldTable& arguments){
+
+ Exchange::shared_ptr alternate;
+ if (!alternateExchange.empty()) {
+ alternate = getBroker().getExchanges().get(alternateExchange);
+ }
+ Queue::shared_ptr queue;
+ if (passive && !name.empty()) {
+ queue = state.getQueue(name);
+ //TODO: check alternate-exchange is as expected
+ } else {
+ std::pair<Queue::shared_ptr, bool> queue_created =
+ getBroker().getQueues().declare(
+ name, durable,
+ autoDelete,
+ exclusive ? &getConnection() : 0);
+ queue = queue_created.first;
+ assert(queue);
+ if (queue_created.second) { // This is a new queue
+ if (alternate) {
+ queue->setAlternateExchange(alternate);
+ alternate->incAlternateUsers();
+ }
+
+ //apply settings & create persistent record if required
+ queue_created.first->create(arguments);
+
+ //add default binding:
+ getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+ queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
+
+ //handle automatic cleanup:
+ if (exclusive) {
+ getConnection().exclusiveQueues.push_back(queue);
+ }
+ } else {
+ if (exclusive && queue->setExclusiveOwner(&getConnection())) {
+ getConnection().exclusiveQueues.push_back(queue);
+ }
+ }
+ }
+ if (exclusive && !queue->isExclusiveOwner(&getConnection()))
+ throw ResourceLockedException(
+ QPID_MSG("Cannot grant exclusive access to queue "
+ << queue->getName()));
+}
+
+
+void SessionAdapter::QueueHandlerImpl::purge(const string& queue){
+ state.getQueue(queue)->purge();
+}
+
+void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty){
+ ChannelException error(0, "");
+ Queue::shared_ptr q = state.getQueue(queue);
+ if(ifEmpty && q->getMessageCount() > 0){
+ throw PreconditionFailedException("Queue not empty.");
+ }else if(ifUnused && q->getConsumerCount() > 0){
+ throw PreconditionFailedException("Queue in use.");
+ }else{
+ //remove the queue from the list of exclusive queues if necessary
+ if(q->isExclusiveOwner(&getConnection())){
+ QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
+ if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
+ }
+ q->destroy();
+ getBroker().getQueues().destroy(queue);
+ q->unbind(getBroker().getExchanges(), q);
+ }
+}
+
+
+SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
+ HandlerImpl(s),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
+ acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+ {}
+
+//
+// Message class method handlers
+//
+
+void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
+ uint8_t /*acceptMode*/,
+ uint8_t /*acquireMode*/)
+{
+ //not yet used (content containing assemblies treated differently at present
+}
+
+ void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool /*setRedelivered*/)
+{
+ transfers.for_each(releaseOp);
+}
+
+void
+SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
+ const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ bool exclusive,
+ const string& /*resumeId*/,//TODO implement resume behaviour
+ uint64_t /*resumeTtl*/,
+ const FieldTable& arguments)
+{
+ Queue::shared_ptr queue = state.getQueue(queueName);
+ if(!destination.empty() && state.exists(destination))
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
+
+ string tag = destination;
+ state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode),
+ tag, queue, false, //TODO get rid of no-local
+ acceptMode == 1, acquireMode == 0, exclusive, &arguments);
+}
+
+void
+SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
+{
+ state.cancel(destination);
+}
+
+void
+SessionAdapter::MessageHandlerImpl::reject(const SequenceSet& transfers, uint16_t /*code*/, const string& /*text*/ )
+{
+ transfers.for_each(rejectOp);
+}
+
+void SessionAdapter::MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value)
+{
+ if (unit == 0) {
+ //message
+ state.addMessageCredit(destination, value);
+ } else if (unit == 1) {
+ //bytes
+ state.addByteCredit(destination, value);
+ } else {
+ //unknown
+ throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
+ }
+
+}
+
+void SessionAdapter::MessageHandlerImpl::setFlowMode(const std::string& destination, u_int8_t mode)
+{
+ if (mode == 0) {
+ //credit
+ state.setCreditMode(destination);
+ } else if (mode == 1) {
+ //window
+ state.setWindowMode(destination);
+ } else{
+ throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
+ }
+}
+
+void SessionAdapter::MessageHandlerImpl::flush(const std::string& destination)
+{
+ state.flush(destination);
+}
+
+void SessionAdapter::MessageHandlerImpl::stop(const std::string& destination)
+{
+ state.stop(destination);
+}
+
+void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
+{
+
+ commands.for_each(acceptOp);
+}
+
+/*
+void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
+{
+ SequenceNumberSet results;
+ RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
+ transfers.processRanges(op);
+ results = results.condense();
+ getProxy().getMessage().acquired(results);
+}
+*/
+
+}} // namespace qpid::broker
+
+
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
new file mode 100644
index 0000000000..0dd3529359
--- /dev/null
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -0,0 +1,183 @@
+#ifndef _broker_SessionAdapter_h
+#define _broker_SessionAdapter_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "HandlerImpl.h"
+
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/SequenceSet.h"
+
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Channel;
+class Connection;
+class Broker;
+
+/**
+ * Per-channel protocol adapter.
+ *
+ * A container for a collection of AMQP-class adapters that translate
+ * AMQP method bodies into calls on the core Broker objects. Each
+ * adapter class also provides a client proxy to send methods to the
+ * peer.
+ *
+ */
+class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
+{
+ public:
+ SessionAdapter(SemanticState& session);
+
+
+ framing::ProtocolVersion getVersion() const { return session.getConnection().getVersion();}
+
+ Message010Handler* getMessage010Handler(){ return &messageImpl; }
+ Exchange010Handler* getExchange010Handler(){ return &exchangeImpl; }
+ Queue010Handler* getQueue010Handler(){ return &queueImpl; }
+
+
+ BasicHandler* getBasicHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ ExchangeHandler* getExchangeHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ BindingHandler* getBindingHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ QueueHandler* getQueueHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ TxHandler* getTxHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ MessageHandler* getMessageHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ DtxCoordinationHandler* getDtxCoordinationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ DtxDemarcationHandler* getDtxDemarcationHandler(){ throw framing::NotImplementedException("Class not implemented"); }
+ AccessHandler* getAccessHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ FileHandler* getFileHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ StreamHandler* getStreamHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ TunnelHandler* getTunnelHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ ExecutionHandler* getExecutionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ ConnectionHandler* getConnectionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ SessionHandler* getSessionHandler() { throw framing::NotImplementedException("Class not implemented"); }
+ Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+
+ private:
+ class ExchangeHandlerImpl :
+ public Exchange010Handler,
+ public HandlerImpl
+ {
+ public:
+ ExchangeHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+
+ void declare(const std::string& exchange, const std::string& type,
+ const std::string& alternateExchange,
+ bool passive, bool durable, bool autoDelete,
+ const qpid::framing::FieldTable& arguments);
+ void delete_(const std::string& exchange, bool ifUnused);
+ framing::Exchange010QueryResult query(const std::string& name);
+ void bind(const std::string& queue,
+ const std::string& exchange, const std::string& routingKey,
+ const qpid::framing::FieldTable& arguments);
+ void unbind(const std::string& queue,
+ const std::string& exchange,
+ const std::string& routingKey);
+ framing::Exchange010BoundResult bound(const std::string& exchange,
+ const std::string& queue,
+ const std::string& routingKey,
+ const framing::FieldTable& arguments);
+ private:
+ void checkType(shared_ptr<Exchange> exchange, const std::string& type);
+
+ void checkAlternate(shared_ptr<Exchange> exchange,
+ shared_ptr<Exchange> alternate);
+ };
+
+ class QueueHandlerImpl :
+ public Queue010Handler,
+ public HandlerImpl
+ {
+ public:
+ QueueHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
+
+ void declare(const std::string& queue,
+ const std::string& alternateExchange,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete,
+ const qpid::framing::FieldTable& arguments);
+ void delete_(const std::string& queue,
+ bool ifUnused, bool ifEmpty);
+ void purge(const std::string& queue);
+ framing::Queue010QueryResult query(const std::string& queue);
+ };
+
+ class MessageHandlerImpl :
+ public Message010Handler,
+ public HandlerImpl
+ {
+ typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+ RangedOperation releaseOp;
+ RangedOperation rejectOp;
+ RangedOperation acceptOp;
+
+ public:
+ MessageHandlerImpl(SemanticState& session);
+ void transfer(const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode);
+
+ void accept(const framing::SequenceSet& commands);
+
+ void reject(const framing::SequenceSet& commands,
+ uint16_t code,
+ const string& text);
+
+ void release(const framing::SequenceSet& commands,
+ bool setRedelivered);
+
+ void subscribe(const string& queue,
+ const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ bool exclusive,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const framing::FieldTable& arguments);
+
+ void cancel(const string& destination);
+
+ void setFlowMode(const string& destination,
+ uint8_t flowMode);
+
+ void flow(const string& destination,
+ uint8_t unit,
+ uint32_t value);
+
+ void flush(const string& destination);
+
+ void stop(const string& destination);
+
+ };
+
+ ExchangeHandlerImpl exchangeImpl;
+ QueueHandlerImpl queueImpl;
+ MessageHandlerImpl messageImpl;
+};
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_SessionAdapter_h*/
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 2db7d688b7..c936edee21 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -31,7 +31,7 @@
#include "qpid/sys/Time.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Session.h"
-#include "BrokerAdapter.h"
+#include "SessionAdapter.h"
#include "DeliveryAdapter.h"
#include "MessageBuilder.h"
#include "SessionContext.h"
@@ -132,7 +132,7 @@ class SessionState : public framing::SessionState,
sys::Mutex lock;
SemanticState semanticState;
- BrokerAdapter adapter;
+ SessionAdapter adapter;
MessageBuilder msgBuilder;
RangedOperation ackOp;
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index 0e1c9da922..e6f15f2c4e 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -30,7 +30,7 @@ using namespace boost;
FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {parts.reserve(4);}
-void FrameSet::append(AMQFrame& part)
+void FrameSet::append(const AMQFrame& part)
{
parts.push_back(part);
}
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index 8ba22f07cb..454d292d9d 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -43,7 +43,7 @@ public:
typedef boost::shared_ptr<FrameSet> shared_ptr;
FrameSet(const SequenceNumber& id);
- void append(AMQFrame& part);
+ void append(const AMQFrame& part);
bool isComplete() const;
uint64_t getContentSize() const;
diff --git a/cpp/src/qpid/framing/ModelMethod.h b/cpp/src/qpid/framing/ModelMethod.h
index f3c0fa5d65..07600aadca 100644
--- a/cpp/src/qpid/framing/ModelMethod.h
+++ b/cpp/src/qpid/framing/ModelMethod.h
@@ -33,9 +33,9 @@ class ModelMethod : public AMQMethodBody
mutable ExecutionHeader header;
public:
virtual ~ModelMethod() {}
- virtual void encode(Buffer& buffer) const { header.encode(buffer); }
- virtual void decode(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); }
- virtual uint32_t size() const { return header.size(); }
+ virtual void encodeHeader(Buffer& buffer) const { header.encode(buffer); }
+ virtual void decodeHeader(Buffer& buffer, uint32_t size=0) { header.decode(buffer, size); }
+ virtual uint32_t headerSize() const { return header.size(); }
virtual bool isSync() const { return header.getSync(); }
virtual void setSync(bool on) const { header.setSync(on); }
ExecutionHeader& getHeader() { return header; }