diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/broker/inc | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/inc')
-rw-r--r-- | cpp/broker/inc/AutoDelete.h | 54 | ||||
-rw-r--r-- | cpp/broker/inc/Binding.h | 35 | ||||
-rw-r--r-- | cpp/broker/inc/Channel.h | 87 | ||||
-rw-r--r-- | cpp/broker/inc/Configuration.h | 125 | ||||
-rw-r--r-- | cpp/broker/inc/ConnectionToken.h | 35 | ||||
-rw-r--r-- | cpp/broker/inc/Consumer.h | 34 | ||||
-rw-r--r-- | cpp/broker/inc/DirectExchange.h | 55 | ||||
-rw-r--r-- | cpp/broker/inc/Exchange.h | 39 | ||||
-rw-r--r-- | cpp/broker/inc/ExchangeBinding.h | 45 | ||||
-rw-r--r-- | cpp/broker/inc/ExchangeRegistry.h | 42 | ||||
-rw-r--r-- | cpp/broker/inc/FanOutExchange.h | 58 | ||||
-rw-r--r-- | cpp/broker/inc/Message.h | 73 | ||||
-rw-r--r-- | cpp/broker/inc/NameGenerator.h | 36 | ||||
-rw-r--r-- | cpp/broker/inc/Queue.h | 106 | ||||
-rw-r--r-- | cpp/broker/inc/QueueRegistry.h | 88 | ||||
-rw-r--r-- | cpp/broker/inc/SessionHandlerFactoryImpl.h | 49 | ||||
-rw-r--r-- | cpp/broker/inc/SessionHandlerImpl.h | 230 | ||||
-rw-r--r-- | cpp/broker/inc/TopicExchange.h | 55 |
18 files changed, 1246 insertions, 0 deletions
diff --git a/cpp/broker/inc/AutoDelete.h b/cpp/broker/inc/AutoDelete.h new file mode 100644 index 0000000000..864d68358f --- /dev/null +++ b/cpp/broker/inc/AutoDelete.h @@ -0,0 +1,54 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _AutoDelete_ +#define _AutoDelete_ + +#include <iostream> +#include <queue> +#include "MonitorImpl.h" +#include "Queue.h" +#include "QueueRegistry.h" +#include "ThreadFactoryImpl.h" + +namespace qpid { + namespace broker{ + class AutoDelete : private virtual qpid::concurrent::Runnable{ + qpid::concurrent::ThreadFactoryImpl factory; + qpid::concurrent::MonitorImpl lock; + qpid::concurrent::MonitorImpl monitor; + std::queue<Queue::shared_ptr> queues; + QueueRegistry* const registry; + const u_int32_t period; + volatile bool stopped; + qpid::concurrent::Thread* runner; + + Queue::shared_ptr const pop(); + void process(); + virtual void run(); + + public: + AutoDelete(QueueRegistry* const registry, u_int32_t period); + void add(Queue::shared_ptr const); + void start(); + void stop(); + }; + } +} + + +#endif diff --git a/cpp/broker/inc/Binding.h b/cpp/broker/inc/Binding.h new file mode 100644 index 0000000000..b11419e92c --- /dev/null +++ b/cpp/broker/inc/Binding.h @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Binding_ +#define _Binding_ + +#include "FieldTable.h" + +namespace qpid { + namespace broker { + class Binding{ + public: + virtual void cancel() = 0; + virtual ~Binding(){} + }; + } +} + + +#endif + diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h new file mode 100644 index 0000000000..aaf2ce569b --- /dev/null +++ b/cpp/broker/inc/Channel.h @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Channel_ +#define _Channel_ + +#include <map> +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "BasicPublishBody.h" +#include "Binding.h" +#include "Consumer.h" +#include "Message.h" +#include "MonitorImpl.h" +#include "NameGenerator.h" +#include "OutputHandler.h" +#include "Queue.h" + +namespace qpid { + namespace broker { + class Channel{ + private: + class ConsumerImpl : public virtual Consumer{ + ConnectionToken* const connection; + Channel* parent; + string tag; + Queue::shared_ptr queue; + public: + ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection); + virtual bool deliver(Message::shared_ptr& msg); + void cancel(); + }; + + typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + + const int id; + qpid::framing::OutputHandler* out; + u_int64_t deliveryTag; + Queue::shared_ptr defaultQueue; + bool transactional; + std::map<string, ConsumerImpl*> consumers; + u_int32_t prefetchSize; + u_int16_t prefetchCount; + u_int32_t framesize; + Message::shared_ptr message; + NameGenerator tagGenerator; + + void deliver(Message::shared_ptr& msg, string& tag); + void publish(ExchangeRegistry* exchanges); + + public: + Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize); + ~Channel(); + inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } + inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } + inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; } + inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; } + void handlePublish(Message* msg); + void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges); + void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges); + bool exists(string& consumerTag); + void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); + void cancel(string& tag); + void begin(); + void close(); + void commit(); + void rollback(); + }; + } +} + + +#endif diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h new file mode 100644 index 0000000000..5ec70a839b --- /dev/null +++ b/cpp/broker/inc/Configuration.h @@ -0,0 +1,125 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Configuration_ +#define _Configuration_ + +#include <cstdlib> +#include <iostream> +#include <vector> + +namespace qpid { + namespace broker { + class Configuration{ + class Option{ + const std::string flag; + const std::string name; + const std::string desc; + + bool match(const std::string& arg); + + protected: + virtual bool needsValue() const = 0; + virtual void setValue(const std::string& value) = 0; + + public: + Option(const char flag, const std::string& name, const std::string& desc); + Option(const std::string& name, const std::string& desc); + virtual ~Option(); + + bool parse(int& i, char** argv, int argc); + void print(std::ostream& out) const; + }; + + class IntOption : public Option{ + const int defaultValue; + int value; + public: + IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0); + IntOption(const std::string& name, const std::string& desc, const int value = 0); + virtual ~IntOption(); + + int getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + }; + + class StringOption : public Option{ + const std::string defaultValue; + std::string value; + public: + StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = ""); + StringOption(const std::string& name, const std::string& desc, const std::string value = ""); + virtual ~StringOption(); + + const std::string& getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + }; + + class BoolOption : public Option{ + const bool defaultValue; + bool value; + public: + BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0); + BoolOption(const std::string& name, const std::string& desc, const bool value = 0); + virtual ~BoolOption(); + + bool getValue() const; + virtual bool needsValue() const; + virtual void setValue(const std::string& value); + }; + + BoolOption trace; + IntOption port; + IntOption workerThreads; + IntOption maxConnections; + IntOption connectionBacklog; + StringOption acceptor; + BoolOption help; + + typedef std::vector<Option*>::iterator op_iterator; + std::vector<Option*> options; + + public: + class ParseException{ + public: + const std::string& error; + ParseException(const std::string& _error) : error(_error) {} + }; + + + Configuration(); + ~Configuration(); + + void parse(int argc, char** argv); + + bool isHelp(); + bool isTrace(); + int getPort(); + int getWorkerThreads(); + int getMaxConnections(); + int getConnectionBacklog(); + const std::string& getAcceptor(); + + void usage(); + }; + } +} + + +#endif diff --git a/cpp/broker/inc/ConnectionToken.h b/cpp/broker/inc/ConnectionToken.h new file mode 100644 index 0000000000..1faefec2cc --- /dev/null +++ b/cpp/broker/inc/ConnectionToken.h @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ConnectionToken_ +#define _ConnectionToken_ + +namespace qpid { + namespace broker { + /** + * An empty interface allowing opaque implementations of some + * form of token to identify a connection. + */ + class ConnectionToken{ + public: + virtual ~ConnectionToken(){} + }; + } +} + + +#endif diff --git a/cpp/broker/inc/Consumer.h b/cpp/broker/inc/Consumer.h new file mode 100644 index 0000000000..af2d5d7812 --- /dev/null +++ b/cpp/broker/inc/Consumer.h @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Consumer_ +#define _Consumer_ + +#include "Message.h" + +namespace qpid { + namespace broker { + class Consumer{ + public: + virtual bool deliver(Message::shared_ptr& msg) = 0; + virtual ~Consumer(){} + }; + } +} + + +#endif diff --git a/cpp/broker/inc/DirectExchange.h b/cpp/broker/inc/DirectExchange.h new file mode 100644 index 0000000000..bf8c5f0b37 --- /dev/null +++ b/cpp/broker/inc/DirectExchange.h @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _DirectExchange_ +#define _DirectExchange_ + +#include <map> +#include <vector> +#include "Exchange.h" +#include "FieldTable.h" +#include "Message.h" +#include "MonitorImpl.h" +#include "Queue.h" + +namespace qpid { +namespace broker { + class DirectExchange : public virtual Exchange{ + const string name; + std::map<string, std::vector<Queue::shared_ptr> > bindings; + qpid::concurrent::MonitorImpl lock; + + public: + static const std::string typeName; + + DirectExchange(const string& name); + + inline virtual const string& getName(){ return name; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + + virtual ~DirectExchange(); + }; +} +} + + +#endif diff --git a/cpp/broker/inc/Exchange.h b/cpp/broker/inc/Exchange.h new file mode 100644 index 0000000000..5f5dc5ce71 --- /dev/null +++ b/cpp/broker/inc/Exchange.h @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Exchange_ +#define _Exchange_ + +#include "FieldTable.h" +#include "Message.h" +#include "Queue.h" + +namespace qpid { +namespace broker { + class Exchange{ + public: + virtual const string& getName() = 0; + virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0; + virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0; + virtual ~Exchange(){} + }; +} +} + + +#endif diff --git a/cpp/broker/inc/ExchangeBinding.h b/cpp/broker/inc/ExchangeBinding.h new file mode 100644 index 0000000000..4cbb73acbf --- /dev/null +++ b/cpp/broker/inc/ExchangeBinding.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ExchangeBinding_ +#define _ExchangeBinding_ + +#include "Binding.h" +#include "FieldTable.h" +#include "Queue.h" + +namespace qpid { + namespace broker { + class Exchange; + class Queue; + + class ExchangeBinding : public virtual Binding{ + Exchange* e; + Queue::shared_ptr q; + const string key; + qpid::framing::FieldTable* args; + public: + ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args); + virtual void cancel(); + virtual ~ExchangeBinding(); + }; + } +} + + +#endif + diff --git a/cpp/broker/inc/ExchangeRegistry.h b/cpp/broker/inc/ExchangeRegistry.h new file mode 100644 index 0000000000..0f0eaae0d0 --- /dev/null +++ b/cpp/broker/inc/ExchangeRegistry.h @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ExchangeRegistry_ +#define _ExchangeRegistry_ + +#include <map> +#include "Exchange.h" +#include "Monitor.h" + +namespace qpid { +namespace broker { + class ExchangeRegistry{ + std::map<string, Exchange*> exchanges; + qpid::concurrent::Monitor* lock; + public: + ExchangeRegistry(); + void declare(Exchange* exchange); + void destroy(const string& name); + Exchange* get(const string& name); + inline qpid::concurrent::Monitor* getLock(){ return lock; } + ~ExchangeRegistry(); + }; +} +} + + +#endif diff --git a/cpp/broker/inc/FanOutExchange.h b/cpp/broker/inc/FanOutExchange.h new file mode 100644 index 0000000000..9d0d32bbf8 --- /dev/null +++ b/cpp/broker/inc/FanOutExchange.h @@ -0,0 +1,58 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _FanOutExchange_ +#define _FanOutExchange_ + +#include <map> +#include <vector> +#include "Exchange.h" +#include "FieldTable.h" +#include "Message.h" +#include "MonitorImpl.h" +#include "Queue.h" + +namespace qpid { +namespace broker { + +class FanOutExchange : public virtual Exchange { + const string name; + std::vector<Queue::shared_ptr> bindings; + qpid::concurrent::MonitorImpl lock; + + public: + static const std::string typeName; + + FanOutExchange(const string& name); + + inline virtual const string& getName(){ return name; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + + virtual ~FanOutExchange(); +}; + +} +} + + + +#endif diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h new file mode 100644 index 0000000000..37a0c9b2c8 --- /dev/null +++ b/cpp/broker/inc/Message.h @@ -0,0 +1,73 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Message_ +#define _Message_ + +#include "memory.h" +#include "AMQContentBody.h" +#include "AMQHeaderBody.h" +#include "BasicHeaderProperties.h" +#include "BasicPublishBody.h" +#include "ConnectionToken.h" +#include "OutputHandler.h" + +namespace qpid { + namespace broker { + class ExchangeRegistry; + + class Message{ + typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list; + typedef content_list::iterator content_iterator; + + const ConnectionToken* const publisher; + string exchange; + string routingKey; + const bool mandatory; + const bool immediate; + qpid::framing::AMQHeaderBody::shared_ptr header; + content_list content; + + u_int64_t contentSize(); + qpid::framing::BasicHeaderProperties* getHeaderProperties(); + + + public: + typedef std::tr1::shared_ptr<Message> shared_ptr; + + Message(const ConnectionToken* const publisher, + const string& exchange, const string& routingKey, + bool mandatory, bool immediate); + ~Message(); + void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header); + void addContent(qpid::framing::AMQContentBody::shared_ptr data); + bool isComplete(); + const ConnectionToken* const getPublisher(); + + void deliver(qpid::framing::OutputHandler* out, int channel, + string& consumerTag, u_int64_t deliveryTag, + u_int32_t framesize); + + friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry); + + }; + bool route(Message::shared_ptr& msg, ExchangeRegistry* registry); + } +} + + +#endif diff --git a/cpp/broker/inc/NameGenerator.h b/cpp/broker/inc/NameGenerator.h new file mode 100644 index 0000000000..6e6e0acf28 --- /dev/null +++ b/cpp/broker/inc/NameGenerator.h @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _NameGenerator_ +#define _NameGenerator_ + +#include "Message.h" + +namespace qpid { + namespace broker { + class NameGenerator{ + const std::string base; + unsigned int counter; + public: + NameGenerator(const std::string& base); + std::string generate(); + }; + } +} + + +#endif diff --git a/cpp/broker/inc/Queue.h b/cpp/broker/inc/Queue.h new file mode 100644 index 0000000000..2229ba6235 --- /dev/null +++ b/cpp/broker/inc/Queue.h @@ -0,0 +1,106 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Queue_ +#define _Queue_ + +#include <vector> +#include <queue> +#include "memory.h" +#include "apr_time.h" +#include "amqp_types.h" +#include "Binding.h" +#include "ConnectionToken.h" +#include "Consumer.h" +#include "Message.h" +#include "MonitorImpl.h" + +namespace qpid { + namespace broker { + + /** + * Thrown when exclusive access would be violated. + */ + struct ExclusiveAccessException{}; + + /** + * The brokers representation of an amqp queue. Messages are + * delivered to a queue from where they can be dispatched to + * registered consumers or be stored until dequeued or until one + * or more consumers registers. + */ + class Queue{ + const string name; + const u_int32_t autodelete; + const bool durable; + const ConnectionToken* const owner; + std::vector<Consumer*> consumers; + std::queue<Binding*> bindings; + std::queue<Message::shared_ptr> messages; + bool queueing; + bool dispatching; + int next; + mutable qpid::concurrent::MonitorImpl lock; + apr_time_t lastUsed; + Consumer* exclusive; + + bool startDispatching(); + bool dispatch(Message::shared_ptr& msg); + + public: + + typedef std::tr1::shared_ptr<Queue> shared_ptr; + + typedef std::vector<shared_ptr> vector; + + Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0); + ~Queue(); + /** + * Informs the queue of a binding that should be cancelled on + * destruction of the queue. + */ + void bound(Binding* b); + /** + * Delivers a message to the queue from where it will be + * dispatched to immediately to a consumer if one is + * available or stored for dequeue or later dispatch if + * not. + */ + void deliver(Message::shared_ptr& msg); + /** + * Dispatch any queued messages providing there are + * consumers for them. Only one thread can be dispatching + * at any time, but this method (rather than the caller) + * is responsible for ensuring that. + */ + void dispatch(); + void consume(Consumer* c, bool exclusive = false); + void cancel(Consumer* c); + Message::shared_ptr dequeue(); + u_int32_t purge(); + u_int32_t getMessageCount() const; + u_int32_t getConsumerCount() const; + inline const string& getName() const { return name; } + inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } + inline bool hasExclusiveConsumer() const { return exclusive; } + bool canAutoDelete() const; + }; + } +} + + +#endif diff --git a/cpp/broker/inc/QueueRegistry.h b/cpp/broker/inc/QueueRegistry.h new file mode 100644 index 0000000000..ac12aa8f88 --- /dev/null +++ b/cpp/broker/inc/QueueRegistry.h @@ -0,0 +1,88 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _QueueRegistry_ +#define _QueueRegistry_ + +#include <map> +#include "MonitorImpl.h" +#include "Queue.h" + +namespace qpid { +namespace broker { + +class SessionHandlerImpl; + +/** + * A registry of queues indexed by queue name. + * + * Queues are reference counted using shared_ptr to ensure that they + * are deleted when and only when they are no longer in use. + * + */ +class QueueRegistry{ + + public: + QueueRegistry(); + ~QueueRegistry(); + + /** + * Declare a queue. + * + * @return The queue and a boolean flag which is true if the queue + * was created by this declare call false if it already existed. + */ + std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0); + + /** + * Destroy the named queue. + * + * Note: if the queue is in use it is not actually destroyed until + * all shared_ptrs to it are destroyed. During that time it is + * possible that a new queue with the same name may be + * created. This should not create any problems as the new and + * old queues exist independently. The registry has + * forgotten the old queue so there can be no confusion for + * subsequent calls to find or declare with the same name. + * + */ + void destroy(const string& name); + + /** + * Find the named queue. Return 0 if not found. + */ + Queue::shared_ptr find(const string& name); + + /** + * Generate unique queue name. + */ + string generateName(); + + private: + typedef std::map<string, Queue::shared_ptr> QueueMap; + QueueMap queues; + qpid::concurrent::MonitorImpl lock; + int counter; + +}; + + +} +} + + +#endif diff --git a/cpp/broker/inc/SessionHandlerFactoryImpl.h b/cpp/broker/inc/SessionHandlerFactoryImpl.h new file mode 100644 index 0000000000..2317a6667b --- /dev/null +++ b/cpp/broker/inc/SessionHandlerFactoryImpl.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandlerFactoryImpl_ +#define _SessionHandlerFactoryImpl_ + +#include "AMQFrame.h" +#include "AutoDelete.h" +#include "DirectExchange.h" +#include "ExchangeRegistry.h" +#include "ProtocolInitiation.h" +#include "QueueRegistry.h" +#include "SessionHandlerFactory.h" +#include "TimeoutHandler.h" + +namespace qpid { + namespace broker { + + class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory + { + QueueRegistry queues; + ExchangeRegistry exchanges; + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + AutoDelete cleaner; + public: + SessionHandlerFactoryImpl(u_int32_t timeout = 30000); + virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt); + virtual ~SessionHandlerFactoryImpl(); + }; + + } +} + + +#endif diff --git a/cpp/broker/inc/SessionHandlerImpl.h b/cpp/broker/inc/SessionHandlerImpl.h new file mode 100644 index 0000000000..14a6404c78 --- /dev/null +++ b/cpp/broker/inc/SessionHandlerImpl.h @@ -0,0 +1,230 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandlerImpl_ +#define _SessionHandlerImpl_ + +#include <map> +#include <sstream> +#include <vector> +#include <exception> +#include "AMQFrame.h" +#include "AMQP_ServerOperations.h" +#include "AutoDelete.h" +#include "ExchangeRegistry.h" +#include "Channel.h" +#include "ConnectionToken.h" +#include "DirectExchange.h" +#include "OutputHandler.h" +#include "ProtocolInitiation.h" +#include "QueueRegistry.h" +#include "SessionContext.h" +#include "SessionHandler.h" +#include "TimeoutHandler.h" +#include "TopicExchange.h" + +namespace qpid { +namespace broker { + +struct ChannelException : public std::exception { + u_int16_t code; + string text; + ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ChannelException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +struct ConnectionException : public std::exception { + u_int16_t code; + string text; + ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {} + ~ConnectionException() throw() {} + const char* what() const throw() { return text.c_str(); } +}; + +class SessionHandlerImpl : public virtual qpid::io::SessionHandler, + public virtual qpid::framing::AMQP_ServerOperations, + public virtual ConnectionToken +{ + typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; + typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + + qpid::io::SessionContext* context; + QueueRegistry* queues; + ExchangeRegistry* const exchanges; + AutoDelete* const cleaner; + const u_int32_t timeout;//timeout for auto-deleted queues (in ms) + + ConnectionHandler* connectionHandler; + ChannelHandler* channelHandler; + BasicHandler* basicHandler; + ExchangeHandler* exchangeHandler; + QueueHandler* queueHandler; + + std::map<u_int16_t, Channel*> channels; + std::vector<Queue::shared_ptr> exclusiveQueues; + + u_int32_t framemax; + u_int16_t heartbeat; + + void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body); + void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body); + void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body); + + /** + * Get named queue, never returns 0. + * @return: named queue or default queue for channel if name="" + * @exception: ChannelException if no queue of that name is found. + * @exception: ConnectionException if no queue specified and channel has not declared one. + */ + Queue::shared_ptr getQueue(const string& name, u_int16_t channel); + + Exchange* findExchange(const string& name); + + public: + SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, + ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); + virtual void received(qpid::framing::AMQFrame* frame); + virtual void initiated(qpid::framing::ProtocolInitiation* header); + virtual void idleOut(); + virtual void idleIn(); + virtual void closed(); + virtual ~SessionHandlerImpl(); + + class ConnectionHandlerImpl : public virtual ConnectionHandler{ + SessionHandlerImpl* parent; + public: + inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism, + string& response, string& locale); + + virtual void secureOk(u_int16_t channel, string& response); + + virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); + + virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist); + + virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, + u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ConnectionHandlerImpl(){} + }; + + class ChannelHandlerImpl : public virtual ChannelHandler{ + SessionHandlerImpl* parent; + public: + inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void open(u_int16_t channel, string& outOfBand); + + virtual void flow(u_int16_t channel, bool active); + + virtual void flowOk(u_int16_t channel, bool active); + + virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, + u_int16_t classId, u_int16_t methodId); + + virtual void closeOk(u_int16_t channel); + + virtual ~ChannelHandlerImpl(){} + }; + + class ExchangeHandlerImpl : public virtual ExchangeHandler{ + SessionHandlerImpl* parent; + public: + inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, + bool passive, bool durable, bool autoDelete, bool internal, bool nowait, + qpid::framing::FieldTable& arguments); + + virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait); + + virtual ~ExchangeHandlerImpl(){} + }; + + + class QueueHandlerImpl : public virtual QueueHandler{ + SessionHandlerImpl* parent; + public: + inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments); + + virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue, + string& exchange, string& routingKey, bool nowait, + qpid::framing::FieldTable& arguments); + + virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue, + bool nowait); + + virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty, + bool nowait); + + virtual ~QueueHandlerImpl(){} + }; + + class BasicHandlerImpl : public virtual BasicHandler{ + SessionHandlerImpl* parent; + public: + inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {} + + virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); + + virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag, + bool noLocal, bool noAck, bool exclusive, bool nowait); + + virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait); + + virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey, + bool mandatory, bool immediate); + + virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck); + + virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); + + virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); + + virtual void recover(u_int16_t channel, bool requeue); + + virtual ~BasicHandlerImpl(){} + }; + + inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; } + inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; } + inline virtual BasicHandler* getBasicHandler(){ return basicHandler; } + inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; } + inline virtual QueueHandler* getQueueHandler(){ return queueHandler; } + + inline virtual AccessHandler* getAccessHandler(){ return 0; } + inline virtual FileHandler* getFileHandler(){ return 0; } + inline virtual StreamHandler* getStreamHandler(){ return 0; } + inline virtual TxHandler* getTxHandler(){ return 0; } + inline virtual DtxHandler* getDtxHandler(){ return 0; } + inline virtual TunnelHandler* getTunnelHandler(){ return 0; } +}; + +} +} + + +#endif diff --git a/cpp/broker/inc/TopicExchange.h b/cpp/broker/inc/TopicExchange.h new file mode 100644 index 0000000000..d9ff62ecc6 --- /dev/null +++ b/cpp/broker/inc/TopicExchange.h @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TopicExchange_ +#define _TopicExchange_ + +#include <map> +#include <vector> +#include "Exchange.h" +#include "FieldTable.h" +#include "Message.h" +#include "MonitorImpl.h" +#include "Queue.h" + +namespace qpid { +namespace broker { + class TopicExchange : public virtual Exchange{ + const string name; + std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported + qpid::concurrent::MonitorImpl lock; + + public: + static const std::string typeName; + + TopicExchange(const string& name); + + inline virtual const string& getName(){ return name; } + + virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args); + + virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args); + + virtual ~TopicExchange(); + }; +} +} + + +#endif |