emailsec.dmarc

  1from enum import StrEnum
  2from dataclasses import dataclass
  3from typing import Literal
  4
  5import publicsuffixlist
  6from emailsec._dns_resolver import DNSResolver
  7from emailsec._alignment import is_spf_aligned, is_dkim_aligned, AlignmentMode
  8from emailsec.config import AuthenticationConfiguration
  9from emailsec import _errors as errors
 10from emailsec.spf.checker import SPFResult, SPFCheck
 11from emailsec.dkim.checker import DKIMResult, DKIMCheck
 12from emailsec.arc import ARCChainStatus, ARCCheck
 13from emailsec._authentication_results import extract_original_auth_results
 14
# Names exported by `from emailsec.dmarc import *`. AlignmentMode is imported
# from emailsec._alignment above and re-exported here.
# NOTE(review): parse_dmarc_record has a public-looking name but is not listed
# here - confirm that omission is intentional.
__all__ = [
    "check_dmarc",
    "get_dmarc_policy",
    "DMARCCheck",
    "DMARCResult",
    "DMARCPolicy",
    "DMARCRecord",
    "AlignmentMode",
]
 24
 25
class DMARCPolicy(StrEnum):
    """The policy values this module accepts for a DMARC record's `p=` tag."""

    NONE = "none"
    QUARANTINE = "quarantine"
    REJECT = "reject"
 30
 31
@dataclass
class DMARCRecord:
    """Holds a parsed DMARC DNS record."""

    # Policy built from the record's `p=` tag.
    policy: DMARCPolicy
    # SPF alignment mode: "strict" when the record has `aspf=s`, else "relaxed".
    spf_mode: AlignmentMode
    # DKIM alignment mode: "strict" when the record has `adkim=s`, else "relaxed".
    dkim_mode: AlignmentMode
    # TODO: the `pct=` tag is not parsed or stored yet.
    # percentage: int
 40
 41
class DMARCResult(StrEnum):
    """Defined in RFC 7489 Section 11.2"""

    # At least one of SPF/DKIM passed with an aligned identifier, or a trusted
    # ARC chain reported an original DMARC pass.
    PASS = "pass"
    # A policy record was found but no aligned pass was available.
    FAIL = "fail"
    # No DMARC policy record was found for the domain.
    NONE = "none"
    # The DNS lookup failed permanently or the record could not be parsed.
    PERMERROR = "permerror"
    # The DNS lookup failed temporarily.
    TEMPERROR = "temperror"
 50
 51
@dataclass
class DMARCCheck:
    """Outcome of a DMARC evaluation as returned by check_dmarc."""

    # Overall verdict of the evaluation.
    result: DMARCResult
    # Policy of the record that was evaluated; None when check_dmarc returned
    # early because of a lookup error or because no record was found.
    policy: DMARCPolicy | None
    # Whether SPF passed with an aligned identifier; left as None on the
    # early-return paths where alignment is never computed.
    spf_aligned: bool | None = None
    # Whether DKIM passed with an aligned identifier; left as None on the
    # early-return paths where alignment is never computed.
    dkim_aligned: bool | None = None
    # True when the pass verdict came from a trusted ARC chain rather than
    # from SPF/DKIM alignment.
    arc_override_applied: bool = False
 59
 60
# The two error outcomes a policy lookup can report alongside (or instead of)
# a record: a temporary or a permanent failure.
_DMARCError = Literal[DMARCResult.TEMPERROR] | Literal[DMARCResult.PERMERROR]
 62
 63
 64async def _fetch_dmarc_record(
 65    resolver: DNSResolver, domain: str
 66) -> tuple[DMARCRecord | None, _DMARCError | None]:
 67    """Fetch and parse DMARC record for the given domain."""
 68    try:
 69        txt_records = await resolver.txt(f"_dmarc.{domain}")
 70    except errors.Permerror:
 71        return None, DMARCResult.PERMERROR
 72    except errors.Temperror:
 73        return None, DMARCResult.TEMPERROR
 74
 75    if not txt_records:
 76        return None, None
 77
 78    try:
 79        record = parse_dmarc_record(txt_records[0].text)
 80        return record, None
 81    except Exception:
 82        return None, DMARCResult.PERMERROR
 83
 84
 85async def get_dmarc_policy(
 86    domain: str,
 87) -> tuple[DMARCRecord | None, _DMARCError | None]:
 88    """Fetch DMARC policy according to RFC 7489 Section 6.1."""
 89    resolver = DNSResolver()
 90
 91    record, error = await _fetch_dmarc_record(resolver, domain)
 92
 93    # Temp error means early return
 94    if error == DMARCResult.TEMPERROR or record:
 95        return record, error
 96
 97    # RFC 7489 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST query the DNS for
 98    # a DMARC TXT record at the DNS domain matching the Organizational
 99    # Domain in place of the RFC5322.From domain in the message (if
100    # different).
101    psl = publicsuffixlist.PublicSuffixList()
102    organizational_domain = psl.privatesuffix(domain.lower()) or domain
103
104    if organizational_domain != domain:
105        org_record, org_error = await _fetch_dmarc_record(
106            resolver, organizational_domain
107        )
108        # Return org domain result if we found a record or hit a temp error
109        if org_error == DMARCResult.TEMPERROR or org_record:
110            return org_record, org_error
111
112    # No policy found anywhere, return the original domain's result
113    return record, error
114
115
def parse_dmarc_record(record: str) -> DMARCRecord:
    """Parse the text of a DMARC TXT record into a DMARCRecord.

    The record is split into ``;``-separated ``name=value`` tags; parts without
    an ``=`` are ignored and a repeated tag keeps its last value. Tag names and
    the values of ``p``, ``aspf`` and ``adkim`` are matched case-insensitively,
    as the RFC 7489 grammar defines them with case-insensitive ABNF strings.
    Only the version value ``DMARC1`` must match exactly.

    Args:
        record: Raw text of the TXT record.

    Returns:
        The parsed record. Alignment modes default to "relaxed" unless the
        corresponding tag is ``s``.

    Raises:
        ValueError: If the ``v`` tag is missing or is not ``DMARC1``, if the
            ``p`` tag is missing, or if ``p`` is not a known policy.
    """
    tags: dict[str, str] = {}
    for part in record.split(";"):
        name, separator, value = part.partition("=")
        if separator:
            tags[name.strip().lower()] = value.strip()

    if "v" not in tags:
        raise ValueError("Missing mandatory v=DMARC1 tag")

    # Deliberately case-sensitive: the version literal must match precisely.
    if tags["v"] != "DMARC1":
        raise ValueError(f"Invalid DMARC version: {tags['v']}, expected DMARC1")

    if "p" not in tags:
        raise ValueError("Missing mandatory p= tag")

    return DMARCRecord(
        # DMARCPolicy(...) raises ValueError for an unknown policy value.
        policy=DMARCPolicy(tags["p"].lower()),
        # Lowercasing keeps "aspf=S" / "adkim=S" from silently downgrading to
        # relaxed alignment.
        spf_mode="strict" if tags.get("aspf", "").lower() == "s" else "relaxed",
        dkim_mode="strict" if tags.get("adkim", "").lower() == "s" else "relaxed",
        # TODO: the pct= tag is not parsed yet.
    )
139
140
async def check_dmarc(
    header_from: str,
    envelope_from: str,
    spf_check: SPFCheck,
    dkim_check: DKIMCheck,
    arc_check: ARCCheck,
    configuration: AuthenticationConfiguration | None = None,
) -> DMARCCheck:
    """
    DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

    RFC 7489: "A message satisfies the DMARC checks if at least one of the supported
    authentication mechanisms: 1. produces a 'pass' result, and 2. produces that
    result based on an identifier that is in alignment"

    Args:
        header_from: Value the policy is looked up for and that the SPF and
            DKIM identifiers are aligned against.
        envelope_from: Envelope sender; the part after the last "@" is used
            as the SPF domain, or the whole value when it contains no "@".
        spf_check: SPF outcome; only its ``result`` is read.
        dkim_check: DKIM outcome; its ``result`` and ``domain`` are read.
        arc_check: ARC outcome; consulted only for the optional override.
        configuration: Optional settings; a non-empty ``trusted_arc_signers``
            enables the ARC override.

    Returns:
        A DMARCCheck. Lookup errors and a missing policy are reported with
        ``policy=None`` and no alignment information; otherwise the result is
        pass or fail together with the alignment flags.
    """
    # Policy discovery (RFC 7489 Section 6.1)
    policy_record, lookup_error = await get_dmarc_policy(header_from)

    # Temp/perm lookup errors become the verdict as-is.
    if lookup_error:
        return DMARCCheck(result=lookup_error, policy=None)

    # Nothing published for this domain.
    if not policy_record:
        return DMARCCheck(result=DMARCResult.NONE, policy=None)

    # Identifier alignment (RFC 7489 Section 3.1)
    # SPF: compare the envelope sender's domain with the header From domain.
    _, _, envelope_domain = envelope_from.rpartition("@")
    spf_passed = spf_check.result == SPFResult.PASS
    spf_aligned = spf_passed and is_spf_aligned(
        envelope_domain, header_from, policy_record.spf_mode
    )

    # DKIM: compare the signing domain (d=) with the header From domain.
    dkim_aligned = False
    if dkim_check.result == DKIMResult.SUCCESS and dkim_check.domain:
        dkim_aligned = bool(
            is_dkim_aligned(
                dkim_check.domain, header_from, policy_record.dkim_mode
            )
        )

    # One aligned, passing mechanism is enough.
    passed = spf_aligned or dkim_aligned

    # ARC override (RFC 8617 Section 7.2.1)
    # RFC 8617: "a DMARC processor MAY choose to accept the authentication
    # assessments provided by an Authenticated Received Chain"
    arc_override_applied = False
    if not passed and configuration and configuration.trusted_arc_signers:
        chain_is_trusted = (
            arc_check.signer in configuration.trusted_arc_signers
            and arc_check.result == ARCChainStatus.PASS
            and arc_check.aar_header
        )
        if chain_is_trusted:
            original_results = extract_original_auth_results(
                arc_check.result, arc_check.aar_header
            )
            if (
                original_results
                and "dmarc" in original_results
                and original_results["dmarc"] == "pass"
            ):
                passed = True
                arc_override_applied = True

    verdict = DMARCResult.PASS if passed else DMARCResult.FAIL
    return DMARCCheck(
        result=verdict,
        policy=policy_record.policy,
        spf_aligned=spf_aligned,
        dkim_aligned=dkim_aligned,
        arc_override_applied=arc_override_applied,
    )
async def check_dmarc( header_from: str, envelope_from: str, spf_check: emailsec.spf.SPFCheck, dkim_check: emailsec.dkim.DKIMCheck, arc_check: emailsec.arc.ARCCheck, configuration: emailsec.AuthenticationConfiguration | None = None) -> DMARCCheck:
async def check_dmarc(
    header_from: str,
    envelope_from: str,
    spf_check: SPFCheck,
    dkim_check: DKIMCheck,
    arc_check: ARCCheck,
    configuration: AuthenticationConfiguration | None = None,
) -> DMARCCheck:
    """
    DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

    RFC 7489: "A message satisfies the DMARC checks if at least one of the supported
    authentication mechanisms: 1. produces a 'pass' result, and 2. produces that
    result based on an identifier that is in alignment"

    Args:
        header_from: Value the policy is looked up for and that the SPF and
            DKIM identifiers are aligned against.
        envelope_from: Envelope sender; the part after the last "@" is used
            as the SPF domain, or the whole value when it contains no "@".
        spf_check: SPF outcome; only its ``result`` is read.
        dkim_check: DKIM outcome; its ``result`` and ``domain`` are read.
        arc_check: ARC outcome; consulted only for the optional override.
        configuration: Optional settings; a non-empty ``trusted_arc_signers``
            enables the ARC override.

    Returns:
        A DMARCCheck. Lookup errors and a missing policy are reported with
        ``policy=None`` and no alignment information; otherwise the result is
        pass or fail together with the alignment flags.
    """
    # Policy discovery (RFC 7489 Section 6.1)
    policy_record, lookup_error = await get_dmarc_policy(header_from)

    # Temp/perm lookup errors become the verdict as-is.
    if lookup_error:
        return DMARCCheck(result=lookup_error, policy=None)

    # Nothing published for this domain.
    if not policy_record:
        return DMARCCheck(result=DMARCResult.NONE, policy=None)

    # Identifier alignment (RFC 7489 Section 3.1)
    # SPF: compare the envelope sender's domain with the header From domain.
    _, _, envelope_domain = envelope_from.rpartition("@")
    spf_passed = spf_check.result == SPFResult.PASS
    spf_aligned = spf_passed and is_spf_aligned(
        envelope_domain, header_from, policy_record.spf_mode
    )

    # DKIM: compare the signing domain (d=) with the header From domain.
    dkim_aligned = False
    if dkim_check.result == DKIMResult.SUCCESS and dkim_check.domain:
        dkim_aligned = bool(
            is_dkim_aligned(
                dkim_check.domain, header_from, policy_record.dkim_mode
            )
        )

    # One aligned, passing mechanism is enough.
    passed = spf_aligned or dkim_aligned

    # ARC override (RFC 8617 Section 7.2.1)
    # RFC 8617: "a DMARC processor MAY choose to accept the authentication
    # assessments provided by an Authenticated Received Chain"
    arc_override_applied = False
    if not passed and configuration and configuration.trusted_arc_signers:
        chain_is_trusted = (
            arc_check.signer in configuration.trusted_arc_signers
            and arc_check.result == ARCChainStatus.PASS
            and arc_check.aar_header
        )
        if chain_is_trusted:
            original_results = extract_original_auth_results(
                arc_check.result, arc_check.aar_header
            )
            if (
                original_results
                and "dmarc" in original_results
                and original_results["dmarc"] == "pass"
            ):
                passed = True
                arc_override_applied = True

    verdict = DMARCResult.PASS if passed else DMARCResult.FAIL
    return DMARCCheck(
        result=verdict,
        policy=policy_record.policy,
        spf_aligned=spf_aligned,
        dkim_aligned=dkim_aligned,
        arc_override_applied=arc_override_applied,
    )

DMARC evaluation per RFC 7489 Section 3 (Identifier Alignment).

RFC 7489: "A message satisfies the DMARC checks if at least one of the supported authentication mechanisms: 1. produces a 'pass' result, and 2. produces that result based on an identifier that is in alignment"

async def get_dmarc_policy( domain: str) -> tuple[DMARCRecord | None, typing.Union[typing.Literal[<DMARCResult.TEMPERROR: 'temperror'>], typing.Literal[<DMARCResult.PERMERROR: 'permerror'>], NoneType]]:
 86async def get_dmarc_policy(
 87    domain: str,
 88) -> tuple[DMARCRecord | None, _DMARCError | None]:
 89    """Fetch DMARC policy according to RFC 7489 Section 6.1."""
 90    resolver = DNSResolver()
 91
 92    record, error = await _fetch_dmarc_record(resolver, domain)
 93
 94    # Temp error means early return
 95    if error == DMARCResult.TEMPERROR or record:
 96        return record, error
 97
 98    # RFC 7489 Section 6.6.3: "If the set is now empty, the Mail Receiver MUST query the DNS for
 99    # a DMARC TXT record at the DNS domain matching the Organizational
100    # Domain in place of the RFC5322.From domain in the message (if
101    # different).
102    psl = publicsuffixlist.PublicSuffixList()
103    organizational_domain = psl.privatesuffix(domain.lower()) or domain
104
105    if organizational_domain != domain:
106        org_record, org_error = await _fetch_dmarc_record(
107            resolver, organizational_domain
108        )
109        # Return org domain result if we found a record or hit a temp error
110        if org_error == DMARCResult.TEMPERROR or org_record:
111            return org_record, org_error
112
113    # No policy found anywhere, return the original domain's result
114    return record, error

Fetch DMARC policy according to RFC 7489 Section 6.1.

@dataclass
class DMARCCheck:
@dataclass
class DMARCCheck:
    """Outcome of a DMARC evaluation as returned by check_dmarc."""

    # Overall verdict of the evaluation.
    result: DMARCResult
    # Policy of the record that was evaluated; None when check_dmarc returned
    # early because of a lookup error or because no record was found.
    policy: DMARCPolicy | None
    # Whether SPF passed with an aligned identifier; left as None on the
    # early-return paths where alignment is never computed.
    spf_aligned: bool | None = None
    # Whether DKIM passed with an aligned identifier; left as None on the
    # early-return paths where alignment is never computed.
    dkim_aligned: bool | None = None
    # True when the pass verdict came from a trusted ARC chain rather than
    # from SPF/DKIM alignment.
    arc_override_applied: bool = False
DMARCCheck( result: DMARCResult, policy: DMARCPolicy | None, spf_aligned: bool | None = None, dkim_aligned: bool | None = None, arc_override_applied: bool = False)
result: DMARCResult
policy: DMARCPolicy | None
spf_aligned: bool | None = None
dkim_aligned: bool | None = None
arc_override_applied: bool = False
class DMARCResult(enum.StrEnum):
class DMARCResult(StrEnum):
    """Defined in RFC 7489 Section 11.2"""

    # At least one of SPF/DKIM passed with an aligned identifier, or a trusted
    # ARC chain reported an original DMARC pass.
    PASS = "pass"
    # A policy record was found but no aligned pass was available.
    FAIL = "fail"
    # No DMARC policy record was found for the domain.
    NONE = "none"
    # The DNS lookup failed permanently or the record could not be parsed.
    PERMERROR = "permerror"
    # The DNS lookup failed temporarily.
    TEMPERROR = "temperror"

Defined in RFC 7489 Section 11.2

PASS = <DMARCResult.PASS: 'pass'>
FAIL = <DMARCResult.FAIL: 'fail'>
NONE = <DMARCResult.NONE: 'none'>
PERMERROR = <DMARCResult.PERMERROR: 'permerror'>
TEMPERROR = <DMARCResult.TEMPERROR: 'temperror'>
class DMARCPolicy(enum.StrEnum):
class DMARCPolicy(StrEnum):
    """The policy values this module accepts for a DMARC record's `p=` tag."""

    NONE = "none"
    QUARANTINE = "quarantine"
    REJECT = "reject"
NONE = <DMARCPolicy.NONE: 'none'>
QUARANTINE = <DMARCPolicy.QUARANTINE: 'quarantine'>
REJECT = <DMARCPolicy.REJECT: 'reject'>
@dataclass
class DMARCRecord:
@dataclass
class DMARCRecord:
    """Holds a parsed DMARC DNS record."""

    # Policy built from the record's `p=` tag.
    policy: DMARCPolicy
    # SPF alignment mode: "strict" when the record has `aspf=s`, else "relaxed".
    spf_mode: AlignmentMode
    # DKIM alignment mode: "strict" when the record has `adkim=s`, else "relaxed".
    dkim_mode: AlignmentMode
    # TODO: the `pct=` tag is not parsed or stored yet.
    # percentage: int

Holds a parsed DMARC DNS record.

DMARCRecord( policy: DMARCPolicy, spf_mode: Literal['relaxed', 'strict'], dkim_mode: Literal['relaxed', 'strict'])
policy: DMARCPolicy
spf_mode: Literal['relaxed', 'strict']
dkim_mode: Literal['relaxed', 'strict']
AlignmentMode = typing.Literal['relaxed', 'strict']