summaryrefslogtreecommitdiff
path: root/zuul/driver/gerrit/gerritconnection.py
diff options
context:
space:
mode:
Diffstat (limited to 'zuul/driver/gerrit/gerritconnection.py')
-rw-r--r--zuul/driver/gerrit/gerritconnection.py589
1 files changed, 427 insertions, 162 deletions
diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index 1c4173005..8a64bad7c 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -13,36 +13,100 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+import datetime
+import itertools
import json
+import logging
+import paramiko
+import pprint
+import queue
import re
import re2
+import requests
import select
+import shlex
import threading
import time
import urllib
-import paramiko
-import logging
-import pprint
-import shlex
-import queue
import urllib.parse
-import requests
from typing import Dict, List
from uuid import uuid4
+from zuul import version as zuul_version
from zuul.connection import BaseConnection
+from zuul.driver.gerrit.auth import FormAuth
+from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Tag, Branch, Project
-from zuul import exceptions
-from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
-from zuul.driver.gerrit.auth import FormAuth
-from zuul import version as zuul_version
# HTTP timeout in seconds
TIMEOUT = 30
+class GerritChangeData(object):
+ """Compatability layer for SSH/HTTP
+
+ This class holds the raw data returned from a change query over
+ SSH or HTTP. Most of the work of parsing the data and storing it
+ on the change is in the gerritmodel.GerritChange class, however
+ this does perform a small amount of parsing of dependencies since
+ they are handled outside of that class. This provides an API to
+ that data independent of the source.
+
+ """
+
+ SSH = 1
+ HTTP = 2
+
+ def __init__(self, fmt, data, related=None):
+ self.format = fmt
+ self.data = data
+
+ if fmt == self.SSH:
+ self.parseSSH(data)
+ else:
+ self.parseHTTP(data)
+ if related:
+ self.parseRelatedHTTP(data, related)
+
+ def parseSSH(self, data):
+ self.needed_by = []
+ self.depends_on = None
+ self.message = data['commitMessage']
+ self.current_patchset = str(data['currentPatchSet']['number'])
+ self.number = str(data['number'])
+
+ if 'dependsOn' in data:
+ parts = data['dependsOn'][0]['ref'].split('/')
+ self.depends_on = (parts[3], parts[4])
+
+ for needed in data.get('neededBy', []):
+ parts = needed['ref'].split('/')
+ self.needed_by.append((parts[3], parts[4]))
+
+ def parseHTTP(self, data):
+ rev = data['revisions'][data['current_revision']]
+ self.message = rev['commit']['message']
+ self.current_patchset = str(rev['_number'])
+ self.number = str(data['_number'])
+
+ def parseRelatedHTTP(self, data, related):
+ self.needed_by = []
+ self.depends_on = None
+ current_rev = data['revisions'][data['current_revision']]
+ for change in related['changes']:
+ for parent in current_rev['commit']['parents']:
+ if change['commit']['commit'] == parent['commit']:
+ self.depends_on = (change['_change_number'],
+ change['_revision_number'])
+ break
+ else:
+ self.needed_by.append((change['_change_number'],
+ change['_revision_number']))
+
+
class GerritEventConnector(threading.Thread):
"""Move events from Gerrit to the scheduler."""
@@ -80,6 +144,7 @@ class GerritEventConnector(threading.Thread):
log = get_annotated_logger(self.log, event)
event.type = data.get('type')
+ event.uuid = data.get('uuid')
# This catches when a change is merged, as it could potentially
# have merged layout info which will need to be read in.
# Ideally this would be done with a refupdate event so as to catch
@@ -131,6 +196,7 @@ class GerritEventConnector(threading.Thread):
'ref-replication-scheduled': None,
'topic-changed': 'changer',
'project-created': None, # Gerrit 2.14
+ 'pending-check': None, # Gerrit 3.0+
}
event.account = None
if event.type in accountfield_from_type:
@@ -294,6 +360,58 @@ class GerritWatcher(threading.Thread):
self._stopped = True
+class GerritPoller(threading.Thread):
+ # Poll gerrit without stream-events
+ log = logging.getLogger("gerrit.GerritPoller")
+ poll_interval = 30
+
+ def __init__(self, connection):
+ threading.Thread.__init__(self)
+ self.connection = connection
+ self._stopped = False
+ self._stop_event = threading.Event()
+
+ def _makeEvent(self, change, uuid, check):
+ return {'type': 'pending-check',
+ 'uuid': uuid,
+ 'change': {
+ 'project': change['patch_set']['repository'],
+ 'number': change['patch_set']['change_number'],
+ },
+ 'patchSet': {
+ 'number': change['patch_set']['patch_set_id'],
+ }}
+
+ def _run(self):
+ try:
+ for checker in self.connection.watched_checkers:
+ changes = self.connection.get(
+ 'plugins/checks/checks.pending/?'
+ 'query=checker:%s+(state:NOT_STARTED)' % checker)
+ for change in changes:
+ for uuid, check in change['pending_checks'].items():
+ event = self._makeEvent(change, uuid, check)
+ self.connection.addEvent(event)
+ except Exception:
+ self.log.exception("Exception on Gerrit poll with %s:",
+ self.connection.connection_name)
+
+ def run(self):
+ last_start = time.time()
+ while not self._stopped:
+ next_start = last_start + self.poll_interval
+ self._stop_event.wait(max(next_start - time.time(), 0))
+ if self._stopped:
+ break
+ last_start = time.time()
+ self._run()
+
+ def stop(self):
+ self.log.debug("Stopping watcher")
+ self._stopped = True
+ self._stop_event.set()
+
+
class GerritConnection(BaseConnection):
driver_name = 'gerrit'
log = logging.getLogger("zuul.GerritConnection")
@@ -305,6 +423,7 @@ class GerritConnection(BaseConnection):
r"@{|\.\.|\.$|^@$|/$|^/|//+") # everything else we can check with re2
replication_timeout = 300
replication_retry_interval = 5
+ _poller_class = GerritPoller
def __init__(self, driver, connection_name, connection_config):
super(GerritConnection, self).__init__(driver, connection_name,
@@ -324,9 +443,20 @@ class GerritConnection(BaseConnection):
self.port = int(self.connection_config.get('port', 29418))
self.keyfile = self.connection_config.get('sshkey', None)
self.keepalive = int(self.connection_config.get('keepalive', 60))
+ # TODO(corvus): Document this when the checks api is stable;
+ # it's not useful without it.
+ self.enable_stream_events = self.connection_config.get(
+ 'stream_events', True)
+ if self.enable_stream_events not in [
+ 'true', 'True', '1', 1, 'TRUE', True]:
+ self.enable_stream_events = False
self.watcher_thread = None
+ self.poller_thread = None
self.event_queue = queue.Queue()
self.client = None
+ self.watched_checkers = []
+ self.project_checker_map = {}
+ self.version = (0, 0, 0)
self.baseurl = self.connection_config.get(
'baseurl', 'https://%s' % self.server).rstrip('/')
@@ -362,6 +492,42 @@ class GerritConnection(BaseConnection):
self.auth = authclass(
self.user, self.password)
+ def setWatchedCheckers(self, checkers_to_watch):
+ self.log.debug("Setting watched checkers to %s", checkers_to_watch)
+ self.watched_checkers = set()
+ self.project_checker_map = {}
+ schemes_to_watch = set()
+ uuids_to_watch = set()
+ for x in checkers_to_watch:
+ if 'scheme' in x:
+ schemes_to_watch.add(x['scheme'])
+ if 'uuid' in x:
+ uuids_to_watch.add(x['uuid'])
+ if schemes_to_watch:
+ # get a list of all configured checkers
+ try:
+ configured_checkers = self.get('plugins/checks/checkers/')
+ except Exception:
+ self.log.exception("Unable to get checkers")
+ configured_checkers = []
+
+ # filter it through scheme matches in checkers_to_watch
+ for checker in configured_checkers:
+ if checker['status'] != 'ENABLED':
+ continue
+ checker_scheme, checker_id = checker['uuid'].split(':')
+ repo = checker['repository']
+ repo = self.canonical_hostname + '/' + repo
+ # map scheme matches to project names
+ if checker_scheme in schemes_to_watch:
+ repo_checkers = self.project_checker_map.setdefault(
+ repo, set())
+ repo_checkers.add(checker['uuid'])
+ self.watched_checkers.add(checker['uuid'])
+ # add uuids from checkers_to_watch
+ for x in uuids_to_watch:
+ self.watched_checkers.add(x)
+
def toDict(self):
d = super().toDict()
d.update({
@@ -375,6 +541,28 @@ class GerritConnection(BaseConnection):
def url(self, path):
return self.baseurl + '/a/' + path
+ def get(self, path):
+ url = self.url(path)
+ self.log.debug('GET: %s' % (url,))
+ r = self.session.get(
+ url,
+ verify=self.verify_ssl,
+ auth=self.auth, timeout=TIMEOUT,
+ headers={'User-Agent': self.user_agent})
+ self.log.debug('Received: %s %s' % (r.status_code, r.text,))
+ if r.status_code != 200:
+ raise Exception("Received response %s" % (r.status_code,))
+ ret = None
+ if r.text and len(r.text) > 4:
+ try:
+ ret = json.loads(r.text[4:])
+ except Exception:
+ self.log.exception(
+ "Unable to parse result %s from post to %s" %
+ (r.text, url))
+ raise
+ return ret
+
def post(self, path, data):
url = self.url(path)
self.log.debug('POST: %s' % (url,))
@@ -510,7 +698,7 @@ class GerritConnection(BaseConnection):
log.debug("Updating %s: Running query %s to find needed changes",
change, query)
records.extend(self.simpleQuery(query, event=event))
- return records
+ return [(x.number, x.current_patchset) for x in records]
def _getNeededByFromCommit(self, change_id, change, event):
log = get_annotated_logger(self.log, event)
@@ -522,11 +710,10 @@ class GerritConnection(BaseConnection):
results = self.simpleQuery(query, event=event)
for result in results:
for match in self.depends_on_re.findall(
- result['commitMessage']):
+ result.message):
if match != change_id:
continue
- key = (str(result['number']),
- str(result['currentPatchSet']['number']))
+ key = (result.number, result.current_patchset)
if key in seen:
continue
log.debug("Updating %s: Found change %s,%s "
@@ -534,7 +721,7 @@ class GerritConnection(BaseConnection):
change, key[0], key[1], change_id)
seen.add(key)
records.append(result)
- return records
+ return [(x.number, x.current_patchset) for x in records]
def _updateChange(self, change, event, history):
log = get_annotated_logger(self.log, event)
@@ -562,54 +749,16 @@ class GerritConnection(BaseConnection):
log.info("Updating %s", change)
data = self.queryChange(change.number, event=event)
- change._data = data
-
- if change.patchset is None:
- change.patchset = str(data['currentPatchSet']['number'])
- if 'project' not in data:
- raise exceptions.ChangeNotFound(change.number, change.patchset)
- change.project = self.source.getProject(data['project'])
- change.id = data['id']
- change.branch = data['branch']
- change.url = data['url']
- urlparse = urllib.parse.urlparse(self.baseurl)
- baseurl = "%s%s" % (urlparse.netloc, urlparse.path)
- baseurl = baseurl.rstrip('/')
- change.uris = [
- '%s/%s' % (baseurl, change.number),
- '%s/#/c/%s' % (baseurl, change.number),
- '%s/c/%s/+/%s' % (baseurl, change.project.name, change.number),
- ]
-
- max_ps = 0
- files = []
- for ps in data['patchSets']:
- if str(ps['number']) == change.patchset:
- change.ref = ps['ref']
- change.commit = ps['revision']
- for f in ps.get('files', []):
- files.append(f['file'])
- if int(ps['number']) > int(max_ps):
- max_ps = str(ps['number'])
- if max_ps == change.patchset:
- change.is_current_patchset = True
- else:
- change.is_current_patchset = False
- change.files = files
+ change.update(data, self)
- change.is_merged = self._isMerged(change)
- change.approvals = data['currentPatchSet'].get('approvals', [])
- change.open = data['open']
- change.status = data['status']
- change.owner = data['owner']
- change.message = data['commitMessage']
+ if not change.is_merged:
+ self._updateChangeDependencies(log, change, data, event, history)
- if change.is_merged:
- # This change is merged, so we don't need to look any further
- # for dependencies.
- log.debug("Updating %s: change is merged", change)
- return change
+ self.sched.onChangeUpdated(change, event)
+ return change
+
+ def _updateChangeDependencies(self, log, change, data, event, history):
if history is None:
history = []
else:
@@ -618,9 +767,8 @@ class GerritConnection(BaseConnection):
needs_changes = set()
git_needs_changes = []
- if 'dependsOn' in data:
- parts = data['dependsOn'][0]['ref'].split('/')
- dep_num, dep_ps = parts[3], parts[4]
+ if data.depends_on is not None:
+ dep_num, dep_ps = data.depends_on
log.debug("Updating %s: Getting git-dependent change %s,%s",
change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
@@ -634,10 +782,8 @@ class GerritConnection(BaseConnection):
change.git_needs_changes = git_needs_changes
compat_needs_changes = []
- for record in self._getDependsOnFromCommit(data['commitMessage'],
- change, event):
- dep_num = str(record['number'])
- dep_ps = str(record['currentPatchSet']['number'])
+ for (dep_num, dep_ps) in self._getDependsOnFromCommit(
+ change.message, change, event):
log.debug("Updating %s: Getting commit-dependent "
"change %s,%s", change, dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps, history=history,
@@ -649,24 +795,20 @@ class GerritConnection(BaseConnection):
needed_by_changes = set()
git_needed_by_changes = []
- if 'neededBy' in data:
- for needed in data['neededBy']:
- parts = needed['ref'].split('/')
- dep_num, dep_ps = parts[3], parts[4]
- log.debug("Updating %s: Getting git-needed change %s,%s",
- change, dep_num, dep_ps)
- dep = self._getChange(dep_num, dep_ps, history=history,
- event=event)
- if (dep.open and dep.is_current_patchset and
- dep not in needed_by_changes):
- git_needed_by_changes.append(dep)
- needed_by_changes.add(dep)
+ for (dep_num, dep_ps) in data.needed_by:
+ log.debug("Updating %s: Getting git-needed change %s,%s",
+ change, dep_num, dep_ps)
+ dep = self._getChange(dep_num, dep_ps, history=history,
+ event=event)
+ if (dep.open and dep.is_current_patchset and
+ dep not in needed_by_changes):
+ git_needed_by_changes.append(dep)
+ needed_by_changes.add(dep)
change.git_needed_by_changes = git_needed_by_changes
compat_needed_by_changes = []
- for record in self._getNeededByFromCommit(data['id'], change, event):
- dep_num = str(record['number'])
- dep_ps = str(record['currentPatchSet']['number'])
+ for (dep_num, dep_ps) in self._getNeededByFromCommit(
+ change.id, change, event):
log.debug("Updating %s: Getting commit-needed change %s,%s",
change, dep_num, dep_ps)
# Because a commit needed-by may be a cross-repo
@@ -683,10 +825,6 @@ class GerritConnection(BaseConnection):
needed_by_changes.add(dep)
change.compat_needed_by_changes = compat_needed_by_changes
- self.sched.onChangeUpdated(change, event)
-
- return change
-
def isMerged(self, change, head=None):
self.log.debug("Checking if change %s is merged" % change)
if not change.number:
@@ -696,8 +834,7 @@ class GerritConnection(BaseConnection):
return True
data = self.queryChange(change.number)
- change._data = data
- change.is_merged = self._isMerged(change)
+ change.update(data, self)
if change.is_merged:
self.log.debug("Change %s is merged" % (change,))
else:
@@ -717,17 +854,6 @@ class GerritConnection(BaseConnection):
(change))
return False
- def _isMerged(self, change):
- data = change._data
- if not data:
- return False
- status = data.get('status')
- if not status:
- return False
- if status == 'MERGED':
- return True
- return False
-
def _waitForRefSha(self, project: Project,
ref: str, old_sha: str='') -> bool:
# Wait for the ref to show up in the repo
@@ -756,35 +882,9 @@ class GerritConnection(BaseConnection):
# Good question. It's probably ref-updated, which, ah,
# means it's merged.
return True
- data = change._data
- if not data:
- return False
- if 'submitRecords' not in data:
- return False
- try:
- for sr in data['submitRecords']:
- if sr['status'] == 'OK':
- return True
- elif sr['status'] == 'NOT_READY':
- for label in sr['labels']:
- if label['status'] in ['OK', 'MAY']:
- continue
- elif label['status'] in ['NEED', 'REJECT']:
- # It may be our own rejection, so we ignore
- if label['label'] not in allow_needs:
- return False
- continue
- else:
- # IMPOSSIBLE
- return False
- else:
- # CLOSED, RULE_ERROR
- return False
- except Exception:
- log.exception("Exception determining whether change"
- "%s can merge:", change)
- return False
- return True
+ if change.missing_labels < set(allow_needs):
+ return True
+ return False
def getProjectOpenChanges(self, project: Project) -> List[GerritChange]:
# This is a best-effort function in case Gerrit is unable to return
@@ -797,11 +897,10 @@ class GerritConnection(BaseConnection):
for record in data:
try:
changes.append(
- self._getChange(record['number'],
- record['currentPatchSet']['number']))
+ self._getChange(record.number, record.current_patchset))
except Exception:
- self.log.exception("Unable to query change %s" %
- (record.get('number'),))
+ self.log.exception("Unable to query change %s",
+ record.number)
return changes
@staticmethod
@@ -836,17 +935,18 @@ class GerritConnection(BaseConnection):
def eventDone(self):
self.event_queue.task_done()
- def review(self, change, message, action={},
+ def review(self, item, message, action={},
file_comments={}, zuul_event_id=None):
if self.session:
meth = self.review_http
else:
meth = self.review_ssh
- return meth(change, message, action=action,
+ return meth(item, message, action=action,
file_comments=file_comments, zuul_event_id=zuul_event_id)
- def review_ssh(self, change, message, action={},
+ def review_ssh(self, item, message, action={},
file_comments={}, zuul_event_id=None):
+ change = item.change
project = change.project.name
cmd = 'gerrit review --project %s' % project
if message:
@@ -861,14 +961,56 @@ class GerritConnection(BaseConnection):
out, err = self._ssh(cmd, zuul_event_id=zuul_event_id)
return err
- def review_http(self, change, message, action={},
+ def report_checks(self, log, item, changeid, checkinfo):
+ change = item.change
+ checkinfo = checkinfo.copy()
+ uuid = checkinfo.pop('uuid', None)
+ scheme = checkinfo.pop('scheme', None)
+ if uuid is None:
+ uuids = self.project_checker_map.get(
+ change.project.canonical_name, set())
+ for u in uuids:
+ if u.split(':')[0] == scheme:
+ uuid = u
+ break
+ if uuid is None:
+ log.error("Unable to find matching checker for %s %s",
+ item, checkinfo)
+ return
+
+ def fmt(t):
+ return str(datetime.datetime.fromtimestamp(t))
+
+ if item.enqueue_time:
+ checkinfo['started'] = fmt(item.enqueue_time)
+ if item.report_time:
+ checkinfo['finished'] = fmt(item.report_time)
+ url = item.formatStatusUrl()
+ if url:
+ checkinfo['url'] = url
+ if checkinfo:
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/revisions/%s/checks/%s' %
+ (changeid, change.commit, uuid),
+ checkinfo)
+ break
+ except Exception:
+ log.exception("Error submitting check data to gerrit, "
+ "attempt %s", x)
+ time.sleep(x * 10)
+
+ def review_http(self, item, message, action={},
file_comments={}, zuul_event_id=None):
+ change = item.change
log = get_annotated_logger(self.log, zuul_event_id)
data = dict(message=message,
strict_labels=False)
submit = False
labels = {}
for key, val in action.items():
+ if key == 'checks_api':
+ continue
if val is True:
if key == 'submit':
submit = True
@@ -878,33 +1020,51 @@ class GerritConnection(BaseConnection):
if labels:
data['labels'] = labels
if file_comments:
- data['comments'] = file_comments
+ if self.version >= (2, 15, 0):
+ file_comments = copy.deepcopy(file_comments)
+ url = item.formatStatusUrl()
+ for comments in itertools.chain(file_comments.values()):
+ for comment in comments:
+ comment['robot_id'] = 'zuul'
+ comment['robot_run_id'] = \
+ item.current_build_set.uuid
+ if url:
+ comment['url'] = url
+ data['robot_comments'] = file_comments
+ else:
+ data['comments'] = file_comments
+ data['tag'] = 'autogenerated:zuul:%s' % (item.pipeline.name)
changeid = "%s~%s~%s" % (
urllib.parse.quote(str(change.project), safe=''),
urllib.parse.quote(str(change.branch), safe=''),
change.id)
- for x in range(1, 4):
- try:
- self.post('changes/%s/revisions/%s/review' %
- (changeid, change.commit),
- data)
- break
- except Exception:
- log.exception("Error submitting data to gerrit, attempt %s", x)
- time.sleep(x * 10)
+ if 'checks_api' in action:
+ self.report_checks(log, item, changeid, action['checks_api'])
+ if (message or data.get('labels') or data.get('comments')):
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/revisions/%s/review' %
+ (changeid, change.commit),
+ data)
+ break
+ except Exception:
+ log.exception(
+ "Error submitting data to gerrit, attempt %s", x)
+ time.sleep(x * 10)
if change.is_current_patchset and submit:
- try:
- self.post('changes/%s/submit' % (changeid,), {})
- except Exception:
- log.exception("Error submitting data to gerrit, attempt %s", x)
- time.sleep(x * 10)
-
- def query(self, query, event=None):
+ for x in range(1, 4):
+ try:
+ self.post('changes/%s/submit' % (changeid,), {})
+ except Exception:
+ log.exception(
+ "Error submitting data to gerrit, attempt %s", x)
+ time.sleep(x * 10)
+
+ def queryChangeSSH(self, number, event=None):
args = '--all-approvals --comments --commit-message'
args += ' --current-patch-set --dependencies --files'
args += ' --patch-sets --submit-records'
- cmd = 'gerrit query --format json %s %s' % (
- args, query)
+ cmd = 'gerrit query --format json %s %s' % (args, number)
out, err = self._ssh(cmd)
if not out:
return False
@@ -919,10 +1079,23 @@ class GerritConnection(BaseConnection):
pprint.pformat(data))
return data
+ def queryChangeHTTP(self, number, event=None):
+ data = self.get('changes/%s?o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&'
+ 'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&'
+ 'o=DETAILED_LABELS' % (number,))
+ related = self.get('changes/%s/revisions/%s/related' % (
+ number, data['current_revision']))
+ return data, related
+
def queryChange(self, number, event=None):
- return self.query('change:%s' % number, event=event)
+ if self.session:
+ data, related = self.queryChangeHTTP(number, event=event)
+ return GerritChangeData(GerritChangeData.HTTP, data, related)
+ else:
+ data = self.queryChangeSSH(number, event=event)
+ return GerritChangeData(GerritChangeData.SSH, data)
- def simpleQuery(self, query, event=None):
+ def simpleQuerySSH(self, query, event=None):
def _query_chunk(query, event):
args = '--commit-message --current-patch-set'
@@ -974,9 +1147,63 @@ class GerritConnection(BaseConnection):
"%s %s" % (query, resume), event)
return alldata
+ def simpleQueryHTTP(self, query, event=None):
+ iolog = get_annotated_logger(self.iolog, event)
+ changes = []
+ sortkey = ''
+ done = False
+ offset = 0
+ while not done:
+ # We don't actually want to limit to 500, but that's the
+ # server-side default, and if we don't specify this, we
+ # won't get a _more_changes flag.
+ q = ('changes/?n=500%s&o=CURRENT_REVISION&o=CURRENT_COMMIT&'
+ 'q=%s' % (sortkey, query))
+ iolog.debug('Query: %s', q)
+ batch = self.get(q)
+ iolog.debug("Received data from Gerrit query: \n%s",
+ pprint.pformat(batch))
+ done = True
+ if batch:
+ changes += batch
+ if '_more_changes' in batch[-1]:
+ done = False
+ if '_sortkey' in batch[-1]:
+ sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
+ else:
+ offset += len(batch)
+ sortkey = '&start=%s' % (offset,)
+ return changes
+
+ def simpleQuery(self, query, event=None):
+ if self.session:
+ # None of the users of this method require dependency
+ # data, so we only perform the change query and omit the
+ # related changes query.
+ alldata = self.simpleQueryHTTP(query, event=event)
+ return [GerritChangeData(GerritChangeData.HTTP, data)
+ for data in alldata]
+ else:
+ alldata = self.simpleQuerySSH(query, event=event)
+ return [GerritChangeData(GerritChangeData.SSH, data)
+ for data in alldata]
+
def _uploadPack(self, project: Project) -> str:
- cmd = "git-upload-pack %s" % project.name
- out, err = self._ssh(cmd, "0000")
+ if self.session:
+ url = ('%s/%s/info/refs?service=git-upload-pack' %
+ (self.baseurl, project.name))
+ r = self.session.get(
+ url,
+ verify=self.verify_ssl,
+ auth=self.auth, timeout=TIMEOUT,
+ headers={'User-Agent': self.user_agent})
+ self.iolog.debug('Received: %s %s' % (r.status_code, r.text,))
+ if r.status_code != 200:
+ raise Exception("Received response %s" % (r.status_code,))
+ out = r.text[r.text.find('\n') + 5:]
+ else:
+ cmd = "git-upload-pack %s" % project.name
+ out, err = self._ssh(cmd, "0000")
return out
def _open(self):
@@ -1065,8 +1292,14 @@ class GerritConnection(BaseConnection):
return ret
def getGitUrl(self, project: Project) -> str:
- url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port,
- project.name)
+ if self.session:
+ baseurl = list(urllib.parse.urlparse(self.baseurl))
+ baseurl[1] = '%s:%s@%s' % (self.user, self.password, baseurl[1])
+ baseurl = urllib.parse.urlunparse(baseurl)
+ url = ('%s/%s' % (baseurl, project.name))
+ else:
+ url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port,
+ project.name)
return url
def _getWebUrl(self, project: Project, sha: str=None) -> str:
@@ -1075,14 +1308,37 @@ class GerritConnection(BaseConnection):
project=project.getSafeAttributes(),
sha=sha)
+ def _getRemoteVersion(self):
+ version = self.get('config/server/version')
+ base = version.split('-')[0]
+ parts = base.split('.')
+ major = minor = micro = 0
+ if len(parts) > 0:
+ major = int(parts[0])
+ if len(parts) > 1:
+ minor = int(parts[1])
+ if len(parts) > 2:
+ micro = int(parts[2])
+ self.version = (major, minor, micro)
+ self.log.info("Remote version is: %s (parsed as %s)" %
+ (version, self.version))
+
def onLoad(self):
self.log.debug("Starting Gerrit Connection/Watchers")
- self._start_watcher_thread()
+ try:
+ self._getRemoteVersion()
+ except Exception:
+ self.log.exception("Unable to determine remote Gerrit version")
+
+ if self.enable_stream_events:
+ self._start_watcher_thread()
+ self._start_poller_thread()
self._start_event_connector()
def onStop(self):
self.log.debug("Stopping Gerrit Connection/Watchers")
self._stop_watcher_thread()
+ self._stop_poller_thread()
self._stop_event_connector()
def _stop_watcher_thread(self):
@@ -1100,6 +1356,15 @@ class GerritConnection(BaseConnection):
keepalive=self.keepalive)
self.watcher_thread.start()
+ def _stop_poller_thread(self):
+ if self.poller_thread:
+ self.poller_thread.stop()
+ self.poller_thread.join()
+
+ def _start_poller_thread(self):
+ self.poller_thread = self._poller_class(self)
+ self.poller_thread.start()
+
def _stop_event_connector(self):
if self.gerrit_event_connector:
self.gerrit_event_connector.stop()