From 64949d8dc427da5b1eb760a0d3424341945f2aa1 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 1 Jun 2012 17:41:19 +0000 Subject: NO-JIRA: Updated to compile against latest proton API git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/gs-amqp-1-0-sandbox@1345286 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/qpid/messaging/amqp/ConnectionContext.cpp | 3 +- qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp | 133 ++------------------- .../src/qpid/messaging/amqp/ReceiverContext.cpp | 4 +- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 4 +- qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 4 +- qpid/cpp/src/qpid/messaging/amqp/SenderContext.h | 4 +- 6 files changed, 19 insertions(+), 133 deletions(-) diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index a16333b0bb..ce5a3d996f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -141,12 +141,13 @@ void ConnectionContext::open() void ConnectionContext::run() { while (active.get()) { - pn_driver_wait(driver); + pn_driver_wait(driver, 0); for (pn_connector_t* c = pn_driver_connector(driver); c; c = pn_driver_connector(driver)) { ConnectionContext* context = reinterpret_cast(pn_connector_context(c)); qpid::sys::ScopedLock l(context->lock); pn_connector_process(c); context->lock.notifyAll(); + pn_connector_process(c); } } } diff --git a/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp index c61ae21e0e..b154ddc506 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp @@ -20,142 +20,27 @@ */ #include "Decoder.h" #include "qpid/log/Statement.h" +#include "qpid/messaging/exceptions.h" #include "qpid/messaging/MessageImpl.h" #include namespace qpid { namespace messaging { namespace amqp { -namespace { -void on_null(void *ctx) -{ - reinterpret_cast(ctx)->onNull(); -} -void on_bool(void *ctx, bool v) -{ - reinterpret_cast(ctx)->onBool(v); -} -void on_ubyte(void *ctx, uint8_t v) -{ - reinterpret_cast(ctx)->onUbyte(v); -} -void on_byte(void *ctx, int8_t v) -{ - reinterpret_cast(ctx)->onByte(v); -} -void on_ushort(void *ctx, uint16_t v) -{ - reinterpret_cast(ctx)->onUshort(v); -} -void on_short(void *ctx, int16_t v) -{ - reinterpret_cast(ctx)->onShort(v); -} -void on_uint(void *ctx, uint32_t v) -{ - reinterpret_cast(ctx)->onUint(v); -} -void on_int(void *ctx, int32_t v) -{ - reinterpret_cast(ctx)->onInt(v); -} -void on_float(void *ctx, float v) -{ - reinterpret_cast(ctx)->onFloat(v); -} -void on_ulong(void *ctx, uint64_t v) -{ - reinterpret_cast(ctx)->onUlong(v); -} -void on_long(void *ctx, int64_t v) -{ - reinterpret_cast(ctx)->onLong(v); -} -void on_double(void *ctx, double v) -{ - reinterpret_cast(ctx)->onDouble(v); -} -void on_binary(void *ctx, size_t size, char *bytes) -{ - reinterpret_cast(ctx)->onBinary(size, bytes); -} -void on_utf8(void *ctx, size_t size, char *utf8) -{ - reinterpret_cast(ctx)->onUtf8(size, utf8); -} -void on_symbol(void *ctx, size_t size, char *str) -{ - reinterpret_cast(ctx)->onSymbol(size, str); -} -void start_descriptor(void *ctx) -{ - reinterpret_cast(ctx)->startDescriptor(); -} -void stop_descriptor(void *ctx) -{ - reinterpret_cast(ctx)->stopDescriptor(); -} -void start_array(void *ctx, size_t count, uint8_t code) -{ - reinterpret_cast(ctx)->startArray(count, code); -} -void stop_array(void *ctx, size_t count, uint8_t code) -{ - reinterpret_cast(ctx)->stopArray(count, code); -} -void start_list(void *ctx, size_t count) -{ - reinterpret_cast(ctx)->startList(count); -} -void stop_list(void *ctx, size_t count) -{ - reinterpret_cast(ctx)->stopList(count); -} -void start_map(void *ctx, size_t count) -{ - reinterpret_cast(ctx)->startMap(count); -} -void stop_map(void *ctx, size_t count) -{ - reinterpret_cast(ctx)->stopMap(count); -} -} Decoder::~Decoder() {} ssize_t Decoder::decode(const char* data, size_t size) { - pn_data_callbacks_t callbacks; - callbacks.on_null = &on_null; - callbacks.on_bool = &on_bool; - callbacks.on_ubyte = &on_ubyte; - callbacks.on_byte = &on_byte; - callbacks.on_ushort = &on_ushort; - callbacks.on_short = &on_short; - callbacks.on_uint = &on_uint; - callbacks.on_int = &on_int; - callbacks.on_float = &on_float; - callbacks.on_ulong = &on_ulong; - callbacks.on_long = &on_long; - callbacks.on_double = &on_double; - callbacks.on_binary = &on_binary; - callbacks.on_utf8 = &on_utf8; - callbacks.on_symbol = &on_symbol; - callbacks.start_descriptor = &start_descriptor; - callbacks.stop_descriptor = &stop_descriptor; - callbacks.start_array = &start_array; - callbacks.stop_array = &stop_array; - callbacks.start_list = &start_list; - callbacks.stop_list = &stop_list; - callbacks.start_map = &start_map; - callbacks.stop_map = &stop_map; - size_t total = 0; - while (total < size) { - ssize_t result = pn_read_datum(data + total, size - total, &callbacks, this); - if (result < 0) return result; - else total += result; + pn_bytes_t input = {size, const_cast(data)}; + const size_t maxDatums(50);/*what is a reasonable number here*/ + pn_atom_t datums[maxDatums]; + pn_atoms_t output = {maxDatums, datums}; + while (input.size > 0) { + if (pn_decode_one(&input, &output)) throw qpid::messaging::MessagingException("Decode failure!"); + input.start = const_cast(data + (size - input.size)); } - return total; + return size - input.size; } void DecoderBase::onNull() { QPID_LOG(debug, this << " onNull()"); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index b9972dc112..b6d539ffca 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -31,7 +31,7 @@ namespace amqp { //TODO: proper conversion to wide string for address ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s) : name(n), - source(s.begin(), s.end()), + source(s), receiver(pn_receiver(session, source.c_str())), capacity(0) {} ReceiverContext::~ReceiverContext() @@ -82,7 +82,7 @@ const std::string& ReceiverContext::getName() const return name; } -const std::wstring& ReceiverContext::getSource() const +const std::string& ReceiverContext::getSource() const { return source; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index f83de8e151..0a6f363228 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -49,12 +49,12 @@ class ReceiverContext uint32_t getUnsettled(); void close(); const std::string& getName() const; - const std::wstring& getSource() const; + const std::string& getSource() const; bool isClosed() const; private: friend class ConnectionContext; const std::string name; - const std::wstring source; + const std::string source; pn_link_t* receiver; uint32_t capacity; }; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 521538952d..4fbd878699 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -33,7 +33,7 @@ namespace amqp { //TODO: proper conversion to wide string for address SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t) : name(n), - target(t.begin(), t.end()), + target(t), sender(pn_sender(session, target.c_str())), capacity(1000) {} SenderContext::~SenderContext() @@ -67,7 +67,7 @@ const std::string& SenderContext::getName() const return name; } -const std::wstring& SenderContext::getTarget() const +const std::string& SenderContext::getTarget() const { return target; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 7abdee311d..e91fd6f1ca 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -62,14 +62,14 @@ class SenderContext uint32_t getCapacity(); uint32_t getUnsettled(); const std::string& getName() const; - const std::wstring& getTarget() const; + const std::string& getTarget() const; Delivery* send(const qpid::messaging::Message& message); private: friend class ConnectionContext; typedef std::deque Deliveries; const std::string name; - const std::wstring target; + const std::string target; pn_link_t* sender; int32_t nextId; Deliveries deliveries; -- cgit v1.2.1