aboutsummaryrefslogtreecommitdiffstats
path: root/torturetest.py
blob: c038fe527caeb28306e91ccf6277e941d4eaef97 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python

import sys, getopt, os, smtplib, commands, time

class TestSite:
    temp = "/usr/tmp/torturestest-%d" % os.getpid()

    def __init__(self, line):
        "Initialize site data from the external representation."
        (self.host, self.mailname, self.userid, self.password, \
                self.proto, self.options, self.capabilities, self.version, self.comment) = \
                line.strip().split(":")
        if not self.mailname:
            self.mailname = self.userid
        # Test results
        self.status = None
        self.output = None

    def allattrs(self):
        "Return a tuple consisting of alll this site's attributes."
        return (self.host, self.mailname, self.userid, self.password, \
                self.proto, self.options, self.capabilities, \
                self.version, self.comment)

    def __repr__(self):
        "Return the external representation of this site's data."
        return ":".join(self.allattrs())

    def prettyprint(self):
        "Prettyprint a site entry in human-readable form."
        return "Host: %s\n" \
              "Mail To: %s\n" \
              "Userid: %s\n" \
              "Password: %s\n" \
              "Protocol: %s\n" \
              "Options: %s\n" \
              "Capabilities: %s\n" \
              "Version: %s\n" \
              "Comment: %s\n" \
              % self.allattrs()

    def entryprint(self):
        "Print a .fetchmailrc entry corresponding to a site entry."
        return "poll %s-%s via %s with proto %s %s\n" \
               "   user %s there with password %s is esr here\n\n" \
               % (self.host,self.proto,self.host,self.proto,self.options,self.userid,self.password)

    def tableprint(self):
        "Print an HTML server-type table entry."
        return "<tr><td>%s: %s</td><td>%s</td>\n" \
               % (self.proto, self.comment, self.capabilities)

    def id(self):
        "Identify this site."
        rep = "%s %s at %s" % (self.proto, self.version, self.host)
        if self.capabilities:
            rep += " (" + self.capabilities + ")"
        if self.options:
            rep += " using " + self.options
        return rep

    def testmail(self, n=None):
        "Send test mail to the site."
        server = smtplib.SMTP("localhost")
        fromaddr = "esr@thyrsus.com"
        toaddr = "%s@%s" % (self.mailname, self.host)
        msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr))
        if n != None:
            msg += `n` + ": "
        msg += "Test mail collected from %s.\n" % (self.id(),)
        server.sendmail(fromaddr, toaddr, msg)
        server.quit()

    def fetch(self):
        "Run a mail fetch on this site."
        try:
            ofp = open(TestSite.temp, "w")
            ofp.write('defaults mda "(echo; echo \'From torturetest\' `date`; cat) >>TEST.LOG"\n')
            ofp.write(site.entryprint())
            ofp.close()
            (self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s"%TestSite.temp)
        finally:
            os.remove(TestSite.temp)

    def failed(self):
        "Did we have a test failure here?"
        return os.WIFEXITED(self.status) and os.WEXITSTATUS(self.status) > 1

    def explain(self):
        "Explain the status of the last test."
        if not os.WIFEXITED(self.status):
            return self.id() + ": abnormal termination\n"
        elif os.WEXITSTATUS(self.status) > 1:
            return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
        else:
            return self.id() + ": succeeded\n"

if __name__ == "__main__":
    # Start by reading in the sitelist
    ifp = open("testsites")
    sitelist = []
    linect = 0
    while 1:
        linect += 1
        line = ifp.readline()
        if not line:
            break
        elif line[0] in ("#", "\n"):
            continue
        else:
            try:
                sitelist.append(TestSite(line))
            except:
                print "Error on line %d" % linect
                sys.exit(0)

    (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvs")
    verbose = 0
    for (switch, value) in options:
        if switch == "-d":
            # Prettprint the sitelist
            map(lambda x: sys.stdout.write(x.prettyprint() + "%%\n"), sitelist)
            sys.exit(0)
        elif switch == "-f":
            # Dump the sitelist as a .fetchmailrc file
            map(lambda x: sys.stdout.write(x.entryprint()), sitelist)
            sys.exit(0)
        elif switch == "-p":
            # Probe a single site
            selected = []
            for site in sitelist:
                if `site`.find(value) > -1:
                    selected.append(site)
            sitelist = selected
            # Fall through
        elif switch == "-t":
            # Dump the sitelist in HTML table form
            map(lambda x: sys.stdout.write(x.tableprint()), sitelist)
            sys.exit(0)
        elif switch == "-i":
            # Dump the ids of the sitelist
            map(lambda x: sys.stdout.write(x.id() + "\n"), sitelist)
            sys.exit(0)
        elif switch == "-g":
            i = 1
            for site in sitelist:
                print "Sending test mail to " + site.id()
                site.testmail(i)
                i+= 1
            # Send test mail to each site
            sys.stdout.write("Delaying to give the test mail time to land...")
            time.sleep(5)
            sys.stdout.write("here we go:\n")
            # Fall through
        elif switch == "-v":
            # Display the test output
            verbose = 1
        elif switch == "-s":
            # Dump version strings of all tested servers as a Python tuple
            print "(" + ",\n".join(map(lambda x: repr(x.version), filter(lambda x: x.version, sitelist))) + ")"
            sys.exit(0)

    # If no options, run the torture test
    try:
        failures = successes = 0
        os.system("fetchmail -q")
        for site in sitelist:
            print "Testing " + site.id()
            site.fetch()
            if verbose:
                print site.output
            if site.failed():
                failures += 1
            else:
                successes += 1

        # OK, summarize results
        print "\n%d successes and %d failures out of %d tests" \
              % (successes, failures, len(sitelist))

        if failures:
            print "Bad status was returned on the following sites:"
            for site in sitelist:
                if site.failed():
                    sys.stdout.write(site.explain() + "\n")
    except KeyboardInterrupt:
        print "Interrupted."

# end