#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# A helper script to go through the latest comments added to a bugzilla
# to see if any of them link to external sites. If the reviewer deems them
# spammy, the script will tag them as such.
#
# Caution: work in progress
#
__author__ = 'Konstantin Ryabitsev'

import sys
import requests
import argparse
import logging
import re
import shelve
import datetime
import notify2
import time

from urllib.parse import urlparse
from configparser import ConfigParser

logger = logging.getLogger('default')

APIKEY = None
BZURL = None
REQSESSION = None
CACHEDATA = None


def notify_desktop(message):
    # Pop up a persistent desktop notification (used in non-interactive mode)
    notify2.init('bugjunker')
    n = notify2.Notification('bugjunker', message)
    n.set_timeout(notify2.EXPIRES_NEVER)
    n.show()


def get_session():
    # Lazily create a single requests session shared by all API calls
    global REQSESSION
    if REQSESSION is None:
        REQSESSION = requests.session()
        REQSESSION.headers.update({'User-Agent': 'bugjunker'})
    return REQSESSION


def ban_hammer(spammers):
    # Disable the accounts of identified spammers
    params = {}
    for spammer in set(spammers):
        path = 'user/{spammer}'.format(spammer=spammer)
        logger.info('Banning %s', spammer)
        payload = {
            'email_enabled': False,
            'login_denied_text': 'Spammer',
        }
        bz_put(path, params, payload)


def tag_hammer(spamcids, spamtag):
    # Mark identified comments with the configured spam tag
    params = {}
    for cid in set(spamcids):
        logger.info('Tagging comment %s', cid)
        path = 'bug/comment/{cid}/tags'.format(cid=cid)
        payload = {
            'comment_id': cid,
            'add': [spamtag],
        }
        bz_put(path, params, payload)


def bug_hammer(spambugs, args):
    # Close junked bugs, hide them in the private group, and move them
    # into the designated spam product/component
    params = {}
    for bugid in set(spambugs):
        logger.info('Junking bug %s', bugid)
        path = 'bug/{bugid}'.format(bugid=bugid)
        payload = {
            'product': args.product,
            'component': args.component,
            'groups': {'add': [args.group]},
            'status': args.status,
            'resolution': args.resolution,
        }
        bz_put(path, params, payload)


def bz_get(path, params):
    url = '{BZURL}/rest/{path}'.format(BZURL=BZURL, path=path)
    params['api_key'] = APIKEY
    ses = get_session()
    res = ses.get(url, params=params)
    return res.json()


def bz_put(path, params, payload):
    url = '{BZURL}/rest/{path}'.format(BZURL=BZURL, path=path)
    params['api_key'] = APIKEY
    ses = get_session()
    res = ses.put(url, params=params, json=payload)
    return res.json()


def load_cache(cachefile):
    # Load previously seen comment/attachment IDs and whitelists from the
    # shelve cache; start fresh if the cache does not exist yet
    global CACHEDATA
    if CACHEDATA is not None:
        return CACHEDATA.get('lastrun', '24h'), CACHEDATA
    # noinspection PyBroadException
    try:
        with shelve.open(cachefile, 'r') as wc:
            logger.info('Loading cache from %s', cachefile)
            CACHEDATA = dict(wc)
    except:
        CACHEDATA = {
            'seencids': list(),
            'seenaids': list(),
            'okdomains': list(),
            'okfolks': list(),
        }
    if 'lastrun' in CACHEDATA:
        lastrun = CACHEDATA['lastrun']
    else:
        lastrun = '24h'
    return lastrun, CACHEDATA


def save_cache(cachefile, cachedata):
    with shelve.open(cachefile, 'c') as wc:
        for key, val in cachedata.items():
            wc[key] = val
        wc.sync()


def check_bad_urls(urls, okdomains):
    # Return the first URL whose hostname is not whitelisted, if any
    for url in urls:
        try:
            up = urlparse(url)
        except ValueError:
            return url, None
        if up.netloc not in okdomains:
            return url, up.netloc
    return None, None


def is_junk_attachment(attid):
    attid = str(attid)
    logger.info(' checking attachment %s', attid)
    path = 'bug/attachment/{attid}'.format(attid=attid)
    attinfo = bz_get(path, {})
    if attid not in attinfo['attachments']:
        return False
    attdata = attinfo['attachments'][attid]
    if attdata['is_patch'] or attdata['content_type'] in ('text/plain',):
        return False
    if attdata['content_type'] == 'text/html':
        # Almost certainly junk, so defang it: make it plain-text,
        # private, and rename it
        logger.info(' junking attachment %s', attid)
        payload = {
            'content_type': 'text/plain',
            'filename': 'caution.txt',
            'is_private': True,
        }
        bz_put(path, {}, payload)
        return True
    return False
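
# The loop in process_bugs() below unpacks the response of
# GET /rest/bug/{bugid}/comment. Inferred from the indexing used in that
# loop, the payload looks roughly like this (a sketch, not the full
# schema; field values are placeholders):
#
#   {
#       "bugs": {
#           "1234": {
#               "comments": [
#                   {"id": 567, "creator": "user@example.com",
#                    "text": "...", "tags": [], "attachment_id": null},
#               ]
#           }
#       }
#   }
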

def process_bugs(cmdargs, cachefile, c, bugs, spamtag):
    # Walk the comments on each changed bug and collect spammy comment IDs,
    # bug IDs, and their authors
    spammers = list()
    spamcids = list()
    spambugs = list()
    for bug in bugs:
        logger.info('Analyzing [%s]: %s', bug['id'], bug['summary'])
        params = {}
        bugid = bug['id']
        path = 'bug/{bugid}/comment'.format(bugid=bugid)
        comments = bz_get(path, params)
        for _bugid, bugdata in comments['bugs'].items():
            for c_count, comment in enumerate(bugdata['comments']):
                cid = comment['id']
                if cid in c['seencids']:
                    # already seen, skip
                    continue
                c['seencids'].append(cid)
                creator = comment['creator']
                if creator in c['okfolks']:
                    # Known good person
                    continue
                tags = comment['tags']
                if spamtag in tags:
                    # already marked as spammy
                    continue
                if creator in spammers:
                    # Made by a known spammer, so auto-tag it
                    spamcids.append(cid)
                    logger.info(' auto-tagging comment by %s: %s', creator, cid)
                    continue
                if cmdargs.checkatt:
                    attid = comment['attachment_id']
                    if attid is not None and attid not in c['seenaids']:
                        c['seenaids'].append(attid)
                        if is_junk_attachment(attid):
                            logger.info(' check if spammer: %s', creator)
                # Look for remote URLs in the comment
                if 'http' in bug['url'] or 'http' in comment['text']:
                    urls = re.findall(r'(https?://\S+)', comment['text'])
                    if bug['url']:
                        urls.append(bug['url'])
                    badurl, baddomain = check_bad_urls(urls, c['okdomains'])
                    if badurl is not None:
                        if cmdargs.noninteractive:
                            # Can't prompt for a decision; alert the desktop,
                            # rewind the cache so this comment is looked at
                            # again, and bail out
                            notify_desktop('Spam in %s: %s' % (bug['id'], baddomain))
                            when = datetime.datetime.strptime(
                                bug['last_change_time'], '%Y-%m-%dT%H:%M:%SZ')
                            c['seencids'].remove(cid)
                            c['lastrun'] = when.strftime('%Y-%m-%d %H:%M:%S')
                            c['needsinput'] = True
                            return spammers, spamcids, spambugs
                        logger.info(' ---')
                        logger.info(' suspish URL: %s', badurl)
                        logger.info(' checkit out: %s/show_bug.cgi?id=%s#c%s',
                                    BZURL, bugid, c_count)
                        baw = input(' (b)an, (a)llow, (w)hitelist %s: ' % baddomain)
                        if baw == 'a':
                            c['okfolks'].append(creator)
                            save_cache(cachefile, c)
                            continue
                        if baw == 'w':
                            logger.info(' whitelisted %s', baddomain)
                            c['okdomains'].append(baddomain)
                            c['okfolks'].append(creator)
                            save_cache(cachefile, c)
                            continue
                        # Anything else (including 'b') means ban
                        logger.info(' spamcid: %s', cid)
                        spamcids.append(cid)
                        # If it's a comment #0, then the whole bug needs junking
                        if c_count == 0:
                            spambugs.append(bug['id'])
                        if creator not in spammers:
                            logger.info(' spammer: %s', creator)
                            spammers.append(creator)
    return spammers, spamcids, spambugs
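
# main() below reads an INI-style configuration file with a [main] section.
# A minimal example, using only the keys actually fetched via config.get()
# in main(); all values are placeholders:
#
#   [main]
#   url = https://bugzilla.example.com
#   apikey = your-bugzilla-api-key
#   spamtag = spam
#   logfile = /var/log/bugjunker.log
#   cache = /var/cache/bugjunker.cache
#
# The logfile key may be left empty to skip logging to a file, but it must
# be present, since config.get() raises NoOptionError for missing keys.
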

def main(args):
    global BZURL
    global APIKEY

    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if args.quiet:
        ch.setLevel(logging.CRITICAL)
    elif args.debug:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)

    logger.info('Loading configuration file %s', args.config)
    config = ConfigParser()
    config.read(args.config)

    BZURL = config.get('main', 'url')
    APIKEY = config.get('main', 'apikey')
    spamtag = config.get('main', 'spamtag')

    if config.get('main', 'logfile'):
        ch = logging.FileHandler(config.get('main', 'logfile'))
        fmt = '[%(process)d] %(asctime)s - %(message)s'
        ch.setFormatter(logging.Formatter(fmt))
        ch.setLevel(logging.INFO)
        logger.addHandler(ch)

    cachefile = config.get('main', 'cache')
    lastrun, c = load_cache(cachefile)
    if args.lookback is not None:
        lastrun = args.lookback

    while True:
        if args.noninteractive and 'needsinput' in c and c['needsinput']:
            logger.info('Need to run interactively to make some decisions')
            sys.exit(0)
        params = {
            'chfieldfrom': lastrun,
            'include_fields': 'id,summary,last_change_time,url',
        }
        logger.info('Querying %s for changes since %s', BZURL, lastrun)
        unow = datetime.datetime.utcnow()
        json = bz_get('bug', params)
        c['lastrun'] = unow.strftime('%Y-%m-%d %H:%M:%S')
        c['needsinput'] = False
        if json['bugs']:
            spammers, spamcids, spambugs = process_bugs(args, cachefile, c,
                                                        json['bugs'], spamtag)
            if spammers or spamcids or spambugs:
                ban_hammer(spammers)
                tag_hammer(spamcids, spamtag)
                bug_hammer(spambugs, args)
            else:
                logger.info('No new spam found')
        else:
            logger.info('No changes since %s', lastrun)
        save_cache(cachefile, c)
        if not args.sleep:
            sys.exit(0)
        # Advance the query window, so the next pass only looks at changes
        # made after this one started
        lastrun = c['lastrun']
        logger.info('Sleeping %d seconds', args.sleep)
        time.sleep(args.sleep)


def cmd():
    description = 'Junk spammy bugzilla comments and ban their authors'
    parser = argparse.ArgumentParser(description=description,
                                     prog='bz-comment-junker.py')
    parser.add_argument('-c', '--config', required=True,
                        help='Configuration file')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Output only errors')
    parser.add_argument('-d', '--debug', action='store_true', default=False,
                        help='Add debugging info')
    parser.add_argument('-l', '--lookback', default=None,
                        help='How far back to look (default: since last run, '
                             'or 24h if no cached data)')
    parser.add_argument('-n', '--noninteractive', action='store_true',
                        default=False,
                        help='Run non-interactively and send an alert when '
                             'potential spam is found')
    parser.add_argument('-a', '--check-attachments', action='store_true',
                        dest='checkatt', default=False,
                        help='Check attachments for junkiness')
    parser.add_argument('--sleep', type=int, default=0,
                        help='After the run, sleep N seconds and then run again')
    parser.add_argument('--status', default='RESOLVED',
                        help='Status value for junked bugs')
    parser.add_argument('--resolution', default='INVALID',
                        help='Resolution value for junked bugs')
    parser.add_argument('--product', default='Other',
                        help='Product value for junked bugs')
    parser.add_argument('--component', default='Spam',
                        help='Component value for junked bugs')

    args = parser.parse_args()
    main(args)


if __name__ == '__main__':
    cmd()
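
# Example invocations (paths and the config file name are hypothetical;
# flags are as defined in cmd() above):
#
#   ./bz-comment-junker.py -c bugjunker.conf
#   ./bz-comment-junker.py -c bugjunker.conf -a --sleep 300
#   ./bz-comment-junker.py -c bugjunker.conf -n -l 24h
#
# Note that -n relies on notify2 (D-Bus desktop notifications), so it only
# alerts usefully inside a desktop session.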