emailsec.dkim

 1from .checker import check_dkim, DKIMCheck, DKIMResult
 2from .parser import DKIMSignature
 3from emailsec._utils import BodyAndHeaders, body_and_headers_for_canonicalization
 4
 5__all__ = [
 6    "check_dkim",
 7    "DKIMCheck",
 8    "DKIMResult",
 9    "DKIMSignature",
10    "BodyAndHeaders",
11    "body_and_headers_for_canonicalization",
12]
async def check_dkim(message: bytes, body_and_headers: BodyAndHeaders | None = None) -> DKIMCheck:
async def check_dkim(
    message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None
) -> DKIMCheck:
    """
    Verify the DKIM signatures of a raw email message.

    Every ``DKIM-Signature`` header that parses is collected; signatures whose
    ``d=`` domain is aligned with the RFC 5322 ``From`` domain are tried first.
    At most five signatures are verified and the first one that verifies wins.

    Args:
        message: The raw email message as bytes.
        body_and_headers: Optional pre-parsed ``(body, headers)`` pair; when
            given, ``message`` is not parsed again.

    Returns:
        A ``DKIMCheck`` whose result is:

        * ``SUCCESS`` (with domain, selector and signature) as soon as one
          signature verifies;
        * ``TEMPFAIL`` (with domain, selector and signature) as soon as
          verifying a signature raises ``errors.Temperror``;
        * ``PERMFAIL`` when there is no parseable signature or none of the
          tried signatures verifies.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    signatures = []
    for header_name, raw_signature in headers.get("dkim-signature", []):
        try:
            sig = parse_dkim_header_field(raw_signature.decode())
        except ValueError:
            # Unparseable (or undecodable) signatures are simply ignored
            continue

        signatures.append(((header_name, raw_signature), sig))

    if not signatures:
        return DKIMCheck(result=DKIMResult.PERMFAIL)

    # Extract the RFC 5322 From domain once instead of once per signature.
    # A message without a From header cannot have an aligned signature, but
    # its signatures must still be verifiable, so a missing header is not an
    # error here.
    rfc5322_from: str | None = None
    if from_fields := headers.get("from"):
        # errors="replace" keeps a non-UTF-8 From header from aborting the
        # whole check; a mangled domain just never aligns.
        _, from_addr = email.utils.parseaddr(
            from_fields[0][1].decode(errors="replace").strip()
        )
        rfc5322_from = from_addr.partition("@")[-1]

    # Try to pick an aligned signature if multiple signatures are present
    def _sort_sig(item: tuple[emailsec._utils.Header, DKIMSignature]) -> bool:
        if rfc5322_from is None:
            return False
        _, s = item
        return is_dkim_aligned(s["d"], rfc5322_from)

    # Verify the top 5 signatures and stop once one verifies successfully.
    # sorted() is stable, so signatures keep their header order within the
    # aligned and the non-aligned group.
    for sig_header, parsed_sig in sorted(signatures, key=_sort_sig, reverse=True)[:5]:
        try:
            if await _verify_dkim_signature(
                body, headers, sig_header, typing.cast(_DKIMStyleSig, parsed_sig)
            ):
                return DKIMCheck(
                    result=DKIMResult.SUCCESS,
                    domain=parsed_sig["d"],
                    selector=parsed_sig["s"],
                    signature=parsed_sig,
                )
        except errors.Temperror:
            # A temporary error ends the check immediately with TEMPFAIL
            return DKIMCheck(
                result=DKIMResult.TEMPFAIL,
                domain=parsed_sig["d"],
                selector=parsed_sig["s"],
                signature=parsed_sig,
            )
        except errors.Permerror:
            # A permanently failing signature does not prevent trying the next one
            continue

    return DKIMCheck(result=DKIMResult.PERMFAIL)
@dataclass
class DKIMCheck:
@dataclass
class DKIMCheck:
    """
    Outcome of a DKIM check.

    Only ``result`` is mandatory; the remaining fields default to ``None``.
    ``check_dkim`` fills them in when it returns ``SUCCESS`` or ``TEMPFAIL``
    and leaves them unset when it returns ``PERMFAIL``.
    """

    # Overall verdict of the check
    result: DKIMResult
    # Value of the signature's "d" entry
    domain: str | None = None
    # Value of the signature's "s" entry
    selector: str | None = None
    # The parsed signature the verdict refers to
    signature: DKIMSignature | None = None
DKIMCheck(result: DKIMResult, domain: str | None = None, selector: str | None = None, signature: DKIMSignature | None = None)
result: DKIMResult
domain: str | None = None
selector: str | None = None
signature: DKIMSignature | None = None
class DKIMResult(enum.StrEnum):
30class DKIMResult(StrEnum):
31    SUCCESS = "SUCCESS"
32    PERMFAIL = "PERMFAIL"
33    TEMPFAIL = "TEMPFAIL"
SUCCESS = <DKIMResult.SUCCESS: 'SUCCESS'>
PERMFAIL = <DKIMResult.PERMFAIL: 'PERMFAIL'>
TEMPFAIL = <DKIMResult.TEMPFAIL: 'TEMPFAIL'>
class DKIMSignature(typing.TypedDict):
65class DKIMSignature(typing.TypedDict):
66    v: str
67    a: str
68    b: str
69    bh: str
70    c: typing.NotRequired[str]
71    d: str
72    h: str
73    i: typing.NotRequired[str]
74    l: typing.NotRequired[int]  # noqa: E741
75    q: typing.NotRequired[str]
76    s: str
77    t: typing.NotRequired[int]
78    x: typing.NotRequired[int]
79    z: typing.NotRequired[str]
v: str
a: str
b: str
bh: str
c: NotRequired[str]
d: str
h: str
i: NotRequired[str]
l: NotRequired[int]
q: NotRequired[str]
s: str
t: NotRequired[int]
x: NotRequired[int]
z: NotRequired[str]
# (body, headers) pair as returned by body_and_headers_for_canonicalization:
# the body bytes plus a dict mapping lowercase header names to lists of
# (name, value) byte tuples.
type BodyAndHeaders = tuple[bytes, dict[str, list[tuple[bytes, bytes]]]]
def body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
def body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
    """
    Parse a raw email message into its body and headers for DKIM/ARC canonicalization.

    The message is split into lines on CRLF or LF. Lines before the first empty
    line are parsed as headers; folded continuation lines (starting with a space
    or a tab) are appended to the preceding header value with their folding
    preserved. Lines after the first empty line are re-joined with CRLF to form
    the body.

    Args:
        message: The raw email message as bytes.

    Returns:
        A tuple of (body, headers) where body is the body bytes and headers
        is a dictionary mapping lowercase header names to lists of (name, value)
        tuples. Each value keeps everything after the colon and ends with CRLF.
        When the message contains no empty line at all, every line is treated
        as a header and the body defaults to CRLF.

    Raises:
        ValueError: If a header line is neither a ``name:`` field nor a
            continuation of a preceding header.
    """
    lines = re.split(b"\r?\n", message)

    # Locate the header/body separator once. Looking it up separately inside
    # the header loop would raise before the "no body" fallback could apply.
    try:
        separator_idx = lines.index(b"")
    except ValueError:
        # No body defaults to CRLF
        header_lines = lines
        can_body = b"\r\n"
    else:
        header_lines = lines[:separator_idx]
        # Split on the first empty line and join the remaining ones with CRLF
        can_body = b"\r\n".join(lines[separator_idx + 1 :])

    headers: list[list[bytes]] = []
    for header_line in header_lines:
        if (m := re.match(rb"([\x21-\x7e]+?):", header_line)) is not None:
            headers.append([m.group(1), header_line[m.end() :] + b"\r\n"])
        elif headers and header_line.startswith((b" ", b"\t")):
            # Continuation line: append it to the previous header value,
            # keeping the folding intact
            headers[-1][1] += header_line + b"\r\n"
        else:
            # Also reached by a continuation line that has no header before it
            raise ValueError(f"Invalid line {header_line!r}")

    # Group by lowercase name while keeping the original spelling and order
    headers_idx = collections.defaultdict(list)
    for header_name, header_value in headers:
        headers_idx[header_name.decode().lower()].append((header_name, header_value))

    return can_body, dict(headers_idx)

Parse a raw email message into its body and headers for DKIM/ARC canonicalization.

This function splits the message at the first empty line and parses headers, handling folded header values according to RFC 5322.

Args:
    message: The raw email message as bytes.

Returns:
    A tuple of (body, headers) where body is the raw body bytes and headers is a dictionary mapping lowercase header names to lists of (name, value) tuples.