emailsec.dkim

1from .checker import check_dkim, DKIMCheck, DKIMResult
2from .parser import DKIMSignature
3
4__all__ = [
5    "check_dkim",
6    "DKIMCheck",
7    "DKIMResult",
8    "DKIMSignature",
9]
async def check_dkim( message: bytes, body_and_headers: BodyAndHeaders | None = None) -> DKIMCheck:
40async def check_dkim(
41    message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None
42) -> DKIMCheck:
43    if body_and_headers:
44        body, headers = body_and_headers
45    else:
46        body, headers = body_and_headers_for_canonicalization(message)
47
48    signatures = []
49    for header_name, raw_signature in headers.get("dkim-signature", []):
50        try:
51            sig = parse_dkim_header_field(raw_signature.decode())
52        except ValueError:
53            continue
54
55        signatures.append(((header_name, raw_signature), sig))
56
57    if not signatures:
58        return DKIMCheck(result=DKIMResult.PERMFAIL)
59
60    # Try to pick an aligned signature if multiple signatures are present
61    def _sort_sig(item: tuple[emailsec._utils.Header, DKIMSignature]) -> bool:
62        _, s = item
63        _, from_addr = email.utils.parseaddr(headers["from"][0][1].decode().strip())
64        rfc5322_from = from_addr.partition("@")[-1]
65        return is_dkim_aligned(s["d"], rfc5322_from)
66
67    # Verify the top 5 signatures and stop once one verifies successfully
68    for sig_header, parsed_sig in sorted(signatures, key=_sort_sig, reverse=True)[:5]:
69        try:
70            if await _verify_dkim_signature(
71                body, headers, sig_header, typing.cast(_DKIMStyleSig, parsed_sig)
72            ):
73                return DKIMCheck(
74                    result=DKIMResult.SUCCESS,
75                    domain=parsed_sig["d"],
76                    selector=parsed_sig["s"],
77                    signature=parsed_sig,
78                )
79        except errors.Temperror:
80            return DKIMCheck(
81                result=DKIMResult.TEMPFAIL,
82                domain=parsed_sig["d"],
83                selector=parsed_sig["s"],
84                signature=parsed_sig,
85            )
86        except errors.Permerror:
87            continue
88
89    return DKIMCheck(result=DKIMResult.PERMFAIL)
@dataclass
class DKIMCheck:
32@dataclass
33class DKIMCheck:
34    result: DKIMResult
35    domain: str | None = None
36    selector: str | None = None
37    signature: DKIMSignature | None = None
DKIMCheck( result: DKIMResult, domain: str | None = None, selector: str | None = None, signature: DKIMSignature | None = None)
result: DKIMResult
domain: str | None = None
selector: str | None = None
signature: DKIMSignature | None = None
class DKIMResult(enum.StrEnum):
26class DKIMResult(StrEnum):
27    SUCCESS = "SUCCESS"
28    PERMFAIL = "PERMFAIL"
29    TEMPFAIL = "TEMPFAIL"
SUCCESS = <DKIMResult.SUCCESS: 'SUCCESS'>
PERMFAIL = <DKIMResult.PERMFAIL: 'PERMFAIL'>
TEMPFAIL = <DKIMResult.TEMPFAIL: 'TEMPFAIL'>
class DKIMSignature(typing.TypedDict):
65class DKIMSignature(typing.TypedDict):
66    v: str
67    a: str
68    b: str
69    bh: str
70    c: typing.NotRequired[str]
71    d: str
72    h: str
73    i: typing.NotRequired[str]
74    l: typing.NotRequired[int]  # noqa: E741
75    q: typing.NotRequired[str]
76    s: str
77    t: typing.NotRequired[int]
78    x: typing.NotRequired[int]
79    z: typing.NotRequired[str]
v: str
a: str
b: str
bh: str
c: NotRequired[str]
d: str
h: str
i: NotRequired[str]
l: NotRequired[int]
q: NotRequired[str]
s: str
t: NotRequired[int]
x: NotRequired[int]
z: NotRequired[str]