diff options
Diffstat (limited to 'Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket')
16 files changed, 2746 insertions, 616 deletions
diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/__init__.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/__init__.py index c154da4a1..454ae0c45 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/__init__.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/__init__.py @@ -34,7 +34,8 @@ mod_pywebsocket is a WebSocket extension for Apache HTTP Server intended for testing or experimental purposes. mod_python is required. -Installation: +Installation +============ 0. Prepare an Apache HTTP Server for which mod_python is enabled. @@ -60,11 +61,6 @@ Installation: <scan_dir> is useful in saving scan time when <websock_handlers> contains many non-WebSocket handler files. - If you want to support old handshake based on - draft-hixie-thewebsocketprotocol-75: - - PythonOption mod_pywebsocket.allow_draft75 On - If you want to allow handlers whose canonical path is not under the root directory (i.e. symbolic link is in root directory but its target is not), configure as follows: @@ -89,7 +85,8 @@ Installation: 3. Verify installation. You can use example/console.html to poke the server. -Writing WebSocket handlers: +Writing WebSocket handlers +========================== When a WebSocket request comes in, the resource name specified in the handshake is considered as if it is a file path under @@ -118,28 +115,36 @@ extra handshake (web_socket_do_extra_handshake): - ws_resource - ws_origin - ws_version -- ws_location (Hixie 75 and HyBi 00 only) -- ws_extensions (Hybi 06 and later) +- ws_location (HyBi 00 only) +- ws_extensions (HyBi 06 and later) - ws_deflate (HyBi 06 and later) - ws_protocol - ws_requested_protocols (HyBi 06 and later) -The last two are a bit tricky. +The last two are a bit tricky. See the next subsection. + + +Subprotocol Negotiation +----------------------- For HyBi 06 and later, ws_protocol is always set to None when web_socket_do_extra_handshake is called. If ws_requested_protocols is not None, you must choose one subprotocol from this list and set it to ws_protocol. -For Hixie 75 and HyBi 00, when web_socket_do_extra_handshake is called, +For HyBi 00, when web_socket_do_extra_handshake is called, ws_protocol is set to the value given by the client in -Sec-WebSocket-Protocol (WebSocket-Protocol for Hixie 75) header or None if +Sec-WebSocket-Protocol header or None if such header was not found in the opening handshake request. Finish extra handshake with ws_protocol untouched to accept the request subprotocol. -Then, Sec-WebSocket-Protocol (or WebSocket-Protocol) header will be sent to +Then, Sec-WebSocket-Protocol header will be sent to the client in response with the same value as requested. Raise an exception in web_socket_do_extra_handshake to reject the requested subprotocol. + +Data Transfer +------------- + web_socket_transfer_data is called after the handshake completed successfully. A handler can receive/send messages from/to the client using request. mod_pywebsocket.msgutil module provides utilities @@ -159,12 +164,16 @@ You can send a message by the following statement. request.ws_stream.send_message(message) + +Closing Connection +------------------ + Executing the following statement or just return-ing from web_socket_transfer_data cause connection close. request.ws_stream.close_connection() -When you're using IETF HyBi 00 or later protocol, close_connection will wait +close_connection will wait for closing handshake acknowledgement coming from the client. When it couldn't receive a valid acknowledgement, raises an exception. @@ -176,6 +185,10 @@ use in web_socket_passive_closing_handshake. - ws_close_code - ws_close_reason + +Threading +--------- + A WebSocket handler must be thread-safe if the server (Apache or standalone.py) is configured to use threads. """ diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hixie75.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hixie75.py index c84ca6e07..94cf5b31b 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hixie75.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hixie75.py @@ -32,7 +32,8 @@ protocol version HyBi 00 and Hixie 75. Specification: -http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-00 +- HyBi 00 http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-00 +- Hixie 75 http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75 """ diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hybi.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hybi.py index 34fa7a60e..bd158fa6b 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hybi.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/_stream_hybi.py @@ -37,6 +37,7 @@ http://tools.ietf.org/html/rfc6455 from collections import deque +import logging import os import struct import time @@ -162,14 +163,145 @@ def create_text_frame( frame_filters) +def parse_frame(receive_bytes, logger=None, + ws_version=common.VERSION_HYBI_LATEST, + unmask_receive=True): + """Parses a frame. Returns a tuple containing each header field and + payload. + + Args: + receive_bytes: a function that reads frame data from a stream or + something similar. The function takes length of the bytes to be + read. The function must raise ConnectionTerminatedException if + there is not enough data to be read. + logger: a logging object. + ws_version: the version of WebSocket protocol. + unmask_receive: unmask received frames. When received unmasked + frame, raises InvalidFrameException. + + Raises: + ConnectionTerminatedException: when receive_bytes raises it. + InvalidFrameException: when the frame contains invalid data. + """ + + if not logger: + logger = logging.getLogger() + + logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame') + + received = receive_bytes(2) + + first_byte = ord(received[0]) + fin = (first_byte >> 7) & 1 + rsv1 = (first_byte >> 6) & 1 + rsv2 = (first_byte >> 5) & 1 + rsv3 = (first_byte >> 4) & 1 + opcode = first_byte & 0xf + + second_byte = ord(received[1]) + mask = (second_byte >> 7) & 1 + payload_length = second_byte & 0x7f + + logger.log(common.LOGLEVEL_FINE, + 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, ' + 'Mask=%s, Payload_length=%s', + fin, rsv1, rsv2, rsv3, opcode, mask, payload_length) + + if (mask == 1) != unmask_receive: + raise InvalidFrameException( + 'Mask bit on the received frame did\'nt match masking ' + 'configuration for received frames') + + # The HyBi and later specs disallow putting a value in 0x0-0xFFFF + # into the 8-octet extended payload length field (or 0x0-0xFD in + # 2-octet field). + valid_length_encoding = True + length_encoding_bytes = 1 + if payload_length == 127: + logger.log(common.LOGLEVEL_FINE, + 'Receive 8-octet extended payload length') + + extended_payload_length = receive_bytes(8) + payload_length = struct.unpack( + '!Q', extended_payload_length)[0] + if payload_length > 0x7FFFFFFFFFFFFFFF: + raise InvalidFrameException( + 'Extended payload length >= 2^63') + if ws_version >= 13 and payload_length < 0x10000: + valid_length_encoding = False + length_encoding_bytes = 8 + + logger.log(common.LOGLEVEL_FINE, + 'Decoded_payload_length=%s', payload_length) + elif payload_length == 126: + logger.log(common.LOGLEVEL_FINE, + 'Receive 2-octet extended payload length') + + extended_payload_length = receive_bytes(2) + payload_length = struct.unpack( + '!H', extended_payload_length)[0] + if ws_version >= 13 and payload_length < 126: + valid_length_encoding = False + length_encoding_bytes = 2 + + logger.log(common.LOGLEVEL_FINE, + 'Decoded_payload_length=%s', payload_length) + + if not valid_length_encoding: + logger.warning( + 'Payload length is not encoded using the minimal number of ' + 'bytes (%d is encoded using %d bytes)', + payload_length, + length_encoding_bytes) + + if mask == 1: + logger.log(common.LOGLEVEL_FINE, 'Receive mask') + + masking_nonce = receive_bytes(4) + masker = util.RepeatedXorMasker(masking_nonce) + + logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce) + else: + masker = _NOOP_MASKER + + logger.log(common.LOGLEVEL_FINE, 'Receive payload data') + if logger.isEnabledFor(common.LOGLEVEL_FINE): + receive_start = time.time() + + raw_payload_bytes = receive_bytes(payload_length) + + if logger.isEnabledFor(common.LOGLEVEL_FINE): + logger.log( + common.LOGLEVEL_FINE, + 'Done receiving payload data at %s MB/s', + payload_length / (time.time() - receive_start) / 1000 / 1000) + logger.log(common.LOGLEVEL_FINE, 'Unmask payload data') + + if logger.isEnabledFor(common.LOGLEVEL_FINE): + unmask_start = time.time() + + bytes = masker.mask(raw_payload_bytes) + + if logger.isEnabledFor(common.LOGLEVEL_FINE): + logger.log( + common.LOGLEVEL_FINE, + 'Done unmasking payload data at %s MB/s', + payload_length / (time.time() - unmask_start) / 1000 / 1000) + + return opcode, bytes, fin, rsv1, rsv2, rsv3 + + class FragmentedFrameBuilder(object): """A stateful class to send a message as fragments.""" - def __init__(self, mask, frame_filters=[]): + def __init__(self, mask, frame_filters=[], encode_utf8=True): """Constructs an instance.""" self._mask = mask self._frame_filters = frame_filters + # This is for skipping UTF-8 encoding when building text type frames + # from compressed data. + self._encode_utf8 = encode_utf8 self._started = False @@ -177,7 +309,7 @@ class FragmentedFrameBuilder(object): # frames in the message are all the same. self._opcode = common.OPCODE_TEXT - def build(self, message, end, binary): + def build(self, payload_data, end, binary): if binary: frame_type = common.OPCODE_BINARY else: @@ -198,12 +330,12 @@ class FragmentedFrameBuilder(object): self._started = True fin = 0 - if binary: + if binary or not self._encode_utf8: return create_binary_frame( - message, opcode, fin, self._mask, self._frame_filters) + payload_data, opcode, fin, self._mask, self._frame_filters) else: return create_text_frame( - message, opcode, fin, self._mask, self._frame_filters) + payload_data, opcode, fin, self._mask, self._frame_filters) def _create_control_frame(opcode, body, mask, frame_filters): @@ -235,6 +367,22 @@ def create_close_frame(body, mask=False, frame_filters=[]): common.OPCODE_CLOSE, body, mask, frame_filters) +def create_closing_handshake_body(code, reason): + body = '' + if code is not None: + if (code > common.STATUS_USER_PRIVATE_MAX or + code < common.STATUS_NORMAL_CLOSURE): + raise BadOperationException('Status code is out of range') + if (code == common.STATUS_NO_STATUS_RECEIVED or + code == common.STATUS_ABNORMAL_CLOSURE or + code == common.STATUS_TLS_HANDSHAKE): + raise BadOperationException('Status code is reserved pseudo ' + 'code') + encoded_reason = reason.encode('utf-8') + body = struct.pack('!H', code) + encoded_reason + return body + + class StreamOptions(object): """Holds option values to configure Stream objects.""" @@ -248,8 +396,16 @@ class StreamOptions(object): self.outgoing_frame_filters = [] self.incoming_frame_filters = [] + # Filters applied to messages. Control frames are not affected by them. + self.outgoing_message_filters = [] + self.incoming_message_filters = [] + + self.encode_text_message_to_utf8 = True self.mask_send = False self.unmask_receive = True + # RFC6455 disallows fragmented control frames, but mux extension + # relaxes the restriction. + self.allow_fragmented_control_frame = False class Stream(StreamBase): @@ -283,7 +439,8 @@ class Stream(StreamBase): self._original_opcode = None self._writer = FragmentedFrameBuilder( - self._options.mask_send, self._options.outgoing_frame_filters) + self._options.mask_send, self._options.outgoing_frame_filters, + self._options.encode_text_message_to_utf8) self._ping_queue = deque() @@ -297,109 +454,13 @@ class Stream(StreamBase): InvalidFrameException: when the frame contains invalid data. """ - self._logger.log(common.LOGLEVEL_FINE, - 'Receive the first 2 octets of a frame') - - received = self.receive_bytes(2) - - first_byte = ord(received[0]) - fin = (first_byte >> 7) & 1 - rsv1 = (first_byte >> 6) & 1 - rsv2 = (first_byte >> 5) & 1 - rsv3 = (first_byte >> 4) & 1 - opcode = first_byte & 0xf - - second_byte = ord(received[1]) - mask = (second_byte >> 7) & 1 - payload_length = second_byte & 0x7f - - self._logger.log(common.LOGLEVEL_FINE, - 'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, ' - 'Mask=%s, Payload_length=%s', - fin, rsv1, rsv2, rsv3, opcode, mask, payload_length) - - if (mask == 1) != self._options.unmask_receive: - raise InvalidFrameException( - 'Mask bit on the received frame did\'nt match masking ' - 'configuration for received frames') - - # The Hybi-13 and later specs disallow putting a value in 0x0-0xFFFF - # into the 8-octet extended payload length field (or 0x0-0xFD in - # 2-octet field). - valid_length_encoding = True - length_encoding_bytes = 1 - if payload_length == 127: - self._logger.log(common.LOGLEVEL_FINE, - 'Receive 8-octet extended payload length') - - extended_payload_length = self.receive_bytes(8) - payload_length = struct.unpack( - '!Q', extended_payload_length)[0] - if payload_length > 0x7FFFFFFFFFFFFFFF: - raise InvalidFrameException( - 'Extended payload length >= 2^63') - if self._request.ws_version >= 13 and payload_length < 0x10000: - valid_length_encoding = False - length_encoding_bytes = 8 - - self._logger.log(common.LOGLEVEL_FINE, - 'Decoded_payload_length=%s', payload_length) - elif payload_length == 126: - self._logger.log(common.LOGLEVEL_FINE, - 'Receive 2-octet extended payload length') - - extended_payload_length = self.receive_bytes(2) - payload_length = struct.unpack( - '!H', extended_payload_length)[0] - if self._request.ws_version >= 13 and payload_length < 126: - valid_length_encoding = False - length_encoding_bytes = 2 - - self._logger.log(common.LOGLEVEL_FINE, - 'Decoded_payload_length=%s', payload_length) - - if not valid_length_encoding: - self._logger.warning( - 'Payload length is not encoded using the minimal number of ' - 'bytes (%d is encoded using %d bytes)', - payload_length, - length_encoding_bytes) - - if mask == 1: - self._logger.log(common.LOGLEVEL_FINE, 'Receive mask') - - masking_nonce = self.receive_bytes(4) - masker = util.RepeatedXorMasker(masking_nonce) - - self._logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce) - else: - masker = _NOOP_MASKER - - self._logger.log(common.LOGLEVEL_FINE, 'Receive payload data') - if self._logger.isEnabledFor(common.LOGLEVEL_FINE): - receive_start = time.time() - - raw_payload_bytes = self.receive_bytes(payload_length) - - if self._logger.isEnabledFor(common.LOGLEVEL_FINE): - self._logger.log( - common.LOGLEVEL_FINE, - 'Done receiving payload data at %s MB/s', - payload_length / (time.time() - receive_start) / 1000 / 1000) - self._logger.log(common.LOGLEVEL_FINE, 'Unmask payload data') - - if self._logger.isEnabledFor(common.LOGLEVEL_FINE): - unmask_start = time.time() - - bytes = masker.mask(raw_payload_bytes) + def _receive_bytes(length): + return self.receive_bytes(length) - if self._logger.isEnabledFor(common.LOGLEVEL_FINE): - self._logger.log( - common.LOGLEVEL_FINE, - 'Done unmasking payload data at %s MB/s', - payload_length / (time.time() - unmask_start) / 1000 / 1000) - - return opcode, bytes, fin, rsv1, rsv2, rsv3 + return parse_frame(receive_bytes=_receive_bytes, + logger=self._logger, + ws_version=self._request.ws_version, + unmask_receive=self._options.unmask_receive) def _receive_frame_as_frame_object(self): opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame() @@ -407,6 +468,32 @@ class Stream(StreamBase): return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3, opcode=opcode, payload=bytes) + def receive_filtered_frame(self): + """Receives a frame and applies frame filters and message filters. + The frame to be received must satisfy following conditions: + - The frame is not fragmented. + - The opcode of the frame is TEXT or BINARY. + + DO NOT USE this method except for testing purpose. + """ + + frame = self._receive_frame_as_frame_object() + if not frame.fin: + raise InvalidFrameException( + 'Segmented frames must not be received via ' + 'receive_filtered_frame()') + if (frame.opcode != common.OPCODE_TEXT and + frame.opcode != common.OPCODE_BINARY): + raise InvalidFrameException( + 'Control frames must not be received via ' + 'receive_filtered_frame()') + + for frame_filter in self._options.incoming_frame_filters: + frame_filter.filter(frame) + for message_filter in self._options.incoming_message_filters: + frame.payload = message_filter.filter(frame.payload) + return frame + def send_message(self, message, end=True, binary=False): """Send message. @@ -428,11 +515,219 @@ class Stream(StreamBase): raise BadOperationException( 'Message for binary frame must be instance of str') + for message_filter in self._options.outgoing_message_filters: + message = message_filter.filter(message, end, binary) + try: - self._write(self._writer.build(message, end, binary)) + # Set this to any positive integer to limit maximum size of data in + # payload data of each frame. + MAX_PAYLOAD_DATA_SIZE = -1 + + if MAX_PAYLOAD_DATA_SIZE <= 0: + self._write(self._writer.build(message, end, binary)) + return + + bytes_written = 0 + while True: + end_for_this_frame = end + bytes_to_write = len(message) - bytes_written + if (MAX_PAYLOAD_DATA_SIZE > 0 and + bytes_to_write > MAX_PAYLOAD_DATA_SIZE): + end_for_this_frame = False + bytes_to_write = MAX_PAYLOAD_DATA_SIZE + + frame = self._writer.build( + message[bytes_written:bytes_written + bytes_to_write], + end_for_this_frame, + binary) + self._write(frame) + + bytes_written += bytes_to_write + + # This if must be placed here (the end of while block) so that + # at least one frame is sent. + if len(message) <= bytes_written: + break except ValueError, e: raise BadOperationException(e) + def _get_message_from_frame(self, frame): + """Gets a message from frame. If the message is composed of fragmented + frames and the frame is not the last fragmented frame, this method + returns None. The whole message will be returned when the last + fragmented frame is passed to this method. + + Raises: + InvalidFrameException: when the frame doesn't match defragmentation + context, or the frame contains invalid data. + """ + + if frame.opcode == common.OPCODE_CONTINUATION: + if not self._received_fragments: + if frame.fin: + raise InvalidFrameException( + 'Received a termination frame but fragmentation ' + 'not started') + else: + raise InvalidFrameException( + 'Received an intermediate frame but ' + 'fragmentation not started') + + if frame.fin: + # End of fragmentation frame + self._received_fragments.append(frame.payload) + message = ''.join(self._received_fragments) + self._received_fragments = [] + return message + else: + # Intermediate frame + self._received_fragments.append(frame.payload) + return None + else: + if self._received_fragments: + if frame.fin: + raise InvalidFrameException( + 'Received an unfragmented frame without ' + 'terminating existing fragmentation') + else: + raise InvalidFrameException( + 'New fragmentation started without terminating ' + 'existing fragmentation') + + if frame.fin: + # Unfragmented frame + + self._original_opcode = frame.opcode + return frame.payload + else: + # Start of fragmentation frame + + if (not self._options.allow_fragmented_control_frame and + common.is_control_opcode(frame.opcode)): + raise InvalidFrameException( + 'Control frames must not be fragmented') + + self._original_opcode = frame.opcode + self._received_fragments.append(frame.payload) + return None + + def _process_close_message(self, message): + """Processes close message. + + Args: + message: close message. + + Raises: + InvalidFrameException: when the message is invalid. + """ + + self._request.client_terminated = True + + # Status code is optional. We can have status reason only if we + # have status code. Status reason can be empty string. So, + # allowed cases are + # - no application data: no code no reason + # - 2 octet of application data: has code but no reason + # - 3 or more octet of application data: both code and reason + if len(message) == 0: + self._logger.debug('Received close frame (empty body)') + self._request.ws_close_code = ( + common.STATUS_NO_STATUS_RECEIVED) + elif len(message) == 1: + raise InvalidFrameException( + 'If a close frame has status code, the length of ' + 'status code must be 2 octet') + elif len(message) >= 2: + self._request.ws_close_code = struct.unpack( + '!H', message[0:2])[0] + self._request.ws_close_reason = message[2:].decode( + 'utf-8', 'replace') + self._logger.debug( + 'Received close frame (code=%d, reason=%r)', + self._request.ws_close_code, + self._request.ws_close_reason) + + # Drain junk data after the close frame if necessary. + self._drain_received_data() + + if self._request.server_terminated: + self._logger.debug( + 'Received ack for server-initiated closing handshake') + return + + self._logger.debug( + 'Received client-initiated closing handshake') + + code = common.STATUS_NORMAL_CLOSURE + reason = '' + if hasattr(self._request, '_dispatcher'): + dispatcher = self._request._dispatcher + code, reason = dispatcher.passive_closing_handshake( + self._request) + if code is None and reason is not None and len(reason) > 0: + self._logger.warning( + 'Handler specified reason despite code being None') + reason = '' + if reason is None: + reason = '' + self._send_closing_handshake(code, reason) + self._logger.debug( + 'Sent ack for client-initiated closing handshake ' + '(code=%r, reason=%r)', code, reason) + + def _process_ping_message(self, message): + """Processes ping message. + + Args: + message: ping message. + """ + + try: + handler = self._request.on_ping_handler + if handler: + handler(self._request, message) + return + except AttributeError, e: + pass + self._send_pong(message) + + def _process_pong_message(self, message): + """Processes pong message. + + Args: + message: pong message. + """ + + # TODO(tyoshino): Add ping timeout handling. + + inflight_pings = deque() + + while True: + try: + expected_body = self._ping_queue.popleft() + if expected_body == message: + # inflight_pings contains pings ignored by the + # other peer. Just forget them. + self._logger.debug( + 'Ping %r is acked (%d pings were ignored)', + expected_body, len(inflight_pings)) + break + else: + inflight_pings.append(expected_body) + except IndexError, e: + # The received pong was unsolicited pong. Keep the + # ping queue as is. + self._ping_queue = inflight_pings + self._logger.debug('Received a unsolicited pong') + break + + try: + handler = self._request.on_pong_handler + if handler: + handler(self._request, message) + except AttributeError, e: + pass + def receive_message(self): """Receive a WebSocket frame and return its payload as a text in unicode or a binary in str. @@ -482,52 +777,12 @@ class Stream(StreamBase): 'Unsupported flag is set (rsv = %d%d%d)' % (frame.rsv1, frame.rsv2, frame.rsv3)) - if frame.opcode == common.OPCODE_CONTINUATION: - if not self._received_fragments: - if frame.fin: - raise InvalidFrameException( - 'Received a termination frame but fragmentation ' - 'not started') - else: - raise InvalidFrameException( - 'Received an intermediate frame but ' - 'fragmentation not started') - - if frame.fin: - # End of fragmentation frame - self._received_fragments.append(frame.payload) - message = ''.join(self._received_fragments) - self._received_fragments = [] - else: - # Intermediate frame - self._received_fragments.append(frame.payload) - continue - else: - if self._received_fragments: - if frame.fin: - raise InvalidFrameException( - 'Received an unfragmented frame without ' - 'terminating existing fragmentation') - else: - raise InvalidFrameException( - 'New fragmentation started without terminating ' - 'existing fragmentation') - - if frame.fin: - # Unfragmented frame - - self._original_opcode = frame.opcode - message = frame.payload - else: - # Start of fragmentation frame - - if common.is_control_opcode(frame.opcode): - raise InvalidFrameException( - 'Control frames must not be fragmented') + message = self._get_message_from_frame(frame) + if message is None: + continue - self._original_opcode = frame.opcode - self._received_fragments.append(frame.payload) - continue + for message_filter in self._options.incoming_message_filters: + message = message_filter.filter(message) if self._original_opcode == common.OPCODE_TEXT: # The WebSocket protocol section 4.4 specifies that invalid @@ -540,124 +795,21 @@ class Stream(StreamBase): elif self._original_opcode == common.OPCODE_BINARY: return message elif self._original_opcode == common.OPCODE_CLOSE: - self._request.client_terminated = True - - # Status code is optional. We can have status reason only if we - # have status code. Status reason can be empty string. So, - # allowed cases are - # - no application data: no code no reason - # - 2 octet of application data: has code but no reason - # - 3 or more octet of application data: both code and reason - if len(message) == 0: - self._logger.debug('Received close frame (empty body)') - self._request.ws_close_code = ( - common.STATUS_NO_STATUS_RECEIVED) - elif len(message) == 1: - raise InvalidFrameException( - 'If a close frame has status code, the length of ' - 'status code must be 2 octet') - elif len(message) >= 2: - self._request.ws_close_code = struct.unpack( - '!H', message[0:2])[0] - self._request.ws_close_reason = message[2:].decode( - 'utf-8', 'replace') - self._logger.debug( - 'Received close frame (code=%d, reason=%r)', - self._request.ws_close_code, - self._request.ws_close_reason) - - # Drain junk data after the close frame if necessary. - self._drain_received_data() - - if self._request.server_terminated: - self._logger.debug( - 'Received ack for server-initiated closing handshake') - return None - - self._logger.debug( - 'Received client-initiated closing handshake') - - code = common.STATUS_NORMAL_CLOSURE - reason = '' - if hasattr(self._request, '_dispatcher'): - dispatcher = self._request._dispatcher - code, reason = dispatcher.passive_closing_handshake( - self._request) - if code is None and reason is not None and len(reason) > 0: - self._logger.warning( - 'Handler specified reason despite code being None') - reason = '' - if reason is None: - reason = '' - self._send_closing_handshake(code, reason) - self._logger.debug( - 'Sent ack for client-initiated closing handshake ' - '(code=%r, reason=%r)', code, reason) + self._process_close_message(message) return None elif self._original_opcode == common.OPCODE_PING: - try: - handler = self._request.on_ping_handler - if handler: - handler(self._request, message) - continue - except AttributeError, e: - pass - self._send_pong(message) + self._process_ping_message(message) elif self._original_opcode == common.OPCODE_PONG: - # TODO(tyoshino): Add ping timeout handling. - - inflight_pings = deque() - - while True: - try: - expected_body = self._ping_queue.popleft() - if expected_body == message: - # inflight_pings contains pings ignored by the - # other peer. Just forget them. - self._logger.debug( - 'Ping %r is acked (%d pings were ignored)', - expected_body, len(inflight_pings)) - break - else: - inflight_pings.append(expected_body) - except IndexError, e: - # The received pong was unsolicited pong. Keep the - # ping queue as is. - self._ping_queue = inflight_pings - self._logger.debug('Received a unsolicited pong') - break - - try: - handler = self._request.on_pong_handler - if handler: - handler(self._request, message) - continue - except AttributeError, e: - pass - - continue + self._process_pong_message(message) else: raise UnsupportedFrameException( 'Opcode %d is not supported' % self._original_opcode) def _send_closing_handshake(self, code, reason): - body = '' - if code is not None: - if (code > common.STATUS_USER_PRIVATE_MAX or - code < common.STATUS_NORMAL_CLOSURE): - raise BadOperationException('Status code is out of range') - if (code == common.STATUS_NO_STATUS_RECEIVED or - code == common.STATUS_ABNORMAL_CLOSURE or - code == common.STATUS_TLS_HANDSHAKE): - raise BadOperationException('Status code is reserved pseudo ' - 'code') - encoded_reason = reason.encode('utf-8') - body = struct.pack('!H', code) + encoded_reason - + body = create_closing_handshake_body(code, reason) frame = create_close_frame( - body, - self._options.mask_send, - self._options.outgoing_frame_filters) + body, mask=self._options.mask_send, + frame_filters=self._options.outgoing_frame_filters) self._request.server_terminated = True @@ -731,6 +883,14 @@ class Stream(StreamBase): self._options.outgoing_frame_filters) self._write(frame) + def get_last_received_opcode(self): + """Returns the opcode of the WebSocket message which the last received + frame belongs to. The return value is valid iff immediately after + receive_message call. + """ + + return self._original_opcode + def _drain_received_data(self): """Drains unread data in the receive buffer to avoid sending out TCP RST packet. This is because when deflate-stream is enabled, some diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/common.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/common.py index 710967c80..2388379c0 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/common.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/common.py @@ -104,7 +104,10 @@ SEC_WEBSOCKET_LOCATION_HEADER = 'Sec-WebSocket-Location' DEFLATE_STREAM_EXTENSION = 'deflate-stream' DEFLATE_FRAME_EXTENSION = 'deflate-frame' PERFRAME_COMPRESSION_EXTENSION = 'perframe-compress' +PERMESSAGE_COMPRESSION_EXTENSION = 'permessage-compress' X_WEBKIT_DEFLATE_FRAME_EXTENSION = 'x-webkit-deflate-frame' +X_WEBKIT_PERMESSAGE_COMPRESSION_EXTENSION = 'x-webkit-permessage-compress' +MUX_EXTENSION = 'mux_DO_NOT_USE' # Status codes # Code STATUS_NO_STATUS_RECEIVED, STATUS_ABNORMAL_CLOSURE, and @@ -125,7 +128,7 @@ STATUS_INVALID_FRAME_PAYLOAD_DATA = 1007 STATUS_POLICY_VIOLATION = 1008 STATUS_MESSAGE_TOO_BIG = 1009 STATUS_MANDATORY_EXTENSION = 1010 -STATUS_INTERNAL_SERVER_ERROR = 1011 +STATUS_INTERNAL_ENDPOINT_ERROR = 1011 STATUS_TLS_HANDSHAKE = 1015 STATUS_USER_REGISTERED_BASE = 3000 STATUS_USER_REGISTERED_MAX = 3999 diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/dispatch.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/dispatch.py index ab1eb4fb3..25905f180 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/dispatch.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/dispatch.py @@ -39,6 +39,7 @@ import re from mod_pywebsocket import common from mod_pywebsocket import handshake from mod_pywebsocket import msgutil +from mod_pywebsocket import mux from mod_pywebsocket import stream from mod_pywebsocket import util @@ -277,13 +278,18 @@ class Dispatcher(object): AbortedByUserException: when user handler abort connection """ - handler_suite = self.get_handler_suite(request.ws_resource) - if handler_suite is None: - raise DispatchException('No handler for: %r' % request.ws_resource) - transfer_data_ = handler_suite.transfer_data # TODO(tyoshino): Terminate underlying TCP connection if possible. try: - transfer_data_(request) + if mux.use_mux(request): + mux.start(request, self) + else: + handler_suite = self.get_handler_suite(request.ws_resource) + if handler_suite is None: + raise DispatchException('No handler for: %r' % + request.ws_resource) + transfer_data_ = handler_suite.transfer_data + transfer_data_(request) + if not request.server_terminated: request.ws_stream.close_connection() # Catch non-critical exceptions the handler didn't handle. diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/extensions.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/extensions.py index 52b7a4a19..03dbf9ee1 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/extensions.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/extensions.py @@ -38,6 +38,9 @@ _available_processors = {} class ExtensionProcessorInterface(object): + def name(self): + return None + def get_extension_response(self): return None @@ -46,13 +49,21 @@ class ExtensionProcessorInterface(object): class DeflateStreamExtensionProcessor(ExtensionProcessorInterface): - """WebSocket DEFLATE stream extension processor.""" + """WebSocket DEFLATE stream extension processor. + + Specification: + Section 9.2.1 in + http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10 + """ def __init__(self, request): self._logger = util.get_class_logger(self) self._request = request + def name(self): + return common.DEFLATE_STREAM_EXTENSION + def get_extension_response(self): if len(self._request.get_parameter_names()) != 0: return None @@ -70,8 +81,40 @@ _available_processors[common.DEFLATE_STREAM_EXTENSION] = ( DeflateStreamExtensionProcessor) +def _log_compression_ratio(logger, original_bytes, total_original_bytes, + filtered_bytes, total_filtered_bytes): + # Print inf when ratio is not available. + ratio = float('inf') + average_ratio = float('inf') + if original_bytes != 0: + ratio = float(filtered_bytes) / original_bytes + if total_original_bytes != 0: + average_ratio = ( + float(total_filtered_bytes) / total_original_bytes) + logger.debug('Outgoing compress ratio: %f (average: %f)' % + (ratio, average_ratio)) + + +def _log_decompression_ratio(logger, received_bytes, total_received_bytes, + filtered_bytes, total_filtered_bytes): + # Print inf when ratio is not available. + ratio = float('inf') + average_ratio = float('inf') + if received_bytes != 0: + ratio = float(received_bytes) / filtered_bytes + if total_filtered_bytes != 0: + average_ratio = ( + float(total_received_bytes) / total_filtered_bytes) + logger.debug('Incoming compress ratio: %f (average: %f)' % + (ratio, average_ratio)) + + class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): - """WebSocket Per-frame DEFLATE extension processor.""" + """WebSocket Per-frame DEFLATE extension processor. + + Specification: + http://tools.ietf.org/html/draft-tyoshino-hybi-websocket-perframe-deflate + """ _WINDOW_BITS_PARAM = 'max_window_bits' _NO_CONTEXT_TAKEOVER_PARAM = 'no_context_takeover' @@ -83,6 +126,7 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): self._response_window_bits = None self._response_no_context_takeover = False + self._bfinal = False # Counters for statistics. @@ -96,6 +140,9 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): # Total number of incoming bytes obtained after applying this filter. self._total_filtered_incoming_payload_bytes = 0 + def name(self): + return common.DEFLATE_FRAME_EXTENSION + def get_extension_response(self): # Any unknown parameter will be just ignored. @@ -173,6 +220,9 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): def set_response_no_context_takeover(self, value): self._response_no_context_takeover = value + def set_bfinal(self, value): + self._bfinal = value + def enable_outgoing_compression(self): self._compress_outgoing = True @@ -193,24 +243,17 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): original_payload_size) return - frame.payload = self._deflater.filter(frame.payload) + frame.payload = self._deflater.filter( + frame.payload, bfinal=self._bfinal) frame.rsv1 = 1 filtered_payload_size = len(frame.payload) self._total_filtered_outgoing_payload_bytes += filtered_payload_size - # Print inf when ratio is not available. - ratio = float('inf') - average_ratio = float('inf') - if original_payload_size != 0: - ratio = float(filtered_payload_size) / original_payload_size - if self._total_outgoing_payload_bytes != 0: - average_ratio = ( - float(self._total_filtered_outgoing_payload_bytes) / - self._total_outgoing_payload_bytes) - self._logger.debug( - 'Outgoing compress ratio: %f (average: %f)' % - (ratio, average_ratio)) + _log_compression_ratio(self._logger, original_payload_size, + self._total_outgoing_payload_bytes, + filtered_payload_size, + self._total_filtered_outgoing_payload_bytes) def _incoming_filter(self, frame): """Transform incoming frames. This method is called only by @@ -231,18 +274,10 @@ class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): filtered_payload_size = len(frame.payload) self._total_filtered_incoming_payload_bytes += filtered_payload_size - # Print inf when ratio is not available. - ratio = float('inf') - average_ratio = float('inf') - if received_payload_size != 0: - ratio = float(received_payload_size) / filtered_payload_size - if self._total_filtered_incoming_payload_bytes != 0: - average_ratio = ( - float(self._total_incoming_payload_bytes) / - self._total_filtered_incoming_payload_bytes) - self._logger.debug( - 'Incoming compress ratio: %f (average: %f)' % - (ratio, average_ratio)) + _log_decompression_ratio(self._logger, received_payload_size, + self._total_incoming_payload_bytes, + filtered_payload_size, + self._total_filtered_incoming_payload_bytes) _available_processors[common.DEFLATE_FRAME_EXTENSION] = ( @@ -250,7 +285,7 @@ _available_processors[common.DEFLATE_FRAME_EXTENSION] = ( # Adding vendor-prefixed deflate-frame extension. -# TODO(bashi): Remove this after WebKit stops using vender prefix. +# TODO(bashi): Remove this after WebKit stops using vendor prefix. _available_processors[common.X_WEBKIT_DEFLATE_FRAME_EXTENSION] = ( DeflateFrameExtensionProcessor) @@ -270,21 +305,22 @@ def _create_accepted_method_desc(method_name, method_params): return common.format_extension(extension) -class PerFrameCompressionExtensionProcessor(ExtensionProcessorInterface): - """WebSocket Per-frame compression extension processor.""" +class CompressionExtensionProcessorBase(ExtensionProcessorInterface): + """Base class for Per-frame and Per-message compression extension.""" _METHOD_PARAM = 'method' - _DEFLATE_METHOD = 'deflate' def __init__(self, request): self._logger = util.get_class_logger(self) self._request = request self._compression_method_name = None self._compression_processor = None + self._compression_processor_hook = None + + def name(self): + return '' def _lookup_compression_processor(self, method_desc): - if method_desc.name() == self._DEFLATE_METHOD: - return DeflateFrameExtensionProcessor(method_desc) return None def _get_compression_processor_response(self): @@ -311,6 +347,10 @@ class PerFrameCompressionExtensionProcessor(ExtensionProcessorInterface): break if compression_processor is None: return None + + if self._compression_processor_hook: + self._compression_processor_hook(compression_processor) + processor_response = compression_processor.get_extension_response() if processor_response is None: return None @@ -337,14 +377,345 @@ class PerFrameCompressionExtensionProcessor(ExtensionProcessorInterface): return self._compression_processor.setup_stream_options(stream_options) + def set_compression_processor_hook(self, hook): + self._compression_processor_hook = hook + def get_compression_processor(self): return self._compression_processor +class PerFrameCompressionExtensionProcessor(CompressionExtensionProcessorBase): + """WebSocket Per-frame compression extension processor. + + Specification: + http://tools.ietf.org/html/draft-ietf-hybi-websocket-perframe-compression + """ + + _DEFLATE_METHOD = 'deflate' + + def __init__(self, request): + CompressionExtensionProcessorBase.__init__(self, request) + + def name(self): + return common.PERFRAME_COMPRESSION_EXTENSION + + def _lookup_compression_processor(self, method_desc): + if method_desc.name() == self._DEFLATE_METHOD: + return DeflateFrameExtensionProcessor(method_desc) + return None + + _available_processors[common.PERFRAME_COMPRESSION_EXTENSION] = ( PerFrameCompressionExtensionProcessor) +class DeflateMessageProcessor(ExtensionProcessorInterface): + """Per-message deflate processor.""" + + _S2C_MAX_WINDOW_BITS_PARAM = 's2c_max_window_bits' + _S2C_NO_CONTEXT_TAKEOVER_PARAM = 's2c_no_context_takeover' + _C2S_MAX_WINDOW_BITS_PARAM = 'c2s_max_window_bits' + _C2S_NO_CONTEXT_TAKEOVER_PARAM = 'c2s_no_context_takeover' + + def __init__(self, request): + self._request = request + self._logger = util.get_class_logger(self) + + self._c2s_max_window_bits = None + self._c2s_no_context_takeover = False + self._bfinal = False + + self._compress_outgoing_enabled = False + + # True if a message is fragmented and compression is ongoing. + self._compress_ongoing = False + + # Counters for statistics. + + # Total number of outgoing bytes supplied to this filter. + self._total_outgoing_payload_bytes = 0 + # Total number of bytes sent to the network after applying this filter. + self._total_filtered_outgoing_payload_bytes = 0 + + # Total number of bytes received from the network. + self._total_incoming_payload_bytes = 0 + # Total number of incoming bytes obtained after applying this filter. + self._total_filtered_incoming_payload_bytes = 0 + + def name(self): + return 'deflate' + + def get_extension_response(self): + # Any unknown parameter will be just ignored. + + s2c_max_window_bits = self._request.get_parameter_value( + self._S2C_MAX_WINDOW_BITS_PARAM) + if s2c_max_window_bits is not None: + try: + s2c_max_window_bits = int(s2c_max_window_bits) + except ValueError, e: + return None + if s2c_max_window_bits < 8 or s2c_max_window_bits > 15: + return None + + s2c_no_context_takeover = self._request.has_parameter( + self._S2C_NO_CONTEXT_TAKEOVER_PARAM) + if (s2c_no_context_takeover and + self._request.get_parameter_value( + self._S2C_NO_CONTEXT_TAKEOVER_PARAM) is not None): + return None + + self._deflater = util._RFC1979Deflater( + s2c_max_window_bits, s2c_no_context_takeover) + + self._inflater = util._RFC1979Inflater() + + self._compress_outgoing_enabled = True + + response = common.ExtensionParameter(self._request.name()) + + if s2c_max_window_bits is not None: + response.add_parameter( + self._S2C_MAX_WINDOW_BITS_PARAM, str(s2c_max_window_bits)) + + if s2c_no_context_takeover: + response.add_parameter( + self._S2C_NO_CONTEXT_TAKEOVER_PARAM, None) + + if self._c2s_max_window_bits is not None: + response.add_parameter( + self._C2S_MAX_WINDOW_BITS_PARAM, + str(self._c2s_max_window_bits)) + if self._c2s_no_context_takeover: + response.add_parameter( + self._C2S_NO_CONTEXT_TAKEOVER_PARAM, None) + + self._logger.debug( + 'Enable %s extension (' + 'request: s2c_max_window_bits=%s; s2c_no_context_takeover=%r, ' + 'response: c2s_max_window_bits=%s; c2s_no_context_takeover=%r)' % + (self._request.name(), + s2c_max_window_bits, + s2c_no_context_takeover, + self._c2s_max_window_bits, + self._c2s_no_context_takeover)) + + return response + + def setup_stream_options(self, stream_options): + class _OutgoingMessageFilter(object): + + def __init__(self, parent): + self._parent = parent + + def filter(self, message, end=True, binary=False): + return self._parent._process_outgoing_message( + message, end, binary) + + class _IncomingMessageFilter(object): + + def __init__(self, parent): + self._parent = parent + self._decompress_next_message = False + + def decompress_next_message(self): + self._decompress_next_message = True + + def filter(self, message): + message = self._parent._process_incoming_message( + message, self._decompress_next_message) + self._decompress_next_message = False + return message + + self._outgoing_message_filter = _OutgoingMessageFilter(self) + self._incoming_message_filter = _IncomingMessageFilter(self) + stream_options.outgoing_message_filters.append( + self._outgoing_message_filter) + stream_options.incoming_message_filters.append( + self._incoming_message_filter) + + class _OutgoingFrameFilter(object): + + def __init__(self, parent): + self._parent = parent + self._set_compression_bit = False + + def set_compression_bit(self): + self._set_compression_bit = True + + def filter(self, frame): + self._parent._process_outgoing_frame( + frame, self._set_compression_bit) + self._set_compression_bit = False + + class _IncomingFrameFilter(object): + + def __init__(self, parent): + self._parent = parent + + def filter(self, frame): + self._parent._process_incoming_frame(frame) + + self._outgoing_frame_filter = _OutgoingFrameFilter(self) + self._incoming_frame_filter = _IncomingFrameFilter(self) + stream_options.outgoing_frame_filters.append( + self._outgoing_frame_filter) + stream_options.incoming_frame_filters.append( + self._incoming_frame_filter) + + stream_options.encode_text_message_to_utf8 = False + + def set_c2s_max_window_bits(self, value): + self._c2s_max_window_bits = value + + def set_c2s_no_context_takeover(self, value): + self._c2s_no_context_takeover = value + + def set_bfinal(self, value): + self._bfinal = value + + def enable_outgoing_compression(self): + self._compress_outgoing_enabled = True + + def disable_outgoing_compression(self): + self._compress_outgoing_enabled = False + + def _process_incoming_message(self, message, decompress): + if not decompress: + return message + + received_payload_size = len(message) + self._total_incoming_payload_bytes += received_payload_size + + message = self._inflater.filter(message) + + filtered_payload_size = len(message) + self._total_filtered_incoming_payload_bytes += filtered_payload_size + + _log_decompression_ratio(self._logger, received_payload_size, + self._total_incoming_payload_bytes, + filtered_payload_size, + self._total_filtered_incoming_payload_bytes) + + return message + + def _process_outgoing_message(self, message, end, binary): + if not binary: + message = message.encode('utf-8') + + if not self._compress_outgoing_enabled: + return message + + original_payload_size = len(message) + self._total_outgoing_payload_bytes += original_payload_size + + message = self._deflater.filter( + message, flush=end, bfinal=self._bfinal) + + filtered_payload_size = len(message) + self._total_filtered_outgoing_payload_bytes += filtered_payload_size + + _log_compression_ratio(self._logger, original_payload_size, + self._total_outgoing_payload_bytes, + filtered_payload_size, + self._total_filtered_outgoing_payload_bytes) + + if not self._compress_ongoing: + self._outgoing_frame_filter.set_compression_bit() + self._compress_ongoing = not end + return message + + def _process_incoming_frame(self, frame): + if frame.rsv1 == 1 and not common.is_control_opcode(frame.opcode): + self._incoming_message_filter.decompress_next_message() + frame.rsv1 = 0 + + def _process_outgoing_frame(self, frame, compression_bit): + if (not compression_bit or + common.is_control_opcode(frame.opcode)): + return + + frame.rsv1 = 1 + + +class PerMessageCompressionExtensionProcessor( + CompressionExtensionProcessorBase): + """WebSocket Per-message compression extension processor. + + Specification: + http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression + """ + + _DEFLATE_METHOD = 'deflate' + + def __init__(self, request): + CompressionExtensionProcessorBase.__init__(self, request) + + def name(self): + return common.PERMESSAGE_COMPRESSION_EXTENSION + + def _lookup_compression_processor(self, method_desc): + if method_desc.name() == self._DEFLATE_METHOD: + return DeflateMessageProcessor(method_desc) + return None + + +_available_processors[common.PERMESSAGE_COMPRESSION_EXTENSION] = ( + PerMessageCompressionExtensionProcessor) + + +# Adding vendor-prefixed permessage-compress extension. +# TODO(bashi): Remove this after WebKit stops using vendor prefix. +_available_processors[common.X_WEBKIT_PERMESSAGE_COMPRESSION_EXTENSION] = ( + PerMessageCompressionExtensionProcessor) + + +class MuxExtensionProcessor(ExtensionProcessorInterface): + """WebSocket multiplexing extension processor.""" + + _QUOTA_PARAM = 'quota' + + def __init__(self, request): + self._request = request + + def name(self): + return common.MUX_EXTENSION + + def get_extension_response(self, ws_request, + logical_channel_extensions): + # Mux extension cannot be used after extensions that depend on + # frame boundary, extension data field, or any reserved bits + # which are attributed to each frame. + for extension in logical_channel_extensions: + name = extension.name() + if (name == common.PERFRAME_COMPRESSION_EXTENSION or + name == common.DEFLATE_FRAME_EXTENSION or + name == common.X_WEBKIT_DEFLATE_FRAME_EXTENSION): + return None + + quota = self._request.get_parameter_value(self._QUOTA_PARAM) + if quota is None: + ws_request.mux_quota = 0 + else: + try: + quota = int(quota) + except ValueError, e: + return None + if quota < 0 or quota >= 2 ** 32: + return None + ws_request.mux_quota = quota + + ws_request.mux = True + ws_request.mux_extensions = logical_channel_extensions + return common.ExtensionParameter(common.MUX_EXTENSION) + + def setup_stream_options(self, stream_options): + pass + + +_available_processors[common.MUX_EXTENSION] = MuxExtensionProcessor + + def get_extension_processor(extension_request): global _available_processors processor_class = _available_processors.get(extension_request.name()) diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/__init__.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/__init__.py index 10a178314..194f6b395 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/__init__.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/__init__.py @@ -37,7 +37,6 @@ successfully established. import logging from mod_pywebsocket import common -from mod_pywebsocket.handshake import draft75 from mod_pywebsocket.handshake import hybi00 from mod_pywebsocket.handshake import hybi # Export AbortedByUserException, HandshakeException, and VersionException @@ -56,10 +55,8 @@ def do_handshake(request, dispatcher, allowDraft75=False, strict=False): Args: request: mod_python request. dispatcher: Dispatcher (dispatch.Dispatcher). - allowDraft75: allow draft 75 handshake protocol. - strict: Strictly check handshake request in draft 75. - Default: False. If True, request.connection must provide - get_memorized_lines method. + allowDraft75: obsolete argument. ignored. + strict: obsolete argument. ignored. Handshaker will add attributes such as ws_resource in performing handshake. @@ -86,9 +83,6 @@ def do_handshake(request, dispatcher, allowDraft75=False, strict=False): ('RFC 6455', hybi.Handshaker(request, dispatcher))) handshakers.append( ('HyBi 00', hybi00.Handshaker(request, dispatcher))) - if allowDraft75: - handshakers.append( - ('Hixie 75', draft75.Handshaker(request, dispatcher, strict))) for name, handshaker in handshakers: _LOGGER.debug('Trying protocol version %s', name) diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/_base.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/_base.py index bc095b129..e5c94ca90 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/_base.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/_base.py @@ -85,13 +85,16 @@ def get_default_port(is_secure): def validate_subprotocol(subprotocol, hixie): - """Validate a value in subprotocol fields such as WebSocket-Protocol, - Sec-WebSocket-Protocol. + """Validate a value in the Sec-WebSocket-Protocol field. See - RFC 6455: Section 4.1., 4.2.2., and 4.3. - HyBi 00: Section 4.1. Opening handshake - - Hixie 75: Section 4.1. Handshake + + Args: + hixie: if True, checks if characters in subprotocol are in range + between U+0020 and U+007E. It's required by HyBi 00 but not by + RFC 6455. """ if not subprotocol: @@ -170,7 +173,11 @@ def check_request_line(request): # 5.1 1. The three character UTF-8 string "GET". # 5.1 2. A UTF-8-encoded U+0020 SPACE character (0x20 byte). if request.method != 'GET': - raise HandshakeException('Method is not GET') + raise HandshakeException('Method is not GET: %r' % request.method) + + if request.protocol != 'HTTP/1.1': + raise HandshakeException('Version is not HTTP/1.1: %r' % + request.protocol) def check_header_lines(request, mandatory_headers): diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/draft75.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/draft75.py deleted file mode 100644 index 802a31c9a..000000000 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/draft75.py +++ /dev/null @@ -1,190 +0,0 @@ -# Copyright 2011, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -"""WebSocket handshaking defined in draft-hixie-thewebsocketprotocol-75.""" - - -# Note: request.connection.write is used in this module, even though mod_python -# document says that it should be used only in connection handlers. -# Unfortunately, we have no other options. For example, request.write is not -# suitable because it doesn't allow direct raw bytes writing. - - -import logging -import re - -from mod_pywebsocket import common -from mod_pywebsocket.stream import StreamHixie75 -from mod_pywebsocket import util -from mod_pywebsocket.handshake._base import HandshakeException -from mod_pywebsocket.handshake._base import build_location -from mod_pywebsocket.handshake._base import validate_subprotocol - - -_MANDATORY_HEADERS = [ - # key, expected value or None - ['Upgrade', 'WebSocket'], - ['Connection', 'Upgrade'], - ['Host', None], - ['Origin', None], -] - -_FIRST_FIVE_LINES = map(re.compile, [ - r'^GET /[\S]* HTTP/1.1\r\n$', - r'^Upgrade: WebSocket\r\n$', - r'^Connection: Upgrade\r\n$', - r'^Host: [\S]+\r\n$', - r'^Origin: [\S]+\r\n$', -]) - -_SIXTH_AND_LATER = re.compile( - r'^' - r'(WebSocket-Protocol: [\x20-\x7e]+\r\n)?' - r'(Cookie: [^\r]*\r\n)*' - r'(Cookie2: [^\r]*\r\n)?' - r'(Cookie: [^\r]*\r\n)*' - r'\r\n') - - -class Handshaker(object): - """This class performs WebSocket handshake.""" - - def __init__(self, request, dispatcher, strict=False): - """Construct an instance. - - Args: - request: mod_python request. - dispatcher: Dispatcher (dispatch.Dispatcher). - strict: Strictly check handshake request. Default: False. - If True, request.connection must provide get_memorized_lines - method. - - Handshaker will add attributes such as ws_resource in performing - handshake. - """ - - self._logger = util.get_class_logger(self) - - self._request = request - self._dispatcher = dispatcher - self._strict = strict - - def do_handshake(self): - """Perform WebSocket Handshake. - - On _request, we set - ws_resource, ws_origin, ws_location, ws_protocol - ws_challenge_md5: WebSocket handshake information. - ws_stream: Frame generation/parsing class. - ws_version: Protocol version. - """ - - self._check_header_lines() - self._set_resource() - self._set_origin() - self._set_location() - self._set_subprotocol() - self._set_protocol_version() - - self._dispatcher.do_extra_handshake(self._request) - - self._send_handshake() - - self._logger.debug('Sent opening handshake response') - - def _set_resource(self): - self._request.ws_resource = self._request.uri - - def _set_origin(self): - self._request.ws_origin = self._request.headers_in['Origin'] - - def _set_location(self): - self._request.ws_location = build_location(self._request) - - def _set_subprotocol(self): - subprotocol = self._request.headers_in.get('WebSocket-Protocol') - if subprotocol is not None: - validate_subprotocol(subprotocol, hixie=True) - self._request.ws_protocol = subprotocol - - def _set_protocol_version(self): - self._logger.debug('IETF Hixie 75 protocol') - self._request.ws_version = common.VERSION_HIXIE75 - self._request.ws_stream = StreamHixie75(self._request) - - def _sendall(self, data): - self._request.connection.write(data) - - def _send_handshake(self): - self._sendall('HTTP/1.1 101 Web Socket Protocol Handshake\r\n') - self._sendall('Upgrade: WebSocket\r\n') - self._sendall('Connection: Upgrade\r\n') - self._sendall('WebSocket-Origin: %s\r\n' % self._request.ws_origin) - self._sendall('WebSocket-Location: %s\r\n' % self._request.ws_location) - if self._request.ws_protocol: - self._sendall( - 'WebSocket-Protocol: %s\r\n' % self._request.ws_protocol) - self._sendall('\r\n') - - def _check_header_lines(self): - for key, expected_value in _MANDATORY_HEADERS: - actual_value = self._request.headers_in.get(key) - if not actual_value: - raise HandshakeException('Header %s is not defined' % key) - if expected_value: - if actual_value != expected_value: - raise HandshakeException( - 'Expected %r for header %s but found %r' % - (expected_value, key, actual_value)) - if self._strict: - try: - lines = self._request.connection.get_memorized_lines() - except AttributeError, e: - raise AttributeError( - 'Strict handshake is specified but the connection ' - 'doesn\'t provide get_memorized_lines()') - self._check_first_lines(lines) - - def _check_first_lines(self, lines): - if len(lines) < len(_FIRST_FIVE_LINES): - raise HandshakeException('Too few header lines: %d' % len(lines)) - for line, regexp in zip(lines, _FIRST_FIVE_LINES): - if not regexp.search(line): - raise HandshakeException( - 'Unexpected header: %r doesn\'t match %r' - % (line, regexp.pattern)) - sixth_and_later = ''.join(lines[5:]) - if not _SIXTH_AND_LATER.search(sixth_and_later): - raise HandshakeException( - 'Unexpected header: %r doesn\'t match %r' - % (sixth_and_later, _SIXTH_AND_LATER.pattern)) - - -# vi:sts=4 sw=4 et diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/hybi.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/hybi.py index 2883acbf8..fc0e2a096 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/hybi.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/handshake/hybi.py @@ -182,34 +182,60 @@ class Handshaker(object): # Extra handshake handler may modify/remove processors. self._dispatcher.do_extra_handshake(self._request) + processors = filter(lambda processor: processor is not None, + self._request.ws_extension_processors) + + accepted_extensions = [] + + # We need to take care of mux extension here. Extensions that + # are placed before mux should be applied to logical channels. + mux_index = -1 + for i, processor in enumerate(processors): + if processor.name() == common.MUX_EXTENSION: + mux_index = i + break + if mux_index >= 0: + mux_processor = processors[mux_index] + logical_channel_processors = processors[:mux_index] + processors = processors[mux_index+1:] + + for processor in logical_channel_processors: + extension_response = processor.get_extension_response() + if extension_response is None: + # Rejected. + continue + accepted_extensions.append(extension_response) + # Pass a shallow copy of accepted_extensions as extensions for + # logical channels. + mux_response = mux_processor.get_extension_response( + self._request, accepted_extensions[:]) + if mux_response is not None: + accepted_extensions.append(mux_response) stream_options = StreamOptions() - self._request.ws_extensions = None - for processor in self._request.ws_extension_processors: - if processor is None: - # Some processors may be removed by extra handshake - # handler. - continue + # When there is mux extension, here, |processors| contain only + # prosessors for extensions placed after mux. + for processor in processors: extension_response = processor.get_extension_response() if extension_response is None: # Rejected. continue - if self._request.ws_extensions is None: - self._request.ws_extensions = [] - self._request.ws_extensions.append(extension_response) + accepted_extensions.append(extension_response) processor.setup_stream_options(stream_options) - if self._request.ws_extensions is not None: + if len(accepted_extensions) > 0: + self._request.ws_extensions = accepted_extensions self._logger.debug( 'Extensions accepted: %r', - map(common.ExtensionParameter.name, - self._request.ws_extensions)) + map(common.ExtensionParameter.name, accepted_extensions)) + else: + self._request.ws_extensions = None - self._request.ws_stream = Stream(self._request, stream_options) + self._request.ws_stream = self._create_stream(stream_options) if self._request.ws_requested_protocols is not None: if self._request.ws_protocol is None: @@ -268,7 +294,7 @@ class Handshaker(object): protocol_header = self._request.headers_in.get( common.SEC_WEBSOCKET_PROTOCOL_HEADER) - if not protocol_header: + if protocol_header is None: self._request.ws_requested_protocols = None return @@ -341,7 +367,10 @@ class Handshaker(object): return key - def _send_handshake(self, accept): + def _create_stream(self, stream_options): + return Stream(self._request, stream_options) + + def _create_handshake_response(self, accept): response = [] response.append('HTTP/1.1 101 Switching Protocols\r\n') @@ -363,7 +392,10 @@ class Handshaker(object): common.format_extensions(self._request.ws_extensions))) response.append('\r\n') - raw_response = ''.join(response) + return ''.join(response) + + def _send_handshake(self, accept): + raw_response = self._create_handshake_response(accept) self._request.connection.write(raw_response) self._logger.debug('Sent server\'s opening handshake: %r', raw_response) diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/headerparserhandler.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/headerparserhandler.py index b68c240e1..2cc62de04 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/headerparserhandler.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/headerparserhandler.py @@ -63,8 +63,9 @@ _PYOPT_ALLOW_HANDLERS_OUTSIDE_ROOT = ( _PYOPT_ALLOW_HANDLERS_OUTSIDE_ROOT_DEFINITION = { 'off': False, 'no': False, 'on': True, 'yes': True} -# PythonOption to specify to allow draft75 handshake. -# The default is None (Off) +# (Obsolete option. Ignored.) +# PythonOption to specify to allow handshake defined in Hixie 75 version +# protocol. The default is None (Off) _PYOPT_ALLOW_DRAFT75 = 'mod_pywebsocket.allow_draft75' # Map from values to their meanings. _PYOPT_ALLOW_DRAFT75_DEFINITION = {'off': False, 'on': True} diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/msgutil.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/msgutil.py index 21ffdacf6..4c1a0114b 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/msgutil.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/msgutil.py @@ -59,20 +59,20 @@ def close_connection(request): request.ws_stream.close_connection() -def send_message(request, message, end=True, binary=False): - """Send message. +def send_message(request, payload_data, end=True, binary=False): + """Send a message (or part of a message). Args: request: mod_python request. - message: unicode text or str binary to send. - end: False to send message as a fragment. All messages until the - first call with end=True (inclusive) will be delivered to the - client in separate frames but as one WebSocket message. - binary: send message as binary frame. + payload_data: unicode text or str binary to send. + end: True to terminate a message. + False to send payload_data as part of a message that is to be + terminated by next or later send_message call with end=True. + binary: send payload_data as binary frame(s). Raises: BadOperationException: when server already terminated. """ - request.ws_stream.send_message(message, end, binary) + request.ws_stream.send_message(payload_data, end, binary) def receive_message(request): diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/mux.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/mux.py new file mode 100644 index 000000000..f0bdd2461 --- /dev/null +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/mux.py @@ -0,0 +1,1636 @@ +# Copyright 2012, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +"""This file provides classes and helper functions for multiplexing extension. + +Specification: +http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-06 +""" + + +import collections +import copy +import email +import email.parser +import logging +import math +import struct +import threading +import traceback + +from mod_pywebsocket import common +from mod_pywebsocket import handshake +from mod_pywebsocket import util +from mod_pywebsocket._stream_base import BadOperationException +from mod_pywebsocket._stream_base import ConnectionTerminatedException +from mod_pywebsocket._stream_hybi import Frame +from mod_pywebsocket._stream_hybi import Stream +from mod_pywebsocket._stream_hybi import StreamOptions +from mod_pywebsocket._stream_hybi import create_binary_frame +from mod_pywebsocket._stream_hybi import create_closing_handshake_body +from mod_pywebsocket._stream_hybi import create_header +from mod_pywebsocket._stream_hybi import create_length_header +from mod_pywebsocket._stream_hybi import parse_frame +from mod_pywebsocket.handshake import hybi + + +_CONTROL_CHANNEL_ID = 0 +_DEFAULT_CHANNEL_ID = 1 + +_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0 +_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1 +_MUX_OPCODE_FLOW_CONTROL = 2 +_MUX_OPCODE_DROP_CHANNEL = 3 +_MUX_OPCODE_NEW_CHANNEL_SLOT = 4 + +_MAX_CHANNEL_ID = 2 ** 29 - 1 + +_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64 +_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024 + +_HANDSHAKE_ENCODING_IDENTITY = 0 +_HANDSHAKE_ENCODING_DELTA = 1 + +# We need only these status code for now. +_HTTP_BAD_RESPONSE_MESSAGES = { + common.HTTP_STATUS_BAD_REQUEST: 'Bad Request', +} + +# DropChannel reason code +# TODO(bashi): Define all reason code defined in -05 draft. +_DROP_CODE_NORMAL_CLOSURE = 1000 + +_DROP_CODE_INVALID_ENCAPSULATING_MESSAGE = 2001 +_DROP_CODE_CHANNEL_ID_TRUNCATED = 2002 +_DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED = 2003 +_DROP_CODE_UNKNOWN_MUX_OPCODE = 2004 +_DROP_CODE_INVALID_MUX_CONTROL_BLOCK = 2005 +_DROP_CODE_CHANNEL_ALREADY_EXISTS = 2006 +_DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION = 2007 + +_DROP_CODE_UNKNOWN_REQUEST_ENCODING = 3002 +_DROP_CODE_SEND_QUOTA_VIOLATION = 3005 +_DROP_CODE_ACKNOWLEDGED = 3008 + + +class MuxUnexpectedException(Exception): + """Exception in handling multiplexing extension.""" + pass + + +# Temporary +class MuxNotImplementedException(Exception): + """Raised when a flow enters unimplemented code path.""" + pass + + +class LogicalConnectionClosedException(Exception): + """Raised when logical connection is gracefully closed.""" + pass + + +class PhysicalConnectionError(Exception): + """Raised when there is a physical connection error.""" + def __init__(self, drop_code, message=''): + super(PhysicalConnectionError, self).__init__( + 'code=%d, message=%r' % (drop_code, message)) + self.drop_code = drop_code + self.message = message + + +class LogicalChannelError(Exception): + """Raised when there is a logical channel error.""" + def __init__(self, channel_id, drop_code, message=''): + super(LogicalChannelError, self).__init__( + 'channel_id=%d, code=%d, message=%r' % ( + channel_id, drop_code, message)) + self.channel_id = channel_id + self.drop_code = drop_code + self.message = message + + +def _encode_channel_id(channel_id): + if channel_id < 0: + raise ValueError('Channel id %d must not be negative' % channel_id) + + if channel_id < 2 ** 7: + return chr(channel_id) + if channel_id < 2 ** 14: + return struct.pack('!H', 0x8000 + channel_id) + if channel_id < 2 ** 21: + first = chr(0xc0 + (channel_id >> 16)) + return first + struct.pack('!H', channel_id & 0xffff) + if channel_id < 2 ** 29: + return struct.pack('!L', 0xe0000000 + channel_id) + + raise ValueError('Channel id %d is too large' % channel_id) + + +def _encode_number(number): + return create_length_header(number, False) + + +def _create_add_channel_response(channel_id, encoded_handshake, + encoding=0, rejected=False, + outer_frame_mask=False): + if encoding != 0 and encoding != 1: + raise ValueError('Invalid encoding %d' % encoding) + + first_byte = ((_MUX_OPCODE_ADD_CHANNEL_RESPONSE << 5) | + (rejected << 4) | encoding) + block = (chr(first_byte) + + _encode_channel_id(channel_id) + + _encode_number(len(encoded_handshake)) + + encoded_handshake) + payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block + return create_binary_frame(payload, mask=outer_frame_mask) + + +def _create_drop_channel(channel_id, code=None, message='', + outer_frame_mask=False): + if len(message) > 0 and code is None: + raise ValueError('Code must be specified if message is specified') + + first_byte = _MUX_OPCODE_DROP_CHANNEL << 5 + block = chr(first_byte) + _encode_channel_id(channel_id) + if code is None: + block += _encode_number(0) # Reason size + else: + reason = struct.pack('!H', code) + message + reason_size = _encode_number(len(reason)) + block += reason_size + reason + + payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block + return create_binary_frame(payload, mask=outer_frame_mask) + + +def _create_flow_control(channel_id, replenished_quota, + outer_frame_mask=False): + first_byte = _MUX_OPCODE_FLOW_CONTROL << 5 + block = (chr(first_byte) + + _encode_channel_id(channel_id) + + _encode_number(replenished_quota)) + payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block + return create_binary_frame(payload, mask=outer_frame_mask) + + +def _create_new_channel_slot(slots, send_quota, outer_frame_mask=False): + if slots < 0 or send_quota < 0: + raise ValueError('slots and send_quota must be non-negative.') + first_byte = _MUX_OPCODE_NEW_CHANNEL_SLOT << 5 + block = (chr(first_byte) + + _encode_number(slots) + + _encode_number(send_quota)) + payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block + return create_binary_frame(payload, mask=outer_frame_mask) + + +def _create_fallback_new_channel_slot(outer_frame_mask=False): + first_byte = (_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 1 # Set the F flag + block = (chr(first_byte) + _encode_number(0) + _encode_number(0)) + payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block + return create_binary_frame(payload, mask=outer_frame_mask) + + +def _parse_request_text(request_text): + request_line, header_lines = request_text.split('\r\n', 1) + + words = request_line.split(' ') + if len(words) != 3: + raise ValueError('Bad Request-Line syntax %r' % request_line) + [command, path, version] = words + if version != 'HTTP/1.1': + raise ValueError('Bad request version %r' % version) + + # email.parser.Parser() parses RFC 2822 (RFC 822) style headers. + # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers + # RFC 822. + headers = email.parser.Parser().parsestr(header_lines) + return command, path, version, headers + + +class _ControlBlock(object): + """A structure that holds parsing result of multiplexing control block. + Control block specific attributes will be added by _MuxFramePayloadParser. + (e.g. encoded_handshake will be added for AddChannelRequest and + AddChannelResponse) + """ + + def __init__(self, opcode): + self.opcode = opcode + + +class _MuxFramePayloadParser(object): + """A class that parses multiplexed frame payload.""" + + def __init__(self, payload): + self._data = payload + self._read_position = 0 + self._logger = util.get_class_logger(self) + + def read_channel_id(self): + """Reads channel id. + + Raises: + ValueError: when the payload doesn't contain + valid channel id. + """ + + remaining_length = len(self._data) - self._read_position + pos = self._read_position + if remaining_length == 0: + raise ValueError('Invalid channel id format') + + channel_id = ord(self._data[pos]) + channel_id_length = 1 + if channel_id & 0xe0 == 0xe0: + if remaining_length < 4: + raise ValueError('Invalid channel id format') + channel_id = struct.unpack('!L', + self._data[pos:pos+4])[0] & 0x1fffffff + channel_id_length = 4 + elif channel_id & 0xc0 == 0xc0: + if remaining_length < 3: + raise ValueError('Invalid channel id format') + channel_id = (((channel_id & 0x1f) << 16) + + struct.unpack('!H', self._data[pos+1:pos+3])[0]) + channel_id_length = 3 + elif channel_id & 0x80 == 0x80: + if remaining_length < 2: + raise ValueError('Invalid channel id format') + channel_id = struct.unpack('!H', + self._data[pos:pos+2])[0] & 0x3fff + channel_id_length = 2 + self._read_position += channel_id_length + + return channel_id + + def read_inner_frame(self): + """Reads an inner frame. + + Raises: + PhysicalConnectionError: when the inner frame is invalid. + """ + + if len(self._data) == self._read_position: + raise PhysicalConnectionError( + _DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED) + + bits = ord(self._data[self._read_position]) + self._read_position += 1 + fin = (bits & 0x80) == 0x80 + rsv1 = (bits & 0x40) == 0x40 + rsv2 = (bits & 0x20) == 0x20 + rsv3 = (bits & 0x10) == 0x10 + opcode = bits & 0xf + payload = self.remaining_data() + # Consume rest of the message which is payload data of the original + # frame. + self._read_position = len(self._data) + return fin, rsv1, rsv2, rsv3, opcode, payload + + def _read_number(self): + if self._read_position + 1 > len(self._data): + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Cannot read the first byte of number field') + + number = ord(self._data[self._read_position]) + if number & 0x80 == 0x80: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'The most significant bit of the first byte of number should ' + 'be unset') + self._read_position += 1 + pos = self._read_position + if number == 127: + if pos + 8 > len(self._data): + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Invalid number field') + self._read_position += 8 + number = struct.unpack('!Q', self._data[pos:pos+8])[0] + if number > 0x7FFFFFFFFFFFFFFF: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Encoded number >= 2^63') + if number <= 0xFFFF: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + '%d should not be encoded by 9 bytes encoding' % number) + return number + if number == 126: + if pos + 2 > len(self._data): + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Invalid number field') + self._read_position += 2 + number = struct.unpack('!H', self._data[pos:pos+2])[0] + if number <= 125: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + '%d should not be encoded by 3 bytes encoding' % number) + return number + + def _read_size_and_contents(self): + """Reads data that consists of followings: + - the size of the contents encoded the same way as payload length + of the WebSocket Protocol with 1 bit padding at the head. + - the contents. + """ + + size = self._read_number() + pos = self._read_position + if pos + size > len(self._data): + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Cannot read %d bytes data' % size) + + self._read_position += size + return self._data[pos:pos+size] + + def _read_add_channel_request(self, first_byte, control_block): + reserved = (first_byte >> 2) & 0x7 + if reserved != 0: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Reserved bits must be unset') + + # Invalid encoding will be handled by MuxHandler. + encoding = first_byte & 0x3 + try: + control_block.channel_id = self.read_channel_id() + except ValueError, e: + raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) + control_block.encoding = encoding + encoded_handshake = self._read_size_and_contents() + control_block.encoded_handshake = encoded_handshake + return control_block + + def _read_add_channel_response(self, first_byte, control_block): + reserved = (first_byte >> 2) & 0x3 + if reserved != 0: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Reserved bits must be unset') + + control_block.accepted = (first_byte >> 4) & 1 + control_block.encoding = first_byte & 0x3 + try: + control_block.channel_id = self.read_channel_id() + except ValueError, e: + raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) + control_block.encoded_handshake = self._read_size_and_contents() + return control_block + + def _read_flow_control(self, first_byte, control_block): + reserved = first_byte & 0x1f + if reserved != 0: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Reserved bits must be unset') + + try: + control_block.channel_id = self.read_channel_id() + except ValueError, e: + raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) + control_block.send_quota = self._read_number() + return control_block + + def _read_drop_channel(self, first_byte, control_block): + reserved = first_byte & 0x1f + if reserved != 0: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Reserved bits must be unset') + + try: + control_block.channel_id = self.read_channel_id() + except ValueError, e: + raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) + reason = self._read_size_and_contents() + if len(reason) == 0: + control_block.drop_code = None + control_block.drop_message = '' + elif len(reason) >= 2: + control_block.drop_code = struct.unpack('!H', reason[:2])[0] + control_block.drop_message = reason[2:] + else: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Received DropChannel that conains only 1-byte reason') + return control_block + + def _read_new_channel_slot(self, first_byte, control_block): + reserved = first_byte & 0x1e + if reserved != 0: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Reserved bits must be unset') + control_block.fallback = first_byte & 1 + control_block.slots = self._read_number() + control_block.send_quota = self._read_number() + return control_block + + def read_control_blocks(self): + """Reads control block(s). + + Raises: + PhysicalConnectionError: when the payload contains invalid control + block(s). + StopIteration: when no control blocks left. + """ + + while self._read_position < len(self._data): + first_byte = ord(self._data[self._read_position]) + self._read_position += 1 + opcode = (first_byte >> 5) & 0x7 + control_block = _ControlBlock(opcode=opcode) + if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: + yield self._read_add_channel_request(first_byte, control_block) + elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: + yield self._read_add_channel_response( + first_byte, control_block) + elif opcode == _MUX_OPCODE_FLOW_CONTROL: + yield self._read_flow_control(first_byte, control_block) + elif opcode == _MUX_OPCODE_DROP_CHANNEL: + yield self._read_drop_channel(first_byte, control_block) + elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: + yield self._read_new_channel_slot(first_byte, control_block) + else: + raise PhysicalConnectionError( + _DROP_CODE_UNKNOWN_MUX_OPCODE, + 'Invalid opcode %d' % opcode) + + assert self._read_position == len(self._data) + raise StopIteration + + def remaining_data(self): + """Returns remaining data.""" + + return self._data[self._read_position:] + + +class _LogicalRequest(object): + """Mimics mod_python request.""" + + def __init__(self, channel_id, command, path, protocol, headers, + connection): + """Constructs an instance. + + Args: + channel_id: the channel id of the logical channel. + command: HTTP request command. + path: HTTP request path. + headers: HTTP headers. + connection: _LogicalConnection instance. + """ + + self.channel_id = channel_id + self.method = command + self.uri = path + self.protocol = protocol + self.headers_in = headers + self.connection = connection + self.server_terminated = False + self.client_terminated = False + + def is_https(self): + """Mimics request.is_https(). Returns False because this method is + used only by old protocols (hixie and hybi00). + """ + + return False + + +class _LogicalConnection(object): + """Mimics mod_python mp_conn.""" + + # For details, see the comment of set_read_state(). + STATE_ACTIVE = 1 + STATE_GRACEFULLY_CLOSED = 2 + STATE_TERMINATED = 3 + + def __init__(self, mux_handler, channel_id): + """Constructs an instance. + + Args: + mux_handler: _MuxHandler instance. + channel_id: channel id of this connection. + """ + + self._mux_handler = mux_handler + self._channel_id = channel_id + self._incoming_data = '' + self._write_condition = threading.Condition() + self._waiting_write_completion = False + self._read_condition = threading.Condition() + self._read_state = self.STATE_ACTIVE + + def get_local_addr(self): + """Getter to mimic mp_conn.local_addr.""" + + return self._mux_handler.physical_connection.get_local_addr() + local_addr = property(get_local_addr) + + def get_remote_addr(self): + """Getter to mimic mp_conn.remote_addr.""" + + return self._mux_handler.physical_connection.get_remote_addr() + remote_addr = property(get_remote_addr) + + def get_memorized_lines(self): + """Gets memorized lines. Not supported.""" + + raise MuxUnexpectedException('_LogicalConnection does not support ' + 'get_memorized_lines') + + def write(self, data): + """Writes data. mux_handler sends data asynchronously. The caller will + be suspended until write done. + + Args: + data: data to be written. + + Raises: + MuxUnexpectedException: when called before finishing the previous + write. + """ + + try: + self._write_condition.acquire() + if self._waiting_write_completion: + raise MuxUnexpectedException( + 'Logical connection %d is already waiting the completion ' + 'of write' % self._channel_id) + + self._waiting_write_completion = True + self._mux_handler.send_data(self._channel_id, data) + self._write_condition.wait() + finally: + self._write_condition.release() + + def write_control_data(self, data): + """Writes data via the control channel. Don't wait finishing write + because this method can be called by mux dispatcher. + + Args: + data: data to be written. + """ + + self._mux_handler.send_control_data(data) + + def notify_write_done(self): + """Called when sending data is completed.""" + + try: + self._write_condition.acquire() + if not self._waiting_write_completion: + raise MuxUnexpectedException( + 'Invalid call of notify_write_done for logical connection' + ' %d' % self._channel_id) + self._waiting_write_completion = False + self._write_condition.notify() + finally: + self._write_condition.release() + + def append_frame_data(self, frame_data): + """Appends incoming frame data. Called when mux_handler dispatches + frame data to the corresponding application. + + Args: + frame_data: incoming frame data. + """ + + self._read_condition.acquire() + self._incoming_data += frame_data + self._read_condition.notify() + self._read_condition.release() + + def read(self, length): + """Reads data. Blocks until enough data has arrived via physical + connection. + + Args: + length: length of data to be read. + Raises: + LogicalConnectionClosedException: when closing handshake for this + logical channel has been received. + ConnectionTerminatedException: when the physical connection has + closed, or an error is caused on the reader thread. + """ + + self._read_condition.acquire() + while (self._read_state == self.STATE_ACTIVE and + len(self._incoming_data) < length): + self._read_condition.wait() + + try: + if self._read_state == self.STATE_GRACEFULLY_CLOSED: + raise LogicalConnectionClosedException( + 'Logical channel %d has closed.' % self._channel_id) + elif self._read_state == self.STATE_TERMINATED: + raise ConnectionTerminatedException( + 'Receiving %d byte failed. Logical channel (%d) closed' % + (length, self._channel_id)) + + value = self._incoming_data[:length] + self._incoming_data = self._incoming_data[length:] + finally: + self._read_condition.release() + + return value + + def set_read_state(self, new_state): + """Sets the state of this connection. Called when an event for this + connection has occurred. + + Args: + new_state: state to be set. new_state must be one of followings: + - STATE_GRACEFULLY_CLOSED: when closing handshake for this + connection has been received. + - STATE_TERMINATED: when the physical connection has closed or + DropChannel of this connection has received. + """ + + self._read_condition.acquire() + self._read_state = new_state + self._read_condition.notify() + self._read_condition.release() + + +class _LogicalStream(Stream): + """Mimics the Stream class. This class interprets multiplexed WebSocket + frames. + """ + + def __init__(self, request, send_quota, receive_quota): + """Constructs an instance. + + Args: + request: _LogicalRequest instance. + send_quota: Initial send quota. + receive_quota: Initial receive quota. + """ + + # TODO(bashi): Support frame filters. + stream_options = StreamOptions() + # Physical stream is responsible for masking. + stream_options.unmask_receive = False + # Control frames can be fragmented on logical channel. + stream_options.allow_fragmented_control_frame = True + Stream.__init__(self, request, stream_options) + self._send_quota = send_quota + self._send_quota_condition = threading.Condition() + self._receive_quota = receive_quota + self._write_inner_frame_semaphore = threading.Semaphore() + + def _create_inner_frame(self, opcode, payload, end=True): + # TODO(bashi): Support extensions that use reserved bits. + first_byte = (end << 7) | opcode + return (_encode_channel_id(self._request.channel_id) + + chr(first_byte) + payload) + + def _write_inner_frame(self, opcode, payload, end=True): + payload_length = len(payload) + write_position = 0 + + try: + # An inner frame will be fragmented if there is no enough send + # quota. This semaphore ensures that fragmented inner frames are + # sent in order on the logical channel. + # Note that frames that come from other logical channels or + # multiplexing control blocks can be inserted between fragmented + # inner frames on the physical channel. + self._write_inner_frame_semaphore.acquire() + while write_position < payload_length: + try: + self._send_quota_condition.acquire() + while self._send_quota == 0: + self._logger.debug( + 'No quota. Waiting FlowControl message for %d.' % + self._request.channel_id) + self._send_quota_condition.wait() + + remaining = payload_length - write_position + write_length = min(self._send_quota, remaining) + inner_frame_end = ( + end and + (write_position + write_length == payload_length)) + + inner_frame = self._create_inner_frame( + opcode, + payload[write_position:write_position+write_length], + inner_frame_end) + frame_data = self._writer.build( + inner_frame, end=True, binary=True) + self._send_quota -= write_length + self._logger.debug('Consumed quota=%d, remaining=%d' % + (write_length, self._send_quota)) + finally: + self._send_quota_condition.release() + + # Writing data will block the worker so we need to release + # _send_quota_condition before writing. + self._logger.debug('Sending inner frame: %r' % frame_data) + self._request.connection.write(frame_data) + write_position += write_length + + opcode = common.OPCODE_CONTINUATION + + except ValueError, e: + raise BadOperationException(e) + finally: + self._write_inner_frame_semaphore.release() + + def replenish_send_quota(self, send_quota): + """Replenish send quota.""" + + self._send_quota_condition.acquire() + self._send_quota += send_quota + self._logger.debug('Replenished send quota for channel id %d: %d' % + (self._request.channel_id, self._send_quota)) + self._send_quota_condition.notify() + self._send_quota_condition.release() + + def consume_receive_quota(self, amount): + """Consumes receive quota. Returns False on failure.""" + + if self._receive_quota < amount: + self._logger.debug('Violate quota on channel id %d: %d < %d' % + (self._request.channel_id, + self._receive_quota, amount)) + return False + self._receive_quota -= amount + return True + + def send_message(self, message, end=True, binary=False): + """Override Stream.send_message.""" + + if self._request.server_terminated: + raise BadOperationException( + 'Requested send_message after sending out a closing handshake') + + if binary and isinstance(message, unicode): + raise BadOperationException( + 'Message for binary frame must be instance of str') + + if binary: + opcode = common.OPCODE_BINARY + else: + opcode = common.OPCODE_TEXT + message = message.encode('utf-8') + + self._write_inner_frame(opcode, message, end) + + def _receive_frame(self): + """Overrides Stream._receive_frame. + + In addition to call Stream._receive_frame, this method adds the amount + of payload to receiving quota and sends FlowControl to the client. + We need to do it here because Stream.receive_message() handles + control frames internally. + """ + + opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self) + amount = len(payload) + self._receive_quota += amount + frame_data = _create_flow_control(self._request.channel_id, + amount) + self._logger.debug('Sending flow control for %d, replenished=%d' % + (self._request.channel_id, amount)) + self._request.connection.write_control_data(frame_data) + return opcode, payload, fin, rsv1, rsv2, rsv3 + + def receive_message(self): + """Overrides Stream.receive_message.""" + + # Just call Stream.receive_message(), but catch + # LogicalConnectionClosedException, which is raised when the logical + # connection has closed gracefully. + try: + return Stream.receive_message(self) + except LogicalConnectionClosedException, e: + self._logger.debug('%s', e) + return None + + def _send_closing_handshake(self, code, reason): + """Overrides Stream._send_closing_handshake.""" + + body = create_closing_handshake_body(code, reason) + self._logger.debug('Sending closing handshake for %d: (%r, %r)' % + (self._request.channel_id, code, reason)) + self._write_inner_frame(common.OPCODE_CLOSE, body, end=True) + + self._request.server_terminated = True + + def send_ping(self, body=''): + """Overrides Stream.send_ping""" + + self._logger.debug('Sending ping on logical channel %d: %r' % + (self._request.channel_id, body)) + self._write_inner_frame(common.OPCODE_PING, body, end=True) + + self._ping_queue.append(body) + + def _send_pong(self, body): + """Overrides Stream._send_pong""" + + self._logger.debug('Sending pong on logical channel %d: %r' % + (self._request.channel_id, body)) + self._write_inner_frame(common.OPCODE_PONG, body, end=True) + + def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''): + """Overrides Stream.close_connection.""" + + # TODO(bashi): Implement + self._logger.debug('Closing logical connection %d' % + self._request.channel_id) + self._request.server_terminated = True + + def _drain_received_data(self): + """Overrides Stream._drain_received_data. Nothing need to be done for + logical channel. + """ + + pass + + +class _OutgoingData(object): + """A structure that holds data to be sent via physical connection and + origin of the data. + """ + + def __init__(self, channel_id, data): + self.channel_id = channel_id + self.data = data + + +class _PhysicalConnectionWriter(threading.Thread): + """A thread that is responsible for writing data to physical connection. + + TODO(bashi): Make sure there is no thread-safety problem when the reader + thread reads data from the same socket at a time. + """ + + def __init__(self, mux_handler): + """Constructs an instance. + + Args: + mux_handler: _MuxHandler instance. + """ + + threading.Thread.__init__(self) + self._logger = util.get_class_logger(self) + self._mux_handler = mux_handler + self.setDaemon(True) + self._stop_requested = False + self._deque = collections.deque() + self._deque_condition = threading.Condition() + + def put_outgoing_data(self, data): + """Puts outgoing data. + + Args: + data: _OutgoingData instance. + + Raises: + BadOperationException: when the thread has been requested to + terminate. + """ + + try: + self._deque_condition.acquire() + if self._stop_requested: + raise BadOperationException('Cannot write data anymore') + + self._deque.append(data) + self._deque_condition.notify() + finally: + self._deque_condition.release() + + def _write_data(self, outgoing_data): + try: + self._mux_handler.physical_connection.write(outgoing_data.data) + except Exception, e: + util.prepend_message_to_exception( + 'Failed to send message to %r: ' % + (self._mux_handler.physical_connection.remote_addr,), e) + raise + + # TODO(bashi): It would be better to block the thread that sends + # control data as well. + if outgoing_data.channel_id != _CONTROL_CHANNEL_ID: + self._mux_handler.notify_write_done(outgoing_data.channel_id) + + def run(self): + self._deque_condition.acquire() + while not self._stop_requested: + if len(self._deque) == 0: + self._deque_condition.wait() + continue + + outgoing_data = self._deque.popleft() + self._deque_condition.release() + self._write_data(outgoing_data) + self._deque_condition.acquire() + + # Flush deque + try: + while len(self._deque) > 0: + outgoing_data = self._deque.popleft() + self._write_data(outgoing_data) + finally: + self._deque_condition.release() + + def stop(self): + """Stops the writer thread.""" + + self._deque_condition.acquire() + self._stop_requested = True + self._deque_condition.notify() + self._deque_condition.release() + + +class _PhysicalConnectionReader(threading.Thread): + """A thread that is responsible for reading data from physical connection. + """ + + def __init__(self, mux_handler): + """Constructs an instance. + + Args: + mux_handler: _MuxHandler instance. + """ + + threading.Thread.__init__(self) + self._logger = util.get_class_logger(self) + self._mux_handler = mux_handler + self.setDaemon(True) + + def run(self): + while True: + try: + physical_stream = self._mux_handler.physical_stream + message = physical_stream.receive_message() + if message is None: + break + # Below happens only when a data message is received. + opcode = physical_stream.get_last_received_opcode() + if opcode != common.OPCODE_BINARY: + self._mux_handler.fail_physical_connection( + _DROP_CODE_INVALID_ENCAPSULATING_MESSAGE, + 'Received a text message on physical connection') + break + + except ConnectionTerminatedException, e: + self._logger.debug('%s', e) + break + + try: + self._mux_handler.dispatch_message(message) + except PhysicalConnectionError, e: + self._mux_handler.fail_physical_connection( + e.drop_code, e.message) + break + except LogicalChannelError, e: + self._mux_handler.fail_logical_channel( + e.channel_id, e.drop_code, e.message) + except Exception, e: + self._logger.debug(traceback.format_exc()) + break + + self._mux_handler.notify_reader_done() + + +class _Worker(threading.Thread): + """A thread that is responsible for running the corresponding application + handler. + """ + + def __init__(self, mux_handler, request): + """Constructs an instance. + + Args: + mux_handler: _MuxHandler instance. + request: _LogicalRequest instance. + """ + + threading.Thread.__init__(self) + self._logger = util.get_class_logger(self) + self._mux_handler = mux_handler + self._request = request + self.setDaemon(True) + + def run(self): + self._logger.debug('Logical channel worker started. (id=%d)' % + self._request.channel_id) + try: + # Non-critical exceptions will be handled by dispatcher. + self._mux_handler.dispatcher.transfer_data(self._request) + finally: + self._mux_handler.notify_worker_done(self._request.channel_id) + + +class _MuxHandshaker(hybi.Handshaker): + """Opening handshake processor for multiplexing.""" + + _DUMMY_WEBSOCKET_KEY = 'dGhlIHNhbXBsZSBub25jZQ==' + + def __init__(self, request, dispatcher, send_quota, receive_quota): + """Constructs an instance. + Args: + request: _LogicalRequest instance. + dispatcher: Dispatcher instance (dispatch.Dispatcher). + send_quota: Initial send quota. + receive_quota: Initial receive quota. + """ + + hybi.Handshaker.__init__(self, request, dispatcher) + self._send_quota = send_quota + self._receive_quota = receive_quota + + # Append headers which should not be included in handshake field of + # AddChannelRequest. + # TODO(bashi): Make sure whether we should raise exception when + # these headers are included already. + request.headers_in[common.UPGRADE_HEADER] = ( + common.WEBSOCKET_UPGRADE_TYPE) + request.headers_in[common.CONNECTION_HEADER] = ( + common.UPGRADE_CONNECTION_TYPE) + request.headers_in[common.SEC_WEBSOCKET_VERSION_HEADER] = ( + str(common.VERSION_HYBI_LATEST)) + request.headers_in[common.SEC_WEBSOCKET_KEY_HEADER] = ( + self._DUMMY_WEBSOCKET_KEY) + + def _create_stream(self, stream_options): + """Override hybi.Handshaker._create_stream.""" + + self._logger.debug('Creating logical stream for %d' % + self._request.channel_id) + return _LogicalStream(self._request, self._send_quota, + self._receive_quota) + + def _create_handshake_response(self, accept): + """Override hybi._create_handshake_response.""" + + response = [] + + response.append('HTTP/1.1 101 Switching Protocols\r\n') + + # Upgrade, Connection and Sec-WebSocket-Accept should be excluded. + if self._request.ws_protocol is not None: + response.append('%s: %s\r\n' % ( + common.SEC_WEBSOCKET_PROTOCOL_HEADER, + self._request.ws_protocol)) + if (self._request.ws_extensions is not None and + len(self._request.ws_extensions) != 0): + response.append('%s: %s\r\n' % ( + common.SEC_WEBSOCKET_EXTENSIONS_HEADER, + common.format_extensions(self._request.ws_extensions))) + response.append('\r\n') + + return ''.join(response) + + def _send_handshake(self, accept): + """Override hybi.Handshaker._send_handshake.""" + + # Don't send handshake response for the default channel + if self._request.channel_id == _DEFAULT_CHANNEL_ID: + return + + handshake_response = self._create_handshake_response(accept) + frame_data = _create_add_channel_response( + self._request.channel_id, + handshake_response) + self._logger.debug('Sending handshake response for %d: %r' % + (self._request.channel_id, frame_data)) + self._request.connection.write_control_data(frame_data) + + +class _LogicalChannelData(object): + """A structure that holds information about logical channel. + """ + + def __init__(self, request, worker): + self.request = request + self.worker = worker + self.drop_code = _DROP_CODE_NORMAL_CLOSURE + self.drop_message = '' + + +class _HandshakeDeltaBase(object): + """A class that holds information for delta-encoded handshake.""" + + def __init__(self, headers): + self._headers = headers + + def create_headers(self, delta=None): + """Creates request headers for an AddChannelRequest that has + delta-encoded handshake. + + Args: + delta: headers should be overridden. + """ + + headers = copy.copy(self._headers) + if delta: + for key, value in delta.items(): + # The spec requires that a header with an empty value is + # removed from the delta base. + if len(value) == 0 and headers.has_key(key): + del headers[key] + else: + headers[key] = value + # TODO(bashi): Support extensions + headers['Sec-WebSocket-Extensions'] = '' + return headers + + +class _MuxHandler(object): + """Multiplexing handler. When a handler starts, it launches three + threads; the reader thread, the writer thread, and a worker thread. + + The reader thread reads data from the physical stream, i.e., the + ws_stream object of the underlying websocket connection. The reader + thread interprets multiplexed frames and dispatches them to logical + channels. Methods of this class are mostly called by the reader thread. + + The writer thread sends multiplexed frames which are created by + logical channels via the physical connection. + + The worker thread launched at the starting point handles the + "Implicitly Opened Connection". If multiplexing handler receives + an AddChannelRequest and accepts it, the handler will launch a new worker + thread and dispatch the request to it. + """ + + def __init__(self, request, dispatcher): + """Constructs an instance. + + Args: + request: mod_python request of the physical connection. + dispatcher: Dispatcher instance (dispatch.Dispatcher). + """ + + self.original_request = request + self.dispatcher = dispatcher + self.physical_connection = request.connection + self.physical_stream = request.ws_stream + self._logger = util.get_class_logger(self) + self._logical_channels = {} + self._logical_channels_condition = threading.Condition() + # Holds client's initial quota + self._channel_slots = collections.deque() + self._handshake_base = None + self._worker_done_notify_received = False + self._reader = None + self._writer = None + + def start(self): + """Starts the handler. + + Raises: + MuxUnexpectedException: when the handler already started, or when + opening handshake of the default channel fails. + """ + + if self._reader or self._writer: + raise MuxUnexpectedException('MuxHandler already started') + + self._reader = _PhysicalConnectionReader(self) + self._writer = _PhysicalConnectionWriter(self) + self._reader.start() + self._writer.start() + + # Create "Implicitly Opened Connection". + logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID) + self._handshake_base = _HandshakeDeltaBase( + self.original_request.headers_in) + logical_request = _LogicalRequest( + _DEFAULT_CHANNEL_ID, + self.original_request.method, + self.original_request.uri, + self.original_request.protocol, + self._handshake_base.create_headers(), + logical_connection) + # Client's send quota for the implicitly opened connection is zero, + # but we will send FlowControl later so set the initial quota to + # _INITIAL_QUOTA_FOR_CLIENT. + self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT) + if not self._do_handshake_for_logical_request( + logical_request, send_quota=self.original_request.mux_quota): + raise MuxUnexpectedException( + 'Failed handshake on the default channel id') + self._add_logical_channel(logical_request) + + # Send FlowControl for the implicitly opened connection. + frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID, + _INITIAL_QUOTA_FOR_CLIENT) + logical_request.connection.write_control_data(frame_data) + + def add_channel_slots(self, slots, send_quota): + """Adds channel slots. + + Args: + slots: number of slots to be added. + send_quota: initial send quota for slots. + """ + + self._channel_slots.extend([send_quota] * slots) + # Send NewChannelSlot to client. + frame_data = _create_new_channel_slot(slots, send_quota) + self.send_control_data(frame_data) + + def wait_until_done(self, timeout=None): + """Waits until all workers are done. Returns False when timeout has + occurred. Returns True on success. + + Args: + timeout: timeout in sec. + """ + + self._logical_channels_condition.acquire() + try: + while len(self._logical_channels) > 0: + self._logger.debug('Waiting workers(%d)...' % + len(self._logical_channels)) + self._worker_done_notify_received = False + self._logical_channels_condition.wait(timeout) + if not self._worker_done_notify_received: + self._logger.debug('Waiting worker(s) timed out') + return False + + finally: + self._logical_channels_condition.release() + + # Flush pending outgoing data + self._writer.stop() + self._writer.join() + + return True + + def notify_write_done(self, channel_id): + """Called by the writer thread when a write operation has done. + + Args: + channel_id: objective channel id. + """ + + try: + self._logical_channels_condition.acquire() + if channel_id in self._logical_channels: + channel_data = self._logical_channels[channel_id] + channel_data.request.connection.notify_write_done() + else: + self._logger.debug('Seems that logical channel for %d has gone' + % channel_id) + finally: + self._logical_channels_condition.release() + + def send_control_data(self, data): + """Sends data via the control channel. + + Args: + data: data to be sent. + """ + + self._writer.put_outgoing_data(_OutgoingData( + channel_id=_CONTROL_CHANNEL_ID, data=data)) + + def send_data(self, channel_id, data): + """Sends data via given logical channel. This method is called by + worker threads. + + Args: + channel_id: objective channel id. + data: data to be sent. + """ + + self._writer.put_outgoing_data(_OutgoingData( + channel_id=channel_id, data=data)) + + def _send_drop_channel(self, channel_id, code=None, message=''): + frame_data = _create_drop_channel(channel_id, code, message) + self._logger.debug( + 'Sending drop channel for channel id %d' % channel_id) + self.send_control_data(frame_data) + + def _send_error_add_channel_response(self, channel_id, status=None): + if status is None: + status = common.HTTP_STATUS_BAD_REQUEST + + if status in _HTTP_BAD_RESPONSE_MESSAGES: + message = _HTTP_BAD_RESPONSE_MESSAGES[status] + else: + self._logger.debug('Response message for %d is not found' % status) + message = '???' + + response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message) + frame_data = _create_add_channel_response(channel_id, + encoded_handshake=response, + encoding=0, rejected=True) + self.send_control_data(frame_data) + + def _create_logical_request(self, block): + if block.channel_id == _CONTROL_CHANNEL_ID: + # TODO(bashi): Raise PhysicalConnectionError with code 2006 + # instead of MuxUnexpectedException. + raise MuxUnexpectedException( + 'Received the control channel id (0) as objective channel ' + 'id for AddChannel') + + if block.encoding > _HANDSHAKE_ENCODING_DELTA: + raise PhysicalConnectionError( + _DROP_CODE_UNKNOWN_REQUEST_ENCODING) + + method, path, version, headers = _parse_request_text( + block.encoded_handshake) + if block.encoding == _HANDSHAKE_ENCODING_DELTA: + headers = self._handshake_base.create_headers(headers) + + connection = _LogicalConnection(self, block.channel_id) + request = _LogicalRequest(block.channel_id, method, path, version, + headers, connection) + return request + + def _do_handshake_for_logical_request(self, request, send_quota=0): + try: + receive_quota = self._channel_slots.popleft() + except IndexError: + raise LogicalChannelError( + request.channel_id, _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION) + + handshaker = _MuxHandshaker(request, self.dispatcher, + send_quota, receive_quota) + try: + handshaker.do_handshake() + except handshake.VersionException, e: + self._logger.info('%s', e) + self._send_error_add_channel_response( + request.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) + return False + except handshake.HandshakeException, e: + # TODO(bashi): Should we _Fail the Logical Channel_ with 3001 + # instead? + self._logger.info('%s', e) + self._send_error_add_channel_response(request.channel_id, + status=e.status) + return False + except handshake.AbortedByUserException, e: + self._logger.info('%s', e) + self._send_error_add_channel_response(request.channel_id) + return False + + return True + + def _add_logical_channel(self, logical_request): + try: + self._logical_channels_condition.acquire() + if logical_request.channel_id in self._logical_channels: + self._logger.debug('Channel id %d already exists' % + logical_request.channel_id) + raise PhysicalConnectionError( + _DROP_CODE_CHANNEL_ALREADY_EXISTS, + 'Channel id %d already exists' % + logical_request.channel_id) + worker = _Worker(self, logical_request) + channel_data = _LogicalChannelData(logical_request, worker) + self._logical_channels[logical_request.channel_id] = channel_data + worker.start() + finally: + self._logical_channels_condition.release() + + def _process_add_channel_request(self, block): + try: + logical_request = self._create_logical_request(block) + except ValueError, e: + self._logger.debug('Failed to create logical request: %r' % e) + self._send_error_add_channel_response( + block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) + return + if self._do_handshake_for_logical_request(logical_request): + if block.encoding == _HANDSHAKE_ENCODING_IDENTITY: + # Update handshake base. + # TODO(bashi): Make sure this is the right place to update + # handshake base. + self._handshake_base = _HandshakeDeltaBase( + logical_request.headers_in) + self._add_logical_channel(logical_request) + else: + self._send_error_add_channel_response( + block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) + + def _process_flow_control(self, block): + try: + self._logical_channels_condition.acquire() + if not block.channel_id in self._logical_channels: + return + channel_data = self._logical_channels[block.channel_id] + channel_data.request.ws_stream.replenish_send_quota( + block.send_quota) + finally: + self._logical_channels_condition.release() + + def _process_drop_channel(self, block): + self._logger.debug( + 'DropChannel received for %d: code=%r, reason=%r' % + (block.channel_id, block.drop_code, block.drop_message)) + try: + self._logical_channels_condition.acquire() + if not block.channel_id in self._logical_channels: + return + channel_data = self._logical_channels[block.channel_id] + channel_data.drop_code = _DROP_CODE_ACKNOWLEDGED + # Close the logical channel + channel_data.request.connection.set_read_state( + _LogicalConnection.STATE_TERMINATED) + finally: + self._logical_channels_condition.release() + + def _process_control_blocks(self, parser): + for control_block in parser.read_control_blocks(): + opcode = control_block.opcode + self._logger.debug('control block received, opcode: %d' % opcode) + if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: + self._process_add_channel_request(control_block) + elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Received AddChannelResponse') + elif opcode == _MUX_OPCODE_FLOW_CONTROL: + self._process_flow_control(control_block) + elif opcode == _MUX_OPCODE_DROP_CHANNEL: + self._process_drop_channel(control_block) + elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: + raise PhysicalConnectionError( + _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, + 'Received NewChannelSlot') + else: + raise MuxUnexpectedException( + 'Unexpected opcode %r' % opcode) + + def _process_logical_frame(self, channel_id, parser): + self._logger.debug('Received a frame. channel id=%d' % channel_id) + try: + self._logical_channels_condition.acquire() + if not channel_id in self._logical_channels: + # We must ignore the message for an inactive channel. + return + channel_data = self._logical_channels[channel_id] + fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame() + if not channel_data.request.ws_stream.consume_receive_quota( + len(payload)): + # The client violates quota. Close logical channel. + raise LogicalChannelError( + channel_id, _DROP_CODE_SEND_QUOTA_VIOLATION) + header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3, + mask=False) + frame_data = header + payload + channel_data.request.connection.append_frame_data(frame_data) + finally: + self._logical_channels_condition.release() + + def dispatch_message(self, message): + """Dispatches message. The reader thread calls this method. + + Args: + message: a message that contains encapsulated frame. + Raises: + PhysicalConnectionError: if the message contains physical + connection level errors. + LogicalChannelError: if the message contains logical channel + level errors. + """ + + parser = _MuxFramePayloadParser(message) + try: + channel_id = parser.read_channel_id() + except ValueError, e: + raise PhysicalConnectionError(_DROP_CODE_CHANNEL_ID_TRUNCATED) + if channel_id == _CONTROL_CHANNEL_ID: + self._process_control_blocks(parser) + else: + self._process_logical_frame(channel_id, parser) + + def notify_worker_done(self, channel_id): + """Called when a worker has finished. + + Args: + channel_id: channel id corresponded with the worker. + """ + + self._logger.debug('Worker for channel id %d terminated' % channel_id) + try: + self._logical_channels_condition.acquire() + if not channel_id in self._logical_channels: + raise MuxUnexpectedException( + 'Channel id %d not found' % channel_id) + channel_data = self._logical_channels.pop(channel_id) + finally: + self._worker_done_notify_received = True + self._logical_channels_condition.notify() + self._logical_channels_condition.release() + + if not channel_data.request.server_terminated: + self._send_drop_channel( + channel_id, code=channel_data.drop_code, + message=channel_data.drop_message) + + def notify_reader_done(self): + """This method is called by the reader thread when the reader has + finished. + """ + + # Terminate all logical connections + self._logger.debug('termiating all logical connections...') + self._logical_channels_condition.acquire() + for channel_data in self._logical_channels.values(): + try: + channel_data.request.connection.set_read_state( + _LogicalConnection.STATE_TERMINATED) + except Exception: + pass + self._logical_channels_condition.release() + + def fail_physical_connection(self, code, message): + """Fail the physical connection. + + Args: + code: drop reason code. + message: drop message. + """ + + self._logger.debug('Failing the physical connection...') + self._send_drop_channel(_CONTROL_CHANNEL_ID, code, message) + self.physical_stream.close_connection( + common.STATUS_INTERNAL_ENDPOINT_ERROR) + + def fail_logical_channel(self, channel_id, code, message): + """Fail a logical channel. + + Args: + channel_id: channel id. + code: drop reason code. + message: drop message. + """ + + self._logger.debug('Failing logical channel %d...' % channel_id) + try: + self._logical_channels_condition.acquire() + if channel_id in self._logical_channels: + channel_data = self._logical_channels[channel_id] + # Close the logical channel. notify_worker_done() will be + # called later and it will send DropChannel. + channel_data.drop_code = code + channel_data.drop_message = message + channel_data.request.connection.set_read_state( + _LogicalConnection.STATE_TERMINATED) + else: + self._send_drop_channel(channel_id, code, message) + finally: + self._logical_channels_condition.release() + + +def use_mux(request): + return hasattr(request, 'mux') and request.mux + + +def start(request, dispatcher): + mux_handler = _MuxHandler(request, dispatcher) + mux_handler.start() + + mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS, + _INITIAL_QUOTA_FOR_CLIENT) + + mux_handler.wait_until_done() + + +# vi:sts=4 sw=4 et diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/standalone.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/standalone.py index 850aa5cd4..07a33d9c9 100755 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/standalone.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/standalone.py @@ -32,27 +32,44 @@ """Standalone WebSocket server. +Use this file to launch pywebsocket without Apache HTTP Server. + + BASIC USAGE -Use this server to run mod_pywebsocket without Apache HTTP Server. +Go to the src directory and run -Usage: - python standalone.py [-p <ws_port>] [-w <websock_handlers>] - [-s <scan_dir>] - [-d <document_root>] - [-m <websock_handlers_map_file>] - ... for other options, see _main below ... + $ python mod_pywebsocket/standalone.py [-p <ws_port>] + [-w <websock_handlers>] + [-d <document_root>] <ws_port> is the port number to use for ws:// connection. <document_root> is the path to the root directory of HTML files. <websock_handlers> is the path to the root directory of WebSocket handlers. -See __init__.py for details of <websock_handlers> and how to write WebSocket -handlers. If this path is relative, <document_root> is used as the base. +If not specified, <document_root> will be used. See __init__.py (or +run $ pydoc mod_pywebsocket) for how to write WebSocket handlers. + +For more detail and other options, run + + $ python mod_pywebsocket/standalone.py --help + +or see _build_option_parser method below. + +For trouble shooting, adding "--log_level debug" might help you. + -<scan_dir> is a path under the root directory. If specified, only the -handlers under scan_dir are scanned. This is useful in saving scan time. +TRY DEMO + +Go to the src directory and run + + $ python standalone.py -d example + +to launch pywebsocket with the sample handler and html on port 80. Open +http://localhost/console.html, click the connect button, type something into +the text box next to the send button and click the send button. If everything +is working, you'll see the message you typed echoed by the server. SUPPORTING TLS @@ -63,10 +80,10 @@ To support TLS, run standalone.py with -t, -k, and -c options. SUPPORTING CLIENT AUTHENTICATION To support client authentication with TLS, run standalone.py with -t, -k, -c, -and --ca-certificate options. +and --tls-client-auth, and --tls-client-ca options. E.g., $./standalone.py -d ../example -p 10443 -t -c ../test/cert/cert.pem -k -../test/cert/key.pem --ca-certificate=../test/cert/cacert.pem +../test/cert/key.pem --tls-client-auth --tls-client-ca=../test/cert/cacert.pem CONFIGURATION FILE @@ -110,6 +127,7 @@ import CGIHTTPServer import SimpleHTTPServer import SocketServer import ConfigParser +import base64 import httplib import logging import logging.handlers @@ -224,6 +242,12 @@ class _StandaloneRequest(object): return self._request_handler.command method = property(get_method) + def get_protocol(self): + """Getter to mimic request.protocol.""" + + return self._request_handler.request_version + protocol = property(get_protocol) + def is_https(self): """Mimic request.is_https().""" @@ -264,6 +288,32 @@ class _StandaloneSSLConnection(object): return socket._fileobject(self._connection, mode, bufsize) +def _alias_handlers(dispatcher, websock_handlers_map_file): + """Set aliases specified in websock_handler_map_file in dispatcher. + + Args: + dispatcher: dispatch.Dispatcher instance + websock_handler_map_file: alias map file + """ + + fp = open(websock_handlers_map_file) + try: + for line in fp: + if line[0] == '#' or line.isspace(): + continue + m = re.match('(\S+)\s+(\S+)', line) + if not m: + logging.warning('Wrong format in map file:' + line) + continue + try: + dispatcher.add_resource_path_alias( + m.group(1), m.group(2)) + except dispatch.DispatchException, e: + logging.error(str(e)) + finally: + fp.close() + + class WebSocketServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): """HTTPServer specialized for WebSocket.""" @@ -278,6 +328,20 @@ class WebSocketServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): if necessary. """ + # Share a Dispatcher among request handlers to save time for + # instantiation. Dispatcher can be shared because it is thread-safe. + options.dispatcher = dispatch.Dispatcher( + options.websock_handlers, + options.scan_dir, + options.allow_handlers_outside_root_dir) + if options.websock_handlers_map_file: + _alias_handlers(options.dispatcher, + options.websock_handlers_map_file) + warnings = options.dispatcher.source_warnings() + if warnings: + for warning in warnings: + logging.warning('mod_pywebsocket: %s' % warning) + self._logger = util.get_class_logger(self) self.request_queue_size = options.request_queue_size @@ -325,7 +389,7 @@ class WebSocketServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): continue if self.websocket_server_options.use_tls: if _HAS_SSL: - if self.websocket_server_options.ca_certificate: + if self.websocket_server_options.tls_client_auth: client_cert_ = ssl.CERT_REQUIRED else: client_cert_ = ssl.CERT_NONE @@ -333,7 +397,7 @@ class WebSocketServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): keyfile=self.websocket_server_options.private_key, certfile=self.websocket_server_options.certificate, ssl_version=ssl.PROTOCOL_SSLv23, - ca_certs=self.websocket_server_options.ca_certificate, + ca_certs=self.websocket_server_options.tls_client_ca, cert_reqs=client_cert_) if _HAS_OPEN_SSL: ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD) @@ -362,6 +426,15 @@ class WebSocketServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): self._logger.info('Skip by failure: %r', e) socket_.close() failed_sockets.append(socketinfo) + if self.server_address[1] == 0: + # The operating system assigns the actual port number for port + # number 0. This case, the second and later sockets should use + # the same port number. Also self.server_port is rewritten + # because it is exported, and will be used by external code. + self.server_address = ( + self.server_name, socket_.getsockname()[1]) + self.server_port = self.server_address[1] + self._logger.info('Port %r is assigned', self.server_port) for socketinfo in failed_sockets: self._sockets.remove(socketinfo) @@ -386,6 +459,10 @@ class WebSocketServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): for socketinfo in failed_sockets: self._sockets.remove(socketinfo) + if len(self._sockets) == 0: + self._logger.critical( + 'No sockets activated. Use info log level to see the reason.') + def server_close(self): """Override SocketServer.TCPServer.server_close to enable multiple sockets close. @@ -513,6 +590,17 @@ class WebSocketRequestHandler(CGIHTTPServer.CGIHTTPRequestHandler): # attributes). if not CGIHTTPServer.CGIHTTPRequestHandler.parse_request(self): return False + + if self._options.use_basic_auth: + auth = self.headers.getheader('Authorization') + if auth != self._options.basic_auth_credential: + self.send_response(401) + self.send_header('WWW-Authenticate', + 'Basic realm="Pywebsocket"') + self.end_headers() + self._logger.info('Request basic authentication') + return True + host, port, resource = http_header_util.parse_uri(self.path) if resource is None: self._logger.info('Invalid URI: %r', self.path) @@ -648,32 +736,6 @@ def _configure_logging(options): deflate_log_level_name) -def _alias_handlers(dispatcher, websock_handlers_map_file): - """Set aliases specified in websock_handler_map_file in dispatcher. - - Args: - dispatcher: dispatch.Dispatcher instance - websock_handler_map_file: alias map file - """ - - fp = open(websock_handlers_map_file) - try: - for line in fp: - if line[0] == '#' or line.isspace(): - continue - m = re.match('(\S+)\s+(\S+)', line) - if not m: - logging.warning('Wrong format in map file:' + line) - continue - try: - dispatcher.add_resource_path_alias( - m.group(1), m.group(2)) - except dispatch.DispatchException, e: - logging.error(str(e)) - finally: - fp.close() - - def _build_option_parser(): parser = optparse.OptionParser() @@ -700,7 +762,9 @@ def _build_option_parser(): parser.add_option('-w', '--websock-handlers', '--websock_handlers', dest='websock_handlers', default='.', - help='WebSocket handlers root directory.') + help=('The root directory of WebSocket handler files. ' + 'If the path is relative, --document-root is used ' + 'as the base.')) parser.add_option('-m', '--websock-handlers-map-file', '--websock_handlers_map_file', dest='websock_handlers_map_file', @@ -710,15 +774,20 @@ def _build_option_parser(): 'existing_resource_path, separated by spaces.')) parser.add_option('-s', '--scan-dir', '--scan_dir', dest='scan_dir', default=None, - help=('WebSocket handlers scan directory. ' - 'Must be a directory under websock_handlers.')) + help=('Must be a directory under --websock-handlers. ' + 'Only handlers under this directory are scanned ' + 'and registered to the server. ' + 'Useful for saving scan time when the handler ' + 'root directory contains lots of files that are ' + 'not handler file or are handler files but you ' + 'don\'t want them to be registered. ')) parser.add_option('--allow-handlers-outside-root-dir', '--allow_handlers_outside_root_dir', dest='allow_handlers_outside_root_dir', action='store_true', default=False, help=('Scans WebSocket handlers even if their canonical ' - 'path is not under websock_handlers.')) + 'path is not under --websock-handlers.')) parser.add_option('-d', '--document-root', '--document_root', dest='document_root', default='.', help='Document root directory.') @@ -735,9 +804,20 @@ def _build_option_parser(): default='', help='TLS private key file.') parser.add_option('-c', '--certificate', dest='certificate', default='', help='TLS certificate file.') - parser.add_option('--ca-certificate', dest='ca_certificate', default='', - help=('TLS CA certificate file for client ' - 'authentication.')) + parser.add_option('--tls-client-auth', dest='tls_client_auth', + action='store_true', default=False, + help='Requires TLS client auth on every connection.') + parser.add_option('--tls-client-ca', dest='tls_client_ca', default='', + help=('Specifies a pem file which contains a set of ' + 'concatenated CA certificates which are used to ' + 'validate certificates passed from clients')) + parser.add_option('--basic-auth', dest='use_basic_auth', + action='store_true', default=False, + help='Requires Basic authentication.') + parser.add_option('--basic-auth-credential', + dest='basic_auth_credential', default='test:test', + help='Specifies the credential of basic authentication ' + 'by username:password pair (e.g. test:test).') parser.add_option('-l', '--log-file', '--log_file', dest='log_file', default='', help='Log file.') # Custom log level: @@ -771,9 +851,9 @@ def _build_option_parser(): help='Log backup count') parser.add_option('--allow-draft75', dest='allow_draft75', action='store_true', default=False, - help='Allow draft 75 handshake') + help='Obsolete option. Ignored.') parser.add_option('--strict', dest='strict', action='store_true', - default=False, help='Strictly check handshake request') + default=False, help='Obsolete option. Ignored.') parser.add_option('-q', '--queue', dest='request_queue_size', type='int', default=_DEFAULT_REQUEST_QUEUE_SIZE, help='request queue size') @@ -841,6 +921,12 @@ def _parse_args_and_config(args): def _main(args=None): + """You can call this function from your own program, but please note that + this function has some side-effects that might affect your program. For + example, util.wrap_popen3_for_win use in this method replaces implementation + of os.popen3. + """ + options, args = _parse_args_and_config(args=args) os.chdir(options.document_root) @@ -877,7 +963,7 @@ def _main(args=None): 'To use TLS, specify private_key and certificate.') sys.exit(1) - if options.ca_certificate: + if options.tls_client_auth: if not options.use_tls: logging.critical('TLS must be enabled for client authentication.') sys.exit(1) @@ -887,26 +973,16 @@ def _main(args=None): if not options.scan_dir: options.scan_dir = options.websock_handlers + if options.use_basic_auth: + options.basic_auth_credential = 'Basic ' + base64.b64encode( + options.basic_auth_credential) + try: if options.thread_monitor_interval_in_sec > 0: # Run a thread monitor to show the status of server threads for # debugging. ThreadMonitor(options.thread_monitor_interval_in_sec).start() - # Share a Dispatcher among request handlers to save time for - # instantiation. Dispatcher can be shared because it is thread-safe. - options.dispatcher = dispatch.Dispatcher( - options.websock_handlers, - options.scan_dir, - options.allow_handlers_outside_root_dir) - if options.websock_handlers_map_file: - _alias_handlers(options.dispatcher, - options.websock_handlers_map_file) - warnings = options.dispatcher.source_warnings() - if warnings: - for warning in warnings: - logging.warning('mod_pywebsocket: %s' % warning) - server = WebSocketServer(options) server.serve_forever() except Exception, e: diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/stream.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/stream.py index d051eee20..edc533279 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/stream.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/stream.py @@ -51,6 +51,7 @@ from mod_pywebsocket._stream_hybi import create_ping_frame from mod_pywebsocket._stream_hybi import create_pong_frame from mod_pywebsocket._stream_hybi import create_binary_frame from mod_pywebsocket._stream_hybi import create_text_frame +from mod_pywebsocket._stream_hybi import create_closing_handshake_body # vi:sts=4 sw=4 et diff --git a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/util.py b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/util.py index 6146e052f..7bb0b5d9e 100644 --- a/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/util.py +++ b/Tools/Scripts/webkitpy/thirdparty/mod_pywebsocket/util.py @@ -232,6 +232,12 @@ class _Deflater(object): self._compress = zlib.compressobj( zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -window_bits) + def compress(self, bytes): + compressed_bytes = self._compress.compress(bytes) + self._logger.debug('Compress input %r', bytes) + self._logger.debug('Compress result %r', compressed_bytes) + return compressed_bytes + def compress_and_flush(self, bytes): compressed_bytes = self._compress.compress(bytes) compressed_bytes += self._compress.flush(zlib.Z_SYNC_FLUSH) @@ -239,6 +245,12 @@ class _Deflater(object): self._logger.debug('Compress result %r', compressed_bytes) return compressed_bytes + def compress_and_finish(self, bytes): + compressed_bytes = self._compress.compress(bytes) + compressed_bytes += self._compress.flush(zlib.Z_FINISH) + self._logger.debug('Compress input %r', bytes) + self._logger.debug('Compress result %r', compressed_bytes) + return compressed_bytes class _Inflater(object): @@ -318,14 +330,21 @@ class _RFC1979Deflater(object): self._window_bits = window_bits self._no_context_takeover = no_context_takeover - def filter(self, bytes): - if self._deflater is None or self._no_context_takeover: + def filter(self, bytes, flush=True, bfinal=False): + if self._deflater is None or (self._no_context_takeover and flush): self._deflater = _Deflater(self._window_bits) - # Strip last 4 octets which is LEN and NLEN field of a non-compressed - # block added for Z_SYNC_FLUSH. - return self._deflater.compress_and_flush(bytes)[:-4] - + if bfinal: + result = self._deflater.compress_and_finish(bytes) + # Add a padding block with BFINAL = 0 and BTYPE = 0. + result = result + chr(0) + self._deflater = None + return result + if flush: + # Strip last 4 octets which is LEN and NLEN field of a + # non-compressed block added for Z_SYNC_FLUSH. + return self._deflater.compress_and_flush(bytes)[:-4] + return self._deflater.compress(bytes) class _RFC1979Inflater(object): """A decompressor class for byte sequence compressed and flushed following |