summaryrefslogtreecommitdiff
path: root/pygerrit
diff options
context:
space:
mode:
authorDavid Pursehouse <david.pursehouse@sonymobile.com>2013-09-13 19:03:24 +0900
committerDavid Pursehouse <david.pursehouse@sonymobile.com>2013-09-13 19:03:37 +0900
commit87b68fc2db307fe4e294abd913bcee80ecb9cb7f (patch)
tree70f1f534633cb128c780cc1b29ef685fe255798e /pygerrit
parent8571778ed4ebd1ebacba2334d8b6b19fbcd5a2b9 (diff)
parentae41ccac316c6bd45ec4af0146219212bbb08c87 (diff)
downloadpygerrit-87b68fc2db307fe4e294abd913bcee80ecb9cb7f.tar.gz
Merge branch 'master' into internal
* master: Bump version to 0.1.1 Add changelog Fix #10: Allow to manually specify ssh username and port Completely refactor the stream event handling Add missing __repr__ methods on ErrorEvent and UnhandledEvent Fix initialisation of error event Fix #11: correct handling of `identityfile` in the ssh config Allow example script to continue if errors are received Fix #9: Add a bit more detail in the documentation Fix #8: Support the "topic-changed" stream event Fix #7: Support the "reviewer-added" stream event Fix #6: Support the "merge-failed" stream event Fix #5: Move json parsing and error handling into the event factory Improved logging in the example script Fix #3: Don't treat unhandled event types as errors Fix #4: Keep event's raw json data in the event object Add __repr__ methods on event and model classes Remove redundant `exec_command` method Fix #2: Establish SSH connection in a thread-safe way Fix #1: Use select.select() instead of select.poll() Change-Id: Ib91384b16acca30bfc5aadcdc1131c8bdecef583
Diffstat (limited to 'pygerrit')
-rw-r--r--pygerrit/__init__.py2
-rw-r--r--pygerrit/client.py10
-rw-r--r--pygerrit/events.py168
-rw-r--r--pygerrit/models.py17
-rw-r--r--pygerrit/ssh.py79
-rw-r--r--pygerrit/stream.py55
6 files changed, 241 insertions, 90 deletions
diff --git a/pygerrit/__init__.py b/pygerrit/__init__.py
index 98d0f92..76c8321 100644
--- a/pygerrit/__init__.py
+++ b/pygerrit/__init__.py
@@ -22,7 +22,7 @@
""" Module to interface with Gerrit. """
-__numversion__ = (0, 1, 0)
+__numversion__ = (0, 1, 1)
__version__ = '.'.join([str(num) for num in __numversion__])
diff --git a/pygerrit/client.py b/pygerrit/client.py
index b6502e1..456ce2e 100644
--- a/pygerrit/client.py
+++ b/pygerrit/client.py
@@ -37,11 +37,11 @@ class GerritClient(object):
""" Gerrit client interface. """
- def __init__(self, host):
+ def __init__(self, host, username=None, port=None):
self._factory = GerritEventFactory()
self._events = Queue()
self._stream = None
- self._ssh_client = GerritSSHClient(host)
+ self._ssh_client = GerritSSHClient(host, username=username, port=port)
def gerrit_version(self):
""" Return the version of Gerrit that is connected to. """
@@ -112,15 +112,15 @@ class GerritClient(object):
except Empty:
return None
- def put_event(self, json_data):
- """ Create event from `json_data` and add it to the queue.
+ def put_event(self, data):
+ """ Create event from `data` and add it to the queue.
Raise GerritError if the queue is full, or the factory could not
create the event.
"""
try:
- event = self._factory.create(json_data)
+ event = self._factory.create(data)
self._events.put(event)
except Full:
raise GerritError("Unable to add event: queue is full")
diff --git a/pygerrit/events.py b/pygerrit/events.py
index 8a5c21f..ebbb327 100644
--- a/pygerrit/events.py
+++ b/pygerrit/events.py
@@ -23,6 +23,9 @@
""" Gerrit event classes. """
+import json
+import logging
+
from .error import GerritError
from .models import Account, Approval, Change, Patchset, RefUpdate
@@ -53,19 +56,27 @@ class GerritEventFactory(object):
return decorate
@classmethod
- def create(cls, json_data):
+ def create(cls, data):
""" Create a new event instance.
- Return an instance of the `GerritEvent` subclass from `json_data`
- Raise GerritError if `json_data` does not contain a `type` key, or
- no corresponding event is registered.
+ Return an instance of the `GerritEvent` subclass after converting
+ `data` to json.
+
+ Raise GerritError if json parsed from `data` does not contain a `type`
+ key.
"""
+ try:
+ json_data = json.loads(data)
+ except ValueError as err:
+ logging.debug("Failed to load json data: %s: [%s]", str(err), data)
+ json_data = json.loads(ErrorEvent.error_json(err))
+
if not "type" in json_data:
raise GerritError("`type` not in json_data")
name = json_data["type"]
if not name in cls._events:
- raise GerritError("Unknown event: %s" % name)
+ name = 'unhandled-event'
event = cls._events[name]
module_name = event[0]
class_name = event[1]
@@ -78,11 +89,39 @@ class GerritEvent(object):
""" Gerrit event base class. """
- def __init__(self):
- pass
+ def __init__(self, json_data):
+ self.json = json_data
+
+
+@GerritEventFactory.register("unhandled-event")
+class UnhandledEvent(GerritEvent):
+
+ """ Unknown event type received in json data from Gerrit's event stream. """
+
+ def __init__(self, json_data):
+ super(UnhandledEvent, self).__init__(json_data)
+
+ def __repr__(self):
+ return u"<UnhandledEvent>"
+
+
+@GerritEventFactory.register("error-event")
+class ErrorEvent(GerritEvent):
+
+ """ Error occurred when processing json data from Gerrit's event stream. """
- def __str__(self):
- return u"%s" % self.name # pylint: disable=no-member
+ def __init__(self, json_data):
+ super(ErrorEvent, self).__init__(json_data)
+ self.error = json_data["error"]
+
+ @classmethod
+ def error_json(cls, error):
+ """ Return a json string for the `error`. """
+ return '{"type":"error-event",' \
+ '"error":"%s"}' % str(error)
+
+ def __repr__(self):
+ return u"<ErrorEvent: %s>" % self.error
@GerritEventFactory.register("patchset-created")
@@ -91,7 +130,7 @@ class PatchsetCreatedEvent(GerritEvent):
""" Gerrit "patchset-created" event. """
def __init__(self, json_data):
- super(PatchsetCreatedEvent, self).__init__()
+ super(PatchsetCreatedEvent, self).__init__(json_data)
try:
self.change = Change(json_data["change"])
self.patchset = Patchset(json_data["patchSet"])
@@ -99,6 +138,11 @@ class PatchsetCreatedEvent(GerritEvent):
except KeyError as e:
raise GerritError("PatchsetCreatedEvent: %s" % e)
+ def __repr__(self):
+ return u"<PatchsetCreatedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.uploader)
+
@GerritEventFactory.register("draft-published")
class DraftPublishedEvent(GerritEvent):
@@ -106,7 +150,7 @@ class DraftPublishedEvent(GerritEvent):
""" Gerrit "draft-published" event. """
def __init__(self, json_data):
- super(DraftPublishedEvent, self).__init__()
+ super(DraftPublishedEvent, self).__init__(json_data)
try:
self.change = Change(json_data["change"])
self.patchset = Patchset(json_data["patchSet"])
@@ -114,6 +158,11 @@ class DraftPublishedEvent(GerritEvent):
except KeyError as e:
raise GerritError("DraftPublishedEvent: %s" % e)
+ def __repr__(self):
+ return u"<DraftPublishedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.uploader)
+
@GerritEventFactory.register("comment-added")
class CommentAddedEvent(GerritEvent):
@@ -121,7 +170,7 @@ class CommentAddedEvent(GerritEvent):
""" Gerrit "comment-added" event. """
def __init__(self, json_data):
- super(CommentAddedEvent, self).__init__()
+ super(CommentAddedEvent, self).__init__(json_data)
try:
self.change = Change(json_data["change"])
self.patchset = Patchset(json_data["patchSet"])
@@ -134,6 +183,11 @@ class CommentAddedEvent(GerritEvent):
except (KeyError, ValueError) as e:
raise GerritError("CommentAddedEvent: %s" % e)
+ def __repr__(self):
+ return u"<CommentAddedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.author)
+
@GerritEventFactory.register("change-merged")
class ChangeMergedEvent(GerritEvent):
@@ -141,7 +195,7 @@ class ChangeMergedEvent(GerritEvent):
""" Gerrit "change-merged" event. """
def __init__(self, json_data):
- super(ChangeMergedEvent, self).__init__()
+ super(ChangeMergedEvent, self).__init__(json_data)
try:
self.change = Change(json_data["change"])
self.patchset = Patchset(json_data["patchSet"])
@@ -149,6 +203,32 @@ class ChangeMergedEvent(GerritEvent):
except KeyError as e:
raise GerritError("ChangeMergedEvent: %s" % e)
+ def __repr__(self):
+ return u"<ChangeMergedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.submitter)
+
+
+@GerritEventFactory.register("merge-failed")
+class MergeFailedEvent(GerritEvent):
+
+ """ Gerrit "merge-failed" event. """
+
+ def __init__(self, json_data):
+ super(MergeFailedEvent, self).__init__(json_data)
+ try:
+ self.change = Change(json_data["change"])
+ self.patchset = Patchset(json_data["patchSet"])
+ self.submitter = Account(json_data["submitter"])
+ self.reason = json_data["reason"]
+ except KeyError as e:
+ raise GerritError("MergeFailedEvent: %s" % e)
+
+ def __repr__(self):
+ return u"<MergeFailedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.submitter)
+
@GerritEventFactory.register("change-abandoned")
class ChangeAbandonedEvent(GerritEvent):
@@ -156,7 +236,7 @@ class ChangeAbandonedEvent(GerritEvent):
""" Gerrit "change-abandoned" event. """
def __init__(self, json_data):
- super(ChangeAbandonedEvent, self).__init__()
+ super(ChangeAbandonedEvent, self).__init__(json_data)
try:
self.change = Change(json_data["change"])
self.patchset = Patchset.from_json(json_data)
@@ -165,6 +245,11 @@ class ChangeAbandonedEvent(GerritEvent):
except KeyError as e:
raise GerritError("ChangeAbandonedEvent: %s" % e)
+ def __repr__(self):
+ return u"<ChangeAbandonedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.abandoner)
+
@GerritEventFactory.register("change-restored")
class ChangeRestoredEvent(GerritEvent):
@@ -172,7 +257,7 @@ class ChangeRestoredEvent(GerritEvent):
""" Gerrit "change-restored" event. """
def __init__(self, json_data):
- super(ChangeRestoredEvent, self).__init__()
+ super(ChangeRestoredEvent, self).__init__(json_data)
try:
self.change = Change(json_data["change"])
self.patchset = Patchset.from_json(json_data)
@@ -181,6 +266,11 @@ class ChangeRestoredEvent(GerritEvent):
except KeyError as e:
raise GerritError("ChangeRestoredEvent: %s" % e)
+ def __repr__(self):
+ return u"<ChangeRestoredEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.restorer)
+
@GerritEventFactory.register("ref-updated")
class RefUpdatedEvent(GerritEvent):
@@ -188,9 +278,55 @@ class RefUpdatedEvent(GerritEvent):
""" Gerrit "ref-updated" event. """
def __init__(self, json_data):
- super(RefUpdatedEvent, self).__init__()
+ super(RefUpdatedEvent, self).__init__(json_data)
try:
self.ref_update = RefUpdate(json_data["refUpdate"])
self.submitter = Account.from_json(json_data, "submitter")
except KeyError as e:
raise GerritError("RefUpdatedEvent: %s" % e)
+
+ def __repr__(self):
+ return u"<RefUpdatedEvent>: %s %s" % (self.ref_update, self.submitter)
+
+
+@GerritEventFactory.register("reviewer-added")
+class ReviewerAddedEvent(GerritEvent):
+
+ """ Gerrit "reviewer-added" event. """
+
+ def __init__(self, json_data):
+ super(ReviewerAddedEvent, self).__init__(json_data)
+ try:
+ self.change = Change(json_data["change"])
+ self.patchset = Patchset.from_json(json_data)
+ self.reviewer = Account(json_data["reviewer"])
+ except KeyError as e:
+ raise GerritError("ReviewerAddedEvent: %s" % e)
+
+ def __repr__(self):
+ return u"<ReviewerAddedEvent>: %s %s %s" % (self.change,
+ self.patchset,
+ self.reviewer)
+
+
+@GerritEventFactory.register("topic-changed")
+class TopicChangedEvent(GerritEvent):
+
+ """ Gerrit "topic-changed" event. """
+
+ def __init__(self, json_data):
+ super(TopicChangedEvent, self).__init__(json_data)
+ try:
+ self.change = Change(json_data["change"])
+ self.changer = Account(json_data["changer"])
+ if "oldTopic" in json_data:
+ self.oldtopic = json_data["oldTopic"]
+ else:
+ self.oldtopic = ""
+ except KeyError as e:
+ raise GerritError("TopicChangedEvent: %s" % e)
+
+ def __repr__(self):
+ return u"<TopicChangedEvent>: %s %s [%s]" % (self.change,
+ self.changer,
+ self.oldtopic)
diff --git a/pygerrit/models.py b/pygerrit/models.py
index 2e79949..ac9575a 100644
--- a/pygerrit/models.py
+++ b/pygerrit/models.py
@@ -34,6 +34,10 @@ class Account(object):
self.name = from_json(json_data, "name")
self.email = from_json(json_data, "email")
+ def __repr__(self):
+ return u"<Account %s%s>" % (self.name,
+ " (%s)" % self.email if self.email else "")
+
@staticmethod
def from_json(json_data, key):
""" Create an Account instance.
@@ -61,6 +65,9 @@ class Change(object):
self.url = from_json(json_data, "url")
self.owner = Account.from_json(json_data, "owner")
+ def __repr__(self):
+ return u"<Change %s, %s, %s>" % (self.number, self.project, self.branch)
+
class Patchset(object):
@@ -72,6 +79,9 @@ class Patchset(object):
self.ref = from_json(json_data, "ref")
self.uploader = Account.from_json(json_data, "uploader")
+ def __repr__(self):
+ return u"<Patchset %s, %s>" % (self.number, self.revision)
+
@staticmethod
def from_json(json_data):
r""" Create a Patchset instance.
@@ -94,6 +104,9 @@ class Approval(object):
self.value = from_json(json_data, "value")
self.description = from_json(json_data, "description")
+ def __repr__(self):
+ return u"<Approval %s %s>" % (self.description, self.value)
+
class RefUpdate(object):
@@ -104,3 +117,7 @@ class RefUpdate(object):
self.newrev = from_json(json_data, "newRev")
self.refname = from_json(json_data, "refName")
self.project = from_json(json_data, "project")
+
+ def __repr__(self):
+ return "<RefUpdate %s %s %s %s>" % \
+ (self.project, self.refname, self.oldrev, self.newrev)
diff --git a/pygerrit/ssh.py b/pygerrit/ssh.py
index 3e2add1..e779715 100644
--- a/pygerrit/ssh.py
+++ b/pygerrit/ssh.py
@@ -25,6 +25,7 @@
from os.path import abspath, expanduser, isfile
import re
import socket
+from threading import Event, Lock
from .error import GerritError
@@ -64,18 +65,19 @@ class GerritSSHClient(SSHClient):
""" Gerrit SSH Client, wrapping the paramiko SSH Client. """
- def __init__(self, hostname):
+ def __init__(self, hostname, username=None, port=None):
""" Initialise and connect to SSH. """
super(GerritSSHClient, self).__init__()
self.remote_version = None
self.hostname = hostname
- self.connected = False
-
- def _connect(self):
- """ Connect to the remote if not already connected. """
- if self.connected:
- return
- self.load_system_host_keys()
+ self.username = username
+ self.key_filename = None
+ self.port = port
+ self.connected = Event()
+ self.lock = Lock()
+
+ def _configure(self):
+ """ Configure the ssh parameters from the config file. """
configfile = expanduser("~/.ssh/config")
if not isfile(configfile):
raise GerritError("ssh config file '%s' does not exist" %
@@ -88,22 +90,29 @@ class GerritSSHClient(SSHClient):
raise GerritError("No ssh config for host %s" % self.hostname)
if not 'hostname' in data or not 'port' in data or not 'user' in data:
raise GerritError("Missing configuration data in %s" % configfile)
- key_filename = None
+ self.hostname = data['hostname']
+ self.username = data['user']
if 'identityfile' in data:
- key_filename = abspath(expanduser(data['identityfile']))
+ key_filename = abspath(expanduser(data['identityfile'][0]))
if not isfile(key_filename):
raise GerritError("Identity file '%s' does not exist" %
key_filename)
+ self.key_filename = key_filename
try:
- port = int(data['port'])
+ self.port = int(data['port'])
except ValueError:
raise GerritError("Invalid port: %s" % data['port'])
+
+ def _do_connect(self):
+ """ Connect to the remote. """
+ self.load_system_host_keys()
+ if self.username is None or self.port is None:
+ self._configure()
try:
- self.connect(hostname=data['hostname'],
- port=port,
- username=data['user'],
- key_filename=key_filename)
- self.connected = True
+ self.connect(hostname=self.hostname,
+ port=self.port,
+ username=self.username,
+ key_filename=self.key_filename)
except socket.error as e:
raise GerritError("Failed to connect to server: %s" % e)
@@ -114,17 +123,20 @@ class GerritSSHClient(SSHClient):
except AttributeError:
self.remote_version = None
- def exec_command(self, command, bufsize=1, timeout=None, get_pty=False):
- """ Execute the command.
-
- Make sure we're connected and then execute the command.
-
- Return a tuple of stdin, stdout, stderr.
-
- """
- self._connect()
- return super(GerritSSHClient, self).\
- exec_command(command, bufsize, timeout, get_pty)
+ def _connect(self):
+ """ Connect to the remote if not already connected. """
+ if not self.connected.is_set():
+ try:
+ self.lock.acquire()
+ # Another thread may have connected while we were
+ # waiting to acquire the lock
+ if not self.connected.is_set():
+ self._do_connect()
+ self.connected.set()
+ except GerritError:
+ raise
+ finally:
+ self.lock.release()
def get_remote_version(self):
""" Return the version of the remote Gerrit server. """
@@ -138,17 +150,24 @@ class GerritSSHClient(SSHClient):
def run_gerrit_command(self, command):
""" Run the given command.
- Run `command` and return a `GerritSSHCommandResult`.
+ Make sure we're connected to the remote server, and run `command`.
+
+ Return the results as a `GerritSSHCommandResult`.
- Raise `ValueError` if `command` is not a string.
+ Raise `ValueError` if `command` is not a string, or `GerritError` if
+ command execution fails.
"""
if not isinstance(command, basestring):
raise ValueError("command must be a string")
gerrit_command = "gerrit " + command
+ self._connect()
try:
- stdin, stdout, stderr = self.exec_command(gerrit_command)
+ stdin, stdout, stderr = self.exec_command(gerrit_command,
+ bufsize=1,
+ timeout=None,
+ get_pty=False)
except SSHException as err:
raise GerritError("Command execution error: %s" % err)
return GerritSSHCommandResult(command, stdin, stdout, stderr)
diff --git a/pygerrit/stream.py b/pygerrit/stream.py
index d7633e8..37b09f1 100644
--- a/pygerrit/stream.py
+++ b/pygerrit/stream.py
@@ -26,23 +26,9 @@ Class to listen to the Gerrit event stream and dispatch events.
"""
-import json
-import logging
-from select import poll, POLLIN
from threading import Thread, Event
-from .error import GerritError
-from .events import GerritEvent, GerritEventFactory
-
-
-@GerritEventFactory.register("gerrit-stream-error")
-class GerritStreamErrorEvent(GerritEvent):
-
- """ Represents an error when handling the gerrit event stream. """
-
- def __init__(self, json_data):
- super(GerritStreamErrorEvent, self).__init__()
- self.error = json_data["error"]
+from .events import ErrorEvent
class GerritStream(Thread):
@@ -62,29 +48,22 @@ class GerritStream(Thread):
def _error_event(self, error):
""" Dispatch `error` to the Gerrit client. """
- json_data = json.loads('{"type":"gerrit-stream-error",'
- '"error":"%s"}' % str(error))
- self._gerrit.put_event(json_data)
+ self._gerrit.put_event(ErrorEvent.error_json(error))
def run(self):
""" Listen to the stream and send events to the client. """
- try:
- result = self._ssh_client.run_gerrit_command("stream-events")
- except GerritError as e:
- self._error_event(e)
- else:
- poller = poll()
- stdout = result.stdout
- poller.register(stdout.channel)
- while not self._stop.is_set():
- data = poller.poll()
- for (handle, event) in data:
- if handle == stdout.channel.fileno() and event == POLLIN:
- try:
- line = stdout.readline()
- json_data = json.loads(line)
- self._gerrit.put_event(json_data)
- except (ValueError, IOError) as err:
- self._error_event(err)
- except GerritError as err:
- logging.error("Failed to put event: %s", err)
+ channel = self._ssh_client.get_transport().open_session()
+ channel.exec_command("gerrit stream-events")
+ stdout = channel.makefile()
+ stderr = channel.makefile_stderr()
+ while not self._stop.is_set():
+ if channel.exit_status_ready():
+ if channel.recv_stderr_ready():
+ error = stderr.readline().strip()
+ else:
+ error = "Remote server connection closed"
+ self._error_event(error)
+ self._stop.set()
+ elif channel.recv_ready():
+ data = stdout.readline()
+ self._gerrit.put_event(data)