mirror of
https://github.com/bokysan/docker-postfix.git
synced 2024-11-10 08:55:39 +08:00
c7c56d3ff1
Test a few more functionalities in the image. Change how postfix message IDs are detected. Message ID can be set by the client and hence detecting them via regex is not the best way to go around it. This fix will actually look at the log line and try to determine if we're looking at the message ID or not.
357 lines
12 KiB
Python
357 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""
|
|
|
|
Filter to anonyimize email addresses. It reads input line by line,
|
|
finds all emails in the input and masks them using given filter.
|
|
|
|
Big thanks to [Sergio Del Río Mayoral](https://github.com/sdelrio)
|
|
for the concept and the idea, although not a lot of the code went
|
|
into this commit in the end.
|
|
|
|
"""
|
|
|
|
import re
|
|
import logging
|
|
import typing
|
|
import json
|
|
import sys
|
|
import importlib
|
|
from urllib.parse import parse_qs
|
|
import hashlib
|
|
import hmac
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# BIG FAT NOTICE on emails and regular expressions:
|
|
# If you're planning on using a regular expression to validate an email: don't. Emails
|
|
# are much more complex than you would imagine and most regular expressions will not
|
|
# cover all usecases. Newer RFCs even allow for international (read: UTF-8) email addresses.
|
|
# Most of your favourite programming languages will have a dedicated library for validating
|
|
# addresses.
|
|
#
|
|
# This pattern below, should, however match anything that remotely looks like an email.
|
|
# It is too broad, though, as it will match things which are not considered valid email
|
|
# addresses as well. But for our use case, that's OK and more than sufficient.
|
|
EMAIL_CATCH_ALL_PATTERN = '([^ "\\[\\]<]+|".+")@(\[([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+|[A-Za-z0-9]+:.+)\]|([^ \\{}():;>]+(\.[^ \\{}():;]+)*))'
|
|
EMAIL_CATCH_ALL = re.compile(EMAIL_CATCH_ALL_PATTERN)
|
|
EMPTY_RESPONSE = json.dumps({})
|
|
|
|
"""A default filter, if none other is provided."""
|
|
DEFAULT_FILTER_CLASS: str = 'SmartFilter'
|
|
|
|
"""Map filter names to friendly names"""
|
|
FILTER_MAPPINGS = {
|
|
'default': DEFAULT_FILTER_CLASS,
|
|
'smart': 'SmartFilter',
|
|
'paranoid': 'ParanoidFilter',
|
|
'noop': 'Filter',
|
|
'hash': 'HashFilter',
|
|
}
|
|
|
|
"""
|
|
Helper function to check if the provided value is truthy or falsy.
|
|
Will throw an error on unknown values.
|
|
"""
|
|
def is_truthy(val: any, name: str) -> bool:
|
|
if val is None:
|
|
raise ValueError(f"Cannot evaluate empty (None) value for {name}")
|
|
if val == True:
|
|
return True
|
|
elif val == False:
|
|
return False
|
|
|
|
val = str(val).lower()
|
|
if val == 'y' or val == 'yes' or val == 't' or val == 'true' or val == '1':
|
|
return True
|
|
elif val == 'n' or val == 'no' or val == 'f' or val == 'false' or val == '0':
|
|
return False
|
|
|
|
raise ValueError(f"Cannot evaluate value '{val}' for {name}")
|
|
|
|
# -------------------------------------------------------------------------------- #
|
|
|
|
"""
|
|
Abstract base for all filters. Does nothing.
|
|
"""
|
|
class Filter():
|
|
MESSAGE_ID_LINE = "message-id="
|
|
MESSAGE_ID_LINE_LEN = len(MESSAGE_ID_LINE)
|
|
|
|
def init(self, args: 'dict[str, list[str]]') -> None:
|
|
pass
|
|
|
|
def is_message_id(self, match: re.match, msg: str) -> bool:
|
|
start = match.start()
|
|
email = match.group()
|
|
|
|
# Note that our regex will match thigs like "message-id=Issue1649523226559@postfix-mail.mail-system.svc.cluster.local"
|
|
# so we need to filter / check for these first
|
|
|
|
if email.startswith(self.MESSAGE_ID_LINE):
|
|
return True
|
|
|
|
if start >= self.MESSAGE_ID_LINE_LEN:
|
|
pos = start-1
|
|
while True:
|
|
char = msg[pos]
|
|
if char == '=':
|
|
break
|
|
elif char in '{<["\'':
|
|
pos = pos - 1
|
|
continue
|
|
|
|
return False
|
|
|
|
check = msg[pos-self.MESSAGE_ID_LINE_LEN+1:pos+1]
|
|
if check == self.MESSAGE_ID_LINE:
|
|
return True
|
|
|
|
return False
|
|
|
|
def replace(self, match: re.match, msg: str) -> str:
|
|
return match.group()
|
|
|
|
def processMessage(self, msg: str) -> typing.Optional[str]:
|
|
result = EMAIL_CATCH_ALL.sub(
|
|
lambda x: self.replace(x, msg), msg
|
|
)
|
|
return json.dumps({'msg': result}, ensure_ascii=False) if result != msg else EMPTY_RESPONSE
|
|
|
|
# -------------------------------------------------------------------------------- #
|
|
|
|
"""
|
|
This filter will take an educated guess at how to best mask the emails, specifically:
|
|
|
|
* It will leave the first and the last letter of the local part (if it's oly one letter, it will get repated)
|
|
* If the local part is in quotes, it will remove the quotes (Warning: if the email starts with a space, this might look weird in logs)
|
|
* It will replace all the letters inbetween with **ONE** asterisk
|
|
* It will replace everything but a TLD with a star
|
|
* Address-style domains will see the number replaced with stars
|
|
|
|
E.g.:
|
|
|
|
* `demo@example.org` -> `d*o@*******.org`
|
|
* `john.doe@example.solutions` -> `j*e@*******.solutions`
|
|
* `sa@localhost` -> `s*a@*********`
|
|
* `s@[192.168.8.10]` -> `s*s@[*.*.*.*]`
|
|
* `"multi....dot"@[IPv6:2001:db8:85a3:8d3:1319:8a2e:370:7348]` -> `m*t@[IPv6:*]`
|
|
|
|
This filter should still allow you to grep for lines relating to a specific email, if you know the original email, but you won't
|
|
be able to extract the original email back from the log.
|
|
|
|
"""
|
|
class SmartFilter(Filter):
|
|
mask_symbol: str = '*'
|
|
|
|
def init(self, args: 'dict[str, list[str]]') -> None:
|
|
if 'mask_symbol' in args and len(args['mask_symbol']) > 0:
|
|
self.mask_symbol = args['mask_symbol'][0]
|
|
|
|
super().init(args)
|
|
|
|
def mask_local(self, local: str) -> str:
|
|
if local[0] == '"' and local[-1] == '"':
|
|
return local[:2] + self.mask_symbol + local[-2:]
|
|
else:
|
|
return local[0] + self.mask_symbol + local[-1]
|
|
|
|
def mask_domain(self, domain: str) -> str:
|
|
if domain[0] == '[' and domain[-1] == ']': # Numerical domain
|
|
if ':' in domain[1:-1]:
|
|
left, right = domain.split(":", 1)
|
|
return left + ':' + (len(right)-1) * self.mask_symbol + ']'
|
|
else:
|
|
return '[' + self.mask_symbol + '.' + self.mask_symbol + '.' + self.mask_symbol + '.' + self.mask_symbol + ']'
|
|
elif '.' in domain: # Normal domain
|
|
s, tld = domain.rsplit('.', 1)
|
|
return len(s) * self.mask_symbol + '.' + tld
|
|
pass
|
|
else: # Local domain
|
|
return len(domain) * self.mask_symbol
|
|
|
|
def replace(self, match: re.match, msg: str) -> str:
|
|
email = match.group()
|
|
|
|
# Return the details unchanged if they look like Postfix message ID
|
|
if self.is_message_id(match, msg):
|
|
return email
|
|
|
|
# The "@" can show up in the local part, but shouldn't appear in the
|
|
# domain part (at least not that we know).
|
|
local, domain = email.rsplit("@", 1)
|
|
|
|
local = self.mask_local(local)
|
|
domain = self.mask_domain(domain)
|
|
|
|
return local + '@' + domain
|
|
|
|
# -------------------------------------------------------------------------------- #
|
|
|
|
"""
|
|
Paranoid filter is an extension of the smart filter. It will replace everything in the
|
|
local part with the `mask_symbol` and will leavel only top level domain (TLD) in the
|
|
domain part. Domain will be replaced by ONE occurence of the `mask_symbol`.
|
|
|
|
E.g.:
|
|
|
|
* `demo@example.org` -> `*@*.org`
|
|
* `john.doe@example.solutions` -> `*@*.solutions`
|
|
* `sa@localhost` -> `*@*`
|
|
* `s@[192.168.8.10]` -> `*@[*]`
|
|
* `"multi....dot"@[IPv6:2001:db8:85a3:8d3:1319:8a2e:370:7348]` -> `*@[IPv6:*]`
|
|
"""
|
|
class ParanoidFilter(SmartFilter):
|
|
|
|
def mask_local(self, local: str) -> str:
|
|
return self.mask_symbol
|
|
|
|
def mask_domain(self, domain: str) -> str:
|
|
if domain[0] == '[' and domain[-1] == ']': # Numerical domain
|
|
if ':' in domain[1:-1]:
|
|
left, right = domain.split(":", 1)
|
|
return left + ':*]'
|
|
else:
|
|
return '[*]'
|
|
elif '.' in domain: # Normal domain
|
|
s, tld = domain.rsplit('.', 1)
|
|
return self.mask_symbol + '.' + tld
|
|
pass
|
|
else: # Local domain
|
|
return self.mask_symbol
|
|
|
|
# -------------------------------------------------------------------------------- #
|
|
|
|
"""
|
|
HashFilter will replace the email with its hash. This should allow you to grep through
|
|
the logs by hashing the email address yourself, while retaining the anonymity.
|
|
|
|
HashFilter uses HMAC to hash addresses (SHA256), so make sure you inject your unique salt.
|
|
This way, the calculated hashes will be different accross organizations, though providing
|
|
complete anonymity.
|
|
|
|
Notice that for IP addresses hashes are not appropriate, the set of IP addresses is
|
|
limited and rainbow tables could be used to get back IP addresses. This is not possible
|
|
with emails.
|
|
|
|
IANAL, though, so your mileage my vary.
|
|
|
|
"""
|
|
class HashFilter(Filter):
|
|
prefix: str = '' # Prefix emails with this (set of) character(s) for easier grepping
|
|
suffix: str = '' # Suffix emails with this (set of) character(s) for easier grepping
|
|
case_sensitive: bool = True # Convert strings to lowercase if false
|
|
short_sha: bool = False # Cut the string to first 8 characters
|
|
split: bool = False # Split and anonymize local and domain part independently
|
|
|
|
def init(self, args: 'dict[str, list[str]]') -> None:
|
|
if 'salt' in args and len(args['salt']) > 0:
|
|
self.salt_encoded = args['salt'][0].encode()
|
|
else:
|
|
raise ValueError("You need to specify salt with this filter!")
|
|
|
|
if 'prefix' in args and len(args['prefix']) > 0:
|
|
self.prefix = args['prefix'][0]
|
|
if 'suffix' in args and len(args['suffix']) > 0:
|
|
self.suffix = args['suffix'][0]
|
|
if 'case_sensitive' in args and len(args['case_sensitive']) > 0:
|
|
self.case_sensitive = is_truthy(args['case_sensitive'][0], 'case_sensitive')
|
|
if 'short_sha' in args and len(args['short_sha']) > 0:
|
|
self.short_sha = is_truthy(args['short_sha'][0], 'short_sha')
|
|
if 'split' in args and len(args['split']) > 0:
|
|
self.split = is_truthy(args['split'][0], 'split')
|
|
|
|
super().init(args)
|
|
|
|
def replace(self, match: re.match, msg: str) -> str:
|
|
email = match.group()
|
|
|
|
# Return the details unchanged if they look like Postfix message ID
|
|
if self.is_message_id(match, msg):
|
|
return email
|
|
|
|
if not self.case_sensitive:
|
|
email = email.lower()
|
|
|
|
if self.split:
|
|
# The "@" can show up in the local part, but shouldn't appear in the
|
|
# domain part (at least not that we know).
|
|
local, domain = email.rsplit("@", 1)
|
|
|
|
local = hmac.new(self.salt_encoded, local.encode(), hashlib.sha256).hexdigest()
|
|
domain = hmac.new(self.salt_encoded, domain.encode(), hashlib.sha256).hexdigest()
|
|
|
|
if self.short_sha:
|
|
local = local[:8]
|
|
domain = domain[:8]
|
|
|
|
email = local + "@" + domain
|
|
|
|
else:
|
|
email = hmac.new(self.salt_encoded, email.encode(), hashlib.sha256).hexdigest()
|
|
if self.short_sha:
|
|
email = email[:8]
|
|
|
|
return self.prefix + email + self.suffix
|
|
|
|
|
|
# -------------------------------------------------------------------------------- #
|
|
|
|
def get_filter() -> Filter:
|
|
"""
|
|
Initialize the filter
|
|
|
|
This method will check your configuration and create a new filter
|
|
|
|
:return: Returns a specific implementation of the `Filter`
|
|
"""
|
|
opts: dict[str, list[str]] = {}
|
|
clazz: typing.Optional[str] = None
|
|
|
|
if len(sys.argv) > 1:
|
|
clazz = sys.argv[1].strip()
|
|
# opts = sys.argv[2:]
|
|
if '?' in clazz:
|
|
clazz, query_string = clazz.split("?", 1)
|
|
opts = parse_qs(query_string, keep_blank_values=True,strict_parsing=True)
|
|
|
|
if clazz.lower() in FILTER_MAPPINGS:
|
|
clazz = FILTER_MAPPINGS[clazz.lower()]
|
|
|
|
if clazz is None or clazz.strip() == '':
|
|
clazz = DEFAULT_FILTER_CLASS
|
|
|
|
logger.debug(f"Constructing new {clazz} filter.")
|
|
|
|
try:
|
|
if "." in clazz:
|
|
module_name, class_name = clazz.rsplit(".", 1)
|
|
filter_class = getattr(importlib.import_module(module_name), class_name)
|
|
filter_obj: Filter = filter_class()
|
|
else:
|
|
filter_class = getattr(sys.modules[__name__], clazz)
|
|
filter_obj: Filter = filter_class()
|
|
except Exception as e:
|
|
raise RuntimeError(f'Could not instatiate filter named "{clazz}"!') from e
|
|
|
|
try:
|
|
filter_obj.init(opts)
|
|
except Exception as e:
|
|
raise RuntimeError(f'Init of filter "{clazz}" with parameters {opts} failed!') from e
|
|
|
|
return filter_obj
|
|
|
|
|
|
def process(f: Filter) -> None:
|
|
while True:
|
|
message = sys.stdin.readline()
|
|
if message:
|
|
message = message[:-1] # Remove line feed
|
|
result = f.processMessage(message)
|
|
print(result)
|
|
sys.stdout.flush()
|
|
else:
|
|
# Empty line. stdin has been closed
|
|
break
|
|
|
|
process(get_filter())
|