aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2023-01-10 17:32:21 -0500
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2023-01-10 17:32:21 -0500
commite28b60d62f08cefab20ef3c886473ea15070c2a9 (patch)
treeae56ba48a76c61409824ed5de97f820d41bd301f
parentc88e6a31442bc41e9b56df763ba8b30e64d18c93 (diff)
downloadb4-e28b60d62f08cefab20ef3c886473ea15070c2a9.tar.gz
Tweak wrap_header to allow decode/encode/clean operations
Sometimes we want to decode headers into 8bit-clean, sometimes we want to encode them, and sometimes we want to leave everything as-is. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r--b4/__init__.py81
-rw-r--r--tests/test___init__.py54
2 files changed, 69 insertions, 66 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index e52b025..10be82b 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -33,7 +33,7 @@ import requests
from pathlib import Path
from contextlib import contextmanager
-from typing import Optional, Tuple, Set, List, BinaryIO, Union, Sequence
+from typing import Optional, Tuple, Set, List, BinaryIO, Union, Sequence, Literal
from email import charset
charset.add_charset('utf-8', None)
@@ -850,8 +850,8 @@ class LoreSeries:
def save_cover(self, outfile):
# noinspection PyUnresolvedReferences
cover_msg = self.patches[0].get_am_message(add_trailers=False)
- with open(outfile, 'w') as fh:
- fh.write(cover_msg.as_string(policy=emlpolicy))
+ with open(outfile, 'wb') as fh:
+ fh.write(LoreMessage.get_msg_as_bytes(cover_msg, headers='decode'))
logger.critical('Cover: %s', outfile)
@@ -1430,15 +1430,20 @@ class LoreMessage:
return new_hdrval.strip()
@staticmethod
- def wrap_header(hdr, width: int = 75, nl: str = '\n', qpencode: bool = True) -> bytes:
+ def wrap_header(hdr, width: int = 75, nl: str = '\n',
+ transform: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes:
hname, hval = hdr
if hname.lower() in ('to', 'cc', 'from', 'x-original-from'):
_parts = [f'{hname}: ',]
first = True
for addr in email.utils.getaddresses([hval]):
- if not addr[0].isascii() and qpencode:
+ if transform == 'encode' and not addr[0].isascii():
addr = (email.quoprimime.header_encode(addr[0].encode(), charset='utf-8'), addr[1])
- qp = format_addrs([addr], clean=False)
+ qp = format_addrs([addr], clean=False)
+ elif transform == 'decode':
+ qp = format_addrs([addr], clean=True)
+ else:
+ qp = format_addrs([addr], clean=False)
# See if there is enough room on the existing line
if first:
_parts[-1] += qp
@@ -1450,14 +1455,19 @@ class LoreMessage:
continue
_parts[-1] += ', ' + qp
else:
- if not qpencode or hval.isascii():
+ if transform == 'decode' and hval.find('?=') >= 0:
+ hdata = f'{hname}: ' + LoreMessage.clean_header(hval)
+ else:
hdata = f'{hname}: {hval}'
- # Use simple textwrap
+ if transform != 'encode' or hval.isascii():
if len(hdata) <= width:
return hdata.encode()
+ # Use simple textwrap, with a small trick that ensures that long non-breakable
+ # strings don't show up on the next line from the bare header
+ hdata = hdata.replace(': ', ':_', 1)
wrapped = textwrap.wrap(hdata, break_long_words=False, break_on_hyphens=False,
subsequent_indent=' ', width=width)
- return nl.join(wrapped).encode()
+ return nl.join(wrapped).replace(':_', ': ', 1).encode()
qp = f'{hname}: ' + email.quoprimime.header_encode(hval.encode(), charset='utf-8')
# is it longer than width?
@@ -1479,10 +1489,11 @@ class LoreMessage:
return f'{nl} '.join(_parts).encode()
@staticmethod
- def get_msg_as_bytes(msg: email.message.Message, nl: str ='\n') -> bytes:
+ def get_msg_as_bytes(msg: email.message.Message, nl: str ='\n',
+ headers: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes:
bdata = b''
- for hdr in msg._headers: # noqa
- bdata += LoreMessage.wrap_header(hdr, nl=nl) + nl.encode()
+ for hname, hval in msg.items():
+ bdata += LoreMessage.wrap_header((hname, str(hval)), nl=nl, transform=headers) + nl.encode()
bdata += nl.encode()
payload = msg.get_payload(decode=True)
for bline in payload.split(b'\n'):
@@ -2956,10 +2967,10 @@ def format_addrs(pairs, clean=True):
if clean:
# Remove any quoted-printable header junk from the name
pair = (LoreMessage.clean_header(pair[0]), pair[1])
- # Work around https://github.com/python/cpython/issues/100900
- if re.search(r'[^\w\s]', pair[0]):
- addrs.append(f'"{pair[0]}" <{pair[1]}>')
- continue
+ # Work around https://github.com/python/cpython/issues/100900
+ if not pair[0].startswith('=?') and not pair[0].startswith('"') and re.search(r'[^\w\s]', pair[0]):
+ addrs.append(f'"{pair[0]}" <{pair[1]}>')
+ continue
addrs.append(email.utils.formataddr(pair))
return ', '.join(addrs)
@@ -3056,11 +3067,9 @@ def save_git_am_mbox(msgs: list, dest: BinaryIO):
# unless invoked with --patch-format=mboxrd (this is wrong, because ">From " escapes are also
# required in the original mbox "mboxo" format).
# So, save in the format that git-am expects
- gen = email.generator.BytesGenerator(dest, policy=emlpolicy)
for msg in msgs:
- msg.set_unixfrom('From git@z Thu Jan 1 00:00:00 1970')
- gen.flatten(msg, unixfrom=True)
- gen.write('\n')
+ dest.write(b'From git@z Thu Jan 1 00:00:00 1970\n')
+ dest.write(LoreMessage.get_msg_as_bytes(msg, headers='decode'))
def save_maildir(msgs: list, dest):
@@ -3075,7 +3084,7 @@ def save_maildir(msgs: list, dest):
lsubj = LoreSubject(msg.get('subject', ''))
slug = '%04d_%s' % (lsubj.counter, re.sub(r'\W+', '_', lsubj.subject).strip('_').lower())
with open(os.path.join(d_tmp, f'{slug}.eml'), 'wb') as mfh:
- mfh.write(msg.as_bytes(policy=emlpolicy))
+ mfh.write(LoreMessage.get_msg_as_bytes(msg, headers='decode'))
os.rename(os.path.join(d_tmp, f'{slug}.eml'), os.path.join(d_new, f'{slug}.eml'))
@@ -3295,8 +3304,8 @@ def patchwork_set_state(msgids: List[str], state: str) -> bool:
def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, None], msgs: Sequence[email.message.Message],
fromaddr: Optional[str], destaddrs: Optional[Union[set, list]] = None,
patatt_sign: bool = False, dryrun: bool = False,
- maxheaderlen: Optional[int] = None, output_dir: Optional[str] = None,
- web_endpoint: Optional[str] = None, reflect: bool = False) -> Optional[int]:
+ output_dir: Optional[str] = None, web_endpoint: Optional[str] = None,
+ reflect: bool = False) -> Optional[int]:
tosend = list()
if output_dir is not None:
@@ -3306,29 +3315,13 @@ def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, None], msgs: Sequence[
if not msg.get('X-Mailer'):
msg.add_header('X-Mailer', f'b4 {__VERSION__}')
msg.set_charset('utf-8')
- if maxheaderlen is None:
- if dryrun:
- # Make it fit the terminal window, but no wider than 120 minus visual padding
- ts = shutil.get_terminal_size((120, 20))
- maxheaderlen = ts.columns - 8
- if maxheaderlen > 112:
- maxheaderlen = 112
- else:
- # Use a sane-ish default (we don't need to stick to 80, but
- # we need to make sure it's shorter than 255)
- maxheaderlen = 120
-
- if dryrun and not output_dir:
- # Use 8bit-clean policy if we're dumping things to screen
- emldata = msg.as_string(policy=emlpolicy, maxheaderlen=maxheaderlen)
- bdata = emldata.encode()
+
+ if dryrun or web_endpoint:
+ nl = '\n'
else:
- if dryrun or web_endpoint:
- nl = '\n'
- else:
- nl = '\r\n'
+ nl = '\r\n'
- bdata = LoreMessage.get_msg_as_bytes(msg, nl=nl)
+ bdata = LoreMessage.get_msg_as_bytes(msg, nl=nl, headers='encode')
subject = msg.get('Subject', '')
ls = LoreSubject(subject)
diff --git a/tests/test___init__.py b/tests/test___init__.py
index 4add46c..5b50d1d 100644
--- a/tests/test___init__.py
+++ b/tests/test___init__.py
@@ -22,8 +22,9 @@ def test_check_gpg_status(source, expected):
@pytest.mark.parametrize('source,regex,flags,ismbox', [
(None, r'^From git@z ', 0, False),
(None, r'\n\nFrom git@z ', 0, False),
- ('save-8bit-clean', r'Unicôdé', 0, True),
- ('save-7bit-clean', r'=\?utf-8\?q\?S=C3=BBbject\?=', 0, True),
+ ('save-7bit-clean', r'From: Unicôdé', 0, True),
+ # mailbox.mbox does not properly handle 8bit-clean headers
+ ('save-8bit-clean', r'From: Unicôdé', 0, False),
])
def test_save_git_am_mbox(sampledir, tmp_path, source, regex, flags, ismbox):
import re
@@ -113,56 +114,65 @@ def test_followup_trailers(sampledir, source, serargs, amargs, reference, b4cfg)
assert ifh.getvalue().decode() == fh.read()
-@pytest.mark.parametrize('hval,verify,qp', [
- ('short-ascii', 'short-ascii', True),
- ('short-unicôde', '=?utf-8?q?short-unic=C3=B4de?=', True),
+@pytest.mark.parametrize('hval,verify,tr', [
+ ('short-ascii', 'short-ascii', 'encode'),
+ ('short-unicôde', '=?utf-8?q?short-unic=C3=B4de?=', 'encode'),
# Long ascii
(('Lorem ipsum dolor sit amet consectetur adipiscing elit '
'sed do eiusmod tempor incididunt ut labore et dolore magna aliqua'),
('Lorem ipsum dolor sit amet consectetur adipiscing elit sed do\n'
- ' eiusmod tempor incididunt ut labore et dolore magna aliqua'), True),
+ ' eiusmod tempor incididunt ut labore et dolore magna aliqua'), 'encode'),
# Long unicode
(('Lorem îpsum dolor sit amet consectetur adipiscing elît '
'sed do eiusmod tempôr incididunt ut labore et dolôre magna aliqua'),
('=?utf-8?q?Lorem_=C3=AEpsum_dolor_sit_amet_consectetur_adipiscin?=\n'
' =?utf-8?q?g_el=C3=AEt_sed_do_eiusmod_temp=C3=B4r_incididunt_ut_labore_et?=\n'
- ' =?utf-8?q?_dol=C3=B4re_magna_aliqua?='), True),
+ ' =?utf-8?q?_dol=C3=B4re_magna_aliqua?='), 'encode'),
# Exactly 75 long
('Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu',
- 'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu', True),
+ 'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu', 'encode'),
# Unicode that breaks on escape boundary
('Lorem ipsum dolor sit amet consectetur adipiscin elît',
- '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipiscin_el?=\n =?utf-8?q?=C3=AEt?=', True),
+ '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipiscin_el?=\n =?utf-8?q?=C3=AEt?=', 'encode'),
# Unicode that's just 1 too long
('Lorem ipsum dolor sit amet consectetur adipi elît',
- '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipi_el=C3=AE?=\n =?utf-8?q?t?=', True),
+ '=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipi_el=C3=AE?=\n =?utf-8?q?t?=', 'encode'),
# A single address
- ('foo@example.com', 'foo@example.com', True),
+ ('foo@example.com', 'foo@example.com', 'encode'),
# Two addresses
- ('foo@example.com, bar@example.com', 'foo@example.com, bar@example.com', True),
+ ('foo@example.com, bar@example.com', 'foo@example.com, bar@example.com', 'encode'),
# Mixed addresses
- ('foo@example.com, Foo Bar <bar@example.com>', 'foo@example.com, Foo Bar <bar@example.com>', True),
+ ('foo@example.com, Foo Bar <bar@example.com>', 'foo@example.com, Foo Bar <bar@example.com>', 'encode'),
# Mixed Unicode
('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>',
- 'foo@example.com, Foo Bar <bar@example.com>, \n =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>', True),
+ 'foo@example.com, Foo Bar <bar@example.com>, \n =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>', 'encode'),
('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quux, Foo" <quux@example.com>',
('foo@example.com, Foo Bar <bar@example.com>, \n'
- ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, "Quux, Foo" <quux@example.com>'), True),
+ ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, "Quux, Foo" <quux@example.com>'), 'encode'),
('01234567890123456789012345678901234567890123456789012345678901@example.org, ä <foo@example.org>',
('01234567890123456789012345678901234567890123456789012345678901@example.org, \n'
- ' =?utf-8?q?=C3=A4?= <foo@example.org>'), True),
+ ' =?utf-8?q?=C3=A4?= <foo@example.org>'), 'encode'),
# Test for https://github.com/python/cpython/issues/100900
('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quûx, Foo" <quux@example.com>',
('foo@example.com, Foo Bar <bar@example.com>, \n'
- ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, \n =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>'), True),
+ ' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, \n =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>'), 'encode'),
+ # Test preserve
+ ('foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quûx, Foo" <quux@example.com>',
+ 'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, \n "Quûx, Foo" <quux@example.com>',
+ 'preserve'),
+ # Test decode
+ ('foo@example.com, Foo Bar <bar@example.com>, =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>',
+ 'foo@example.com, Foo Bar <bar@example.com>, \n "Quûx, Foo" <quux@example.com>',
+ 'decode'),
])
-def test_header_wrapping(sampledir, hval, verify, qp):
+def test_header_wrapping(sampledir, hval, verify, tr):
hname = 'To' if '@' in hval else "X-Header"
- wrapped = b4.LoreMessage.wrap_header((hname, hval))
- assert wrapped == f'{hname}: {verify}'.encode()
+ wrapped = b4.LoreMessage.wrap_header((hname, hval), transform=tr)
+ assert wrapped.decode() == f'{hname}: {verify}'
wname, wval = wrapped.split(b':', maxsplit=1)
- cval = b4.LoreMessage.clean_header(wval.decode())
- assert cval == hval
+ if tr != 'decode':
+ cval = b4.LoreMessage.clean_header(wval.decode())
+ assert cval == hval
@pytest.mark.parametrize('pairs,verify,clean', [