summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Pfaff <blp@nicira.com>2010-01-06 14:26:48 -0800
committerBen Pfaff <blp@nicira.com>2010-01-06 14:26:48 -0800
commit539e96f62300e4afab00e5906a28e3b89301d62e (patch)
treed91d559866c7a32d48613d2cae3a8b52c56bdb36
parente0668bd1d448d6f17c20b9c7ba91344180809061 (diff)
downloadopenvswitch-539e96f62300e4afab00e5906a28e3b89301d62e.tar.gz
stream: Add stream_run(), stream_run_wait() functions.
SSL, which will be added in an upcoming commit, requires some background processing, which is best done in a "run" function in our architecture. This commit adds stream_run() and stream_run_wait() and calls to them from the places where they will be required.
-rw-r--r--lib/jsonrpc.c16
-rw-r--r--lib/stream-fd.c2
-rw-r--r--lib/stream-provider.h12
-rw-r--r--lib/stream-tcp.c2
-rw-r--r--lib/stream-unix.c2
-rw-r--r--lib/stream.c29
-rw-r--r--lib/stream.h3
7 files changed, 60 insertions, 6 deletions
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 0f535155e..ec9108f85 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2009 Nicira Networks.
+ * Copyright (c) 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -88,6 +88,7 @@ jsonrpc_run(struct jsonrpc *rpc)
return;
}
+ stream_run(rpc->stream);
while (!queue_is_empty(&rpc->output)) {
struct ofpbuf *buf = rpc->output.head;
int retval;
@@ -113,8 +114,11 @@ jsonrpc_run(struct jsonrpc *rpc)
void
jsonrpc_wait(struct jsonrpc *rpc)
{
- if (!rpc->status && !queue_is_empty(&rpc->output)) {
- stream_send_wait(rpc->stream);
+ if (!rpc->status) {
+ stream_run_wait(rpc->stream);
+ if (!queue_is_empty(&rpc->output)) {
+ stream_send_wait(rpc->stream);
+ }
}
}
@@ -721,7 +725,10 @@ jsonrpc_session_run(struct jsonrpc_session *s)
jsonrpc_session_disconnect(s);
}
} else if (s->stream) {
- int error = stream_connect(s->stream);
+ int error;
+
+ stream_run(s->stream);
+ error = stream_connect(s->stream);
if (!error) {
reconnect_connected(s->reconnect, time_msec());
s->rpc = jsonrpc_open(s->stream);
@@ -763,6 +770,7 @@ jsonrpc_session_wait(struct jsonrpc_session *s)
if (s->rpc) {
jsonrpc_wait(s->rpc);
} else if (s->stream) {
+ stream_run_wait(s->stream);
stream_connect_wait(s->stream);
}
reconnect_wait(s->reconnect, time_msec());
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 46aa8e738..94c843409 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -139,6 +139,8 @@ static struct stream_class stream_fd_class = {
fd_connect, /* connect */
fd_recv, /* recv */
fd_send, /* send */
+ NULL, /* run */
+ NULL, /* run_wait */
fd_wait, /* wait */
};
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 6beaab75e..872da3c7d 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -109,6 +109,18 @@ struct stream_class {
* accepted for transmission, it should return -EAGAIN immediately. */
ssize_t (*send)(struct stream *stream, const void *buffer, size_t n);
+ /* Allows 'stream' to perform maintenance activities, such as flushing
+ * output buffers.
+ *
+ * May be null if 'stream' doesn't have anything to do here. */
+ void (*run)(struct stream *stream);
+
+ /* Arranges for the poll loop to wake up when 'stream' needs to perform
+ * maintenance activities.
+ *
+ * May be null if 'stream' doesn't have anything to do here. */
+ void (*run_wait)(struct stream *stream);
+
/* Arranges for the poll loop to wake up when 'stream' is ready to take an
* action of the given 'type'. */
void (*wait)(struct stream *stream, enum stream_wait_type type);
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index 947be9f19..bfcf35c74 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -90,6 +90,8 @@ struct stream_class tcp_stream_class = {
NULL, /* connect */
NULL, /* recv */
NULL, /* send */
+ NULL, /* run */
+ NULL, /* run_wait */
NULL, /* wait */
};
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index 9046da154..6ce7790b5 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -69,6 +69,8 @@ struct stream_class unix_stream_class = {
NULL, /* connect */
NULL, /* recv */
NULL, /* send */
+ NULL, /* run */
+ NULL, /* run_wait */
NULL, /* wait */
};
diff --git a/lib/stream.c b/lib/stream.c
index 337fb5c75..d4f5de29e 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2008, 2009 Nicira Networks.
+ * Copyright (c) 2008, 2009, 2010 Nicira Networks.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -66,7 +66,8 @@ check_stream_classes(void)
struct stream_class *class = stream_classes[i];
assert(class->name != NULL);
assert(class->open != NULL);
- if (class->close || class->recv || class->send || class->wait) {
+ if (class->close || class->recv || class->send || class->run
+ || class->run_wait || class->wait) {
assert(class->close != NULL);
assert(class->recv != NULL);
assert(class->send != NULL);
@@ -166,6 +167,8 @@ stream_open_block(const char *name, struct stream **streamp)
error = stream_open(name, &stream);
while (error == EAGAIN) {
+ stream_run(stream);
+ stream_run_wait(stream);
stream_connect_wait(stream);
poll_block();
error = stream_connect(stream);
@@ -312,6 +315,28 @@ stream_send(struct stream *stream, const void *buffer, size_t n)
: (stream->class->send)(stream, buffer, n));
}
+/* Allows 'stream' to perform maintenance activities, such as flushing
+ * output buffers. */
+void
+stream_run(struct stream *stream)
+{
+ if (stream->class->run) {
+ (stream->class->run)(stream);
+ }
+}
+
+/* Arranges for the poll loop to wake up when 'stream' needs to perform
+ * maintenance activities. */
+void
+stream_run_wait(struct stream *stream)
+{
+ if (stream->class->run_wait) {
+ (stream->class->run_wait)(stream);
+ }
+}
+
+/* Arranges for the poll loop to wake up when 'stream' is ready to take an
+ * action of the given 'type'. */
void
stream_wait(struct stream *stream, enum stream_wait_type wait)
{
diff --git a/lib/stream.h b/lib/stream.h
index 7a62a5a37..ae30b1031 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -42,6 +42,9 @@ int stream_connect(struct stream *);
int stream_recv(struct stream *, void *buffer, size_t n);
int stream_send(struct stream *, const void *buffer, size_t n);
+void stream_run(struct stream *);
+void stream_run_wait(struct stream *);
+
enum stream_wait_type {
STREAM_CONNECT,
STREAM_RECV,