1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
#!/usr/bin/env python
import sys, getopt, os, smtplib, commands, time
class TestSite:
temp = "/usr/tmp/torturestest-%d" % os.getpid()
def __init__(self, line):
"Initialize site data from the external representation."
(self.host, self.mailname, self.userid, self.password, \
self.proto, self.options, self.capabilities, self.version, self.comment) = \
line.strip().split(":")
if not self.mailname:
self.mailname = self.userid
# Test results
self.status = None
self.output = None
def allattrs(self):
"Return a tuple consisting of alll this site's attributes."
return (self.host, self.mailname, self.userid, self.password, \
self.proto, self.options, self.capabilities, \
self.version, self.comment)
def __repr__(self):
"Return the external representation of this site's data."
return ":".join(self.allattrs())
def prettyprint(self):
"Prettyprint a site entry in human-readable form."
return "Host: %s\n" \
"Mail To: %s\n" \
"Userid: %s\n" \
"Password: %s\n" \
"Protocol: %s\n" \
"Options: %s\n" \
"Capabilities: %s\n" \
"Version: %s\n" \
"Comment: %s\n" \
% self.allattrs()
def entryprint(self):
"Print a .fetchmailrc entry corresponding to a site entry."
return "poll %s-%s via %s with proto %s %s\n" \
" user %s there with password '%s' is esr here\n\n" \
% (self.host,self.proto,self.host,self.proto,self.options,self.userid,self.password)
def tableprint(self):
"Print an HTML server-type table entry."
return "<tr><td>%s: %s</td><td>%s</td>\n" \
% (self.proto, self.comment, self.capabilities)
def id(self):
"Identify this site."
rep = "%s %s at %s" % (self.proto, self.version, self.host)
if self.capabilities:
rep += " (" + self.capabilities + ")"
if self.options:
rep += " using " + self.options
return rep
def testmail(self, n=None):
"Send test mail to the site."
server = smtplib.SMTP("localhost")
fromaddr = "esr@thyrsus.com"
toaddr = "%s@%s" % (self.mailname, self.host)
msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr))
if n != None:
msg += `n` + ": "
msg += "Test mail collected from %s.\n" % (self.id(),)
server.sendmail(fromaddr, toaddr, msg)
server.quit()
def fetch(self):
"Run a mail fetch on this site."
try:
ofp = open(TestSite.temp, "w")
ofp.write('defaults mda "(echo; echo \'From torturetest\' `date`; cat) >>TEST.LOG"\n')
ofp.write(site.entryprint())
ofp.close()
(self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s"%TestSite.temp)
finally:
os.remove(TestSite.temp)
def failed(self):
"Did we have a test failure here?"
return os.WIFEXITED(self.status) and os.WEXITSTATUS(self.status) > 1
def explain(self):
"Explain the status of the last test."
if not os.WIFEXITED(self.status):
return self.id() + ": abnormal termination\n"
elif os.WEXITSTATUS(self.status) > 1:
return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output
else:
return self.id() + ": succeeded\n"
if __name__ == "__main__":
# Start by reading in the sitelist
ifp = open("testsites")
sitelist = []
linect = 0
while 1:
linect += 1
line = ifp.readline()
if not line:
break
elif line[0] in ("#", "\n"):
continue
else:
try:
sitelist.append(TestSite(line))
except:
print "Error on line %d" % linect
sys.exit(0)
(options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvs")
verbose = 0
for (switch, value) in options:
if switch == "-d":
# Prettprint the sitelist
map(lambda x: sys.stdout.write(x.prettyprint() + "%%\n"), sitelist)
sys.exit(0)
elif switch == "-f":
# Dump the sitelist as a .fetchmailrc file
map(lambda x: sys.stdout.write(x.entryprint()), sitelist)
sys.exit(0)
elif switch == "-p":
# Probe a single site
selected = []
for site in sitelist:
if `site`.find(value) > -1:
selected.append(site)
sitelist = selected
# Fall through
elif switch == "-t":
# Dump the sitelist in HTML table form
map(lambda x: sys.stdout.write(x.tableprint()), sitelist)
sys.exit(0)
elif switch == "-i":
# Dump the ids of the sitelist
map(lambda x: sys.stdout.write(x.id() + "\n"), sitelist)
sys.exit(0)
elif switch == "-g":
i = 1
for site in sitelist:
print "Sending test mail to " + site.id()
site.testmail(i)
i+= 1
# Send test mail to each site
sys.stdout.write("Delaying to give the test mail time to land...")
time.sleep(5)
sys.stdout.write("here we go:\n")
# Fall through
elif switch == "-v":
# Display the test output
verbose = 1
elif switch == "-s":
# Dump version strings of all tested servers as a Python tuple
print "(" + ",\n".join(map(lambda x: repr(x.version), filter(lambda x: x.version, sitelist))) + ")"
sys.exit(0)
# If no options, run the torture test
try:
failures = successes = 0
os.system("fetchmail -q")
for site in sitelist:
print "Testing " + site.id()
site.fetch()
if verbose:
print site.output
if site.failed():
failures += 1
else:
successes += 1
# OK, summarize results
print "\n%d successes and %d failures out of %d tests" \
% (successes, failures, len(sitelist))
if failures:
print "Bad status was returned on the following sites:"
for site in sitelist:
if site.failed():
sys.stdout.write(site.explain() + "\n")
except KeyboardInterrupt:
print "Interrupted."
# end
|