emailsec.dnsbl
1import ipaddress 2import asyncio 3import time 4from dataclasses import dataclass 5from emailsec._dns_resolver import DNSResolver 6import cachetools 7 8 9@dataclass 10class DNSBLEntry: 11 return_code: str 12 description: str | None 13 14 15@dataclass 16class DNSBLResult: 17 ip_address: str 18 is_listed: bool 19 listed_on: list[str] 20 entries: dict[str, DNSBLEntry] 21 failed_queries: list[str] 22 23 def get_smtp_response(self) -> str | None: 24 if self.is_listed: 25 sources = ", ".join(self.listed_on) 26 # Include description from first entry if available 27 first_entry = next(iter(self.entries.values()), None) 28 if first_entry and first_entry.description: 29 return f"550 5.7.1 {first_entry.description}" 30 return f"550 5.7.1 IP blocked by {sources}" 31 return None 32 33 34class DNSBLChecker: 35 """DNSBL checker with caching.""" 36 37 def __init__(self, cache_size: int = 1000): 38 """Initialize DNSBL checker with cache. 39 40 Args: 41 cache_size: Maximum number of cache entries 42 """ 43 self._cache = cachetools.LRUCache(maxsize=cache_size) 44 self._cache_expiry: dict[tuple[str, str], float] = {} 45 self._resolver: DNSResolver | None = None 46 47 def _get_resolver(self) -> DNSResolver: 48 if self._resolver is None: 49 self._resolver = DNSResolver(no_max_lookups=True) 50 return self._resolver 51 52 async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult: 53 """Check if an IP address is listed in DNS blacklists. 54 55 Args: 56 ip: IP address to check 57 blocklists: List of DNSBL domains to query 58 59 Returns: 60 DNSBLResult with listing status and details 61 """ 62 # Sanity check, will raise ValueError if invalid 63 ipaddress.ip_address(ip) 64 65 queries = [] 66 for blocklist in blocklists: 67 queries.append(self._check_single_dnsbl(ip, blocklist)) 68 69 results = await asyncio.gather(*queries, return_exceptions=True) 70 71 listed_on = [] 72 entries: dict[str, DNSBLEntry] = {} 73 failed_queries = [] 74 75 for i, result in enumerate(results): 76 blocklist = blocklists[i] 77 78 if isinstance(result, Exception): 79 failed_queries.append(blocklist) 80 elif result: 81 listed_on.append(blocklist) 82 entries[blocklist] = result # type: ignore 83 # No/None results means it's not listed 84 85 return DNSBLResult( 86 ip_address=ip, 87 is_listed=bool(listed_on), 88 listed_on=listed_on, 89 entries=entries, 90 failed_queries=failed_queries, 91 ) 92 93 async def _check_single_dnsbl(self, ip: str, blocklist: str) -> DNSBLEntry | None: 94 cache_key = (ip, blocklist) 95 96 if cache_key in self._cache: 97 expiry_time = self._cache_expiry.get(cache_key) 98 if expiry_time and time.time() < expiry_time: 99 return self._cache[cache_key] 100 else: 101 self._cache.pop(cache_key, None) 102 self._cache_expiry.pop(cache_key, None) 103 104 parsed_ip = ipaddress.ip_address(ip) 105 if parsed_ip.version == 4: 106 reversed_addr = parsed_ip.reverse_pointer.removesuffix(".in-addr.arpa") 107 else: 108 # IPv6: convert to nibble format as per RFC 5782 Section 2.4 109 hex_addr = parsed_ip.packed.hex() 110 reversed_addr = ".".join(reversed(hex_addr)) 111 112 query_name = f"{reversed_addr}.{blocklist}" 113 114 # Query both A and TXT records as per RFC 5782 115 resolver = self._get_resolver() 116 117 a_result = await resolver.a(query_name) 118 119 ttl = 300 120 if a_result and hasattr(a_result[0], "ttl"): 121 ttl = a_result[0].ttl 122 123 if not a_result or not a_result[0].host.startswith("127."): 124 self._cache[cache_key] = None 125 self._cache_expiry[cache_key] = time.time() + ttl 126 return None 127 128 return_code = a_result[0].host 129 130 # Validate return code is in 127.0.0.0/8 range (RFC 5782 Section 2.1) 131 try: 132 code_ip = ipaddress.ip_address(return_code) 133 if not (code_ip.version == 4 and str(code_ip).startswith("127.")): 134 self._cache[cache_key] = None 135 self._cache_expiry[cache_key] = time.time() + ttl 136 return None 137 except ValueError: 138 self._cache[cache_key] = None 139 self._cache_expiry[cache_key] = time.time() + ttl 140 return None 141 142 # Fetch TXT record for description (RFC 5782 Section 2.1) 143 description = None 144 try: 145 txt_result = await resolver.txt(query_name) 146 if txt_result and txt_result[0].text: 147 desc = txt_result[0].text 148 if isinstance(desc, bytes): 149 description = desc.decode("utf-8", errors="ignore") 150 else: 151 description = str(desc) 152 except Exception: 153 # description is optional, let it fail silently 154 pass 155 156 result = DNSBLEntry(return_code=return_code, description=description) 157 158 self._cache[cache_key] = result 159 self._cache_expiry[cache_key] = time.time() + ttl 160 161 return result
@dataclass
class
DNSBLEntry:
@dataclass
class
DNSBLResult:
16@dataclass 17class DNSBLResult: 18 ip_address: str 19 is_listed: bool 20 listed_on: list[str] 21 entries: dict[str, DNSBLEntry] 22 failed_queries: list[str] 23 24 def get_smtp_response(self) -> str | None: 25 if self.is_listed: 26 sources = ", ".join(self.listed_on) 27 # Include description from first entry if available 28 first_entry = next(iter(self.entries.values()), None) 29 if first_entry and first_entry.description: 30 return f"550 5.7.1 {first_entry.description}" 31 return f"550 5.7.1 IP blocked by {sources}" 32 return None
DNSBLResult( ip_address: str, is_listed: bool, listed_on: list[str], entries: dict[str, DNSBLEntry], failed_queries: list[str])
entries: dict[str, DNSBLEntry]
def
get_smtp_response(self) -> str | None:
24 def get_smtp_response(self) -> str | None: 25 if self.is_listed: 26 sources = ", ".join(self.listed_on) 27 # Include description from first entry if available 28 first_entry = next(iter(self.entries.values()), None) 29 if first_entry and first_entry.description: 30 return f"550 5.7.1 {first_entry.description}" 31 return f"550 5.7.1 IP blocked by {sources}" 32 return None
class
DNSBLChecker:
35class DNSBLChecker: 36 """DNSBL checker with caching.""" 37 38 def __init__(self, cache_size: int = 1000): 39 """Initialize DNSBL checker with cache. 40 41 Args: 42 cache_size: Maximum number of cache entries 43 """ 44 self._cache = cachetools.LRUCache(maxsize=cache_size) 45 self._cache_expiry: dict[tuple[str, str], float] = {} 46 self._resolver: DNSResolver | None = None 47 48 def _get_resolver(self) -> DNSResolver: 49 if self._resolver is None: 50 self._resolver = DNSResolver(no_max_lookups=True) 51 return self._resolver 52 53 async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult: 54 """Check if an IP address is listed in DNS blacklists. 55 56 Args: 57 ip: IP address to check 58 blocklists: List of DNSBL domains to query 59 60 Returns: 61 DNSBLResult with listing status and details 62 """ 63 # Sanity check, will raise ValueError if invalid 64 ipaddress.ip_address(ip) 65 66 queries = [] 67 for blocklist in blocklists: 68 queries.append(self._check_single_dnsbl(ip, blocklist)) 69 70 results = await asyncio.gather(*queries, return_exceptions=True) 71 72 listed_on = [] 73 entries: dict[str, DNSBLEntry] = {} 74 failed_queries = [] 75 76 for i, result in enumerate(results): 77 blocklist = blocklists[i] 78 79 if isinstance(result, Exception): 80 failed_queries.append(blocklist) 81 elif result: 82 listed_on.append(blocklist) 83 entries[blocklist] = result # type: ignore 84 # No/None results means it's not listed 85 86 return DNSBLResult( 87 ip_address=ip, 88 is_listed=bool(listed_on), 89 listed_on=listed_on, 90 entries=entries, 91 failed_queries=failed_queries, 92 ) 93 94 async def _check_single_dnsbl(self, ip: str, blocklist: str) -> DNSBLEntry | None: 95 cache_key = (ip, blocklist) 96 97 if cache_key in self._cache: 98 expiry_time = self._cache_expiry.get(cache_key) 99 if expiry_time and time.time() < expiry_time: 100 return self._cache[cache_key] 101 else: 102 self._cache.pop(cache_key, None) 103 self._cache_expiry.pop(cache_key, None) 104 105 parsed_ip = ipaddress.ip_address(ip) 106 if parsed_ip.version == 4: 107 reversed_addr = parsed_ip.reverse_pointer.removesuffix(".in-addr.arpa") 108 else: 109 # IPv6: convert to nibble format as per RFC 5782 Section 2.4 110 hex_addr = parsed_ip.packed.hex() 111 reversed_addr = ".".join(reversed(hex_addr)) 112 113 query_name = f"{reversed_addr}.{blocklist}" 114 115 # Query both A and TXT records as per RFC 5782 116 resolver = self._get_resolver() 117 118 a_result = await resolver.a(query_name) 119 120 ttl = 300 121 if a_result and hasattr(a_result[0], "ttl"): 122 ttl = a_result[0].ttl 123 124 if not a_result or not a_result[0].host.startswith("127."): 125 self._cache[cache_key] = None 126 self._cache_expiry[cache_key] = time.time() + ttl 127 return None 128 129 return_code = a_result[0].host 130 131 # Validate return code is in 127.0.0.0/8 range (RFC 5782 Section 2.1) 132 try: 133 code_ip = ipaddress.ip_address(return_code) 134 if not (code_ip.version == 4 and str(code_ip).startswith("127.")): 135 self._cache[cache_key] = None 136 self._cache_expiry[cache_key] = time.time() + ttl 137 return None 138 except ValueError: 139 self._cache[cache_key] = None 140 self._cache_expiry[cache_key] = time.time() + ttl 141 return None 142 143 # Fetch TXT record for description (RFC 5782 Section 2.1) 144 description = None 145 try: 146 txt_result = await resolver.txt(query_name) 147 if txt_result and txt_result[0].text: 148 desc = txt_result[0].text 149 if isinstance(desc, bytes): 150 description = desc.decode("utf-8", errors="ignore") 151 else: 152 description = str(desc) 153 except Exception: 154 # description is optional, let it fail silently 155 pass 156 157 result = DNSBLEntry(return_code=return_code, description=description) 158 159 self._cache[cache_key] = result 160 self._cache_expiry[cache_key] = time.time() + ttl 161 162 return result
DNSBL checker with caching.
DNSBLChecker(cache_size: int = 1000)
38 def __init__(self, cache_size: int = 1000): 39 """Initialize DNSBL checker with cache. 40 41 Args: 42 cache_size: Maximum number of cache entries 43 """ 44 self._cache = cachetools.LRUCache(maxsize=cache_size) 45 self._cache_expiry: dict[tuple[str, str], float] = {} 46 self._resolver: DNSResolver | None = None
Initialize DNSBL checker with cache.
Args: cache_size: Maximum number of cache entries
53 async def check_ip(self, ip: str, blocklists: list[str]) -> DNSBLResult: 54 """Check if an IP address is listed in DNS blacklists. 55 56 Args: 57 ip: IP address to check 58 blocklists: List of DNSBL domains to query 59 60 Returns: 61 DNSBLResult with listing status and details 62 """ 63 # Sanity check, will raise ValueError if invalid 64 ipaddress.ip_address(ip) 65 66 queries = [] 67 for blocklist in blocklists: 68 queries.append(self._check_single_dnsbl(ip, blocklist)) 69 70 results = await asyncio.gather(*queries, return_exceptions=True) 71 72 listed_on = [] 73 entries: dict[str, DNSBLEntry] = {} 74 failed_queries = [] 75 76 for i, result in enumerate(results): 77 blocklist = blocklists[i] 78 79 if isinstance(result, Exception): 80 failed_queries.append(blocklist) 81 elif result: 82 listed_on.append(blocklist) 83 entries[blocklist] = result # type: ignore 84 # No/None results means it's not listed 85 86 return DNSBLResult( 87 ip_address=ip, 88 is_listed=bool(listed_on), 89 listed_on=listed_on, 90 entries=entries, 91 failed_queries=failed_queries, 92 )
Check if an IP address is listed in DNS blacklists.
Args: ip: IP address to check blocklists: List of DNSBL domains to query
Returns: DNSBLResult with listing status and details