emailsec.arc
1from dataclasses import dataclass 2import typing 3import enum 4import re 5 6import emailsec._utils 7from emailsec.dkim.checker import ( 8 _verify_sig, 9 _verify_dkim_signature, 10) 11from emailsec.dkim.parser import ( 12 _algorithm, 13 headers_hash, 14 tag_lists, 15 _DKIMStyleSig, 16 _SigVerifier, 17 _CanonicalizationAlg, 18) 19from emailsec._utils import body_and_headers_for_canonicalization, BodyAndHeaders 20 21arc_message_signature = tag_lists 22arc_seal = tag_lists 23 24__all__ = [ 25 "check_arc", 26 "ARCCheck", 27 "ARCChainStatus", 28 "BodyAndHeaders", 29 "body_and_headers_for_canonicalization", 30] 31 32 33class ARCMessageSignature(typing.TypedDict): 34 i: int 35 a: str 36 b: str 37 bh: str 38 c: typing.NotRequired[str] 39 d: str 40 h: str 41 l: typing.NotRequired[int] # noqa: E741 42 q: typing.NotRequired[str] 43 s: str 44 t: typing.NotRequired[int] 45 x: typing.NotRequired[int] 46 z: typing.NotRequired[str] 47 48 49class ARCSeal(typing.TypedDict): 50 i: int 51 a: str 52 b: str 53 d: str 54 s: str 55 cv: str 56 t: typing.NotRequired[int] 57 58 59_ARC_SEAL_REQUIRED_FIELDS = {"i", "a", "b", "d", "s", "cv"} 60_ARC_MSG_SIG_REQUIRED_FIELDS = {"i", "a", "b", "bh", "d", "h", "s"} 61 62 63class ARCChainStatus(enum.StrEnum): 64 NONE = "none" 65 FAIL = "fail" 66 PASS = "pass" 67 68 69@dataclass 70class ARCCheck: 71 result: ARCChainStatus 72 exp: str 73 signer: str | None = None 74 aar_header: bytes | None = None 75 76 77def parse_arc_seal(data: str) -> ARCSeal: 78 sig: ARCSeal = {} # type: ignore 79 for result in arc_seal.parse_string(data, parse_all=True).as_list(): 80 field = result[0] 81 match field: 82 case "a" | "b" | "d" | "s" | "cv": 83 sig[field] = "".join(re.split(r"\s+", result[1])) 84 case "t" | "i": 85 try: 86 sig[field] = int(result[1]) 87 except ValueError as ve: 88 raise ValueError(f"Invalid field value {result=}") from ve 89 case "h": 90 # https://datatracker.ietf.org/doc/html/rfc8617#section-4.1.3 91 # must fail if h tag is found in seal 92 raise 
ValueError("h tag not allowed") 93 case _: 94 continue 95 if ( 96 missing_fields := set(sig.keys()) & _ARC_SEAL_REQUIRED_FIELDS 97 ) != _ARC_SEAL_REQUIRED_FIELDS: 98 raise ValueError(f"Missing required fields {missing_fields=}") 99 100 return sig 101 102 103def parse_arc_message_signature(data: str) -> ARCMessageSignature: 104 sig: ARCMessageSignature = {} # type: ignore 105 for result in arc_message_signature.parse_string(data, parse_all=True).as_list(): 106 field = result[0] 107 match field: 108 case "a" | "b" | "bh" | "c" | "d" | "h" | "q" | "s" | "z": 109 sig[field] = "".join(re.split(r"\s+", result[1])) 110 case "l" | "t" | "x" | "i": 111 try: 112 sig[field] = int(result[1]) 113 except ValueError as ve: 114 raise ValueError(f"Invalid field value {result=}") from ve 115 case _: 116 continue 117 118 if missing_fields := _ARC_MSG_SIG_REQUIRED_FIELDS - set(sig.keys()): 119 raise ValueError(f"Missing required fields {missing_fields=}") 120 121 return sig 122 123 124async def arc_seal_verify( 125 arc_set_headers: tuple[ 126 emailsec._utils.Header, emailsec._utils.Header, emailsec._utils.Header 127 ], 128 sig: ARCSeal, 129) -> bool: 130 header_canonicalization: _CanonicalizationAlg = "relaxed" 131 dkim_alg = _algorithm(sig["a"]) 132 133 # headers ordering: aar_header, ams_header, seal_header 134 headers_to_sign = list(arc_set_headers[:2]) 135 # the ARC-Seal is treated differently as the body hash needs to be stripped 136 sig_header = arc_set_headers[-1] 137 canonicalized_message = headers_hash( 138 headers_to_sign, 139 header_canonicalization, 140 sig_header, 141 ) 142 return await _verify_sig( 143 dkim_alg, typing.cast(_SigVerifier, sig), canonicalized_message 144 ) 145 146 147_ARC_INSTANCE = re.compile(rb"\s?i\s*=\s*(\d+)", re.MULTILINE | re.IGNORECASE) 148 149 150def _aar_instance(header_value: bytes) -> int: 151 if (match := re.search(_ARC_INSTANCE, header_value)) is not None: 152 return int(match.group(1)) 153 154 raise ValueError(f"Instance not found in 
{header_value=}") 155 156 157async def check_arc( 158 message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None 159) -> ARCCheck: 160 if body_and_headers: 161 body, headers = body_and_headers 162 else: 163 body, headers = body_and_headers_for_canonicalization(message) 164 165 arc_message_signatures = headers.get("arc-message-signature") 166 if not arc_message_signatures: 167 return ARCCheck(ARCChainStatus.NONE, "No ARC Sets") 168 arc_authentication_results = headers.get("arc-authentication-results", []) 169 arc_seals = headers.get("arc-seal", []) 170 171 if not ( 172 len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals) 173 ): 174 return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets") 175 176 if len(arc_authentication_results) > 50: 177 return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets") 178 179 parsed_ams = sorted( 180 ( 181 ( 182 parse_arc_message_signature(value.decode()), 183 (header_name, value), 184 ) 185 for header_name, value in headers["arc-message-signature"] 186 ), 187 key=lambda x: x[0]["i"], 188 ) 189 parsed_as = sorted( 190 ( 191 ( 192 parse_arc_seal(value.decode()), 193 (header_name, value), 194 ) 195 for header_name, value in headers["arc-seal"] 196 ), 197 key=lambda x: x[0]["i"], 198 ) 199 aars = sorted( 200 ( 201 ( 202 _aar_instance(value), 203 (header_name, value), 204 ) 205 for header_name, value in headers["arc-authentication-results"] 206 ), 207 key=lambda x: x[0], 208 ) 209 210 highest_validated_aar = None 211 highest_validated_signer = None 212 213 for instance in range(len(arc_message_signatures), 0, -1): 214 ams, ams_header = parsed_ams.pop() 215 if ams["i"] != instance: 216 return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}") 217 218 seal, seal_header = parsed_as.pop() 219 if seal["i"] != instance: 220 return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}") 221 222 aar_instance, aar_header = aars.pop() 223 if aar_instance != instance: 224 return 
ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}") 225 226 if instance == 1 and seal["cv"] != "none": 227 return ARCCheck(ARCChainStatus.FAIL, f"AMS cv must be none for {instance=}") 228 elif instance > 1 and seal["cv"] != "pass": 229 return ARCCheck(ARCChainStatus.FAIL, f"AMS cv fail for {instance=}") 230 231 is_ams_valid = await _verify_dkim_signature( 232 body, headers, ams_header, typing.cast(_DKIMStyleSig, ams) 233 ) 234 if not is_ams_valid: 235 return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}") 236 237 arc_set_headers = (aar_header, ams_header, seal_header) 238 239 is_seal_valid = await arc_seal_verify(arc_set_headers, seal) 240 if not is_seal_valid: 241 return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}") 242 243 if highest_validated_aar is None: 244 highest_validated_aar = aar_header[1] 245 highest_validated_signer = seal["d"] 246 247 return ARCCheck( 248 ARCChainStatus.PASS, 249 "", 250 signer=highest_validated_signer, 251 aar_header=highest_validated_aar, 252 )
async def check_arc(
    message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None
) -> ARCCheck:
    """Validate the ARC chain of a message.

    The ARC Sets are walked from the highest instance down to 1. For each
    instance the AMS, AS and AAR headers must all be present with that
    instance number, the seal's ``cv`` must be ``none`` for instance 1 and
    ``pass`` otherwise, and the ARC-Seal must verify. The
    ARC-Message-Signature is verified for the most recent instance only, as
    RFC 8617 section 5.2 requires: older AMS signatures are expected to break
    when a later hop modifies the message.

    Args:
        message: The raw message; only parsed when ``body_and_headers`` is
            not supplied.
        body_and_headers: Optional pre-parsed ``(body, headers)`` pair, to
            avoid parsing ``message`` again.

    Returns:
        ``NONE`` when there is no ARC-Message-Signature header, ``FAIL`` with
        an explanation in ``exp`` when the chain is malformed or does not
        verify, and ``PASS`` with the signer domain and AAR value of the
        highest instance otherwise.

    Raises:
        Exceptions other than ``ValueError`` raised by the header parsers,
        and any exception raised while parsing ``message`` or by the
        signature verification helpers, propagate to the caller.
    """
    if body_and_headers:
        body, headers = body_and_headers
    else:
        body, headers = body_and_headers_for_canonicalization(message)

    arc_message_signatures = headers.get("arc-message-signature")
    if not arc_message_signatures:
        return ARCCheck(ARCChainStatus.NONE, "No ARC Sets")
    arc_authentication_results = headers.get("arc-authentication-results", [])
    arc_seals = headers.get("arc-seal", [])

    if not (
        len(arc_message_signatures) == len(arc_authentication_results) == len(arc_seals)
    ):
        return ARCCheck(ARCChainStatus.FAIL, "Uneven ARC Sets")

    if len(arc_authentication_results) > 50:
        return ARCCheck(ARCChainStatus.FAIL, "Too many ARC Sets")

    # The headers are untrusted input: a malformed ARC Set (undecodable bytes,
    # bad integer, missing tag, "h" tag in a seal, AAR without instance) is a
    # chain failure rather than an error for the caller to handle.
    try:
        # Each list is sorted by instance so that ``pop()`` yields the highest
        # remaining instance first.
        parsed_ams = sorted(
            (
                (
                    parse_arc_message_signature(value.decode()),
                    (header_name, value),
                )
                for header_name, value in arc_message_signatures
            ),
            key=lambda x: x[0]["i"],
        )
        parsed_as = sorted(
            (
                (
                    parse_arc_seal(value.decode()),
                    (header_name, value),
                )
                for header_name, value in arc_seals
            ),
            key=lambda x: x[0]["i"],
        )
        aars = sorted(
            (
                (
                    _aar_instance(value),
                    (header_name, value),
                )
                for header_name, value in arc_authentication_results
            ),
            key=lambda x: x[0],
        )
    except ValueError as exc:
        return ARCCheck(ARCChainStatus.FAIL, f"Malformed ARC Set: {exc}")

    highest_validated_aar = None
    highest_validated_signer = None

    chain_length = len(arc_message_signatures)
    for instance in range(chain_length, 0, -1):
        # Popping from lists sorted by instance also detects gaps and
        # duplicates: the popped instance must equal the expected one.
        ams, ams_header = parsed_ams.pop()
        if ams["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AMS for {instance=}")

        seal, seal_header = parsed_as.pop()
        if seal["i"] != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AS for {instance=}")

        aar_instance, aar_header = aars.pop()
        if aar_instance != instance:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot find AAR for {instance=}")

        # ``cv`` is a tag of the ARC-Seal, so the diagnostics name the AS.
        if instance == 1 and seal["cv"] != "none":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv must be none for {instance=}")
        elif instance > 1 and seal["cv"] != "pass":
            return ARCCheck(ARCChainStatus.FAIL, f"AS cv fail for {instance=}")

        # Only the most recent AMS has to validate (RFC 8617 section 5.2,
        # step 4); validating older ones is optional and their failure must
        # not fail the chain.
        if instance == chain_length:
            is_ams_valid = await _verify_dkim_signature(
                body, headers, ams_header, typing.cast(_DKIMStyleSig, ams)
            )
            if not is_ams_valid:
                return ARCCheck(
                    ARCChainStatus.FAIL, f"Cannot verify AMS for {instance=}"
                )

        arc_set_headers = (aar_header, ams_header, seal_header)

        is_seal_valid = await arc_seal_verify(arc_set_headers, seal)
        if not is_seal_valid:
            return ARCCheck(ARCChainStatus.FAIL, f"Cannot verify AS for {instance=}")

        # The first iteration handles the highest instance; remember its AAR
        # value and sealing domain for the final result.
        if highest_validated_aar is None:
            highest_validated_aar = aar_header[1]
            highest_validated_signer = seal["d"]

    return ARCCheck(
        ARCChainStatus.PASS,
        "",
        signer=highest_validated_signer,
        aar_header=highest_validated_aar,
    )
@dataclass
class
ARCCheck:
@dataclass
class ARCCheck:
    """Outcome of ``check_arc``."""

    # Overall chain validation status.
    result: ARCChainStatus
    # Explanation of a NONE/FAIL result; check_arc passes "" on PASS.
    exp: str
    # ``d`` tag of the highest-instance ARC-Seal; check_arc sets it on PASS only.
    signer: str | None = None
    # Raw value of the highest-instance ARC-Authentication-Results header;
    # check_arc sets it on PASS only.
    aar_header: bytes | None = None
ARCCheck( result: ARCChainStatus, exp: str, signer: str | None = None, aar_header: bytes | None = None)
result: ARCChainStatus
class
ARCChainStatus(enum.StrEnum):
NONE =
<ARCChainStatus.NONE: 'none'>
FAIL =
<ARCChainStatus.FAIL: 'fail'>
PASS =
<ARCChainStatus.PASS: 'pass'>
type BodyAndHeaders =
tuple[bytes, dict[str, list[tuple[bytes, bytes]]]]
def
body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
def body_and_headers_for_canonicalization(message: bytes) -> BodyAndHeaders:
    """
    Parse a raw email message into its body and headers for DKIM/ARC canonicalization.

    The message is split at the first empty line. Header values keep their
    original folding: each physical line of a value, including continuation
    lines, is stored followed by CRLF.

    Args:
        message: The raw email message as bytes.

    Returns:
        A tuple of (body, headers) where body is the raw body bytes (lines
        re-joined with CRLF, or a single CRLF when the message has no
        header/body separator) and headers is a dictionary mapping lowercase
        header names to lists of (name, value) tuples in message order.

    Raises:
        ValueError: If a header line is neither a ``name:`` line nor a
            continuation line, or if a continuation line appears before any
            header.
    """
    lines = re.split(b"\r?\n", message)

    # Locate the header/body separator once. A message without an empty line
    # consists of headers only; looking the index up eagerly in the loop
    # header used to raise before the no-body fallback could ever run.
    try:
        separator_idx = lines.index(b"")
    except ValueError:
        separator_idx = None

    header_lines = lines if separator_idx is None else lines[:separator_idx]

    headers: list[list[bytes]] = []
    for header_line in header_lines:
        if (m := re.match(rb"([\x21-\x7e]+?):", header_line)) is not None:
            header_name = m.group(1)
            header_value = header_line[m.end() :] + b"\r\n"
            headers.append([header_name, header_value])
        elif header_line.startswith((b" ", b"\t")):
            if not headers:
                # A continuation line with nothing to continue is malformed.
                raise ValueError(f"Invalid line {header_line!r}")
            # Keep the folded line as part of the previous header's value
            headers[-1][1] += header_line + b"\r\n"
        else:
            raise ValueError(f"Invalid line {header_line!r}")

    headers_idx: collections.defaultdict[str, list[tuple[bytes, bytes]]] = (
        collections.defaultdict(list)
    )
    for header_name, header_value in headers:
        headers_idx[header_name.decode().lower()].append((header_name, header_value))

    if separator_idx is None:
        # No body defaults to CRLF
        can_body = b"\r\n"
    else:
        # Everything after the first empty line, re-joined with CRLF
        can_body = b"\r\n".join(lines[separator_idx + 1 :])

    return can_body, dict(headers_idx)
Parse a raw email message into its body and headers for DKIM/ARC canonicalization.
This function splits the message at the first empty line and parses headers, handling folded header values according to RFC 5322.
Args: message: The raw email message as bytes.
Returns: A tuple of (body, headers) where body is the raw body bytes and headers is a dictionary mapping lowercase header names to lists of (name, value) tuples.