summaryrefslogtreecommitdiff
path: root/gprofng/src/ipcio.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gprofng/src/ipcio.cc')
-rw-r--r--gprofng/src/ipcio.cc1025
1 files changed, 1025 insertions, 0 deletions
diff --git a/gprofng/src/ipcio.cc b/gprofng/src/ipcio.cc
new file mode 100644
index 00000000000..57f2617c293
--- /dev/null
+++ b/gprofng/src/ipcio.cc
@@ -0,0 +1,1025 @@
+/* Copyright (C) 2021 Free Software Foundation, Inc.
+ Contributed by Oracle.
+
+ This file is part of GNU Binutils.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, 51 Franklin Street - Fifth Floor, Boston,
+ MA 02110-1301, USA. */
+
+#include "config.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <signal.h>
+#include <unistd.h>
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+#include <queue>
+#include "vec.h"
+#include "util.h"
+#include "ipcio.h"
+#include "DbeThread.h"
+#include "Experiment.h"
+
+#define ipc_trace if (ipc_flags) ipc_default_log
+#define ipc_request_trace if (ipc_flags) ipc_request_log
+#define ipc_response_trace if (ipc_flags) ipc_response_log
+
+using namespace std;
+
+// IPC implementation
+static const int L_PROGRESS = 0;
+static const int L_INTEGER = 1;
+static const int L_BOOLEAN = 2;
+static const int L_LONG = 3;
+static const int L_STRING = 4;
+static const int L_DOUBLE = 5;
+static const int L_ARRAY = 6;
+static const int L_OBJECT = 7;
+static const int L_CHAR = 8;
+
+int currentRequestID;
+int currentChannelID;
+static long maxSize;
+
+extern int cancellableChannelID;
+extern int error_flag;
+extern int ipc_delay_microsec;
+extern FILE *responseLogFileP;
+
+IPCresponse *IPCresponseGlobal;
+
+BufferPool *responseBufferPool;
+
+IPCrequest::IPCrequest (int sz, int reqID, int chID)
+{
+ size = sz;
+ requestID = reqID;
+ channelID = chID;
+ status = INITIALIZED;
+ idx = 0;
+ buf = (char *) malloc (size);
+ cancelImmediate = false;
+}
+
+IPCrequest::~IPCrequest ()
+{
+ free (buf);
+}
+
+void
+IPCrequest::read (void)
+{
+ for (int i = 0; i < size; i++)
+ {
+ int c = getc (stdin);
+ ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c);
+ buf[i] = c;
+ }
+}
+
+IPCrequestStatus
+IPCrequest::getStatus (void)
+{
+ return status;
+}
+
+void
+IPCrequest::setStatus (IPCrequestStatus newStatus)
+{
+ status = newStatus;
+}
+
+static int
+readByte (IPCrequest* req)
+{
+ int c;
+ int val = 0;
+ for (int i = 0; i < 2; i++)
+ {
+ if (req == NULL)
+ {
+ c = getc (stdin);
+ ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c);
+ }
+ else
+ c = req->rgetc ();
+ switch (c)
+ {
+ case '0': case '1': case '2': case '3':
+ case '4': case '5': case '6': case '7':
+ case '8': case '9':
+ val = val * 16 + c - '0';
+ break;
+ case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
+ val = val * 16 + c - 'a' + 10;
+ break;
+ case EOF:
+ val = EOF;
+ break;
+ default:
+ fprintf (stderr, "readByte: Unknown byte: %d\n", c);
+ break;
+ }
+ }
+ return val;
+}
+
+static int
+readIVal (IPCrequest *req)
+{
+ int val = readByte (req);
+ for (int i = 0; i < 3; i++)
+ val = val * 256 + readByte (req);
+ ipc_trace (" readIVal: %d\n", val);
+ return val;
+}
+
+static String
+readSVal (IPCrequest *req)
+{
+ int len = readIVal (req);
+ if (len == -1)
+ {
+ ipc_trace (" readSVal: <NULL>\n");
+ return NULL;
+ }
+ char *str = (char *) malloc (len + 1);
+ char *s = str;
+ *s = (char) 0;
+ while (len--)
+ *s++ = req->rgetc ();
+ *s = (char) 0;
+ ipc_trace (" readSVal: '%s'\n", str);
+ return str;
+}
+
+static long long
+readLVal (IPCrequest *req)
+{
+ long long val = readByte (req);
+ for (int i = 0; i < 7; i++)
+ val = val * 256 + readByte (req);
+ ipc_trace (" readLVal: %lld\n", val);
+ return val;
+}
+
+static bool
+readBVal (IPCrequest *req)
+{
+ int val = readByte (req);
+ ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false");
+ return val != 0;
+}
+
+static char
+readCVal (IPCrequest *req)
+{
+ int val = readByte (req);
+ ipc_trace (" readCVal: %d\n", val);
+ return (char) val;
+}
+
+static double
+readDVal (IPCrequest *req)
+{
+ String s = readSVal (req);
+ double d = atof (s);
+ free (s);
+ return d;
+}
+
+static Object
+readAVal (IPCrequest *req)
+{
+ bool twoD = false;
+ int type = readByte (req);
+ if (type == L_ARRAY)
+ {
+ twoD = true;
+ type = readByte (req);
+ }
+ ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
+
+ int len = readIVal (req);
+ if (len == -1)
+ return NULL;
+ switch (type)
+ {
+ case L_INTEGER:
+ if (twoD)
+ {
+ Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<int>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<int> *array = new Vector<int>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readIVal (req));
+ return array;
+ }
+ //break;
+ case L_LONG:
+ if (twoD)
+ {
+ Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<long long>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<long long> *array = new Vector<long long>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readLVal (req));
+ return array;
+ }
+ //break;
+ case L_DOUBLE:
+ if (twoD)
+ {
+ Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<double>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<double> *array = new Vector<double>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readDVal (req));
+ return array;
+ }
+ //break;
+ case L_BOOLEAN:
+ if (twoD)
+ {
+ Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<bool>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<bool> *array = new Vector<bool>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readBVal (req));
+ return array;
+ }
+ //break;
+ case L_CHAR:
+ if (twoD)
+ {
+ Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<char>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<char> *array = new Vector<char>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readCVal (req));
+ return array;
+ }
+ //break;
+ case L_STRING:
+ if (twoD)
+ {
+ Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<String>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<String> *array = new Vector<String>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readSVal (req));
+ return array;
+ }
+ //break;
+ case L_OBJECT:
+ if (twoD)
+ {
+ Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, (Vector<Object>*)readAVal (req));
+ return array;
+ }
+ else
+ {
+ Vector<Object> *array = new Vector<Object>(len);
+ for (int i = 0; i < len; i++)
+ array->store (i, readAVal (req));
+ return array;
+ }
+ //break;
+ default:
+ fprintf (stderr, "readAVal: Unknown code: %d\n", type);
+ break;
+ }
+ return NULL;
+}
+
+static int iVal;
+static bool bVal;
+static long long lVal;
+static String sVal;
+static double dVal;
+static Object aVal;
+
+static void
+readResult (int type, IPCrequest *req)
+{
+ int tVal = readByte (req);
+ switch (tVal)
+ {
+ case L_INTEGER:
+ iVal = readIVal (req);
+ break;
+ case L_LONG:
+ lVal = readLVal (req);
+ break;
+ case L_BOOLEAN:
+ bVal = readBVal (req);
+ break;
+ case L_DOUBLE:
+ dVal = readDVal (req);
+ break;
+ case L_STRING:
+ sVal = readSVal (req);
+ break;
+ case L_ARRAY:
+ aVal = readAVal (req);
+ break;
+ case EOF:
+ fprintf (stderr, "EOF read in readResult\n");
+ sVal = NULL;
+ return;
+ default:
+ fprintf (stderr, "Unknown code: %d\n", tVal);
+ abort ();
+ }
+ if (type != tVal)
+ {
+ fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
+ abort ();
+ }
+}
+
+int
+readInt (IPCrequest *req)
+{
+ readResult (L_INTEGER, req);
+ return iVal;
+}
+
+String
+readString (IPCrequest *req)
+{
+ readResult (L_STRING, req);
+ return sVal;
+}
+
+long long
+readLong (IPCrequest *req)
+{
+ readResult (L_LONG, req);
+ return lVal;
+}
+
+double
+readDouble (IPCrequest *req)
+{
+ readResult (L_DOUBLE, req);
+ return dVal;
+}
+
+bool
+readBoolean (IPCrequest *req)
+{
+ readResult (L_BOOLEAN, req);
+ return bVal;
+}
+
+DbeObj
+readObject (IPCrequest *req)
+{
+ readResult (L_LONG, req);
+ return (DbeObj) lVal;
+}
+
+Object
+readArray (IPCrequest *req)
+{
+ readResult (L_ARRAY, req);
+ return aVal;
+}
+
+// Write
+IPCresponse::IPCresponse (int sz)
+{
+ requestID = -1;
+ channelID = -1;
+ responseType = -1;
+ responseStatus = RESPONSE_STATUS_SUCCESS;
+ sb = new StringBuilder (sz);
+ next = NULL;
+}
+
+IPCresponse::~IPCresponse ()
+{
+ delete sb;
+}
+
+void
+IPCresponse::reset ()
+{
+ requestID = -1;
+ channelID = -1;
+ responseType = -1;
+ responseStatus = RESPONSE_STATUS_SUCCESS;
+ sb->setLength (0);
+}
+
+void
+IPCresponse::sendByte (int b)
+{
+ ipc_trace ("sendByte: %02x %d\n", b, b);
+ sb->appendf ("%02x", b);
+}
+
+void
+IPCresponse::sendIVal (int i)
+{
+ ipc_trace ("sendIVal: %08x %d\n", i, i);
+ sb->appendf ("%08x", i);
+}
+
+void
+IPCresponse::sendLVal (long long l)
+{
+ ipc_trace ("sendLVal: %016llx %lld\n", l, l);
+ sb->appendf ("%016llx", l);
+}
+
+void
+IPCresponse::sendSVal (const char *s)
+{
+ if (s == NULL)
+ {
+ sendIVal (-1);
+ return;
+ }
+ sendIVal ((int) strlen (s));
+ ipc_trace ("sendSVal: %s\n", s);
+ sb->appendf ("%s", s);
+}
+
+void
+IPCresponse::sendBVal (bool b)
+{
+ sendByte (b ? 1 : 0);
+}
+
+void
+IPCresponse::sendCVal (char c)
+{
+ sendByte (c);
+}
+
+void
+IPCresponse::sendDVal (double d)
+{
+ char str[32];
+ snprintf (str, sizeof (str), "%.12f", d);
+ sendSVal (str);
+}
+
+void
+IPCresponse::sendAVal (void *ptr)
+{
+ if (ptr == NULL)
+ {
+ sendByte (L_INTEGER);
+ sendIVal (-1);
+ return;
+ }
+
+ VecType type = ((Vector<void*>*)ptr)->type ();
+ switch (type)
+ {
+ case VEC_INTEGER:
+ {
+ sendByte (L_INTEGER);
+ Vector<int> *array = (Vector<int>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendIVal (array->fetch (i));
+ break;
+ }
+ case VEC_BOOL:
+ {
+ sendByte (L_BOOLEAN);
+ Vector<bool> *array = (Vector<bool>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendBVal (array->fetch (i));
+ break;
+ }
+ case VEC_CHAR:
+ {
+ sendByte (L_CHAR);
+ Vector<char> *array = (Vector<char>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendCVal (array->fetch (i));
+ break;
+ }
+ case VEC_LLONG:
+ {
+ sendByte (L_LONG);
+ Vector<long long> *array = (Vector<long long>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendLVal (array->fetch (i));
+ break;
+ }
+ case VEC_DOUBLE:
+ {
+ sendByte (L_DOUBLE);
+ Vector<double> *array = (Vector<double>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendDVal (array->fetch (i));
+ break;
+ }
+ case VEC_STRING:
+ {
+ sendByte (L_STRING);
+ Vector<String> *array = (Vector<String>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendSVal (array->fetch (i));
+ break;
+ }
+ case VEC_STRINGARR:
+ {
+ sendByte (L_ARRAY);
+ sendByte (L_STRING);
+ Vector<void*> *array = (Vector<void*>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendAVal (array->fetch (i));
+ break;
+ }
+ case VEC_INTARR:
+ {
+ sendByte (L_ARRAY);
+ sendByte (L_INTEGER);
+ Vector<void*> *array = (Vector<void*>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendAVal (array->fetch (i));
+ break;
+ }
+ case VEC_LLONGARR:
+ {
+ sendByte (L_ARRAY);
+ sendByte (L_LONG);
+ Vector<void*> *array = (Vector<void*>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendAVal (array->fetch (i));
+ break;
+ }
+ case VEC_VOIDARR:
+ {
+ sendByte (L_OBJECT);
+ Vector<void*> *array = (Vector<void*>*)ptr;
+ sendIVal (array->size ());
+ for (int i = 0; i < array->size (); i++)
+ sendAVal (array->fetch (i));
+ break;
+ }
+ default:
+ fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
+ abort ();
+ }
+}
+
+static void
+writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes)
+{
+ if (responseType == RESPONSE_TYPE_HANDSHAKE)
+ nBytes = IPC_VERSION_NUMBER;
+ int use_write = 2;
+ ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes);
+ if (use_write)
+ {
+ char buf[23];
+ if (use_write == 1)
+ {
+ int i = 0;
+ snprintf (buf + i, 3, "%2x", HEADER_MARKER);
+ i += 2;
+ snprintf (buf + i, 9, "%8x", requestID);
+ i += 8;
+ snprintf (buf + i, 3, "%2x", responseType);
+ i += 2;
+ snprintf (buf + i, 3, "%2x", responseStatus);
+ i += 2;
+ snprintf (buf + i, 9, "%8x", nBytes);
+ }
+ else
+ snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID,
+ responseType, responseStatus, nBytes);
+ buf[22] = 0;
+ write (1, buf, 22);
+ }
+ else
+ {
+ cout << setfill ('0') << setw (2) << hex << HEADER_MARKER;
+ cout << setfill ('0') << setw (8) << hex << requestID;
+ cout << setfill ('0') << setw (2) << hex << responseType;
+ cout << setfill ('0') << setw (2) << hex << responseStatus;
+ cout << setfill ('0') << setw (8) << hex << nBytes;
+ cout.flush ();
+ }
+ ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n");
+ if (nBytes > maxSize)
+ {
+ maxSize = nBytes;
+ ipc_trace ("New maxsize %ld\n", maxSize);
+ }
+}
+
+bool
+cancelNeeded (int chID)
+{
+ if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
+ return true;
+ else
+ return false;
+}
+
+static void
+writeResponseWithHeader (int requestID, int channelID, int responseType,
+ int responseStatus, IPCresponse* os)
+{
+ if (cancelNeeded (channelID))
+ {
+ responseStatus = RESPONSE_STATUS_CANCELLED;
+ ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
+ // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
+ }
+ os->setRequestID (requestID);
+ os->setChannelID (channelID);
+ os->setResponseType (responseType);
+ os->setResponseStatus (responseStatus);
+ os->print ();
+ os->reset ();
+ responseBufferPool->recycle (os);
+}
+
+void
+writeAckFast (int requestID)
+{
+ writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0);
+}
+
+void
+writeAck (int requestID, int channelID)
+{
+#if DEBUG
+ char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
+#else /* ^DEBUG */
+ char *s = NULL;
+#endif /* ^DEBUG */
+ if (s)
+ {
+ int i = requestID;
+ int j = channelID;
+ ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
+ }
+ else
+ {
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
+ writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
+ RESPONSE_STATUS_SUCCESS, OUTS);
+ }
+}
+
+void
+writeHandshake (int requestID, int channelID)
+{
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
+ writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
+ // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER);
+}
+
+void
+writeResponseGeneric (int responseStatus, int requestID, int channelID)
+{
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
+ writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
+}
+
+BufferPool::BufferPool ()
+{
+ pthread_mutex_init (&p_mutex, NULL);
+ smallBuf = NULL;
+ largeBuf = NULL;
+}
+
+BufferPool::~BufferPool ()
+{
+ for (IPCresponse *p = smallBuf; p;)
+ {
+ IPCresponse *tmp = p;
+ p = tmp->next;
+ delete tmp;
+ }
+ for (IPCresponse *p = largeBuf; p;)
+ {
+ IPCresponse *tmp = p;
+ p = tmp->next;
+ delete tmp;
+ }
+}
+
+IPCresponse*
+BufferPool::getNewResponse (int size)
+{
+ pthread_mutex_lock (&p_mutex);
+ if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
+ size = BUFFER_SIZE_LARGE;
+ IPCresponse *newResponse = NULL;
+ if (size >= BUFFER_SIZE_LARGE)
+ {
+ if (largeBuf)
+ {
+ newResponse = largeBuf;
+ largeBuf = largeBuf->next;
+ }
+ }
+ else if (smallBuf)
+ {
+ newResponse = smallBuf;
+ smallBuf = smallBuf->next;
+ }
+ if (newResponse)
+ newResponse->reset ();
+ else
+ {
+ newResponse = new IPCresponse (size);
+ ipc_trace ("GETNEWBUFFER %d\n", size);
+ }
+ pthread_mutex_unlock (&p_mutex);
+ return newResponse;
+}
+
+void
+BufferPool::recycle (IPCresponse *respB)
+{
+ pthread_mutex_lock (&p_mutex);
+ if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
+ {
+ respB->next = largeBuf;
+ largeBuf = respB;
+ }
+ else
+ {
+ respB->next = smallBuf;
+ smallBuf = respB;
+ }
+ pthread_mutex_unlock (&p_mutex);
+}
+
+void
+writeArray (void *ptr, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE)
+ return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
+ OUTS->sendByte (L_ARRAY);
+ OUTS->sendAVal (ptr);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
+ RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+void
+writeString (const char *s, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE)
+ return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
+ OUTS->sendByte (L_STRING);
+ OUTS->sendSVal (s);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
+ RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+void
+writeObject (DbeObj obj, IPCrequest* req)
+{
+ writeLong ((long long) obj, req);
+}
+
+void
+writeBoolean (bool b, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE)
+ return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
+ OUTS->sendByte (L_BOOLEAN);
+ OUTS->sendBVal (b);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
+ RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+void
+writeInt (int i, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE)
+ return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
+ OUTS->sendByte (L_INTEGER);
+ OUTS->sendIVal (i);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+void
+writeChar (char c, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE)
+ return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
+ OUTS->sendByte (L_CHAR);
+ OUTS->sendCVal (c);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+void
+writeLong (long long l, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE)
+ return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
+ OUTS->sendByte (L_LONG);
+ OUTS->sendLVal (l);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+void
+writeDouble (double d, IPCrequest* req)
+{
+ if (req->getStatus () == CANCELLED_IMMEDIATE) return;
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
+ OUTS->sendByte (L_DOUBLE);
+ OUTS->sendDVal (d);
+ writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
+}
+
+int
+setProgress (int percentage, const char *proc_str)
+{
+ if (cancelNeeded (currentChannelID))
+ {
+ // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
+ // throw (e1);
+ return 1;
+ }
+ if (NULL == proc_str)
+ return 1;
+ int size = strlen (proc_str) + 100; // 100 bytes for additional data
+ int bs = BUFFER_SIZE_MEDIUM;
+ if (size > BUFFER_SIZE_MEDIUM)
+ {
+ if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
+ bs = BUFFER_SIZE_LARGE;
+ }
+ IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
+ OUTS->sendByte (L_PROGRESS);
+ OUTS->sendIVal (percentage);
+ OUTS->sendSVal (proc_str);
+ writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
+ return 0;
+}
+
+void
+IPCresponse::print (void)
+{
+ if (ipc_delay_microsec)
+ usleep (ipc_delay_microsec);
+ int stringSize = sb->length ();
+ writeResponseHeader (requestID, responseType, responseStatus, stringSize);
+ if (stringSize > 0)
+ {
+ char *s = sb->toString ();
+ hrtime_t start_time = gethrtime ();
+ int use_write = 1;
+ if (use_write)
+ write (1, s, stringSize); // write(1, sb->toString(), stringSize);
+ else
+ {
+ cout << s;
+ cout.flush ();
+ }
+ hrtime_t end_time = gethrtime ();
+ unsigned long long time_stamp = end_time - start_time;
+ ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu nanosec \n", requestID, time_stamp);
+ free (s);
+ }
+}
+
+void
+setCancelRequestedCh (int chID)
+{
+ cancelRequestedChannelID = chID;
+}
+
+void
+readRequestHeader ()
+{
+ int marker = readByte (NULL);
+ if (marker != HEADER_MARKER)
+ {
+ fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
+ error_flag = 1;
+ return;
+ }
+ else
+ ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
+ int requestID = readIVal (NULL);
+ int requestType = readByte (NULL);
+ int channelID = readIVal (NULL);
+ int nBytes = readIVal (NULL);
+ if (requestType == REQUEST_TYPE_HANDSHAKE)
+ {
+ // write the ack directly to the wire, not through the response queue
+ // writeAckFast(requestID);
+ writeAck (requestID, channelID);
+ maxSize = 0;
+ writeHandshake (requestID, channelID);
+ ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
+ }
+ else if (requestType == REQUEST_TYPE_CANCEL)
+ {
+ writeAck (requestID, channelID);
+ ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
+ if (channelID == cancellableChannelID)
+ {
+ // we have worked on at least one request belonging to this channel
+ writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
+ setCancelRequestedCh (channelID);
+ ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
+ if (channelID == currentChannelID)
+ // request for this channel is currently in progress
+ ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
+ // ssp_post_cond(waitingToFinish);
+ }
+ else
+ {
+ // FIXME:
+ // it is possible that a request for this channel is on the requestQ
+ // or has been submitted to the work group queue but is waiting for a thread to pick it up
+ writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
+ setCancelRequestedCh (channelID);
+ ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
+ }
+ }
+ else
+ {
+ writeAck (requestID, channelID);
+ ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
+ IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
+ nreq->read ();
+ ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
+ if (cancelNeeded (channelID))
+ {
+ ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
+ writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
+ delete nreq;
+ return;
+ }
+ DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
+ ipcThreadPool->put_queue (q);
+ }
+}