1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023 by the Linux Foundation
import argparse
import peebz
import datetime
import re
import b4
import email.message
from fnmatch import fnmatch
logger = peebz.logger
def process_new_comments(bid: int, privacy_mode: bool = False, dry_run: bool = False):
config = peebz.get_config()
cdatas = peebz.bz_get_newest_comments_for_bid(bid)
for cdata in cdatas:
# Check if we've already notified about this bug
cid = cdata['id']
try:
peebz.db_get_msgid_by_bid_cid(bid, cid)
logger.debug('Skipping, msgid match for bid=%s, cid=%s', bid, cid)
continue
except LookupError:
pass
# Check if the creator is in never_if_creator
skip = False
for mask in config['notify'].get('never_if_creator', list()):
if fnmatch(cdata['creator'], mask):
logger.debug('Skipping cid=%s because it matched never_if_creator=%s', cid, mask)
skip = True
break
if skip:
continue
# Check if text is in never_if_text_matches
for mask in config['notify'].get('never_if_text_matches', list()):
if fnmatch(cdata['text'], mask):
logger.debug('Skipping cid=%s because it matched never_if_text_matches=%s', cid, mask)
skip = True
break
if skip:
continue
clines = cdata['text'].strip().splitlines()
inre_cid = None
comment_author = cdata['creator']
if privacy_mode:
comment_author = comment_author.split('@')[0]
logger.debug('Privacy mode, comment_author=%s', comment_author)
bodyvals = {
'bzname': config['bugzilla'].get('name'),
'bug_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid),
'comment_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid) + f"#c{cdata['count']}",
'comment_author': comment_author,
}
if cdata['attachment_id']:
logger.info('Processing new attachment for bug_id=%s, comment_id=%s', bid, cid)
adata = peebz.bz_get_attachment_by_aid(
cdata['attachment_id'],
include_fields='file_name,size,content_type,summary,is_patch,is_private',
)
if adata['is_private']:
logger.debug('Skipping attachment marked private')
continue
bodytpt = peebz.get_template_by_bid('new_attachment_notify', bid)
bodyvals.update(adata)
bodyvals['attachment_url'] = config['bugzilla'].get('attachmask', '').format(
attachment_id=cdata['attachment_id'])
else:
logger.info('Processing new comment for bug_id=%s, comment_id=%s', bid, cid)
bodytpt = peebz.get_template_by_bid('new_comment_notify', bid)
fline = clines[0]
matches = re.search(r'\(In reply to.*from comment #(\d+)', fline, flags=re.I)
if matches:
inre_count = int(matches.groups()[0])
try:
inre_cid = peebz.bz_get_cid_by_bid_count(bid, inre_count)
except LookupError:
pass
bodyvals['comment_text'] = '\n'.join(clines)
msg = email.message.EmailMessage()
if not privacy_mode:
msg['Reply-To'] = b4.format_addrs([('', cdata['creator'])])
body = bodytpt.safe_substitute(bodyvals)
body = peebz.add_bot_signature(body)
msg.set_payload(body, charset='utf-8')
msgid = peebz.notify_bug(bid, cid, msg, inre_cid=inre_cid, dry_run=dry_run)
if msgid and not dry_run:
peebz.db_store_msgid_bid_cid(msgid, bid, cid)
if not privacy_mode:
peebz.db_store_recipients(bid, {cdata['creator']})
# TODO: This assumes that comments are always in incremental order
lastcheck = cdata['creation_time'].replace('T', ' ').rstrip('Z')
peebz.db_store_notify_last_check(bid, lastcheck)
def main(cmdargs: argparse.Namespace) -> None:
now = datetime.datetime.utcnow()
lastrun = now.strftime('%Y-%m-%d %H:%M:%S')
try:
# Get all new bugs that changed since last run
since = peebz.db_get_meta_value('notify_last_run')
except LookupError:
logger.debug('Got a LookupError, getting everything for the past hour')
# Assume it's the first run and get changes for the past hour
hourago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
since = hourago.strftime('%Y-%m-%d %H:%M:%S')
# first, process all changed bugs that we're tracking
logger.info('Getting a list of changed bugs since %s', since)
buglist = peebz.bz_get_changed_bugs(since)
seen = set()
config = peebz.get_config()
if buglist:
for bdata in buglist:
logger.debug('Looking at %s: %s', bdata['id'], bdata['summary'])
bid = bdata['id']
privacy_mode = peebz.get_privacy_mode(bdata['product'], bdata['component'])
process_new_comments(bid, privacy_mode=privacy_mode, dry_run=cmdargs.dry_run)
seen.add(bid)
else:
logger.info('No changes to any tracked bugs')
# Now go by product/component and handle new bug queries if defined
for bz_product, bz_components in config['components'].items():
for bz_component in bz_components.keys():
cconf = peebz.get_component_config(bz_product, bz_component)
qs = cconf.get('bz_new_bugs_quicksearch')
if not qs:
logger.debug('No quicksearch defined for %s/%s', bz_product, bz_component)
continue
logger.info('Querying matching quicksearch results since %s for %s/%s, qs=%s', since, bz_product,
bz_component, qs)
params = {
'chfieldfrom': since,
'product': bz_product,
'component': bz_component,
'quicksearch': qs,
}
buglist = peebz.bz_get_query_bugs(params, exclude=seen)
if config['bugzilla'].get('privacy_mode', False) or cconf.get('bz_privacy_mode', False):
privacy_mode = True
else:
privacy_mode = False
if buglist:
logger.info('Processing %s matching quicksearch bugs', len(buglist))
for bid in buglist:
seen.add(bid)
process_new_comments(bid, privacy_mode=privacy_mode, dry_run=cmdargs.dry_run)
else:
logger.info('No changed bugs matching these parameters.')
if not cmdargs.dry_run:
peebz.db_store_meta_value(key='notify_last_run', value=lastrun)
|