emailsec.arc

  1from dataclasses import dataclass
  2import typing
  3import enum
  4import re
  5
  6import emailsec._utils
  7from emailsec.dkim.checker import (
  8    _verify_sig,
  9    _verify_dkim_signature,
 10)
 11from emailsec.dkim.parser import (
 12    _algorithm,
 13    headers_hash,
 14    tag_lists,
 15    _DKIMStyleSig,
 16    _SigVerifier,
 17    _CanonicalizationAlg,
 18)
 19from emailsec._utils import body_and_headers_for_canonicalization, BodyAndHeaders
 20
# Both ARC header kinds are parsed with the tag-list grammar imported from
# emailsec.dkim.parser; the aliases only name which header is being parsed
# at the call sites below.
arc_message_signature = tag_lists
arc_seal = tag_lists

# Public names of this module. BodyAndHeaders and
# body_and_headers_for_canonicalization are re-exported from emailsec._utils.
__all__ = [
    "check_arc",
    "ARCCheck",
    "ARCChainStatus",
    "BodyAndHeaders",
    "body_and_headers_for_canonicalization",
]
 31
 32
class ARCMessageSignature(typing.TypedDict):
    """Parsed tags of an ARC-Message-Signature header.

    Built by ``parse_arc_message_signature``: string-valued tags are stored
    with all whitespace removed, while ``i``, ``l``, ``t`` and ``x`` are
    converted to ``int``. Tags marked ``NotRequired`` may be absent.
    """

    # Instance number; check_arc sorts on it and matches it against the
    # expected position in the chain.
    i: int
    a: str
    b: str
    bh: str
    c: typing.NotRequired[str]
    d: str
    h: str
    l: typing.NotRequired[int]  # noqa: E741
    q: typing.NotRequired[str]
    s: str
    t: typing.NotRequired[int]
    x: typing.NotRequired[int]
    z: typing.NotRequired[str]
 47
 48
class ARCSeal(typing.TypedDict):
    """Parsed tags of an ARC-Seal header.

    Built by ``parse_arc_seal``: string-valued tags are stored with all
    whitespace removed, while ``i`` and ``t`` are converted to ``int``.
    """

    # Instance number; check_arc sorts on it and matches it against the
    # expected position in the chain.
    i: int
    # Algorithm tag; arc_seal_verify passes it to _algorithm().
    a: str
    b: str
    # Reported as ARCCheck.signer for the highest-instance seal.
    d: str
    s: str
    # check_arc requires "none" for instance 1 and "pass" for later instances.
    cv: str
    t: typing.NotRequired[int]
 57
 58
# Tags that must be present once parsing has finished; parse_arc_seal and
# parse_arc_message_signature raise ValueError when one of them is absent.
_ARC_SEAL_REQUIRED_FIELDS = {"i", "a", "b", "d", "s", "cv"}
_ARC_MSG_SIG_REQUIRED_FIELDS = {"i", "a", "b", "bh", "d", "h", "s"}
 61
 62
class ARCChainStatus(enum.StrEnum):
    """Outcome of ``check_arc`` for a message."""

    # The message carries no ARC-Message-Signature header.
    NONE = "none"
    # The ARC sets are malformed, inconsistent, or a signature did not verify.
    FAIL = "fail"
    # Every check performed by check_arc succeeded.
    PASS = "pass"
 67
 68
@dataclass
class ARCCheck:
    """Result returned by ``check_arc``."""

    # Overall chain status.
    result: ARCChainStatus
    # Human-readable explanation; check_arc leaves it empty on PASS.
    exp: str
    # "d" tag of the highest-instance ARC-Seal; only set by check_arc on PASS.
    signer: str | None = None
    # Raw value of the highest-instance ARC-Authentication-Results header;
    # only set by check_arc on PASS.
    aar_header: bytes | None = None
 75
 76
 77def parse_arc_seal(data: str) -> ARCSeal:
 78    sig: ARCSeal = {}  # type: ignore
 79    for result in arc_seal.parse_string(data, parse_all=True).as_list():
 80        field = result[0]
 81        match field:
 82            case "a" | "b" | "d" | "s" | "cv":
 83                sig[field] = "".join(re.split(r"\s+", result[1]))
 84            case "t" | "i":
 85                try:
 86                    sig[field] = int(result[1])
 87                except ValueError as ve:
 88                    raise ValueError(f"Invalid field value {result=}") from ve
 89            case "h":
 90                # https://datatracker.ietf.org/doc/html/rfc8617#section-4.1.3
 91                # must fail if h tag is found in seal
 92                raise ValueError("h tag not allowed")
 93            case _:
 94                continue
 95    if (
 96        missing_fields := set(sig.keys()) & _ARC_SEAL_REQUIRED_FIELDS
 97    ) != _ARC_SEAL_REQUIRED_FIELDS:
 98        raise ValueError(f"Missing required fields {missing_fields=}")
 99
100    return sig
101
102
def parse_arc_message_signature(data: str) -> ARCMessageSignature:
    """Parse the value of an ARC-Message-Signature header.

    Text tags are stored with every run of whitespace removed, numeric tags
    (``l``, ``t``, ``x``, ``i``) are converted to ``int`` and unknown tags are
    skipped.

    Raises:
        ValueError: If a numeric tag is not an integer or a required tag is
            missing.
    """
    text_tags = ("a", "b", "bh", "c", "d", "h", "q", "s", "z")
    numeric_tags = ("l", "t", "x", "i")

    sig: ARCMessageSignature = {}  # type: ignore
    tag_pairs = arc_message_signature.parse_string(data, parse_all=True).as_list()
    for result in tag_pairs:
        field = result[0]
        if field in text_tags:
            sig[field] = re.sub(r"\s+", "", result[1])
        elif field in numeric_tags:
            try:
                sig[field] = int(result[1])
            except ValueError as ve:
                raise ValueError(f"Invalid field value {result=}") from ve
        # Any other tag is ignored.

    missing_fields = _ARC_MSG_SIG_REQUIRED_FIELDS - set(sig.keys())
    if missing_fields:
        raise ValueError(f"Missing required fields {missing_fields=}")

    return sig
122
123
async def arc_seal_verify(
    arc_set_headers: tuple[
        emailsec._utils.Header, emailsec._utils.Header, emailsec._utils.Header
    ],
    sig: ARCSeal,
) -> bool:
    """Verify the ARC-Seal of one ARC set.

    Args:
        arc_set_headers: The set's headers ordered as (AAR, AMS, AS).
        sig: The parsed ARC-Seal found in the last header of the tuple.

    Returns:
        Whatever ``_verify_sig`` reports for the hashed headers.
    """
    algorithm = _algorithm(sig["a"])
    canonicalization: _CanonicalizationAlg = "relaxed"

    # The seal header is handed to headers_hash separately from the headers it
    # covers (presumably so its own signature value can be left out of the
    # hash; confirm in headers_hash).
    seal_header = arc_set_headers[-1]
    # NOTE(review): only this set's AAR and AMS are passed as signed headers.
    # RFC 8617 describes the seal as also covering earlier ARC sets; confirm
    # whether that is handled elsewhere.
    covered_headers = [*arc_set_headers[:2]]

    hashed_headers = headers_hash(covered_headers, canonicalization, seal_header)
    verifier = typing.cast(_SigVerifier, sig)
    return await _verify_sig(algorithm, verifier, hashed_headers)
145
146
# First "i=<digits>" occurrence in a header value (case-insensitive).
_ARC_INSTANCE = re.compile(rb"\s?i\s*=\s*(\d+)", re.MULTILINE | re.IGNORECASE)


def _aar_instance(header_value: bytes) -> int:
    """Return the instance number found in an ARC-Authentication-Results value.

    Raises:
        ValueError: If no ``i=<digits>`` tag can be found.
    """
    found = _ARC_INSTANCE.search(header_value)
    if found is None:
        raise ValueError(f"Instance not found in {header_value=}")

    return int(found.group(1))
155
156
async def check_arc(
    message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None
) -> ARCCheck:
    """Validate the ARC chain attached to a message.

    Args:
        message: The raw message. It is only parsed (with
            ``body_and_headers_for_canonicalization``) when
            ``body_and_headers`` is not supplied.
        body_and_headers: Optional already-parsed ``(body, headers)`` pair.

    Returns:
        An ``ARCCheck`` whose ``result`` is:

        * ``NONE`` when the message has no ARC-Message-Signature header;
        * ``FAIL`` (with the reason in ``exp``) when the three header kinds do
          not occur the same number of times, when there are more than 50
          sets, when the instance numbers or ``cv`` values are not the
          expected ones, or when a signature does not verify;
        * ``PASS`` otherwise, with ``signer`` set to the ``d`` tag of the
          highest-instance seal and ``aar_header`` to the value of the
          highest-instance ARC-Authentication-Results header.

    Raises:
        ValueError: Propagated from ``parse_arc_message_signature``,
            ``parse_arc_seal`` or ``_aar_instance`` when an ARC header is
            malformed. Errors raised by the tag-list parser itself are not
            caught here either.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    arc_message_signatures = headers.get("arc-message-signature")
    if not arc_message_signatures:
        return ARCCheck(ARCChainStatus.NONE, "No ARC Sets")
    arc_authentication_results = headers.get("arc-authentication-results", [])
    arc_seals = headers.get("arc-seal", [])

    if not (
        len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals)
    ):
        return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets")

    if len(arc_authentication_results) > 50:
        return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets")

    # Sort each header kind by instance number so that pop() below yields the
    # highest remaining instance.
    parsed_ams = sorted(
        (
            (
                parse_arc_message_signature(value.decode()),
                (header_name, value),
            )
            for header_name, value in headers["arc-message-signature"]
        ),
        key=lambda x: x[0]["i"],
    )
    parsed_as = sorted(
        (
            (
                parse_arc_seal(value.decode()),
                (header_name, value),
            )
            for header_name, value in headers["arc-seal"]
        ),
        key=lambda x: x[0]["i"],
    )
    aars = sorted(
        (
            (
                _aar_instance(value),
                (header_name, value),
            )
            for header_name, value in headers["arc-authentication-results"]
        ),
        key=lambda x: x[0],
    )

    highest_validated_aar = None
    highest_validated_signer = None

    highest_instance = len(arc_message_signatures)

    # Walk the chain from the most recent set down to instance 1.
    for instance in range(highest_instance, 0, -1):
        ams, ams_header = parsed_ams.pop()
        if ams["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}")

        seal, seal_header = parsed_as.pop()
        if seal["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}")

        aar_instance, aar_header = aars.pop()
        if aar_instance != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}")

        # The cv tag belongs to the ARC-Seal: "none" for the first set,
        # "pass" for every later one.
        if instance == 1 and seal["cv"] != "none":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv must be none for {instance=}")
        elif instance > 1 and seal["cv"] != "pass":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv fail for {instance=}")

        # Only the most recent AMS has to verify (RFC 8617 section 5.2).
        # Earlier hops signed the message as it looked before later
        # intermediaries modified it, so their AMS is expected to break and
        # must not fail the chain.
        if instance == highest_instance:
            is_ams_valid = await _verify_dkim_signature(
                body, headers, ams_header, typing.cast(_DKIMStyleSig, ams)
            )
            if not is_ams_valid:
                return ARCCheck(
                    ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}"
                )

        arc_set_headers = (aar_header, ams_header, seal_header)

        # Every seal in the chain must verify.
        is_seal_valid = await arc_seal_verify(arc_set_headers, seal)
        if not is_seal_valid:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}")

        # Remember the first (i.e. highest-instance) set that validated.
        if highest_validated_aar is None:
            highest_validated_aar = aar_header[1]
            highest_validated_signer = seal["d"]

    return ARCCheck(
        ARCChainStatus.PASS,
        "",
        signer=highest_validated_signer,
        aar_header=highest_validated_aar,
    )
async def check_arc( message: bytes, body_and_headers: BodyAndHeaders | None = None) -> ARCCheck:
async def check_arc(
    message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None
) -> ARCCheck:
    """Validate the ARC chain attached to a message.

    Args:
        message: The raw message. It is only parsed (with
            ``body_and_headers_for_canonicalization``) when
            ``body_and_headers`` is not supplied.
        body_and_headers: Optional already-parsed ``(body, headers)`` pair.

    Returns:
        An ``ARCCheck`` whose ``result`` is:

        * ``NONE`` when the message has no ARC-Message-Signature header;
        * ``FAIL`` (with the reason in ``exp``) when the three header kinds do
          not occur the same number of times, when there are more than 50
          sets, when the instance numbers or ``cv`` values are not the
          expected ones, or when a signature does not verify;
        * ``PASS`` otherwise, with ``signer`` set to the ``d`` tag of the
          highest-instance seal and ``aar_header`` to the value of the
          highest-instance ARC-Authentication-Results header.

    Raises:
        ValueError: Propagated from ``parse_arc_message_signature``,
            ``parse_arc_seal`` or ``_aar_instance`` when an ARC header is
            malformed. Errors raised by the tag-list parser itself are not
            caught here either.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    arc_message_signatures = headers.get("arc-message-signature")
    if not arc_message_signatures:
        return ARCCheck(ARCChainStatus.NONE, "No ARC Sets")
    arc_authentication_results = headers.get("arc-authentication-results", [])
    arc_seals = headers.get("arc-seal", [])

    if not (
        len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals)
    ):
        return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets")

    if len(arc_authentication_results) > 50:
        return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets")

    # Sort each header kind by instance number so that pop() below yields the
    # highest remaining instance.
    parsed_ams = sorted(
        (
            (
                parse_arc_message_signature(value.decode()),
                (header_name, value),
            )
            for header_name, value in headers["arc-message-signature"]
        ),
        key=lambda x: x[0]["i"],
    )
    parsed_as = sorted(
        (
            (
                parse_arc_seal(value.decode()),
                (header_name, value),
            )
            for header_name, value in headers["arc-seal"]
        ),
        key=lambda x: x[0]["i"],
    )
    aars = sorted(
        (
            (
                _aar_instance(value),
                (header_name, value),
            )
            for header_name, value in headers["arc-authentication-results"]
        ),
        key=lambda x: x[0],
    )

    highest_validated_aar = None
    highest_validated_signer = None

    highest_instance = len(arc_message_signatures)

    # Walk the chain from the most recent set down to instance 1.
    for instance in range(highest_instance, 0, -1):
        ams, ams_header = parsed_ams.pop()
        if ams["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}")

        seal, seal_header = parsed_as.pop()
        if seal["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}")

        aar_instance, aar_header = aars.pop()
        if aar_instance != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}")

        # The cv tag belongs to the ARC-Seal: "none" for the first set,
        # "pass" for every later one.
        if instance == 1 and seal["cv"] != "none":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv must be none for {instance=}")
        elif instance > 1 and seal["cv"] != "pass":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv fail for {instance=}")

        # Only the most recent AMS has to verify (RFC 8617 section 5.2).
        # Earlier hops signed the message as it looked before later
        # intermediaries modified it, so their AMS is expected to break and
        # must not fail the chain.
        if instance == highest_instance:
            is_ams_valid = await _verify_dkim_signature(
                body, headers, ams_header, typing.cast(_DKIMStyleSig, ams)
            )
            if not is_ams_valid:
                return ARCCheck(
                    ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}"
                )

        arc_set_headers = (aar_header, ams_header, seal_header)

        # Every seal in the chain must verify.
        is_seal_valid = await arc_seal_verify(arc_set_headers, seal)
        if not is_seal_valid:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}")

        # Remember the first (i.e. highest-instance) set that validated.
        if highest_validated_aar is None:
            highest_validated_aar = aar_header[1]
            highest_validated_signer = seal["d"]

    return ARCCheck(
        ARCChainStatus.PASS,
        "",
        signer=highest_validated_signer,
        aar_header=highest_validated_aar,
    )
@dataclass
class ARCCheck:
@dataclass
class ARCCheck:
    """Result returned by ``check_arc``."""

    # Overall chain status.
    result: ARCChainStatus
    # Human-readable explanation; check_arc leaves it empty on PASS.
    exp: str
    # "d" tag of the highest-instance ARC-Seal; only set by check_arc on PASS.
    signer: str | None = None
    # Raw value of the highest-instance ARC-Authentication-Results header;
    # only set by check_arc on PASS.
    aar_header: bytes | None = None
ARCCheck( result: ARCChainStatus, exp: str, signer: str | None = None, aar_header: bytes | None = None)
result: ARCChainStatus
exp: str
signer: str | None = None
aar_header: bytes | None = None
class ARCChainStatus(enum.StrEnum):
class ARCChainStatus(enum.StrEnum):
    """Outcome of ``check_arc`` for a message."""

    # The message carries no ARC-Message-Signature header.
    NONE = "none"
    # The ARC sets are malformed, inconsistent, or a signature did not verify.
    FAIL = "fail"
    # Every check performed by check_arc succeeded.
    PASS = "pass"
NONE = <ARCChainStatus.NONE: 'none'>
FAIL = <ARCChainStatus.FAIL: 'fail'>
PASS = <ARCChainStatus.PASS: 'pass'>
type BodyAndHeaders = tuple[bytes, dict[str, list[tuple[bytes, bytes]]]]
def body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
def body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
    """
    Parse a raw email message into its body and headers for DKIM/ARC canonicalization.

    This function splits the message at the first empty line and parses headers,
    handling folded header values according to RFC 5322. A message without any
    empty line is treated as header-only and gets a body of a single CRLF.

    Args:
        message: The raw email message as bytes.

    Returns:
        A tuple of (body, headers) where body is the raw body bytes and headers
        is a dictionary mapping lowercase header names to lists of (name, value) tuples.

    Raises:
        ValueError: If a header line is neither a ``name:`` line nor a
            continuation of a preceding header.
    """
    lines = re.split(b"\r?\n", message)

    # Locate the header/body separator once. Looking it up inside the loop
    # header would raise before the "no body" fallback could ever apply.
    try:
        separator_idx = lines.index(b"")
    except ValueError:
        # No empty line: every line is a header and the body defaults to CRLF
        header_lines = lines
        can_body = b"\r\n"
    else:
        header_lines = lines[:separator_idx]
        # Join the lines after the first empty one back together with CRLF
        can_body = b"\r\n".join(lines[separator_idx + 1 :])

    headers: list[list[bytes]] = []
    for header_line in header_lines:
        if (m := re.match(rb"([\x21-\x7e]+?):", header_line)) is not None:
            header_name = m.group(1)
            header_value = header_line[m.end() :] + b"\r\n"
            headers.append([header_name, header_value])
        elif headers and header_line.startswith((b" ", b"\t")):
            # Folded line: append it (with its CRLF) to the previous header.
            # A continuation with no previous header falls through to the
            # ValueError below instead of raising IndexError.
            headers[-1][1] += header_line + b"\r\n"
        else:
            raise ValueError(f"Invalid line {header_line!r}")

    headers_idx = collections.defaultdict(list)
    for header_name, header_value in headers:
        headers_idx[header_name.decode().lower()].append((header_name, header_value))

    return can_body, dict(headers_idx)

Parse a raw email message into its body and headers for DKIM/ARC canonicalization.

This function splits the message at the first empty line and parses headers, handling folded header values according to RFC 5322.

Args: message: The raw email message as bytes.

Returns: A tuple of (body, headers) where body is the raw body bytes and headers is a dictionary mapping lowercase header names to lists of (name, value) tuples.