emailsec.dkim
1from .checker import check_dkim, DKIMCheck, DKIMResult 2from .parser import DKIMSignature 3from emailsec._utils import BodyAndHeaders, body_and_headers_for_canonicalization 4 5__all__ = [ 6 "check_dkim", 7 "DKIMCheck", 8 "DKIMResult", 9 "DKIMSignature", 10 "BodyAndHeaders", 11 "body_and_headers_for_canonicalization", 12]
44async def check_dkim( 45 message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None 46) -> DKIMCheck: 47 if body_and_headers: 48 body, headers = body_and_headers 49 else: 50 body, headers = body_and_headers_for_canonicalization(message) 51 52 signatures = [] 53 for header_name, raw_signature in headers.get("dkim-signature", []): 54 try: 55 sig = parse_dkim_header_field(raw_signature.decode()) 56 except ValueError: 57 continue 58 59 signatures.append(((header_name, raw_signature), sig)) 60 61 if not signatures: 62 return DKIMCheck(result=DKIMResult.PERMFAIL) 63 64 # Try to pick an aligned signature if multiple signatures are present 65 def _sort_sig(item: tuple[emailsec._utils.Header, DKIMSignature]) -> bool: 66 _, s = item 67 _, from_addr = email.utils.parseaddr(headers["from"][0][1].decode().strip()) 68 rfc5322_from = from_addr.partition("@")[-1] 69 return is_dkim_aligned(s["d"], rfc5322_from) 70 71 # Verify the top 5 signatures and stop once one verifies successfully 72 for sig_header, parsed_sig in sorted(signatures, key=_sort_sig, reverse=True)[:5]: 73 try: 74 if await _verify_dkim_signature( 75 body, headers, sig_header, typing.cast(_DKIMStyleSig, parsed_sig) 76 ): 77 return DKIMCheck( 78 result=DKIMResult.SUCCESS, 79 domain=parsed_sig["d"], 80 selector=parsed_sig["s"], 81 signature=parsed_sig, 82 ) 83 except errors.Temperror: 84 return DKIMCheck( 85 result=DKIMResult.TEMPFAIL, 86 domain=parsed_sig["d"], 87 selector=parsed_sig["s"], 88 signature=parsed_sig, 89 ) 90 except errors.Permerror: 91 continue 92 93 return DKIMCheck(result=DKIMResult.PERMFAIL)
@dataclass
class
DKIMCheck:
36@dataclass 37class DKIMCheck: 38 result: DKIMResult 39 domain: str | None = None 40 selector: str | None = None 41 signature: DKIMSignature | None = None
DKIMCheck( result: DKIMResult, domain: str | None = None, selector: str | None = None, signature: DKIMSignature | None = None)
result: DKIMResult
class
DKIMResult(enum.StrEnum):
30class DKIMResult(StrEnum): 31 SUCCESS = "SUCCESS" 32 PERMFAIL = "PERMFAIL" 33 TEMPFAIL = "TEMPFAIL"
SUCCESS =
<DKIMResult.SUCCESS: 'SUCCESS'>
PERMFAIL =
<DKIMResult.PERMFAIL: 'PERMFAIL'>
TEMPFAIL =
<DKIMResult.TEMPFAIL: 'TEMPFAIL'>
class
DKIMSignature(typing.TypedDict):
65class DKIMSignature(typing.TypedDict): 66 v: str 67 a: str 68 b: str 69 bh: str 70 c: typing.NotRequired[str] 71 d: str 72 h: str 73 i: typing.NotRequired[str] 74 l: typing.NotRequired[int] # noqa: E741 75 q: typing.NotRequired[str] 76 s: str 77 t: typing.NotRequired[int] 78 x: typing.NotRequired[int] 79 z: typing.NotRequired[str]
type BodyAndHeaders =
tuple[bytes, dict[str, list[tuple[bytes, bytes]]]]
def
body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
18def body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders: 19 """ 20 Parse a raw email message into its body and headers for DKIM/ARC canonicalization. 21 22 This function splits the message at the first empty line and parses headers, 23 handling folded header values according to RFC 5322. 24 25 Args: 26 message: The raw email message as bytes. 27 28 Returns: 29 A tuple of (body, headers) where body is the raw body bytes and headers 30 is a dictionary mapping lowercase header names to lists of (name, value) tuples. 31 """ 32 lines = re.split(b"\r?\n", message) 33 34 headers_idx = collections.defaultdict(list) 35 headers = [] 36 for header_line in lines[: lines.index(b"")]: 37 if (m := re.match(rb"([\x21-\x7e]+?):", header_line)) is not None: 38 header_name = m.group(1) 39 header_value = header_line[m.end() :] + b"\r\n" 40 headers.append([header_name, header_value]) 41 elif header_line.startswith(b" ") or header_line.startswith(b"\t"): 42 # Unfold header values 43 headers[-1][1] += header_line + b"\r\n" 44 else: 45 raise ValueError(f"Invalid line {header_line!r}") 46 47 for header_name, header_value in headers: 48 headers_idx[header_name.decode().lower()].append((header_name, header_value)) 49 50 try: 51 # Split on the first empty line and join the remaining ones with CRLF 52 can_body = b"\r\n".join(lines[lines.index(b"") + 1 :]) 53 except ValueError: 54 # No body defaults to CRLF 55 can_body = b"\r\n" 56 57 return can_body, dict(headers_idx)
Parse a raw email message into its body and headers for DKIM/ARC canonicalization.
This function splits the message at the first empty line and parses headers, handling folded header values according to RFC 5322.
Args: message: The raw email message as bytes.
Returns: A tuple of (body, headers) where body is the raw body bytes and headers is a dictionary mapping lowercase header names to lists of (name, value) tuples.