diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-04-15 10:54:47 -0400 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2021-04-15 10:54:47 -0400 |
commit | 23e13997be3311aa25a503d1ede01de9b9c5bef0 (patch) | |
tree | ec18118f0d6276f654a15d060c5621e320e85335 | |
parent | adb5cd690a51db1a842675097fb2849f7bb01fd9 (diff) | |
download | korg-helpers-23e13997be3311aa25a503d1ede01de9b9c5bef0.tar.gz |
Remove obsolete scripts
Both attest-patches and get-lore-mbox are now part of b4.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rwxr-xr-x | attest-patches.py | 545 | ||||
-rwxr-xr-x | get-lore-mbox.py | 1186 |
2 files changed, 0 insertions, 1731 deletions
diff --git a/attest-patches.py b/attest-patches.py deleted file mode 100755 index f9b6325..0000000 --- a/attest-patches.py +++ /dev/null @@ -1,545 +0,0 @@ -#!/usr/bin/env python3 -# SPDX-License-Identifier: GPL-2.0-or-later -# !EXPERIMENTAL! -# Proof of concept for patch attestation using signatures@kernel.org -# pseudo-list. Do not use for anything useful, as in its current form -# it doesn't cover a bunch of malicious use-cases. -# -# -*- coding: utf-8 -*- -# -__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>' - -import os -import sys -import argparse -import logging -import hashlib -import subprocess -import re -import email.message -import email.utils -import mailbox -import urllib -import requests -import smtplib - -from tempfile import mkstemp - -HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@') -FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)') - -# Used for caching attestation data lookups -ATTESTATION_DATA = dict() -# Used for keeping a mapping of subkeys to UIDs -SUBKEY_DATA = dict() -# Used for keeping a list of validation errors -VALIDATION_ERRORS = set() - -logger = logging.getLogger('attest-patches') - -VERSION = '0.1' -ATTESTATION_FORMAT = '0.1' - -GPGBIN = 'gpg2' -GPGTRUSTMODEL = 'pgp' - - -def get_config_from_git(regexp, defaults=None): - args = ['config', '-z', '--get-regexp', regexp] - ecode, out = git_run_command(None, args) - gitconfig = defaults - if not gitconfig: - gitconfig = dict() - if not out: - return gitconfig - - for line in out.split('\x00'): - if not line: - continue - key, value = line.split('\n', 1) - try: - chunks = key.split('.') - cfgkey = chunks[-1] - gitconfig[cfgkey.lower()] = value - except ValueError: - logger.debug('Ignoring git config entry %s', line) - - return gitconfig - - -def _run_command(cmdargs, stdin=None, logstderr=False): - logger.debug('Running %s' % ' '.join(cmdargs)) - - sp = subprocess.Popen(cmdargs, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE) - - (output, error) = sp.communicate(input=stdin) - - output = output.decode('utf-8', errors='replace') - - if logstderr and len(error.strip()): - logger.debug('Stderr: %s', error.decode('utf-8', errors='replace')) - - return sp.returncode, output - - -def gpg_run_command(cmdargs, stdin=None, logstderr=False): - logger.debug('Running %s' % ' '.join(cmdargs)) - - return _run_command(cmdargs, stdin=stdin, logstderr=logstderr) - - -def git_run_command(gitdir, args, stdin=None, logstderr=False): - cmdargs = ['git', '--no-pager'] - if gitdir: - cmdargs += ['--git-dir', gitdir] - cmdargs += args - - return _run_command(cmdargs, stdin=stdin, logstderr=logstderr) - - -def get_mailinfo_hashes(content): - msg_out = mkstemp() - patch_out = mkstemp() - cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]] - ecode, info = git_run_command(None, cmdargs, content) - if ecode > 0: - logger.critical('ERROR: Could not get mailinfo') - return None, None, None - logger.debug(info) - ihasher = hashlib.sha256() - for line in info.split('\n'): - # We don't use the "Date:" field because it is likely to be - # mangled between when git-format-patch generates it and - # when it is sent out by git-send-email (or other tools). - # TODO: We can do some basic date sanity checking by - # looking at the PGP signature date and making sure - # that it is within a sane limit compared to the - # commit, though it is unlikely to matter for attestation - if re.search(r'^(Author|Email|Subject):', line): - ihasher.update((line + '\n').encode('utf-8')) - ihash = ihasher.hexdigest() - - with open(msg_out[1], 'r') as mfh: - msg = mfh.read() - mhasher = hashlib.sha256() - mhasher.update(msg.encode('utf-8')) - mhash = mhasher.hexdigest() - os.unlink(msg_out[1]) - - with open(patch_out[1], 'r') as pfh: - patch = pfh.read() - if len(patch.strip()): - phash = get_patch_hash(patch) - else: - phash = None - os.unlink(patch_out[1]) - - return ihash, mhash, phash - - -def get_patch_hash(diff): - # The aim is to represent the patch as if you did the following: - # git diff HEAD~.. | dos2unix | sha256sum - # - # This subroutine removes anything at the beginning of diff data, like - # diffstat or any other auxiliary data, and anything trailing at the end - # XXX: This currently doesn't work for git binary patches - # - diff = diff.replace('\r', '') - diff = diff.strip() + '\n' - - # For keeping a buffer of lines preceding @@ ... @@ - buflines = list() - - phasher = hashlib.sha256() - - # Used for counting where we are in the patch - pp = 0 - for line in diff.split('\n'): - hunk_match = HUNK_RE.match(line) - if hunk_match: - # logger.debug('Crunching %s', line) - mlines, plines = hunk_match.groups() - pp = int(plines) - addlines = list() - for bline in reversed(buflines): - # Go backward and add lines until we get to the start - # or encounter a blank line - if len(bline.strip()) == 0: - break - addlines.append(bline) - if addlines: - phasher.update(('\n'.join(reversed(addlines))+'\n').encode('utf-8')) - buflines = list() - # Feed this line to the hasher - phasher.update((line+'\n').encode('utf-8')) - continue - if pp > 0: - # Inside the patch - phasher.update((line+'\n').encode('utf-8')) - if line[0] != '-': - pp -= 1 - continue - # Not anything we recognize, so stick into buflines - buflines.append(line) - - return phasher.hexdigest() - - -def create_attestation(cmdargs): - attlines = list() - subject = 'Patch attestation' - for patchfile in cmdargs.attest: - with open(patchfile, 'rb') as fh: - content = fh.read() - ihash, mhash, phash = get_mailinfo_hashes(content) - if not phash: - logger.info('SKP | %s', os.path.basename(patchfile)) - # See if it's a cover letter - matches = re.search(r'^Subject:\s*\[([^\]]*)\s+0{1,3}/(\d{1,3})([^\]]*)]\s+(.*)', - content.decode('utf-8'), re.I | re.M) - if matches: - mgr = matches.groups() - # Set the subject to match cover letter - subject = '[%s %s/%s%s] %s' % (mgr[0].replace('PATCH', 'PSIGN'), 'X' * len(mgr[1]), - mgr[1], mgr[2], mgr[3]) - continue - logger.info('ADD | %s', os.path.basename(patchfile)) - attid = '%s-%s-%s' % (ihash[:8], mhash[:8], phash[:8]) - attlines.append('%s:' % attid) - attlines.append(' i: %s' % ihash) - attlines.append(' m: %s' % mhash) - attlines.append(' p: %s' % phash) - - payload = '\n'.join(attlines) - - usercfg = get_config_from_git(r'user\..*') - - gpgargs = [GPGBIN, '--batch'] - if 'signingkey' in usercfg: - gpgargs += ['-u', usercfg['signingkey']] - gpgargs += ['--clearsign', - '--comment', 'att-fmt-ver: %s' % ATTESTATION_FORMAT, - '--comment', 'att-hash: sha256', - ] - - ecode, signed = gpg_run_command(gpgargs, stdin=payload.encode('utf-8')) - if ecode > 0: - logger.critical('ERROR: Unable to sign using %s', GPGBIN) - sys.exit(1) - - att_msg = email.message.EmailMessage() - att_msg.set_payload(signed.encode('utf-8')) - sender = cmdargs.sender - if '>' not in sender: - sender = '<%s>' % sender - att_msg['From'] = sender - att_msg['To'] = '<signatures@kernel.org>' - att_msg['Message-Id'] = email.utils.make_msgid(domain='kernel.org') - att_msg['Subject'] = subject - - logger.info('---') - # Try to deliver it via mail.kernel.org - try: - mailserver = smtplib.SMTP('mail.kernel.org', 587) - # identify ourselves to smtp gmail client - mailserver.ehlo() - # secure our email with tls encryption - mailserver.starttls() - # re-identify ourselves as an encrypted connection - mailserver.ehlo() - logger.info('Delivering via mail.kernel.org') - if cmdargs.dryrun: - raise Exception('Dry-run, not delivering mail.') - mailserver.sendmail('devnull@kernel.org', 'signatures@kernel.org', att_msg.as_string()) - mailserver.quit() - sys.exit(0) - except Exception as ex: - logger.info('Could not deliver: %s', ex) - - # Future iterations will also be able to submit this to a RESTful URL - # at git.kernel.org, in order not to depend on avaialbility of SMTP gateways - with open(cmdargs.output, 'wb') as fh: - fh.write(att_msg.as_bytes()) - - logger.info('Wrote %s', cmdargs.output) - logger.info('You can send it using:') - logger.info(' sendmail -oi signatures@kernel.org < %s', cmdargs.output) - logger.info(' mutt -H %s', cmdargs.output) - - -def load_attestation_data(link, content): - global ATTESTATION_DATA - gpgargs = [GPGBIN, '--batch', '--verify', '--status-fd=1'] - if GPGTRUSTMODEL == 'tofu': - gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good'] - - ecode, output = gpg_run_command(gpgargs, stdin=content.encode('utf-8')) - good = False - valid = False - trusted = False - sigkey = None - siguid = None - if ecode == 0: - # We're looking for both GOODSIG and VALIDSIG - gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+(.*)$', output, re.M) - if gs_matches: - logger.debug(' GOODSIG') - good = True - sigkey, siguid = gs_matches.groups() - if re.search(r'^\[GNUPG:\] VALIDSIG', output, re.M): - logger.debug(' VALIDSIG') - valid = True - # Do we have a TRUST_(FULLY|ULTIMATE)? - matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M) - if matches: - logger.debug(' TRUST_%s', matches.groups()[0]) - trusted = True - else: - # Are we missing a key? - matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M) - if matches: - VALIDATION_ERRORS.update(('Missing public key: %s' % matches.groups()[0],)) - else: - VALIDATION_ERRORS.update(('PGP Validation failed for: %s' % link,)) - - siginfo = (good, valid, trusted, sigkey, siguid) - - # No need to go on if it's no good - if not good: - return - - ihash = mhash = phash = None - for line in content.split('\n'): - # It's a yaml, but we don't parse it as yaml for safety reasons - line = line.rstrip() - if re.search(r'^([0-9a-f-]{26}:|-----BEGIN.*)$', line): - if ihash and mhash and phash: - if (ihash, mhash, phash) not in ATTESTATION_DATA: - ATTESTATION_DATA[(ihash, mhash, phash)] = list() - ATTESTATION_DATA[(ihash, mhash, phash)].append(siginfo) - ihash = mhash = phash = None - continue - matches = re.search(r'^\s+([imp]):\s*([0-9a-f]{64})$', line) - if matches: - t = matches.groups()[0] - if t == 'i': - ihash = matches.groups()[1] - elif t == 'm': - mhash = matches.groups()[1] - elif t == 'p': - phash = matches.groups()[1] - - -def query_lore_signatures(attid, session): - global ATTESTATION_DATA - global VALIDATION_ERRORS - # XXX: Querying this via the Atom feed is a temporary kludge until we have - # proper search API on lore.kernel.org - queryurl = '%s?%s' % ('https://lore.kernel.org/signatures/', - urllib.parse.urlencode({'q': attid, 'x': 'A', 'o': '-1'})) - logger.debug('Query URL: %s', queryurl) - resp = session.get(queryurl) - content = resp.content.decode('utf-8') - matches = re.findall(r'link\s+href="([^"]+)".*?(-----BEGIN PGP SIGNED MESSAGE-----.*?-----END PGP SIGNATURE-----)', - content, flags=re.DOTALL) - - if not matches: - VALIDATION_ERRORS.update(('No matches found in the signatures archive on lore.',)) - return - - for link, sigdata in matches: - load_attestation_data(link, sigdata) - - -def load_attestation_file(afile): - with open(afile, 'r') as fh: - sigdata = fh.read() - load_attestation_data(afile, sigdata) - - -def get_lore_attestation(c_ihash, c_mhash, c_phash, session): - global ATTESTATION_DATA - if (c_ihash, c_mhash, c_phash) not in ATTESTATION_DATA: - attid = '%s-%s-%s' % (c_ihash[:8], c_mhash[:8], c_phash[:8]) - query_lore_signatures(attid, session) - - # This will throw a KeyError on non-match, which we bubble up - return ATTESTATION_DATA[(c_ihash, c_mhash, c_phash)] - - -def get_subkey_uids(keyid): - global SUBKEY_DATA - - if keyid in SUBKEY_DATA: - return SUBKEY_DATA[keyid] - - gpgargs = [GPGBIN, '--batch', '--with-colons', '--list-keys', keyid] - ecode, keyinfo = gpg_run_command(gpgargs) - if ecode > 0: - logger.critical('ERROR: Unable to get UIDs list matching key %s', keyid) - return None - uids = list() - for line in keyinfo.split('\n'): - if line[:4] != 'uid:': - continue - chunks = line.split(':') - if chunks[1] in ('r',): - # Revoked UID, ignore - continue - uids.append(chunks[9]) - - SUBKEY_DATA[keyid] = email.utils.getaddresses(uids) - return SUBKEY_DATA[keyid] - - -def get_matching_uid(keyid, msg): - uids = get_subkey_uids(keyid) - fromaddr = email.utils.getaddresses(msg.get_all('from', []))[0] - for uid in uids: - if fromaddr[1] == uid[1]: - return '%s <%s>' % uid - - return None - - -def verify_attestation(cmdargs): - mbx = mailbox.mbox(cmdargs.check) - if cmdargs.attfile: - load_attestation_file(cmdargs.attfile) - session = requests.session() - session.headers.update({'User-Agent': 'attest-patches/%s' % VERSION}) - ecode = 1 - attestors = set() - for msg in mbx: - content = msg.as_bytes() - ihash, mhash, phash = get_mailinfo_hashes(content) - if not phash: - logger.debug('SKIP | %s', msg['Subject']) - continue - logger.debug('Verifying: %s', msg['Subject']) - logger.debug(' i: %s', ihash) - logger.debug(' m: %s', mhash) - logger.debug(' p: %s', phash) - try: - adata = get_lore_attestation(ihash, mhash, phash, session) - except KeyError: - # No attestations found - logger.critical('FAIL | %s', msg['Subject']) - if not cmdargs.nofast: - logger.critical('Aborting due to failure.') - ecode = 1 - break - else: - ecode = 128 - continue - - for good, valid, trusted, sigkey, siguid in adata: - muid = get_matching_uid(sigkey, msg) - if muid is None and cmdargs.ignorefrom: - muid = siguid - if muid is not None: - if not trusted: - VALIDATION_ERRORS.update(('Insufficient owner trust (model=%s): %s (key=%s)' - % (GPGTRUSTMODEL, siguid, sigkey),)) - ecode = 128 - else: - if ecode != 128: - attestors.update(('%s (pgp:%s)' % (muid, sigkey),)) - ecode = 0 - break - else: - VALIDATION_ERRORS.update(('Attestation ignored due to From/UID mismatch: %s' % siguid,)) - ecode = 1 - - if ecode > 0: - logger.critical('FAIL | %s', msg['Subject']) - if not cmdargs.nofast: - logger.critical('Aborting due to failure.') - break - else: - logger.critical('PASS | %s', msg['Subject']) - - logger.critical('---') - if ecode > 0: - logger.critical('Attestation verification failed.') - if len(VALIDATION_ERRORS): - logger.critical('---') - logger.critical('The validation process reported the following errors:') - for error in VALIDATION_ERRORS: - logger.critical(' %s', error) - else: - logger.critical('All patches passed attestation:') - for attestor in attestors: - logger.critical(' Attestation-by: %s', attestor) - - sys.exit(ecode) - - -def main(cmdargs): - global GPGBIN - global GPGTRUSTMODEL - logger.setLevel(logging.DEBUG) - - ch = logging.StreamHandler() - formatter = logging.Formatter('%(message)s') - ch.setFormatter(formatter) - - if cmdargs.quiet: - ch.setLevel(logging.CRITICAL) - elif cmdargs.verbose: - ch.setLevel(logging.DEBUG) - else: - ch.setLevel(logging.INFO) - - logger.addHandler(ch) - gpgcfg = get_config_from_git(r'gpg\..*', {'program': GPGBIN}) - GPGBIN = gpgcfg['program'] - if cmdargs.tofu: - GPGTRUSTMODEL = 'tofu' - - if cmdargs.attest and cmdargs.check: - logger.critical('You cannot both --attest and --check. Pick one.') - sys.exit(1) - if cmdargs.attest: - create_attestation(cmdargs) - elif cmdargs.check: - verify_attestation(cmdargs) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument('-q', '--quiet', action='store_true', default=False, - help='Only output errors to the stdout') - parser.add_argument('-v', '--verbose', action='store_true', default=False, - help='Be more verbose in logging output') - # Attestation arguments - agroup = parser.add_argument_group('attestation', 'Attestation parameters') - agroup.add_argument('-a', '--attest', nargs='+', - help='Create attestation for patches') - # GDPR-proofing: by default, we add as little PII-sensitive info as possible - agroup.add_argument('-f', '--from', dest='sender', default='devnull@kernel.org', - help='Use custom From field (use with -a)') - agroup.add_argument('-o', '--output', default='attestation.eml', - help='Save attestation message in this file (use with -a)') - agroup.add_argument('-d', '--dry-run', dest='dryrun', action='store_true', default=False, - help='Do not send any mail') - # Verification arguments - vgroup = parser.add_argument_group('verification', 'Verification parameters') - vgroup.add_argument('-c', '--check', - help='Check attestation for patches in an mbox file') - vgroup.add_argument('-i', '--attestation-file', dest='attfile', - help='Use this file for attestation data instead of querying lore.kernel.org') - vgroup.add_argument('-t', '--tofu', action='store_true', default=False, - help='Force TOFU trust model (otherwise uses your global GnuPG setting)') - vgroup.add_argument('-X', '--no-fast-exit', dest='nofast', action='store_true', default=False, - help='Do not exit after first failure') - vgroup.add_argument('-F', '--ignore-from-mismatch', dest='ignorefrom', action='store_true', - default=False, help='Ignore mismatches between From: and PGP uid data') - - main(parser.parse_args()) diff --git a/get-lore-mbox.py b/get-lore-mbox.py deleted file mode 100755 index a9b15bb..0000000 --- a/get-lore-mbox.py +++ /dev/null @@ -1,1186 +0,0 @@ -#!/usr/bin/env python3 -# SPDX-License-Identifier: GPL-2.0-or-later -# -*- coding: utf-8 -*- -# -__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>' - -import os -import sys -import argparse -import mailbox -import email -import email.message -import email.utils -import email.header -import email.policy -import subprocess -import logging -import re -import fnmatch -import time - -import requests -import urllib.parse -import xml.etree.ElementTree -import gzip - -from tempfile import mkstemp -from email import charset -charset.add_charset('utf-8', None) -emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None) -logger = logging.getLogger('get-lore-mbox') - -VERSION = '0.2.16' - -# You can use bash-style globbing here -WANTHDRS = [ - 'sender', - 'from', - 'to', - 'cc', - 'subject', - 'date', - 'message-id', - 'resent-message-id', - 'reply-to', - 'in-reply-to', - 'references', - 'list-id', - 'errors-to', - 'x-mailing-list', - 'resent-to', -] - -# You can use bash-style globbing here -# end with '*' to include any other trailers -# You can change the default in your ~/.gitconfig, e.g.: -# [get-lore-mbox] -# # remember to end with ,* -# trailer-order=link*,fixes*,cc*,reported*,suggested*,original*,co-*,tested*,reviewed*,acked*,signed-off*,* -DEFAULT_TRAILER_ORDER = 'fixes*,reported*,suggested*,original*,co-*,signed-off*,tested*,reviewed*,acked*,cc*,link*,*' - -DEFAULT_CONFIG = { - 'midmask': 'https://lore.kernel.org/r/%s', - 'linkmask': 'https://lore.kernel.org/r/%s', - 'trailer-order': DEFAULT_TRAILER_ORDER, -} - - -class LoreMailbox: - def __init__(self): - self.msgid_map = dict() - self.series = dict() - self.followups = list() - self.unknowns = list() - - def __repr__(self): - out = list() - for key, lser in self.series.items(): - out.append(str(lser)) - out.append('--- Followups ---') - for lmsg in self.followups: - out.append(' %s' % lmsg.full_subject) - out.append('--- Unknowns ---') - for lmsg in self.unknowns: - out.append(' %s' % lmsg.full_subject) - - return '\n'.join(out) - - def get_by_msgid(self, msgid): - if msgid in self.msgid_map: - return self.msgid_map[msgid] - return None - - def get_series(self, revision=None): - if revision is None: - if not len(self.series): - return None - # Use the highest revision - revision = max(self.series.keys()) - elif revision not in self.series.keys(): - return None - - lser = self.series[revision] - - # Is it empty? - empty = True - for lmsg in lser.patches: - if lmsg is not None: - empty = False - break - if empty: - logger.critical('All patches in series v%s are missing.', lser.revision) - return None - - # Do we have a cover letter for it? - if not lser.has_cover: - # Let's find the first patch with an in-reply-to and see if that - # is our cover letter - for member in lser.patches: - if member is not None and member.in_reply_to is not None: - potential = self.get_by_msgid(member.in_reply_to) - if potential is not None and potential.has_diffstat and not potential.has_diff: - # This is *probably* the cover letter - lser.patches[0] = potential - lser.has_cover = True - break - - # Do we have any follow-ups? - for fmsg in self.followups: - logger.debug('Analyzing follow-up: %s (%s)', fmsg.full_subject, fmsg.fromemail) - # If there are no trailers in this one, ignore it - if not len(fmsg.trailers): - logger.debug(' no trailers found, skipping') - continue - # if it's for the wrong revision, ignore it - if not fmsg.revision_inferred and lser.revision != fmsg.revision: - logger.debug(' follow-up for the wrong revision, skipping') - continue - # Go up through the follow-ups and tally up trailers until - # we either run out of in-reply-tos, or we find a patch in - # our series - if fmsg.in_reply_to is None: - # Check if there's something matching in References - refs = fmsg.msg.get('References', '') - pmsg = None - for ref in refs.split(): - refid = ref.strip('<>') - if refid in self.msgid_map and refid != fmsg.msgid: - pmsg = self.msgid_map[refid] - break - if pmsg is None: - # Can't find the message we're replying to here - continue - else: - pmsg = self.msgid_map[fmsg.in_reply_to] - - trailers = fmsg.trailers - lvl = 1 - while True: - logger.debug('%sParent: %s', ' ' * lvl, pmsg.full_subject) - logger.debug('%sTrailers:', ' ' * lvl) - for trailer in set(trailers): - logger.debug('%s%s: %s', ' ' * (lvl+1), trailer[0], trailer[1]) - found = False - if lser.revision != pmsg.revision: - break - for lmsg in lser.patches: - if lmsg is not None and lmsg.msgid == pmsg.msgid: - # Confirmed, this is our parent patch - lmsg.followup_trailers += trailers - found = True - break - if found: - break - elif pmsg.in_reply_to and pmsg.in_reply_to in self.msgid_map: - lvl += 1 - trailers += pmsg.trailers - pmsg = self.msgid_map[pmsg.in_reply_to] - else: - break - - return lser - - def add_message(self, msg): - lmsg = LoreMessage(msg) - logger.debug('Looking at: %s', lmsg.full_subject) - self.msgid_map[lmsg.msgid] = lmsg - - if lmsg.has_diff or lmsg.has_diffstat: - if lmsg.revision not in self.series: - self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected) - if len(self.series) > 1: - logger.info('Found new series v%s', lmsg.revision) - if lmsg.has_diff: - # Attempt to auto-number series from the same author who did not bother - # to set v2, v3, etc in the patch revision - if (lmsg.counter == 1 and lmsg.counters_inferred - and not lmsg.reply and lmsg.lsubject.patch and not lmsg.lsubject.resend): - omsg = self.series[lmsg.revision].patches[lmsg.counter] - if (omsg is not None and omsg.counters_inferred and lmsg.fromemail == omsg.fromemail - and omsg.date < lmsg.date): - lmsg.revision = len(self.series) + 1 - self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected) - logger.info('Assuming new revision: v%s (%s)', lmsg.revision, lmsg.full_subject) - logger.debug(' adding as patch') - self.series[lmsg.revision].add_patch(lmsg) - elif lmsg.counter == 0 and lmsg.has_diffstat: - # Bona-fide cover letter - logger.debug(' adding as cover letter') - self.series[lmsg.revision].add_cover(lmsg) - elif lmsg.reply: - # We'll figure out where this belongs later - logger.debug(' adding to followups') - self.followups.append(lmsg) - elif lmsg.reply: - logger.debug(' adding to followups') - self.followups.append(lmsg) - else: - logger.debug(' adding to unknowns') - self.unknowns.append(lmsg) - - -class LoreSeries: - def __init__(self, revision, expected): - self.revision = revision - self.expected = expected - self.patches = [None] * (expected+1) - self.followups = list() - self.complete = False - self.has_cover = False - - def __repr__(self): - out = list() - if self.has_cover: - out.append('- Series: [v%s] %s' % (self.revision, self.patches[0].subject)) - elif self.patches[1] is not None: - out.append('- Series: [v%s] %s' % (self.revision, self.patches[1].subject)) - else: - out.append('- Series: [v%s] (untitled)' % self.revision) - - out.append(' revision: %s' % self.revision) - out.append(' expected: %s' % self.expected) - out.append(' complete: %s' % self.complete) - out.append(' has_cover: %s' % self.has_cover) - out.append(' patches:') - at = 0 - for member in self.patches: - if member is not None: - out.append(' [%s/%s] %s' % (at, self.expected, member.subject)) - if member.followup_trailers: - out.append(' Add: %s' % ', '.join(member.followup_trailers)) - else: - out.append(' [%s/%s] MISSING' % (at, self.expected)) - at += 1 - - return '\n'.join(out) - - def add_patch(self, lmsg): - while len(self.patches) < lmsg.expected + 1: - self.patches.append(None) - self.expected = lmsg.expected - if self.patches[lmsg.counter] is not None: - # Okay, weird, is the one in there a reply? - omsg = self.patches[lmsg.counter] - if omsg.reply or (omsg.counters_inferred and not lmsg.counters_inferred): - # Replace that one with this one - logger.debug(' replacing existing: %s', omsg.subject) - self.patches[lmsg.counter] = lmsg - else: - self.patches[lmsg.counter] = lmsg - self.complete = not (None in self.patches[1:]) - - def add_cover(self, lmsg): - self.add_patch(lmsg) - self.has_cover = True - - def get_slug(self): - # Find the first non-None entry - lmsg = None - for lmsg in self.patches: - if lmsg is not None: - break - - if lmsg is None: - return 'undefined' - - prefix = time.strftime('%Y%m%d', lmsg.date[:9]) - authorline = email.utils.getaddresses(lmsg.msg.get_all('from', []))[0] - author = re.sub(r'\W+', '_', authorline[1]).strip('_').lower() - slug = '%s_%s' % (prefix, author) - if self.revision != 1: - slug = 'v%s_%s' % (self.revision, slug) - - return slug - - def save_am_mbox(self, outfile, noaddtrailers, covertrailers, - trailer_order=None, addmysob=False, addlink=False, linkmask=None): - if os.path.exists(outfile): - os.unlink(outfile) - usercfg = dict() - if addmysob: - usercfg = get_config_from_git(r'user\..*') - if 'name' not in usercfg or 'email' not in usercfg: - logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email') - addmysob = False - - mbx = mailbox.mbox(outfile) - logger.info('---') - logger.critical('Writing %s', outfile) - at = 1 - for lmsg in self.patches[1:]: - if lmsg is not None: - if self.has_cover and covertrailers and self.patches[0].followup_trailers: - lmsg.followup_trailers += self.patches[0].followup_trailers - if addmysob: - lmsg.followup_trailers.append(('Signed-off-by', '%s <%s>' % (usercfg['name'], usercfg['email']))) - if addlink: - lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid)) - logger.info(' %s', lmsg.full_subject) - add_trailers = True - if noaddtrailers: - add_trailers = False - msg = lmsg.get_am_message(add_trailers=add_trailers, trailer_order=trailer_order) - # Pass a policy that avoids most legacy encoding horrors - mbx.add(msg.as_bytes(policy=emlpolicy)) - else: - logger.error(' ERROR: missing [%s/%s]!', at, self.expected) - at += 1 - return mbx - - def save_cover(self, outfile): - cover_msg = self.patches[0].get_am_message(add_trailers=False, trailer_order=None) - with open(outfile, 'w') as fh: - fh.write(cover_msg.as_string(policy=emlpolicy)) - logger.critical('Cover: %s', outfile) - - -class LoreMessage: - def __init__(self, msg): - self.msg = msg - self.msgid = None - - # Subject-based info - self.lsubject = None - self.full_subject = None - self.subject = None - self.reply = False - self.revision = 1 - self.counter = 1 - self.expected = 1 - self.revision_inferred = True - self.counters_inferred = True - - # Header-based info - self.in_reply_to = None - self.fromname = None - self.fromemail = None - self.date = None - - # Body and body-based info - self.body = None - self.has_diff = False - self.has_diffstat = False - self.trailers = list() - self.followup_trailers = list() - - self.msgid = LoreMessage.get_clean_msgid(self.msg) - self.lsubject = LoreSubject(msg['Subject']) - # Copy them into this object for convenience - self.full_subject = self.lsubject.full_subject - self.subject = self.lsubject.subject - self.reply = self.lsubject.reply - self.revision = self.lsubject.revision - self.counter = self.lsubject.counter - self.expected = self.lsubject.expected - self.revision_inferred = self.lsubject.revision_inferred - self.counters_inferred = self.lsubject.counters_inferred - - # Handle [PATCH 6/5] - if self.counter > self.expected: - self.expected = self.counter - - self.in_reply_to = LoreMessage.get_clean_msgid(self.msg, header='In-Reply-To') - - try: - fromdata = email.utils.getaddresses(self.msg.get_all('from', []))[0] - self.fromname = fromdata[0] - self.fromemail = fromdata[1] - except IndexError: - pass - - self.date = email.utils.parsedate_tz(str(self.msg['Date'])) - - diffre = re.compile(r'^(---.*\n\+\+\+|GIT binary patch)', re.M | re.I) - diffstatre = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', re.M | re.I) - - # walk until we find the first text/plain part - mcharset = self.msg.get_content_charset() - if not mcharset: - mcharset = 'utf-8' - - for part in msg.walk(): - cte = part.get_content_type() - if cte.find('/plain') < 0 and cte.find('/x-patch') < 0: - continue - payload = part.get_payload(decode=True) - if payload is None: - continue - pcharset = part.get_content_charset() - if not pcharset: - pcharset = mcharset - payload = payload.decode(pcharset, errors='replace') - if self.body is None: - self.body = payload - continue - # If we already found a body, but we now find something that contains a diff, - # then we prefer this part - if diffre.search(payload): - self.body = payload - - if diffstatre.search(self.body): - self.has_diffstat = True - if diffre.search(self.body): - self.has_diff = True - - # We only pay attention to trailers that are sent in reply - if self.reply: - # Do we have something that looks like a person-trailer? - matches = re.findall(r'^\s*([\w-]+):[ \t]+(.*<\S+>)\s*$', self.body, re.MULTILINE) - if matches: - # Basic sanity checking -- the trailer must match the name or the email - # in the From header, to avoid false-positive trailer parsing errors - for tname, tvalue in matches: - tmatch = False - namedata = email.utils.getaddresses([tvalue])[0] - tfrom = re.sub(r'\+[^@]+@', '@', namedata[1].lower()) - hfrom = re.sub(r'\+[^@]+@', '@', self.fromemail.lower()) - tlname = namedata[0].lower() - hlname = self.fromname.lower() - tchunks = tfrom.split('@') - hchunks = hfrom.split('@') - if tfrom == hfrom: - logger.debug(' trailer exact email match') - tmatch = True - # See if domain part of one of the addresses is a subset of the other one, - # which should match cases like @linux.intel.com and @intel.com - elif (len(tchunks) == 2 and len(hchunks) == 2 - and tchunks[0] == hchunks[0] - and (tchunks[1].find(hchunks[1]) >= 0 or hchunks[1].find(tchunks[1]) >= 0)): - logger.debug(' trailer fuzzy email match') - tmatch = True - # Does the name match, at least? - elif tlname == hlname: - logger.debug(' trailer exact name match') - tmatch = True - # Finally, see if the header From has a comma in it and try to find all - # parts in the trailer name - elif hlname.find(',') > 0: - nmatch = True - for nchunk in hlname.split(','): - if hlname.find(nchunk.strip()) < 0: - nmatch = False - break - if nmatch: - logger.debug(' trailer fuzzy name match') - tmatch = True - if tmatch: - self.trailers.append((tname, tvalue)) - else: - logger.debug(' ignoring "%s: %s" due to from mismatch (from: %s %s)', tname, tvalue, - self.fromname, self.fromemail) - - def __repr__(self): - out = list() - out.append('msgid: %s' % self.msgid) - out.append(str(self.lsubject)) - - out.append(' fromname: %s' % self.fromname) - out.append(' fromemail: %s' % self.fromemail) - out.append(' date: %s' % str(self.date)) - out.append(' in_reply_to: %s' % self.in_reply_to) - - # Header-based info - out.append(' --- begin body ---') - for line in self.body.split('\n'): - out.append(' |%s' % line) - out.append(' --- end body ---') - - # Body and body-based info - out.append(' has_diff: %s' % self.has_diff) - out.append(' has_diffstat: %s' % self.has_diffstat) - out.append(' --- begin my trailers ---') - for trailer in self.trailers: - out.append(' |%s' % str(trailer)) - out.append(' --- begin followup trailers ---') - for trailer in self.followup_trailers: - out.append(' |%s' % str(trailer)) - out.append(' --- end trailers ---') - - return '\n'.join(out) - - @staticmethod - def clean_header(hdrval): - uval = hdrval.replace('\n', ' ') - new_hdrval = re.sub(r'\s+', ' ', uval) - return new_hdrval.strip() - - @staticmethod - def get_clean_msgid(msg, header='Message-Id'): - msgid = None - raw = msg.get(header) - if raw: - matches = re.search(r'<([^>]+)>', LoreMessage.clean_header(raw)) - if matches: - msgid = matches.groups()[0] - return msgid - - def fix_trailers(self, trailer_order=None): - bodylines = self.body.split('\n') - # Get existing trailers - # 1. Find the first --- - # 2. Go backwards and grab everything matching ^[\w-]+:\s.*$ until a blank line - fixlines = list() - trailersdone = False - for line in bodylines: - if trailersdone: - fixlines.append(line) - continue - - if line.strip() == '---': - # Start going backwards in fixlines - btrailers = list() - for rline in reversed(fixlines): - if not len(rline.strip()): - break - matches = re.search(r'^([\w-]+):\s+(.*)', rline) - if not matches: - break - fixlines.pop() - btrailers.append(matches.groups()) - - # Now we add mix-in trailers - btrailers.reverse() - trailers = btrailers + self.followup_trailers - added = list() - if trailer_order is None: - trailer_order = DEFAULT_TRAILER_ORDER - for trailermatch in trailer_order: - for trailer in trailers: - if trailer in added: - continue - if fnmatch.fnmatch(trailer[0].lower(), trailermatch.strip()): - fixlines.append('%s: %s' % trailer) - if trailer not in btrailers: - logger.info(' Added: %s: %s' % trailer) - else: - logger.debug(' Kept: %s: %s' % trailer) - added.append(trailer) - trailersdone = True - fixlines.append(line) - self.body = '\n'.join(fixlines) - - def get_am_message(self, add_trailers=True, trailer_order=None): - if add_trailers: - self.fix_trailers(trailer_order=trailer_order) - am_body = self.body - am_msg = email.message.EmailMessage() - am_msg.set_payload(am_body.encode('utf-8')) - # Clean up headers - for hdrname, hdrval in self.msg.items(): - lhdrname = hdrname.lower() - wanthdr = False - for hdrmatch in WANTHDRS: - if fnmatch.fnmatch(lhdrname, hdrmatch): - wanthdr = True - break - if wanthdr: - new_hdrval = LoreMessage.clean_header(hdrval) - # noinspection PyBroadException - try: - am_msg.add_header(hdrname, new_hdrval) - except: - # A broad except to handle any potential weird header conditions - pass - am_msg.set_charset('utf-8') - return am_msg - - -class LoreSubject: - def __init__(self, subject): - # Subject-based info - self.full_subject = None - self.subject = None - self.reply = False - self.resend = False - self.patch = False - self.rfc = False - self.revision = 1 - self.counter = 1 - self.expected = 1 - self.revision_inferred = True - self.counters_inferred = True - self.prefixes = list() - - subject = re.sub(r'\s+', ' ', LoreMessage.clean_header(subject)).strip() - # Remove any leading [] that don't have "patch", "resend" or "rfc" in them - while True: - oldsubj = subject - subject = re.sub(r'^\s*\[[^\]]*\]\s*(\[[^\]]*(:?patch|resend|rfc).*)', '\\1', subject, flags=re.IGNORECASE) - if oldsubj == subject: - break - - # Remove any brackets inside brackets - while True: - oldsubj = subject - subject = re.sub(r'^\s*\[([^\]]*)\[([^\]]*)\]', '[\\1\\2]', subject) - subject = re.sub(r'^\s*\[([^\]]*)\]([^\]]*)\]', '[\\1\\2]', subject) - if oldsubj == subject: - break - - self.full_subject = subject - # Is it a reply? - if re.search(r'^(Re|Aw|Fwd):', subject, re.I) or re.search(r'^\w{2,3}:\s*\[', subject): - self.reply = True - subject = re.sub(r'^\w+:\s*\[', '[', subject) - - # Find all [foo] in the title - while subject.find('[') == 0: - matches = re.search(r'^\[([^\]]*)\]', subject) - for chunk in matches.groups()[0].split(): - # Remove any trailing commas or semicolons - chunk = chunk.strip(',;') - if re.search(r'^\d{1,3}/\d{1,3}$', chunk): - counters = chunk.split('/') - self.counter = int(counters[0]) - self.expected = int(counters[1]) - self.counters_inferred = False - elif re.search(r'^v\d+$', chunk, re.IGNORECASE): - self.revision = int(chunk[1:]) - self.revision_inferred = False - elif chunk.lower().find('rfc') == 0: - self.rfc = True - elif chunk.lower().find('resend') == 0: - self.resend = True - elif chunk.lower().find('patch') == 0: - self.patch = True - self.prefixes.append(chunk.lower()) - subject = re.sub(r'^\s*\[[^\]]*\]\s*', '', subject) - self.subject = subject - - def __repr__(self): - out = list() - out.append(' full_subject: %s' % self.full_subject) - out.append(' subject: %s' % self.subject) - out.append(' reply: %s' % self.reply) - out.append(' resend: %s' % self.resend) - out.append(' patch: %s' % self.patch) - out.append(' rfc: %s' % self.rfc) - out.append(' revision: %s' % self.revision) - out.append(' revision_inferred: %s' % self.revision_inferred) - out.append(' counter: %s' % self.counter) - out.append(' expected: %s' % self.expected) - out.append(' counters_inferred: %s' % self.counters_inferred) - out.append(' prefixes: %s' % ', '.join(self.prefixes)) - - return '\n'.join(out) - - -def git_get_command_lines(gitdir, args): - out = git_run_command(gitdir, args) - lines = list() - if out: - for line in out.split('\n'): - if line == '': - continue - lines.append(line) - - return lines - - -def git_run_command(gitdir, args, stdin=None, logstderr=False): - cmdargs = ['git', '--no-pager'] - if gitdir: - cmdargs += ['--git-dir', gitdir] - cmdargs += args - - logger.debug('Running %s' % ' '.join(cmdargs)) - - if stdin: - (output, error) = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE).communicate(input=stdin) - else: - (output, error) = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, - stderr=subprocess.PIPE).communicate() - - output = output.strip().decode('utf-8', errors='replace') - - if logstderr and len(error.strip()): - logger.debug('Stderr: %s', error.decode('utf-8', errors='replace')) - - return output - - -def get_config_from_git(regexp, defaults=None): - args = ['config', '-z', '--get-regexp', regexp] - out = git_run_command(None, args) - gitconfig = defaults - if not gitconfig: - gitconfig = dict() - if not out: - return gitconfig - - for line in out.split('\x00'): - if not line: - continue - key, value = line.split('\n', 1) - try: - chunks = key.split('.') - cfgkey = chunks[-1] - gitconfig[cfgkey] = value - except ValueError: - logger.debug('Ignoring git config entry %s', line) - - return gitconfig - - -def get_msgid_from_stdin(): - if not sys.stdin.isatty(): - message = email.message_from_string(sys.stdin.read()) - return message.get('Message-ID', None) - logger.error('Error: pipe a message or pass msgid as parameter') - sys.exit(1) - - -def get_pi_thread_by_url(t_mbx_url, savefile, session): - resp = session.get(t_mbx_url) - if resp.status_code != 200: - logger.critical('Server returned an error: %s', resp.status_code) - return None - t_mbox = gzip.decompress(resp.content) - resp.close() - if not len(t_mbox): - logger.critical('No messages found for that query') - return None - with open(savefile, 'wb') as fh: - logger.debug('Saving %s', savefile) - fh.write(t_mbox) - return savefile - - -def get_pi_thread_by_msgid(msgid, config, cmdargs, session): - wantname = cmdargs.wantname - outdir = cmdargs.outdir - # Grab the head from lore, to see where we are redirected - midmask = config['midmask'] % msgid - logger.info('Looking up %s', midmask) - resp = session.head(midmask) - if resp.status_code < 300 or resp.status_code > 400: - logger.critical('That message-id is not known.') - return None - canonical = resp.headers['Location'].rstrip('/') - resp.close() - t_mbx_url = '%s/t.mbox.gz' % canonical - if wantname: - savefile = os.path.join(outdir, wantname) - else: - # Save it into msgid.mbox - savefile = '%s.t.mbx' % msgid - savefile = os.path.join(outdir, savefile) - - loc = urllib.parse.urlparse(t_mbx_url) - if cmdargs.useproject: - logger.debug('Modifying query to use %s', cmdargs.useproject) - t_mbx_url = '%s://%s/%s/%s/t.mbox.gz' % ( - loc.scheme, loc.netloc, cmdargs.useproject, msgid) - logger.debug('Will query: %s', t_mbx_url) - logger.critical('Grabbing thread from %s', loc.netloc) - pi_mbx = get_pi_thread_by_url(t_mbx_url, '%s-loose' % savefile, session) - return get_strict_thread(pi_mbx, msgid, savefile) - - -def get_strict_thread(pi_mbx, msgid, savefile): - pmbx = mailbox.mbox(pi_mbx) - smbx = mailbox.mbox(savefile) - want = {msgid} - got = set() - seen = set() - while True: - for msg in pmbx: - c_msgid = LoreMessage.get_clean_msgid(msg) - seen.add(c_msgid) - if c_msgid in got: - continue - - refs = list() - for ref in msg.get('References', msg.get('In-Reply-To', '')).split(): - ref = ref.strip().strip('<>') - if ref in got or ref in want: - want.add(c_msgid) - elif len(ref): - refs.append(ref) - - if c_msgid in want: - smbx.add(msg) - got.add(c_msgid) - want.update(refs) - want.discard(c_msgid) - logger.debug('Kept in thread: %s', c_msgid) - - # Remove any entries not in "seen" (missing messages) - for c_msgid in set(want): - if c_msgid not in seen: - want.remove(c_msgid) - if not len(want): - break - - if not len(smbx): - return None - - if len(pmbx) > len(smbx): - logger.info('Reduced thread to strict matches only (%s->%s)', len(pmbx), len(smbx)) - pmbx.close() - smbx.close() - os.unlink(pi_mbx) - return savefile - - -def mbox_to_am(mboxfile, config, cmdargs): - outdir = cmdargs.outdir - wantver = cmdargs.wantver - wantname = cmdargs.wantname - covertrailers = cmdargs.covertrailers - mbx = mailbox.mbox(mboxfile) - count = len(mbx) - logger.info('Analyzing %s messages in the thread', count) - lmbx = LoreMailbox() - # Go through the mbox once to populate base series - for key, msg in mbx.items(): - lmbx.add_message(msg) - - lser = lmbx.get_series(revision=wantver) - if lser is None and wantver is None: - logger.critical('No patches found.') - return - if lser is None: - logger.critical('Unable to find revision %s', wantver) - return - if len(lmbx.series) > 1 and not wantver: - logger.info('Will use the latest revision: v%s', lser.revision) - logger.info('You can pick other revisions using the -vN flag') - - if wantname: - slug = wantname - if wantname.find('.') > -1: - slug = '.'.join(wantname.split('.')[:-1]) - else: - slug = lser.get_slug() - - am_filename = os.path.join(outdir, '%s.mbx' % slug) - am_cover = os.path.join(outdir, '%s.cover' % slug) - - am_mbx = lser.save_am_mbox(am_filename, cmdargs.noaddtrailers, covertrailers, - trailer_order=config['trailer-order'], - addmysob=cmdargs.addmysob, addlink=cmdargs.addlink, - linkmask=config['linkmask']) - logger.info('---') - - logger.critical('Total patches: %s', len(am_mbx)) - if lser.has_cover and lser.patches[0].followup_trailers and not covertrailers: - # Warn that some trailers were sent to the cover letter - logger.critical('---') - logger.critical('NOTE: Some trailers were sent to the cover letter:') - for trailer in lser.patches[0].followup_trailers: - logger.critical(' %s: %s', trailer[0], trailer[1]) - logger.critical('NOTE: Rerun with -t to apply them to all patches') - - logger.critical('---') - if not lser.complete: - logger.critical('WARNING: Thread incomplete!') - - if lser.has_cover: - lser.save_cover(am_cover) - - top_msgid = None - first_body = None - for lmsg in lser.patches: - if lmsg is not None: - first_body = lmsg.body - top_msgid = lmsg.msgid - break - if top_msgid is None: - logger.critical('Could not find any patches in the series.') - return - - linkurl = config['linkmask'] % top_msgid - if cmdargs.quiltready: - q_dirname = os.path.join(outdir, '%s.patches' % slug) - am_mbox_to_quilt(am_mbx, q_dirname) - logger.critical('Quilt: %s', q_dirname) - - logger.critical(' Link: %s', linkurl) - - base_commit = None - matches = re.search(r'base-commit: .*?([0-9a-f]+)', first_body, re.MULTILINE) - if matches: - base_commit = matches.groups()[0] - else: - # Try a more relaxed search - matches = re.search(r'based on .*?([0-9a-f]{40})', first_body, re.MULTILINE) - if matches: - base_commit = matches.groups()[0] - - if base_commit: - logger.critical(' Base: %s', base_commit) - logger.critical(' git checkout -b %s %s', slug, base_commit) - logger.critical(' git am %s', am_filename) - else: - logger.critical(' Base: not found, sorry') - logger.critical(' git checkout -b %s master', slug) - logger.critical(' git am %s', am_filename) - - am_mbx.close() - - return am_filename - - -def am_mbox_to_quilt(am_mbx, q_dirname): - if os.path.exists(q_dirname): - logger.critical('ERROR: Directory %s exists, not saving quilt patches', q_dirname) - return - os.mkdir(q_dirname, 0o755) - patch_filenames = list() - for key, msg in am_mbx.items(): - # Run each message through git mailinfo - msg_out = mkstemp(suffix=None, prefix=None, dir=q_dirname) - patch_out = mkstemp(suffix=None, prefix=None, dir=q_dirname) - cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]] - info = git_run_command(None, cmdargs, msg.as_bytes(policy=emlpolicy)) - if not len(info.strip()): - logger.critical('ERROR: Could not get mailinfo from patch %s', msg['Subject']) - continue - patchinfo = dict() - for line in info.split('\n'): - chunks = line.split(':', 1) - patchinfo[chunks[0]] = chunks[1] - - slug = re.sub(r'\W+', '_', patchinfo['Subject']).strip('_').lower() - patch_filename = '%04d_%s.patch' % (key+1, slug) - patch_filenames.append(patch_filename) - quilt_out = os.path.join(q_dirname, patch_filename) - with open(quilt_out, 'wb') as fh: - line = 'From: %s <%s>\n' % (patchinfo['Author'].strip(), patchinfo['Email'].strip()) - fh.write(line.encode('utf-8')) - line = 'Subject: %s\n' % patchinfo['Subject'].strip() - fh.write(line.encode('utf-8')) - line = 'Date: %s\n' % patchinfo['Date'].strip() - fh.write(line.encode('utf-8')) - fh.write('\n'.encode('utf-8')) - with open(msg_out[1], 'r') as mfh: - fh.write(mfh.read().encode('utf-8')) - with open(patch_out[1], 'r') as pfh: - fh.write(pfh.read().encode('utf-8')) - logger.debug(' Wrote: %s', patch_filename) - os.unlink(msg_out[1]) - os.unlink(patch_out[1]) - # Write the series file - with open(os.path.join(q_dirname, 'series'), 'w') as sfh: - for patch_filename in patch_filenames: - sfh.write('%s\n' % patch_filename) - - -def get_newest_series(mboxfile, session): - # Open the mbox and find the latest series mentioned in it - mbx = mailbox.mbox(mboxfile) - base_msg = None - latest_revision = None - seen_msgids = list() - seen_covers = list() - for key, msg in mbx.items(): - msgid = LoreMessage.get_clean_msgid(msg) - seen_msgids.append(msgid) - lsub = LoreSubject(msg['Subject']) - # Ignore replies or counters above 1 - if lsub.reply or lsub.counter > 1: - continue - if latest_revision is None or lsub.revision > latest_revision: - # New revision - latest_revision = lsub.revision - if lsub.counter == 0: - # And a cover letter, nice. This is the easy case - base_msg = msg - seen_covers.append(latest_revision) - continue - if lsub.counter == 1: - if latest_revision not in seen_covers: - # A patch/series without a cover letter - base_msg = msg - - # Get subject info from base_msg again - lsub = LoreSubject(base_msg['Subject']) - if not len(lsub.prefixes): - logger.debug('Not checking for new revisions: no prefixes on the cover letter.') - mbx.close() - return - base_msgid = LoreMessage.get_clean_msgid(base_msg) - fromeml = email.utils.getaddresses(base_msg.get_all('from', []))[0][1] - msgdate = email.utils.parsedate_tz(str(base_msg['Date'])) - startdate = time.strftime('%Y%m%d', msgdate[:9]) - listarc = base_msg.get_all('List-Archive')[-1].strip('<>') - q = 's:"%s" AND f:"%s" AND d:%s..' % (lsub.subject.replace('"', ''), fromeml, startdate) - queryurl = '%s?%s' % (listarc, urllib.parse.urlencode({'q': q, 'x': 'A', 'o': '-1'})) - logger.critical('Checking for newer revisions on %s', listarc) - logger.debug('Query URL: %s', queryurl) - resp = session.get(queryurl) - # try to parse it - try: - tree = xml.etree.ElementTree.fromstring(resp.content) - except xml.etree.ElementTree.ParseError as ex: - logger.debug('Unable to parse results, ignoring', ex) - resp.close() - mbx.close() - return - resp.close() - ns = {'atom': 'http://www.w3.org/2005/Atom'} - entries = tree.findall('atom:entry', ns) - - for entry in entries: - title = entry.find('atom:title', ns).text - lsub = LoreSubject(title) - if lsub.reply or lsub.counter > 1: - logger.debug('Ignoring result (not interesting): %s', title) - continue - link = entry.find('atom:link', ns).get('href') - if lsub.revision < latest_revision: - logger.debug('Ignoring result (not new revision): %s', title) - continue - if link.find('/%s/' % base_msgid) > 0: - logger.debug('Ignoring result (same thread as ours):%s', title) - continue - if lsub.revision == 1 and lsub.revision == latest_revision: - # Someone sent a separate message with an identical title but no new vX in the subject line - # It's *probably* a new revision. - logger.debug('Likely a new revision: %s', title) - elif lsub.revision > latest_revision: - logger.debug('Definitely a new revision [v%s]: %s', lsub.revision, title) - else: - logger.debug('No idea what this is: %s', title) - continue - t_mbx_url = '%st.mbox.gz' % link - savefile = mkstemp('get-lore-mbox')[1] - nt_mboxfile = get_pi_thread_by_url(t_mbx_url, savefile, session) - nt_mbx = mailbox.mbox(nt_mboxfile) - # Append all of these to the existing mailbox - new_adds = 0 - for nt_msg in nt_mbx: - nt_msgid = LoreMessage.get_clean_msgid(nt_msg) - if nt_msgid in seen_msgids: - logger.debug('Duplicate message, skipping') - continue - nt_subject = re.sub(r'\s+', ' ', nt_msg['Subject']) - logger.debug('Adding: %s', nt_subject) - new_adds += 1 - mbx.add(nt_msg) - seen_msgids.append(nt_msgid) - nt_mbx.close() - if new_adds: - logger.info('Added %s messages from thread: %s', new_adds, title) - logger.debug('Removing temporary %s', nt_mboxfile) - os.unlink(nt_mboxfile) - - # We close the mbox, since we'll be reopening it later - mbx.close() - - -def main(cmdargs): - logger.setLevel(logging.DEBUG) - - ch = logging.StreamHandler() - formatter = logging.Formatter('%(message)s') - ch.setFormatter(formatter) - - if cmdargs.quiet: - ch.setLevel(logging.CRITICAL) - elif cmdargs.debug: - ch.setLevel(logging.DEBUG) - else: - ch.setLevel(logging.INFO) - - logger.addHandler(ch) - - logger.critical('--- WARNING: OBSOLETE ---') - logger.critical('This script has been obsoleted by "b4" and is unmaintained.') - logger.critical('https://git.kernel.org/pub/scm/utils/b4/b4.git') - logger.critical('-------------------------') - - session = requests.session() - session.headers.update({'User-Agent': 'get-lore-mbox/%s' % VERSION}) - - config = get_config_from_git(r'get-lore-mbox\..*', defaults=DEFAULT_CONFIG) - config['trailer-order'] = config['trailer-order'].split(',') - - if not cmdargs.localmbox: - if not cmdargs.msgid: - logger.debug('Getting Message-ID from stdin') - msgid = get_msgid_from_stdin() - if msgid is None: - logger.error('Unable to find a valid message-id in stdin.') - sys.exit(1) - else: - msgid = cmdargs.msgid - - msgid = msgid.strip('<>') - # Handle the case when someone pastes a full URL to the message - matches = re.search(r'^https?://[^/]+/([^/]+)/([^/]+@[^/]+)', msgid, re.IGNORECASE) - if matches: - chunks = matches.groups() - msgid = chunks[1] - # Infer the project name from the URL, if possible - if chunks[0] != 'r': - cmdargs.useproject = chunks[0] - - mboxfile = get_pi_thread_by_msgid(msgid, config, cmdargs, session) - if mboxfile is None: - return - - # Move it into -thread - threadmbox = '%s-thread' % mboxfile - os.rename(mboxfile, threadmbox) - else: - if os.path.exists(cmdargs.localmbox): - threadmbox = cmdargs.localmbox - else: - logger.critical('Mailbox %s does not exist', cmdargs.localmbox) - sys.exit(1) - - if threadmbox and cmdargs.checknewer: - get_newest_series(threadmbox, session) - - if cmdargs.amready: - mbox_to_am(threadmbox, config, cmdargs) - if not cmdargs.localmbox: - os.unlink(threadmbox) - else: - mbx = mailbox.mbox(threadmbox) - logger.critical('Saved %s', threadmbox) - logger.critical('%s messages in the thread', len(mbx)) - - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - formatter_class=argparse.ArgumentDefaultsHelpFormatter - ) - parser.add_argument('msgid', nargs='?', - help='Message ID to process, or pipe a raw message') - parser.add_argument('-o', '--outdir', default='.', - help='Output into this directory') - parser.add_argument('-p', '--use-project', dest='useproject', default=None, - help='Use a specific project instead of guessing (linux-mm, linux-hardening, etc)') - parser.add_argument('-c', '--check-newer-revisions', dest='checknewer', action='store_true', default=False, - help='Check if newer patch revisions exist') - parser.add_argument('-n', '--mbox-name', dest='wantname', default=None, - help='Filename to name the mbox file') - parser.add_argument('-d', '--debug', action='store_true', default=False, - help='Add more debugging info to the output') - parser.add_argument('-q', '--quiet', action='store_true', default=False, - help='Output critical information only') - - agroup = parser.add_argument_group('am-ready parameters') - agroup.add_argument('-a', '--am-ready', dest='amready', action='store_true', default=False, - help='Make an mbox ready for git am') - agroup.add_argument('-m', '--use-local-mbox', dest='localmbox', default=None, - help='Instead of grabbing a thread from lore, process this mbox file') - agroup.add_argument('-v', '--use-version', dest='wantver', type=int, default=None, - help='Get a specific version of the patch/series') - agroup.add_argument('-t', '--apply-cover-trailers', dest='covertrailers', action='store_true', default=False, - help='Apply trailers sent to the cover letter to all patches') - agroup.add_argument('-T', '--no-add-trailers', dest='noaddtrailers', action='store_true', default=False, - help='Do not add or sort any trailers') - agroup.add_argument('-s', '--add-my-sob', dest='addmysob', action='store_true', default=False, - help='Add your own signed-off-by to every patch') - agroup.add_argument('-l', '--add-link', dest='addlink', action='store_true', default=False, - help='Add a lore.kernel.org/r/ link to every patch') - agroup.add_argument('-Q', '--quilt-ready', dest='quiltready', action='store_true', default=False, - help='Save mbox patches in a quilt-ready folder') - main(parser.parse_args()) |