diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 297 | ||||
-rw-r--r-- | kafka/consumer/group.py | 15 | ||||
-rw-r--r-- | kafka/errors.py | 6 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 21 | ||||
-rw-r--r-- | kafka/protocol/parser.py | 177 |
5 files changed, 306 insertions, 210 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index dbe212a..e10d4f1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -6,19 +6,19 @@ import errno import logging from random import shuffle, uniform import socket -import time +import struct import sys +import time from kafka.vendor import six import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.api import RequestHeader from kafka.protocol.admin import SaslHandShakeRequest -from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest -from kafka.protocol.frame import KafkaBytes +from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -73,9 +73,6 @@ class ConnectionStates(object): CONNECTED = '<connected>' AUTHENTICATING = '<authenticating>' -InFlightRequest = collections.namedtuple('InFlightRequest', - ['request', 'response_type', 'correlation_id', 'future', 'timestamp']) - class BrokerConnection(object): """Initialize a Kafka broker connection @@ -230,6 +227,9 @@ class BrokerConnection(object): assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' + self._protocol = KafkaProtocol( + client_id=self.config['client_id'], + api_version=self.config['api_version']) self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() self._sock = None @@ -237,12 +237,7 @@ class BrokerConnection(object): if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] self._sasl_auth_future = None - self._header = KafkaBytes(4) - self._rbuffer = None - self._receiving = False self.last_attempt = 0 - self._processing = False - self._correlation_id = 0 self._gai = None self._gai_index = 0 self._sensors = None @@ -304,12 +299,15 @@ class BrokerConnection(object): self._sock.setsockopt(*option) self._sock.setblocking(False) + self.last_attempt = time.time() + self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() - log.info('%s: connecting to %s:%d', self, self.host, self.port) - self.state = ConnectionStates.CONNECTING - self.last_attempt = time.time() - self.config['state_change_callback'](self) + # _wrap_ssl can alter the connection state -- disconnects on failure + # so we need to double check that we are still connecting before + if self.connecting(): + self.config['state_change_callback'](self) + log.info('%s: connecting to %s:%d', self, self.host, self.port) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -372,10 +370,12 @@ class BrokerConnection(object): if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') if self._try_authenticate(): - log.debug('%s: Connection complete.', self) - self.state = ConnectionStates.CONNECTED - self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + # _try_authenticate has side-effects: possibly disconnected on socket errors + if self.state is ConnectionStates.AUTHENTICATING: + log.debug('%s: Connection complete.', self) + self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() + self.config['state_change_callback'](self) return self.state @@ -402,10 +402,7 @@ class BrokerConnection(object): password=self.config['ssl_password']) if self.config['ssl_crlfile']: if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): - error = 'No CRL support with this version of Python.' - log.error('%s: %s Disconnecting.', self, error) - self.close(Errors.ConnectionError(error)) - return + raise RuntimeError('This version of Python does not support ssl_crlfile!') log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) # pylint: disable=no-member @@ -448,7 +445,9 @@ class BrokerConnection(object): self._sasl_auth_future = future self._recv() if self._sasl_auth_future.failed(): - raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type + ex = self._sasl_auth_future.exception + if not isinstance(ex, Errors.ConnectionError): + raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() def _handle_sasl_handshake_response(self, future, response): @@ -468,6 +467,19 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _recv_bytes_blocking(self, n): + self._sock.setblocking(True) + try: + data = b'' + while len(data) < n: + fragment = self._sock.recv(n - len(data)) + if not fragment: + raise ConnectionError('Connection reset during recv') + data += fragment + return data + finally: + self._sock.setblocking(False) + def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': log.warning('%s: Sending username and password in the clear', self) @@ -481,30 +493,23 @@ class BrokerConnection(object): self.config['sasl_plain_password']]).encode('utf-8')) size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure - while len(data) < 4: - fragment = self._sock.recv(4 - len(data)) - if not fragment: - log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) - error = Errors.AuthenticationFailedError( - 'Authentication failed for user {0}'.format( - self.config['sasl_plain_username'])) - future.failure(error) - raise error - data += fragment - self._sock.setblocking(False) - except (AssertionError, ConnectionError) as e: + self._recv_bytes_blocking(4) + + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) + return future.failure(error) if data != b'\x00\x00\x00\x00': - return future.failure(Errors.AuthenticationFailedError()) + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) - log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) + log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) def _try_authenticate_gssapi(self, future): @@ -514,52 +519,41 @@ class BrokerConnection(object): ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') - # Exchange tokens until authentication either succeeds or fails: + log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name) + + # Exchange tokens until authentication either succeeds or fails received_token = None try: while not ctx_Context.complete: - # calculate the output token - try: - output_token = ctx_Context.step(received_token) - except GSSError as e: - log.exception("%s: Error invalid token received from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + # calculate an output token from kafka token (or None if first iteration) + output_token = ctx_Context.step(received_token) - if not output_token: - if ctx_Context.complete: - log.debug("%s: Security Context complete ", self) - log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) - break + # pass output token to kafka try: self._sock.setblocking(True) - # Send output token msg = output_token size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. # The gssapi will be able to identify the needed next step. # The connection is closed on failure. - response = self._sock.recv(2000) - self._sock.setblocking(False) + header = self._recv_bytes_blocking(4) + token_size = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) - except (AssertionError, ConnectionError) as e: + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) - - # pass the received token back to gssapi, strip the first 4 bytes - received_token = response[4:] + return future.failure(error) except Exception as e: - log.exception("%s: GSSAPI handshake error", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) - self.close(error=error) + return future.failure(e) - log.info('%s: Authenticated as %s', self, gssname) + log.info('%s: Authenticated as %s via GSSAPI', self, gssname) return future.success(True) def blacked_out(self): @@ -635,19 +629,16 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self.last_attempt = time.time() self._sasl_auth_future = None - self._reset_buffer() + self._protocol = KafkaProtocol( + client_id=self.config['client_id'], + api_version=self.config['api_version']) if error is None: error = Errors.Cancelled(str(self)) while self.in_flight_requests: - ifr = self.in_flight_requests.popleft() - ifr.future.failure(error) + (_, future, _) = self.in_flight_requests.popleft() + future.failure(error) self.config['state_change_callback'](self) - def _reset_buffer(self): - self._receiving = False - self._header.seek(0) - self._rbuffer = None - def send(self, request): """send request, return Future() @@ -665,13 +656,8 @@ class BrokerConnection(object): def _send(self, request): assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() - correlation_id = self._next_correlation_id() - header = RequestHeader(request, - correlation_id=correlation_id, - client_id=self.config['client_id']) - message = b''.join([header.encode(), request.encode()]) - size = Int32.encode(len(message)) - data = size + message + correlation_id = self._protocol.send_request(request) + data = self._protocol.send_bytes() try: # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block @@ -693,11 +679,7 @@ class BrokerConnection(object): log.debug('%s Request %d: %s', self, correlation_id, request) if request.expect_response(): - ifr = InFlightRequest(request=request, - correlation_id=correlation_id, - response_type=request.RESPONSE_TYPE, - future=future, - timestamp=time.time()) + ifr = (correlation_id, future, time.time()) self.in_flight_requests.append(ifr) else: future.success(None) @@ -714,7 +696,6 @@ class BrokerConnection(object): Return response if available """ - assert not self._processing, 'Recursion not supported' if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) # If requests are pending, we should close the socket and @@ -727,15 +708,28 @@ class BrokerConnection(object): log.warning('%s: No in-flight-requests to recv', self) return () - response = self._recv() - if not response and self.requests_timed_out(): + responses = self._recv() + if not responses and self.requests_timed_out(): log.warning('%s timed out after %s ms. Closing connection.', self, self.config['request_timeout_ms']) self.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % self.config['request_timeout_ms'])) return () - return response + + for response in responses: + (correlation_id, future, timestamp) = self.in_flight_requests.popleft() + if isinstance(response, Errors.KafkaError): + self.close(response) + break + + if self._sensors: + self._sensors.request_time.record((time.time() - timestamp) * 1000) + + log.debug('%s Response %d: %s', self, correlation_id, response) + future.success(response) + + return responses def _recv(self): responses = [] @@ -751,10 +745,7 @@ class BrokerConnection(object): log.error('%s: socket disconnected', self) self.close(error=Errors.ConnectionError('socket disconnected')) break - else: - responses.extend(self.receive_bytes(data)) - if len(data) < SOCK_CHUNK_BYTES: - break + except SSLWantReadError: break except ConnectionError as e: @@ -768,118 +759,26 @@ class BrokerConnection(object): if six.PY3: break raise - return responses - def receive_bytes(self, data): - i = 0 - n = len(data) - responses = [] - if self._sensors: - self._sensors.bytes_received.record(n) - while i < n: - - # Not receiving is the state of reading the payload header - if not self._receiving: - bytes_to_read = min(4 - self._header.tell(), n - i) - self._header.write(data[i:i+bytes_to_read]) - i += bytes_to_read - - if self._header.tell() == 4: - self._header.seek(0) - nbytes = Int32.decode(self._header) - # reset buffer and switch state to receiving payload bytes - self._rbuffer = KafkaBytes(nbytes) - self._receiving = True - elif self._header.tell() > 4: - raise Errors.KafkaError('this should not happen - are you threading?') - - - if self._receiving: - total_bytes = len(self._rbuffer) - staged_bytes = self._rbuffer.tell() - bytes_to_read = min(total_bytes - staged_bytes, n - i) - self._rbuffer.write(data[i:i+bytes_to_read]) - i += bytes_to_read - - staged_bytes = self._rbuffer.tell() - if staged_bytes > total_bytes: - self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?')) - - if staged_bytes != total_bytes: - break + if self._sensors: + self._sensors.bytes_received.record(len(data)) - self._receiving = False - self._rbuffer.seek(0) - resp = self._process_response(self._rbuffer) - if resp is not None: - responses.append(resp) - self._reset_buffer() - return responses + try: + more_responses = self._protocol.receive_bytes(data) + except Errors.KafkaProtocolError as e: + self.close(e) + break + else: + responses.extend([resp for (_, resp) in more_responses]) - def _process_response(self, read_buffer): - assert not self._processing, 'Recursion not supported' - self._processing = True - recv_correlation_id = Int32.decode(read_buffer) - - if not self.in_flight_requests: - error = Errors.CorrelationIdError( - '%s: No in-flight-request found for server response' - ' with correlation ID %d' - % (self, recv_correlation_id)) - self.close(error) - self._processing = False - return None - else: - ifr = self.in_flight_requests.popleft() - - if self._sensors: - self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000) - - # verify send/recv correlation ids match - - # 0.8.2 quirk - if (self.config['api_version'] == (0, 8, 2) and - ifr.response_type is GroupCoordinatorResponse[0] and - ifr.correlation_id != 0 and - recv_correlation_id == 0): - log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' - ' Correlation ID does not match request. This' - ' should go away once at least one topic has been' - ' initialized on the broker.') - - elif ifr.correlation_id != recv_correlation_id: - error = Errors.CorrelationIdError( - '%s: Correlation IDs do not match: sent %d, recv %d' - % (self, ifr.correlation_id, recv_correlation_id)) - ifr.future.failure(error) - self.close(error) - self._processing = False - return None - - # decode response - try: - response = ifr.response_type.decode(read_buffer) - except ValueError: - read_buffer.seek(0) - buf = read_buffer.read() - log.error('%s Response %d [ResponseType: %s Request: %s]:' - ' Unable to decode %d-byte buffer: %r', self, - ifr.correlation_id, ifr.response_type, - ifr.request, len(buf), buf) - error = Errors.UnknownError('Unable to decode response') - ifr.future.failure(error) - self.close(error) - self._processing = False - return None - - log.debug('%s Response %d: %s', self, ifr.correlation_id, response) - ifr.future.success(response) - self._processing = False - return response + if len(data) < SOCK_CHUNK_BYTES: + break + + return responses def requests_timed_out(self): if self.in_flight_requests: - oldest_at = self.in_flight_requests[0].timestamp + (_, _, oldest_at) = self.in_flight_requests[0] timeout = self.config['request_timeout_ms'] / 1000.0 if time.time() >= oldest_at + timeout: return True diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index b7fbd83..a83d5da 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -846,13 +846,20 @@ class KafkaConsumer(six.Iterator): log.debug("Unsubscribed all topics or patterns and assigned partitions") def metrics(self, raw=False): - """Warning: this is an unstable interface. - It may change in future releases without warning""" + """Get metrics on consumer performance. + + This is ported from the Java Consumer, for details see: + https://kafka.apache.org/documentation/#new_consumer_monitoring + + Warning: + This is an unstable interface. It may change in future + releases without warning. + """ if raw: return self._metrics.metrics metrics = {} - for k, v in self._metrics.metrics.items(): + for k, v in six.iteritems(self._metrics.metrics): if k.group not in metrics: metrics[k.group] = {} if k.name not in metrics[k.group]: @@ -897,7 +904,7 @@ class KafkaConsumer(six.Iterator): raise UnsupportedVersionError( "offsets_for_times API not supported for cluster version {}" .format(self.config['api_version'])) - for tp, ts in timestamps.items(): + for tp, ts in six.iteritems(timestamps): timestamps[tp] = int(ts) if ts < 0: raise ValueError( diff --git a/kafka/errors.py b/kafka/errors.py index 35f9d94..c72455a 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -33,7 +33,11 @@ class NodeNotReadyError(KafkaError): retriable = True -class CorrelationIdError(KafkaError): +class KafkaProtocolError(KafkaError): + retriable = True + + +class CorrelationIdError(KafkaProtocolError): retriable = True diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 09ca744..de9dcd2 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -8,6 +8,8 @@ import threading import time import weakref +from ..vendor import six + from .. import errors as Errors from ..client_async import KafkaClient, selectors from ..metrics import MetricConfig, Metrics @@ -566,10 +568,10 @@ class KafkaProducer(object): Arguments: timeout (float, optional): timeout in seconds to wait for completion. - + Raises: - KafkaTimeoutError: failure to flush buffered records within the - provided timeout + KafkaTimeoutError: failure to flush buffered records within the + provided timeout """ log.debug("Flushing accumulated records in producer.") # trace self._accumulator.begin_flush() @@ -655,13 +657,20 @@ class KafkaProducer(object): available) def metrics(self, raw=False): - """Warning: this is an unstable interface. - It may change in future releases without warning""" + """Get metrics on producer performance. + + This is ported from the Java Producer, for details see: + https://kafka.apache.org/documentation/#producer_monitoring + + Warning: + This is an unstable interface. It may change in future + releases without warning. + """ if raw: return self._metrics.metrics metrics = {} - for k, v in self._metrics.metrics.items(): + for k, v in six.iteritems(self._metrics.metrics): if k.group not in metrics: metrics[k.group] = {} if k.name not in metrics[k.group]: diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py new file mode 100644 index 0000000..4d77bb3 --- /dev/null +++ b/kafka/protocol/parser.py @@ -0,0 +1,177 @@ +from __future__ import absolute_import + +import collections +import logging + +import kafka.errors as Errors +from kafka.protocol.api import RequestHeader +from kafka.protocol.commit import GroupCoordinatorResponse +from kafka.protocol.frame import KafkaBytes +from kafka.protocol.types import Int32 +from kafka.version import __version__ + +log = logging.getLogger(__name__) + + +class KafkaProtocol(object): + """Manage the kafka network protocol + + Use an instance of KafkaProtocol to manage bytes send/recv'd + from a network socket to a broker. + """ + def __init__(self, client_id=None, api_version=None): + if client_id is None: + client_id = self._gen_client_id() + self._client_id = client_id + self._api_version = api_version + self._correlation_id = 0 + self._header = KafkaBytes(4) + self._rbuffer = None + self._receiving = False + self.in_flight_requests = collections.deque() + self.bytes_to_send = [] + + def _next_correlation_id(self): + self._correlation_id = (self._correlation_id + 1) % 2**31 + return self._correlation_id + + def _gen_client_id(self): + return 'kafka-python' + __version__ + + def send_request(self, request, correlation_id=None): + """Encode and queue a kafka api request for sending. + + Arguments: + request (object): An un-encoded kafka request. + correlation_id (int, optional): Optionally specify an ID to + correlate requests with responses. If not provided, an ID will + be generated automatically. + + Returns: + correlation_id + """ + log.debug('Sending request %s', request) + if correlation_id is None: + correlation_id = self._next_correlation_id() + header = RequestHeader(request, + correlation_id=correlation_id, + client_id=self._client_id) + message = b''.join([header.encode(), request.encode()]) + size = Int32.encode(len(message)) + data = size + message + self.bytes_to_send.append(data) + if request.expect_response(): + ifr = (correlation_id, request) + self.in_flight_requests.append(ifr) + return correlation_id + + def send_bytes(self): + """Retrieve all pending bytes to send on the network""" + data = b''.join(self.bytes_to_send) + self.bytes_to_send = [] + return data + + def receive_bytes(self, data): + """Process bytes received from the network. + + Arguments: + data (bytes): any length bytes received from a network connection + to a kafka broker. + + Returns: + responses (list of (correlation_id, response)): any/all completed + responses, decoded from bytes to python objects. + + Raises: + KafkaProtocolError: if the bytes received could not be decoded. + CorrelationIdError: if the response does not match the request + correlation id. + """ + i = 0 + n = len(data) + responses = [] + while i < n: + + # Not receiving is the state of reading the payload header + if not self._receiving: + bytes_to_read = min(4 - self._header.tell(), n - i) + self._header.write(data[i:i+bytes_to_read]) + i += bytes_to_read + + if self._header.tell() == 4: + self._header.seek(0) + nbytes = Int32.decode(self._header) + # reset buffer and switch state to receiving payload bytes + self._rbuffer = KafkaBytes(nbytes) + self._receiving = True + elif self._header.tell() > 4: + raise Errors.KafkaError('this should not happen - are you threading?') + + if self._receiving: + total_bytes = len(self._rbuffer) + staged_bytes = self._rbuffer.tell() + bytes_to_read = min(total_bytes - staged_bytes, n - i) + self._rbuffer.write(data[i:i+bytes_to_read]) + i += bytes_to_read + + staged_bytes = self._rbuffer.tell() + if staged_bytes > total_bytes: + raise Errors.KafkaError('Receive buffer has more bytes than expected?') + + if staged_bytes != total_bytes: + break + + self._receiving = False + self._rbuffer.seek(0) + resp = self._process_response(self._rbuffer) + responses.append(resp) + self._reset_buffer() + return responses + + def _process_response(self, read_buffer): + recv_correlation_id = Int32.decode(read_buffer) + log.debug('Received correlation id: %d', recv_correlation_id) + + if not self.in_flight_requests: + raise Errors.CorrelationIdError( + 'No in-flight-request found for server response' + ' with correlation ID %d' + % recv_correlation_id) + + (correlation_id, request) = self.in_flight_requests.popleft() + + # 0.8.2 quirk + if (self._api_version == (0, 8, 2) and + request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and + correlation_id != 0 and + recv_correlation_id == 0): + log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' + ' Correlation ID does not match request. This' + ' should go away once at least one topic has been' + ' initialized on the broker.') + + elif correlation_id != recv_correlation_id: + # return or raise? + raise Errors.CorrelationIdError( + 'Correlation IDs do not match: sent %d, recv %d' + % (correlation_id, recv_correlation_id)) + + # decode response + log.debug('Processing response %s', request.RESPONSE_TYPE.__name__) + try: + response = request.RESPONSE_TYPE.decode(read_buffer) + except ValueError: + read_buffer.seek(0) + buf = read_buffer.read() + log.error('Response %d [ResponseType: %s Request: %s]:' + ' Unable to decode %d-byte buffer: %r', + correlation_id, request.RESPONSE_TYPE, + request, len(buf), buf) + raise Errors.KafkaProtocolError('Unable to decode response') + + return (correlation_id, response) + + def _reset_buffer(self): + self._receiving = False + self._header.seek(0) + self._rbuffer = None |