aboutsummaryrefslogtreecommitdiffstats
path: root/debian/tests/mock-pop3-server.py
diff options
context:
space:
mode:
Diffstat (limited to 'debian/tests/mock-pop3-server.py')
-rw-r--r--debian/tests/mock-pop3-server.py214
1 files changed, 214 insertions, 0 deletions
diff --git a/debian/tests/mock-pop3-server.py b/debian/tests/mock-pop3-server.py
new file mode 100644
index 00000000..159069b4
--- /dev/null
+++ b/debian/tests/mock-pop3-server.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python3
+# -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
+
+# Copyright (C) 2019 Bryce W. Harrington
+#
+# Released under GNU GPLv2 or later, read the file 'LICENSE.GPLv2+' for
+# more information.
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+#
+# Author: Bryce Harrington <bryce@canonical.com>
+
+import os
+import sys
+import socket
+from tempfile import mkstemp
+
+DEBUGGING = True
+DEFAULT_HOST = ''
+DEFAULT_PORT = 11110
+NEWLINE = b"\r\n"
+MESSAGE_CONTENT = b'''
+From: test@example.com\r
+Subject: Test message\r
+\r
+This is a body for testing\r
+\r
+'''
+
+def dbg(msg):
+ """Prints information to stdout if debugging is enabled"""
+ if DEBUGGING:
+ sys.stdout.write("{}\n".format(msg))
+
+def command_user(data, msg):
+ return b'+OK user accepted'
+
+def command_pass(data, msg):
+ return b'+OK pass accepted'
+
+def command_stat(data, msg):
+ return b'+OK 1 %i' %(msg.size)
+
+def command_list(data, msg):
+ if data == b'LIST':
+ return b'+OK 1 messages (%i octets)'%(msg.size) \
+ + NEWLINE \
+ + b'1 %i'%(msg.size) \
+ + NEWLINE \
+ + b'.'
+ else:
+ cmd, num = data.split()
+ return b'+OK 1 (%i octects)'%(msg.size)
+
+def command_last(data, msg):
+ return b'+OK 0'
+
+def command_top(data, msg):
+ cmd, num, lines = data.split()
+ assert num == b'1', "unknown message number: {num}".format(num)
+ bottom = NEWLINE.join(msg.bottom[:int(lines)])
+ text = msg.top + NEWLINE + NEWLINE + bottom
+ dbg(text)
+ return b'+OK top of message follows%s' % (NEWLINE + text + NEWLINE + b'.')
+
+def command_retr(data, msg):
+ return b'+OK %i octets'%(msg.size) \
+ + NEWLINE \
+ + data \
+ + NEWLINE \
+ + b'.'
+
+def command_dele(data, msg):
+ return b'+OK 1 %i'%(msg.size)
+
+def command_noop(data, msg):
+ return b'+OK 1 %i'%(msg.size)
+
+def command_quit(data, msg):
+ return b'+OK mock pop3 server signing off'
+
+COMMANDS = {
+ b'USER' : command_user,
+ b'PASS' : command_pass,
+ b'STAT' : command_stat,
+ b'LIST' : command_list,
+ b'LAST' : command_last,
+ b'TOP' : command_top,
+ b'RETR' : command_retr,
+ b'DELE' : command_dele,
+ b'NOOP' : command_noop,
+ b'QUIT' : command_quit,
+}
+
+class Mailbox:
+ """Encapsulates a mailbox containing a single email message"""
+ def __init__(self, message_filename):
+ with open(message_filename, "rb") as msg:
+ data = msg.read()
+ self.data = data
+ self.size = len(data)
+ self.top, rest = data.split(NEWLINE + NEWLINE, 1)
+ self.bottom = rest.split(NEWLINE)
+
+
+class Mailserver:
+ def __init__(self, conn, mbox):
+ self._conn = conn
+ self._mbox = mbox
+ self._chunk_size = 4096
+
+ def close(self):
+ dbg(" - Server exiting")
+ self._conn.close()
+
+ def send(self, data):
+ dbg(" - Server sending")
+ self._conn.sendall(data + NEWLINE)
+
+ def receive(self):
+ dbg(" - Server receiving")
+ data = []
+ while True:
+ chunk = self._conn.recv(self._chunk_size)
+ if NEWLINE in chunk:
+ data.append(chunk[:chunk.index(NEWLINE)])
+ break
+ data.append(chunk)
+ return b"".join(data)
+
+ def process(self):
+ data = self.receive()
+ dbg(" - Received: '{}'".format(data))
+ command = data.split(None, 1)[0]
+ if command in COMMANDS.keys():
+ response = COMMANDS[command](data, self._mbox)
+ dbg(" - Response: {}".format(response))
+ try:
+ self.send(response)
+ if command == b'QUIT':
+ return False
+ except BrokenPipeError:
+ dbg(" - Client terminated connection")
+ return False
+ else:
+ self.send(b"-ERR unrecognized command")
+ return True
+
+def serve(messages_filename, host=DEFAULT_HOST, port=DEFAULT_PORT):
+ dbg("Serving for {} on {}".format(host, port))
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+
+ mailbox = Mailbox(messages_filename)
+
+ try:
+ dbg("* mock-pop3-server ready on {}:{}".format(host, port))
+ while True:
+ sock.listen(1)
+ conn, address = sock.accept()
+ dbg("* Connection from {}".format(address))
+ server = Mailserver(conn, mailbox)
+ server.send(b"+OK mock-pop3-server pop3 server ready")
+ try:
+ dbg("* Processing commands for connection...")
+ while server.process():
+ dbg("* Finished command")
+ pass
+ finally:
+ dbg("* Closing remote connection")
+ server.close()
+ except KeyboardInterrupt:
+ dbg("* mock-pop3-server interrupted")
+ return 1
+ except SystemExit:
+ dbg("* mock-pop3-server exiting")
+ return 1
+ finally:
+ sock.shutdown(socket.SHUT_RDWR)
+ sock.close()
+ return 0
+
+
+if __name__ == "__main__":
+ fd, message_filename = mkstemp(prefix='message-', suffix='.txt')
+ dbg("Creating message file '{}'".format(message_filename))
+
+ with open(message_filename, 'wb') as f:
+ f.write(MESSAGE_CONTENT.lstrip())
+
+ try:
+ exit(serve(message_filename, host=DEFAULT_HOST, port=DEFAULT_PORT))
+ except KeyError:
+ sys.stderr.write("Exiting\n")
+ sys.exit(1)