diff options
Diffstat (limited to 'cpp/lib/common')
-rw-r--r-- | cpp/lib/common/framing/BasicHeaderProperties.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/common/framing/BasicHeaderProperties.h | 136 | ||||
-rw-r--r-- | cpp/lib/common/framing/MethodContext.h | 5 | ||||
-rw-r--r-- | cpp/lib/common/shared_ptr.h | 7 | ||||
-rw-r--r-- | cpp/lib/common/sys/ProducerConsumer.cpp | 20 | ||||
-rw-r--r-- | cpp/lib/common/sys/ProducerConsumer.h | 20 | ||||
-rw-r--r-- | cpp/lib/common/sys/ThreadSafeQueue.h | 8 |
7 files changed, 109 insertions, 91 deletions
diff --git a/cpp/lib/common/framing/BasicHeaderProperties.cpp b/cpp/lib/common/framing/BasicHeaderProperties.cpp index 930ec9f4dd..d815d1e62f 100644 --- a/cpp/lib/common/framing/BasicHeaderProperties.cpp +++ b/cpp/lib/common/framing/BasicHeaderProperties.cpp @@ -22,7 +22,7 @@ //TODO: This could be easily generated from the spec -qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(0), priority(0), timestamp(0){} +qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){} qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} uint32_t qpid::framing::BasicHeaderProperties::size() const{ @@ -70,7 +70,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, if(flags & (1 << 15)) buffer.getShortString(contentType); if(flags & (1 << 14)) buffer.getShortString(contentEncoding); if(flags & (1 << 13)) buffer.getFieldTable(headers); - if(flags & (1 << 12)) deliveryMode = buffer.getOctet(); + if(flags & (1 << 12)) deliveryMode = DeliveryMode(buffer.getOctet()); if(flags & (1 << 11)) priority = buffer.getOctet(); if(flags & (1 << 10)) buffer.getShortString(correlationId); if(flags & (1 << 9)) buffer.getShortString(replyTo); diff --git a/cpp/lib/common/framing/BasicHeaderProperties.h b/cpp/lib/common/framing/BasicHeaderProperties.h index 316e67b82c..248014aefb 100644 --- a/cpp/lib/common/framing/BasicHeaderProperties.h +++ b/cpp/lib/common/framing/BasicHeaderProperties.h @@ -28,70 +28,88 @@ namespace qpid { namespace framing { - enum delivery_mode {TRANSIENT = 1, PERSISTENT = 2}; - //TODO: This could be easily generated from the spec - class BasicHeaderProperties : public HeaderProperties - { - string contentType; - string contentEncoding; - FieldTable headers; - uint8_t deliveryMode; - uint8_t priority; - string correlationId; - string replyTo; - string expiration; - string messageId; - uint64_t timestamp; - string type; - string userId; - string appId; - string clusterId; - - uint16_t getFlags() const; - - public: - BasicHeaderProperties(); - virtual ~BasicHeaderProperties(); - virtual uint32_t size() const; - virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); +enum DeliveryMode { TRANSIENT = 1, PERSISTENT = 2}; - virtual uint8_t classId() { return BASIC; } +class BasicHeaderProperties : public HeaderProperties +{ + string contentType; + string contentEncoding; + FieldTable headers; + DeliveryMode deliveryMode; + uint8_t priority; + string correlationId; + string replyTo; + string expiration; + string messageId; + uint64_t timestamp; + string type; + string userId; + string appId; + string clusterId; + + uint16_t getFlags() const; - string getContentType() const { return contentType; } - string getContentEncoding() const { return contentEncoding; } - FieldTable& getHeaders() { return headers; } - uint8_t getDeliveryMode() const { return deliveryMode; } - uint8_t getPriority() const { return priority; } - string getCorrelationId() const {return correlationId; } - string getReplyTo() const { return replyTo; } - string getExpiration() const { return expiration; } - string getMessageId() const {return messageId; } - uint64_t getTimestamp() const { return timestamp; } - string getType() const { return type; } - string getUserId() const { return userId; } - string getAppId() const { return appId; } - string getClusterId() const { return clusterId; } + public: + BasicHeaderProperties(); + virtual ~BasicHeaderProperties(); + virtual uint32_t size() const; + virtual void encode(Buffer& buffer) const; + virtual void decode(Buffer& buffer, uint32_t size); - void setContentType(const string& _type){ contentType = _type; } - void setContentEncoding(const string& encoding){ contentEncoding = encoding; } - void setHeaders(const FieldTable& _headers){ headers = _headers; } - void setDeliveryMode(uint8_t mode){ deliveryMode = mode; } - void setPriority(uint8_t _priority){ priority = _priority; } - void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; } - void setReplyTo(const string& _replyTo){ replyTo = _replyTo;} - void setExpiration(const string& _expiration){ expiration = _expiration; } - void setMessageId(const string& _messageId){ messageId = _messageId; } - void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; } - void setType(const string& _type){ type = _type; } - void setUserId(const string& _userId){ userId = _userId; } - void setAppId(const string& _appId){appId = _appId; } - void setClusterId(const string& _clusterId){ clusterId = _clusterId; } - }; + virtual uint8_t classId() { return BASIC; } -} -} + string getContentType() const { return contentType; } + string getContentEncoding() const { return contentEncoding; } + FieldTable& getHeaders() { return headers; } + const FieldTable& getHeaders() const { return headers; } + DeliveryMode getDeliveryMode() const { return deliveryMode; } + uint8_t getPriority() const { return priority; } + string getCorrelationId() const {return correlationId; } + string getReplyTo() const { return replyTo; } + string getExpiration() const { return expiration; } + string getMessageId() const {return messageId; } + uint64_t getTimestamp() const { return timestamp; } + string getType() const { return type; } + string getUserId() const { return userId; } + string getAppId() const { return appId; } + string getClusterId() const { return clusterId; } + void setContentType(const string& _type){ contentType = _type; } + void setContentEncoding(const string& encoding){ contentEncoding = encoding; } + void setHeaders(const FieldTable& _headers){ headers = _headers; } + void setDeliveryMode(DeliveryMode mode){ deliveryMode = mode; } + void setPriority(uint8_t _priority){ priority = _priority; } + void setCorrelationId(const string& _correlationId){ correlationId = _correlationId; } + void setReplyTo(const string& _replyTo){ replyTo = _replyTo;} + void setExpiration(const string& _expiration){ expiration = _expiration; } + void setMessageId(const string& _messageId){ messageId = _messageId; } + void setTimestamp(uint64_t _timestamp){ timestamp = _timestamp; } + void setType(const string& _type){ type = _type; } + void setUserId(const string& _userId){ userId = _userId; } + void setAppId(const string& _appId){appId = _appId; } + void setClusterId(const string& _clusterId){ clusterId = _clusterId; } + /** \internal + * Template to copy between types like BasicHeaderProperties. + */ + template <class T, class U> + static void copy(T& to, const U& from) { + to.setContentType(from.getContentType()); + to.setContentEncoding(from.getContentEncoding()); + to.setHeaders(from.getHeaders()); + to.setDeliveryMode(from.getDeliveryMode()); + to.setPriority(from.getPriority()); + to.setCorrelationId(from.getCorrelationId()); + to.setReplyTo(from.getReplyTo()); + to.setExpiration(from.getExpiration()); + to.setMessageId(from.getMessageId()); + to.setTimestamp(from.getTimestamp()); + to.setType(from.getType()); + to.setUserId(from.getUserId()); + to.setAppId(from.getAppId()); + to.setClusterId(from.getClusterId()); + } +}; +}} #endif diff --git a/cpp/lib/common/framing/MethodContext.h b/cpp/lib/common/framing/MethodContext.h index 3493924bf6..80e4c55d7e 100644 --- a/cpp/lib/common/framing/MethodContext.h +++ b/cpp/lib/common/framing/MethodContext.h @@ -67,11 +67,6 @@ struct MethodContext RequestId getRequestId() const; }; -// FIXME aconway 2007-02-01: Method context only required on Handler -// functions, not on Proxy functions. If we add set/getChannel(ChannelAdapter*) -// on AMQBody and set it during decodeing then we could get rid of the context. - - }} // namespace qpid::framing diff --git a/cpp/lib/common/shared_ptr.h b/cpp/lib/common/shared_ptr.h index 6725f7acb3..c4d547e5bb 100644 --- a/cpp/lib/common/shared_ptr.h +++ b/cpp/lib/common/shared_ptr.h @@ -20,10 +20,15 @@ */ #include <boost/shared_ptr.hpp> +#include <boost/cast.hpp> namespace qpid { -/// Import shared_ptr into qpid namespace. +/// Import shared_ptr definitions into qpid namespace. using boost::shared_ptr; +using boost::dynamic_pointer_cast; +using boost::static_pointer_cast; +using boost::const_pointer_cast; +using boost::shared_polymorphic_downcast; } // namespace qpid diff --git a/cpp/lib/common/sys/ProducerConsumer.cpp b/cpp/lib/common/sys/ProducerConsumer.cpp index 3f6156f230..7a0249f666 100644 --- a/cpp/lib/common/sys/ProducerConsumer.cpp +++ b/cpp/lib/common/sys/ProducerConsumer.cpp @@ -27,12 +27,12 @@ namespace sys { // // ================ ProducerConsumer ProducerConsumer::ProducerConsumer(size_t init_items) - : items(init_items), waiters(0), stopped(false) + : items(init_items), waiters(0), shutdownFlag(false) {} -void ProducerConsumer::stop() { +void ProducerConsumer::shutdown() { Mutex::ScopedLock l(monitor); - stopped = true; + shutdownFlag = true; monitor.notifyAll(); // Wait for waiting consumers to wake up. while (waiters > 0) @@ -55,16 +55,16 @@ ProducerConsumer::Lock::Lock(ProducerConsumer& p) : pc(p), lock(p.monitor), status(INCOMPLETE) {} bool ProducerConsumer::Lock::isOk() const { - return !pc.isStopped() && status==INCOMPLETE; + return !pc.isShutdown() && status==INCOMPLETE; } void ProducerConsumer::Lock::checkOk() const { - assert(!pc.isStopped()); + assert(!pc.isShutdown()); assert(status == INCOMPLETE); } ProducerConsumer::Lock::~Lock() { - assert(status != INCOMPLETE || pc.isStopped()); + assert(status != INCOMPLETE || pc.isShutdown()); } void ProducerConsumer::Lock::confirm() { @@ -96,7 +96,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock(ProducerConsumer& p) : Lock(p) { if (isOk()) { ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.stopped) { + while (pc.items == 0 && !pc.shutdownFlag) { pc.monitor.wait(); } } @@ -115,7 +115,7 @@ ProducerConsumer::ConsumerLock::ConsumerLock( else { Time deadline = now() + timeout; ScopedIncrement<size_t> inc(pc.waiters); - while (pc.items == 0 && !pc.stopped) { + while (pc.items == 0 && !pc.shutdownFlag) { if (!pc.monitor.wait(deadline)) { status = TIMEOUT; return; @@ -126,9 +126,9 @@ ProducerConsumer::ConsumerLock::ConsumerLock( } ProducerConsumer::ConsumerLock::~ConsumerLock() { - if (pc.isStopped()) { + if (pc.isShutdown()) { if (pc.waiters == 0) - pc.monitor.notifyAll(); // All waiters woken, notify stop thread(s) + pc.monitor.notifyAll(); // Notify shutdown thread(s) } else if (status==CONFIRMED) { pc.items--; diff --git a/cpp/lib/common/sys/ProducerConsumer.h b/cpp/lib/common/sys/ProducerConsumer.h index 742639323b..c7f42f266d 100644 --- a/cpp/lib/common/sys/ProducerConsumer.h +++ b/cpp/lib/common/sys/ProducerConsumer.h @@ -30,7 +30,7 @@ namespace sys { * * Producers increase the number of available items, consumers reduce it. * Consumers wait till an item is available. Waiting threads can be - * woken for shutdown using stop(). + * woken for shutdown using shutdown(). * * Note: Currently implements unbounded producer-consumer, i.e. no limit * to available items, producers never block. Can be extended to support @@ -43,16 +43,16 @@ class ProducerConsumer public: ProducerConsumer(size_t init_items=0); - ~ProducerConsumer() { stop(); } + ~ProducerConsumer() { shutdown(); } /** * Wake any threads waiting for ProducerLock or ConsumerLock. *@post No threads are waiting in Producer or Consumer locks. */ - void stop(); + void shutdown(); - /** True if queue is stopped */ - bool isStopped() { return stopped; } + /** True if queue is shutdown */ + bool isShutdown() { return shutdownFlag; } /** Number of items available for consumers */ size_t available() const; @@ -76,7 +76,7 @@ class ProducerConsumer *confirm() or cancel() before the lock goes out of scope. * * false means the lock failed - timed out or the - * ProducerConsumer is stopped. You should not do anything in + * ProducerConsumer is shutdown. You should not do anything in * the scope of the lock. */ bool isOk() const; @@ -98,8 +98,8 @@ class ProducerConsumer /** True if this lock experienced a timeout */ bool isTimedOut() const { return status == TIMEOUT; } - /** True if we have been stopped */ - bool isStopped() const { return pc.isStopped(); } + /** True if we have been shutdown */ + bool isShutdown() const { return pc.isShutdown(); } ProducerConsumer& pc; @@ -141,7 +141,7 @@ class ProducerConsumer * Wait up to timeout to acquire lock. *@post If isOk() caller has a producer lock. * If isTimedOut() there was a timeout. - * If neither then we were stopped. + * If neither then we were shutdown. */ ConsumerLock(ProducerConsumer& p, const Time& timeout); @@ -153,7 +153,7 @@ class ProducerConsumer mutable Monitor monitor; size_t items; size_t waiters; - bool stopped; + bool shutdownFlag; friend class Lock; friend class ProducerLock; diff --git a/cpp/lib/common/sys/ThreadSafeQueue.h b/cpp/lib/common/sys/ThreadSafeQueue.h index ff949a3e16..80ea92da0e 100644 --- a/cpp/lib/common/sys/ThreadSafeQueue.h +++ b/cpp/lib/common/sys/ThreadSafeQueue.h @@ -46,7 +46,7 @@ class ThreadSafeQueue } /** Pop a value from the front of the queue. Waits till value is available. - *@throw ShutdownException if queue is stopped while waiting. + *@throw ShutdownException if queue is shutdown while waiting. */ T pop() { ProducerConsumer::ConsumerLock consumer(pc); @@ -75,10 +75,10 @@ class ThreadSafeQueue } /** Interrupt threads waiting in pop() */ - void stop() { pc.stop(); } + void shutdown() { pc.shutdown(); } - /** True if queue is stopped */ - bool isStopped() { return pc.isStopped(); } + /** True if queue is shutdown */ + bool isShutdown() { return pc.isShutdown(); } /** Size of the queue */ size_t size() { ProducerConsumer::Lock l(pc); return container.size(); } |