mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-09-14 16:04:48 +08:00
Stress testing scripts
This commit is contained in:
parent
7c71fc6818
commit
4c873d8407
2 changed files with 189 additions and 0 deletions
132
tests/resources/scripts/stress_test.py
Normal file
132
tests/resources/scripts/stress_test.py
Normal file
|
@ -0,0 +1,132 @@
|
|||
import smtplib
|
||||
import imaplib
|
||||
import ssl
|
||||
import threading
|
||||
import random
|
||||
import time
|
||||
import string
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
smtp_server = "127.0.0.1"
|
||||
smtp_port = 465
|
||||
imap_server = "127.0.0.1"
|
||||
imap_port = 993
|
||||
num_threads = 5
|
||||
runs = 10 # Set to None for infinite loop
|
||||
|
||||
def read_credentials(file_path):
|
||||
with open(file_path, "r") as file:
|
||||
credentials = [line.strip().split(':') for line in file if line.strip()]
|
||||
return credentials
|
||||
|
||||
def allow_invalid_certificates():
|
||||
# Create an SSL context
|
||||
context = ssl.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
return context
|
||||
|
||||
def generate_random_string(min_size, max_size):
|
||||
"""Generates a random string of a size between min_size and max_size."""
|
||||
size = random.randint(min_size, max_size)
|
||||
chars = string.ascii_letters + string.digits + ' '
|
||||
return ''.join(random.choice(chars) for _ in range(size))
|
||||
|
||||
def generate_email(username, recipient):
|
||||
"""Generate random subject and content for email."""
|
||||
subject = generate_random_string(10, 100) # Random subject between 10 and 100 characters
|
||||
content_size = random.randint(100, 1048576) # Random content size between 100 bytes and ~1MB
|
||||
content = generate_random_string(content_size, content_size)
|
||||
message = MIMEText(content)
|
||||
message['Subject'] = subject
|
||||
message['From'] = username
|
||||
message['To'] = recipient
|
||||
return message.as_string()
|
||||
|
||||
def smtp_send_message(username, password, recipient):
|
||||
try:
|
||||
with smtplib.SMTP_SSL(smtp_server, smtp_port, context=allow_invalid_certificates()) as server:
|
||||
server.login(username, password)
|
||||
start_time = time.time()
|
||||
server.sendmail(username, recipient, generate_email(username, recipient))
|
||||
elapsed_time_ms = (time.time() - start_time) * 1000
|
||||
print(f"OK {elapsed_time_ms} SMTP {username} -> {recipient}")
|
||||
except Exception as e:
|
||||
print(f"ERR SMTP {e}")
|
||||
|
||||
def imap_append_message(username, password, recipient):
|
||||
try:
|
||||
with imaplib.IMAP4_SSL(imap_server, imap_port, ssl_context=allow_invalid_certificates()) as imap:
|
||||
imap.login(username, password)
|
||||
start_time = time.time()
|
||||
imap.append('INBOX', None, imaplib.Time2Internaldate(time.time()), generate_email(username, recipient).encode('utf-8'))
|
||||
elapsed_time_ms = (time.time() - start_time) * 1000
|
||||
print(f"OK {elapsed_time_ms} IMAP APPEND {username}")
|
||||
except Exception as e:
|
||||
print(f"ERR IMAP {e}")
|
||||
|
||||
def imap_list_fetch(username, password):
|
||||
try:
|
||||
with imaplib.IMAP4_SSL(imap_server, imap_port, ssl_context=allow_invalid_certificates()) as imap:
|
||||
imap.login(username, password)
|
||||
imap.select('INBOX')
|
||||
start_time = time.time()
|
||||
typ, data = imap.search(None, 'ALL')
|
||||
if data[0]:
|
||||
messages = data[0].split()
|
||||
random_msg_num = random.choice(messages)
|
||||
typ, msg_data = imap.fetch(random_msg_num, '(RFC822)')
|
||||
elapsed_time_ms = (time.time() - start_time) * 1000
|
||||
print(f"OK {elapsed_time_ms} IMAP FETCH {username} {random_msg_num}")
|
||||
except Exception as e:
|
||||
print(f"ERR IMAP {e}")
|
||||
|
||||
def imap_delete_message(username, password):
|
||||
try:
|
||||
with imaplib.IMAP4_SSL(imap_server, imap_port, ssl_context=allow_invalid_certificates()) as imap:
|
||||
imap.login(username, password)
|
||||
imap.select('INBOX')
|
||||
start_time = time.time()
|
||||
typ, data = imap.search(None, 'ALL')
|
||||
if data[0]:
|
||||
messages = data[0].split()
|
||||
random_msg_num = random.choice(messages)
|
||||
imap.store(random_msg_num, '+FLAGS', '\\Deleted')
|
||||
imap.expunge()
|
||||
elapsed_time_ms = (time.time() - start_time) * 1000
|
||||
print(f"OK {elapsed_time_ms} IMAP DELETE {username} {random_msg_num}")
|
||||
except Exception as e:
|
||||
print(f"ERR IMAP {e}")
|
||||
|
||||
def perform_random_action(credentials):
|
||||
username, password = random.choice(credentials)
|
||||
recipient, _ = random.choice(credentials)
|
||||
action = random.choice([smtp_send_message, imap_append_message, imap_list_fetch, imap_delete_message])
|
||||
|
||||
if action == smtp_send_message or action == imap_append_message:
|
||||
action(username, password, recipient)
|
||||
else:
|
||||
action(username, password)
|
||||
|
||||
def thread_function(credentials):
|
||||
if runs:
|
||||
for _ in range(runs):
|
||||
perform_random_action(credentials)
|
||||
else:
|
||||
while True:
|
||||
perform_random_action(credentials)
|
||||
|
||||
def main():
|
||||
credentials = read_credentials("users.txt")
|
||||
threads = []
|
||||
|
||||
for _ in range(num_threads):
|
||||
thread = threading.Thread(target=thread_function, args=(credentials,))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
57
tests/resources/scripts/stress_test_prepare.py
Normal file
57
tests/resources/scripts/stress_test_prepare.py
Normal file
|
@ -0,0 +1,57 @@
|
|||
import requests
|
||||
import random
|
||||
import string
|
||||
import urllib3
|
||||
|
||||
# Configuration Variables
|
||||
HOSTNAME = '127.0.0.1' # Replace with the actual hostname
|
||||
DOMAIN = 'test.org' # Replace with your domain name
|
||||
USERNAME = 'admin' # Basic auth username
|
||||
PASSWORD = 'secret' # Basic auth password
|
||||
NUM_USERS = 1000 # Number of test user accounts to create
|
||||
|
||||
# Suppress InsecureRequestWarning
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
# Generate SHA512 password hash
|
||||
def generate_password():
|
||||
return ''.join(random.choices(string.ascii_letters + string.digits, k=10))
|
||||
|
||||
# Create Domain
|
||||
def create_domain():
|
||||
url = f"https://{HOSTNAME}/api/domain/{DOMAIN}"
|
||||
response = requests.post(url, auth=(USERNAME, PASSWORD), verify=False)
|
||||
if response.status_code == 200:
|
||||
print(f"Domain '{DOMAIN}' created successfully.")
|
||||
else:
|
||||
print(f"Failed to create domain '{DOMAIN}'. Status Code: {response.status_code}")
|
||||
print(response.text)
|
||||
|
||||
# Create User Accounts
|
||||
def create_user_accounts():
|
||||
with open('users.txt', 'w') as file:
|
||||
for i in range(1, NUM_USERS + 1):
|
||||
username = f"test{i}@{DOMAIN}"
|
||||
password = generate_password()
|
||||
data = {
|
||||
"type": "individual",
|
||||
"name": username,
|
||||
"secrets": [password],
|
||||
"emails": [username],
|
||||
"description": f"Tester {i}"
|
||||
}
|
||||
url = f"https://{HOSTNAME}/api/principal"
|
||||
response = requests.post(url, json=data, auth=(USERNAME, PASSWORD), verify=False)
|
||||
if response.status_code == 200:
|
||||
file.write(f"{username}:{password}\n")
|
||||
print(f"User account '{username}' created successfully.")
|
||||
else:
|
||||
print(f"Failed to create user account '{username}'. Status Code: {response.status_code}")
|
||||
print(response.text)
|
||||
|
||||
def main():
|
||||
create_domain()
|
||||
create_user_accounts()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Add table
Reference in a new issue