aboutsummaryrefslogtreecommitdiffstats
path: root/peebz/bz2pi.py
blob: 0559d534d8de725322b0b2994a456d14c45cbb76 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023 by the Linux Foundation

import argparse
import peebz
import datetime
import re
import b4

import email.message

from fnmatch import fnmatch

# Module-level alias for the logger object exposed by the peebz package, so
# everything in this file logs through the same logger.
logger = peebz.logger


def _first_matching_mask(value, masks):
    """Return the first fnmatch-style pattern in ``masks`` that matches ``value``, or None."""
    for mask in masks:
        if fnmatch(value, mask):
            return mask
    return None


def process_new_comments(bid: int, privacy_mode: bool = False, dry_run: bool = False):
    """Build and send a notification for each not-yet-notified comment on a bug.

    For every comment returned by ``peebz.bz_get_newest_comments_for_bid(bid)``:

    * skip it if a message-id is already recorded for this (bid, cid) pair;
    * skip it if the creator matches a ``never_if_creator`` mask or the text
      matches a ``never_if_text_matches`` mask from the ``notify`` config;
    * skip it if it refers to an attachment flagged ``is_private``;
    * otherwise render the matching template and hand the message to
      ``peebz.notify_bug``.

    :param bid: Bugzilla bug id whose comments are examined.
    :param privacy_mode: when True, only the part of the creator address before
        the ``@`` is put into the body, no ``Reply-To`` header is set and the
        creator is not stored as a recipient.
    :param dry_run: passed through to ``peebz.notify_bug``; when True nothing
        is written to the database by this function.
    """
    config = peebz.get_config()
    cdatas = peebz.bz_get_newest_comments_for_bid(bid)
    for cdata in cdatas:
        cid = cdata['id']
        # Check if we've already notified about this comment: a successful
        # lookup means a message-id is on record, LookupError means it is new.
        try:
            peebz.db_get_msgid_by_bid_cid(bid, cid)
        except LookupError:
            pass
        else:
            logger.debug('Skipping, msgid match for bid=%s, cid=%s', bid, cid)
            continue

        # Check if the creator is in never_if_creator
        mask = _first_matching_mask(cdata['creator'], config['notify'].get('never_if_creator', list()))
        if mask is not None:
            logger.debug('Skipping cid=%s because it matched never_if_creator=%s', cid, mask)
            continue
        # Check if text is in never_if_text_matches
        mask = _first_matching_mask(cdata['text'], config['notify'].get('never_if_text_matches', list()))
        if mask is not None:
            logger.debug('Skipping cid=%s because it matched never_if_text_matches=%s', cid, mask)
            continue

        clines = cdata['text'].strip().splitlines()
        comment_author = cdata['creator']
        if privacy_mode:
            # Only expose the part of the address before the "@"
            comment_author = comment_author.split('@')[0]
            logger.debug('Privacy mode, comment_author=%s', comment_author)

        bug_url = config['bugzilla'].get('bugmask', '').format(bug_id=bid)
        bodyvals = {
            'bzname': config['bugzilla'].get('name'),
            'bug_url': bug_url,
            'comment_url': bug_url + f"#c{cdata['count']}",
            'comment_author': comment_author,
        }
        if cdata['attachment_id']:
            logger.info('Processing new attachment for bug_id=%s, comment_id=%s', bid, cid)
            adata = peebz.bz_get_attachment_by_aid(
                cdata['attachment_id'],
                include_fields='file_name,size,content_type,summary,is_patch,is_private',
            )
            if adata['is_private']:
                logger.debug('Skipping attachment marked private')
                continue
            bodytpt = peebz.get_template_by_bid('new_attachment_notify', bid)
            # Expose the fetched attachment fields to the template
            bodyvals.update(adata)
            bodyvals['attachment_url'] = config['bugzilla'].get('attachmask', '').format(
                attachment_id=cdata['attachment_id'])
        else:
            logger.info('Processing new comment for bug_id=%s, comment_id=%s', bid, cid)
            bodytpt = peebz.get_template_by_bid('new_comment_notify', bid)

        # If the first line looks like "(In reply to ... from comment #N)",
        # try to resolve comment number N to a comment id for threading.
        # An empty or whitespace-only comment yields no lines at all, so guard
        # the lookup instead of raising IndexError on clines[0].
        inre_cid = None
        fline = clines[0] if clines else ''
        matches = re.search(r'\(In reply to.*from comment #(\d+)', fline, flags=re.I)
        if matches:
            inre_count = int(matches.groups()[0])
            try:
                inre_cid = peebz.bz_get_cid_by_bid_count(bid, inre_count)
            except LookupError:
                # Referenced comment could not be resolved; send without it
                pass
        bodyvals['comment_text'] = '\n'.join(clines)

        msg = email.message.EmailMessage()
        if not privacy_mode:
            msg['Reply-To'] = b4.format_addrs([('', cdata['creator'])])
        body = bodytpt.safe_substitute(bodyvals)
        body = peebz.add_bot_signature(body)
        msg.set_payload(body, charset='utf-8')
        msgid = peebz.notify_bug(bid, cid, msg, inre_cid=inre_cid, dry_run=dry_run)
        if msgid and not dry_run:
            peebz.db_store_msgid_bid_cid(msgid, bid, cid)
            if not privacy_mode:
                peebz.db_store_recipients(bid, {cdata['creator']})
            # TODO: This assumes that comments are always in incremental order
            lastcheck = cdata['creation_time'].replace('T', ' ').rstrip('Z')
            peebz.db_store_notify_last_check(bid, lastcheck)


def main(cmdargs: argparse.Namespace) -> None:
    """Notify about new comments on changed bugs and on quicksearch matches.

    The run has two passes, both limited to changes since the stored
    ``notify_last_run`` meta value (or the past hour when no value is stored):

    1. every bug returned by ``peebz.bz_get_changed_bugs``;
    2. for each configured product/component that defines
       ``bz_new_bugs_quicksearch``, every bug returned by
       ``peebz.bz_get_query_bugs`` that was not already handled.

    Unless ``cmdargs.dry_run`` is set, the start time of this run is stored as
    the new ``notify_last_run`` value at the end.

    :param cmdargs: parsed command-line arguments; only ``dry_run`` is read.
    """
    timefmt = '%Y-%m-%d %H:%M:%S'
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC datetime
    # formats to exactly the same string with this format.
    now = datetime.datetime.now(datetime.timezone.utc)
    lastrun = now.strftime(timefmt)
    try:
        # Get all new bugs that changed since last run
        since = peebz.db_get_meta_value('notify_last_run')
    except LookupError:
        logger.debug('Got a LookupError, getting everything for the past hour')
        # Assume it's the first run and get changes for the past hour
        since = (now - datetime.timedelta(hours=1)).strftime(timefmt)

    # first, process all changed bugs that we're tracking
    logger.info('Getting a list of changed bugs since %s', since)
    buglist = peebz.bz_get_changed_bugs(since)
    # Bug ids already handled in this run, so the quicksearch pass below can
    # exclude them.
    seen = set()
    config = peebz.get_config()
    if buglist:
        for bdata in buglist:
            logger.debug('Looking at %s: %s', bdata['id'], bdata['summary'])
            bid = bdata['id']
            privacy_mode = peebz.get_privacy_mode(bdata['product'], bdata['component'])
            process_new_comments(bid, privacy_mode=privacy_mode, dry_run=cmdargs.dry_run)
            seen.add(bid)
    else:
        logger.info('No changes to any tracked bugs')

    # Now go by product/component and handle new bug queries if defined
    for bz_product, bz_components in config['components'].items():
        for bz_component in bz_components:
            cconf = peebz.get_component_config(bz_product, bz_component)
            qs = cconf.get('bz_new_bugs_quicksearch')
            if not qs:
                logger.debug('No quicksearch defined for %s/%s', bz_product, bz_component)
                continue
            logger.info('Querying matching quicksearch results since %s for %s/%s, qs=%s', since, bz_product,
                        bz_component, qs)
            params = {
                'chfieldfrom': since,
                'product': bz_product,
                'component': bz_component,
                'quicksearch': qs,
            }
            buglist = peebz.bz_get_query_bugs(params, exclude=seen)
            # Privacy mode applies when enabled globally or for this component.
            # NOTE(review): the first pass uses peebz.get_privacy_mode() for
            # this decision; presumably equivalent -- confirm and unify.
            privacy_mode = bool(
                config['bugzilla'].get('privacy_mode', False) or cconf.get('bz_privacy_mode', False)
            )
            if buglist:
                logger.info('Processing %s matching quicksearch bugs', len(buglist))
                for bid in buglist:
                    seen.add(bid)
                    process_new_comments(bid, privacy_mode=privacy_mode, dry_run=cmdargs.dry_run)
            else:
                logger.info('No changed bugs matching these parameters.')

    if not cmdargs.dry_run:
        # Record when this run started, not when it finished, so changes made
        # while it was running are picked up by the next run.
        peebz.db_store_meta_value(key='notify_last_run', value=lastrun)