emailsec.dnsbl

  1import ipaddress
  2import asyncio
  3import time
  4from dataclasses import dataclass
  5from emailsec._dns_resolver import DNSResolver
  6import cachetools
  7
  8
  9@dataclass
 10class DNSBLEntry:
 11    return_code: str
 12    description: str | None
 13
 14
 15@dataclass
 16class DNSBLResult:
 17    ip_address: str
 18    is_listed: bool
 19    listed_on: list[str]
 20    entries: dict[str, DNSBLEntry]
 21    failed_queries: list[str]
 22
 23    def get_smtp_response(self) -> str | None:
 24        if self.is_listed:
 25            sources = ", ".join(self.listed_on)
 26            # Include description from first entry if available
 27            first_entry = next(iter(self.entries.values()), None)
 28            if first_entry and first_entry.description:
 29                return f"550 5.7.1 {first_entry.description}"
 30            return f"550 5.7.1 IP blocked by {sources}"
 31        return None
 32
 33
 34class DNSBLChecker:
 35    """DNSBL checker with caching."""
 36
 37    def __init__(self, cache_size: int = 1000):
 38        """Initialize DNSBL checker with cache.
 39
 40        Args:
 41            cache_size: Maximum number of cache entries
 42        """
 43        self._cache = cachetools.LRUCache(maxsize=cache_size)
 44        self._cache_expiry: dict[tuple[str, str], float] = {}
 45        self._resolver: DNSResolver | None = None
 46
 47    def _get_resolver(self) -> DNSResolver:
 48        if self._resolver is None:
 49            self._resolver = DNSResolver(no_max_lookups=True)
 50        return self._resolver
 51
 52    async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult:
 53        """Check if an IP address is listed in DNS blacklists.
 54
 55        Args:
 56            ip: IP address to check
 57            blocklists: List of DNSBL domains to query
 58
 59        Returns:
 60            DNSBLResult with listing status and details
 61        """
 62        # Sanity check, will raise ValueError if invalid
 63        ipaddress.ip_address(ip)
 64
 65        queries = []
 66        for blocklist in blocklists:
 67            queries.append(self._check_single_dnsbl(ip, blocklist))
 68
 69        results = await asyncio.gather(*queries, return_exceptions=True)
 70
 71        listed_on = []
 72        entries: dict[str, DNSBLEntry] = {}
 73        failed_queries = []
 74
 75        for i, result in enumerate(results):
 76            blocklist = blocklists[i]
 77
 78            if isinstance(result, Exception):
 79                failed_queries.append(blocklist)
 80            elif result:
 81                listed_on.append(blocklist)
 82                entries[blocklist] = result  # type: ignore
 83            # No/None results means it's not listed
 84
 85        return DNSBLResult(
 86            ip_address=ip,
 87            is_listed=bool(listed_on),
 88            listed_on=listed_on,
 89            entries=entries,
 90            failed_queries=failed_queries,
 91        )
 92
 93    async def _check_single_dnsbl(self, ip: str, blocklist: str) -> DNSBLEntry | None:
 94        cache_key = (ip, blocklist)
 95
 96        if cache_key in self._cache:
 97            expiry_time = self._cache_expiry.get(cache_key)
 98            if expiry_time and time.time() < expiry_time:
 99                return self._cache[cache_key]
100            else:
101                self._cache.pop(cache_key, None)
102                self._cache_expiry.pop(cache_key, None)
103
104        parsed_ip = ipaddress.ip_address(ip)
105        if parsed_ip.version == 4:
106            reversed_addr = parsed_ip.reverse_pointer.removesuffix(".in-addr.arpa")
107        else:
108            # IPv6: convert to nibble format as per RFC 5782 Section 2.4
109            hex_addr = parsed_ip.packed.hex()
110            reversed_addr = ".".join(reversed(hex_addr))
111
112        query_name = f"{reversed_addr}.{blocklist}"
113
114        # Query both A and TXT records as per RFC 5782
115        resolver = self._get_resolver()
116
117        a_result = await resolver.a(query_name)
118
119        ttl = 300
120        if a_result and hasattr(a_result[0], "ttl"):
121            ttl = a_result[0].ttl
122
123        if not a_result or not a_result[0].host.startswith("127."):
124            self._cache[cache_key] = None
125            self._cache_expiry[cache_key] = time.time() + ttl
126            return None
127
128        return_code = a_result[0].host
129
130        # Validate return code is in 127.0.0.0/8 range (RFC 5782 Section 2.1)
131        try:
132            code_ip = ipaddress.ip_address(return_code)
133            if not (code_ip.version == 4 and str(code_ip).startswith("127.")):
134                self._cache[cache_key] = None
135                self._cache_expiry[cache_key] = time.time() + ttl
136                return None
137        except ValueError:
138            self._cache[cache_key] = None
139            self._cache_expiry[cache_key] = time.time() + ttl
140            return None
141
142        # Fetch TXT record for description (RFC 5782 Section 2.1)
143        description = None
144        try:
145            txt_result = await resolver.txt(query_name)
146            if txt_result and txt_result[0].text:
147                desc = txt_result[0].text
148                if isinstance(desc, bytes):
149                    description = desc.decode("utf-8", errors="ignore")
150                else:
151                    description = str(desc)
152        except Exception:
153            # description is optional, let it fail silently
154            pass
155
156        result = DNSBLEntry(return_code=return_code, description=description)
157
158        self._cache[cache_key] = result
159        self._cache_expiry[cache_key] = time.time() + ttl
160
161        return result
@dataclass
class DNSBLEntry:
10@dataclass
11class DNSBLEntry:
12    return_code: str
13    description: str | None
DNSBLEntry(return_code: str, description: str | None)
return_code: str
description: str | None
@dataclass
class DNSBLResult:
16@dataclass
17class DNSBLResult:
18    ip_address: str
19    is_listed: bool
20    listed_on: list[str]
21    entries: dict[str, DNSBLEntry]
22    failed_queries: list[str]
23
24    def get_smtp_response(self) -> str | None:
25        if self.is_listed:
26            sources = ", ".join(self.listed_on)
27            # Include description from first entry if available
28            first_entry = next(iter(self.entries.values()), None)
29            if first_entry and first_entry.description:
30                return f"550 5.7.1 {first_entry.description}"
31            return f"550 5.7.1 IP blocked by {sources}"
32        return None
DNSBLResult( ip_address: str, is_listed: bool, listed_on: list[str], entries: dict[str, DNSBLEntry], failed_queries: list[str])
ip_address: str
is_listed: bool
listed_on: list[str]
entries: dict[str, DNSBLEntry]
failed_queries: list[str]
def get_smtp_response(self) -> str | None:
24    def get_smtp_response(self) -> str | None:
25        if self.is_listed:
26            sources = ", ".join(self.listed_on)
27            # Include description from first entry if available
28            first_entry = next(iter(self.entries.values()), None)
29            if first_entry and first_entry.description:
30                return f"550 5.7.1 {first_entry.description}"
31            return f"550 5.7.1 IP blocked by {sources}"
32        return None
class DNSBLChecker:
 35class DNSBLChecker:
 36    """DNSBL checker with caching."""
 37
 38    def __init__(self, cache_size: int = 1000):
 39        """Initialize DNSBL checker with cache.
 40
 41        Args:
 42            cache_size: Maximum number of cache entries
 43        """
 44        self._cache = cachetools.LRUCache(maxsize=cache_size)
 45        self._cache_expiry: dict[tuple[str, str], float] = {}
 46        self._resolver: DNSResolver | None = None
 47
 48    def _get_resolver(self) -> DNSResolver:
 49        if self._resolver is None:
 50            self._resolver = DNSResolver(no_max_lookups=True)
 51        return self._resolver
 52
 53    async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult:
 54        """Check if an IP address is listed in DNS blacklists.
 55
 56        Args:
 57            ip: IP address to check
 58            blocklists: List of DNSBL domains to query
 59
 60        Returns:
 61            DNSBLResult with listing status and details
 62        """
 63        # Sanity check, will raise ValueError if invalid
 64        ipaddress.ip_address(ip)
 65
 66        queries = []
 67        for blocklist in blocklists:
 68            queries.append(self._check_single_dnsbl(ip, blocklist))
 69
 70        results = await asyncio.gather(*queries, return_exceptions=True)
 71
 72        listed_on = []
 73        entries: dict[str, DNSBLEntry] = {}
 74        failed_queries = []
 75
 76        for i, result in enumerate(results):
 77            blocklist = blocklists[i]
 78
 79            if isinstance(result, Exception):
 80                failed_queries.append(blocklist)
 81            elif result:
 82                listed_on.append(blocklist)
 83                entries[blocklist] = result  # type: ignore
 84            # No/None results means it's not listed
 85
 86        return DNSBLResult(
 87            ip_address=ip,
 88            is_listed=bool(listed_on),
 89            listed_on=listed_on,
 90            entries=entries,
 91            failed_queries=failed_queries,
 92        )
 93
 94    async def _check_single_dnsbl(self, ip: str, blocklist: str) -> DNSBLEntry | None:
 95        cache_key = (ip, blocklist)
 96
 97        if cache_key in self._cache:
 98            expiry_time = self._cache_expiry.get(cache_key)
 99            if expiry_time and time.time() < expiry_time:
100                return self._cache[cache_key]
101            else:
102                self._cache.pop(cache_key, None)
103                self._cache_expiry.pop(cache_key, None)
104
105        parsed_ip = ipaddress.ip_address(ip)
106        if parsed_ip.version == 4:
107            reversed_addr = parsed_ip.reverse_pointer.removesuffix(".in-addr.arpa")
108        else:
109            # IPv6: convert to nibble format as per RFC 5782 Section 2.4
110            hex_addr = parsed_ip.packed.hex()
111            reversed_addr = ".".join(reversed(hex_addr))
112
113        query_name = f"{reversed_addr}.{blocklist}"
114
115        # Query both A and TXT records as per RFC 5782
116        resolver = self._get_resolver()
117
118        a_result = await resolver.a(query_name)
119
120        ttl = 300
121        if a_result and hasattr(a_result[0], "ttl"):
122            ttl = a_result[0].ttl
123
124        if not a_result or not a_result[0].host.startswith("127."):
125            self._cache[cache_key] = None
126            self._cache_expiry[cache_key] = time.time() + ttl
127            return None
128
129        return_code = a_result[0].host
130
131        # Validate return code is in 127.0.0.0/8 range (RFC 5782 Section 2.1)
132        try:
133            code_ip = ipaddress.ip_address(return_code)
134            if not (code_ip.version == 4 and str(code_ip).startswith("127.")):
135                self._cache[cache_key] = None
136                self._cache_expiry[cache_key] = time.time() + ttl
137                return None
138        except ValueError:
139            self._cache[cache_key] = None
140            self._cache_expiry[cache_key] = time.time() + ttl
141            return None
142
143        # Fetch TXT record for description (RFC 5782 Section 2.1)
144        description = None
145        try:
146            txt_result = await resolver.txt(query_name)
147            if txt_result and txt_result[0].text:
148                desc = txt_result[0].text
149                if isinstance(desc, bytes):
150                    description = desc.decode("utf-8", errors="ignore")
151                else:
152                    description = str(desc)
153        except Exception:
154            # description is optional, let it fail silently
155            pass
156
157        result = DNSBLEntry(return_code=return_code, description=description)
158
159        self._cache[cache_key] = result
160        self._cache_expiry[cache_key] = time.time() + ttl
161
162        return result

DNSBL checker with caching.

DNSBLChecker(cache_size: int = 1000)
38    def __init__(self, cache_size: int = 1000):
39        """Initialize DNSBL checker with cache.
40
41        Args:
42            cache_size: Maximum number of cache entries
43        """
44        self._cache = cachetools.LRUCache(maxsize=cache_size)
45        self._cache_expiry: dict[tuple[str, str], float] = {}
46        self._resolver: DNSResolver | None = None

Initialize DNSBL checker with cache.

Args: cache_size: Maximum number of cache entries

async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult:
53    async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult:
54        """Check if an IP address is listed in DNS blacklists.
55
56        Args:
57            ip: IP address to check
58            blocklists: List of DNSBL domains to query
59
60        Returns:
61            DNSBLResult with listing status and details
62        """
63        # Sanity check, will raise ValueError if invalid
64        ipaddress.ip_address(ip)
65
66        queries = []
67        for blocklist in blocklists:
68            queries.append(self._check_single_dnsbl(ip, blocklist))
69
70        results = await asyncio.gather(*queries, return_exceptions=True)
71
72        listed_on = []
73        entries: dict[str, DNSBLEntry] = {}
74        failed_queries = []
75
76        for i, result in enumerate(results):
77            blocklist = blocklists[i]
78
79            if isinstance(result, Exception):
80                failed_queries.append(blocklist)
81            elif result:
82                listed_on.append(blocklist)
83                entries[blocklist] = result  # type: ignore
84            # No/None results means it's not listed
85
86        return DNSBLResult(
87            ip_address=ip,
88            is_listed=bool(listed_on),
89            listed_on=listed_on,
90            entries=entries,
91            failed_queries=failed_queries,
92        )

Check if an IP address is listed in DNS blacklists.

Args: ip: IP address to check. blocklists: List of DNSBL domains to query.

Returns: DNSBLResult with listing status and details