emailsec.dmarc
from enum import StrEnum
from dataclasses import dataclass
from typing import Literal

import publicsuffixlist
from emailsec._dns_resolver import DNSResolver
from emailsec._alignment import is_spf_aligned, is_dkim_aligned, AlignmentMode
from emailsec.config import AuthenticationConfiguration
from emailsec import _errors as errors
from emailsec.spf.checker import SPFResult, SPFCheck
from emailsec.dkim.checker import DKIMResult, DKIMCheck
from emailsec.arc import ARCChainStatus, ARCCheck
from emailsec._authentication_results import extract_original_auth_results

__all__ = [
    "check_dmarc",
    "get_dmarc_policy",
    "DMARCCheck",
    "DMARCResult",
    "DMARCPolicy",
    "DMARCRecord",
    "AlignmentMode",
]


class DMARCPolicy(StrEnum):
    """Values accepted for the ``p=`` tag of a DMARC record."""

    NONE = "none"
    QUARANTINE = "quarantine"
    REJECT = "reject"


@dataclass
class DMARCRecord:
    """Holds a parsed DMARC DNS record."""

    # Requested policy, taken from the mandatory ``p=`` tag.
    policy: DMARCPolicy
    # SPF alignment mode from ``aspf=`` ("strict" for ``s``, else "relaxed").
    spf_mode: AlignmentMode
    # DKIM alignment mode from ``adkim=`` ("strict" for ``s``, else "relaxed").
    dkim_mode: AlignmentMode
    # NOTE: the ``pct=`` tag is not parsed or stored yet.


class DMARCResult(StrEnum):
    """Defined in RFC 7489 Section 11.2"""

    PASS = "pass"
    FAIL = "fail"
    NONE = "none"
    PERMERROR = "permerror"
    TEMPERROR = "temperror"


@dataclass
class DMARCCheck:
    """Outcome of a DMARC evaluation as produced by ``check_dmarc``."""

    result: DMARCResult
    # Policy of the record that was applied; None when no record was used
    # (lookup error or no policy published).
    policy: DMARCPolicy | None
    # Alignment flags stay None when evaluation stopped before alignment.
    spf_aligned: bool | None = None
    dkim_aligned: bool | None = None
    # True when a failing result was turned into a pass by a trusted ARC chain.
    arc_override_applied: bool = False


_DMARCError = Literal[DMARCResult.TEMPERROR] | Literal[DMARCResult.PERMERROR]


def _is_dmarc_record(text: str) -> bool:
    """Return True when *text* opens with the ``v=DMARC1`` version tag."""
    first_tag = text.partition(";")[0]
    name, sep, value = first_tag.partition("=")
    # The tag name is matched case-insensitively; the version value is not.
    return bool(sep) and name.strip().lower() == "v" and value.strip() == "DMARC1"


async def _fetch_dmarc_record(
    resolver: DNSResolver, domain: str
) -> tuple[DMARCRecord | None, _DMARCError | None]:
    """Fetch and parse the DMARC record published at ``_dmarc.<domain>``.

    Returns ``(record, None)`` on success, ``(None, None)`` when no DMARC
    record is published, and ``(None, error)`` when the DNS lookup fails or
    the DMARC record cannot be parsed.
    """
    try:
        txt_records = await resolver.txt(f"_dmarc.{domain}")
    except errors.Permerror:
        return None, DMARCResult.PERMERROR
    except errors.Temperror:
        return None, DMARCResult.TEMPERROR

    if not txt_records:
        return None, None

    try:
        # RFC 7489 Section 6.6.3: TXT records that do not start with the
        # DMARC version tag are discarded. Without this, an unrelated TXT
        # record (e.g. a wildcard SPF record) would be parsed as the policy
        # and reported as a permerror.
        candidates = [rr.text for rr in txt_records if _is_dmarc_record(rr.text)]
        if not candidates:
            return None, None
        # When several DMARC records are published the first one is used.
        return parse_dmarc_record(candidates[0]), None
    except Exception:
        # Any failure while reading or parsing the record is a permerror.
        return None, DMARCResult.PERMERROR


async def get_dmarc_policy(
    domain: str,
) -> tuple[DMARCRecord | None, _DMARCError | None]:
    """Fetch DMARC policy according to RFC 7489 Section 6.1.

    The record at ``_dmarc.<domain>`` is tried first; when it yields neither
    a record nor a temporary error, the Organizational Domain is queried.
    """
    resolver = DNSResolver()

    record, error = await _fetch_dmarc_record(resolver, domain)

    # Temp error means early return
    if error == DMARCResult.TEMPERROR or record:
        return record, error

    # RFC 7489 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST
    # query the DNS for a DMARC TXT record at the DNS domain matching the
    # Organizational Domain in place of the RFC5322.From domain in the message
    # (if different)."
    psl = publicsuffixlist.PublicSuffixList()
    normalized = domain.lower()
    organizational_domain = psl.privatesuffix(normalized) or normalized

    # Compare against the lower-cased name so that a mixed-case domain that is
    # already the Organizational Domain is not looked up a second time.
    if organizational_domain != normalized:
        org_record, org_error = await _fetch_dmarc_record(
            resolver, organizational_domain
        )
        # Return org domain result if we found a record or hit a temp error
        if org_error == DMARCResult.TEMPERROR or org_record:
            return org_record, org_error

    # No policy found anywhere, return the original domain's result
    return record, error


def parse_dmarc_record(record: str) -> DMARCRecord:
    """Parse the text of a DMARC TXT record into a ``DMARCRecord``.

    Tag names and the values of ``p``, ``aspf`` and ``adkim`` are matched
    case-insensitively; the version value must be exactly ``DMARC1``.

    Raises:
        ValueError: if the ``v`` tag is missing or not ``DMARC1``, if the
            ``p`` tag is missing, or if ``p`` is not a known policy.
    """
    tags: dict[str, str] = {}
    for part in record.split(";"):
        name, sep, value = part.partition("=")
        if sep:
            tags[name.strip().lower()] = value.strip()

    if "v" not in tags:
        raise ValueError("Missing mandatory v=DMARC1 tag")

    if tags["v"] != "DMARC1":
        raise ValueError(f"Invalid DMARC version: {tags['v']}, expected DMARC1")

    if "p" not in tags:
        raise ValueError("Missing mandatory p= tag")

    return DMARCRecord(
        policy=DMARCPolicy(tags["p"].lower()),
        spf_mode="strict" if tags.get("aspf", "").lower() == "s" else "relaxed",
        dkim_mode="strict" if tags.get("adkim", "").lower() == "s" else "relaxed",
    )


async def check_dmarc(
    header_from: str,
    envelope_from: str,
    spf_check: SPFCheck,
    dkim_check: DKIMCheck,
    arc_check: ARCCheck,
    configuration: AuthenticationConfiguration | None = None,
) -> DMARCCheck:
    """
    DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

    RFC 7489: "A message satisfies the DMARC checks if at least one of the supported
    authentication mechanisms: 1. produces a 'pass' result, and 2. produces that
    result based on an identifier that is in alignment"

    ``header_from`` is used as-is for both the policy lookup and the alignment
    checks. ``envelope_from`` may be an address or a bare domain; only the
    part after the last ``@`` is used.
    """

    # Get DMARC policy (RFC 7489 Section 6.1)
    dmarc_policy, error = await get_dmarc_policy(header_from)

    # Return early if we hit temp/perm errors
    if error:
        return DMARCCheck(result=error, policy=None)

    # No policy found
    if not dmarc_policy:
        return DMARCCheck(result=DMARCResult.NONE, policy=None)

    # Check identifier alignment (RFC 7489 Section 3.1)
    # SPF alignment: envelope sender domain vs header from domain
    envelope_domain = envelope_from.rpartition("@")[2]
    spf_aligned = spf_check.result == SPFResult.PASS and is_spf_aligned(
        envelope_domain, header_from, dmarc_policy.spf_mode
    )

    # DKIM alignment: signing domain (d=) vs header from domain
    dkim_aligned = bool(
        dkim_check.result == DKIMResult.SUCCESS
        and dkim_check.domain
        and is_dkim_aligned(dkim_check.domain, header_from, dmarc_policy.dkim_mode)
    )

    # RFC 7489: DMARC passes if either SPF or DKIM is aligned and passes
    dmarc_pass = spf_aligned or dkim_aligned

    # ARC override logic (RFC 8617 Section 7.2.1)
    # RFC 8617: "a DMARC processor MAY choose to accept the authentication
    # assessments provided by an Authenticated Received Chain"
    arc_override_applied = False
    if (
        not dmarc_pass
        and configuration
        and configuration.trusted_arc_signers
        and arc_check.signer in configuration.trusted_arc_signers
        and arc_check.result == ARCChainStatus.PASS
        and arc_check.aar_header
    ):
        parsed_aar = extract_original_auth_results(
            arc_check.result, arc_check.aar_header
        )
        if parsed_aar and "dmarc" in parsed_aar and parsed_aar["dmarc"] == "pass":
            dmarc_pass = True
            arc_override_applied = True

    return DMARCCheck(
        result=DMARCResult.PASS if dmarc_pass else DMARCResult.FAIL,
        policy=dmarc_policy.policy,
        spf_aligned=spf_aligned,
        dkim_aligned=dkim_aligned,
        arc_override_applied=arc_override_applied,
    )
async def
check_dmarc( header_from: str, envelope_from: str, spf_check: emailsec.spf.SPFCheck, dkim_check: emailsec.dkim.DKIMCheck, arc_check: emailsec.arc.ARCCheck, configuration: emailsec.AuthenticationConfiguration | None = None) -> DMARCCheck:
async def check_dmarc(
    header_from: str,
    envelope_from: str,
    spf_check: SPFCheck,
    dkim_check: DKIMCheck,
    arc_check: ARCCheck,
    configuration: AuthenticationConfiguration | None = None,
) -> DMARCCheck:
    """
    Evaluate DMARC for a message (RFC 7489 Section 3, Identifier Alignment).

    The message passes when SPF or DKIM both succeeded and is aligned with
    ``header_from``. A failing result may still be turned into a pass when a
    trusted ARC signer vouches for an upstream ``dmarc=pass``.

    ``envelope_from`` may be an address or a bare domain; only the part after
    the last ``@`` is compared.

    Returns a ``DMARCCheck``; when the policy lookup fails or no policy is
    published, the alignment fields keep their defaults and ``policy`` is None.
    """
    # Policy discovery (RFC 7489 Section 6.1).
    record, lookup_error = await get_dmarc_policy(header_from)

    # A temp/perm error from the lookup is reported as the DMARC result.
    if lookup_error:
        return DMARCCheck(result=lookup_error, policy=None)

    # Nothing published: DMARC does not apply.
    if not record:
        return DMARCCheck(result=DMARCResult.NONE, policy=None)

    # SPF identifier: domain of the envelope sender (RFC 7489 Section 3.1).
    mail_from_domain = envelope_from.rpartition("@")[2]
    spf_aligned = spf_check.result == SPFResult.PASS and is_spf_aligned(
        mail_from_domain, header_from, record.spf_mode
    )

    # DKIM identifier: the signing domain (d=) of the verified signature.
    dkim_aligned = bool(
        dkim_check.result == DKIMResult.SUCCESS
        and dkim_check.domain
        and is_dkim_aligned(dkim_check.domain, header_from, record.dkim_mode)
    )

    # One aligned, passing mechanism is enough.
    passed = spf_aligned or dkim_aligned
    overridden = False

    # ARC override (RFC 8617 Section 7.2.1): "a DMARC processor MAY choose to
    # accept the authentication assessments provided by an Authenticated
    # Received Chain". Only considered for failing messages whose chain
    # validated and was sealed by a configured trusted signer.
    arc_usable = (
        not passed
        and configuration
        and configuration.trusted_arc_signers
        and arc_check.signer in configuration.trusted_arc_signers
        and arc_check.result == ARCChainStatus.PASS
        and arc_check.aar_header
    )
    if arc_usable:
        upstream = extract_original_auth_results(
            arc_check.result, arc_check.aar_header
        )
        if upstream and "dmarc" in upstream and upstream["dmarc"] == "pass":
            passed = True
            overridden = True

    verdict = DMARCResult.PASS if passed else DMARCResult.FAIL
    return DMARCCheck(
        result=verdict,
        policy=record.policy,
        spf_aligned=spf_aligned,
        dkim_aligned=dkim_aligned,
        arc_override_applied=overridden,
    )
DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).
RFC 7489: "A message satisfies the DMARC checks if at least one of the supported authentication mechanisms: 1. produces a 'pass' result, and 2. produces that result based on an identifier that is in alignment"
async def
get_dmarc_policy( domain: str) -> tuple[DMARCRecord | None, typing.Union[typing.Literal[<DMARCResult.TEMPERROR: 'temperror'>], typing.Literal[<DMARCResult.PERMERROR: 'permerror'>], NoneType]]:
async def get_dmarc_policy(
    domain: str,
) -> tuple[DMARCRecord | None, _DMARCError | None]:
    """Fetch DMARC policy according to RFC 7489 Section 6.1.

    The record at ``_dmarc.<domain>`` is tried first. When that yields neither
    a record nor a temporary error, the Organizational Domain (derived from
    the public suffix list) is queried instead.

    Returns ``(record, error)``: the record found (or None) and a
    temperror/permerror result when a lookup failed (or None).
    """
    resolver = DNSResolver()

    record, error = await _fetch_dmarc_record(resolver, domain)

    # Temp error means early return
    if error == DMARCResult.TEMPERROR or record:
        return record, error

    # RFC 7489 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST
    # query the DNS for a DMARC TXT record at the DNS domain matching the
    # Organizational Domain in place of the RFC5322.From domain in the message
    # (if different)."
    psl = publicsuffixlist.PublicSuffixList()
    normalized = domain.lower()
    organizational_domain = psl.privatesuffix(normalized) or normalized

    # Compare against the lower-cased name: otherwise a mixed-case domain that
    # already is the Organizational Domain would be looked up a second time.
    if organizational_domain != normalized:
        org_record, org_error = await _fetch_dmarc_record(
            resolver, organizational_domain
        )
        # Return org domain result if we found a record or hit a temp error
        if org_error == DMARCResult.TEMPERROR or org_record:
            return org_record, org_error

    # No policy found anywhere, return the original domain's result
    return record, error
Fetch DMARC policy according to RFC 7489 Section 6.1.
@dataclass
class
DMARCCheck:
@dataclass
class DMARCCheck:
    """Outcome of a DMARC evaluation."""

    # Overall DMARC result.
    result: DMARCResult
    # Policy of the applied record, or None when no record was applied.
    policy: DMARCPolicy | None
    # Alignment flags; they default to None when not provided.
    spf_aligned: bool | None = None
    dkim_aligned: bool | None = None
    # Whether an ARC-based override changed the result.
    arc_override_applied: bool = False
DMARCCheck( result: DMARCResult, policy: DMARCPolicy | None, spf_aligned: bool | None = None, dkim_aligned: bool | None = None, arc_override_applied: bool = False)
result: DMARCResult
policy: DMARCPolicy | None
class
DMARCResult(enum.StrEnum):
43class DMARCResult(StrEnum): 44 """Defined in RFC 7489 Section 11.2""" 45 46 PASS = "pass" 47 FAIL = "fail" 48 NONE = "none" 49 PERMERROR = "permerror" 50 TEMPERROR = "temperror"
Defined in RFC 7489 Section 11.2
PASS =
<DMARCResult.PASS: 'pass'>
FAIL =
<DMARCResult.FAIL: 'fail'>
NONE =
<DMARCResult.NONE: 'none'>
PERMERROR =
<DMARCResult.PERMERROR: 'permerror'>
TEMPERROR =
<DMARCResult.TEMPERROR: 'temperror'>
class
DMARCPolicy(enum.StrEnum):
NONE =
<DMARCPolicy.NONE: 'none'>
QUARANTINE =
<DMARCPolicy.QUARANTINE: 'quarantine'>
REJECT =
<DMARCPolicy.REJECT: 'reject'>
@dataclass
class
DMARCRecord:
@dataclass
class DMARCRecord:
    """A DMARC DNS record after parsing."""

    # Policy requested by the record.
    policy: DMARCPolicy
    # Alignment mode to use for the SPF identifier.
    spf_mode: AlignmentMode
    # Alignment mode to use for the DKIM identifier.
    dkim_mode: AlignmentMode
    # NOTE: a percentage field is not stored yet.
Holds a parsed DMARC DNS record.
DMARCRecord( policy: DMARCPolicy, spf_mode: Literal['relaxed', 'strict'], dkim_mode: Literal['relaxed', 'strict'])
policy: DMARCPolicy
AlignmentMode =
typing.Literal['relaxed', 'strict']