diff options
authorRuss Cox <>2009-02-03 14:16:22 -0800
committerRuss Cox <>2009-02-03 14:16:22 -0800
commit70a901b40029a7ae462b8accfaeeb74b345f3a24 (patch)
parent31056e6905bcc3f7e5a4f2bb7321babc82e5d8b1 (diff)
* avoid large copies * NewBufRead, NewBufWrite never fail * add BufReadWrite io: * add io.Close http, google/net/rpc: * add, use http.Conn.Hijack R=r DELTA=416 (202 added, 123 deleted, 91 changed) OCL=24153 CL=24238
8 files changed, 251 insertions, 172 deletions
diff --git a/src/lib/bufio.go b/src/lib/bufio.go
index 6fed9d06e..9f3688588 100644
--- a/src/lib/bufio.go
+++ b/src/lib/bufio.go
@@ -16,6 +16,8 @@ import (
// - BufRead: ReadRune, UnreadRune ?
// could make ReadRune generic if we dropped UnreadRune
// - buffered output
+// - would like to rename to Read, Write, but breaks
+// embedding of these: would lose the Read, Write methods.
const (
defaultBufSize = 4096
@@ -44,6 +46,7 @@ type BufRead struct {
rd io.Read;
r, w int;
err *os.Error;
+ lastbyte int;
func NewBufReadSize(rd io.Read, size int) (b *BufRead, err *os.Error) {
@@ -53,11 +56,17 @@ func NewBufReadSize(rd io.Read, size int) (b *BufRead, err *os.Error) {
b = new(BufRead);
b.buf = make([]byte, size);
b.rd = rd;
+ b.lastbyte = -1;
return b, nil
-func NewBufRead(rd io.Read) (b *BufRead, err *os.Error) {
- return NewBufReadSize(rd, defaultBufSize);
+func NewBufRead(rd io.Read) *BufRead {
+ b, err := NewBufReadSize(rd, defaultBufSize);
+ if err != nil {
+ // cannot happen - defaultBufSize is a valid size
+ panic("bufio: NewBufRead: ", err.String());
+ }
+ return b;
// Read a new chunk into the buffer.
@@ -94,6 +103,23 @@ func (b *BufRead) Read(p []byte) (nn int, err *os.Error) {
for len(p) > 0 {
n := len(p);
if b.w == b.r {
+ if len(p) >= len(b.buf) {
+ // Large read, empty buffer.
+ // Read directly into p to avoid copy.
+ n, b.err = b.rd.Read(p);
+ if n > 0 {
+ b.lastbyte = int(p[n-1]);
+ }
+ p = p[n:len(p)];
+ nn += n;
+ if b.err != nil {
+ return nn, b.err
+ }
+ if n == 0 {
+ return nn, EndOfFile
+ }
+ continue;
+ }
if b.err != nil {
return nn, b.err
@@ -108,6 +134,7 @@ func (b *BufRead) Read(p []byte) (nn int, err *os.Error) {
copySlice(p[0:n], b.buf[b.r:b.r+n]);
p = p[n:len(p)];
b.r += n;
+ b.lastbyte = int(b.buf[b.r-1]);
nn += n
return nn, nil
@@ -127,6 +154,7 @@ func (b *BufRead) ReadByte() (c byte, err *os.Error) {
c = b.buf[b.r];
+ b.lastbyte = int(c);
return c, nil
@@ -135,10 +163,18 @@ func (b *BufRead) UnreadByte() *os.Error {
if b.err != nil {
return b.err
+ if b.r == b.w && b.lastbyte >= 0 {
+ b.w = 1;
+ b.r = 0;
+ b.buf[0] = byte(b.lastbyte);
+ b.lastbyte = -1;
+ return nil;
+ }
if b.r <= 0 {
return PhaseError
+ b.lastbyte = -1;
return nil
@@ -163,6 +199,7 @@ func (b *BufRead) ReadRune() (rune int, size int, err *os.Error) {
rune, size = utf8.DecodeRune(b.buf[b.r:b.w]);
b.r += size;
+ b.lastbyte = int(b.buf[b.r-1]);
return rune, size, nil
@@ -343,8 +380,13 @@ func NewBufWriteSize(wr io.Write, size int) (b *BufWrite, err *os.Error) {
return b, nil
-func NewBufWrite(wr io.Write) (b *BufWrite, err *os.Error) {
- return NewBufWriteSize(wr, defaultBufSize);
+func NewBufWrite(wr io.Write) *BufWrite {
+ b, err := NewBufWriteSize(wr, defaultBufSize);
+ if err != nil {
+ // cannot happen - defaultBufSize is valid size
+ panic("bufio: NewBufWrite: ", err.String());
+ }
+ return b;
// Flush the output buffer.
@@ -393,6 +435,17 @@ func (b *BufWrite) Write(p []byte) (nn int, err *os.Error) {
n = b.Available()
+ if b.Available() == 0 && len(p) >= len(b.buf) {
+ // Large write, empty buffer.
+ // Write directly from p to avoid copy.
+ n, b.err = b.wr.Write(p);
+ nn += n;
+ p = p[n:len(p)];
+ if b.err != nil {
+ break;
+ }
+ continue;
+ }
if n > len(p) {
n = len(p)
@@ -416,3 +469,14 @@ func (b *BufWrite) WriteByte(c byte) *os.Error {
return nil
+// buffered input and output
+type BufReadWrite struct {
+ *BufRead;
+ *BufWrite;
+func NewBufReadWrite(r *BufRead, w *BufWrite) *BufReadWrite {
+ return &BufReadWrite{r, w}
diff --git a/src/lib/bufio_test.go b/src/lib/bufio_test.go
index 17fb379cb..9ffd6cbfd 100644
--- a/src/lib/bufio_test.go
+++ b/src/lib/bufio_test.go
@@ -174,12 +174,12 @@ var bufsizes = []int {
func TestBufReadSimple(t *testing.T) {
- b, e := NewBufRead(newByteReader(io.StringBytes("hello world")));
+ b := NewBufRead(newByteReader(io.StringBytes("hello world")));
if s := readBytes(b); s != "hello world" {
t.Errorf("simple hello world test failed: got %q", s);
- b, e = NewBufRead(newRot13Reader(newByteReader(io.StringBytes("hello world"))));
+ b = NewBufRead(newRot13Reader(newByteReader(io.StringBytes("hello world"))));
if s := readBytes(b); s != "uryyb jbeyq" {
t.Error("rot13 hello world test failed: got %q", s);
diff --git a/src/lib/http/server.go b/src/lib/http/server.go
index 970cd7e38..6747473c4 100644
--- a/src/lib/http/server.go
+++ b/src/lib/http/server.go
@@ -16,12 +16,14 @@ import (
+ "log";
var ErrWriteAfterFlush = os.NewError("Conn.Write called after Flush")
+var ErrHijacked = os.NewError("Conn has been hijacked")
type Conn struct
@@ -32,132 +34,30 @@ type Handler interface {
// Active HTTP connection (server side).
type Conn struct {
- Fd io.ReadWriteClose;
- RemoteAddr string;
- Req *Request;
- Br *bufio.BufRead;
- br *bufio.BufRead;
- bw *bufio.BufWrite;
- close bool;
- chunking bool;
- flushed bool;
- header map[string] string;
- wroteHeader bool;
- handler Handler;
-// HTTP response codes.
-// TODO(rsc): Maybe move these to their own file, so that
-// clients can use them too.
-const (
- StatusContinue = 100;
- StatusSwitchingProtocols = 101;
- StatusOK = 200;
- StatusCreated = 201;
- StatusAccepted = 202;
- StatusNonAuthoritativeInfo = 203;
- StatusNoContent = 204;
- StatusResetContent = 205;
- StatusPartialContent = 206;
- StatusMultipleChoices = 300;
- StatusMovedPermanently = 301;
- StatusFound = 302;
- StatusSeeOther = 303;
- StatusNotModified = 304;
- StatusUseProxy = 305;
- StatusTemporaryRedirect = 307;
- StatusBadRequest = 400;
- StatusUnauthorized = 401;
- StatusPaymentRequired = 402;
- StatusForbidden = 403;
- StatusNotFound = 404;
- StatusMethodNotAllowed = 405;
- StatusNotAcceptable = 406;
- StatusProxyAuthRequired = 407;
- StatusRequestTimeout = 408;
- StatusConflict = 409;
- StatusGone = 410;
- StatusLengthRequired = 411;
- StatusPreconditionFailed = 412;
- StatusRequestEntityTooLarge = 413;
- StatusRequestURITooLong = 414;
- StatusUnsupportedMediaType = 415;
- StatusRequestedRangeNotSatisfiable = 416;
- StatusExpectationFailed = 417;
- StatusInternalServerError = 500;
- StatusNotImplemented = 501;
- StatusBadGateway = 502;
- StatusServiceUnavailable = 503;
- StatusGatewayTimeout = 504;
- StatusHTTPVersionNotSupported = 505;
-var statusText = map[int]string {
- StatusContinue: "Continue",
- StatusSwitchingProtocols: "Switching Protocols",
- StatusOK: "OK",
- StatusCreated: "Created",
- StatusAccepted: "Accepted",
- StatusNonAuthoritativeInfo: "Non-Authoritative Information",
- StatusNoContent: "No Content",
- StatusResetContent: "Reset Content",
- StatusPartialContent: "Partial Content",
- StatusMultipleChoices: "Multiple Choices",
- StatusMovedPermanently: "Moved Permanently",
- StatusFound: "Found",
- StatusSeeOther: "See Other",
- StatusNotModified: "Not Modified",
- StatusUseProxy: "Use Proxy",
- StatusTemporaryRedirect: "Temporary Redirect",
- StatusBadRequest: "Bad Request",
- StatusUnauthorized: "Unauthorized",
- StatusPaymentRequired: "Payment Required",
- StatusForbidden: "Forbidden",
- StatusNotFound: "Not Found",
- StatusMethodNotAllowed: "Method Not Allowed",
- StatusNotAcceptable: "Not Acceptable",
- StatusProxyAuthRequired: "Proxy Authentication Required",
- StatusRequestTimeout: "Request Timeout",
- StatusConflict: "Conflict",
- StatusGone: "Gone",
- StatusLengthRequired: "Length Required",
- StatusPreconditionFailed: "Precondition Failed",
- StatusRequestEntityTooLarge: "Request Entity Too Large",
- StatusRequestURITooLong: "Request URI Too Long",
- StatusUnsupportedMediaType: "Unsupported Media Type",
- StatusRequestedRangeNotSatisfiable: "Requested Range Not Satisfiable",
- StatusExpectationFailed: "Expectation Failed",
- StatusInternalServerError: "Internal Server Error",
- StatusNotImplemented: "Not Implemented",
- StatusBadGateway: "Bad Gateway",
- StatusServiceUnavailable: "Service Unavailable",
- StatusGatewayTimeout: "Gateway Timeout",
- StatusHTTPVersionNotSupported: "HTTP Version Not Supported",
+ RemoteAddr string; // network address of remote side
+ Req *Request; // current HTTP request
+ fd io.ReadWriteClose; // i/o connection
+ buf *bufio.BufReadWrite; // buffered fd
+ handler Handler; // request handler
+ hijacked bool; // connection has been hijacked by handler
+ // state for the current reply
+ closeAfterReply bool; // close connection after this reply
+ chunking bool; // using chunked transfer encoding for reply body
+ wroteHeader bool; // reply header has been written
+ header map[string] string; // reply header parameters
// Create new connection from rwc.
func newConn(rwc io.ReadWriteClose, raddr string, handler Handler) (c *Conn, err *os.Error) {
c = new(Conn);
- c.Fd = rwc;
c.RemoteAddr = raddr;
c.handler = handler;
- if, err = bufio.NewBufRead(rwc.(io.Read)); err != nil {
- return nil, err
- }
-c.Br =;
- if, err = bufio.NewBufWrite(rwc); err != nil {
- return nil, err
- }
+ c.fd = rwc;
+ br := bufio.NewBufRead(rwc);
+ bw := bufio.NewBufWrite(rwc);
+ c.buf = bufio.NewBufReadWrite(br, bw);
return c, nil
@@ -165,14 +65,16 @@ func (c *Conn) SetHeader(hdr, val string)
// Read next request from connection.
func (c *Conn) readRequest() (req *Request, err *os.Error) {
- if req, err = ReadRequest(; err != nil {
+ if c.hijacked {
+ return nil, ErrHijacked
+ }
+ if req, err = ReadRequest(c.buf.BufRead); err != nil {
return nil, err
// Reset per-request connection state.
c.header = make(map[string] string);
c.wroteHeader = false;
- c.flushed = false;
c.Req = req;
// Default output is HTML encoded in UTF-8.
@@ -190,7 +92,7 @@ func (c *Conn) readRequest() (req *Request, err *os.Error) {
// a Content-Length: header in the response,
// but everyone who expects persistent connections
// does HTTP/1.1 now.
- c.close = true;
+ c.closeAfterReply = true;
c.chunking = false;
@@ -203,8 +105,12 @@ func (c *Conn) SetHeader(hdr, val string) {
// Write header.
func (c *Conn) WriteHeader(code int) {
+ if c.hijacked {
+ log.Stderr("http: Conn.WriteHeader on hijacked connection");
+ return
+ }
if c.wroteHeader {
- // TODO(rsc): log
+ log.Stderr("http: multiple Conn.WriteHeader calls");
c.wroteHeader = true;
@@ -220,19 +126,20 @@ func (c *Conn) WriteHeader(code int) {
if !ok {
text = "status code " + codestring;
- io.WriteString(, proto + " " + codestring + " " + text + "\r\n");
+ io.WriteString(c.buf, proto + " " + codestring + " " + text + "\r\n");
for k,v := range c.header {
- io.WriteString(, k + ": " + v + "\r\n");
+ io.WriteString(c.buf, k + ": " + v + "\r\n");
- io.WriteString(, "\r\n");
+ io.WriteString(c.buf, "\r\n");
// TODO(rsc): BUG in 6g: must return "nn int" not "n int"
// so that the implicit struct assignment in
-// return works. oops
+// return c.buf.Write(data) works. oops
func (c *Conn) Write(data []byte) (nn int, err *os.Error) {
- if c.flushed {
- return 0, ErrWriteAfterFlush
+ if c.hijacked {
+ log.Stderr("http: Conn.Write on hijacked connection");
+ return 0, ErrHijacked
if !c.wroteHeader {
@@ -242,38 +149,35 @@ func (c *Conn) Write(data []byte) (nn int, err *os.Error) {
// TODO(rsc): if chunking happened after the buffering,
- // then there would be fewer chunk headers
+ // then there would be fewer chunk headers.
+ // On the other hand, it would make hijacking more difficult.
if c.chunking {
- fmt.Fprintf(, "%x\r\n", len(data)); // TODO(rsc): use strconv not fmt
+ fmt.Fprintf(c.buf, "%x\r\n", len(data)); // TODO(rsc): use strconv not fmt
- return;
+ return c.buf.Write(data);
-func (c *Conn) Flush() {
- if c.flushed {
- return
- }
+func (c *Conn) flush() {
if !c.wroteHeader {
if c.chunking {
- io.WriteString(, "0\r\n");
+ io.WriteString(c.buf, "0\r\n");
// trailer key/value pairs, followed by blank line
- io.WriteString(, "\r\n");
+ io.WriteString(c.buf, "\r\n");
- c.flushed = true;
+ c.buf.Flush();
// Close the connection.
-func (c *Conn) Close() {
- if != nil {
- = nil;
+func (c *Conn) close() {
+ if c.buf != nil {
+ c.buf.Flush();
+ c.buf = nil;
- if c.Fd != nil {
- c.Fd.Close();
- c.Fd = nil;
+ if c.fd != nil {
+ c.fd.Close();
+ c.fd = nil;
@@ -288,18 +192,32 @@ func (c *Conn) serve() {
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this thread.
c.handler.ServeHTTP(c, req);
- if c.Fd == nil {
- // Handler took over the connection.
+ if c.hijacked {
- if !c.flushed {
- c.Flush();
- }
- if c.close {
+ c.flush();
+ if c.closeAfterReply {
- c.Close();
+ c.close();
+// Allow client to take over the connection.
+// After a handler calls c.Hijack(), the HTTP server library
+// will never touch the connection again.
+// It is the caller's responsibility to manage and close
+// the connection.
+func (c *Conn) Hijack() (fd io.ReadWriteClose, buf *bufio.BufReadWrite, err *os.Error) {
+ if c.hijacked {
+ return nil, nil, ErrHijacked;
+ }
+ c.hijacked = true;
+ fd = c.fd;
+ buf = c.buf;
+ c.fd = nil;
+ c.buf = nil;
+ return;
// Adapter: can use RequestFunction(f) as Handler
diff --git a/src/lib/http/status.go b/src/lib/http/status.go
new file mode 100644
index 000000000..82a8b214c
--- /dev/null
+++ b/src/lib/http/status.go
@@ -0,0 +1,102 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// HTTP status codes. See RFC 2616.
+package http
+const (
+ StatusContinue = 100;
+ StatusSwitchingProtocols = 101;
+ StatusOK = 200;
+ StatusCreated = 201;
+ StatusAccepted = 202;
+ StatusNonAuthoritativeInfo = 203;
+ StatusNoContent = 204;
+ StatusResetContent = 205;
+ StatusPartialContent = 206;
+ StatusMultipleChoices = 300;
+ StatusMovedPermanently = 301;
+ StatusFound = 302;
+ StatusSeeOther = 303;
+ StatusNotModified = 304;
+ StatusUseProxy = 305;
+ StatusTemporaryRedirect = 307;
+ StatusBadRequest = 400;
+ StatusUnauthorized = 401;
+ StatusPaymentRequired = 402;
+ StatusForbidden = 403;
+ StatusNotFound = 404;
+ StatusMethodNotAllowed = 405;
+ StatusNotAcceptable = 406;
+ StatusProxyAuthRequired = 407;
+ StatusRequestTimeout = 408;
+ StatusConflict = 409;
+ StatusGone = 410;
+ StatusLengthRequired = 411;
+ StatusPreconditionFailed = 412;
+ StatusRequestEntityTooLarge = 413;
+ StatusRequestURITooLong = 414;
+ StatusUnsupportedMediaType = 415;
+ StatusRequestedRangeNotSatisfiable = 416;
+ StatusExpectationFailed = 417;
+ StatusInternalServerError = 500;
+ StatusNotImplemented = 501;
+ StatusBadGateway = 502;
+ StatusServiceUnavailable = 503;
+ StatusGatewayTimeout = 504;
+ StatusHTTPVersionNotSupported = 505;
+var statusText = map[int]string {
+ StatusContinue: "Continue",
+ StatusSwitchingProtocols: "Switching Protocols",
+ StatusOK: "OK",
+ StatusCreated: "Created",
+ StatusAccepted: "Accepted",
+ StatusNonAuthoritativeInfo: "Non-Authoritative Information",
+ StatusNoContent: "No Content",
+ StatusResetContent: "Reset Content",
+ StatusPartialContent: "Partial Content",
+ StatusMultipleChoices: "Multiple Choices",
+ StatusMovedPermanently: "Moved Permanently",
+ StatusFound: "Found",
+ StatusSeeOther: "See Other",
+ StatusNotModified: "Not Modified",
+ StatusUseProxy: "Use Proxy",
+ StatusTemporaryRedirect: "Temporary Redirect",
+ StatusBadRequest: "Bad Request",
+ StatusUnauthorized: "Unauthorized",
+ StatusPaymentRequired: "Payment Required",
+ StatusForbidden: "Forbidden",
+ StatusNotFound: "Not Found",
+ StatusMethodNotAllowed: "Method Not Allowed",
+ StatusNotAcceptable: "Not Acceptable",
+ StatusProxyAuthRequired: "Proxy Authentication Required",
+ StatusRequestTimeout: "Request Timeout",
+ StatusConflict: "Conflict",
+ StatusGone: "Gone",
+ StatusLengthRequired: "Length Required",
+ StatusPreconditionFailed: "Precondition Failed",
+ StatusRequestEntityTooLarge: "Request Entity Too Large",
+ StatusRequestURITooLong: "Request URI Too Long",
+ StatusUnsupportedMediaType: "Unsupported Media Type",
+ StatusRequestedRangeNotSatisfiable: "Requested Range Not Satisfiable",
+ StatusExpectationFailed: "Expectation Failed",
+ StatusInternalServerError: "Internal Server Error",
+ StatusNotImplemented: "Not Implemented",
+ StatusBadGateway: "Bad Gateway",
+ StatusServiceUnavailable: "Service Unavailable",
+ StatusGatewayTimeout: "Gateway Timeout",
+ StatusHTTPVersionNotSupported: "HTTP Version Not Supported",
diff --git a/src/lib/io/io.go b/src/lib/io/io.go
index cbacbe095..54f18fb95 100644
--- a/src/lib/io/io.go
+++ b/src/lib/io/io.go
@@ -30,6 +30,10 @@ type ReadWriteClose interface {
Close() *os.Error;
+type Close interface {
+ Close() *os.Error;
func WriteString(w Write, s string) (n int, err *os.Error) {
b := make([]byte, len(s)+1);
if !syscall.StringToBytes(b, s) {
diff --git a/src/lib/log_test.go b/src/lib/log_test.go
index d813941bb..922cbb4fc 100644
--- a/src/lib/log_test.go
+++ b/src/lib/log_test.go
@@ -51,10 +51,7 @@ func testLog(t *testing.T, flag int, prefix string, pattern string, useLogf bool
if err1 != nil {
t.Fatal("pipe", err1);
- buf, err2 := bufio.NewBufRead(fd0);
- if err2 != nil {
- t.Fatal("bufio.NewBufRead", err2);
- }
+ buf := bufio.NewBufRead(fd0);
l := NewLogger(fd1, nil, prefix, flag);
if useLogf {
l.Logf("hello %d world", 23);
diff --git a/src/lib/net/parse_test.go b/src/lib/net/parse_test.go
index 633a45718..57a68ee80 100644
--- a/src/lib/net/parse_test.go
+++ b/src/lib/net/parse_test.go
@@ -18,10 +18,7 @@ func TestReadLine(t *testing.T) {
if err != nil {
t.Fatalf("open %s: %v", filename, err);
- br, err1 := bufio.NewBufRead(fd);
- if err1 != nil {
- t.Fatalf("bufio.NewBufRead: %v", err1);
- }
+ br := bufio.NewBufRead(fd);
file := _Open(filename);
if file == nil {
diff --git a/src/lib/strconv/fp_test.go b/src/lib/strconv/fp_test.go
index 6738ed75e..c6f67155c 100644
--- a/src/lib/strconv/fp_test.go
+++ b/src/lib/strconv/fp_test.go
@@ -98,10 +98,7 @@ func TestFp(t *testing.T) {
panicln("testfp: open testfp.txt:", err.String());
- b, err1 := bufio.NewBufRead(fd);
- if err1 != nil {
- panicln("testfp NewBufRead:", err1.String());
- }
+ b := bufio.NewBufRead(fd);
lineno := 0;
for {