summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-06-01 17:41:19 +0000
committerGordon Sim <gsim@apache.org>2012-06-01 17:41:19 +0000
commit64949d8dc427da5b1eb760a0d3424341945f2aa1 (patch)
tree181cf89621eace10671ac830e606708f6d080dcb
parentdd007bcf3cba0074c42a6c13c151241fb6e2aad0 (diff)
downloadqpid-python-gs-amqp-1-0-sandbox.tar.gz
NO-JIRA: Updated to compile against latest proton APIgs-amqp-1-0-sandbox
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/gs-amqp-1-0-sandbox@1345286 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp133
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h4
6 files changed, 19 insertions, 133 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index a16333b0bb..ce5a3d996f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -141,12 +141,13 @@ void ConnectionContext::open()
void ConnectionContext::run()
{
while (active.get()) {
- pn_driver_wait(driver);
+ pn_driver_wait(driver, 0);
for (pn_connector_t* c = pn_driver_connector(driver); c; c = pn_driver_connector(driver)) {
ConnectionContext* context = reinterpret_cast<ConnectionContext*>(pn_connector_context(c));
qpid::sys::ScopedLock<qpid::sys::Monitor> l(context->lock);
pn_connector_process(c);
context->lock.notifyAll();
+ pn_connector_process(c);
}
}
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp
index c61ae21e0e..b154ddc506 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/Decoder.cpp
@@ -20,142 +20,27 @@
*/
#include "Decoder.h"
#include "qpid/log/Statement.h"
+#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/MessageImpl.h"
#include <string.h>
namespace qpid {
namespace messaging {
namespace amqp {
-namespace {
-void on_null(void *ctx)
-{
- reinterpret_cast<Decoder*>(ctx)->onNull();
-}
-void on_bool(void *ctx, bool v)
-{
- reinterpret_cast<Decoder*>(ctx)->onBool(v);
-}
-void on_ubyte(void *ctx, uint8_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onUbyte(v);
-}
-void on_byte(void *ctx, int8_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onByte(v);
-}
-void on_ushort(void *ctx, uint16_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onUshort(v);
-}
-void on_short(void *ctx, int16_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onShort(v);
-}
-void on_uint(void *ctx, uint32_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onUint(v);
-}
-void on_int(void *ctx, int32_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onInt(v);
-}
-void on_float(void *ctx, float v)
-{
- reinterpret_cast<Decoder*>(ctx)->onFloat(v);
-}
-void on_ulong(void *ctx, uint64_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onUlong(v);
-}
-void on_long(void *ctx, int64_t v)
-{
- reinterpret_cast<Decoder*>(ctx)->onLong(v);
-}
-void on_double(void *ctx, double v)
-{
- reinterpret_cast<Decoder*>(ctx)->onDouble(v);
-}
-void on_binary(void *ctx, size_t size, char *bytes)
-{
- reinterpret_cast<Decoder*>(ctx)->onBinary(size, bytes);
-}
-void on_utf8(void *ctx, size_t size, char *utf8)
-{
- reinterpret_cast<Decoder*>(ctx)->onUtf8(size, utf8);
-}
-void on_symbol(void *ctx, size_t size, char *str)
-{
- reinterpret_cast<Decoder*>(ctx)->onSymbol(size, str);
-}
-void start_descriptor(void *ctx)
-{
- reinterpret_cast<Decoder*>(ctx)->startDescriptor();
-}
-void stop_descriptor(void *ctx)
-{
- reinterpret_cast<Decoder*>(ctx)->stopDescriptor();
-}
-void start_array(void *ctx, size_t count, uint8_t code)
-{
- reinterpret_cast<Decoder*>(ctx)->startArray(count, code);
-}
-void stop_array(void *ctx, size_t count, uint8_t code)
-{
- reinterpret_cast<Decoder*>(ctx)->stopArray(count, code);
-}
-void start_list(void *ctx, size_t count)
-{
- reinterpret_cast<Decoder*>(ctx)->startList(count);
-}
-void stop_list(void *ctx, size_t count)
-{
- reinterpret_cast<Decoder*>(ctx)->stopList(count);
-}
-void start_map(void *ctx, size_t count)
-{
- reinterpret_cast<Decoder*>(ctx)->startMap(count);
-}
-void stop_map(void *ctx, size_t count)
-{
- reinterpret_cast<Decoder*>(ctx)->stopMap(count);
-}
-}
Decoder::~Decoder() {}
ssize_t Decoder::decode(const char* data, size_t size)
{
- pn_data_callbacks_t callbacks;
- callbacks.on_null = &on_null;
- callbacks.on_bool = &on_bool;
- callbacks.on_ubyte = &on_ubyte;
- callbacks.on_byte = &on_byte;
- callbacks.on_ushort = &on_ushort;
- callbacks.on_short = &on_short;
- callbacks.on_uint = &on_uint;
- callbacks.on_int = &on_int;
- callbacks.on_float = &on_float;
- callbacks.on_ulong = &on_ulong;
- callbacks.on_long = &on_long;
- callbacks.on_double = &on_double;
- callbacks.on_binary = &on_binary;
- callbacks.on_utf8 = &on_utf8;
- callbacks.on_symbol = &on_symbol;
- callbacks.start_descriptor = &start_descriptor;
- callbacks.stop_descriptor = &stop_descriptor;
- callbacks.start_array = &start_array;
- callbacks.stop_array = &stop_array;
- callbacks.start_list = &start_list;
- callbacks.stop_list = &stop_list;
- callbacks.start_map = &start_map;
- callbacks.stop_map = &stop_map;
- size_t total = 0;
- while (total < size) {
- ssize_t result = pn_read_datum(data + total, size - total, &callbacks, this);
- if (result < 0) return result;
- else total += result;
+ pn_bytes_t input = {size, const_cast<char*>(data)};
+ const size_t maxDatums(50);/*what is a reasonable number here*/
+ pn_atom_t datums[maxDatums];
+ pn_atoms_t output = {maxDatums, datums};
+ while (input.size > 0) {
+ if (pn_decode_one(&input, &output)) throw qpid::messaging::MessagingException("Decode failure!");
+ input.start = const_cast<char*>(data + (size - input.size));
}
- return total;
+ return size - input.size;
}
void DecoderBase::onNull() { QPID_LOG(debug, this << " onNull()"); }
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index b9972dc112..b6d539ffca 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -31,7 +31,7 @@ namespace amqp {
//TODO: proper conversion to wide string for address
ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s)
: name(n),
- source(s.begin(), s.end()),
+ source(s),
receiver(pn_receiver(session, source.c_str())),
capacity(0) {}
ReceiverContext::~ReceiverContext()
@@ -82,7 +82,7 @@ const std::string& ReceiverContext::getName() const
return name;
}
-const std::wstring& ReceiverContext::getSource() const
+const std::string& ReceiverContext::getSource() const
{
return source;
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index f83de8e151..0a6f363228 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -49,12 +49,12 @@ class ReceiverContext
uint32_t getUnsettled();
void close();
const std::string& getName() const;
- const std::wstring& getSource() const;
+ const std::string& getSource() const;
bool isClosed() const;
private:
friend class ConnectionContext;
const std::string name;
- const std::wstring source;
+ const std::string source;
pn_link_t* receiver;
uint32_t capacity;
};
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 521538952d..4fbd878699 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -33,7 +33,7 @@ namespace amqp {
//TODO: proper conversion to wide string for address
SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t)
: name(n),
- target(t.begin(), t.end()),
+ target(t),
sender(pn_sender(session, target.c_str())), capacity(1000) {}
SenderContext::~SenderContext()
@@ -67,7 +67,7 @@ const std::string& SenderContext::getName() const
return name;
}
-const std::wstring& SenderContext::getTarget() const
+const std::string& SenderContext::getTarget() const
{
return target;
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 7abdee311d..e91fd6f1ca 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -62,14 +62,14 @@ class SenderContext
uint32_t getCapacity();
uint32_t getUnsettled();
const std::string& getName() const;
- const std::wstring& getTarget() const;
+ const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
const std::string name;
- const std::wstring target;
+ const std::string target;
pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;