diff options
Diffstat (limited to 'cpp/common')
112 files changed, 9052 insertions, 0 deletions
diff --git a/cpp/common/Makefile b/cpp/common/Makefile new file mode 100644 index 0000000000..5fe815b8da --- /dev/null +++ b/cpp/common/Makefile @@ -0,0 +1,51 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Make file to build qpid_common library. +# + +QPID_HOME=../.. +include $(QPID_HOME)/cpp/options.mk + +TARGET = $(LIB_DIR)/libqpid_common.so.1.0 + +CXXFLAGS = $(DEBUG) $(OPT) -MMD -fpic $(COMMON_INCLUDES) + +SOURCES = $(wildcard */src/*.cpp framing/generated/*.cpp) +OBJECTS = $(SOURCES:.cpp=.o) +DEPS = $(SOURCES:.cpp=.d) + +GENERATED_OBJECTS = framing/generated/amqp_methods.o + +.PHONY: all test clean + +# We have to do two separate makes to ensure we pick up all generated files. +all: + @$(MAKE) -C framing all + @make $(TARGET) + +test: + @$(MAKE) -C framing test + +clean: + @$(MAKE) -C framing clean + -@rm -f $(TARGET) $(OBJECTS) $(DEPS) + +$(TARGET): $(OBJECTS) + $(CXX) -shared -o $@ $(OBJECTS) $(LDFLAGS) -lapr-1 + +-include $(DEPS) diff --git a/cpp/common/concurrent/inc/APRBase.h b/cpp/common/concurrent/inc/APRBase.h new file mode 100644 index 0000000000..e0b526faa1 --- /dev/null +++ b/cpp/common/concurrent/inc/APRBase.h @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include <string> +#include "apr_thread_mutex.h" +#include "apr_errno.h" + +namespace qpid { +namespace concurrent { + + /** + * Use of APR libraries necessitates explicit init and terminate + * calls. Any class using APR libs should obtain the reference to + * this singleton and increment on construction, decrement on + * destruction. This class can then correctly initialise apr + * before the first use and terminate after the last use. + */ + class APRBase{ + static APRBase* instance; + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + int count; + + APRBase(); + ~APRBase(); + static APRBase* getInstance(); + bool _increment(); + void _decrement(); + public: + static void increment(); + static void decrement(); + }; + + //this is also a convenient place for a helper function for error checking: + void check(apr_status_t status, const std::string& file, const int line); + std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); + +} +} + + + + +#endif diff --git a/cpp/common/concurrent/inc/APRMonitor.h b/cpp/common/concurrent/inc/APRMonitor.h new file mode 100644 index 0000000000..bf72596564 --- /dev/null +++ b/cpp/common/concurrent/inc/APRMonitor.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRMonitor_ +#define _APRMonitor_ + +#include "apr_thread_mutex.h" +#include "apr_thread_cond.h" +#include "Monitor.h" + +namespace qpid { +namespace concurrent { + + class APRMonitor : public virtual Monitor + { + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + apr_thread_cond_t* condition; + + public: + APRMonitor(); + virtual ~APRMonitor(); + virtual void wait(); + virtual void wait(u_int64_t time); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); + }; +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/APRThread.h b/cpp/common/concurrent/inc/APRThread.h new file mode 100644 index 0000000000..d5034ce3b7 --- /dev/null +++ b/cpp/common/concurrent/inc/APRThread.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRThread_ +#define _APRThread_ + +#include "apr_thread_proc.h" +#include "APRThread.h" +#include "Runnable.h" +#include "Thread.h" + +namespace qpid { +namespace concurrent { + + class APRThread : public virtual Thread + { + const Runnable* runnable; + apr_pool_t* pool; + apr_thread_t* runner; + + public: + APRThread(apr_pool_t* pool, Runnable* runnable); + virtual ~APRThread(); + virtual void start(); + virtual void join(); + virtual void interrupt(); + static unsigned int currentThread(); + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/APRThreadFactory.h b/cpp/common/concurrent/inc/APRThreadFactory.h new file mode 100644 index 0000000000..87b240025d --- /dev/null +++ b/cpp/common/concurrent/inc/APRThreadFactory.h @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRThreadFactory_ +#define _APRThreadFactory_ + +#include "apr_thread_proc.h" + +#include "APRThread.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "Runnable.h" + +namespace qpid { +namespace concurrent { + + class APRThreadFactory : public virtual ThreadFactory + { + apr_pool_t* pool; + public: + APRThreadFactory(); + virtual ~APRThreadFactory(); + virtual Thread* create(Runnable* runnable); + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/APRThreadPool.h b/cpp/common/concurrent/inc/APRThreadPool.h new file mode 100644 index 0000000000..cf6d30774c --- /dev/null +++ b/cpp/common/concurrent/inc/APRThreadPool.h @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRThreadPool_ +#define _APRThreadPool_ + +#include <queue> +#include <vector> +#include "APRMonitor.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "ThreadPool.h" +#include "Runnable.h" + +namespace qpid { +namespace concurrent { + + class APRThreadPool : public virtual ThreadPool + { + class Worker : public virtual Runnable{ + APRThreadPool* pool; + public: + inline Worker(APRThreadPool* _pool) : pool(_pool){} + inline virtual void run(){ + while(pool->running){ + pool->runTask(); + } + } + }; + const bool deleteFactory; + const int size; + ThreadFactory* factory; + APRMonitor lock; + std::vector<Thread*> threads; + std::queue<Runnable*> tasks; + Worker* worker; + volatile bool running; + + void runTask(); + public: + APRThreadPool(int size); + APRThreadPool(int size, ThreadFactory* factory); + virtual void start(); + virtual void stop(); + virtual void addTask(Runnable* task); + virtual ~APRThreadPool(); + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/LMonitor.h b/cpp/common/concurrent/inc/LMonitor.h new file mode 100644 index 0000000000..8e2569921d --- /dev/null +++ b/cpp/common/concurrent/inc/LMonitor.h @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LMonitor_ +#define _LMonitor_ + +/* Native Linux Monitor - Based of Kernel patch 19/20 */ + +#include "Monitor.h" + +namespace qpid { +namespace concurrent { + + class LMonitor : public virtual Monitor + { + + public: + LMonitor(); + virtual ~LMonitor(); + virtual void wait(); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); + }; +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/LThreadFactory.h b/cpp/common/concurrent/inc/LThreadFactory.h new file mode 100644 index 0000000000..4a573d1bd1 --- /dev/null +++ b/cpp/common/concurrent/inc/LThreadFactory.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LAPRThreadFactory_ +#define _LAPRThreadFactory_ + + +namespace qpid { +namespace concurrent { + + class LThreadFactory + { + public: + LThreadFactory(); + virtual ~LThreadFactory(); + virtual Thread* create(Runnable* runnable); + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/LockedQueue.h b/cpp/common/concurrent/inc/LockedQueue.h new file mode 100644 index 0000000000..ef3f0b8381 --- /dev/null +++ b/cpp/common/concurrent/inc/LockedQueue.h @@ -0,0 +1,68 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LockedQueue_ +#define _LockedQueue_ + +#include <queue> +#include "Monitor.h" + +/** + * A threadsafe queue abstraction + */ +namespace qpid { +namespace concurrent { + template<class T, class L> class LockedQueue + { + L lock; + std::queue<T*> queue; + + public: + void put(T* item); + T* take(); + bool empty(); + }; + + template<class T, class L> void LockedQueue<T, L>::put(T* item){ + lock.acquire(); + queue.push(item); + lock.release(); + } + + template<class T, class L> T* LockedQueue<T, L>::take(){ + lock.acquire(); + T* item = 0; + if(!queue.empty()){ + item = queue.front(); + queue.pop(); + } + lock.release(); + return item; + } + + template<class T, class L> bool LockedQueue<T, L>::empty(){ + lock.acquire(); + bool result = queue.empty(); + lock.release(); + return result; + } + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/Monitor.h b/cpp/common/concurrent/inc/Monitor.h new file mode 100644 index 0000000000..7f1a299c6a --- /dev/null +++ b/cpp/common/concurrent/inc/Monitor.h @@ -0,0 +1,59 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Monitor_ +#define _Monitor_ + +#include "amqp_types.h" + +namespace qpid { +namespace concurrent { + +class Monitor +{ + public: + virtual ~Monitor(){} + virtual void wait() = 0; + virtual void wait(u_int64_t time) = 0; + virtual void notify() = 0; + virtual void notifyAll() = 0; + virtual void acquire() = 0; + virtual void release() = 0; +}; + +/** + * Scoped locker for a monitor. + */ +class Locker +{ + public: + Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); } + ~Locker() { lock.release(); } + + private: + Monitor& lock; + + // private and unimplemented to prevent copying + Locker(const Locker&); + void operator=(const Locker&); +}; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/MonitorImpl.h b/cpp/common/concurrent/inc/MonitorImpl.h new file mode 100644 index 0000000000..e96e81d795 --- /dev/null +++ b/cpp/common/concurrent/inc/MonitorImpl.h @@ -0,0 +1,57 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#ifndef _MonitorImpl_ +#define _MonitorImpl_ + +#ifdef _USE_APR_IO_ +#include "APRMonitor.h" +#else /* use POSIX Monitor */ +#include "LMonitor.h" +#endif + + +namespace qpid { +namespace concurrent { + +#ifdef _USE_APR_IO_ + class MonitorImpl : public virtual APRMonitor + { + + public: + MonitorImpl() : APRMonitor(){}; + virtual ~MonitorImpl(){}; + + }; +#else + class MonitorImpl : public virtual LMonitor + { + + public: + MonitorImpl() : LMonitor(){}; + virtual ~MonitorImpl(){}; + + }; +#endif + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/Runnable.h b/cpp/common/concurrent/inc/Runnable.h new file mode 100644 index 0000000000..523ad813f7 --- /dev/null +++ b/cpp/common/concurrent/inc/Runnable.h @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Runnable_ +#define _Runnable_ + +namespace qpid { +namespace concurrent { + + class Runnable + { + public: + virtual void run() = 0; + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/TaskQueue.h b/cpp/common/concurrent/inc/TaskQueue.h new file mode 100644 index 0000000000..e06a3ce069 --- /dev/null +++ b/cpp/common/concurrent/inc/TaskQueue.h @@ -0,0 +1,200 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TaskQueue_ +#define _TaskQueue_ + +#include <iostream> +#include <memory> +#include <queue> +#include "LockedQueue.h" +#include "Runnable.h" +#include "ThreadPool.h" + +namespace qpid { +namespace concurrent { + template<class T, class L> class TaskQueue : public virtual Runnable + { + const int max_iterations_per_run; + L lock; + //LockedQueue<T, L> queue; + std::queue<T*> queue; + ThreadPool* const pool; + T* work; + bool running; + volatile bool stopped; + TaskQueue<T, L>* next; + + volatile bool inrun; + + bool hasWork(); + void completed(); + + T* take(); + + protected: + /** + * Callback though which the task is executed + */ + virtual void execute(T* t) = 0; + /** + * Allows a task to be completed asynchronously to the + * execute() call if required. + */ + virtual bool isComplete(T* t); + /** + * Should be called to signal completion of a task that was + * signalled as not complete through the isComplete() methods + * return value. This will allow normal processing to resume. + */ + virtual void complete(); + + public: + TaskQueue(ThreadPool* const pool, int max_iterations_per_run = 100); + virtual void run(); + void trigger(); + bool append(T* t); + void stop(bool drain); + inline void setNext(TaskQueue<T, L>* next){ this->next = next; } + }; + + template<class T, class L> TaskQueue<T, L>::TaskQueue(ThreadPool* const _pool, int _max_iterations_per_run) : + pool(_pool), + max_iterations_per_run(_max_iterations_per_run), + work(0), + running(false), + stopped(false), + next(0), inrun(false){ + } + + template<class T, class L> void TaskQueue<T, L>::run(){ + if(inrun) std::cout << "Already running" << std::endl; + inrun = true; + + bool blocked = false; + int count = max_iterations_per_run; + while(!blocked && hasWork() && count){ + execute(work); + if(isComplete(work)){ + completed(); + }else{ + blocked = true; + } + count--; + } + inrun = false; + + if(!blocked && count == 0){//performed max_iterations_per_run, requeue task to ensure fairness + //running will still be true at this point + lock.acquire(); + running = false; + if(stopped) lock.notify(); + lock.release(); + + trigger(); + }else if(hasWork()){//task was added to queue after we exited the loop above; should not need this? + trigger(); + } + } + + template<class T, class L> void TaskQueue<T, L>::trigger(){ + lock.acquire(); + if(!running){ + running = true; + pool->addTask(this); + } + lock.release(); + } + + template<class T, class L> bool TaskQueue<T, L>::hasWork(){ + lock.acquire(); + if(!work) work = take();//queue.take(); + if(!work){ + running = false; + if(stopped) lock.notify(); + } + lock.release(); + return work; + } + + template<class T, class L> bool TaskQueue<T, L>::append(T* item){ + if(!stopped){ + lock.acquire(); + + //queue.put(item); + queue.push(item); + + if(!running){ + running = true; + pool->addTask(this); + } + lock.release(); + //} + return true; + }else{ + return false; + } + } + + template<class T, class L> bool TaskQueue<T, L>::isComplete(T* item){ + return true;//by default assume all tasks are synchronous w.r.t. execute() + } + + + template<class T, class L> void TaskQueue<T, L>::completed(){ + if(next){ + if(!next->append(work)){ + std::cout << "Warning: dropping task as next queue appears to have stopped." << std::endl; + } + }else{ + delete work; + } + work = 0; + } + + template<class T, class L> void TaskQueue<T, L>::complete(){ + completed(); + lock.acquire(); + running = false; + if(stopped) lock.notify(); + lock.release(); + } + + template<class T, class L> void TaskQueue<T, L>::stop(bool drain){ + //prevent new tasks from being added + stopped = true; + //wait until no longer running + lock.acquire(); + while(running && (drain && hasWork())){ + lock.wait(); + } + lock.release(); + } + + template<class T, class L> T* TaskQueue<T, L>::take(){ + T* item = 0; + if(!queue.empty()){ + item = queue.front(); + queue.pop(); + } + return item; + } +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/Thread.h b/cpp/common/concurrent/inc/Thread.h new file mode 100644 index 0000000000..6bd2a379ce --- /dev/null +++ b/cpp/common/concurrent/inc/Thread.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Thread_ +#define _Thread_ + +namespace qpid { +namespace concurrent { + + class Thread + { + public: + virtual ~Thread(){} + virtual void start() = 0; + virtual void join() = 0; + virtual void interrupt() = 0; + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/ThreadFactory.h b/cpp/common/concurrent/inc/ThreadFactory.h new file mode 100644 index 0000000000..53be000ff3 --- /dev/null +++ b/cpp/common/concurrent/inc/ThreadFactory.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadFactory_ +#define _ThreadFactory_ + +#include "Thread.h" +#include "Runnable.h" + +namespace qpid { +namespace concurrent { + + class ThreadFactory + { + public: + virtual ~ThreadFactory(){} + virtual Thread* create(Runnable* runnable) = 0; + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/ThreadFactoryImpl.h b/cpp/common/concurrent/inc/ThreadFactoryImpl.h new file mode 100644 index 0000000000..a534b3c1e2 --- /dev/null +++ b/cpp/common/concurrent/inc/ThreadFactoryImpl.h @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadFactoryImpl_ +#define _ThreadFactoryImpl_ + + +#ifdef _USE_APR_IO_ +#include "APRThreadFactory.h" +#else +#include "LThreadFactory.h" +#endif + + +namespace qpid { +namespace concurrent { + + +#ifdef _USE_APR_IO_ + class ThreadFactoryImpl : public virtual APRThreadFactory + { + public: + ThreadFactoryImpl(): APRThreadFactory() {}; + virtual ~ThreadFactoryImpl() {}; + }; +#else + class ThreadFactoryImpl : public virtual LThreadFactory + { + public: + ThreadFactoryImpl(): LThreadFactory() {}; + virtual ~ThreadFactoryImpl() {}; + }; +#endif +} +} + + +#endif diff --git a/cpp/common/concurrent/inc/ThreadPool.h b/cpp/common/concurrent/inc/ThreadPool.h new file mode 100644 index 0000000000..679c889ff3 --- /dev/null +++ b/cpp/common/concurrent/inc/ThreadPool.h @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadPool_ +#define _ThreadPool_ + +#include "Thread.h" +#include "Runnable.h" + +namespace qpid { +namespace concurrent { + + class ThreadPool + { + public: + virtual void start() = 0; + virtual void stop() = 0; + virtual void addTask(Runnable* runnable) = 0; + virtual ~ThreadPool(){} + }; + +} +} + + +#endif diff --git a/cpp/common/concurrent/src/APRBase.cpp b/cpp/common/concurrent/src/APRBase.cpp new file mode 100644 index 0000000000..f87ea9e25f --- /dev/null +++ b/cpp/common/concurrent/src/APRBase.cpp @@ -0,0 +1,97 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "APRBase.h" +#include "QpidError.h" + +using namespace qpid::concurrent; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ + if(instance == 0){ + instance = new APRBase(); + } + return instance; +} + + +APRBase::APRBase() : count(0){ + apr_initialize(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + apr_terminate(); +} + +bool APRBase::_increment(){ + bool deleted(false); + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(this == instance){ + count++; + }else{ + deleted = true; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + return !deleted; +} + +void APRBase::_decrement(){ + APRBase* copy = 0; + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(--count == 0){ + copy = instance; + instance = 0; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + if(copy != 0){ + delete copy; + } +} + +void APRBase::increment(){ + int count = 0; + while(count++ < 2 && !getInstance()->_increment()){ + std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; + } +} + +void APRBase::decrement(){ + getInstance()->_decrement(); +} + +void qpid::concurrent::check(apr_status_t status, const std::string& file, const int line){ + if (status != APR_SUCCESS){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + throw QpidError(APR_ERROR + ((int) status), msg, file, line); + } +} + +std::string qpid::concurrent::get_desc(apr_status_t status){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + return msg; +} + diff --git a/cpp/common/concurrent/src/APRMonitor.cpp b/cpp/common/concurrent/src/APRMonitor.cpp new file mode 100644 index 0000000000..428d76dff9 --- /dev/null +++ b/cpp/common/concurrent/src/APRMonitor.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRBase.h" +#include "APRMonitor.h" +#include <iostream> + +qpid::concurrent::APRMonitor::APRMonitor(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); +} + +qpid::concurrent::APRMonitor::~APRMonitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + APRBase::decrement(); +} + +void qpid::concurrent::APRMonitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + + +void qpid::concurrent::APRMonitor::wait(u_int64_t time){ + apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void qpid::concurrent::APRMonitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void qpid::concurrent::APRMonitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +void qpid::concurrent::APRMonitor::acquire(){ + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} + +void qpid::concurrent::APRMonitor::release(){ + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} diff --git a/cpp/common/concurrent/src/APRThread.cpp b/cpp/common/concurrent/src/APRThread.cpp new file mode 100644 index 0000000000..4202fe81b6 --- /dev/null +++ b/cpp/common/concurrent/src/APRThread.cpp @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRBase.h" +#include "APRThread.h" +#include "apr_portable.h" + +using namespace qpid::concurrent; + +void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ + ((Runnable*) data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} + +APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : pool(_pool), runnable(_runnable){} + +APRThread::~APRThread(){ +} + +void APRThread::start(){ + CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); +} + +void APRThread::join(){ + apr_status_t status; + CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); +} + +void APRThread::interrupt(){ + CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); +} + +unsigned int qpid::concurrent::APRThread::currentThread(){ + return apr_os_thread_current(); +} diff --git a/cpp/common/concurrent/src/APRThreadFactory.cpp b/cpp/common/concurrent/src/APRThreadFactory.cpp new file mode 100644 index 0000000000..9ba68e9e56 --- /dev/null +++ b/cpp/common/concurrent/src/APRThreadFactory.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRBase.h" +#include "APRThreadFactory.h" + +using namespace qpid::concurrent; + +APRThreadFactory::APRThreadFactory(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +APRThreadFactory::~APRThreadFactory(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +Thread* APRThreadFactory::create(Runnable* runnable){ + return new APRThread(pool, runnable); +} diff --git a/cpp/common/concurrent/src/APRThreadPool.cpp b/cpp/common/concurrent/src/APRThreadPool.cpp new file mode 100644 index 0000000000..e0fcb804e6 --- /dev/null +++ b/cpp/common/concurrent/src/APRThreadPool.cpp @@ -0,0 +1,85 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRThreadFactory.h" +#include "APRThreadPool.h" +#include "QpidError.h" +#include <iostream> + +using namespace qpid::concurrent; + +APRThreadPool::APRThreadPool(int _size) : size(_size), factory(new APRThreadFactory()), + deleteFactory(true), running(false){ + worker = new Worker(this); +} + +APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) : size(_size), factory(_factory), + deleteFactory(false), running(false){ + worker = new Worker(this); +} + +APRThreadPool::~APRThreadPool(){ + if(deleteFactory) delete factory; +} + +void APRThreadPool::addTask(Runnable* task){ + lock.acquire(); + tasks.push(task); + lock.notifyAll(); + lock.release(); +} + +void APRThreadPool::runTask(){ + lock.acquire(); + while(tasks.empty()){ + lock.wait(); + } + Runnable* task = tasks.front(); + tasks.pop(); + lock.release(); + try{ + task->run(); + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void APRThreadPool::start(){ + if(!running){ + running = true; + for(int i = 0; i < size; i++){ + Thread* t = factory->create(worker); + t->start(); + threads.push_back(t); + } + } +} + +void APRThreadPool::stop(){ + if(!running){ + running = false; + lock.acquire(); + lock.notifyAll(); + lock.release(); + for(int i = 0; i < size; i++){ + threads[i]->join(); + delete threads[i]; + } + } +} + + diff --git a/cpp/common/error/inc/QpidError.h b/cpp/common/error/inc/QpidError.h new file mode 100644 index 0000000000..a739b506c7 --- /dev/null +++ b/cpp/common/error/inc/QpidError.h @@ -0,0 +1,47 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef __QpidError__ +#define __QpidError__ + +namespace qpid { + +class QpidError{ +public: + const int code; + const std::string msg; + const std::string file; + const int line; + + inline QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) : code(_code), msg(_msg), file(_file), line(_line) {} + ~QpidError(){} + +}; + +#define THROW_QPID_ERROR(A, B) throw QpidError(A, B, __FILE__, __LINE__) + +} + +#define PROTOCOL_ERROR 10000 +#define APR_ERROR 20000 +#define FRAMING_ERROR 30000 +#define CLIENT_ERROR 40000 +#define INTERNAL_ERROR 50000 + +#endif diff --git a/cpp/common/error/inc/QpidErrorIO.h b/cpp/common/error/inc/QpidErrorIO.h new file mode 100644 index 0000000000..4f9bd3ce26 --- /dev/null +++ b/cpp/common/error/inc/QpidErrorIO.h @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "QpidError.h" +#include <ostream> + +std::ostream& operator <<(std::ostream& out, const QpidError& error) +{ + out << "Qpid Error [" << error.code << "] " << error.msg + << " (" << error.file << ":" << error.line << ")"; + return out; +} + + diff --git a/cpp/common/framing/Makefile b/cpp/common/framing/Makefile new file mode 100644 index 0000000000..1dfc286050 --- /dev/null +++ b/cpp/common/framing/Makefile @@ -0,0 +1,28 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +.PHONY: all clean test + +all: + @$(MAKE) -C generated all + +test: + @$(MAKE) -C test all + +clean : + @$(MAKE) -C generated clean + @$(MAKE) -C test clean + diff --git a/cpp/common/framing/generated/Makefile b/cpp/common/framing/generated/Makefile new file mode 100644 index 0000000000..12ec402760 --- /dev/null +++ b/cpp/common/framing/generated/Makefile @@ -0,0 +1,41 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +QPID_HOME = ../../../.. +include ${QPID_HOME}/cpp/options.mk + +STYLESHEET_DIR = stylesheets +JAVA = java +XSLTP = ${TOOLS_DIR}/saxon8.jar + +SPEC = ${SPEC_DIR}/amqp-8.0.xml +STYLESHEETS = $(wildcard stylesheets/*.xsl) + +GENERATED_SOURCES=amqp_methods.cpp # Seed generation + +.PHONY: all clean + +all: ${GENERATED_SOURCES} + +clean : + -@rm -f *.cpp *.h + +${GENERATED_SOURCES}: ${STYLESHEETS} ${SPEC} + ${JAVA} -jar ${XSLTP} -o results.out ${SPEC} ${STYLESHEET_DIR}/code_gen.xsl + ${JAVA} -jar ${XSLTP} -o results.out ${SPEC} ${STYLESHEET_DIR}/framing.xsl + +-include $(GENERATED_SOURCES:.cpp=.d) + diff --git a/cpp/common/framing/generated/stylesheets/amqp_client.xsl b/cpp/common/framing/generated/stylesheets/amqp_client.xsl new file mode 100644 index 0000000000..f0fa3d7890 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_client.xsl @@ -0,0 +1,155 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + + <!-- + ================== + Template: client_h + ================== + Client header file. + --> + <xsl:template match="amqp" mode="client_h"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_Client.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_Client_ +#define _AMQP_Client_ + +#include "AMQP_ServerOperations.h" +#include "FieldTable.h" +#include "OutputHandler.h" + +namespace qpid { +namespace framing { + +class AMQP_Client : virtual public AMQP_ServerOperations +{ + OutputHandler* out; + + public: + AMQP_Client(OutputHandler* _out); + virtual ~AMQP_Client() {}

</xsl:text> + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:if test="doc"> + <xsl:text>
/**
===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> =====
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:if> + <xsl:text> class </xsl:text><xsl:value-of select="$class"/><xsl:text> : virtual public AMQP_ServerOperations::</xsl:text><xsl:value-of select="$class"/><xsl:text>Handler + { + OutputHandler* out; + + public: + /* Constructors and destructors */ + </xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out); + virtual ~</xsl:text><xsl:value-of select="$class"/><xsl:text>(); + + /* Protocol methods */
</xsl:text> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='server']"> + <xsl:variable name="method" select="amqp:cpp-name(@name)"/> + <xsl:if test="doc"> + <xsl:text>
/**
----- Method: </xsl:text><xsl:value-of select="$class"/><xsl:text>.</xsl:text><xsl:value-of select="$method"/><xsl:text> -----
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:if> + <xsl:for-each select="rule"> + <xsl:text>
/**
</xsl:text> + <xsl:text>Rule "</xsl:text><xsl:value-of select="@name"/><xsl:text>":
</xsl:text><xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:for-each> + <xsl:text> virtual void </xsl:text><xsl:value-of select="$method"/> + <xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field"><xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> );
</xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */
</xsl:text> + </xsl:for-each> + <xsl:text>}; /* class AMQP_Client */ + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + + + <!-- + ==================== + Template: client_cpp + ==================== + Client body. + --> + <xsl:template match="amqp" mode="client_cpp"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_Client.cpp" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> + +#include "AMQP_Client.h" + +namespace qpid { +namespace framing { + +AMQP_Client::AMQP_Client(OutputHandler* _out) : + out(_out) +{ +}

</xsl:text> + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:text>
/* ++++++++++ Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> ++++++++++ */ + +AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) : + out(_out) +{ +} + +AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}

</xsl:text> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='server']"> + <xsl:text>void AMQP_Client::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text> + <xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> ) +{ + out->send( new AMQFrame( channel, + new </xsl:text><xsl:value-of select="concat($class, amqp:field-name(@name), 'Body')"/><xsl:text>( </xsl:text> + <xsl:for-each select="field"> + <xsl:value-of select="amqp:cpp-name(@name)"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> ) ) ); +}

</xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:for-each> + <xsl:text> + +} /* namespace framing */ +} /* namespace qpid */
</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/amqp_client_handler_impl.xsl b/cpp/common/framing/generated/stylesheets/amqp_client_handler_impl.xsl new file mode 100644 index 0000000000..aa095eaf79 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_client_handler_impl.xsl @@ -0,0 +1,187 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + + <!-- + =============================== + Template: client_handler_impl_h + =============================== + Template to generate the AMQP_ServerHandlerImpl class header file. + --> + <xsl:template match="amqp" mode="client_handler_impl_h"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_ClientHandlerImpl.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_ClientHandlerImpl_ +#define _AMQP_ClientHandlerImpl_ + +#include "AMQP_ClientOperations.h" +#include "FieldTable.h" + +namespace qpid { +namespace framing { + +class AMQP_ClientHandlerImpl : virtual public AMQP_ClientOperations +{
</xsl:text> + + <!-- List of pointers to each inner class instance --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + <xsl:text> AMQP_ClientOperations::</xsl:text><xsl:value-of select="$class"/><xsl:text>* </xsl:text> + <xsl:value-of select="$class"/><xsl:text>Ptr;
</xsl:text> + </xsl:for-each> + <xsl:text> + public: + AMQP_ClientHandlerImpl(); + virtual ~AMQP_ClientHandlerImpl();

</xsl:text> + + <!-- List of functions to return pointer to each inner class instance --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + <xsl:text> inline AMQP_ClientOperations::</xsl:text> + <xsl:value-of select="$class"/><xsl:text>* get</xsl:text><xsl:value-of select="$class"/> + <xsl:text>() { return </xsl:text><xsl:value-of select="$class"/><xsl:text>Ptr; }
</xsl:text> + </xsl:for-each> + <xsl:text>
</xsl:text> + + <!-- Inner classes --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + + <!-- Inner class documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
</xsl:text> + <xsl:text>===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text>Impl =====
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + + <!-- Inner class definition --> + <xsl:text> class </xsl:text><xsl:value-of select="$class"/> + <xsl:text>Impl : virtual public AMQP_ClientOperations::</xsl:text><xsl:value-of select="$class"/> + <xsl:text>
 { + public: + /* Constructors and destructors */ + </xsl:text><xsl:value-of select="$class"/><xsl:text>Impl(); + virtual ~</xsl:text><xsl:value-of select="$class"/><xsl:text>Impl(); + + /* Protocol methods */
</xsl:text> + + <!-- Inner class methods (only if the chassis is set to "client") --> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='client']"> + <xsl:variable name="method" select="amqp:cpp-name(@name)"/> + + <!-- Inner class method documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
</xsl:text> + <xsl:text>----- Method: </xsl:text><xsl:value-of select="$class"/> + <xsl:text>Impl.</xsl:text><xsl:value-of select="@name"/><xsl:text> -----
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + <xsl:for-each select="rule"> + <xsl:text>
/**
</xsl:text> + <xsl:text>Rule "</xsl:text><xsl:value-of select="@name"/><xsl:text>":
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:for-each> + + <!-- Inner class method definition --> + <xsl:text>
 virtual void </xsl:text><xsl:value-of select="$method"/> + <xsl:text>( u_int16_t channel</xsl:text> + + <!-- Inner class method parameter definition --> + <xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> );
</xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text>
 }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text>Impl */
</xsl:text> + </xsl:for-each> + <xsl:text>
}; /* AMQP_ClientHandlerImpl */ + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + + <!-- + ================================= + Template: client_handler_impl_cpp + ================================= + Template to generate the AMQP_ServerHandlerImpl class stubs. + --> + <xsl:template match="amqp" mode="client_handler_impl_cpp"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_ClientHandlerImpl.cpp" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#include "AMQP_ClientHandlerImpl.h" + +namespace qpid { +namespace framing { + +AMQP_ClientHandlerImpl::AMQP_ClientHandlerImpl() :
 </xsl:text> + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:value-of select="$class"/> + <xsl:text>HandlerPtr( new </xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl() )</xsl:text> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> +{ +} + +AMQP_ClientHandlerImpl::~AMQP_ClientHandlerImpl() +{
</xsl:text> + <xsl:for-each select="class"> + <xsl:text> delete </xsl:text><xsl:value-of select="amqp:cpp-class-name(@name)"/><xsl:text>HandlerPtr;
</xsl:text> + </xsl:for-each>} + + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:text>
/* ===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl ===== */

</xsl:text> + <xsl:text>AMQP_ClientHandlerImpl::</xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl::</xsl:text> + <xsl:value-of select="$class"/><xsl:text>HandlerImpl()
{
}

</xsl:text> + <xsl:text>AMQP_ClientHandlerImpl::</xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl::~</xsl:text> + <xsl:value-of select="$class"/><xsl:text>HandlerImpl()
{
}

</xsl:text> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='client']"> + <xsl:text>void AMQP_ClientHandlerImpl::</xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl::</xsl:text> + <xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text> + <xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if><xsl:text> )
{
}

</xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:for-each> + <xsl:text> + +} /* namespace framing */ +} /* namespace qpid */

</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/amqp_client_operations.xsl b/cpp/common/framing/generated/stylesheets/amqp_client_operations.xsl new file mode 100644 index 0000000000..234b7080ba --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_client_operations.xsl @@ -0,0 +1,105 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + + <!-- + ============================= + Template: client-operations-h + ============================= + Template to generate the AMQP_ClientHandler virtual class. This is the pure + virtual class from which the AMQP_Server and AMQP_ClientHandlerImpl classes + are derived. + --> + <xsl:template match="amqp" mode="client-operations-h"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_ClientOperations.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_ClientOperations_ +#define _AMQP_ClientOperations_ + +#include "AMQP_Constants.h" +#include "FieldTable.h" + +namespace qpid { +namespace framing { + +class AMQP_ClientOperations +{ + public: + AMQP_ClientOperations() {} + virtual ~AMQP_ClientOperations() {} + inline u_int16_t getAmqpMajor() { return (u_int16_t)</xsl:text><xsl:value-of select="@major"/><xsl:text>; } + inline u_int16_t getAmqpMinor() { return (u_int16_t)</xsl:text><xsl:value-of select="@minor"/><xsl:text>; }

</xsl:text> + + <!-- Inner classes --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + + <!-- Inner class documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> =====
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + + <!-- Inner class definition --> + <xsl:text> class </xsl:text><xsl:value-of select="$class"/><xsl:text> + { + public: + /* Constructors and destructors */ + </xsl:text><xsl:value-of select="$class"/><xsl:text>() {} + virtual ~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {} + + /* Protocol methods */
</xsl:text> + + <!-- Inner class methods (only if the chassis is set to "client") --> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='client']"> + <xsl:variable name="method" select="amqp:cpp-name(@name)"/> + + <!-- Inner class method documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
----- Method: </xsl:text><xsl:value-of select="$class"/><xsl:text>.</xsl:text> + <xsl:value-of select="@name"/><xsl:text> -----
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + <xsl:for-each select="rule"> + <xsl:text>
/**
</xsl:text> + <xsl:text>Rule "</xsl:text><xsl:value-of select="@name"/><xsl:text>":
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:for-each> + + <!-- Inner class method definition --> + <xsl:text> virtual void </xsl:text><xsl:value-of select="$method"/> + <xsl:text>( u_int16_t channel</xsl:text> + + <!-- Inner class method parameter definition --> + <xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> ) = 0;
</xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text>
 }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */
</xsl:text> + </xsl:for-each> + <xsl:text>
}; /* class AMQP_ClientOperations */ + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/amqp_consts.xsl b/cpp/common/framing/generated/stylesheets/amqp_consts.xsl new file mode 100644 index 0000000000..c1c927f941 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_consts.xsl @@ -0,0 +1,77 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + <xsl:output method="text" indent="yes" name="textFormat"/> + + <xsl:template match="/"> + <xsl:apply-templates select="amqp" mode="domain-table"/> + <xsl:apply-templates select="amqp" mode="domain-consts"/> + </xsl:template> + + <!-- + ====================== + Template: domain-table + ====================== + Generates the domain name to C++ type lookup table + which is required for later generation. + Format: + <domains> + <domain doamin-name="dname1" cpp-type="type1"/> + <domain doamin-name="dname2" cpp-type="type2"/> + ... + </domains> + --> + <xsl:template match="amqp" mode="domain-table"> + <domains><xsl:text>
</xsl:text> + <xsl:for-each select="domain"> + <xsl:text> </xsl:text><domain> + <xsl:attribute name="domain-name"> + <xsl:value-of select="@name"/> + </xsl:attribute> + <xsl:attribute name="cpp-type"> + <xsl:value-of select="amqp:cpp-type(@type)"/> + </xsl:attribute> + </domain><xsl:text>
</xsl:text> + </xsl:for-each> + </domains> + </xsl:template> + + <!-- + ======================= + Template: domain-consts + ======================= + Generates a header file (AMQP_Constants.h) containing definitions of + all the <constant> declarations in the AMQP XML specification. + --> + <xsl:template match="amqp" mode="domain-consts"> + <xsl:result-document href="AMQP_Constants.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_Constants_ +#define _AMQP_Constants_ + +#include "amqp_types.h" + +namespace qpid { +namespace framing { + +/**** Constants ****/

</xsl:text> + <xsl:for-each select="constant"> + <xsl:if test="doc"> + <xsl:text>
/*
</xsl:text> + <xsl:value-of select="normalize-space(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:if> + <xsl:text>const u_int16_t </xsl:text><xsl:value-of select="concat('AMQP_', upper-case(amqp:cpp-name(@name)), ' = ', @value)"/><xsl:text>;
</xsl:text> + </xsl:for-each> + <xsl:text> + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/amqp_server.xsl b/cpp/common/framing/generated/stylesheets/amqp_server.xsl new file mode 100644 index 0000000000..4ad29a4b95 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_server.xsl @@ -0,0 +1,155 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + + <!-- + ================== + Template: server_h + ================== + Server header file. + --> + <xsl:template match="amqp" mode="server_h"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_Server.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_Server_ +#define _AMQP_Server_ + +#include "AMQP_ClientOperations.h" +#include "FieldTable.h" +#include "OutputHandler.h" + +namespace qpid { +namespace framing { + +class AMQP_Server : virtual public AMQP_ClientOperations +{ + OutputHandler* out; + + public: + AMQP_Server(OutputHandler* _out); + virtual ~AMQP_Server() {}

</xsl:text> + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:if test="doc"> + <xsl:text>
/**
===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> =====
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:if> + <xsl:text> class </xsl:text><xsl:value-of select="$class"/><xsl:text> : virtual public AMQP_ClientOperations::</xsl:text><xsl:value-of select="$class"/><xsl:text>Handler + { + OutputHandler* out; + + public: + /* Constructors and destructors */ + </xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out); + virtual ~</xsl:text><xsl:value-of select="$class"/><xsl:text>(); + + /* Protocol methods */
</xsl:text> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='client']"> + <xsl:variable name="method" select="amqp:cpp-name(@name)"/> + <xsl:if test="doc"> + <xsl:text>
/**
----- Method: </xsl:text><xsl:value-of select="$class"/><xsl:text>.</xsl:text><xsl:value-of select="$method"/><xsl:text> -----
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:if> + <xsl:for-each select="rule"> + <xsl:text>
/**
</xsl:text> + <xsl:text>Rule "</xsl:text><xsl:value-of select="@name"/><xsl:text>":
</xsl:text><xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>
*/
</xsl:text> + </xsl:for-each> + <xsl:text> virtual void </xsl:text><xsl:value-of select="$method"/> + <xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field"><xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> );
</xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */
</xsl:text> + </xsl:for-each> + <xsl:text>}; /* class AMQP_Server */ + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + + + <!-- + ==================== + Template: server_cpp + ==================== + Server body. + --> + <xsl:template match="amqp" mode="server_cpp"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_Server.cpp" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> + +#include "AMQP_Server.h" + +namespace qpid { +namespace framing { + +AMQP_Server::AMQP_Server(OutputHandler* _out) : + out(_out) +{ +}

</xsl:text> + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:text>
/* ++++++++++ Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> ++++++++++ */ + +AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text><xsl:value-of select="$class"/><xsl:text>(OutputHandler* _out) : + out(_out) +{ +} + +AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {}

</xsl:text> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='client']"> + <xsl:text>void AMQP_Server::</xsl:text><xsl:value-of select="$class"/><xsl:text>::</xsl:text> + <xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text><xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> ) +{ + out->send( new AMQFrame( channel, + new </xsl:text><xsl:value-of select="concat($class, amqp:field-name(@name), 'Body')"/><xsl:text>( </xsl:text> + <xsl:for-each select="field"> + <xsl:value-of select="amqp:cpp-name(@name)"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> ) ) ); +}

</xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:for-each> + <xsl:text> + +} /* namespace framing */ +} /* namespace qpid */
</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/amqp_server_handler_impl.xsl b/cpp/common/framing/generated/stylesheets/amqp_server_handler_impl.xsl new file mode 100644 index 0000000000..de879a5670 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_server_handler_impl.xsl @@ -0,0 +1,187 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + + <!-- + =============================== + Template: server_handler_impl_h + =============================== + Template to generate the AMQP_ServerHandlerImpl class header file. + --> + <xsl:template match="amqp" mode="server_handler_impl_h"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_ServerHandlerImpl.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_ServerHandlerImpl_ +#define _AMQP_ServerHandlerImpl_ + +#include "AMQP_ServerOperations.h" +#include "FieldTable.h" + +namespace qpid { +namespace framing { + +class AMQP_ServerHandlerImpl : virtual public AMQP_ServerOperations +{
</xsl:text> + + <!-- List of pointers to each inner class instance --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + <xsl:text> AMQP_ServerOperations::</xsl:text><xsl:value-of select="$class"/><xsl:text>* </xsl:text> + <xsl:value-of select="$class"/><xsl:text>Ptr;
</xsl:text> + </xsl:for-each> + <xsl:text> + public: + AMQP_ServerHandlerImpl(); + virtual ~AMQP_ServerHandlerImpl();

</xsl:text> + + <!-- List of functions to return pointer to each inner class instance --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + <xsl:text> virtual inline AMQP_ServerOperations::</xsl:text> + <xsl:value-of select="$class"/><xsl:text>* get</xsl:text><xsl:value-of select="$class"/> + <xsl:text>() { return </xsl:text><xsl:value-of select="$class"/><xsl:text>Ptr; }
</xsl:text> + </xsl:for-each> + <xsl:text>
</xsl:text> + + <!-- Inner classes --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + + <!-- Inner class documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
</xsl:text> + <xsl:text>===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text>Impl =====
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + + <!-- Inner class definition --> + <xsl:text> class </xsl:text><xsl:value-of select="$class"/> + <xsl:text>Impl : virtual public AMQP_ServerOperations::</xsl:text><xsl:value-of select="$class"/> + <xsl:text>
 { + public: + /* Constructors and destructors */ + </xsl:text><xsl:value-of select="$class"/><xsl:text>Impl(); + virtual ~</xsl:text><xsl:value-of select="$class"/><xsl:text>Impl(); + + /* Protocol methods */
</xsl:text> + + <!-- Inner class methods (only if the chassis is set to "server") --> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='server']"> + <xsl:variable name="method" select="amqp:cpp-name(@name)"/> + + <!-- Inner class method documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
</xsl:text> + <xsl:text>----- Method: </xsl:text><xsl:value-of select="$class"/> + <xsl:text>Impl.</xsl:text><xsl:value-of select="@name"/><xsl:text> -----
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + <xsl:for-each select="rule"> + <xsl:text>
/**
</xsl:text> + <xsl:text>Rule "</xsl:text><xsl:value-of select="@name"/><xsl:text>":
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:for-each> + + <!-- Inner class method definition --> + <xsl:text>
 virtual void </xsl:text><xsl:value-of select="$method"/> + <xsl:text>( u_int16_t channel</xsl:text> + + <!-- Inner class method parameter definition --> + <xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> );
</xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text>
 }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text>Impl */
</xsl:text> + </xsl:for-each> + <xsl:text>
}; /* AMQP_ServerHandlerImpl */ + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + + <!-- + ================================= + Template: server_handler_impl_cpp + ================================= + Template to generate the AMQP_ServerHandlerImpl class stubs. + --> + <xsl:template match="amqp" mode="server_handler_impl_cpp"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_ServerHandlerImpl.cpp" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#include "AMQP_ServerHandlerImpl.h" + +namespace qpid { +namespace framing { + +AMQP_ServerHandlerImpl::AMQP_ServerHandlerImpl() :
 </xsl:text> + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:value-of select="$class"/> + <xsl:text>HandlerPtr( new </xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl() )</xsl:text> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> +{ +} + +AMQP_ServerHandlerImpl::~AMQP_ServerHandlerImpl() +{
</xsl:text> + <xsl:for-each select="class"> + <xsl:text> delete </xsl:text><xsl:value-of select="amqp:cpp-class-name(@name)"/><xsl:text>HandlerPtr;
</xsl:text> + </xsl:for-each>} + + <xsl:for-each select="class"> + <xsl:variable name="class" select="amqp:cpp-class-name(@name)"/> + <xsl:text>
/* ===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl ===== */

</xsl:text> + <xsl:text>AMQP_ServerHandlerImpl::</xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl::</xsl:text> + <xsl:value-of select="$class"/><xsl:text>HandlerImpl()
{
}

</xsl:text> + <xsl:text>AMQP_ServerHandlerImpl::</xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl::~</xsl:text> + <xsl:value-of select="$class"/><xsl:text>HandlerImpl()
{
}

</xsl:text> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='server']"> + <xsl:text>void AMQP_ServerHandlerImpl::</xsl:text><xsl:value-of select="$class"/><xsl:text>HandlerImpl::</xsl:text> + <xsl:value-of select="amqp:cpp-name(@name)"/><xsl:text>( u_int16_t channel</xsl:text> + <xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if><xsl:text> )
{
}

</xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:for-each> + <xsl:text> + +} /* namespace framing */ +} /* namespace qpid */

</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/amqp_server_operations.xsl b/cpp/common/framing/generated/stylesheets/amqp_server_operations.xsl new file mode 100644 index 0000000000..b42242e8fe --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/amqp_server_operations.xsl @@ -0,0 +1,113 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="code_utils.xsl"/> + + <!-- + ============================= + Template: server-operations-h + ============================= + Template to generate the AMQP_ServerHandler virtual class. This is the pure + virtual class from which the AMQP_Client and AMQP_ServerHandlerImpl classes + are derived. + --> + <xsl:template match="amqp" mode="server-operations-h"> + <xsl:param name="domain-cpp-table"/> + <xsl:result-document href="AMQP_ServerOperations.h" format="textFormat"> + <xsl:value-of select="amqp:copyright()"/> + <xsl:text> +#ifndef _AMQP_ServerOperations_ +#define _AMQP_ServerOperations_ + +#include "AMQP_Constants.h" +#include "FieldTable.h" + +namespace qpid { +namespace framing { + +class AMQP_ServerOperations +{ + public: + AMQP_ServerOperations() {} + virtual ~AMQP_ServerOperations() {} + inline u_int16_t getAmqpMajor() { return (u_int16_t)</xsl:text><xsl:value-of select="@major"/><xsl:text>; } + inline u_int16_t getAmqpMinor() { return (u_int16_t)</xsl:text><xsl:value-of select="@minor"/><xsl:text>; }

</xsl:text> + + <!-- Inner classes --> + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + + <!-- Inner class documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
===== Class: </xsl:text><xsl:value-of select="$class"/><xsl:text> =====
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + + <!-- Inner class definition --> + <xsl:text> class </xsl:text><xsl:value-of select="$class"/><xsl:text> + { + public: + /* Constructors and destructors */ + </xsl:text><xsl:value-of select="$class"/><xsl:text>() {} + virtual ~</xsl:text><xsl:value-of select="$class"/><xsl:text>() {} + + /* Protocol methods */
</xsl:text> + + <!-- Inner class methods (only if the chassis is set to "server") --> + <xsl:for-each select="method"> + <xsl:if test="chassis[@name='server']"> + <xsl:variable name="method" select="amqp:cpp-name(@name)"/> + + <!-- Inner class method documentation & rules --> + <xsl:if test="doc"> + <xsl:text>
/**
----- Method: </xsl:text><xsl:value-of select="$class"/><xsl:text>.</xsl:text> + <xsl:value-of select="@name"/><xsl:text> -----
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:if> + <xsl:for-each select="rule">/** + <xsl:text>
/**
</xsl:text> + <xsl:text>Rule "</xsl:text><xsl:value-of select="@name"/><xsl:text>":
</xsl:text> + <xsl:value-of select="amqp:process-docs(doc)"/> + <xsl:text>*/
</xsl:text> + </xsl:for-each> + + <!-- Inner class method definition --> + <xsl:text> virtual void </xsl:text><xsl:value-of select="$method"/> + <xsl:text>( u_int16_t channel</xsl:text> + + <!-- Inner class method parameter definition --> + <xsl:if test="field"> + <xsl:text>,
 </xsl:text> + <xsl:for-each select="field"> + <xsl:variable name="domain-cpp-type" select="amqp:cpp-lookup(@domain, $domain-cpp-table)"/> + <xsl:value-of select="concat($domain-cpp-type, amqp:cpp-arg-ref($domain-cpp-type), ' ', amqp:cpp-name(@name))"/> + <xsl:if test="position()!=last()"> + <xsl:text>,
 </xsl:text> + </xsl:if> + </xsl:for-each> + </xsl:if> + <xsl:text> ) = 0;
</xsl:text> + </xsl:if> + </xsl:for-each> + <xsl:text> }; /* class </xsl:text><xsl:value-of select="$class"/><xsl:text> */
</xsl:text> + </xsl:for-each> + + <xsl:for-each select="class"> + <xsl:variable name="class" select="concat(amqp:cpp-class-name(@name), 'Handler')"/> + <xsl:text> virtual AMQP_ServerOperations::</xsl:text> + <xsl:value-of select="$class"/><xsl:text>* get</xsl:text><xsl:value-of select="$class"/> + <xsl:text>() = 0;</xsl:text> + </xsl:for-each> + + <xsl:text>}; /* class AMQP_ServerOperations */ + +} /* namespace framing */ +} /* namespace qpid */ + +#endif
</xsl:text> + </xsl:result-document> + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/code_gen.xsl b/cpp/common/framing/generated/stylesheets/code_gen.xsl new file mode 100644 index 0000000000..5e9f4ef8f0 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/code_gen.xsl @@ -0,0 +1,91 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <xsl:import href="convert_0.81.xsl"/> + <xsl:import href="amqp_consts.xsl"/> + <xsl:import href="amqp_server_operations.xsl"/> + <xsl:import href="amqp_client_operations.xsl"/> + <xsl:import href="amqp_server.xsl"/> + <xsl:import href="amqp_client.xsl"/> + <xsl:import href="amqp_server_handler_impl.xsl"/> + <xsl:import href="amqp_client_handler_impl.xsl"/> + + <xsl:output method="text" indent="yes" name="textFormat"/> + <xsl:key name="domain-lookup" match="domains/domain" use="@domain-name"/> + + <xsl:template match="/"> + + <!-- 0. Convert to 0.81 format --> + <!-- + NOTE: The XML specification change from 0.8 to 0.81 is primarily a change to + the XML itself, not the protocol it represents. However, at the time of this + commit, the 0.81 specification has not been approved by the AMQP working group, + so this converter from the 0.8 format to the 0.81 format has been included as + a temporary measure. When the 0.81 format becomes official, then this conversion + should be removed, and all of the templates below will revert to select=".". + + TODO: Remove this conversion when the new 0.81 spec is checked in. + --> + <xsl:variable name="format-v081"> + <xsl:apply-templates mode="do-amqp" select="amqp"/> + </xsl:variable> + <!-- == Uncomment this to view output for debugging == + <xsl:result-document href="convert_081.out"> + <xsl:copy-of select="$format-v081"/> + </xsl:result-document> + --> + + <!-- 1. Domain to C++ type lookup table --> + <xsl:variable name="domain-cpp-table"> + <xsl:apply-templates mode="domain-table" select="$format-v081"/> + </xsl:variable> + <!-- == Uncomment this to view output for debugging == + <xsl:result-document href="domain_cpp_table.out"> + <xsl:copy-of select="$domain-cpp-table"/> + </xsl:result-document> + --> + + <!-- 2. Constant declarations (AMQP_Constants.h) --> + <xsl:apply-templates mode="domain-consts" select="$format-v081"/> + + <!-- 3. Client and server handler pure virtual classes --> + <xsl:apply-templates mode="server-operations-h" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <xsl:apply-templates mode="client-operations-h" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + + <!-- 4. Client and server output classes --> + <xsl:apply-templates mode="server_h" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <xsl:apply-templates mode="client_h" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <xsl:apply-templates mode="server_cpp" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <xsl:apply-templates mode="client_cpp" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + + <!-- 5. Client and server handler stub classes --> + <xsl:apply-templates mode="server_handler_impl_h" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <xsl:apply-templates mode="client_handler_impl_h" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <!-- TODO: Find a way to only run the .cpp stub generator when required, as + running this will overwrite any stub code in existance! --> + <xsl:apply-templates mode="server_handler_impl_cpp" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + <xsl:apply-templates mode="client_handler_impl_cpp" select="$format-v081"> + <xsl:with-param name="domain-cpp-table" select="$domain-cpp-table"/> + </xsl:apply-templates> + + </xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/code_utils.xsl b/cpp/common/framing/generated/stylesheets/code_utils.xsl new file mode 100644 index 0000000000..f4a0f6e5ce --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/code_utils.xsl @@ -0,0 +1,210 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + + <!-- + ======================== + Function: amqp:copyright + ======================== + Print out a standard Apache copyright notice and generated code warning. + --> + <xsl:function name="amqp:copyright">/** +* +* Copyright (c) 2006 The Apache Software Foundation +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ + +/** +* +* NOTE: This file is generated directly from the AMQP XML specification. +* === DO NOT EDIT === +* +*/
</xsl:function> + + <!-- + ========================== + Function: amqp:upper-first + ========================== + Convert the first character of the parameter to upper-case + --> + <xsl:function name="amqp:upper-first"> + <xsl:param name="in"/> + <xsl:value-of select="concat(upper-case(substring($in, 1, 1)), substring($in, 2))"/> + </xsl:function> + + <!-- + ======================== + Function: amqp:cpp-name-1 + ======================== + Convert parameter "name" to a valid C++ identifier, finding spaces and '-'s + in the parameter name and replacing them with '_' chars. Also check for C++ + reserved words and prefix them with '_'. No capitalization is performed. + --> + <xsl:function name="amqp:cpp-name-1"> + <xsl:param name="name"/> + <xsl:choose> + <!-- C++ reserved words. --> + <xsl:when test="$name='delete'">delete_</xsl:when> + <xsl:when test="$name='return'">return_</xsl:when> + <!-- Change unsuitable C++ identifier characters. --> + <xsl:otherwise><xsl:value-of select="translate($name, ' -', '__')"/></xsl:otherwise> + </xsl:choose> + </xsl:function> + + <!-- + ======================= + Function: amqp:cpp-name + ======================= + Convert parameter "name" to a valid, camel cased C++ name. + --> + <xsl:function name="amqp:cpp-name"> + <xsl:param name="name"/> + <xsl:value-of select="amqp:cpp-name-1(amqp:camel-case($name))"/> + </xsl:function> + + <!-- + ============================= + Function: amqp:cpp-class-name + ============================= + Convert parameter "name" to a valid C++ identifier, finding spaces and '-'s + in the parameter name and replacing them with '_' chars. Also check for C++ + reserved words and prefix them with '_'. First letter only is capitalized. + --> + <xsl:function name="amqp:cpp-class-name"> + <xsl:param name="name"/> + <xsl:value-of select="amqp:upper-first(amqp:cpp-name($name))"/> + </xsl:function> + + <!-- + ========================= + Function: amqp:camel-case + ========================= + *** NOTE: Only works with *one* of either '-' or ' '. If a name contains 2 or + *** more of these characters, then this will break. + Convert parameter "name" to camel case, where words are separated by ' ' or '-' + --> + <xsl:function name="amqp:camel-case"> + <xsl:param name="name"/> + <xsl:choose> + <xsl:when test="contains($name, ' ')"> + <xsl:value-of select="concat(substring-before($name, ' '), amqp:upper-first(substring-after($name, ' ')))"/> + </xsl:when> + <xsl:when test="contains($name, '-')"> + <xsl:value-of select="concat(substring-before($name, '-'), amqp:upper-first(substring-after($name, '-')))"/> + </xsl:when> + <xsl:otherwise> + <xsl:value-of select="$name"/> + </xsl:otherwise> + </xsl:choose> + </xsl:function> + + <!-- + ========================= + Function: amqp:field-name + ========================= + Get a valid field name, processing spaces and '-'s where appropriate + --> + <xsl:function name="amqp:field-name"> + <xsl:param name="name"/> + <xsl:value-of select="amqp:upper-first(amqp:camel-case($name))"/> + </xsl:function> + + <!-- + ======================= + Function: amqp:cpp-type + ======================= + Map the set of simple AMQP types to C++ types. Also map the AMQP table + domain to appropriate C++ class. + --> + <xsl:function name="amqp:cpp-type"> + <xsl:param name="type"/> + <xsl:choose> + <!-- Simple AMQP domain types --> + <xsl:when test="$type='octet'">u_int8_t</xsl:when> + <xsl:when test="$type='short'">u_int16_t</xsl:when> + <xsl:when test="$type='shortstr'">string</xsl:when> + <xsl:when test="$type='longstr'">string</xsl:when> + <xsl:when test="$type='bit'">bool</xsl:when> + <xsl:when test="$type='long'">u_int32_t</xsl:when> + <xsl:when test="$type='longlong'">u_int64_t</xsl:when> + <xsl:when test="$type='timestamp'">u_int64_t</xsl:when> + <!-- AMQP structures --> + <xsl:when test="$type='table'">FieldTable</xsl:when> + <!-- Fallback: unknown type --> + <xsl:otherwise>unknown_type /* WARNING: undefined type */</xsl:otherwise> + </xsl:choose> + </xsl:function> + + <!-- + ========================== + Function: amqp:cpp-arg-ref + ========================== + Determines whether a C++ reference is required for an argument. + --> + <xsl:function name="amqp:cpp-arg-ref"> + <xsl:param name="type"/> + <xsl:choose> + <xsl:when test="$type='string'">&</xsl:when> + <xsl:when test="$type='FieldTable'">&</xsl:when> + </xsl:choose> + </xsl:function> + + <!-- + ========================= + Function: amqp:cpp-lookup + ========================= + Template and function for looking up the cpp type from the domain name. + The template runs on a lookup table XML generated by the "domain_table" + template in amqp_domaintypes.xsl. + --> + <xsl:template match="/" mode="cpp-lookup"> + <xsl:param name="domain-name"/> + <xsl:for-each select="key('domain-lookup', $domain-name)"> + <xsl:value-of select="@cpp-type"/> + </xsl:for-each> + </xsl:template> + + <xsl:function name="amqp:cpp-lookup"> + <xsl:param name="domain-name"/> + <xsl:param name="domain-cpp-table"/> + <xsl:apply-templates mode="cpp-lookup" select="$domain-cpp-table"> + <xsl:with-param name="domain-name" select="$domain-name"/> + </xsl:apply-templates> + </xsl:function> + + <!-- + ========================= + Function: amqp:cpp-lookup + ========================= + Template and function for processing the possibly multiple <doc> elements + within a node. + --> + <xsl:template match="doc" mode="process-doc-elts"> + <xsl:for-each select="."> + <xsl:choose> + <xsl:when test=".[@type='grammar']"><xsl:value-of select="."/></xsl:when> + <xsl:when test=".[@type='scenario']"><xsl:value-of select="concat('
Test Scenario: ', normalize-space(.))"/></xsl:when> + <xsl:otherwise><xsl:value-of select="normalize-space(.)"/></xsl:otherwise> + </xsl:choose> + </xsl:for-each> + </xsl:template> + + <xsl:function name="amqp:process-docs"> + <xsl:param name="doc-elts"/> + <xsl:apply-templates mode="process-doc-elts" select="$doc-elts"/> + </xsl:function> + + +</xsl:stylesheet> + diff --git a/cpp/common/framing/generated/stylesheets/convert_0.81.xsl b/cpp/common/framing/generated/stylesheets/convert_0.81.xsl new file mode 100644 index 0000000000..9924f165da --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/convert_0.81.xsl @@ -0,0 +1,407 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<xsl:template match="/"> + <xsl:apply-templates select="/" mode="do-amqp"/> +</xsl:template> + +<!-- ====== + <amqp> + ====== --> +<xsl:template match="amqp" mode="do-amqp"> + +<!-- <xsl:text>
</xsl:text> --> +<xsl:element name= "amqp"> +<xsl:attribute name="major"><xsl:value-of select="@major"/></xsl:attribute> +<xsl:attribute name="minor"><xsl:value-of select="@minor"/></xsl:attribute> +<xsl:attribute name="port"><xsl:value-of select="@port"/></xsl:attribute> +<xsl:attribute name="comment"><xsl:value-of select="@comment"/></xsl:attribute> +<xsl:text>
</xsl:text> + +<!-- constant elements --> +<xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:comment> + ==================== + Constants + ==================== + </xsl:comment><xsl:text>
</xsl:text> +<xsl:text>
</xsl:text> +<xsl:apply-templates select="constant" mode="do-constant"> +<xsl:with-param name="indent" select="' '"/> +</xsl:apply-templates> + +<!-- domain elements --> +<xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:comment> + ==================== + Domains + ==================== + </xsl:comment><xsl:text>
</xsl:text> +<xsl:text>
</xsl:text> +<xsl:apply-templates select="domain" mode="do-domain"> +<xsl:with-param name="indent" select="' '"/> +</xsl:apply-templates> + +<!-- required elementary domain definition elements added into v0.81 --> +<xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:comment> Elementary domains </xsl:comment><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">bit</xsl:attribute> + <xsl:attribute name="type">bit</xsl:attribute> + <xsl:attribute name="label">single bit</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">octet</xsl:attribute> + <xsl:attribute name="type">octet</xsl:attribute> + <xsl:attribute name="label">single octet</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">short</xsl:attribute> + <xsl:attribute name="type">short</xsl:attribute> + <xsl:attribute name="label">16-bit integer</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">long</xsl:attribute> + <xsl:attribute name="type">long</xsl:attribute> + <xsl:attribute name="label">32-bit integer</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">longlong</xsl:attribute> + <xsl:attribute name="type">longlong</xsl:attribute> + <xsl:attribute name="label">64-bit integer</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">shortstr</xsl:attribute> + <xsl:attribute name="type">shortstr</xsl:attribute> + <xsl:attribute name="label">short string</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">longstr</xsl:attribute> + <xsl:attribute name="type">longstr</xsl:attribute> + <xsl:attribute name="label">long string</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">timestamp</xsl:attribute> + <xsl:attribute name="type">timestamp</xsl:attribute> + <xsl:attribute name="label">64-bit timestamp</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:element name="domain"> + <xsl:attribute name="name">table</xsl:attribute> + <xsl:attribute name="type">table</xsl:attribute> + <xsl:attribute name="label">field table</xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> + +<!-- class elements --> +<xsl:text>
</xsl:text> +<xsl:text> </xsl:text><xsl:comment> + ==================== + Classes + ==================== + </xsl:comment><xsl:text>
</xsl:text> +<xsl:apply-templates select="class" mode="do-class"> +<xsl:with-param name="indent" select="' '"/> +</xsl:apply-templates> + +</xsl:element><!-- amqp --> +<!-- <xsl:text>
</xsl:text> --> +</xsl:template> + +<!-- ========== + <constant> + ========== --> +<xsl:template match="constant" mode="do-constant"> +<xsl:param name="indent"/> +<xsl:variable name="constant" select="translate(@name, ' ', '-')"/> + +<xsl:value-of select="$indent"/><xsl:element name="constant"> +<xsl:attribute name="name"><xsl:value-of select="$constant"/></xsl:attribute> +<xsl:attribute name="value"><xsl:value-of select="@value"/></xsl:attribute> +<xsl:if test="@class"> +<xsl:attribute name="class"><xsl:value-of select="translate(@class, ' ', '-')"/></xsl:attribute> +</xsl:if> + +<!-- If there is content, place in child <doc> element --> +<xsl:if test="string-length(.) > 0"> +<xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text><xsl:element name="doc"><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text><xsl:value-of select="normalize-space(.)"/><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text></xsl:element><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/> +</xsl:if> + +</xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ======== + <domain> + ======== --> +<xsl:template match="domain" mode="do-domain"> +<xsl:param name="indent"/> +<xsl:variable name="domain" select="translate(@name, ' ', '-')"/> + +<xsl:value-of select="$indent"/><xsl:element name="domain"> +<xsl:attribute name="name"><xsl:value-of select="$domain"/></xsl:attribute> +<xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> +<xsl:if test="doc|assert|rule"><xsl:text>
</xsl:text></xsl:if> + +<!-- doc elements --> +<xsl:apply-templates select="doc" mode="do-doc"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> + +<!-- assert elements --> +<xsl:apply-templates select="assert" mode="do-assert"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> + +<!-- rule elements --> +<xsl:apply-templates select="rule" mode="do-rule"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="$domain"/> +</xsl:apply-templates> + +<xsl:if test="doc|assert|rule"><xsl:value-of select="$indent"/></xsl:if></xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ======== + <class> + ======== --> + +<xsl:template match="class" mode="do-class"> +<xsl:param name="indent"/> +<xsl:variable name="class" select="translate(@name, ' ', '-')"/> + +<!-- Ignore class test - removed from 0.81 --> +<xsl:if test="not($class = 'test')"> +<xsl:text>
</xsl:text><xsl:value-of select="$indent"/><xsl:comment><xsl:value-of select="concat(' == Class: ', $class, ' == ')"/></xsl:comment><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:element name="class"> +<xsl:attribute name="name"><xsl:value-of select="$class"/></xsl:attribute> +<xsl:attribute name="handler"><xsl:value-of select="@handler"/></xsl:attribute> +<xsl:attribute name="index"><xsl:value-of select="@index"/></xsl:attribute> +<xsl:if test="doc|chassis|rule|field|method"><xsl:text>
</xsl:text></xsl:if> + +<!-- doc elements --> +<xsl:apply-templates select="doc" mode="do-doc"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="$class"/> +</xsl:apply-templates> + +<!-- chassis elements --> +<xsl:apply-templates select="chassis" mode="do-chassis"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> + +<!-- rule elements --> +<xsl:apply-templates select="rule" mode="do-rule"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="$class"/> +</xsl:apply-templates> + +<!-- field elements --> +<xsl:apply-templates select="field" mode="do-field"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="$class"/> +</xsl:apply-templates> + +<!-- method elements --> +<xsl:apply-templates select="method" mode="do-method"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="$class"/> +</xsl:apply-templates> + +<xsl:if test="doc|chassis|rule|field|method"><xsl:value-of select="$indent"/></xsl:if></xsl:element><xsl:text>
</xsl:text> +</xsl:if> +</xsl:template> + +<!-- ======== + <method> + ======== --> + +<xsl:template match="method" mode="do-method"> +<xsl:param name="indent"/> +<xsl:param name="label"/> +<xsl:variable name="method" select="translate(@name, ' ', '-')"/> + +<xsl:text>
</xsl:text><xsl:value-of select="$indent"/><xsl:comment><xsl:value-of select="concat(' == Method: ', $label, '.', $method, ' == ')"/></xsl:comment><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:element name="method"> +<xsl:attribute name="name"><xsl:value-of select="$method"/></xsl:attribute> +<xsl:if test="@synchronous"><xsl:attribute name="synchronous"><xsl:value-of select="@synchronous"/></xsl:attribute></xsl:if> +<xsl:attribute name="index"><xsl:value-of select="@index"/></xsl:attribute> +<xsl:if test="doc|chassis|response|rule|field"><xsl:text>
</xsl:text></xsl:if> + +<!-- doc elements --> +<xsl:apply-templates select="doc" mode="do-doc"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="concat($label, '.', $method)"/> +</xsl:apply-templates> + +<!-- chassis and response elements --> +<xsl:apply-templates select="chassis" mode="do-chassis"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> +<xsl:apply-templates select="response" mode="do-response"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> + +<!-- rule elements --> +<xsl:apply-templates select="rule" mode="do-rule"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="concat($label, '.', $method)"/> +</xsl:apply-templates> + +<!-- field elements --> +<xsl:apply-templates select="field" mode="do-field"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="concat($label, '.', $method)"/> +</xsl:apply-templates> + +<xsl:if test="doc|chassis|response|rule|field"><xsl:value-of select="$indent"/></xsl:if></xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ======== + <field> + ======== --> + +<xsl:template match="field" mode="do-field"> +<xsl:param name="indent"/> +<xsl:param name="label"/> +<xsl:variable name="field" select="translate(@name, ' ', '-')"/> + +<xsl:value-of select="$indent"/><xsl:element name="field"> +<xsl:attribute name="name"><xsl:value-of select="$field"/></xsl:attribute> +<xsl:if test="@type"> +<xsl:attribute name="domain"><xsl:value-of select="translate(@type, ' ', '-')"/></xsl:attribute> +</xsl:if> +<xsl:if test="@domain"> +<xsl:attribute name="domain"><xsl:value-of select="translate(@domain, ' ', '-')"/></xsl:attribute> +</xsl:if> +<xsl:if test="doc|rule|assert"><xsl:text>
</xsl:text></xsl:if> + +<!-- doc elements --> +<xsl:apply-templates select="doc" mode="do-doc"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="concat($label, '.', $field)"/> +</xsl:apply-templates> + +<!-- rule elements --> +<xsl:apply-templates select="rule" mode="do-rule"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +<xsl:with-param name="label" select="concat($label, '.', $field)"/> +</xsl:apply-templates> + +<!-- assert elements --> +<xsl:apply-templates select="assert" mode="do-assert"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> + +<xsl:if test="doc|rule|assert"><xsl:value-of select="$indent"/></xsl:if></xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ======== + <assert> + ======== --> +<xsl:template match="assert" mode="do-assert"> +<xsl:param name="indent"/> + +<xsl:value-of select="$indent"/><xsl:element name="assert"> +<xsl:attribute name="check"><xsl:value-of select="@check"/></xsl:attribute> +<xsl:if test="@value"><xsl:attribute name="value"><xsl:value-of select="@value"/></xsl:attribute></xsl:if> +<xsl:if test="@rule"><xsl:attribute name="rule"><xsl:value-of select="@rule"/></xsl:attribute></xsl:if> + +<xsl:apply-templates select="doc" mode="do-doc"> +<xsl:with-param name="indent" select="concat($indent, ' ')"/> +</xsl:apply-templates> + +</xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ======== + <rule> + ======== --> +<xsl:template match="rule" mode="do-rule"> +<xsl:param name="indent"/> +<xsl:param name="label"/> + +<xsl:value-of select="$indent"/><xsl:element name="rule"> +<xsl:attribute name="name">rule_<xsl:value-of select="$label"/>_<xsl:number format="01"/></xsl:attribute> + +<!-- If there is content, place in child <doc> element --> + +<xsl:if test="string-length(.) > 0"> +<xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text><xsl:element name="doc"><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text><xsl:value-of select="normalize-space(.)"/><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text></xsl:element><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/> +</xsl:if> + +</xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ========= + <chassis> + ========= --> +<xsl:template match="chassis" mode="do-chassis"> +<xsl:param name="indent"/> + +<xsl:value-of select="$indent"/><xsl:element name="chassis"> +<xsl:attribute name="name"><xsl:value-of select="@name"/></xsl:attribute> +<xsl:attribute name="implement"><xsl:value-of select="@implement"/></xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ========== + <response> + ========== --> +<xsl:template match="response" mode="do-response"> +<xsl:param name="indent"/> + +<xsl:value-of select="$indent"/><xsl:element name="response"> +<xsl:attribute name="name"><xsl:value-of select="@name"/></xsl:attribute> +</xsl:element><xsl:text>
</xsl:text> + +</xsl:template> + +<!-- ===== + <doc> + ===== --> +<xsl:template match="doc" mode="do-doc"> +<xsl:param name="indent"/> +<xsl:param name="label"/> + +<!-- Handle cases of <doc name="rule>...</doc>: turn them into <rule><doc>...</doc></rule> --> +<xsl:if test="@name = 'rule'"> +<xsl:value-of select="$indent"/><xsl:element name="rule"> +<xsl:if test="@test"><xsl:attribute name="name"><xsl:value-of select="@test"/></xsl:attribute></xsl:if> +<xsl:if test="not(@test)"><xsl:attribute name="name">doc_rule_<xsl:value-of select="$label"/>_<xsl:number format="01"/></xsl:attribute></xsl:if> +<xsl:text>
</xsl:text> +<xsl:value-of select="concat($indent, ' ')"/><xsl:element name="doc"><xsl:text>
</xsl:text> +<xsl:value-of select="concat($indent, ' ')"/><xsl:text> </xsl:text><xsl:value-of select="normalize-space(.)"/><xsl:text>
</xsl:text> +<xsl:value-of select="concat($indent, ' ')"/></xsl:element><xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/></xsl:element><xsl:text>
</xsl:text> +</xsl:if> + +<!-- Normal <doc>...</doc> elements --> +<xsl:if test="not(@name = 'rule')"> +<xsl:value-of select="$indent"/><xsl:element name="doc"> +<xsl:if test="@name = 'grammar'"> +<xsl:attribute name="type">grammar</xsl:attribute> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text><xsl:value-of select="."/> +</xsl:if> +<xsl:if test="not(@name = 'grammar')"> +<xsl:text>
</xsl:text> +<xsl:value-of select="$indent"/><xsl:text> </xsl:text><xsl:value-of select="normalize-space(.)"/><xsl:text>
</xsl:text> +</xsl:if> +<xsl:value-of select="$indent"/></xsl:element><xsl:text>
</xsl:text> +</xsl:if> + +</xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/cpp.xsl b/cpp/common/framing/generated/stylesheets/cpp.xsl new file mode 100644 index 0000000000..ae66f65745 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/cpp.xsl @@ -0,0 +1,318 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<!-- this class contains the templates for generating C++ source code for a given framing model --> + +<xsl:import href="utils.xsl"/> +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="generate-multi" select="frames"/> + <xsl:apply-templates mode="method-list-header" select="frames"/> + <xsl:apply-templates mode="method-list-source" select="frames"/> + <xsl:apply-templates mode="method-interface" select="frames"/> +</xsl:template> + +<!-- processes all frames outputting the classes in a single stream --> +<xsl:template match="frames" mode="generate-single"> + <xsl:result-document href="amqp_methods.h" format="textFormat"> +#include "amqp_framing.h" + <xsl:for-each select="frame"> + <xsl:call-template name="generate-class"> + <xsl:with-param name="f" select="."/> + </xsl:call-template> + </xsl:for-each> + </xsl:result-document> +</xsl:template> + +<!-- generates seperate file for each class/frame --> +<xsl:template match="frame" mode="generate-multi"> + <xsl:variable name="uri" select="concat(@name, '.h')"/> + <xsl:result-document href="{$uri}" format="textFormat"> +#include "amqp_types.h" +#include "AMQP_ServerOperations.h" +#include "AMQMethodBody.h" +#include "Buffer.h" +#include "FieldTable.h" + +#ifndef _<xsl:value-of select="@name"/>_ +#define _<xsl:value-of select="@name"/>_ + +namespace qpid { +namespace framing { + + <xsl:call-template name="generate-class"> + <xsl:with-param name="f" select="."/> + </xsl:call-template> +} +} + +#endif + +</xsl:result-document> +</xsl:template> + + +<!-- main class generation template --> +<xsl:template name="generate-class"> + <xsl:param name="f"/> +/** + * This class is autogenerated, do not modify. [From <xsl:value-of select="$f/parent::frames/@protocol"/>] + */ +class <xsl:value-of select="$f/@name"/> : virtual public AMQMethodBody +{ + <xsl:for-each select="$f/field"> + <xsl:value-of select="@cpp-type"/> + <xsl:text> </xsl:text> + <xsl:value-of select="@name"/>; + </xsl:for-each> + +public: + typedef std::tr1::shared_ptr<<xsl:value-of select="$f/@name"/>> shared_ptr; + + virtual ~<xsl:value-of select="$f/@name"/>() {} + + <xsl:for-each select="$f/field"> + inline <xsl:value-of select="concat(@cpp-arg-type, ' get', amqp:upper-first(@name), '() { return ', @name)"/>; } + </xsl:for-each> + + + inline void print(std::ostream& out) const{ + out << "<xsl:value-of select="$f/@declaration_name"/>" + <xsl:for-each select="$f/field"> + <xsl:text> << ", </xsl:text> + <xsl:value-of select="@name"/>="<< + <xsl:value-of select="@name"/> + </xsl:for-each> + ; + } + + inline u_int16_t amqpClassId() const { + return <xsl:value-of select="$f/@class-id"/>; + } + + inline u_int16_t amqpMethodId() const { + return <xsl:value-of select="$f/@method-id"/>; + } + + inline u_int32_t bodySize() const { + <xsl:choose> + <xsl:when test="$f/field"> + return + <xsl:for-each select="$f/field"> + <xsl:if test="position() != 1">+ + </xsl:if> + <xsl:value-of select="amqp:field-length(.)"/> + </xsl:for-each> + ; + </xsl:when> + <xsl:otherwise>return 0;</xsl:otherwise> + </xsl:choose> + } + + <xsl:if test="@server='true'"> + inline void invoke(AMQP_ServerOperations& target, u_int16_t channel) { + <xsl:if test="field"> + <xsl:value-of select="concat('target.get', amqp:upper-first(parent::class/@name), 'Handler()->', @invocation_name, '(channel, ')"/> + <xsl:value-of select="$f/field/@name" separator=", "/>); + </xsl:if> + <xsl:if test="not(field)"> + <xsl:value-of select="concat('target.get', amqp:upper-first(parent::class/@name), 'Handler()->', @invocation_name, '(channel)')"/>; + </xsl:if> + } + </xsl:if> + + inline void encodeContent(Buffer& buffer) const + { + <xsl:if test="$f/field[@type='bit']"> + u_int8_t flags = 0; + <xsl:for-each select="$f/field[@type='bit']"> + <xsl:value-of select="concat('flags |= ', @name,' << (', @boolean-index, ' - 1)')"/>; + </xsl:for-each> + </xsl:if> + <xsl:for-each select="$f/field"> + <xsl:if test="@type != 'bit'"> + <xsl:value-of select="amqp:encoder(.)"/>; + </xsl:if> + <xsl:if test="@type = 'bit' and @boolean-index = 1"> + <xsl:text>buffer.putOctet(flags)</xsl:text>; + </xsl:if> + </xsl:for-each> + } + + inline void decodeContent(Buffer& buffer) + { + <xsl:if test="$f/field[@type='bit']"> + u_int8_t maxbit = <xsl:value-of select="$f/@bit-field-count"/>; + </xsl:if> + <xsl:for-each select="$f/field"> + <xsl:choose> + <xsl:when test="@type = 'bit' and @boolean-index = 1"> + <xsl:text>u_int8_t flags = buffer.getOctet()</xsl:text>; + <xsl:value-of select="amqp:decoder(.)"/>; + </xsl:when> + <xsl:otherwise> + <xsl:value-of select="amqp:decoder(.)"/>; + </xsl:otherwise> + </xsl:choose> + </xsl:for-each> + } + + <xsl:if test="$f/field"> + <!-- only generate overloaded constructor if there are fields in this method --> + inline <xsl:value-of select="$f/@name"/>(<xsl:value-of select="$f/field/concat(@cpp-arg-type, ' _', @name)" separator=", "/>) : <xsl:value-of select="$f/field/concat(@name, '(_', @name, ')')" separator=", "/> + { + } + </xsl:if> + + inline <xsl:value-of select="$f/@name"/>() + { + } +}; + +</xsl:template> + +<xsl:template match="frames" mode="method-list-header"> +<xsl:result-document href="amqp_methods.h" format="textFormat"> +/** + * This file is autogenerated, do not modify. + */ + +#ifndef AMQ_METHODS_H +#define AMQ_METHODS_H + + <xsl:for-each select="class/frame"> +#include "<xsl:value-of select="@name"/>.h" + </xsl:for-each> + +namespace qpid { +namespace framing { + + <xsl:for-each select="class/frame"> +const <xsl:value-of select="concat(@name, ' ', @declaration_name)"/>; + </xsl:for-each> + +AMQMethodBody* createAMQMethodBody(u_int16_t classId, u_int16_t methodId); + +} +} + +#endif +</xsl:result-document> +</xsl:template> + +<xsl:template match="frames" mode="method-list-source"> + <xsl:result-document href="amqp_methods.cpp" format="textFormat"> +#include "amqp_methods.h" +#include "QpidError.h" + +namespace qpid { +namespace framing { +/** + * This method is autogenerated, do not modify. + */ +AMQMethodBody* createAMQMethodBody(u_int16_t classId, u_int16_t methodId){ + switch(classId * 1000 + methodId) + { + <xsl:for-each select="class/frame"> + <xsl:text>case </xsl:text> + <xsl:value-of select="@class-id"/> + <xsl:text> * 1000 + </xsl:text> + <xsl:value-of select="@method-id"/> + <xsl:text>: return new </xsl:text> + <xsl:value-of select="@name"/>(); + </xsl:for-each> + } + THROW_QPID_ERROR(FRAMING_ERROR, "Unknown method"); +} + +} +} +</xsl:result-document> +</xsl:template> + +<xsl:template match="frames" mode="generate-interface"> + <xsl:result-document href="AMQPServer.h" format="textFormat"> +#include "amqp_types.h" +#include "FieldTable.h" + +#ifndef _AMQPServer_ +#define _AMQPServer_ + +namespace qpid { +namespace framing { + +class AMQPServer +{ + public: + + <xsl:for-each select="class"> + class <xsl:value-of select="concat(amqp:upper-first(@name), 'Handler')"/>{ + public: + <xsl:for-each select="frame[@server='true']"> + <xsl:if test="field"> + virtual void <xsl:value-of select="@invocation_name"/>(u_int16_t channel, <xsl:value-of select="field/concat(@cpp-arg-type, ' ', @name)" separator=", "/>) = 0; + </xsl:if> + <xsl:if test="not(field)"> + virtual void <xsl:value-of select="@invocation_name"/>(u_int16_t channel) = 0; + </xsl:if> + </xsl:for-each> + virtual ~<xsl:value-of select="concat(amqp:upper-first(@name), 'Handler')"/>(){} + }; + + virtual <xsl:value-of select="concat(amqp:upper-first(@name), 'Handler* get', amqp:upper-first(@name), 'Handler')"/>() = 0; + + </xsl:for-each> + virtual ~AMQPServer(){} +}; + +} +} + +#endif +</xsl:result-document> + + <xsl:result-document href="AMQPClient.h" format="textFormat"> +#include "amqp_types.h" +#include "FieldTable.h" + +#ifndef _AMQPClient_ +#define _AMQPClient_ + +namespace qpid { +namespace framing { + +class AMQPClient +{ + public: + + <xsl:for-each select="class"> + class <xsl:value-of select="concat(amqp:upper-first(@name), 'Handler')"/>{ + public: + <xsl:for-each select="frame[@client='true']"> + <xsl:if test="field"> + virtual void <xsl:value-of select="@invocation_name"/>(u_int16_t channel, <xsl:value-of select="field/concat(@cpp-arg-type, ' ', @name)" separator=", "/>) = 0; + </xsl:if> + <xsl:if test="not(field)"> + virtual void <xsl:value-of select="@invocation_name"/>(u_int16_t channel) = 0; + </xsl:if> + </xsl:for-each> + virtual ~<xsl:value-of select="concat(amqp:upper-first(@name), 'Handler')"/>(){} + }; + + virtual <xsl:value-of select="concat(amqp:upper-first(@name), 'Handler* get', amqp:upper-first(@name), 'Handler')"/>() = 0; + + </xsl:for-each> + + virtual ~AMQPClient(){} +}; + +} +} + +#endif +</xsl:result-document> + +</xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/framing.xsl b/cpp/common/framing/generated/stylesheets/framing.xsl new file mode 100644 index 0000000000..c63e719a77 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/framing.xsl @@ -0,0 +1,49 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<xsl:import href="prepare1.xsl"/> +<xsl:import href="prepare2.xsl"/> +<xsl:import href="prepare3.xsl"/> +<xsl:import href="cpp.xsl"/> + +<xsl:output indent="yes"/> +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:variable name="prepare1"> + <xsl:apply-templates mode="prepare1" select="."/> + </xsl:variable> + + <xsl:variable name="prepare2"> + <xsl:apply-templates mode="prepare2" select="$prepare1"/> + </xsl:variable> + + <xsl:variable name="model"> + <xsl:apply-templates mode="prepare3" select="$prepare2"/> + </xsl:variable> + + <xsl:apply-templates mode="generate-multi" select="$model"/> + <xsl:apply-templates mode="method-list-header" select="$model"/> + <xsl:apply-templates mode="method-list-source" select="$model"/> + + <!-- these interfaces are now generated by the new scripts from kim --> + <!-- e.g. those of the form amqp-server/client-*.xsl --> + <!-- xsl:apply-templates mode="generate-interface" select="$model"/ --> + + <!-- dump out the intermediary files for debugging --> + <!-- + <xsl:result-document href="prepare1.out"> + <xsl:copy-of select="$prepare1"/> + </xsl:result-document> + + <xsl:result-document href="prepare2.out"> + <xsl:copy-of select="$prepare2"/> + </xsl:result-document> + + <xsl:result-document href="model.out"> + <xsl:copy-of select="$model"/> + </xsl:result-document> + --> +</xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/prepare1.xsl b/cpp/common/framing/generated/stylesheets/prepare1.xsl new file mode 100644 index 0000000000..2aeda89677 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/prepare1.xsl @@ -0,0 +1,104 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> +<xsl:param name="asl_base"/> + +<!-- pre-process, phase 1 --> + +<xsl:template match="/"> + <xsl:apply-templates select="protocol" mode="prepare1"/> +</xsl:template> + +<xsl:template match="amqp" mode="prepare1"> + <frames> + <xsl:attribute name="protocol"> + <xsl:value-of select="@comment"/> + <xsl:text> (</xsl:text> + <xsl:text>major=</xsl:text><xsl:value-of select="@major"/> + <xsl:text>, minor=</xsl:text><xsl:value-of select="@minor"/> + <xsl:text>)</xsl:text> + </xsl:attribute> + <xsl:apply-templates mode="prepare1" select="inherit"/> + <xsl:apply-templates mode="prepare1" select="include"/> + <xsl:apply-templates mode="prepare1" select="domain"/> + <xsl:apply-templates mode="prepare1" select="class"/> + </frames> +</xsl:template> + +<xsl:template match="include" mode="prepare1"> + <xsl:if test="@filename != 'asl_constants.asl'"> + <!-- skip asl_constants.asl, we don't need it and it is not well formed so causes error warnings --> + <xsl:apply-templates select="document(@filename)" mode="prepare1"/> + </xsl:if> +</xsl:template> + +<xsl:template match="inherit" mode="prepare1"> + <xsl:variable name="ibase" select="concat('file:///', $asl_base, '/', @name, '.asl')"/> + <xsl:choose> + <xsl:when test="document($ibase)"> + <xsl:apply-templates select="document($ibase)" mode="prepare1"/> + </xsl:when> + <xsl:otherwise> + <xsl:message> + Could not inherit from <xsl:value-of select="$ibase"/>; file not found. + </xsl:message> + </xsl:otherwise> + </xsl:choose> +</xsl:template> + +<xsl:template match="class[@index]" mode="prepare1"> +<xsl:if test="not(@name = 'test')"> + <class> + <xsl:attribute name="name"><xsl:value-of select="@name"/></xsl:attribute> + <xsl:apply-templates select="method" mode="prepare1"/> + </class> +</xsl:if> +</xsl:template> + +<xsl:template match="method" mode="prepare1"> + <xsl:if test="parent::class[@index]"><!-- there is a template class that has no index, which we want to skip --> + <frame> + <xsl:attribute name="name"><xsl:value-of select="amqp:class-name(parent::class/@name, @name)"/></xsl:attribute> + <xsl:attribute name="class-id"><xsl:value-of select="parent::class/@index"/></xsl:attribute> + <xsl:if test="@index"> + <xsl:attribute name="method-id"><xsl:value-of select="@index"/></xsl:attribute> + </xsl:if> + <xsl:if test="not(@index)"> + <xsl:attribute name="method-id"><xsl:number count="method"/></xsl:attribute> + </xsl:if> + <xsl:attribute name="invocation_name"> + <xsl:value-of select="amqp:keyword-check(amqp:field-name(@name))"/> + </xsl:attribute> + <xsl:attribute name="declaration_name"> + <xsl:value-of select="amqp:method-name(parent::class/@name, @name)"/> + </xsl:attribute> + <xsl:if test="chassis[@name='client']"> + <xsl:attribute name="client">true</xsl:attribute> + </xsl:if> + <xsl:if test="chassis[@name='server']"> + <xsl:attribute name="server">true</xsl:attribute> + </xsl:if> + <xsl:apply-templates select="field" mode="prepare1"/> + </frame> + </xsl:if> +</xsl:template> + +<xsl:template match="domain" mode="prepare1"> + <domain> + <name><xsl:value-of select="@name"/></name> + <type><xsl:value-of select="@type"/></type> + </domain> +</xsl:template> + +<xsl:template match="field" mode="prepare1"> + <field> + <xsl:copy-of select="@name"/> + <xsl:copy-of select="@type"/> + <xsl:copy-of select="@domain"/> + </field> +</xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/prepare2.xsl b/cpp/common/framing/generated/stylesheets/prepare2.xsl new file mode 100644 index 0000000000..331319de57 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/prepare2.xsl @@ -0,0 +1,54 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> + +<!-- pre-process, phase 2 --> + +<xsl:key name="domain-lookup" match="domain" use="name"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="prepare2" select="frames"/> +</xsl:template> + +<xsl:template match="field[@domain]" mode="prepare2"> + <field> + <xsl:variable name="t1" select="key('domain-lookup', @domain)/type"/> + <xsl:attribute name="name"><xsl:value-of select="amqp:field-name(@name)"/></xsl:attribute> + <xsl:attribute name="type"><xsl:value-of select="$t1"/></xsl:attribute> + </field> +</xsl:template> + +<xsl:template match="field[@type]" mode="prepare2"> + <field> + <xsl:attribute name="name"><xsl:value-of select="amqp:field-name(@name)"/></xsl:attribute> + <xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> + </field> +</xsl:template> + +<xsl:template match="frames" mode="prepare2"> + <frames> + <xsl:copy-of select="@protocol"/> + <xsl:apply-templates mode="prepare2"/> + </frames> +</xsl:template> + +<xsl:template match="class" mode="prepare2"> + <class> + <xsl:copy-of select="@*"/> + <xsl:apply-templates mode="prepare2"/> + </class> +</xsl:template> + +<xsl:template match="frame" mode="prepare2"> + <xsl:element name="{name()}"> + <xsl:copy-of select="@*"/> + <xsl:apply-templates mode="prepare2" select="field"/> + </xsl:element> +</xsl:template> + +<xsl:template match="domain" mode="prepare2"></xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/prepare3.xsl b/cpp/common/framing/generated/stylesheets/prepare3.xsl new file mode 100644 index 0000000000..27a4764e4f --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/prepare3.xsl @@ -0,0 +1,54 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<xsl:import href="utils.xsl"/> + +<xsl:output indent="yes"/> + +<!-- final preparation of the model --> + +<xsl:template match="/"> + <xsl:apply-templates mode="prepare3"/> +</xsl:template> + +<xsl:template match="frames" mode="prepare3"> + <frames> + <xsl:copy-of select="@protocol"/> + <xsl:apply-templates mode="prepare3"/> + </frames> +</xsl:template> + +<xsl:template match="class" mode="prepare3"> + <class> + <xsl:copy-of select="@*"/> + <xsl:apply-templates mode="prepare3"/> + </class> +</xsl:template> + +<xsl:template match="frame" mode="prepare3"> + <xsl:element name="frame"> + <xsl:copy-of select="@*"/> + <xsl:if test="field[@type='bit']"> + <xsl:attribute name="has-bit-field">true</xsl:attribute> + <xsl:attribute name="bit-field-count"><xsl:value-of select="count(field[@type='bit'])"/></xsl:attribute> + </xsl:if> + <xsl:apply-templates mode="prepare3"/> + </xsl:element> +</xsl:template> + + +<xsl:template match="field" mode="prepare3"> + <field> + <xsl:attribute name="type"><xsl:value-of select="@type"/></xsl:attribute> + <!-- ensure the field name is processed to be a valid java name --> + <xsl:attribute name="name"><xsl:value-of select="amqp:field-name(@name)"/></xsl:attribute> + <!-- add some attributes to make code generation easier --> + <xsl:attribute name="cpp-type"><xsl:value-of select="amqp:cpp-type(@type)"/></xsl:attribute> + <xsl:attribute name="cpp-arg-type"><xsl:value-of select="amqp:cpp-arg-type(@type)"/></xsl:attribute> + <xsl:if test="@type='bit'"> + <xsl:attribute name="boolean-index"><xsl:number count="field[@type='bit']"/></xsl:attribute> + </xsl:if> + </field> +</xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/registry.xsl b/cpp/common/framing/generated/stylesheets/registry.xsl new file mode 100644 index 0000000000..a818a0a871 --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/registry.xsl @@ -0,0 +1,12 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<xsl:import href="java.xsl"/> + +<xsl:output method="text" indent="yes" name="textFormat"/> + +<xsl:template match="/"> + <xsl:apply-templates mode="generate-registry" select="registries"/> +</xsl:template> + +</xsl:stylesheet> diff --git a/cpp/common/framing/generated/stylesheets/utils.xsl b/cpp/common/framing/generated/stylesheets/utils.xsl new file mode 100644 index 0000000000..829d38433e --- /dev/null +++ b/cpp/common/framing/generated/stylesheets/utils.xsl @@ -0,0 +1,194 @@ +<?xml version='1.0'?> +<xsl:stylesheet version="2.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:amqp="http://amqp.org"> + +<!-- This file contains functions that are used in the generation of the java classes for framing --> + +<!-- retrieve the java type of a given amq type --> +<xsl:function name="amqp:cpp-type"> + <xsl:param name="t"/> + <xsl:choose> + <xsl:when test="$t='octet'">u_int8_t</xsl:when> + <xsl:when test="$t='short'">u_int16_t</xsl:when> + <xsl:when test="$t='shortstr'">string</xsl:when> + <xsl:when test="$t='longstr'">string</xsl:when> + <xsl:when test="$t='bit'">bool</xsl:when> + <xsl:when test="$t='long'">u_int32_t</xsl:when> + <xsl:when test="$t='longlong'">u_int64_t</xsl:when> + <xsl:when test="$t='table'">FieldTable</xsl:when> + <xsl:otherwise>Object /*WARNING: undefined type*/</xsl:otherwise> + </xsl:choose> +</xsl:function> +<xsl:function name="amqp:cpp-arg-type"> + <xsl:param name="t"/> + <xsl:choose> + <xsl:when test="$t='octet'">u_int8_t</xsl:when> + <xsl:when test="$t='short'">u_int16_t</xsl:when> + <xsl:when test="$t='shortstr'">string&</xsl:when> + <xsl:when test="$t='longstr'">string&</xsl:when> + <xsl:when test="$t='bit'">bool</xsl:when> + <xsl:when test="$t='long'">u_int32_t</xsl:when> + <xsl:when test="$t='longlong'">u_int64_t</xsl:when> + <xsl:when test="$t='table'">FieldTable&</xsl:when> + <xsl:otherwise>Object /*WARNING: undefined type*/</xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to get the field size of a given amq type --> +<xsl:function name="amqp:field-length"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='bit' and $f/@boolean-index=1"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='bit' and $f/@boolean-index > 1"> + <xsl:value-of select="concat('0 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='char'"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat('1 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat('2 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat('4 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat('8 /*', $f/@name, '*/')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('1 + ', $f/@name, '.length()')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('4 + ', $f/@name, '.length()')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat($f/@name, '.size()')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE FIELD SIZE */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to encode a field of a given amq type --> +<!-- Note: + This method will not provide an encoder for a bit field. + Bit fields should be encoded together separately. --> + +<xsl:function name="amqp:encoder"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat('buffer.putOctet(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat('buffer.putShort(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat('buffer.putLong(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat('buffer.putLongLong(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('buffer.putShortString(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('buffer.putLongString(', $f/@name, ')')"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat('buffer.putFieldTable(', $f/@name, ')')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE ENCODER */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- retrieve the code to decode a field of a given amq type --> +<xsl:function name="amqp:decoder"> + <xsl:param name="f"/> + <xsl:choose> + <xsl:when test="$f/@type='bit'"> + <xsl:value-of select="concat($f/@name, ' = (1 << (', $f/@boolean-index, ' - 1)) & flags;')"/> + </xsl:when> + <xsl:when test="$f/@type='octet'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getOctet()')"/> + </xsl:when> + <xsl:when test="$f/@type='short'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getShort()')"/> + </xsl:when> + <xsl:when test="$f/@type='long'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getLong()')"/> + </xsl:when> + <xsl:when test="$f/@type='longlong'"> + <xsl:value-of select="concat($f/@name, ' = buffer.getLongLong()')"/> + </xsl:when> + <xsl:when test="$f/@type='shortstr'"> + <xsl:value-of select="concat('buffer.getShortString(', $f/@name), ')'"/> + </xsl:when> + <xsl:when test="$f/@type='longstr'"> + <xsl:value-of select="concat('buffer.getLongString(', $f/@name), ')'"/> + </xsl:when> + <xsl:when test="$f/@type='table'"> + <xsl:value-of select="concat('buffer.getFieldTable(', $f/@name, ')')"/> + </xsl:when> + <xsl:otherwise><xsl:text>/* WARNING: COULD NOT DETERMINE DECODER */</xsl:text></xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- create the class name for a frame, based on class and method (passed in) --> +<xsl:function name="amqp:class-name"> + <xsl:param name="class"/> + <xsl:param name="method"/> + <xsl:value-of select="concat(amqp:upper-first($class),amqp:upper-first(amqp:field-name($method)), 'Body')"/> +</xsl:function> + +<!-- create the class name for a frame, based on class and method (passed in) --> +<xsl:function name="amqp:method-name"> + <xsl:param name="class"/> + <xsl:param name="method"/> + <xsl:value-of select="concat(translate($class, '- ', '__'), '_', translate($method, '- ', '__'))"/> +</xsl:function> + +<!-- get a valid field name, processing spaces and '-'s where appropriate --> +<xsl:function name="amqp:field-name"> + <xsl:param name="name"/> + <xsl:choose> + <xsl:when test="contains($name, ' ')"> + <xsl:value-of select="concat(substring-before($name, ' '), amqp:upper-first(substring-after($name, ' ')))"/> + </xsl:when> + <xsl:when test="contains($name, '-')"> + <xsl:value-of select="concat(substring-before($name, '-'), amqp:upper-first(substring-after($name, '-')))"/> + </xsl:when> + <xsl:otherwise> + <xsl:value-of select="$name"/> + </xsl:otherwise> + </xsl:choose> +</xsl:function> + +<!-- convert the first character of the input to upper-case --> +<xsl:function name="amqp:upper-first"> + <xsl:param name="in"/> + <xsl:value-of select="concat(upper-case(substring($in, 1, 1)), substring($in, 2))"/> +</xsl:function> + + +<xsl:function name="amqp:keyword-check"> + <xsl:param name="in"/> + <xsl:choose> + <xsl:when test="contains($in, 'delete')"> + <xsl:value-of select="concat($in, '_')"/> + </xsl:when> + <xsl:when test="contains($in, 'string')"> + <xsl:value-of select="concat($in, '_')"/> + </xsl:when> + <xsl:when test="contains($in, 'return')"> + <xsl:value-of select="concat($in, '_')"/> + </xsl:when> + <xsl:otherwise> + <xsl:value-of select="$in"/> + </xsl:otherwise> + </xsl:choose> +</xsl:function> + +</xsl:stylesheet> diff --git a/cpp/common/framing/inc/AMQBody.h b/cpp/common/framing/inc/AMQBody.h new file mode 100644 index 0000000000..d4b436c949 --- /dev/null +++ b/cpp/common/framing/inc/AMQBody.h @@ -0,0 +1,46 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "memory.h" +#include "amqp_types.h" +#include "Buffer.h" + +#ifndef _AMQBody_ +#define _AMQBody_ + +namespace qpid { + namespace framing { + + class AMQBody + { + public: + typedef std::tr1::shared_ptr<AMQBody> shared_ptr; + + virtual u_int32_t size() const = 0; + virtual u_int8_t type() const = 0; + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, u_int32_t size) = 0; + inline virtual ~AMQBody(){} + }; + + enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8}; + + } +} + + +#endif diff --git a/cpp/common/framing/inc/AMQContentBody.h b/cpp/common/framing/inc/AMQContentBody.h new file mode 100644 index 0000000000..8e97c31edb --- /dev/null +++ b/cpp/common/framing/inc/AMQContentBody.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "AMQBody.h" +#include "Buffer.h" + +#ifndef _AMQContentBody_ +#define _AMQContentBody_ + +namespace qpid { +namespace framing { + +class AMQContentBody : virtual public AMQBody +{ + string data; + +public: + typedef std::tr1::shared_ptr<AMQContentBody> shared_ptr; + + AMQContentBody(); + AMQContentBody(string& data); + inline virtual ~AMQContentBody(){} + inline u_int8_t type() const { return CONTENT_BODY; }; + inline string& getData(){ return data; } + u_int32_t size() const; + void encode(Buffer& buffer) const; + void decode(Buffer& buffer, u_int32_t size); +}; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/AMQDataBlock.h b/cpp/common/framing/inc/AMQDataBlock.h new file mode 100644 index 0000000000..6c47c78864 --- /dev/null +++ b/cpp/common/framing/inc/AMQDataBlock.h @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Buffer.h" + +#ifndef _AMQDataBlock_ +#define _AMQDataBlock_ + +namespace qpid { +namespace framing { + +class AMQDataBlock +{ +public: + virtual ~AMQDataBlock() {} + virtual void encode(Buffer& buffer) = 0; + virtual bool decode(Buffer& buffer) = 0; + virtual u_int32_t size() const = 0; +}; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/AMQFrame.h b/cpp/common/framing/inc/AMQFrame.h new file mode 100644 index 0000000000..5656d20377 --- /dev/null +++ b/cpp/common/framing/inc/AMQFrame.h @@ -0,0 +1,61 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_methods.h" +#include "amqp_types.h" +#include "AMQBody.h" +#include "AMQDataBlock.h" +#include "AMQMethodBody.h" +#include "AMQHeaderBody.h" +#include "AMQContentBody.h" +#include "AMQHeartbeatBody.h" +#include "Buffer.h" + +#ifndef _AMQFrame_ +#define _AMQFrame_ + +namespace qpid { + namespace framing { + + class AMQFrame : virtual public AMQDataBlock + { + u_int16_t channel; + u_int8_t type;//used if the body is decoded separately from the 'head' + AMQBody::shared_ptr body; + + public: + AMQFrame(); + AMQFrame(u_int16_t channel, AMQBody* body); + AMQFrame(u_int16_t channel, AMQBody::shared_ptr& body); + virtual ~AMQFrame(); + virtual void encode(Buffer& buffer); + virtual bool decode(Buffer& buffer); + virtual u_int32_t size() const; + u_int16_t getChannel(); + AMQBody::shared_ptr& getBody(); + + u_int32_t decodeHead(Buffer& buffer); + void decodeBody(Buffer& buffer, uint32_t size); + + friend std::ostream& operator<<(std::ostream& out, const AMQFrame& body); + }; + + } +} + + +#endif diff --git a/cpp/common/framing/inc/AMQHeaderBody.h b/cpp/common/framing/inc/AMQHeaderBody.h new file mode 100644 index 0000000000..369db8a9c8 --- /dev/null +++ b/cpp/common/framing/inc/AMQHeaderBody.h @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "AMQBody.h" +#include "Buffer.h" +#include "HeaderProperties.h" + +#ifndef _AMQHeaderBody_ +#define _AMQHeaderBody_ + +namespace qpid { +namespace framing { + +class AMQHeaderBody : virtual public AMQBody +{ + HeaderProperties* properties; + u_int16_t weight; + u_int64_t contentSize; + + void createProperties(int classId); +public: + typedef std::tr1::shared_ptr<AMQHeaderBody> shared_ptr; + + AMQHeaderBody(int classId); + AMQHeaderBody(); + inline u_int8_t type() const { return HEADER_BODY; } + HeaderProperties* getProperties(){ return properties; } + inline u_int64_t getContentSize() const { return contentSize; } + inline void setContentSize(u_int64_t size) { contentSize = size; } + virtual ~AMQHeaderBody(); + virtual u_int32_t size() const; + virtual void encode(Buffer& buffer) const; + virtual void decode(Buffer& buffer, u_int32_t size); +}; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/AMQHeartbeatBody.h b/cpp/common/framing/inc/AMQHeartbeatBody.h new file mode 100644 index 0000000000..ca2def977a --- /dev/null +++ b/cpp/common/framing/inc/AMQHeartbeatBody.h @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "AMQBody.h" +#include "Buffer.h" + +#ifndef _AMQHeartbeatBody_ +#define _AMQHeartbeatBody_ + +namespace qpid { +namespace framing { + +class AMQHeartbeatBody : virtual public AMQBody +{ +public: + typedef std::tr1::shared_ptr<AMQHeartbeatBody> shared_ptr; + + virtual ~AMQHeartbeatBody() {} + inline u_int32_t size() const { return 0; } + inline u_int8_t type() const { return HEARTBEAT_BODY; } + inline void encode(Buffer& buffer) const {} + inline void decode(Buffer& buffer, u_int32_t size) {} +}; + +} +} + +#endif diff --git a/cpp/common/framing/inc/AMQMethodBody.h b/cpp/common/framing/inc/AMQMethodBody.h new file mode 100644 index 0000000000..59d5dd5212 --- /dev/null +++ b/cpp/common/framing/inc/AMQMethodBody.h @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "amqp_types.h" +#include "AMQBody.h" +#include "Buffer.h" +#include "AMQP_ServerOperations.h" + +#ifndef _AMQMethodBody_ +#define _AMQMethodBody_ + +namespace qpid { +namespace framing { + +class AMQMethodBody : virtual public AMQBody +{ +public: + typedef std::tr1::shared_ptr<AMQMethodBody> shared_ptr; + + inline u_int8_t type() const { return METHOD_BODY; } + inline u_int32_t size() const { return 4 + bodySize(); } + inline virtual ~AMQMethodBody(){} + virtual void print(std::ostream& out) const = 0; + virtual u_int16_t amqpMethodId() const = 0; + virtual u_int16_t amqpClassId() const = 0; + virtual void invoke(AMQP_ServerOperations& target, u_int16_t channel); + virtual void encodeContent(Buffer& buffer) const = 0; + virtual void decodeContent(Buffer& buffer) = 0; + virtual u_int32_t bodySize() const = 0; + void encode(Buffer& buffer) const; + void decode(Buffer& buffer, u_int32_t size); + bool match(AMQMethodBody* other) const; +}; + +std::ostream& operator<<(std::ostream& out, const AMQMethodBody& body); + +} +} + + +#endif diff --git a/cpp/common/framing/inc/BasicHeaderProperties.h b/cpp/common/framing/inc/BasicHeaderProperties.h new file mode 100644 index 0000000000..8688a37bf9 --- /dev/null +++ b/cpp/common/framing/inc/BasicHeaderProperties.h @@ -0,0 +1,93 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "amqp_methods.h" +#include "Buffer.h" +#include "HeaderProperties.h" + +#ifndef _BasicHeaderProperties_ +#define _BasicHeaderProperties_ + +namespace qpid { +namespace framing { + + //TODO: This could be easily generated from the spec + class BasicHeaderProperties : public HeaderProperties + { + string contentType; + string contentEncoding; + FieldTable headers; + u_int8_t deliveryMode; + u_int8_t priority; + string correlationId; + string replyTo; + string expiration; + string messageId; + u_int64_t timestamp; + string type; + string userId; + string appId; + string clusterId; + + u_int16_t getFlags() const; + + public: + BasicHeaderProperties(); + virtual ~BasicHeaderProperties(); + virtual u_int32_t size() const; + virtual void encode(Buffer& buffer) const; + virtual void decode(Buffer& buffer, u_int32_t size); + + inline virtual u_int8_t classId(){ return BASIC; } + + inline string& getContentType(){ return contentType; } + inline string& getContentEncoding(){ return contentEncoding; } + inline FieldTable& getHeaders(){ return headers; } + inline u_int8_t getDeliveryMode(){ return deliveryMode; } + inline u_int8_t getPriority(){ return priority; } + inline string& getCorrelationId(){return correlationId; } + inline string& getReplyTo(){ return replyTo; } + inline string& getExpiration(){ return expiration; } + inline string& getMessageId(){return messageId; } + inline u_int64_t getTimestamp(){ return timestamp; } + inline string& getType(){ return type; } + inline string& getUserId(){ return userId; } + inline string& getAppId(){ return appId; } + inline string& getClusterId(){ return clusterId; } + + void inline setContentType(string& type){ contentType = type; } + void inline setContentEncoding(string& encoding){ contentEncoding = encoding; } + void inline setHeaders(FieldTable& headers){ this->headers = headers; } + void inline setDeliveryMode(u_int8_t mode){ deliveryMode = mode; } + void inline setPriority(u_int8_t priority){ this->priority = priority; } + void inline setCorrelationId(string& correlationId){ this->correlationId = correlationId; } + void inline setReplyTo(string& replyTo){ this->replyTo = replyTo;} + void inline setExpiration(string& expiration){ this->expiration = expiration; } + void inline setMessageId(string& messageId){ this->messageId = messageId; } + void inline setTimestamp(u_int64_t timestamp){ this->timestamp = timestamp; } + void inline setType(string& type){ this->type = type; } + void inline setUserId(string& userId){ this->userId = userId; } + void inline setAppId(string& appId){this->appId = appId; } + void inline setClusterId(string& clusterId){ this->clusterId = clusterId; } + }; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/BodyHandler.h b/cpp/common/framing/inc/BodyHandler.h new file mode 100644 index 0000000000..f92ae66804 --- /dev/null +++ b/cpp/common/framing/inc/BodyHandler.h @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _BodyHandler_ +#define _BodyHandler_ + +#include "AMQMethodBody.h" +#include "AMQHeaderBody.h" +#include "AMQContentBody.h" +#include "AMQHeartbeatBody.h" + +namespace qpid { +namespace framing { + + class BodyHandler{ + public: + virtual void handleMethod(AMQMethodBody::shared_ptr body) = 0; + virtual void handleHeader(AMQHeaderBody::shared_ptr body) = 0; + virtual void handleContent(AMQContentBody::shared_ptr body) = 0; + virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body) = 0; + + void handleBody(AMQBody::shared_ptr& body); + }; + + class UnknownBodyType{ + public: + const u_int16_t type; + inline UnknownBodyType(u_int16_t _type) : type(_type){} + }; +} +} + + +#endif diff --git a/cpp/common/framing/inc/Buffer.h b/cpp/common/framing/inc/Buffer.h new file mode 100644 index 0000000000..1ff4611f1f --- /dev/null +++ b/cpp/common/framing/inc/Buffer.h @@ -0,0 +1,77 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "FieldTable.h" + +#ifndef _Buffer_ +#define _Buffer_ + +namespace qpid { +namespace framing { + +class Buffer +{ + const int size; + char* data; + int position; + int limit; + int r_position; + int r_limit; + +public: + + Buffer(int size); + ~Buffer(); + + void flip(); + void clear(); + void compact(); + void record(); + void restore(); + int available(); + char* start(); + void move(int bytes); + + void putOctet(u_int8_t i); + void putShort(u_int16_t i); + void putLong(u_int32_t i); + void putLongLong(u_int64_t i); + + u_int8_t getOctet(); + u_int16_t getShort(); + u_int32_t getLong(); + u_int64_t getLongLong(); + + void putShortString(const string& s); + void putLongString(const string& s); + void getShortString(string& s); + void getLongString(string& s); + + void putFieldTable(const FieldTable& t); + void getFieldTable(FieldTable& t); + + void putRawData(const string& s); + void getRawData(string& s, u_int32_t size); + +}; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/FieldTable.h b/cpp/common/framing/inc/FieldTable.h new file mode 100644 index 0000000000..cf935d3284 --- /dev/null +++ b/cpp/common/framing/inc/FieldTable.h @@ -0,0 +1,68 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <vector> +#include "amqp_types.h" + +#ifndef _FieldTable_ +#define _FieldTable_ + +namespace qpid { +namespace framing { + + class NamedValue; + class Value; + class Buffer; + + class FieldTable + { + std::vector<NamedValue*> values; + NamedValue* find(const std::string& name) const; + + Value* getValue(const std::string& name) const; + void setValue(const std::string& name, Value* value); + + public: + ~FieldTable(); + u_int32_t size() const; + int count() const; + void setString(const std::string& name, const std::string& value); + void setInt(const std::string& name, int value); + void setTimestamp(const std::string& name, u_int64_t value); + void setTable(const std::string& name, const FieldTable& value); + //void setDecimal(string& name, xxx& value); + std::string getString(const std::string& name); + int getInt(const std::string& name); + u_int64_t getTimestamp(const std::string& name); + void getTable(const std::string& name, FieldTable& value); + //void getDecimal(string& name, xxx& value); + + void encode(Buffer& buffer) const; + void decode(Buffer& buffer); + + friend std::ostream& operator<<(std::ostream& out, const FieldTable& body); + }; + + class FieldNotFoundException{}; + class UnknownFieldName : public FieldNotFoundException{}; + class IncorrectFieldType : public FieldNotFoundException{}; +} +} + + +#endif diff --git a/cpp/common/framing/inc/HeaderProperties.h b/cpp/common/framing/inc/HeaderProperties.h new file mode 100644 index 0000000000..f84345c203 --- /dev/null +++ b/cpp/common/framing/inc/HeaderProperties.h @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "Buffer.h" + +#ifndef _HeaderProperties_ +#define _HeaderProperties_ + +namespace qpid { +namespace framing { + + enum header_classes{BASIC = 60}; + + class HeaderProperties + { + + public: + inline virtual ~HeaderProperties(){} + virtual u_int8_t classId() = 0; + virtual u_int32_t size() const = 0; + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, u_int32_t size) = 0; + }; +} +} + + +#endif diff --git a/cpp/common/framing/inc/InitiationHandler.h b/cpp/common/framing/inc/InitiationHandler.h new file mode 100644 index 0000000000..2e8d1e652b --- /dev/null +++ b/cpp/common/framing/inc/InitiationHandler.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _InitiationHandler_ +#define _InitiationHandler_ + +#include "ProtocolInitiation.h" + +namespace qpid { +namespace framing { + + class InitiationHandler{ + public: + virtual void initiated(ProtocolInitiation* header) = 0; + }; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/InputHandler.h b/cpp/common/framing/inc/InputHandler.h new file mode 100644 index 0000000000..2722cae0ed --- /dev/null +++ b/cpp/common/framing/inc/InputHandler.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _InputHandler_ +#define _InputHandler_ + +#include "AMQFrame.h" + +namespace qpid { +namespace framing { + + class InputHandler{ + public: + virtual void received(AMQFrame* frame) = 0; + }; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/NamedValue.h b/cpp/common/framing/inc/NamedValue.h new file mode 100644 index 0000000000..729b5d08a7 --- /dev/null +++ b/cpp/common/framing/inc/NamedValue.h @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <vector> +#include "amqp_types.h" +#include "Value.h" + +#ifndef _NamedValue_ +#define _NamedValue_ + +namespace qpid { +namespace framing { + + class Buffer; + + class NamedValue{ + string name; + Value* value; + public: + NamedValue(); + NamedValue(const string& name, Value* value); + ~NamedValue(); + void encode(Buffer& buffer); + void decode(Buffer& buffer); + u_int32_t size() const; + inline const string& getName() const { return name; } + inline Value* getValue() const { return value; } + inline void setValue(Value* val) { value = val; } + }; +} +} + + +#endif diff --git a/cpp/common/framing/inc/OutputHandler.h b/cpp/common/framing/inc/OutputHandler.h new file mode 100644 index 0000000000..7fe63660c3 --- /dev/null +++ b/cpp/common/framing/inc/OutputHandler.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> + +#ifndef _OutputHandler_ +#define _OutputHandler_ + +#include "AMQFrame.h" + +namespace qpid { +namespace framing { + + class OutputHandler{ + public: + virtual void send(AMQFrame* frame) = 0; + }; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/ProtocolInitiation.h b/cpp/common/framing/inc/ProtocolInitiation.h new file mode 100644 index 0000000000..ab9734e6b3 --- /dev/null +++ b/cpp/common/framing/inc/ProtocolInitiation.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "Buffer.h" +#include "AMQDataBlock.h" + +#ifndef _ProtocolInitiation_ +#define _ProtocolInitiation_ + +namespace qpid { +namespace framing { + +class ProtocolInitiation : virtual public AMQDataBlock +{ + u_int8_t pmajor; + u_int8_t pminor; + +public: + ProtocolInitiation(); + ProtocolInitiation(u_int8_t major, u_int8_t minor); + virtual ~ProtocolInitiation(); + virtual void encode(Buffer& buffer); + virtual bool decode(Buffer& buffer); + inline virtual u_int32_t size() const { return 8; } + inline u_int8_t getMajor(){ return pmajor; } + inline u_int8_t getMinor(){ return pminor; } +}; + +} +} + + +#endif diff --git a/cpp/common/framing/inc/Value.h b/cpp/common/framing/inc/Value.h new file mode 100644 index 0000000000..e3d2a2c1d6 --- /dev/null +++ b/cpp/common/framing/inc/Value.h @@ -0,0 +1,109 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <vector> +#include "amqp_types.h" +#include "FieldTable.h" + +#ifndef _Value_ +#define _Value_ + +namespace qpid { +namespace framing { + + class Buffer; + + class Value{ + public: + inline virtual ~Value(){} + virtual u_int32_t size() const = 0; + virtual char getType() const = 0; + virtual void encode(Buffer& buffer) = 0; + virtual void decode(Buffer& buffer) = 0; + }; + + class StringValue : public virtual Value{ + string value; + + public: + inline StringValue(const string& v) : value(v){} + inline StringValue(){} + inline string getValue(){ return value; } + ~StringValue(){} + inline virtual u_int32_t size() const { return 4 + value.length(); } + inline virtual char getType() const { return 'S'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); + }; + + class IntegerValue : public virtual Value{ + int value; + public: + inline IntegerValue(int v) : value(v){} + inline IntegerValue(){} + inline int getValue(){ return value; } + ~IntegerValue(){} + inline virtual u_int32_t size() const { return 4; } + inline virtual char getType() const { return 'I'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); + }; + + class TimeValue : public virtual Value{ + u_int64_t value; + public: + inline TimeValue(int v) : value(v){} + inline TimeValue(){} + inline u_int64_t getValue(){ return value; } + ~TimeValue(){} + inline virtual u_int32_t size() const { return 8; } + inline virtual char getType() const { return 'T'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); + }; + + class DecimalValue : public virtual Value{ + u_int8_t decimals; + u_int32_t value; + public: + inline DecimalValue(int v) : value(v){} + inline DecimalValue(){} + ~DecimalValue(){} + inline virtual u_int32_t size() const { return 5; } + inline virtual char getType() const { return 'D'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); + }; + + class FieldTableValue : public virtual Value{ + FieldTable value; + public: + inline FieldTableValue(const FieldTable& v) : value(v){} + inline FieldTableValue(){} + inline FieldTable getValue(){ return value; } + ~FieldTableValue(){} + inline virtual u_int32_t size() const { return 4 + value.size(); } + inline virtual char getType() const { return 'F'; } + virtual void encode(Buffer& buffer); + virtual void decode(Buffer& buffer); + }; +} +} + + +#endif diff --git a/cpp/common/framing/inc/amqp_framing.h b/cpp/common/framing/inc/amqp_framing.h new file mode 100644 index 0000000000..adb0045ee5 --- /dev/null +++ b/cpp/common/framing/inc/amqp_framing.h @@ -0,0 +1,31 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_types.h" +#include "AMQFrame.h" +#include "AMQBody.h" +#include "BodyHandler.h" +#include "AMQMethodBody.h" +#include "AMQHeaderBody.h" +#include "AMQContentBody.h" +#include "AMQHeartbeatBody.h" +#include "amqp_methods.h" +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "BasicHeaderProperties.h" diff --git a/cpp/common/framing/inc/amqp_types.h b/cpp/common/framing/inc/amqp_types.h new file mode 100644 index 0000000000..6f8ef0862a --- /dev/null +++ b/cpp/common/framing/inc/amqp_types.h @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <string> +#ifdef _WINDOWS +#include "windows.h" +typedef unsigned char u_int8_t; +typedef unsigned short u_int16_t; +typedef unsigned int u_int32_t; +typedef unsigned __int64 u_int64_t; +#endif +#ifndef _WINDOWS +#include "sys/types.h" +#endif + +#ifndef AMQP_TYPES_H +#define AMQP_TYPES_H + + +typedef std::string string; + +#endif diff --git a/cpp/common/framing/src/AMQContentBody.cpp b/cpp/common/framing/src/AMQContentBody.cpp new file mode 100644 index 0000000000..c8aadc8108 --- /dev/null +++ b/cpp/common/framing/src/AMQContentBody.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQContentBody.h" + +qpid::framing::AMQContentBody::AMQContentBody(){ +} + +qpid::framing::AMQContentBody::AMQContentBody(string& _data) : data(_data){ +} + +u_int32_t qpid::framing::AMQContentBody::size() const{ + return data.size(); +} +void qpid::framing::AMQContentBody::encode(Buffer& buffer) const{ + buffer.putRawData(data); +} +void qpid::framing::AMQContentBody::decode(Buffer& buffer, u_int32_t size){ + buffer.getRawData(data, size); +} + diff --git a/cpp/common/framing/src/AMQFrame.cpp b/cpp/common/framing/src/AMQFrame.cpp new file mode 100644 index 0000000000..70f71010ff --- /dev/null +++ b/cpp/common/framing/src/AMQFrame.cpp @@ -0,0 +1,147 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQFrame.h" +#include "QpidError.h" + +using namespace qpid::framing; + +AMQFrame::AMQFrame(){} + +AMQFrame::AMQFrame(u_int16_t _channel, AMQBody* _body) : channel(_channel), body(_body){} + +AMQFrame::AMQFrame(u_int16_t _channel, AMQBody::shared_ptr& _body) : channel(_channel), body(_body){} + +AMQFrame::~AMQFrame(){ +} + +u_int16_t AMQFrame::getChannel(){ + return channel; +} + +AMQBody::shared_ptr& AMQFrame::getBody(){ + return body; +} + +void AMQFrame::encode(Buffer& buffer) +{ + buffer.putOctet(body->type()); + buffer.putShort(channel); + buffer.putLong(body->size()); + body->encode(buffer); + buffer.putOctet(0xCE); +} + +AMQBody::shared_ptr createMethodBody(Buffer& buffer){ + u_int16_t classId = buffer.getShort(); + u_int16_t methodId = buffer.getShort(); + AMQBody::shared_ptr body(createAMQMethodBody(classId, methodId)); + return body; +} + +u_int32_t AMQFrame::size() const{ + if(!body.get()) THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to get size of frame with no body set!"); + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() + 1/*0xCE*/; +} + +bool AMQFrame::decode(Buffer& buffer) +{ + if(buffer.available() < 7) return false; + buffer.record(); + u_int8_t type = buffer.getOctet(); + channel = buffer.getShort(); + u_int32_t size = buffer.getLong(); + if(buffer.available() < size + 1){ + buffer.restore(); + return false; + } + switch(type) + { + case METHOD_BODY: + body = createMethodBody(buffer); + break; + case HEADER_BODY: + body = AMQBody::shared_ptr(new AMQHeaderBody()); + break; + case CONTENT_BODY: + body = AMQBody::shared_ptr(new AMQContentBody()); + break; + case HEARTBEAT_BODY: + body = AMQBody::shared_ptr(new AMQHeartbeatBody()); + break; + default: + string msg("Unknown body type: "); + msg += type; + THROW_QPID_ERROR(FRAMING_ERROR, msg); + } + body->decode(buffer, size); + u_int8_t end = buffer.getOctet(); + if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + return true; +} + +u_int32_t AMQFrame::decodeHead(Buffer& buffer){ + type = buffer.getOctet(); + channel = buffer.getShort(); + return buffer.getLong(); +} + +void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) +{ + switch(type) + { + case METHOD_BODY: + body = createMethodBody(buffer); + break; + case HEADER_BODY: + body = AMQBody::shared_ptr(new AMQHeaderBody()); + break; + case CONTENT_BODY: + body = AMQBody::shared_ptr(new AMQContentBody()); + break; + case HEARTBEAT_BODY: + body = AMQBody::shared_ptr(new AMQHeartbeatBody()); + break; + default: + string msg("Unknown body type: "); + msg += type; + THROW_QPID_ERROR(FRAMING_ERROR, msg); + } + body->decode(buffer, size); +} + +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){ + out << "Frame[channel=" << t.channel << "; "; + if(t.body.get() == 0){ + out << "empty"; + }else if(t.body->type() == METHOD_BODY){ + (dynamic_cast<AMQMethodBody*>(t.body.get()))->print(out); + }else if(t.body->type() == HEADER_BODY){ + out << "header, content_size=" << + (dynamic_cast<AMQHeaderBody*>(t.body.get()))->getContentSize() + << " (" << t.body->size() << " bytes)"; + }else if(t.body->type() == CONTENT_BODY){ + out << "content (" << t.body->size() << " bytes)"; + }else if(t.body->type() == HEARTBEAT_BODY){ + out << "heartbeat"; + }else{ + out << "unknown type, " << t.body->type(); + } + out << "]"; + return out; +} + diff --git a/cpp/common/framing/src/AMQHeaderBody.cpp b/cpp/common/framing/src/AMQHeaderBody.cpp new file mode 100644 index 0000000000..4bf1626a8a --- /dev/null +++ b/cpp/common/framing/src/AMQHeaderBody.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQHeaderBody.h" +#include "QpidError.h" +#include "BasicHeaderProperties.h" + +qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){ + createProperties(classId); +} + +qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){ +} + +qpid::framing::AMQHeaderBody::~AMQHeaderBody(){ + delete properties; +} + +u_int32_t qpid::framing::AMQHeaderBody::size() const{ + return 12 + properties->size(); +} + +void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { + buffer.putShort(properties->classId()); + buffer.putShort(weight); + buffer.putLongLong(contentSize); + properties->encode(buffer); +} + +void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, u_int32_t size){ + u_int16_t classId = buffer.getShort(); + weight = buffer.getShort(); + contentSize = buffer.getLongLong(); + createProperties(classId); + properties->decode(buffer, size - 12); +} + +void qpid::framing::AMQHeaderBody::createProperties(int classId){ + switch(classId){ + case BASIC: + properties = new qpid::framing::BasicHeaderProperties(); + break; + default: + THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class"); + } +} diff --git a/cpp/common/framing/src/AMQMethodBody.cpp b/cpp/common/framing/src/AMQMethodBody.cpp new file mode 100644 index 0000000000..73862bb2bf --- /dev/null +++ b/cpp/common/framing/src/AMQMethodBody.cpp @@ -0,0 +1,43 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "AMQMethodBody.h" +#include "QpidError.h" + +void qpid::framing::AMQMethodBody::encode(Buffer& buffer) const{ + buffer.putShort(amqpClassId()); + buffer.putShort(amqpMethodId()); + encodeContent(buffer); +} + +void qpid::framing::AMQMethodBody::decode(Buffer& buffer, u_int32_t size){ + decodeContent(buffer); +} + +bool qpid::framing::AMQMethodBody::match(AMQMethodBody* other) const{ + return other != 0 && other->amqpClassId() == amqpClassId() && other->amqpMethodId() == amqpMethodId(); +} + +void qpid::framing::AMQMethodBody::invoke(AMQP_ServerOperations& target, u_int16_t channel){ + THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); +} + + +std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQMethodBody& m){ + m.print(out); + return out; +} diff --git a/cpp/common/framing/src/BasicHeaderProperties.cpp b/cpp/common/framing/src/BasicHeaderProperties.cpp new file mode 100644 index 0000000000..4219d33a8f --- /dev/null +++ b/cpp/common/framing/src/BasicHeaderProperties.cpp @@ -0,0 +1,102 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "BasicHeaderProperties.h" + +//TODO: This could be easily generated from the spec + +qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){} +qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} + +u_int32_t qpid::framing::BasicHeaderProperties::size() const{ + u_int32_t size = 2;//flags + if(contentType.length() > 0) size += contentType.length() + 1; + if(contentEncoding.length() > 0) size += contentEncoding.length() + 1; + if(headers.count() > 0) size += headers.size(); + if(deliveryMode != 0) size += 1; + if(priority != 0) size += 1; + if(correlationId.length() > 0) size += correlationId.length() + 1; + if(replyTo.length() > 0) size += replyTo.length() + 1; + if(expiration.length() > 0) size += expiration.length() + 1; + if(messageId.length() > 0) size += messageId.length() + 1; + if(timestamp != 0) size += 8; + if(type.length() > 0) size += type.length() + 1; + if(userId.length() > 0) size += userId.length() + 1; + if(appId.length() > 0) size += appId.length() + 1; + if(clusterId.length() > 0) size += clusterId.length() + 1; + + return size; +} + +void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) const{ + u_int16_t flags = getFlags(); + buffer.putShort(flags); + + if(contentType.length() > 0) buffer.putShortString(contentType); + if(contentEncoding.length() > 0) buffer.putShortString(contentEncoding); + if(headers.count() > 0) buffer.putFieldTable(headers); + if(deliveryMode != 0) buffer.putOctet(deliveryMode); + if(priority != 0) buffer.putOctet(priority); + if(correlationId.length() > 0) buffer.putShortString(correlationId); + if(replyTo.length() > 0) buffer.putShortString(replyTo); + if(expiration.length() > 0) buffer.putShortString(expiration); + if(messageId.length() > 0) buffer.putShortString(messageId); + if(timestamp != 0) buffer.putLongLong(timestamp);; + if(type.length() > 0) buffer.putShortString(type); + if(userId.length() > 0) buffer.putShortString(userId); + if(appId.length() > 0) buffer.putShortString(appId); + if(clusterId.length() > 0) buffer.putShortString(clusterId); +} + +void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, u_int32_t size){ + u_int16_t flags = buffer.getShort(); + int shift = 15; + if(flags & (1 << 15)) buffer.getShortString(contentType); + if(flags & (1 << 14)) buffer.getShortString(contentEncoding); + if(flags & (1 << 13)) buffer.getFieldTable(headers); + if(flags & (1 << 12)) deliveryMode = buffer.getOctet(); + if(flags & (1 << 11)) priority = buffer.getOctet(); + if(flags & (1 << 10)) buffer.getShortString(correlationId); + if(flags & (1 << 9)) buffer.getShortString(replyTo); + if(flags & (1 << 8)) buffer.getShortString(expiration); + if(flags & (1 << 7)) buffer.getShortString(messageId); + if(flags & (1 << 6)) timestamp = buffer.getLongLong(); + if(flags & (1 << 5)) buffer.getShortString(type); + if(flags & (1 << 4)) buffer.getShortString(userId); + if(flags & (1 << 3)) buffer.getShortString(appId); + if(flags & (1 << 2)) buffer.getShortString(clusterId); +} + +u_int16_t qpid::framing::BasicHeaderProperties::getFlags() const{ + u_int16_t flags(0); + int shift = 15; + if(contentType.length() > 0) flags |= (1 << 15); + if(contentEncoding.length() > 0) flags |= (1 << 14); + if(headers.count() > 0) flags |= (1 << 13); + if(deliveryMode != 0) flags |= (1 << 12); + if(priority != 0) flags |= (1 << 11); + if(correlationId.length() > 0) flags |= (1 << 10); + if(replyTo.length() > 0) flags |= (1 << 9); + if(expiration.length() > 0) flags |= (1 << 8); + if(messageId.length() > 0) flags |= (1 << 7); + if(timestamp != 0) flags |= (1 << 6); + if(type.length() > 0) flags |= (1 << 5); + if(userId.length() > 0) flags |= (1 << 4); + if(appId.length() > 0) flags |= (1 << 3); + if(clusterId.length() > 0) flags |= (1 << 2); + return flags; +} diff --git a/cpp/common/framing/src/BodyHandler.cpp b/cpp/common/framing/src/BodyHandler.cpp new file mode 100644 index 0000000000..4e4e2e02f7 --- /dev/null +++ b/cpp/common/framing/src/BodyHandler.cpp @@ -0,0 +1,49 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "memory.h" +#include "BodyHandler.h" + +using namespace qpid::framing; +using namespace std::tr1; + +void BodyHandler::handleBody(AMQBody::shared_ptr& body){ + + switch(body->type()) + { + + case METHOD_BODY: + handleMethod(dynamic_pointer_cast<AMQMethodBody, AMQBody>(body)); + break; + + case HEADER_BODY: + handleHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body)); + break; + + case CONTENT_BODY: + handleContent(dynamic_pointer_cast<AMQContentBody, AMQBody>(body)); + break; + + case HEARTBEAT_BODY: + handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body)); + break; + + default: + throw UnknownBodyType(body->type()); + } + +} diff --git a/cpp/common/framing/src/Buffer.cpp b/cpp/common/framing/src/Buffer.cpp new file mode 100644 index 0000000000..5264491980 --- /dev/null +++ b/cpp/common/framing/src/Buffer.cpp @@ -0,0 +1,167 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Buffer.h" + +qpid::framing::Buffer::Buffer(int _size) : size(_size), position(0), limit(_size){ + data = new char[size]; +} + +qpid::framing::Buffer::~Buffer(){ + delete[] data; +} + +void qpid::framing::Buffer::flip(){ + limit = position; + position = 0; +} + +void qpid::framing::Buffer::clear(){ + limit = size; + position = 0; +} + +void qpid::framing::Buffer::compact(){ + int p = limit - position; + //copy p chars from position to 0 + memmove(data, data + position, p); + limit = size; + position = p; +} + +void qpid::framing::Buffer::record(){ + r_position = position; + r_limit = limit; +} + +void qpid::framing::Buffer::restore(){ + position = r_position; + limit = r_limit; +} + +int qpid::framing::Buffer::available(){ + return limit - position; +} + +char* qpid::framing::Buffer::start(){ + return data + position; +} + +void qpid::framing::Buffer::move(int bytes){ + position += bytes; +} + +void qpid::framing::Buffer::putOctet(u_int8_t i){ + data[position++] = i; +} + +void qpid::framing::Buffer::putShort(u_int16_t i){ + u_int16_t b = i; + data[position++] = (u_int8_t) (0xFF & (b >> 8)); + data[position++] = (u_int8_t) (0xFF & b); +} + +void qpid::framing::Buffer::putLong(u_int32_t i){ + u_int32_t b = i; + data[position++] = (u_int8_t) (0xFF & (b >> 24)); + data[position++] = (u_int8_t) (0xFF & (b >> 16)); + data[position++] = (u_int8_t) (0xFF & (b >> 8)); + data[position++] = (u_int8_t) (0xFF & b); +} + +void qpid::framing::Buffer::putLongLong(u_int64_t i){ + u_int32_t hi = i >> 32; + u_int32_t lo = i; + putLong(hi); + putLong(lo); +} + +u_int8_t qpid::framing::Buffer::getOctet(){ + return (u_int8_t) data[position++]; +} + +u_int16_t qpid::framing::Buffer::getShort(){ + u_int16_t hi = (unsigned char) data[position++]; + hi = hi << 8; + hi |= (unsigned char) data[position++]; + return hi; +} + +u_int32_t qpid::framing::Buffer::getLong(){ + u_int32_t a = (unsigned char) data[position++]; + u_int32_t b = (unsigned char) data[position++]; + u_int32_t c = (unsigned char) data[position++]; + u_int32_t d = (unsigned char) data[position++]; + a = a << 24; + a |= b << 16; + a |= c << 8; + a |= d; + return a; +} + +u_int64_t qpid::framing::Buffer::getLongLong(){ + u_int64_t hi = getLong(); + u_int64_t lo = getLong(); + hi = hi << 32; + return hi | lo; +} + + +void qpid::framing::Buffer::putShortString(const string& s){ + u_int8_t size = s.length(); + putOctet(size); + s.copy(data + position, size); + position += size; +} + +void qpid::framing::Buffer::putLongString(const string& s){ + u_int32_t size = s.length(); + putLong(size); + s.copy(data + position, size); + position += size; +} + +void qpid::framing::Buffer::getShortString(string& s){ + u_int8_t size = getOctet(); + s.assign(data + position, size); + position += size; +} + +void qpid::framing::Buffer::getLongString(string& s){ + u_int32_t size = getLong(); + s.assign(data + position, size); + position += size; +} + +void qpid::framing::Buffer::putFieldTable(const FieldTable& t){ + t.encode(*this); +} + +void qpid::framing::Buffer::getFieldTable(FieldTable& t){ + t.decode(*this); +} + +void qpid::framing::Buffer::putRawData(const string& s){ + u_int32_t size = s.length(); + s.copy(data + position, size); + position += size; +} + +void qpid::framing::Buffer::getRawData(string& s, u_int32_t size){ + s.assign(data + position, size); + position += size; +} diff --git a/cpp/common/framing/src/FieldTable.cpp b/cpp/common/framing/src/FieldTable.cpp new file mode 100644 index 0000000000..048cefa83c --- /dev/null +++ b/cpp/common/framing/src/FieldTable.cpp @@ -0,0 +1,127 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "FieldTable.h" +#include "NamedValue.h" +#include "QpidError.h" +#include "Buffer.h" +#include "Value.h" + +qpid::framing::FieldTable::~FieldTable(){ + int count(values.size()); + for(int i = 0; i < count; i++){ + delete values[i]; + } +} + +u_int32_t qpid::framing::FieldTable::size() const { + u_int32_t size(4); + int count(values.size()); + for(int i = 0; i < count; i++){ + size += values[i]->size(); + } + return size; +} + +int qpid::framing::FieldTable::count() const { + return values.size(); +} + +std::ostream& qpid::framing::operator<<(std::ostream& out, const FieldTable& t){ + out << "field_table{}"; + return out; +} + +void qpid::framing::FieldTable::setString(const std::string& name, const std::string& value){ + setValue(name, new StringValue(value)); +} + +void qpid::framing::FieldTable::setInt(const std::string& name, int value){ + setValue(name, new IntegerValue(value)); +} + +void qpid::framing::FieldTable::setTimestamp(const std::string& name, u_int64_t value){ + setValue(name, new TimeValue(value)); +} + +void qpid::framing::FieldTable::setTable(const std::string& name, const FieldTable& value){ + setValue(name, new FieldTableValue(value)); +} + +std::string qpid::framing::FieldTable::getString(const std::string& name){ + StringValue* val = dynamic_cast<StringValue*>(getValue(name)); + return (val == 0 ? "" : val->getValue()); +} + +int qpid::framing::FieldTable::getInt(const std::string& name){ + IntegerValue* val = dynamic_cast<IntegerValue*>(getValue(name)); + return (val == 0 ? 0 : val->getValue()); +} + +u_int64_t qpid::framing::FieldTable::getTimestamp(const std::string& name){ + TimeValue* val = dynamic_cast<TimeValue*>(getValue(name)); + return (val == 0 ? 0 : val->getValue()); +} + +void qpid::framing::FieldTable::getTable(const std::string& name, FieldTable& value){ + FieldTableValue* val = dynamic_cast<FieldTableValue*>(getValue(name)); + if(val != 0) value = val->getValue(); +} + +qpid::framing::NamedValue* qpid::framing::FieldTable::find(const std::string& name) const{ + int count(values.size()); + for(int i = 0; i < count; i++){ + if(values[i]->getName() == name) return values[i]; + } + return 0; +} + +qpid::framing::Value* qpid::framing::FieldTable::getValue(const std::string& name) const{ + NamedValue* val = find(name); + return val == 0 ? 0 : val->getValue(); +} + +void qpid::framing::FieldTable::setValue(const std::string& name, Value* value){ + NamedValue* val = find(name); + if(val == 0){ + val = new NamedValue(name, value); + values.push_back(val); + }else{ + Value* old = val->getValue(); + if(old != 0) delete old; + val->setValue(value); + } +} + +void qpid::framing::FieldTable::encode(Buffer& buffer) const{ + buffer.putLong(size() - 4); + int count(values.size()); + for(int i = 0; i < count; i++){ + values[i]->encode(buffer); + } +} + +void qpid::framing::FieldTable::decode(Buffer& buffer){ + u_int32_t size = buffer.getLong(); + int leftover = buffer.available() - size; + + while(buffer.available() > leftover){ + NamedValue* value = new NamedValue(); + value->decode(buffer); + values.push_back(value); + } +} diff --git a/cpp/common/framing/src/NamedValue.cpp b/cpp/common/framing/src/NamedValue.cpp new file mode 100644 index 0000000000..e80aea433c --- /dev/null +++ b/cpp/common/framing/src/NamedValue.cpp @@ -0,0 +1,67 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "NamedValue.h" +#include "QpidError.h" +#include "Buffer.h" +#include "FieldTable.h" + +qpid::framing::NamedValue::NamedValue() : value(0){} + +qpid::framing::NamedValue::NamedValue(const string& n, Value* v) : name(n), value(v){} + +qpid::framing::NamedValue::~NamedValue(){ + if(value != 0){ + delete value; + } +} + +u_int32_t qpid::framing::NamedValue::size() const{ + return value ? 1/*size of name*/ + name.length() + 1/*type char*/ + value->size() : 0; +} + +void qpid::framing::NamedValue::encode(Buffer& buffer){ + buffer.putShortString(name); + u_int8_t type = value->getType(); + buffer.putOctet(type); + value->encode(buffer); +} + +void qpid::framing::NamedValue::decode(Buffer& buffer){ + buffer.getShortString(name); + u_int8_t type = buffer.getOctet(); + switch(type){ + case 'S': + value = new StringValue(); + break; + case 'I': + value = new IntegerValue(); + break; + case 'D': + value = new DecimalValue(); + break; + case 'T': + value = new TimeValue(); + break; + case 'F': + value = new FieldTableValue(); + break; + default: + THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type"); + } + value->decode(buffer); +} diff --git a/cpp/common/framing/src/ProtocolInitiation.cpp b/cpp/common/framing/src/ProtocolInitiation.cpp new file mode 100644 index 0000000000..6806d73b55 --- /dev/null +++ b/cpp/common/framing/src/ProtocolInitiation.cpp @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "ProtocolInitiation.h" + +qpid::framing::ProtocolInitiation::ProtocolInitiation(){} + +qpid::framing::ProtocolInitiation::ProtocolInitiation(u_int8_t _major, u_int8_t _minor) : pmajor(_major), pminor(_minor){} + +qpid::framing::ProtocolInitiation::~ProtocolInitiation(){} + +void qpid::framing::ProtocolInitiation::encode(Buffer& buffer){ + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('Q'); + buffer.putOctet('P'); + buffer.putOctet(1);//class + buffer.putOctet(1);//instance + buffer.putOctet(pmajor); + buffer.putOctet(pminor); +} + +bool qpid::framing::ProtocolInitiation::decode(Buffer& buffer){ + if(buffer.available() >= 8){ + buffer.getOctet();//A + buffer.getOctet();//M + buffer.getOctet();//Q + buffer.getOctet();//P + buffer.getOctet();//class + buffer.getOctet();//instance + pmajor = buffer.getOctet(); + pminor = buffer.getOctet(); + return true; + }else{ + return false; + } +} + +//TODO: this should prbably be generated from the spec at some point to keep the version numbers up to date diff --git a/cpp/common/framing/src/Value.cpp b/cpp/common/framing/src/Value.cpp new file mode 100644 index 0000000000..240b086696 --- /dev/null +++ b/cpp/common/framing/src/Value.cpp @@ -0,0 +1,57 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Value.h" +#include "Buffer.h" +#include "FieldTable.h" + +void qpid::framing::StringValue::encode(Buffer& buffer){ + buffer.putLongString(value); +} +void qpid::framing::StringValue::decode(Buffer& buffer){ + buffer.getLongString(value); +} + +void qpid::framing::IntegerValue::encode(Buffer& buffer){ + buffer.putLong((u_int32_t) value); +} +void qpid::framing::IntegerValue::decode(Buffer& buffer){ + value = buffer.getLong(); +} + +void qpid::framing::TimeValue::encode(Buffer& buffer){ + buffer.putLongLong(value); +} +void qpid::framing::TimeValue::decode(Buffer& buffer){ + value = buffer.getLongLong(); +} + +void qpid::framing::DecimalValue::encode(Buffer& buffer){ + buffer.putOctet(decimals); + buffer.putLong(value); +} +void qpid::framing::DecimalValue::decode(Buffer& buffer){ + decimals = buffer.getOctet(); + value = buffer.getLong(); +} + +void qpid::framing::FieldTableValue::encode(Buffer& buffer){ + buffer.putFieldTable(value); +} +void qpid::framing::FieldTableValue::decode(Buffer& buffer){ + buffer.getFieldTable(value); +} diff --git a/cpp/common/framing/test/BodyHandlerTest.cpp b/cpp/common/framing/test/BodyHandlerTest.cpp new file mode 100644 index 0000000000..94038d9a6c --- /dev/null +++ b/cpp/common/framing/test/BodyHandlerTest.cpp @@ -0,0 +1,107 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "amqp_framing.h" +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> + +using namespace qpid::framing; + +class BodyHandlerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(BodyHandlerTest); + CPPUNIT_TEST(testMethod); + CPPUNIT_TEST(testHeader); + CPPUNIT_TEST(testContent); + CPPUNIT_TEST(testHeartbeat); + CPPUNIT_TEST_SUITE_END(); +private: + + class TestBodyHandler : public BodyHandler{ + AMQMethodBody* const method; + AMQHeaderBody* const header; + AMQContentBody* const content; + AMQHeartbeatBody* const heartbeat; + + public: + + TestBodyHandler(AMQMethodBody* _method) : method(_method), header(0), content(0), heartbeat(0){} + TestBodyHandler(AMQHeaderBody* _header) : method(0), header(_header), content(0), heartbeat(0){} + TestBodyHandler(AMQContentBody* _content) : method(0), header(0), content(_content), heartbeat(0){} + TestBodyHandler(AMQHeartbeatBody* _heartbeat) : method(0), header(0), content(0), heartbeat(_heartbeat){} + + virtual void handleMethod(AMQMethodBody::shared_ptr body){ + CPPUNIT_ASSERT(method); + CPPUNIT_ASSERT_EQUAL(method, body.get()); + } + virtual void handleHeader(AMQHeaderBody::shared_ptr body){ + CPPUNIT_ASSERT(header); + CPPUNIT_ASSERT_EQUAL(header, body.get()); + } + virtual void handleContent(AMQContentBody::shared_ptr body){ + CPPUNIT_ASSERT(content); + CPPUNIT_ASSERT_EQUAL(content, body.get()); + } + virtual void handleHeartbeat(AMQHeartbeatBody::shared_ptr body){ + CPPUNIT_ASSERT(heartbeat); + CPPUNIT_ASSERT_EQUAL(heartbeat, body.get()); + } + }; + +public: + + void testMethod() + { + AMQMethodBody* method = new QueueDeclareBody(); + AMQFrame frame(0, method); + TestBodyHandler handler(method); + handler.handleBody(frame.getBody()); + } + + void testHeader() + { + AMQHeaderBody* header = new AMQHeaderBody(); + AMQFrame frame(0, header); + TestBodyHandler handler(header); + handler.handleBody(frame.getBody()); + } + + void testContent() + { + AMQContentBody* content = new AMQContentBody(); + AMQFrame frame(0, content); + TestBodyHandler handler(content); + handler.handleBody(frame.getBody()); + } + + void testHeartbeat() + { + AMQHeartbeatBody* heartbeat = new AMQHeartbeatBody(); + AMQFrame frame(0, heartbeat); + TestBodyHandler handler(heartbeat); + handler.handleBody(frame.getBody()); + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(BodyHandlerTest); + diff --git a/cpp/common/framing/test/Makefile b/cpp/common/framing/test/Makefile new file mode 100644 index 0000000000..487b8d537b --- /dev/null +++ b/cpp/common/framing/test/Makefile @@ -0,0 +1,21 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +QPID_HOME = ../../../.. +LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) +INCLUDES=$(TEST_INCLUDES) -I ../generated +include ${QPID_HOME}/cpp/test_plugins.mk + diff --git a/cpp/common/framing/test/field_table_test.cpp b/cpp/common/framing/test/field_table_test.cpp new file mode 100644 index 0000000000..48332e05bc --- /dev/null +++ b/cpp/common/framing/test/field_table_test.cpp @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "amqp_framing.h" +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> + +using namespace qpid::framing; + +class FieldTableTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(FieldTableTest); + CPPUNIT_TEST(testMe); + CPPUNIT_TEST_SUITE_END(); + + public: + + void testMe() + { + FieldTable ft; + ft.setString("A", "BCDE"); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft.getString("A")); + + Buffer buffer(100); + buffer.putFieldTable(ft); + buffer.flip(); + FieldTable ft2; + buffer.getFieldTable(ft2); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), ft2.getString("A")); + + } +}; + + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FieldTableTest); + diff --git a/cpp/common/framing/test/framing_test.cpp b/cpp/common/framing/test/framing_test.cpp new file mode 100644 index 0000000000..1978c2cbed --- /dev/null +++ b/cpp/common/framing/test/framing_test.cpp @@ -0,0 +1,147 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "amqp_framing.h" +#include "ConnectionRedirectBody.h" +#include <iostream> +#include <sstream> +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> +#include <typeinfo> + +using namespace qpid::framing; + +// TODO aconway 2006-09-12: Why do we need explicit qpid::framing:: below? + +template <class T> +std::string tostring(const T& x) +{ + std::ostringstream out; + out << x; + return out.str(); +} + +class FramingTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(FramingTest); + CPPUNIT_TEST(testBasicQosBody); + CPPUNIT_TEST(testConnectionSecureBody); + CPPUNIT_TEST(testConnectionRedirectBody); + CPPUNIT_TEST(testAccessRequestBody); + CPPUNIT_TEST(testBasicConsumeBody); + CPPUNIT_TEST(ConnectionRedirectBody); + CPPUNIT_TEST(BasicConsumeOkBody); + CPPUNIT_TEST_SUITE_END(); + + private: + Buffer buffer; + + public: + + FramingTest() : buffer(100) {} + + void testBasicQosBody() + { + BasicQosBody in(0xCAFEBABE, 0xABBA, true); + in.encodeContent(buffer); + buffer.flip(); + BasicQosBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testConnectionSecureBody() + { + std::string s = "security credential"; + ConnectionSecureBody in(s); + in.encodeContent(buffer); + buffer.flip(); + ConnectionSecureBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testConnectionRedirectBody() + { + std::string a = "hostA"; + std::string b = "hostB"; + qpid::framing::ConnectionRedirectBody in(a, b); + in.encodeContent(buffer); + buffer.flip(); + qpid::framing::ConnectionRedirectBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testAccessRequestBody() + { + std::string s = "text"; + AccessRequestBody in(s, true, false, true, false, true); + in.encodeContent(buffer); + buffer.flip(); + AccessRequestBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void testBasicConsumeBody() + { + std::string q = "queue"; + std::string t = "tag"; + BasicConsumeBody in(0, q, t, false, true, false, false); + in.encodeContent(buffer); + buffer.flip(); + BasicConsumeBody out; + out.decodeContent(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + + void ConnectionRedirectBody() + { + std::string a = "hostA"; + std::string b = "hostB"; + AMQFrame in(999, new qpid::framing::ConnectionRedirectBody(a, b)); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + out.decode(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + + void BasicConsumeOkBody() + { + std::string s = "hostA"; + AMQFrame in(999, new qpid::framing::BasicConsumeOkBody(s)); + in.encode(buffer); + buffer.flip(); + AMQFrame out; + for(int i = 0; i < 5; i++){ + out.decode(buffer); + CPPUNIT_ASSERT_EQUAL(tostring(in), tostring(out)); + } + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(FramingTest); + + + diff --git a/cpp/common/framing/test/header_test.cpp b/cpp/common/framing/test/header_test.cpp new file mode 100644 index 0000000000..0ff6d47d57 --- /dev/null +++ b/cpp/common/framing/test/header_test.cpp @@ -0,0 +1,144 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "amqp_framing.h" +#include <cppunit/TestCase.h> +#include <cppunit/TextTestRunner.h> +#include <cppunit/extensions/HelperMacros.h> +#include <cppunit/plugin/TestPlugIn.h> + +using namespace qpid::framing; + +class HeaderTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(HeaderTest); + CPPUNIT_TEST(testGenericProperties); + CPPUNIT_TEST(testAllSpecificProperties); + CPPUNIT_TEST(testSomeSpecificProperties); + CPPUNIT_TEST_SUITE_END(); + +public: + + // TODO aconway 2006-09-12: Need more detailed tests, + // need tests to assert something! + // + void testGenericProperties() + { + AMQHeaderBody body(BASIC); + dynamic_cast<BasicHeaderProperties*>(body.getProperties())->getHeaders().setString("A", "BCDE"); + Buffer buffer(100); + + body.encode(buffer); + buffer.flip(); + AMQHeaderBody body2; + body2.decode(buffer, body.size()); + BasicHeaderProperties* props = + dynamic_cast<BasicHeaderProperties*>(body2.getProperties()); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), + props->getHeaders().getString("A")); + } + + void testAllSpecificProperties(){ + string contentType("text/html"); + string contentEncoding("UTF8"); + u_int8_t deliveryMode(2); + u_int8_t priority(3); + string correlationId("abc"); + string replyTo("no-address"); + string expiration("why is this a string?"); + string messageId("xyz"); + u_int64_t timestamp(0xabcd); + string type("eh?"); + string userId("guest"); + string appId("just testing"); + string clusterId("no clustering required"); + + AMQHeaderBody body(BASIC); + BasicHeaderProperties* properties = + dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + properties->setContentType(contentType); + properties->getHeaders().setString("A", "BCDE"); + properties->setDeliveryMode(deliveryMode); + properties->setPriority(priority); + properties->setCorrelationId(correlationId); + properties->setReplyTo(replyTo); + properties->setExpiration(expiration); + properties->setMessageId(messageId); + properties->setTimestamp(timestamp); + properties->setType(type); + properties->setUserId(userId); + properties->setAppId(appId); + properties->setClusterId(clusterId); + + Buffer buffer(10000); + body.encode(buffer); + buffer.flip(); + AMQHeaderBody temp; + temp.decode(buffer, body.size()); + properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + + CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); + CPPUNIT_ASSERT_EQUAL(std::string("BCDE"), properties->getHeaders().getString("A")); + CPPUNIT_ASSERT_EQUAL(deliveryMode, properties->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL(priority, properties->getPriority()); + CPPUNIT_ASSERT_EQUAL(correlationId, properties->getCorrelationId()); + CPPUNIT_ASSERT_EQUAL(replyTo, properties->getReplyTo()); + CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); + CPPUNIT_ASSERT_EQUAL(messageId, properties->getMessageId()); + CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); + CPPUNIT_ASSERT_EQUAL(type, properties->getType()); + CPPUNIT_ASSERT_EQUAL(userId, properties->getUserId()); + CPPUNIT_ASSERT_EQUAL(appId, properties->getAppId()); + CPPUNIT_ASSERT_EQUAL(clusterId, properties->getClusterId()); + } + + void testSomeSpecificProperties(){ + string contentType("application/octet-stream"); + u_int8_t deliveryMode(5); + u_int8_t priority(6); + string expiration("Z"); + u_int64_t timestamp(0xabe4a34a); + + AMQHeaderBody body(BASIC); + BasicHeaderProperties* properties = + dynamic_cast<BasicHeaderProperties*>(body.getProperties()); + properties->setContentType(contentType); + properties->setDeliveryMode(deliveryMode); + properties->setPriority(priority); + properties->setExpiration(expiration); + properties->setTimestamp(timestamp); + + Buffer buffer(100); + body.encode(buffer); + buffer.flip(); + AMQHeaderBody temp; + temp.decode(buffer, body.size()); + properties = dynamic_cast<BasicHeaderProperties*>(temp.getProperties()); + + CPPUNIT_ASSERT_EQUAL(contentType, properties->getContentType()); + CPPUNIT_ASSERT_EQUAL((int) deliveryMode, (int) properties->getDeliveryMode()); + CPPUNIT_ASSERT_EQUAL((int) priority, (int) properties->getPriority()); + CPPUNIT_ASSERT_EQUAL(expiration, properties->getExpiration()); + CPPUNIT_ASSERT_EQUAL(timestamp, properties->getTimestamp()); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(HeaderTest); + diff --git a/cpp/common/io/Makefile b/cpp/common/io/Makefile new file mode 100644 index 0000000000..e94e802afa --- /dev/null +++ b/cpp/common/io/Makefile @@ -0,0 +1,35 @@ + # + # Copyright (c) 2006 The Apache Software Foundation + # + # Licensed under the Apache License, Version 2.0 (the "License"); + # you may not use this file except in compliance with the License. + # You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + # + +QPID_HOME = ../../.. +include ${QPID_HOME}/cpp/options.mk + +# Compiler flags +CXXFLAGS = ${DEBUG} ${OPT} -MMD -I inc -I ../concurrent/inc -I ../error/inc -I ../framing/inc -I ../framing/generated -I ${APR_HOME}/include/apr-1/ + +SOURCES := $(wildcard src/*.cpp) +OBJECTS := $(subst .cpp,.o,$(SOURCES)) +DEPS := $(subst .cpp,.d,$(SOURCES)) + +.PHONY: all clean + +all: ${OBJECTS} + +-include $(DEPS) + +clean : + -@rm -f ${OBJECTS} src/*.d + diff --git a/cpp/common/io/inc/APRConnector.h b/cpp/common/io/inc/APRConnector.h new file mode 100644 index 0000000000..c292c4d7e0 --- /dev/null +++ b/cpp/common/io/inc/APRConnector.h @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRConnector_ +#define _APRConnector_ + +#include "apr_network_io.h" +#include "apr_time.h" + +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "ShutdownHandler.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "Connector.h" +#include "APRMonitor.h" + +namespace qpid { +namespace io { + + class APRConnector : public virtual qpid::framing::OutputHandler, + public virtual Connector, + private virtual qpid::concurrent::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::concurrent::APRMonitor* writeLock; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, int available); + void setSocketTimeout(); + + void run(); + + public: + APRConnector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~APRConnector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/APRIOProcessor.h b/cpp/common/io/inc/APRIOProcessor.h new file mode 100644 index 0000000000..de0d67a9c4 --- /dev/null +++ b/cpp/common/io/inc/APRIOProcessor.h @@ -0,0 +1,86 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRIOProcessor_ +#define _APRIOProcessor_ + +#include "apr_poll.h" +#include <queue> +#include <iostream> +#include "APRMonitor.h" +#include "APRThread.h" +#include "IOSession.h" +#include "Runnable.h" + +namespace qpid { +namespace io { + + /** + * Manages non-blocking io through the APR polling + * routines. Interacts with the actual io tasks to be performed + * through the IOSession interface, an implementing instance of + * which must be set as the client_data of the apr_pollfd_t + * structures registered. + */ + class APRIOProcessor : private virtual qpid::concurrent::Runnable + { + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int count; + qpid::concurrent::APRThread thread; + qpid::concurrent::APRMonitor lock; + volatile bool stopped; + + void poll(); + virtual void run(); + + public: + APRIOProcessor(apr_pool_t* pool, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance implementing IOSession, through which the write + * and read operations will be performed when readiness is + * indicated by the poll response. + */ + void add(apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + + ~APRIOProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/APRSocket.h b/cpp/common/io/inc/APRSocket.h new file mode 100644 index 0000000000..610cf0e175 --- /dev/null +++ b/cpp/common/io/inc/APRSocket.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include "apr_network_io.h" +#include "Buffer.h" + +namespace qpid { +namespace io { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/Acceptor.h b/cpp/common/io/inc/Acceptor.h new file mode 100644 index 0000000000..5c690c546f --- /dev/null +++ b/cpp/common/io/inc/Acceptor.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Acceptor_ +#define _Acceptor_ + +#include "SessionHandlerFactory.h" + + +namespace qpid { +namespace io { + + class Acceptor + { + public: + virtual void bind(int port, SessionHandlerFactory* factory) = 0; + virtual ~Acceptor(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/BlockingAPRAcceptor.h b/cpp/common/io/inc/BlockingAPRAcceptor.h new file mode 100644 index 0000000000..b77371b02e --- /dev/null +++ b/cpp/common/io/inc/BlockingAPRAcceptor.h @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _BlockingAPRAcceptor_ +#define _BlockingAPRAcceptor_ + +#include <vector> +#include "apr_network_io.h" +#include "apr_poll.h" +#include "apr_time.h" + +#include "Acceptor.h" +#include "APRMonitor.h" +#include "BlockingAPRSessionContext.h" +#include "Runnable.h" +#include "SessionContext.h" +#include "SessionHandlerFactory.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "ThreadPool.h" + +namespace qpid { +namespace io { + + class BlockingAPRAcceptor : public virtual Acceptor + { + typedef std::vector<BlockingAPRSessionContext*>::iterator iterator; + + const bool debug; + apr_pool_t* apr_pool; + qpid::concurrent::ThreadFactory* threadFactory; + std::vector<BlockingAPRSessionContext*> sessions; + apr_socket_t* socket; + const int connectionBacklog; + volatile bool running; + + public: + BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); + virtual void bind(int port, SessionHandlerFactory* factory); + virtual ~BlockingAPRAcceptor(); + void closed(BlockingAPRSessionContext* session); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/BlockingAPRSessionContext.h b/cpp/common/io/inc/BlockingAPRSessionContext.h new file mode 100644 index 0000000000..038ebd6662 --- /dev/null +++ b/cpp/common/io/inc/BlockingAPRSessionContext.h @@ -0,0 +1,94 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _BlockingAPRSessionContext_ +#define _BlockingAPRSessionContext_ + +#include <queue> +#include <vector> + +#include "apr_network_io.h" +#include "apr_time.h" + +#include "AMQFrame.h" +#include "APRMonitor.h" +#include "Buffer.h" +#include "Runnable.h" +#include "SessionContext.h" +#include "SessionHandler.h" +#include "SessionHandlerFactory.h" +#include "ShutdownHandler.h" +#include "Thread.h" +#include "ThreadFactory.h" + +namespace qpid { +namespace io { + + class BlockingAPRAcceptor; + + class BlockingAPRSessionContext : public virtual SessionContext + { + class Reader : public virtual qpid::concurrent::Runnable{ + BlockingAPRSessionContext* parent; + public: + inline Reader(BlockingAPRSessionContext* p) : parent(p){} + inline virtual void run(){ parent->read(); } + inline virtual ~Reader(){} + }; + + class Writer : public virtual qpid::concurrent::Runnable{ + BlockingAPRSessionContext* parent; + public: + inline Writer(BlockingAPRSessionContext* p) : parent(p){} + inline virtual void run(){ parent->write(); } + inline virtual ~Writer(){} + }; + + apr_socket_t* socket; + const bool debug; + SessionHandler* handler; + BlockingAPRAcceptor* acceptor; + std::queue<qpid::framing::AMQFrame*> outframes; + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + qpid::concurrent::APRMonitor outlock; + Reader* reader; + Writer* writer; + qpid::concurrent::Thread* rThread; + qpid::concurrent::Thread* wThread; + + volatile bool closed; + + void read(); + void write(); + public: + BlockingAPRSessionContext(apr_socket_t* socket, + qpid::concurrent::ThreadFactory* factory, + BlockingAPRAcceptor* acceptor, + bool debug = false); + ~BlockingAPRSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void shutdown(); + void init(SessionHandler* handler); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/Connector.h b/cpp/common/io/inc/Connector.h new file mode 100644 index 0000000000..52684329f1 --- /dev/null +++ b/cpp/common/io/inc/Connector.h @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "ShutdownHandler.h" +#include "TimeoutHandler.h" + +namespace qpid { +namespace io { + + class Connector + { + public: + virtual void connect(const std::string& host, int port) = 0; + virtual void init(qpid::framing::ProtocolInitiation* header) = 0; + virtual void close() = 0; + virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0; + virtual void setTimeoutHandler(TimeoutHandler* handler) = 0; + virtual void setShutdownHandler(ShutdownHandler* handler) = 0; + virtual qpid::framing::OutputHandler* getOutputHandler() = 0; + /** + * Set the timeout for reads, in secs. + */ + virtual void setReadTimeout(u_int16_t timeout) = 0; + /** + * Set the timeout for writes, in secs. + */ + virtual void setWriteTimeout(u_int16_t timeout) = 0; + virtual ~Connector(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/ConnectorImpl.h b/cpp/common/io/inc/ConnectorImpl.h new file mode 100644 index 0000000000..242f3aed49 --- /dev/null +++ b/cpp/common/io/inc/ConnectorImpl.h @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRConnectorImpl_ +#define _APRConnectorImpl_ + +#ifdef _USE_APR_IO_ +#include "APRConnector.h" +#else +#include "LConnector.h" +#endif + +namespace qpid { +namespace io { + +#ifdef _USE_APR_IO_ + class ConnectorImpl : public virtual APRConnector + { + + public: + ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):APRConnector(debug,buffer_size){}; + virtual ~ConnectorImpl(){}; + }; +#else + class ConnectorImpl : public virtual LConnector + { + + public: + ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):LConnector(debug, buffer_size){}; + virtual ~ConnectorImpl(){}; + }; + +#endif + +} +} + + +#endif diff --git a/cpp/common/io/inc/IOSession.h b/cpp/common/io/inc/IOSession.h new file mode 100644 index 0000000000..45e84d29b1 --- /dev/null +++ b/cpp/common/io/inc/IOSession.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _IOSession_ +#define _IOSession_ + +namespace qpid { +namespace io { + + class IOSession + { + public: + virtual void read() = 0; + virtual void write() = 0; + virtual ~IOSession(){} + }; + + + class IOSessionHolder + { + IOSession* session; + public: + IOSessionHolder(IOSession* _session) : session(_session) {} + void read(){ session->read(); } + void write(){ session->write(); } + }; +} +} + + +#endif diff --git a/cpp/common/io/inc/LConnector.h b/cpp/common/io/inc/LConnector.h new file mode 100644 index 0000000000..59d95a6b57 --- /dev/null +++ b/cpp/common/io/inc/LConnector.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LConnector_ +#define _LConnector_ + + +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "Connector.h" + +namespace qpid { +namespace io { + + class LConnector : public virtual qpid::framing::OutputHandler, + public virtual Connector, + private virtual qpid::concurrent::Runnable + { + + public: + LConnector(bool debug = false, u_int32_t buffer_size = 1024){}; + virtual ~LConnector(){}; + + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/LFAcceptor.h b/cpp/common/io/inc/LFAcceptor.h new file mode 100644 index 0000000000..314f811827 --- /dev/null +++ b/cpp/common/io/inc/LFAcceptor.h @@ -0,0 +1,71 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include <vector> +#include "apr_network_io.h" +#include "apr_poll.h" +#include "apr_time.h" + +#include "Acceptor.h" +#include "APRMonitor.h" +#include "APRThreadFactory.h" +#include "APRThreadPool.h" +#include "LFProcessor.h" +#include "LFSessionContext.h" +#include "Runnable.h" +#include "SessionContext.h" +#include "SessionHandlerFactory.h" +#include "Thread.h" + +namespace qpid { +namespace io { + + class LFAcceptor : public virtual Acceptor + { + class APRPool{ + public: + apr_pool_t* pool; + APRPool(); + ~APRPool(); + }; + + APRPool aprPool; + LFProcessor processor; + + const int max_connections_per_processor; + const bool debug; + const int connectionBacklog; + + volatile bool running; + + public: + LFAcceptor(bool debug = false, + int connectionBacklog = 10, + int worker_threads = 5, + int max_connections_per_processor = 500); + virtual void bind(int port, SessionHandlerFactory* factory); + virtual ~LFAcceptor(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h new file mode 100644 index 0000000000..7d99d51943 --- /dev/null +++ b/cpp/common/io/inc/LFProcessor.h @@ -0,0 +1,116 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include "apr_poll.h" +#include <iostream> +#include <vector> +#include "APRMonitor.h" +#include "APRThreadFactory.h" +#include "IOSession.h" +#include "Runnable.h" + +namespace qpid { +namespace io { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::concurrent::Runnable + { + typedef std::vector<LFSessionContext*>::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + qpid::concurrent::Thread** const workers; + qpid::concurrent::APRMonitor leadLock; + qpid::concurrent::APRMonitor countLock; + qpid::concurrent::APRThreadFactory factory; + std::vector<LFSessionContext*> sessions; + bool hasLeader; + volatile bool stopped; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/LFSessionContext.h b/cpp/common/io/inc/LFSessionContext.h new file mode 100644 index 0000000000..ef6a0d07b0 --- /dev/null +++ b/cpp/common/io/inc/LFSessionContext.h @@ -0,0 +1,89 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include "apr_network_io.h" +#include "apr_poll.h" +#include "apr_time.h" + +#include "AMQFrame.h" +#include "APRMonitor.h" +#include "APRSocket.h" +#include "Buffer.h" +#include "IOSession.h" +#include "LFProcessor.h" +#include "SessionContext.h" +#include "SessionHandler.h" + +namespace qpid { +namespace io { + + + class LFSessionContext : public virtual SessionContext, public virtual IOSession + { + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue<qpid::framing::AMQFrame*> framesToWrite; + qpid::concurrent::APRMonitor writeLock; + + bool processing; + bool closing; + + //these are just for debug, as a crude way of detecting concurrent access + volatile unsigned int reading; + volatile unsigned int writing; + + static qpid::concurrent::APRMonitor logLock; + void log(const std::string& desc, qpid::framing::AMQFrame* const frame); + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + virtual void read(); + virtual void write(); + void init(SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionContext.h b/cpp/common/io/inc/SessionContext.h new file mode 100644 index 0000000000..f223a70daa --- /dev/null +++ b/cpp/common/io/inc/SessionContext.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionContext_ +#define _SessionContext_ + +#include "OutputHandler.h" + +namespace qpid { +namespace io { + + class SessionContext : public virtual qpid::framing::OutputHandler + { + public: + virtual void close() = 0; + virtual ~SessionContext(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionHandler.h b/cpp/common/io/inc/SessionHandler.h new file mode 100644 index 0000000000..21a992ab65 --- /dev/null +++ b/cpp/common/io/inc/SessionHandler.h @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandler_ +#define _SessionHandler_ + +#include "InputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "TimeoutHandler.h" + +namespace qpid { +namespace io { + + class SessionHandler : public virtual qpid::framing::InitiationHandler, + public virtual qpid::framing::InputHandler, + public virtual TimeoutHandler + { + public: + virtual void closed() = 0; + virtual ~SessionHandler(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionHandlerFactory.h b/cpp/common/io/inc/SessionHandlerFactory.h new file mode 100644 index 0000000000..67d968b72e --- /dev/null +++ b/cpp/common/io/inc/SessionHandlerFactory.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandlerFactory_ +#define _SessionHandlerFactory_ + +#include "SessionContext.h" +#include "SessionHandler.h" + +namespace qpid { +namespace io { + + class SessionHandlerFactory + { + public: + virtual SessionHandler* create(SessionContext* ctxt) = 0; + virtual ~SessionHandlerFactory(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionManager.h b/cpp/common/io/inc/SessionManager.h new file mode 100644 index 0000000000..30c5208532 --- /dev/null +++ b/cpp/common/io/inc/SessionManager.h @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionManager_ +#define _SessionManager_ + +#include "SessionContext.h" +#include "SessionHandler.h" + +namespace qpid { +namespace io { + + class SessionManager + { + public: + virtual SessionHandler* init(SessionContext* ctxt) = 0; + virtual void close(SessionContext* ctxt) = 0; + virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0; + virtual ~SessionManager(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/ShutdownHandler.h b/cpp/common/io/inc/ShutdownHandler.h new file mode 100644 index 0000000000..186d9eeca4 --- /dev/null +++ b/cpp/common/io/inc/ShutdownHandler.h @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ShutdownHandler_ +#define _ShutdownHandler_ + +namespace qpid { +namespace io { + + class ShutdownHandler + { + public: + virtual void shutdown() = 0; + virtual ~ShutdownHandler(){} + }; + +} +} + +#endif diff --git a/cpp/common/io/inc/TimeoutHandler.h b/cpp/common/io/inc/TimeoutHandler.h new file mode 100644 index 0000000000..c92220fd6e --- /dev/null +++ b/cpp/common/io/inc/TimeoutHandler.h @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TimeoutHandler_ +#define _TimeoutHandler_ + +namespace qpid { +namespace io { + + class TimeoutHandler + { + public: + virtual void idleOut() = 0; + virtual void idleIn() = 0; + virtual ~TimeoutHandler(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/src/APRConnector.cpp b/cpp/common/io/src/APRConnector.cpp new file mode 100644 index 0000000000..0e022a8c73 --- /dev/null +++ b/cpp/common/io/src/APRConnector.cpp @@ -0,0 +1,198 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "APRBase.h" +#include "APRConnector.h" +#include "APRThreadFactory.h" +#include "QpidError.h" + +using namespace qpid::io; +using namespace qpid::concurrent; +using namespace qpid::framing; +using qpid::QpidError; + +APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : closed(true), debug(_debug), + idleIn(0), idleOut(0), timeout(0), + timeoutHandler(0), + shutdownHandler(0), + lastIn(0), lastOut(0), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ + + APRBase::increment(); + + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); + + threadFactory = new APRThreadFactory(); + writeLock = new APRMonitor(); +} + +APRConnector::~APRConnector(){ + delete receiver; + delete writeLock; + delete threadFactory; + apr_pool_destroy(pool); + + APRBase::decrement(); +} + +void APRConnector::connect(const std::string& host, int port){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + closed = false; + + receiver = threadFactory->create(this); + receiver->start(); +} + +void APRConnector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void APRConnector::close(){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + receiver->join(); +} + +void APRConnector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void APRConnector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* APRConnector::getOutputHandler(){ + return this; +} + +void APRConnector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void APRConnector::writeBlock(AMQDataBlock* data){ + writeLock->acquire(); + data->encode(outbuf); + + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); + writeLock->release(); +} + +void APRConnector::writeToSocket(char* data, int available){ + apr_size_t bytes(available); + apr_size_t written(0); + while(written < available && !closed){ + apr_status_t status = apr_socket_send(socket, data + written, &bytes); + if(status == APR_TIMEUP){ + std::cout << "Write request timed out." << std::endl; + } + if(bytes == 0){ + std::cout << "Write request wrote 0 bytes." << std::endl; + } + lastOut = apr_time_as_msec(apr_time_now()); + written += bytes; + bytes = available - written; + } +} + +void APRConnector::checkIdle(apr_status_t status){ + if(timeoutHandler){ + apr_time_t now = apr_time_as_msec(apr_time_now()); + if(APR_STATUS_IS_TIMEUP(status)){ + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(APR_STATUS_IS_EOF(status)){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void APRConnector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void APRConnector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void APRConnector::setSocketTimeout(){ + //interval is in microseconds, timeout in milliseconds + //want the interval to be a bit shorter than the timeout, hence multiply + //by 800 rather than 1000. + apr_interval_time_t interval(timeout * 800); + apr_socket_timeout_set(socket, interval); +} + +void APRConnector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void APRConnector::run(){ + try{ + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + + if(bytes > 0){ + inbuf.move(bytes); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/common/io/src/APRIOProcessor.cpp b/cpp/common/io/src/APRIOProcessor.cpp new file mode 100644 index 0000000000..d630f2f315 --- /dev/null +++ b/cpp/common/io/src/APRIOProcessor.cpp @@ -0,0 +1,100 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRIOProcessor.h" +#include "APRBase.h" +#include "QpidError.h" + +using namespace qpid::io; +using namespace qpid::concurrent; + +APRIOProcessor::APRIOProcessor(apr_pool_t* pool, int _size, int _timeout) : size(_size), + timeout(_timeout), + count(0), + thread(pool, this), + stopped(false){ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + thread.start(); +} + +void APRIOProcessor::add(apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + lock.acquire(); + if(!count++) lock.notify(); + lock.release(); +} + +void APRIOProcessor::remove(apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + lock.acquire(); + count--; + lock.release(); +} + +bool APRIOProcessor::full(){ + lock.acquire(); + bool full = count == size; + lock.release(); + return full; +} + +bool APRIOProcessor::empty(){ + lock.acquire(); + bool empty = count == 0; + lock.release(); + return empty; +} + +void APRIOProcessor::poll(){ + try{ + int signalledCount; + const apr_pollfd_t* signalledFDs; + apr_status_t status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + if(status == APR_SUCCESS){ + for(int i = 0; i < signalledCount; i++){ + IOSessionHolder* session = reinterpret_cast<IOSessionHolder*>(signalledFDs[i].client_data); + if(signalledFDs[i].rtnevents & APR_POLLIN) session->read(); + if(signalledFDs[i].rtnevents & APR_POLLOUT) session->write(); + } + } + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + +} + +void APRIOProcessor::run(){ + while(!stopped){ + lock.acquire(); + while(count == 0) lock.wait(); + lock.release(); + poll(); + } +} + +void APRIOProcessor::stop(){ + lock.acquire(); + stopped = true; + lock.notify(); + lock.release(); +} + +APRIOProcessor::~APRIOProcessor(){ + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + diff --git a/cpp/common/io/src/APRSocket.cpp b/cpp/common/io/src/APRSocket.cpp new file mode 100644 index 0000000000..32861ea442 --- /dev/null +++ b/cpp/common/io/src/APRSocket.cpp @@ -0,0 +1,76 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRBase.h" +#include "APRSocket.h" + +#include <iostream> + +using namespace qpid::io; +using namespace qpid::framing; +using namespace qpid::concurrent; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + std::cout << "Closing socket " << socket << "@" << this << std::endl; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp new file mode 100644 index 0000000000..380318bcfa --- /dev/null +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -0,0 +1,81 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "BlockingAPRAcceptor.h" +#include "APRBase.h" +#include "APRThreadFactory.h" + +using namespace qpid::concurrent; +using namespace qpid::framing; +using namespace qpid::io; + +BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : connectionBacklog(c), + threadFactory(new APRThreadFactory()), + debug(_debug){ + + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); +} + +void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, apr_pool)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + running = true; + std::cout << "Listening on port " << port << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, apr_pool); + if(status == APR_SUCCESS){ + //configure socket: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + + BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug); + session->init(factory->create(session)); + sessions.push_back(session); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } + + CHECK_APR_SUCCESS(apr_socket_close(socket)); +} + +BlockingAPRAcceptor::~BlockingAPRAcceptor(){ + delete threadFactory; + apr_pool_destroy(apr_pool); + APRBase::decrement(); +} + + +void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ + sessions.erase(find(sessions.begin(), sessions.end(), session)); + delete this; +} + diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp new file mode 100644 index 0000000000..99352c90d5 --- /dev/null +++ b/cpp/common/io/src/BlockingAPRSessionContext.cpp @@ -0,0 +1,177 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "BlockingAPRSessionContext.h" +#include "BlockingAPRAcceptor.h" +#include "APRBase.h" +#include "QpidError.h" + +using namespace qpid::concurrent; +using namespace qpid::framing; +using namespace qpid::io; + + +BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, + ThreadFactory* factory, + BlockingAPRAcceptor* _acceptor, + bool _debug) + : socket(_socket), + debug(_debug), + inbuf(65536), + outbuf(65536), + handler(0), + acceptor(_acceptor), + closed(false){ + + reader = new Reader(this); + writer = new Writer(this); + + rThread = factory->create(reader); + wThread = factory->create(writer); +} + +BlockingAPRSessionContext::~BlockingAPRSessionContext(){ + delete reader; + delete writer; + + delete rThread; + delete wThread; + + delete handler; +} + +void BlockingAPRSessionContext::read(){ + try{ + bool initiated(false); + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out, check closed on loop + }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ + closed = true; + }else{ + inbuf.move(bytes); + inbuf.flip(); + + if(!initiated){ + ProtocolInitiation* init = new ProtocolInitiation(); + if(init->decode(inbuf)){ + handler->initiated(init); + if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; + initiated = true; + } + }else{ + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; + handler->received(&frame); + } + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + + //close socket + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void BlockingAPRSessionContext::write(){ + while(!closed){ + //get next frame + outlock.acquire(); + while(outframes.empty() && !closed){ + outlock.wait(); + } + if(!closed){ + AMQFrame* frame = outframes.front(); + outframes.pop(); + outlock.release(); + + //encode + frame->encode(outbuf); + if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; + delete frame; + outbuf.flip(); + + //write from outbuf to socket + char* data = outbuf.start(); + const int available = outbuf.available(); + int written = 0; + apr_size_t bytes = available; + while(available > written){ + apr_status_t s = apr_socket_send(socket, data + written, &bytes); + written += bytes; + bytes = available - written; + } + outbuf.clear(); + }else{ + outlock.release(); + } + } +} + +void BlockingAPRSessionContext::send(AMQFrame* frame){ + if(!closed){ + outlock.acquire(); + bool was_empty(outframes.empty()); + outframes.push(frame); + if(was_empty){ + outlock.notify(); + } + outlock.release(); + }else{ + std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; + } +} + +void BlockingAPRSessionContext::init(SessionHandler* handler){ + this->handler = handler; + //start the threads + rThread->start(); + wThread->start(); +} + +void BlockingAPRSessionContext::close(){ + closed = true; + wThread->join(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; + handler->closed(); + acceptor->closed(this); + delete this; +} + +void BlockingAPRSessionContext::shutdown(){ + closed = true; + outlock.acquire(); + outlock.notify(); + outlock.release(); + + wThread->join(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + rThread->join(); + handler->closed(); + delete this; +} diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp new file mode 100644 index 0000000000..6653e926db --- /dev/null +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -0,0 +1,80 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFAcceptor.h" +#include "APRBase.h" + +using namespace qpid::concurrent; +using namespace qpid::io; + +LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : processor(aprPool.pool, worker_threads, 1000, 5000000), + connectionBacklog(c), + max_connections_per_processor(m), + debug(_debug){ + +} + + +void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ + apr_socket_t* socket; + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, aprPool.pool)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + running = true; + processor.start(); + + std::cout << "Listening on port " << port << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); +} + + +LFAcceptor::~LFAcceptor(){ +} + +LFAcceptor::APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +LFAcceptor::APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp new file mode 100644 index 0000000000..8ef3543b8f --- /dev/null +++ b/cpp/common/io/src/LFProcessor.cpp @@ -0,0 +1,191 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFProcessor.h" +#include "APRBase.h" +#include "LFSessionContext.h" +#include "QpidError.h" +#include <sstream> + +using namespace qpid::io; +using namespace qpid::concurrent; +using qpid::QpidError; + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + hasLeader(false), + workerCount(_workers), + workers(new Thread*[_workers]), + stopped(false){ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + //create & start the required number of threads + for(int i = 0; i < workerCount; i++){ + workers[i] = factory.create(this); + } +} + + +LFProcessor::~LFProcessor(){ + for(int i = 0; i < workerCount; i++){ + delete workers[i]; + } + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i]->start(); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + countLock.acquire(); + sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); + count++; + countLock.release(); +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); + count--; + countLock.release(); +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + countLock.acquire(); + bool full = count == size; + countLock.release(); + return full; +} + +bool LFProcessor::empty(){ + countLock.acquire(); + bool empty = count == 0; + countLock.release(); + return empty; +} + +void LFProcessor::poll(){ + apr_status_t status; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + leadLock.acquire(); + waitToLead(); + if(!stopped){ + const apr_pollfd_t* evt = getNextEvent(); + if(evt){ + LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); + session->startProcessing(); + + relinquishLead(); + leadLock.release(); + + //process event: + if(evt->rtnevents & APR_POLLIN) session->read(); + if(evt->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), session)); + count--; + countLock.release(); + }else{ + session->stopProcessing(); + } + + }else{ + leadLock.release(); + } + }else{ + leadLock.release(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + leadLock.acquire(); + leadLock.notifyAll(); + leadLock.release(); + + for(int i = 0; i < workerCount; i++){ + workers[i]->join(); + } + + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp new file mode 100644 index 0000000000..d786cb5e98 --- /dev/null +++ b/cpp/common/io/src/LFSessionContext.cpp @@ -0,0 +1,187 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include "QpidError.h" +#include <assert.h> + +using namespace qpid::concurrent; +using namespace qpid::io; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : socket(_socket), + processor(_processor), + initiated(false), + processing(false), + closing(false), + in(32768), + out(32768), + reading(0), + writing(0), + debug(_debug){ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + assert(!reading); // No concurrent read. + reading = APRThread::currentThread(); + + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation init; + if(init.decode(in)){ + handler->initiated(&init); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); + + reading = 0; +} + +void LFSessionContext::write(){ + assert(!writing); // No concurrent writes. + writing = APRThread::currentThread(); + + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + writing = 0; + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + writeLock.acquire(); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + writing = 0; + + if(closing){ + socket.close(); + } + } + writeLock.release(); + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + writeLock.acquire(); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } + writeLock.release(); +} + +void LFSessionContext::startProcessing(){ + writeLock.acquire(); + processing = true; + processor->deactivate(&fd); + writeLock.release(); +} + +void LFSessionContext::stopProcessing(){ + writeLock.acquire(); + processor->reactivate(&fd); + processing = false; + writeLock.release(); +} + +void LFSessionContext::close(){ + closing = true; + writeLock.acquire(); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } + writeLock.release(); +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* handler){ + this->handler = handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + logLock.acquire(); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + logLock.release(); +} + +APRMonitor LFSessionContext::logLock; diff --git a/cpp/common/utils/inc/logger.h b/cpp/common/utils/inc/logger.h new file mode 100644 index 0000000000..8a57854476 --- /dev/null +++ b/cpp/common/utils/inc/logger.h @@ -0,0 +1,82 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/********************************************************************* +* +* NOTE: This is a lightweight logging class intended for debugging and +* verification purposes only. +* +* DO NOT USE FOR PRODUCT DEVELOPMENT - Rather, use an agreed upon +* established logging class (such as Apache's log4cxx) for product +* development purposes. +* +*********************************************************************/ + +#ifndef __LOGGER__ +#define __LOGGER__ + +#include <fstream> +#include <iostream> + +namespace qpid { +namespace utils { + +class Logger : public std::ofstream +{ + private: + bool echo_flag; + bool timestamp_flag; + bool eol_flag; + char buff[128]; // Buffer for writing formatted strings + + void write_timestamp(); + + public: + Logger(const char* filename, const bool append); + Logger(std::string& filename, const bool append); + ~Logger(); + + bool getEchoFlag() {return echo_flag;} + bool setEchoFlag(const bool _echo_flag) {echo_flag = _echo_flag;} + bool getTimestampFlag() {return timestamp_flag;} + bool setTimestampFlag(const bool _timestamp_flag) {timestamp_flag = _timestamp_flag;} + + void log(const char* message); + void log(const char* message, const bool echo); + void log(const char* message, const bool echo, const bool timestamp); + + Logger& operator<< (bool b); + Logger& operator<< (const short s); + Logger& operator<< (const unsigned short us); + Logger& operator<< (const int i); + Logger& operator<< (const unsigned int ui); + Logger& operator<< (const long l); + Logger& operator<< (const unsigned long ul); + Logger& operator<< (const long long l); + Logger& operator<< (const unsigned long long ul); + Logger& operator<< (const float f); + Logger& operator<< (const double d); + Logger& operator<< (const long double ld); + Logger& operator<< (const char* cstr); + Logger& operator<< (const std::string& str); +}; + +} +} + + +#endif diff --git a/cpp/common/utils/inc/memory.h b/cpp/common/utils/inc/memory.h new file mode 100644 index 0000000000..2d65877adb --- /dev/null +++ b/cpp/common/utils/inc/memory.h @@ -0,0 +1,17 @@ +#ifndef __UTIL_MEMORY__ +#define __UTIL_MEMORY__ + +#if __GNUC__ < 4 + #include "boost/shared_ptr.hpp" + namespace std { + namespace tr1 { + using boost::shared_ptr; + using boost::dynamic_pointer_cast; + using boost::static_pointer_cast; + } + } +#else + #include <tr1/memory> +#endif +#endif + diff --git a/cpp/common/utils/src/Makefile b/cpp/common/utils/src/Makefile new file mode 100644 index 0000000000..0185ab9975 --- /dev/null +++ b/cpp/common/utils/src/Makefile @@ -0,0 +1,40 @@ + # + # Copyright (c) 2006 The Apache Software Foundation + # + # Licensed under the Apache License, Version 2.0 (the "License"); + # you may not use this file except in compliance with the License. + # You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + # +##### Options ##### +QPID_HOME = ../../../.. + +include ${QPID_HOME}/cpp/options.mk + +##### Compiler flags ##### +CXXFLAGS = -I ../inc -I ${APR_HOME}/include/apr-1/ + +##### Targets ##### +# Add additional source files to SOURCE LIST to include them in the build. +COMMON_SOURCE_LIST = logger.cpp + +COMMON_OBJ_LIST = $(COMMON_SOURCE_LIST:.cpp=.o) +LOGGER_TEST_EXE = logger_test + + +.PHONY: all clean + +all: $(LOGGER_TEST_EXE) + +$(LOGGER_TEST_EXE) : $(COMMON_OBJ_LIST) $(LOGGER_TEST_EXE).o + $(CXX) -o $@ $^ -l apr-1 -L /usr/local/apr/lib/ + +clean: + -@rm -f $(LOGGER_TEST_EXE) $(LOGGER_TEST_EXE).o $(COMMON_OBJ_LIST) test_log.txt *~ ../inc/*~ diff --git a/cpp/common/utils/src/logger.cpp b/cpp/common/utils/src/logger.cpp new file mode 100644 index 0000000000..603fa6574e --- /dev/null +++ b/cpp/common/utils/src/logger.cpp @@ -0,0 +1,209 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/********************************************************************* +* +* NOTE: This is a lightweight logging class intended for debugging and +* verification purposes only. +* +* DO NOT USE FOR PRODUCT DEVELOPMENT - Rather, use an agreed upon +* established logging class (such as Apache's log4cxx) for product +* development purposes. +* +*********************************************************************/ + +#include <iostream> +#include <ostream> +#include <string.h> +#include "apr_time.h" +#include "logger.h" + +namespace qpid { +namespace utils { + +Logger::Logger(const char* filename, const bool append): + std::ofstream(filename, append ? std::ios::app : std::ios::out) +{ + echo_flag = false; + timestamp_flag = true; + eol_flag = true; +} + +Logger::Logger(std::string& filename, const bool append): + std::ofstream(filename.c_str(), append ? std::ios::app : std::ios::out) +{ + echo_flag = false; + timestamp_flag = true; + eol_flag = true; +} + +Logger::~Logger() +{ + close(); +} + +void Logger::write_timestamp() +{ + int len; + apr_time_exp_t now; + apr_time_exp_lt(&now, apr_time_now()); + sprintf(buff, "%4d/%02d/%02d %02d:%02d:%02d.%06d : ", 1900+now.tm_year, now.tm_mon, + now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec, now.tm_usec); + write(buff, strlen(buff)); +} + + +void Logger::log(const char* message) +{ + if (timestamp_flag && eol_flag) + { + eol_flag = false; + write_timestamp(); + } + write(message, strlen(message)); + if (echo_flag) + std::cout << message; + if (strchr(message, '\n')) + eol_flag = true; +} + +void Logger::log(const char* message, const bool echo) +{ + if (timestamp_flag && eol_flag) + { + eol_flag = false; + write_timestamp(); + } + write(message, strlen(message)); + if (echo) + std::cout << message; + if (strchr(message, '\n')) + eol_flag = true; +} + +void Logger::log(const char* message, const bool echo, const bool timestamp) +{ + if (timestamp && eol_flag) + { + eol_flag = false; + write_timestamp(); + } + write(message, strlen(message)); + if (echo) + std::cout << message; + if (strchr(message, '\n')) + eol_flag = true; +} + +Logger& Logger::operator<< (const bool b) +{ + log(b ? "true" : "false"); + return *this; +} + +Logger& Logger::operator<< (const short s) +{ + sprintf(buff, "%d", s); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const unsigned short us) +{ + sprintf(buff, "%u", us); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const int i) +{ + sprintf(buff, "%d", i); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const unsigned int ui) +{ + sprintf(buff, "%u", ui); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const long l) +{ + sprintf(buff, "%ld", l); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const unsigned long ul) +{ + sprintf(buff, "%lu", ul); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const long long l) +{ + sprintf(buff, "%ld", l); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const unsigned long long ul) +{ + sprintf(buff, "%lu", ul); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const float f) +{ + sprintf(buff, "%f", f); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const double d) +{ + sprintf(buff, "%lf", d); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const long double ld) +{ + sprintf(buff, "%Lf", ld); + log(buff); + return *this; +} + +Logger& Logger::operator<< (const char* cstr) +{ + log(cstr); + return *this; +} + +Logger& Logger::operator<< (const std::string& str) +{ + log(str.c_str()); + return *this; +} + +} +} + diff --git a/cpp/common/utils/src/logger_test.cpp b/cpp/common/utils/src/logger_test.cpp new file mode 100644 index 0000000000..1866af9fbb --- /dev/null +++ b/cpp/common/utils/src/logger_test.cpp @@ -0,0 +1,78 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include <string> +#include "logger.h" + +using namespace qpid::utils; + +void run_sequence(Logger& log) +{ + bool b = true; + short s = -5; + unsigned short us = 12; + int i = -2345; + unsigned int ui = 34567; + long l = -12345678; + unsigned long ul = 23456789; + long long ll = -1234567890; + unsigned long long ull = 1234567890; + float f = -123.45678; + double d = 123.45678901; + long double ld = 23456.789012345678; + char* cstr = "This is a test C string."; + char* cr = "\n"; + std::string str("This is a test std::string"); + log << "bool = " << b << cr; + log << "short = " << s << cr; + log << "unsigned sort = " << us << cr; + log << "int = " << i << cr; + log << "unsigned int = " << ui << cr; + log << "long = " << l << cr; + log << "unsigned long = " << ul << cr; + log << "long long = " << ll << cr; + log << "unsigned long long = " << ull << cr; + log << "float = " << f << cr; + log << "double = " << d << cr; + log << "long double = " << ld << cr; + log << "char* = " << cstr << cr; + log << "std::string = " << str << cr; + log << "String 1\n"; + log << "String 2\n" << "String 3 " << "String 4\n"; + log << "Literal bool = " << false << cr; + log << "Literal unsigned int = " << 15 << cr; + log << "Literal double = " << (double)15 << cr; +} + +int main(int argc, char** argv) +{ + Logger log("test_log.txt", false); + std::cout << "****** Initial state (echo off, timestamp on)" << std::endl; + run_sequence(log); + std::cout << std::endl << "****** (echo off, timestamp off)" << std::endl; + log.setTimestampFlag(false); + run_sequence(log); + std::cout << std::endl << "****** (echo on, timestamp on)" << std::endl; + log.setEchoFlag(true); + log.setTimestampFlag(true); + run_sequence(log); + std::cout << std::endl << "****** (echo on, timestamp off)" << std::endl; + log.setTimestampFlag(false); + run_sequence(log); + return 0; +} |