diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2023-01-10 17:32:21 -0500 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2023-01-10 17:32:21 -0500 |
commit | e28b60d62f08cefab20ef3c886473ea15070c2a9 (patch) | |
tree | ae56ba48a76c61409824ed5de97f820d41bd301f | |
parent | c88e6a31442bc41e9b56df763ba8b30e64d18c93 (diff) | |
download | b4-e28b60d62f08cefab20ef3c886473ea15070c2a9.tar.gz |
Tweak wrap_header to allow decode/encode/clean operations
Sometimes we want to decode headers into 8bit-clean, sometimes we want
to encode them, and sometimes we want to leave everything as-is.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r-- | b4/__init__.py | 81 | ||||
-rw-r--r-- | tests/test___init__.py | 54 |
2 files changed, 69 insertions, 66 deletions
diff --git a/b4/__init__.py b/b4/__init__.py index e52b025..10be82b 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -33,7 +33,7 @@ import requests from pathlib import Path from contextlib import contextmanager -from typing import Optional, Tuple, Set, List, BinaryIO, Union, Sequence +from typing import Optional, Tuple, Set, List, BinaryIO, Union, Sequence, Literal from email import charset charset.add_charset('utf-8', None) @@ -850,8 +850,8 @@ class LoreSeries: def save_cover(self, outfile): # noinspection PyUnresolvedReferences cover_msg = self.patches[0].get_am_message(add_trailers=False) - with open(outfile, 'w') as fh: - fh.write(cover_msg.as_string(policy=emlpolicy)) + with open(outfile, 'wb') as fh: + fh.write(LoreMessage.get_msg_as_bytes(cover_msg, headers='decode')) logger.critical('Cover: %s', outfile) @@ -1430,15 +1430,20 @@ class LoreMessage: return new_hdrval.strip() @staticmethod - def wrap_header(hdr, width: int = 75, nl: str = '\n', qpencode: bool = True) -> bytes: + def wrap_header(hdr, width: int = 75, nl: str = '\n', + transform: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes: hname, hval = hdr if hname.lower() in ('to', 'cc', 'from', 'x-original-from'): _parts = [f'{hname}: ',] first = True for addr in email.utils.getaddresses([hval]): - if not addr[0].isascii() and qpencode: + if transform == 'encode' and not addr[0].isascii(): addr = (email.quoprimime.header_encode(addr[0].encode(), charset='utf-8'), addr[1]) - qp = format_addrs([addr], clean=False) + qp = format_addrs([addr], clean=False) + elif transform == 'decode': + qp = format_addrs([addr], clean=True) + else: + qp = format_addrs([addr], clean=False) # See if there is enough room on the existing line if first: _parts[-1] += qp @@ -1450,14 +1455,19 @@ class LoreMessage: continue _parts[-1] += ', ' + qp else: - if not qpencode or hval.isascii(): + if transform == 'decode' and hval.find('?=') >= 0: + hdata = f'{hname}: ' + LoreMessage.clean_header(hval) + else: hdata = f'{hname}: {hval}' - # Use simple textwrap + if transform != 'encode' or hval.isascii(): if len(hdata) <= width: return hdata.encode() + # Use simple textwrap, with a small trick that ensures that long non-breakable + # strings don't show up on the next line from the bare header + hdata = hdata.replace(': ', ':_', 1) wrapped = textwrap.wrap(hdata, break_long_words=False, break_on_hyphens=False, subsequent_indent=' ', width=width) - return nl.join(wrapped).encode() + return nl.join(wrapped).replace(':_', ': ', 1).encode() qp = f'{hname}: ' + email.quoprimime.header_encode(hval.encode(), charset='utf-8') # is it longer than width? @@ -1479,10 +1489,11 @@ class LoreMessage: return f'{nl} '.join(_parts).encode() @staticmethod - def get_msg_as_bytes(msg: email.message.Message, nl: str ='\n') -> bytes: + def get_msg_as_bytes(msg: email.message.Message, nl: str ='\n', + headers: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes: bdata = b'' - for hdr in msg._headers: # noqa - bdata += LoreMessage.wrap_header(hdr, nl=nl) + nl.encode() + for hname, hval in msg.items(): + bdata += LoreMessage.wrap_header((hname, str(hval)), nl=nl, transform=headers) + nl.encode() bdata += nl.encode() payload = msg.get_payload(decode=True) for bline in payload.split(b'\n'): @@ -2956,10 +2967,10 @@ def format_addrs(pairs, clean=True): if clean: # Remove any quoted-printable header junk from the name pair = (LoreMessage.clean_header(pair[0]), pair[1]) - # Work around https://github.com/python/cpython/issues/100900 - if re.search(r'[^\w\s]', pair[0]): - addrs.append(f'"{pair[0]}" <{pair[1]}>') - continue + # Work around https://github.com/python/cpython/issues/100900 + if not pair[0].startswith('=?') and not pair[0].startswith('"') and re.search(r'[^\w\s]', pair[0]): + addrs.append(f'"{pair[0]}" <{pair[1]}>') + continue addrs.append(email.utils.formataddr(pair)) return ', '.join(addrs) @@ -3056,11 +3067,9 @@ def save_git_am_mbox(msgs: list, dest: BinaryIO): # unless invoked with --patch-format=mboxrd (this is wrong, because ">From " escapes are also # required in the original mbox "mboxo" format). # So, save in the format that git-am expects - gen = email.generator.BytesGenerator(dest, policy=emlpolicy) for msg in msgs: - msg.set_unixfrom('From git@z Thu Jan 1 00:00:00 1970') - gen.flatten(msg, unixfrom=True) - gen.write('\n') + dest.write(b'From git@z Thu Jan 1 00:00:00 1970\n') + dest.write(LoreMessage.get_msg_as_bytes(msg, headers='decode')) def save_maildir(msgs: list, dest): @@ -3075,7 +3084,7 @@ def save_maildir(msgs: list, dest): lsubj = LoreSubject(msg.get('subject', '')) slug = '%04d_%s' % (lsubj.counter, re.sub(r'\W+', '_', lsubj.subject).strip('_').lower()) with open(os.path.join(d_tmp, f'{slug}.eml'), 'wb') as mfh: - mfh.write(msg.as_bytes(policy=emlpolicy)) + mfh.write(LoreMessage.get_msg_as_bytes(msg, headers='decode')) os.rename(os.path.join(d_tmp, f'{slug}.eml'), os.path.join(d_new, f'{slug}.eml')) @@ -3295,8 +3304,8 @@ def patchwork_set_state(msgids: List[str], state: str) -> bool: def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, None], msgs: Sequence[email.message.Message], fromaddr: Optional[str], destaddrs: Optional[Union[set, list]] = None, patatt_sign: bool = False, dryrun: bool = False, - maxheaderlen: Optional[int] = None, output_dir: Optional[str] = None, - web_endpoint: Optional[str] = None, reflect: bool = False) -> Optional[int]: + output_dir: Optional[str] = None, web_endpoint: Optional[str] = None, + reflect: bool = False) -> Optional[int]: tosend = list() if output_dir is not None: @@ -3306,29 +3315,13 @@ def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, None], msgs: Sequence[ if not msg.get('X-Mailer'): msg.add_header('X-Mailer', f'b4 {__VERSION__}') msg.set_charset('utf-8') - if maxheaderlen is None: - if dryrun: - # Make it fit the terminal window, but no wider than 120 minus visual padding - ts = shutil.get_terminal_size((120, 20)) - maxheaderlen = ts.columns - 8 - if maxheaderlen > 112: - maxheaderlen = 112 - else: - # Use a sane-ish default (we don't need to stick to 80, but - # we need to make sure it's shorter than 255) - maxheaderlen = 120 - - if dryrun and not output_dir: - # Use 8bit-clean policy if we're dumping things to screen - emldata = msg.as_string(policy=emlpolicy, maxheaderlen=maxheaderlen) - bdata = emldata.encode() + + if dryrun or web_endpoint: + nl = '\n' else: - if dryrun or web_endpoint: - nl = '\n' - else: - nl = '\r\n' + nl = '\r\n' - bdata = LoreMessage.get_msg_as_bytes(msg, nl=nl) + bdata = LoreMessage.get_msg_as_bytes(msg, nl=nl, headers='encode') subject = msg.get('Subject', '') ls = LoreSubject(subject) diff --git a/tests/test___init__.py b/tests/test___init__.py index 4add46c..5b50d1d 100644 --- a/tests/test___init__.py +++ b/tests/test___init__.py @@ -22,8 +22,9 @@ def test_check_gpg_status(source, expected): @pytest.mark.parametrize('source,regex,flags,ismbox', [ (None, r'^From git@z ', 0, False), (None, r'\n\nFrom git@z ', 0, False), - ('save-8bit-clean', r'Unicôdé', 0, True), - ('save-7bit-clean', r'=\?utf-8\?q\?S=C3=BBbject\?=', 0, True), + ('save-7bit-clean', r'From: Unicôdé', 0, True), + # mailbox.mbox does not properly handle 8bit-clean headers + ('save-8bit-clean', r'From: Unicôdé', 0, False), ]) def test_save_git_am_mbox(sampledir, tmp_path, source, regex, flags, ismbox): import re @@ -113,56 +114,65 @@ def test_followup_trailers(sampledir, source, serargs, amargs, reference, b4cfg) assert ifh.getvalue().decode() == fh.read() -@pytest.mark.parametrize('hval,verify,qp', [ - ('short-ascii', 'short-ascii', True), - ('short-unicôde', '=?utf-8?q?short-unic=C3=B4de?=', True), +@pytest.mark.parametrize('hval,verify,tr', [ + ('short-ascii', 'short-ascii', 'encode'), + ('short-unicôde', '=?utf-8?q?short-unic=C3=B4de?=', 'encode'), # Long ascii (('Lorem ipsum dolor sit amet consectetur adipiscing elit ' 'sed do eiusmod tempor incididunt ut labore et dolore magna aliqua'), ('Lorem ipsum dolor sit amet consectetur adipiscing elit sed do\n' - ' eiusmod tempor incididunt ut labore et dolore magna aliqua'), True), + ' eiusmod tempor incididunt ut labore et dolore magna aliqua'), 'encode'), # Long unicode (('Lorem îpsum dolor sit amet consectetur adipiscing elît ' 'sed do eiusmod tempôr incididunt ut labore et dolôre magna aliqua'), ('=?utf-8?q?Lorem_=C3=AEpsum_dolor_sit_amet_consectetur_adipiscin?=\n' ' =?utf-8?q?g_el=C3=AEt_sed_do_eiusmod_temp=C3=B4r_incididunt_ut_labore_et?=\n' - ' =?utf-8?q?_dol=C3=B4re_magna_aliqua?='), True), + ' =?utf-8?q?_dol=C3=B4re_magna_aliqua?='), 'encode'), # Exactly 75 long ('Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu', - 'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu', True), + 'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu', 'encode'), # Unicode that breaks on escape boundary ('Lorem ipsum dolor sit amet consectetur adipiscin elît', - '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipiscin_el?=\n =?utf-8?q?=C3=AEt?=', True), + '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipiscin_el?=\n =?utf-8?q?=C3=AEt?=', 'encode'), # Unicode that's just 1 too long ('Lorem ipsum dolor sit amet consectetur adipi elît', - '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipi_el=C3=AE?=\n =?utf-8?q?t?=', True), + '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipi_el=C3=AE?=\n =?utf-8?q?t?=', 'encode'), # A single address - ('foo@example.com', 'foo@example.com', True), + ('foo@example.com', 'foo@example.com', 'encode'), # Two addresses - ('foo@example.com, bar@example.com', 'foo@example.com, bar@example.com', True), + ('foo@example.com, bar@example.com', 'foo@example.com, bar@example.com', 'encode'), # Mixed addresses - ('foo@example.com, Foo Bar <bar@example.com>', 'foo@example.com, Foo Bar <bar@example.com>', True), + ('foo@example.com, Foo Bar <bar@example.com>', 'foo@example.com, Foo Bar <bar@example.com>', 'encode'), # Mixed Unicode ('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>', - 'foo@example.com, Foo Bar <bar@example.com>, \n =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>', True), + 'foo@example.com, Foo Bar <bar@example.com>, \n =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>', 'encode'), ('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quux, Foo" <quux@example.com>', ('foo@example.com, Foo Bar <bar@example.com>, \n' - ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, "Quux, Foo" <quux@example.com>'), True), + ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, "Quux, Foo" <quux@example.com>'), 'encode'), ('01234567890123456789012345678901234567890123456789012345678901@example.org, ä <foo@example.org>', ('01234567890123456789012345678901234567890123456789012345678901@example.org, \n' - ' =?utf-8?q?=C3=A4?= <foo@example.org>'), True), + ' =?utf-8?q?=C3=A4?= <foo@example.org>'), 'encode'), # Test for https://github.com/python/cpython/issues/100900 ('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quûx, Foo" <quux@example.com>', ('foo@example.com, Foo Bar <bar@example.com>, \n' - ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, \n =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>'), True), + ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, \n =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>'), 'encode'), + # Test preserve + ('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quûx, Foo" <quux@example.com>', + 'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, \n "Quûx, Foo" <quux@example.com>', + 'preserve'), + # Test decode + ('foo@example.com, Foo Bar <bar@example.com>, =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>', + 'foo@example.com, Foo Bar <bar@example.com>, \n "Quûx, Foo" <quux@example.com>', + 'decode'), ]) -def test_header_wrapping(sampledir, hval, verify, qp): +def test_header_wrapping(sampledir, hval, verify, tr): hname = 'To' if '@' in hval else "X-Header" - wrapped = b4.LoreMessage.wrap_header((hname, hval)) - assert wrapped == f'{hname}: {verify}'.encode() + wrapped = b4.LoreMessage.wrap_header((hname, hval), transform=tr) + assert wrapped.decode() == f'{hname}: {verify}' wname, wval = wrapped.split(b':', maxsplit=1) - cval = b4.LoreMessage.clean_header(wval.decode()) - assert cval == hval + if tr != 'decode': + cval = b4.LoreMessage.clean_header(wval.decode()) + assert cval == hval @pytest.mark.parametrize('pairs,verify,clean', [ |