diff options
author | David Pursehouse <david.pursehouse@sonymobile.com> | 2013-09-13 19:03:24 +0900 |
---|---|---|
committer | David Pursehouse <david.pursehouse@sonymobile.com> | 2013-09-13 19:03:37 +0900 |
commit | 87b68fc2db307fe4e294abd913bcee80ecb9cb7f (patch) | |
tree | 70f1f534633cb128c780cc1b29ef685fe255798e /pygerrit | |
parent | 8571778ed4ebd1ebacba2334d8b6b19fbcd5a2b9 (diff) | |
parent | ae41ccac316c6bd45ec4af0146219212bbb08c87 (diff) | |
download | pygerrit-87b68fc2db307fe4e294abd913bcee80ecb9cb7f.tar.gz |
Merge branch 'master' into internal
* master:
Bump version to 0.1.1
Add changelog
Fix #10: Allow to manually specify ssh username and port
Completely refactor the stream event handling
Add missing __repr__ methods on ErrorEvent and UnhandledEvent
Fix initialisation of error event
Fix #11: correct handling of `identityfile` in the ssh config
Allow example script to continue if errors are received
Fix #9: Add a bit more detail in the documentation
Fix #8: Support the "topic-changed" stream event
Fix #7: Support the "reviewer-added" stream event
Fix #6: Support the "merge-failed" stream event
Fix #5: Move json parsing and error handling into the event factory
Improved logging in the example script
Fix #3: Don't treat unhandled event types as errors
Fix #4: Keep event's raw json data in the event object
Add __repr__ methods on event and model classes
Remove redundant `exec_command` method
Fix #2: Establish SSH connection in a thread-safe way
Fix #1: Use select.select() instead of select.poll()
Change-Id: Ib91384b16acca30bfc5aadcdc1131c8bdecef583
Diffstat (limited to 'pygerrit')
-rw-r--r-- | pygerrit/__init__.py | 2 | ||||
-rw-r--r-- | pygerrit/client.py | 10 | ||||
-rw-r--r-- | pygerrit/events.py | 168 | ||||
-rw-r--r-- | pygerrit/models.py | 17 | ||||
-rw-r--r-- | pygerrit/ssh.py | 79 | ||||
-rw-r--r-- | pygerrit/stream.py | 55 |
6 files changed, 241 insertions, 90 deletions
diff --git a/pygerrit/__init__.py b/pygerrit/__init__.py index 98d0f92..76c8321 100644 --- a/pygerrit/__init__.py +++ b/pygerrit/__init__.py @@ -22,7 +22,7 @@ """ Module to interface with Gerrit. """ -__numversion__ = (0, 1, 0) +__numversion__ = (0, 1, 1) __version__ = '.'.join([str(num) for num in __numversion__]) diff --git a/pygerrit/client.py b/pygerrit/client.py index b6502e1..456ce2e 100644 --- a/pygerrit/client.py +++ b/pygerrit/client.py @@ -37,11 +37,11 @@ class GerritClient(object): """ Gerrit client interface. """ - def __init__(self, host): + def __init__(self, host, username=None, port=None): self._factory = GerritEventFactory() self._events = Queue() self._stream = None - self._ssh_client = GerritSSHClient(host) + self._ssh_client = GerritSSHClient(host, username=username, port=port) def gerrit_version(self): """ Return the version of Gerrit that is connected to. """ @@ -112,15 +112,15 @@ class GerritClient(object): except Empty: return None - def put_event(self, json_data): - """ Create event from `json_data` and add it to the queue. + def put_event(self, data): + """ Create event from `data` and add it to the queue. Raise GerritError if the queue is full, or the factory could not create the event. """ try: - event = self._factory.create(json_data) + event = self._factory.create(data) self._events.put(event) except Full: raise GerritError("Unable to add event: queue is full") diff --git a/pygerrit/events.py b/pygerrit/events.py index 8a5c21f..ebbb327 100644 --- a/pygerrit/events.py +++ b/pygerrit/events.py @@ -23,6 +23,9 @@ """ Gerrit event classes. """ +import json +import logging + from .error import GerritError from .models import Account, Approval, Change, Patchset, RefUpdate @@ -53,19 +56,27 @@ class GerritEventFactory(object): return decorate @classmethod - def create(cls, json_data): + def create(cls, data): """ Create a new event instance. - Return an instance of the `GerritEvent` subclass from `json_data` - Raise GerritError if `json_data` does not contain a `type` key, or - no corresponding event is registered. + Return an instance of the `GerritEvent` subclass after converting + `data` to json. + + Raise GerritError if json parsed from `data` does not contain a `type` + key. """ + try: + json_data = json.loads(data) + except ValueError as err: + logging.debug("Failed to load json data: %s: [%s]", str(err), data) + json_data = json.loads(ErrorEvent.error_json(err)) + if not "type" in json_data: raise GerritError("`type` not in json_data") name = json_data["type"] if not name in cls._events: - raise GerritError("Unknown event: %s" % name) + name = 'unhandled-event' event = cls._events[name] module_name = event[0] class_name = event[1] @@ -78,11 +89,39 @@ class GerritEvent(object): """ Gerrit event base class. """ - def __init__(self): - pass + def __init__(self, json_data): + self.json = json_data + + +@GerritEventFactory.register("unhandled-event") +class UnhandledEvent(GerritEvent): + + """ Unknown event type received in json data from Gerrit's event stream. """ + + def __init__(self, json_data): + super(UnhandledEvent, self).__init__(json_data) + + def __repr__(self): + return u"<UnhandledEvent>" + + +@GerritEventFactory.register("error-event") +class ErrorEvent(GerritEvent): + + """ Error occurred when processing json data from Gerrit's event stream. """ - def __str__(self): - return u"%s" % self.name # pylint: disable=no-member + def __init__(self, json_data): + super(ErrorEvent, self).__init__(json_data) + self.error = json_data["error"] + + @classmethod + def error_json(cls, error): + """ Return a json string for the `error`. """ + return '{"type":"error-event",' \ + '"error":"%s"}' % str(error) + + def __repr__(self): + return u"<ErrorEvent: %s>" % self.error @GerritEventFactory.register("patchset-created") @@ -91,7 +130,7 @@ class PatchsetCreatedEvent(GerritEvent): """ Gerrit "patchset-created" event. """ def __init__(self, json_data): - super(PatchsetCreatedEvent, self).__init__() + super(PatchsetCreatedEvent, self).__init__(json_data) try: self.change = Change(json_data["change"]) self.patchset = Patchset(json_data["patchSet"]) @@ -99,6 +138,11 @@ class PatchsetCreatedEvent(GerritEvent): except KeyError as e: raise GerritError("PatchsetCreatedEvent: %s" % e) + def __repr__(self): + return u"<PatchsetCreatedEvent>: %s %s %s" % (self.change, + self.patchset, + self.uploader) + @GerritEventFactory.register("draft-published") class DraftPublishedEvent(GerritEvent): @@ -106,7 +150,7 @@ class DraftPublishedEvent(GerritEvent): """ Gerrit "draft-published" event. """ def __init__(self, json_data): - super(DraftPublishedEvent, self).__init__() + super(DraftPublishedEvent, self).__init__(json_data) try: self.change = Change(json_data["change"]) self.patchset = Patchset(json_data["patchSet"]) @@ -114,6 +158,11 @@ class DraftPublishedEvent(GerritEvent): except KeyError as e: raise GerritError("DraftPublishedEvent: %s" % e) + def __repr__(self): + return u"<DraftPublishedEvent>: %s %s %s" % (self.change, + self.patchset, + self.uploader) + @GerritEventFactory.register("comment-added") class CommentAddedEvent(GerritEvent): @@ -121,7 +170,7 @@ class CommentAddedEvent(GerritEvent): """ Gerrit "comment-added" event. """ def __init__(self, json_data): - super(CommentAddedEvent, self).__init__() + super(CommentAddedEvent, self).__init__(json_data) try: self.change = Change(json_data["change"]) self.patchset = Patchset(json_data["patchSet"]) @@ -134,6 +183,11 @@ class CommentAddedEvent(GerritEvent): except (KeyError, ValueError) as e: raise GerritError("CommentAddedEvent: %s" % e) + def __repr__(self): + return u"<CommentAddedEvent>: %s %s %s" % (self.change, + self.patchset, + self.author) + @GerritEventFactory.register("change-merged") class ChangeMergedEvent(GerritEvent): @@ -141,7 +195,7 @@ class ChangeMergedEvent(GerritEvent): """ Gerrit "change-merged" event. """ def __init__(self, json_data): - super(ChangeMergedEvent, self).__init__() + super(ChangeMergedEvent, self).__init__(json_data) try: self.change = Change(json_data["change"]) self.patchset = Patchset(json_data["patchSet"]) @@ -149,6 +203,32 @@ class ChangeMergedEvent(GerritEvent): except KeyError as e: raise GerritError("ChangeMergedEvent: %s" % e) + def __repr__(self): + return u"<ChangeMergedEvent>: %s %s %s" % (self.change, + self.patchset, + self.submitter) + + +@GerritEventFactory.register("merge-failed") +class MergeFailedEvent(GerritEvent): + + """ Gerrit "merge-failed" event. """ + + def __init__(self, json_data): + super(MergeFailedEvent, self).__init__(json_data) + try: + self.change = Change(json_data["change"]) + self.patchset = Patchset(json_data["patchSet"]) + self.submitter = Account(json_data["submitter"]) + self.reason = json_data["reason"] + except KeyError as e: + raise GerritError("MergeFailedEvent: %s" % e) + + def __repr__(self): + return u"<MergeFailedEvent>: %s %s %s" % (self.change, + self.patchset, + self.submitter) + @GerritEventFactory.register("change-abandoned") class ChangeAbandonedEvent(GerritEvent): @@ -156,7 +236,7 @@ class ChangeAbandonedEvent(GerritEvent): """ Gerrit "change-abandoned" event. """ def __init__(self, json_data): - super(ChangeAbandonedEvent, self).__init__() + super(ChangeAbandonedEvent, self).__init__(json_data) try: self.change = Change(json_data["change"]) self.patchset = Patchset.from_json(json_data) @@ -165,6 +245,11 @@ class ChangeAbandonedEvent(GerritEvent): except KeyError as e: raise GerritError("ChangeAbandonedEvent: %s" % e) + def __repr__(self): + return u"<ChangeAbandonedEvent>: %s %s %s" % (self.change, + self.patchset, + self.abandoner) + @GerritEventFactory.register("change-restored") class ChangeRestoredEvent(GerritEvent): @@ -172,7 +257,7 @@ class ChangeRestoredEvent(GerritEvent): """ Gerrit "change-restored" event. """ def __init__(self, json_data): - super(ChangeRestoredEvent, self).__init__() + super(ChangeRestoredEvent, self).__init__(json_data) try: self.change = Change(json_data["change"]) self.patchset = Patchset.from_json(json_data) @@ -181,6 +266,11 @@ class ChangeRestoredEvent(GerritEvent): except KeyError as e: raise GerritError("ChangeRestoredEvent: %s" % e) + def __repr__(self): + return u"<ChangeRestoredEvent>: %s %s %s" % (self.change, + self.patchset, + self.restorer) + @GerritEventFactory.register("ref-updated") class RefUpdatedEvent(GerritEvent): @@ -188,9 +278,55 @@ class RefUpdatedEvent(GerritEvent): """ Gerrit "ref-updated" event. """ def __init__(self, json_data): - super(RefUpdatedEvent, self).__init__() + super(RefUpdatedEvent, self).__init__(json_data) try: self.ref_update = RefUpdate(json_data["refUpdate"]) self.submitter = Account.from_json(json_data, "submitter") except KeyError as e: raise GerritError("RefUpdatedEvent: %s" % e) + + def __repr__(self): + return u"<RefUpdatedEvent>: %s %s" % (self.ref_update, self.submitter) + + +@GerritEventFactory.register("reviewer-added") +class ReviewerAddedEvent(GerritEvent): + + """ Gerrit "reviewer-added" event. """ + + def __init__(self, json_data): + super(ReviewerAddedEvent, self).__init__(json_data) + try: + self.change = Change(json_data["change"]) + self.patchset = Patchset.from_json(json_data) + self.reviewer = Account(json_data["reviewer"]) + except KeyError as e: + raise GerritError("ReviewerAddedEvent: %s" % e) + + def __repr__(self): + return u"<ReviewerAddedEvent>: %s %s %s" % (self.change, + self.patchset, + self.reviewer) + + +@GerritEventFactory.register("topic-changed") +class TopicChangedEvent(GerritEvent): + + """ Gerrit "topic-changed" event. """ + + def __init__(self, json_data): + super(TopicChangedEvent, self).__init__(json_data) + try: + self.change = Change(json_data["change"]) + self.changer = Account(json_data["changer"]) + if "oldTopic" in json_data: + self.oldtopic = json_data["oldTopic"] + else: + self.oldtopic = "" + except KeyError as e: + raise GerritError("TopicChangedEvent: %s" % e) + + def __repr__(self): + return u"<TopicChangedEvent>: %s %s [%s]" % (self.change, + self.changer, + self.oldtopic) diff --git a/pygerrit/models.py b/pygerrit/models.py index 2e79949..ac9575a 100644 --- a/pygerrit/models.py +++ b/pygerrit/models.py @@ -34,6 +34,10 @@ class Account(object): self.name = from_json(json_data, "name") self.email = from_json(json_data, "email") + def __repr__(self): + return u"<Account %s%s>" % (self.name, + " (%s)" % self.email if self.email else "") + @staticmethod def from_json(json_data, key): """ Create an Account instance. @@ -61,6 +65,9 @@ class Change(object): self.url = from_json(json_data, "url") self.owner = Account.from_json(json_data, "owner") + def __repr__(self): + return u"<Change %s, %s, %s>" % (self.number, self.project, self.branch) + class Patchset(object): @@ -72,6 +79,9 @@ class Patchset(object): self.ref = from_json(json_data, "ref") self.uploader = Account.from_json(json_data, "uploader") + def __repr__(self): + return u"<Patchset %s, %s>" % (self.number, self.revision) + @staticmethod def from_json(json_data): r""" Create a Patchset instance. @@ -94,6 +104,9 @@ class Approval(object): self.value = from_json(json_data, "value") self.description = from_json(json_data, "description") + def __repr__(self): + return u"<Approval %s %s>" % (self.description, self.value) + class RefUpdate(object): @@ -104,3 +117,7 @@ class RefUpdate(object): self.newrev = from_json(json_data, "newRev") self.refname = from_json(json_data, "refName") self.project = from_json(json_data, "project") + + def __repr__(self): + return "<RefUpdate %s %s %s %s>" % \ + (self.project, self.refname, self.oldrev, self.newrev) diff --git a/pygerrit/ssh.py b/pygerrit/ssh.py index 3e2add1..e779715 100644 --- a/pygerrit/ssh.py +++ b/pygerrit/ssh.py @@ -25,6 +25,7 @@ from os.path import abspath, expanduser, isfile import re import socket +from threading import Event, Lock from .error import GerritError @@ -64,18 +65,19 @@ class GerritSSHClient(SSHClient): """ Gerrit SSH Client, wrapping the paramiko SSH Client. """ - def __init__(self, hostname): + def __init__(self, hostname, username=None, port=None): """ Initialise and connect to SSH. """ super(GerritSSHClient, self).__init__() self.remote_version = None self.hostname = hostname - self.connected = False - - def _connect(self): - """ Connect to the remote if not already connected. """ - if self.connected: - return - self.load_system_host_keys() + self.username = username + self.key_filename = None + self.port = port + self.connected = Event() + self.lock = Lock() + + def _configure(self): + """ Configure the ssh parameters from the config file. """ configfile = expanduser("~/.ssh/config") if not isfile(configfile): raise GerritError("ssh config file '%s' does not exist" % @@ -88,22 +90,29 @@ class GerritSSHClient(SSHClient): raise GerritError("No ssh config for host %s" % self.hostname) if not 'hostname' in data or not 'port' in data or not 'user' in data: raise GerritError("Missing configuration data in %s" % configfile) - key_filename = None + self.hostname = data['hostname'] + self.username = data['user'] if 'identityfile' in data: - key_filename = abspath(expanduser(data['identityfile'])) + key_filename = abspath(expanduser(data['identityfile'][0])) if not isfile(key_filename): raise GerritError("Identity file '%s' does not exist" % key_filename) + self.key_filename = key_filename try: - port = int(data['port']) + self.port = int(data['port']) except ValueError: raise GerritError("Invalid port: %s" % data['port']) + + def _do_connect(self): + """ Connect to the remote. """ + self.load_system_host_keys() + if self.username is None or self.port is None: + self._configure() try: - self.connect(hostname=data['hostname'], - port=port, - username=data['user'], - key_filename=key_filename) - self.connected = True + self.connect(hostname=self.hostname, + port=self.port, + username=self.username, + key_filename=self.key_filename) except socket.error as e: raise GerritError("Failed to connect to server: %s" % e) @@ -114,17 +123,20 @@ class GerritSSHClient(SSHClient): except AttributeError: self.remote_version = None - def exec_command(self, command, bufsize=1, timeout=None, get_pty=False): - """ Execute the command. - - Make sure we're connected and then execute the command. - - Return a tuple of stdin, stdout, stderr. - - """ - self._connect() - return super(GerritSSHClient, self).\ - exec_command(command, bufsize, timeout, get_pty) + def _connect(self): + """ Connect to the remote if not already connected. """ + if not self.connected.is_set(): + try: + self.lock.acquire() + # Another thread may have connected while we were + # waiting to acquire the lock + if not self.connected.is_set(): + self._do_connect() + self.connected.set() + except GerritError: + raise + finally: + self.lock.release() def get_remote_version(self): """ Return the version of the remote Gerrit server. """ @@ -138,17 +150,24 @@ class GerritSSHClient(SSHClient): def run_gerrit_command(self, command): """ Run the given command. - Run `command` and return a `GerritSSHCommandResult`. + Make sure we're connected to the remote server, and run `command`. + + Return the results as a `GerritSSHCommandResult`. - Raise `ValueError` if `command` is not a string. + Raise `ValueError` if `command` is not a string, or `GerritError` if + command execution fails. """ if not isinstance(command, basestring): raise ValueError("command must be a string") gerrit_command = "gerrit " + command + self._connect() try: - stdin, stdout, stderr = self.exec_command(gerrit_command) + stdin, stdout, stderr = self.exec_command(gerrit_command, + bufsize=1, + timeout=None, + get_pty=False) except SSHException as err: raise GerritError("Command execution error: %s" % err) return GerritSSHCommandResult(command, stdin, stdout, stderr) diff --git a/pygerrit/stream.py b/pygerrit/stream.py index d7633e8..37b09f1 100644 --- a/pygerrit/stream.py +++ b/pygerrit/stream.py @@ -26,23 +26,9 @@ Class to listen to the Gerrit event stream and dispatch events. """ -import json -import logging -from select import poll, POLLIN from threading import Thread, Event -from .error import GerritError -from .events import GerritEvent, GerritEventFactory - - -@GerritEventFactory.register("gerrit-stream-error") -class GerritStreamErrorEvent(GerritEvent): - - """ Represents an error when handling the gerrit event stream. """ - - def __init__(self, json_data): - super(GerritStreamErrorEvent, self).__init__() - self.error = json_data["error"] +from .events import ErrorEvent class GerritStream(Thread): @@ -62,29 +48,22 @@ class GerritStream(Thread): def _error_event(self, error): """ Dispatch `error` to the Gerrit client. """ - json_data = json.loads('{"type":"gerrit-stream-error",' - '"error":"%s"}' % str(error)) - self._gerrit.put_event(json_data) + self._gerrit.put_event(ErrorEvent.error_json(error)) def run(self): """ Listen to the stream and send events to the client. """ - try: - result = self._ssh_client.run_gerrit_command("stream-events") - except GerritError as e: - self._error_event(e) - else: - poller = poll() - stdout = result.stdout - poller.register(stdout.channel) - while not self._stop.is_set(): - data = poller.poll() - for (handle, event) in data: - if handle == stdout.channel.fileno() and event == POLLIN: - try: - line = stdout.readline() - json_data = json.loads(line) - self._gerrit.put_event(json_data) - except (ValueError, IOError) as err: - self._error_event(err) - except GerritError as err: - logging.error("Failed to put event: %s", err) + channel = self._ssh_client.get_transport().open_session() + channel.exec_command("gerrit stream-events") + stdout = channel.makefile() + stderr = channel.makefile_stderr() + while not self._stop.is_set(): + if channel.exit_status_ready(): + if channel.recv_stderr_ready(): + error = stderr.readline().strip() + else: + error = "Remote server connection closed" + self._error_event(error) + self._stop.set() + elif channel.recv_ready(): + data = stdout.readline() + self._gerrit.put_event(data) |