summaryrefslogtreecommitdiff
path: root/cpp/broker/inc
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/broker/inc
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/inc')
-rw-r--r--cpp/broker/inc/AutoDelete.h54
-rw-r--r--cpp/broker/inc/Binding.h35
-rw-r--r--cpp/broker/inc/Channel.h87
-rw-r--r--cpp/broker/inc/Configuration.h125
-rw-r--r--cpp/broker/inc/ConnectionToken.h35
-rw-r--r--cpp/broker/inc/Consumer.h34
-rw-r--r--cpp/broker/inc/DirectExchange.h55
-rw-r--r--cpp/broker/inc/Exchange.h39
-rw-r--r--cpp/broker/inc/ExchangeBinding.h45
-rw-r--r--cpp/broker/inc/ExchangeRegistry.h42
-rw-r--r--cpp/broker/inc/FanOutExchange.h58
-rw-r--r--cpp/broker/inc/Message.h73
-rw-r--r--cpp/broker/inc/NameGenerator.h36
-rw-r--r--cpp/broker/inc/Queue.h106
-rw-r--r--cpp/broker/inc/QueueRegistry.h88
-rw-r--r--cpp/broker/inc/SessionHandlerFactoryImpl.h49
-rw-r--r--cpp/broker/inc/SessionHandlerImpl.h230
-rw-r--r--cpp/broker/inc/TopicExchange.h55
18 files changed, 1246 insertions, 0 deletions
diff --git a/cpp/broker/inc/AutoDelete.h b/cpp/broker/inc/AutoDelete.h
new file mode 100644
index 0000000000..864d68358f
--- /dev/null
+++ b/cpp/broker/inc/AutoDelete.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AutoDelete_
+#define _AutoDelete_
+
+#include <iostream>
+#include <queue>
+#include "MonitorImpl.h"
+#include "Queue.h"
+#include "QueueRegistry.h"
+#include "ThreadFactoryImpl.h"
+
+namespace qpid {
+ namespace broker{
+ class AutoDelete : private virtual qpid::concurrent::Runnable{
+ qpid::concurrent::ThreadFactoryImpl factory;
+ qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::MonitorImpl monitor;
+ std::queue<Queue::shared_ptr> queues;
+ QueueRegistry* const registry;
+ const u_int32_t period;
+ volatile bool stopped;
+ qpid::concurrent::Thread* runner;
+
+ Queue::shared_ptr const pop();
+ void process();
+ virtual void run();
+
+ public:
+ AutoDelete(QueueRegistry* const registry, u_int32_t period);
+ void add(Queue::shared_ptr const);
+ void start();
+ void stop();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Binding.h b/cpp/broker/inc/Binding.h
new file mode 100644
index 0000000000..b11419e92c
--- /dev/null
+++ b/cpp/broker/inc/Binding.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Binding_
+#define _Binding_
+
+#include "FieldTable.h"
+
+namespace qpid {
+ namespace broker {
+ class Binding{
+ public:
+ virtual void cancel() = 0;
+ virtual ~Binding(){}
+ };
+ }
+}
+
+
+#endif
+
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
new file mode 100644
index 0000000000..aaf2ce569b
--- /dev/null
+++ b/cpp/broker/inc/Channel.h
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Channel_
+#define _Channel_
+
+#include <map>
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "BasicPublishBody.h"
+#include "Binding.h"
+#include "Consumer.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "NameGenerator.h"
+#include "OutputHandler.h"
+#include "Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Channel{
+ private:
+ class ConsumerImpl : public virtual Consumer{
+ ConnectionToken* const connection;
+ Channel* parent;
+ string tag;
+ Queue::shared_ptr queue;
+ public:
+ ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection);
+ virtual bool deliver(Message::shared_ptr& msg);
+ void cancel();
+ };
+
+ typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
+
+ const int id;
+ qpid::framing::OutputHandler* out;
+ u_int64_t deliveryTag;
+ Queue::shared_ptr defaultQueue;
+ bool transactional;
+ std::map<string, ConsumerImpl*> consumers;
+ u_int32_t prefetchSize;
+ u_int16_t prefetchCount;
+ u_int32_t framesize;
+ Message::shared_ptr message;
+ NameGenerator tagGenerator;
+
+ void deliver(Message::shared_ptr& msg, string& tag);
+ void publish(ExchangeRegistry* exchanges);
+
+ public:
+ Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
+ ~Channel();
+ inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+ inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
+ inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; }
+ inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; }
+ void handlePublish(Message* msg);
+ void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges);
+ void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges);
+ bool exists(string& consumerTag);
+ void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
+ void cancel(string& tag);
+ void begin();
+ void close();
+ void commit();
+ void rollback();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h
new file mode 100644
index 0000000000..5ec70a839b
--- /dev/null
+++ b/cpp/broker/inc/Configuration.h
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Configuration_
+#define _Configuration_
+
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+
+namespace qpid {
+ namespace broker {
+ class Configuration{
+ class Option{
+ const std::string flag;
+ const std::string name;
+ const std::string desc;
+
+ bool match(const std::string& arg);
+
+ protected:
+ virtual bool needsValue() const = 0;
+ virtual void setValue(const std::string& value) = 0;
+
+ public:
+ Option(const char flag, const std::string& name, const std::string& desc);
+ Option(const std::string& name, const std::string& desc);
+ virtual ~Option();
+
+ bool parse(int& i, char** argv, int argc);
+ void print(std::ostream& out) const;
+ };
+
+ class IntOption : public Option{
+ const int defaultValue;
+ int value;
+ public:
+ IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
+ IntOption(const std::string& name, const std::string& desc, const int value = 0);
+ virtual ~IntOption();
+
+ int getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ class StringOption : public Option{
+ const std::string defaultValue;
+ std::string value;
+ public:
+ StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
+ StringOption(const std::string& name, const std::string& desc, const std::string value = "");
+ virtual ~StringOption();
+
+ const std::string& getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ class BoolOption : public Option{
+ const bool defaultValue;
+ bool value;
+ public:
+ BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
+ BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
+ virtual ~BoolOption();
+
+ bool getValue() const;
+ virtual bool needsValue() const;
+ virtual void setValue(const std::string& value);
+ };
+
+ BoolOption trace;
+ IntOption port;
+ IntOption workerThreads;
+ IntOption maxConnections;
+ IntOption connectionBacklog;
+ StringOption acceptor;
+ BoolOption help;
+
+ typedef std::vector<Option*>::iterator op_iterator;
+ std::vector<Option*> options;
+
+ public:
+ class ParseException{
+ public:
+ const std::string& error;
+ ParseException(const std::string& _error) : error(_error) {}
+ };
+
+
+ Configuration();
+ ~Configuration();
+
+ void parse(int argc, char** argv);
+
+ bool isHelp();
+ bool isTrace();
+ int getPort();
+ int getWorkerThreads();
+ int getMaxConnections();
+ int getConnectionBacklog();
+ const std::string& getAcceptor();
+
+ void usage();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/ConnectionToken.h b/cpp/broker/inc/ConnectionToken.h
new file mode 100644
index 0000000000..1faefec2cc
--- /dev/null
+++ b/cpp/broker/inc/ConnectionToken.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ConnectionToken_
+#define _ConnectionToken_
+
+namespace qpid {
+ namespace broker {
+ /**
+ * An empty interface allowing opaque implementations of some
+ * form of token to identify a connection.
+ */
+ class ConnectionToken{
+ public:
+ virtual ~ConnectionToken(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Consumer.h b/cpp/broker/inc/Consumer.h
new file mode 100644
index 0000000000..af2d5d7812
--- /dev/null
+++ b/cpp/broker/inc/Consumer.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Consumer_
+#define _Consumer_
+
+#include "Message.h"
+
+namespace qpid {
+ namespace broker {
+ class Consumer{
+ public:
+ virtual bool deliver(Message::shared_ptr& msg) = 0;
+ virtual ~Consumer(){}
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h
new file mode 100644
index 0000000000..bf8c5f0b37
--- /dev/null
+++ b/cpp/broker/inc/DirectExchange.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DirectExchange_
+#define _DirectExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+ class DirectExchange : public virtual Exchange{
+ const string name;
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ DirectExchange(const string& name);
+
+ inline virtual const string& getName(){ return name; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~DirectExchange();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h
new file mode 100644
index 0000000000..5f5dc5ce71
--- /dev/null
+++ b/cpp/broker/inc/Exchange.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Exchange_
+#define _Exchange_
+
+#include "FieldTable.h"
+#include "Message.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+ class Exchange{
+ public:
+ virtual const string& getName() = 0;
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+ virtual ~Exchange(){}
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/ExchangeBinding.h b/cpp/broker/inc/ExchangeBinding.h
new file mode 100644
index 0000000000..4cbb73acbf
--- /dev/null
+++ b/cpp/broker/inc/ExchangeBinding.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeBinding_
+#define _ExchangeBinding_
+
+#include "Binding.h"
+#include "FieldTable.h"
+#include "Queue.h"
+
+namespace qpid {
+ namespace broker {
+ class Exchange;
+ class Queue;
+
+ class ExchangeBinding : public virtual Binding{
+ Exchange* e;
+ Queue::shared_ptr q;
+ const string key;
+ qpid::framing::FieldTable* args;
+ public:
+ ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args);
+ virtual void cancel();
+ virtual ~ExchangeBinding();
+ };
+ }
+}
+
+
+#endif
+
diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h
new file mode 100644
index 0000000000..0f0eaae0d0
--- /dev/null
+++ b/cpp/broker/inc/ExchangeRegistry.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeRegistry_
+#define _ExchangeRegistry_
+
+#include <map>
+#include "Exchange.h"
+#include "Monitor.h"
+
+namespace qpid {
+namespace broker {
+ class ExchangeRegistry{
+ std::map<string, Exchange*> exchanges;
+ qpid::concurrent::Monitor* lock;
+ public:
+ ExchangeRegistry();
+ void declare(Exchange* exchange);
+ void destroy(const string& name);
+ Exchange* get(const string& name);
+ inline qpid::concurrent::Monitor* getLock(){ return lock; }
+ ~ExchangeRegistry();
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h
new file mode 100644
index 0000000000..9d0d32bbf8
--- /dev/null
+++ b/cpp/broker/inc/FanOutExchange.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _FanOutExchange_
+#define _FanOutExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class FanOutExchange : public virtual Exchange {
+ const string name;
+ std::vector<Queue::shared_ptr> bindings;
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ FanOutExchange(const string& name);
+
+ inline virtual const string& getName(){ return name; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~FanOutExchange();
+};
+
+}
+}
+
+
+
+#endif
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
new file mode 100644
index 0000000000..37a0c9b2c8
--- /dev/null
+++ b/cpp/broker/inc/Message.h
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Message_
+#define _Message_
+
+#include "memory.h"
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "BasicHeaderProperties.h"
+#include "BasicPublishBody.h"
+#include "ConnectionToken.h"
+#include "OutputHandler.h"
+
+namespace qpid {
+ namespace broker {
+ class ExchangeRegistry;
+
+ class Message{
+ typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+ typedef content_list::iterator content_iterator;
+
+ const ConnectionToken* const publisher;
+ string exchange;
+ string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ qpid::framing::AMQHeaderBody::shared_ptr header;
+ content_list content;
+
+ u_int64_t contentSize();
+ qpid::framing::BasicHeaderProperties* getHeaderProperties();
+
+
+ public:
+ typedef std::tr1::shared_ptr<Message> shared_ptr;
+
+ Message(const ConnectionToken* const publisher,
+ const string& exchange, const string& routingKey,
+ bool mandatory, bool immediate);
+ ~Message();
+ void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+ void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+ bool isComplete();
+ const ConnectionToken* const getPublisher();
+
+ void deliver(qpid::framing::OutputHandler* out, int channel,
+ string& consumerTag, u_int64_t deliveryTag,
+ u_int32_t framesize);
+
+ friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
+
+ };
+ bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/NameGenerator.h b/cpp/broker/inc/NameGenerator.h
new file mode 100644
index 0000000000..6e6e0acf28
--- /dev/null
+++ b/cpp/broker/inc/NameGenerator.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include "Message.h"
+
+namespace qpid {
+ namespace broker {
+ class NameGenerator{
+ const std::string base;
+ unsigned int counter;
+ public:
+ NameGenerator(const std::string& base);
+ std::string generate();
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/Queue.h b/cpp/broker/inc/Queue.h
new file mode 100644
index 0000000000..2229ba6235
--- /dev/null
+++ b/cpp/broker/inc/Queue.h
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Queue_
+#define _Queue_
+
+#include <vector>
+#include <queue>
+#include "memory.h"
+#include "apr_time.h"
+#include "amqp_types.h"
+#include "Binding.h"
+#include "ConnectionToken.h"
+#include "Consumer.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+
+namespace qpid {
+ namespace broker {
+
+ /**
+ * Thrown when exclusive access would be violated.
+ */
+ struct ExclusiveAccessException{};
+
+ /**
+ * The brokers representation of an amqp queue. Messages are
+ * delivered to a queue from where they can be dispatched to
+ * registered consumers or be stored until dequeued or until one
+ * or more consumers registers.
+ */
+ class Queue{
+ const string name;
+ const u_int32_t autodelete;
+ const bool durable;
+ const ConnectionToken* const owner;
+ std::vector<Consumer*> consumers;
+ std::queue<Binding*> bindings;
+ std::queue<Message::shared_ptr> messages;
+ bool queueing;
+ bool dispatching;
+ int next;
+ mutable qpid::concurrent::MonitorImpl lock;
+ apr_time_t lastUsed;
+ Consumer* exclusive;
+
+ bool startDispatching();
+ bool dispatch(Message::shared_ptr& msg);
+
+ public:
+
+ typedef std::tr1::shared_ptr<Queue> shared_ptr;
+
+ typedef std::vector<shared_ptr> vector;
+
+ Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+ ~Queue();
+ /**
+ * Informs the queue of a binding that should be cancelled on
+ * destruction of the queue.
+ */
+ void bound(Binding* b);
+ /**
+ * Delivers a message to the queue from where it will be
+ * dispatched to immediately to a consumer if one is
+ * available or stored for dequeue or later dispatch if
+ * not.
+ */
+ void deliver(Message::shared_ptr& msg);
+ /**
+ * Dispatch any queued messages providing there are
+ * consumers for them. Only one thread can be dispatching
+ * at any time, but this method (rather than the caller)
+ * is responsible for ensuring that.
+ */
+ void dispatch();
+ void consume(Consumer* c, bool exclusive = false);
+ void cancel(Consumer* c);
+ Message::shared_ptr dequeue();
+ u_int32_t purge();
+ u_int32_t getMessageCount() const;
+ u_int32_t getConsumerCount() const;
+ inline const string& getName() const { return name; }
+ inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+ inline bool hasExclusiveConsumer() const { return exclusive; }
+ bool canAutoDelete() const;
+ };
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/QueueRegistry.h b/cpp/broker/inc/QueueRegistry.h
new file mode 100644
index 0000000000..ac12aa8f88
--- /dev/null
+++ b/cpp/broker/inc/QueueRegistry.h
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class SessionHandlerImpl;
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+
+ public:
+ QueueRegistry();
+ ~QueueRegistry();
+
+ /**
+ * Declare a queue.
+ *
+ * @return The queue and a boolean flag which is true if the queue
+ * was created by this declare call false if it already existed.
+ */
+ std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+
+ /**
+ * Destroy the named queue.
+ *
+ * Note: if the queue is in use it is not actually destroyed until
+ * all shared_ptrs to it are destroyed. During that time it is
+ * possible that a new queue with the same name may be
+ * created. This should not create any problems as the new and
+ * old queues exist independently. The registry has
+ * forgotten the old queue so there can be no confusion for
+ * subsequent calls to find or declare with the same name.
+ *
+ */
+ void destroy(const string& name);
+
+ /**
+ * Find the named queue. Return 0 if not found.
+ */
+ Queue::shared_ptr find(const string& name);
+
+ /**
+ * Generate unique queue name.
+ */
+ string generateName();
+
+ private:
+ typedef std::map<string, Queue::shared_ptr> QueueMap;
+ QueueMap queues;
+ qpid::concurrent::MonitorImpl lock;
+ int counter;
+
+};
+
+
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/SessionHandlerFactoryImpl.h b/cpp/broker/inc/SessionHandlerFactoryImpl.h
new file mode 100644
index 0000000000..2317a6667b
--- /dev/null
+++ b/cpp/broker/inc/SessionHandlerFactoryImpl.h
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include "AMQFrame.h"
+#include "AutoDelete.h"
+#include "DirectExchange.h"
+#include "ExchangeRegistry.h"
+#include "ProtocolInitiation.h"
+#include "QueueRegistry.h"
+#include "SessionHandlerFactory.h"
+#include "TimeoutHandler.h"
+
+namespace qpid {
+ namespace broker {
+
+ class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
+ {
+ QueueRegistry queues;
+ ExchangeRegistry exchanges;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+ AutoDelete cleaner;
+ public:
+ SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+ virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
+ virtual ~SessionHandlerFactoryImpl();
+ };
+
+ }
+}
+
+
+#endif
diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h
new file mode 100644
index 0000000000..14a6404c78
--- /dev/null
+++ b/cpp/broker/inc/SessionHandlerImpl.h
@@ -0,0 +1,230 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include "AMQFrame.h"
+#include "AMQP_ServerOperations.h"
+#include "AutoDelete.h"
+#include "ExchangeRegistry.h"
+#include "Channel.h"
+#include "ConnectionToken.h"
+#include "DirectExchange.h"
+#include "OutputHandler.h"
+#include "ProtocolInitiation.h"
+#include "QueueRegistry.h"
+#include "SessionContext.h"
+#include "SessionHandler.h"
+#include "TimeoutHandler.h"
+#include "TopicExchange.h"
+
+namespace qpid {
+namespace broker {
+
+struct ChannelException : public std::exception {
+ u_int16_t code;
+ string text;
+ ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ChannelException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+struct ConnectionException : public std::exception {
+ u_int16_t code;
+ string text;
+ ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+ ~ConnectionException() throw() {}
+ const char* what() const throw() { return text.c_str(); }
+};
+
+class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
+ public virtual qpid::framing::AMQP_ServerOperations,
+ public virtual ConnectionToken
+{
+ typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+ typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+ qpid::io::SessionContext* context;
+ QueueRegistry* queues;
+ ExchangeRegistry* const exchanges;
+ AutoDelete* const cleaner;
+ const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+
+ ConnectionHandler* connectionHandler;
+ ChannelHandler* channelHandler;
+ BasicHandler* basicHandler;
+ ExchangeHandler* exchangeHandler;
+ QueueHandler* queueHandler;
+
+ std::map<u_int16_t, Channel*> channels;
+ std::vector<Queue::shared_ptr> exclusiveQueues;
+
+ u_int32_t framemax;
+ u_int16_t heartbeat;
+
+ void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+ void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+ void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+ /**
+ * Get named queue, never returns 0.
+ * @return: named queue or default queue for channel if name=""
+ * @exception: ChannelException if no queue of that name is found.
+ * @exception: ConnectionException if no queue specified and channel has not declared one.
+ */
+ Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+ Exchange* findExchange(const string& name);
+
+ public:
+ SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues,
+ ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+ virtual void received(qpid::framing::AMQFrame* frame);
+ virtual void initiated(qpid::framing::ProtocolInitiation* header);
+ virtual void idleOut();
+ virtual void idleIn();
+ virtual void closed();
+ virtual ~SessionHandlerImpl();
+
+ class ConnectionHandlerImpl : public virtual ConnectionHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism,
+ string& response, string& locale);
+
+ virtual void secureOk(u_int16_t channel, string& response);
+
+ virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat);
+
+ virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist);
+
+ virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId,
+ u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ConnectionHandlerImpl(){}
+ };
+
+ class ChannelHandlerImpl : public virtual ChannelHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void open(u_int16_t channel, string& outOfBand);
+
+ virtual void flow(u_int16_t channel, bool active);
+
+ virtual void flowOk(u_int16_t channel, bool active);
+
+ virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText,
+ u_int16_t classId, u_int16_t methodId);
+
+ virtual void closeOk(u_int16_t channel);
+
+ virtual ~ChannelHandlerImpl(){}
+ };
+
+ class ExchangeHandlerImpl : public virtual ExchangeHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type,
+ bool passive, bool durable, bool autoDelete, bool internal, bool nowait,
+ qpid::framing::FieldTable& arguments);
+
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait);
+
+ virtual ~ExchangeHandlerImpl(){}
+ };
+
+
+ class QueueHandlerImpl : public virtual QueueHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool passive, bool durable, bool exclusive,
+ bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments);
+
+ virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue,
+ string& exchange, string& routingKey, bool nowait,
+ qpid::framing::FieldTable& arguments);
+
+ virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue,
+ bool nowait);
+
+ virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty,
+ bool nowait);
+
+ virtual ~QueueHandlerImpl(){}
+ };
+
+ class BasicHandlerImpl : public virtual BasicHandler{
+ SessionHandlerImpl* parent;
+ public:
+ inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+ virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global);
+
+ virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag,
+ bool noLocal, bool noAck, bool exclusive, bool nowait);
+
+ virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait);
+
+ virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey,
+ bool mandatory, bool immediate);
+
+ virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck);
+
+ virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple);
+
+ virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue);
+
+ virtual void recover(u_int16_t channel, bool requeue);
+
+ virtual ~BasicHandlerImpl(){}
+ };
+
+ inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; }
+ inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; }
+ inline virtual BasicHandler* getBasicHandler(){ return basicHandler; }
+ inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; }
+ inline virtual QueueHandler* getQueueHandler(){ return queueHandler; }
+
+ inline virtual AccessHandler* getAccessHandler(){ return 0; }
+ inline virtual FileHandler* getFileHandler(){ return 0; }
+ inline virtual StreamHandler* getStreamHandler(){ return 0; }
+ inline virtual TxHandler* getTxHandler(){ return 0; }
+ inline virtual DtxHandler* getDtxHandler(){ return 0; }
+ inline virtual TunnelHandler* getTunnelHandler(){ return 0; }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h
new file mode 100644
index 0000000000..d9ff62ecc6
--- /dev/null
+++ b/cpp/broker/inc/TopicExchange.h
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+ class TopicExchange : public virtual Exchange{
+ const string name;
+ std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported
+ qpid::concurrent::MonitorImpl lock;
+
+ public:
+ static const std::string typeName;
+
+ TopicExchange(const string& name);
+
+ inline virtual const string& getName(){ return name; }
+
+ virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+ virtual ~TopicExchange();
+ };
+}
+}
+
+
+#endif