summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp337
1 files changed, 337 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
new file mode 100644
index 0000000000..f2cb5740d3
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
@@ -0,0 +1,337 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include "MessageBodyStream.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+// Thefolowing def must match "Frames" private typedef.
+// TODO: make "Frames" publicly visible.
+typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
+
+using namespace std;
+
+static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count)
+{
+ if (buffer == nullptr)
+ throw gcnew ArgumentNullException("buffer");
+
+ if (offset < 0)
+ throw gcnew ArgumentOutOfRangeException("offset");
+
+ if (count < 0)
+ throw gcnew ArgumentOutOfRangeException("count");
+
+ if ((offset + count) > buffer->Length)
+ throw gcnew ArgumentException("offset + count");
+}
+
+
+// Input stream constructor
+
+MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp)
+{
+ isInputStream = true;
+ frameSetpp = fspp;
+ fragmentCount = 0;
+ length = 0;
+ position = 0;
+ currentFramep = NULL;
+
+ const std::string *datap; // pointer to the fragment's string variable that holds the content
+
+ for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
+ if (i->getBody()->type() == CONTENT_BODY) {
+ fragmentCount++;
+ datap = &(i->castBody<AMQContentBody>()->getData());
+ length += datap->size();
+ }
+ }
+
+ // fragmentCount can be zero for an empty message
+
+ fragmentIndex = 0;
+ fragmentPosition = 0;
+
+ if (fragmentCount == 0) {
+ currentFragment = NULL;
+ fragmentLength = 0;
+ }
+ else if (fragmentCount == 1) {
+ currentFragment = datap->data();
+ fragmentLength = (int) length;
+ }
+ else {
+ fragments = gcnew array<IntPtr>(fragmentCount);
+ fragmentIndex = 0;
+ for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
+ if (i->getBody()->type() == CONTENT_BODY) {
+ datap = &(i->castBody<AMQContentBody>()->getData());
+ fragments[fragmentIndex++] = (IntPtr) (void *) datap;
+ }
+ }
+ fragmentIndex = 0;
+ datap = (const std::string *) fragments[0].ToPointer();
+ currentFragment = datap->data();
+ fragmentLength = datap->size();
+ }
+}
+
+
+int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count)
+{
+ if (!isInputStream)
+ throw gcnew NotSupportedException();
+ if (disposed)
+ throw gcnew ObjectDisposedException("Stream");
+ if (count == 0)
+ return 0;
+ ThrowIfBadArgs(buffer, offset, count);
+
+ int nRead = 0;
+ int remaining = count;
+
+ while (nRead < count) {
+ int fragAvail = fragmentLength - fragmentPosition;
+ int copyCount = min (fragAvail, remaining);
+ if (copyCount == 0) {
+ // no more to read
+ return nRead;
+ }
+
+ // copy from native space
+ IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition);
+ Marshal::Copy (nativep, buffer, offset, copyCount);
+ nRead += copyCount;
+ remaining -= copyCount;
+ fragmentPosition += copyCount;
+ offset += copyCount;
+
+ // advance to next fragment?
+ if (fragmentPosition == fragmentLength) {
+ if (++fragmentIndex < fragmentCount) {
+ const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer();
+ currentFragment = datap->data();
+ fragmentLength = datap->size();
+ fragmentPosition = 0;
+ }
+ }
+ }
+
+ return nRead;
+}
+
+
+void MessageBodyStream::pushCurrentFrame(bool lastFrame)
+{
+ // set flags as in SessionImpl::sendContent.
+ if (currentFramep->getBody()->type() == CONTENT_BODY) {
+
+ if ((fragmentCount == 1) && lastFrame) {
+ // only one content frame
+ currentFramep->setFirstSegment(false);
+ }
+ else {
+ currentFramep->setFirstSegment(false);
+ currentFramep->setLastSegment(true);
+ if (fragmentCount != 1) {
+ currentFramep->setFirstFrame(false);
+ }
+ if (!lastFrame) {
+ currentFramep->setLastFrame(false);
+ }
+ }
+ }
+ else {
+ // the header frame
+ currentFramep->setFirstSegment(false);
+ if (!lastFrame) {
+ // there will be at least one content frame
+ currentFramep->setLastSegment(false);
+ }
+ }
+
+ // add to frame set. This makes a copy and ref counts the body
+ (*frameSetpp)->append(*currentFramep);
+
+ delete currentFramep;
+
+ currentFramep = NULL;
+}
+
+
+IntPtr MessageBodyStream::GetFrameSet()
+{
+ if (currentFramep != NULL) {
+ // No more content. Tidy up the pending (possibly single header) frame.
+ pushCurrentFrame(true);
+ }
+
+ if (frameSetpp == NULL) {
+ return (IntPtr) NULL;
+ }
+
+ // shared_ptr.get()
+ return (IntPtr) (void *) (*frameSetpp).get();
+}
+
+IntPtr MessageBodyStream::GetHeader()
+{
+ return (IntPtr) headerBodyp;
+}
+
+
+// Ouput stream constructor
+
+MessageBodyStream::MessageBodyStream(int maxFrameSize)
+{
+ isInputStream = false;
+
+ maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ SequenceNumber unused; // only meaningful on incoming frames
+ frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused));
+ fragmentCount = 0;
+ length = 0;
+ position = 0;
+
+ // header goes first in the outgoing frameset
+
+ boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody);
+ currentFramep = new AMQFrame(headerBody);
+ headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get());
+
+ // mark this header frame as "full" to force the first write to create a new content frame
+ fragmentPosition = maxFrameContentSize;
+}
+
+void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count)
+{
+ if (isInputStream)
+ throw gcnew NotSupportedException();
+ if (disposed)
+ throw gcnew ObjectDisposedException("Stream");
+ if (count == 0)
+ return;
+ ThrowIfBadArgs(buffer, offset, count);
+
+ if (currentFramep == NULL) {
+ // GetFrameSet() has been called and we no longer exclusively own the underlying frames.
+ throw gcnew InvalidOperationException ("Mesage Body output already completed");
+ }
+
+ if (count <= 0)
+ return;
+
+ // keep GC memory movement at bay while copying to native space
+ pin_ptr<unsigned char> pinnedBuf = &buffer[0];
+
+ string *datap;
+
+ int remaining = count;
+ while (remaining > 0) {
+ if (fragmentPosition == maxFrameContentSize) {
+ // move to a new frame, but not until ready to add new content.
+ // zero content is valid, or the final write may exactly fill to maxFrameContentSize
+
+ pushCurrentFrame(false);
+
+ currentFramep = new AMQFrame(AMQContentBody());
+ fragmentPosition = 0;
+ fragmentCount++;
+ }
+
+ int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition));
+ datap = &(currentFramep->castBody<AMQContentBody>()->getData());
+
+ char *outp = (char *) pinnedBuf + offset;
+ if (fragmentPosition == 0) {
+ datap->assign(outp, copyCount);
+ }
+ else {
+ datap->append(outp, copyCount);
+ }
+
+ position += copyCount;
+ fragmentPosition += copyCount;
+ remaining -= copyCount;
+ offset += copyCount;
+ }
+}
+
+
+void MessageBodyStream::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ try {}
+ finally
+ {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ frameSetpp = NULL;
+ }
+ if (currentFramep != NULL) {
+ delete currentFramep;
+ currentFramep = NULL;
+ }
+ }
+}
+
+MessageBodyStream::~MessageBodyStream()
+{
+ Cleanup();
+}
+
+MessageBodyStream::!MessageBodyStream()
+{
+ Cleanup();
+}
+
+void MessageBodyStream::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+
+}}} // namespace Apache::Qpid::Interop