#!/usr/bin/env python import sys, getopt, os, smtplib, commands, time, gtk, gtk.glade ### CUSTOMIZE THESE! # Mail address to use in From: header, user@host.domain format fromaddr = "esr@thyrsus.com" # local user name to receive mail as localuser = "esr" # server to inject mail into smtpserver = "localhost" # delay after sending mail delay = 5 protocols = ('POP3', 'APOP', 'IMAP',) class TestSite: temp = "/usr/tmp/torturestest-%d" % os.getpid() def __init__(self, line=None): "Initialize site data from the external representation." self.host = "" self.mailaddr = "" self.username = "" self.password = "" self.protocol = "" self.ssl = "" self.options = "" self.capabilities = "" self.recognition = "" self.comment = "" if line: (self.host, self.mailaddr, self.username, self.password, \ self.protocol, self.ssl, self.options, self.capabilities, \ self.recognition, self.comment) = \ line.strip().split(":") if not self.mailaddr: self.mailaddr = self.username # Test results self.status = None self.output = None def allattrs(self): "Return a tuple consisting of all this site's attributes." return (self.host, self.mailaddr, self.username, self.password, \ self.protocol, self.ssl, self.options, self.capabilities, \ self.recognition, self.comment) def __repr__(self): "Return the external representation of this site's data." return ":".join(self.allattrs()) def prettyprint(self): "Prettyprint a site entry in human-readable form." return "Host: %s\n" \ "Mail To: %s\n" \ "Username: %s\n" \ "Password: %s\n" \ "Protocol: %s\n" \ "SSL: %s\n" \ "Options: %s\n" \ "Capabilities: %s\n" \ "Recognition: %s\n" \ "Comment: %s\n" \ % self.allattrs() def entryprint(self): "Print a .fetchmailrc entry corresponding to a site entry." rep = "poll %s-%s via %s with proto %s %s\n" \ " user %s there with password '%s' is %s here" \ % (self.host,self.protocol,self.host,self.protocol,self.options,self.username,self.password,localuser) if self.ssl and self.ssl != 'False': rep += " options ssl" rep += "\n\n" return rep def tableprint(self): "Print an HTML server-type table entry." return "%s: %s%s\n" \ % (self.protocol, self.comment, self.capabilities) def id(self): "Identify this site." rep = "%s %s at %s" % (self.protocol, self.recognition, self.host) if self.capabilities: rep += " (" + self.capabilities + ")" if self.options: rep += " using " + self.options return rep def testmail(self, n=None): "Send test mail to the site." server = smtplib.SMTP(smtpserver) if self.mailaddr.find("@") > -1: toaddr = self.mailaddr else: toaddr = "%s@%s" % (self.mailaddr, self.host) msg = ("From: %s\r\nTo: %s\r\n\r\n" % (fromaddr, toaddr)) if n != None: msg += `n` + ": " msg += "Test mail collected from %s.\n" % (self.id(),) server.sendmail(fromaddr, toaddr, msg) server.quit() def fetch(self, logfile=False): "Run a mail fetch on this site." try: ofp = open(TestSite.temp, "w") if logfile: mda = "(echo; echo \'From torturetest\' `date`;cat) >>TEST.LOG" else: mda = 'cat' ofp.write('defaults mda "%s"\n' % mda) ofp.write(self.entryprint()) ofp.close() (self.status, self.output) = commands.getstatusoutput("fetchmail -d0 -v -f - <%s"%TestSite.temp) if self.status: os.system("cat " + TestSite.temp) finally: os.remove(TestSite.temp) def failed(self): "Did we have a test failure here?" return os.WIFEXITED(self.status) and os.WEXITSTATUS(self.status) > 1 def explain(self): "Explain the status of the last test." if not os.WIFEXITED(self.status): return self.id() + ": abnormal termination\n" elif os.WEXITSTATUS(self.status) > 1: return self.id() + ": %d\n" % os.WEXITSTATUS(self.status) + self.output else: return self.id() + ": succeeded\n" class TortureGUI: "Torturetest editing GUI," # All site parameters except protocol field_map = ('host', 'mailaddr', 'username', 'password', \ 'options', 'capabilities', 'recognition', 'comment') def __init__(self): # Build the widget tree from the glade XML file. self.wtree = gtk.glade.XML("torturetest.glade") # File in initial values self.combo = self.wtree.get_widget("combo1") self.combo.set_popdown_strings(map(lambda x: x.comment, sitelist)) self.sslcheck = self.wtree.get_widget("ssl_checkbox") self.site = sitelist[0] self.display(self.site) # Provide handlers for the widget tree's events mydict = {} for key in ('on_torturetest_destroy', 'on_updatebutton_clicked', 'on_newbutton_clicked', 'on_testbutton_clicked', 'on_quitbutton_clicked', 'on_dumpbutton_clicked', 'on_combo_entry1_activate'): mydict[key] = getattr(self, key) self.wtree.signal_autoconnect(mydict) gtk.mainloop() print `self.site` def get_widget(self, widget): "Get the value of a widget's contents." if type(widget) == type(""): widget = self.wtree.get_widget(widget) if type(widget) == gtk.Entry: return widget.get_text() #elif type(widget) == gtk.SpinButton: # return widget.get_value() #elif type(widget) == gtk.TextView: # return widget.get_buffer().get_text() def set_widget(self, name, exp): "Set the value of a widget by name." widget = self.wtree.get_widget(name) if type(widget) == gtk.Entry: widget.set_text(exp) elif type(widget) == gtk.SpinButton: widget.set_value(exp) elif type(widget) == gtk.TextView: if not widget.get_buffer(): widget.set_buffer(gtk.TextBuffer()) widget.get_buffer().set_text(exp) def display(self, site): for member in TortureGUI.field_map: self.set_widget(member + "_entry", getattr(site, member)) for proto in protocols: self.wtree.get_widget(proto + "_radiobutton").set_active(site.protocol == proto) self.sslcheck.set_active(int(site.ssl != '' and site.ssl != 'False')) self.combo.entry.set_text(site.comment) def update(self, site): for member in TortureGUI.field_map: setattr(site, member, self.get_widget(member + "_entry")) for proto in protocols: if self.wtree.get_widget(proto + "_radiobutton").get_active(): site.protocol = proto if self.wtree.get_widget("ssl_checkbox").get_active(): site.ssl = "True" else: site.ssl = "False" # Housekeeping def on_torturetest_destroy(self, obj): gtk.mainquit() def on_updatebutton_clicked(self, obj): self.update(self.site) print self.site if self.site.comment: self.combo.entry.set_text(self.site.comment) else: self.combo.entry.set_text(self.site.host) def on_newbutton_clicked(self, obj): global sitelist sitelist = [TestSite()] + sitelist self.site = sitelist[0] self.display(self.site) self.combo.entry.set_text("") def on_testbutton_clicked(self, obj): self.site.fetch(False) print self.site.output def on_quitbutton_clicked(self, obj): gtk.mainquit() def on_dumpbutton_clicked(self, obj): print `self.site` def on_combo_entry1_activate(self, obj): key = self.combo.entry.get_text() for site in sitelist: if site.comment.find(key) > -1: self.site = site self.display(self.site) break if __name__ == "__main__": # Start by reading in the sitelist ifp = open("testsites") sitelist = [] linect = 0 while 1: linect += 1 line = ifp.readline() if not line: break elif line[0] in ("#", "\n"): continue else: try: sitelist.append(TestSite(line)) except: print "Error on line %d" % linect sys.exit(0) (options, arguments) = getopt.getopt(sys.argv[1:], "dfp:tigvse") verbose = 0 for (switch, value) in options: if switch == "-d": # Prettprint the sitelist map(lambda x: sys.stdout.write(x.prettyprint() + "%%\n"), sitelist) sys.exit(0) elif switch == "-f": # Dump the sitelist as a .fetchmailrc file map(lambda x: sys.stdout.write(x.entryprint()), sitelist) sys.exit(0) elif switch == "-p": # Probe a single site selected = [] for site in sitelist: if `site`.find(value) > -1: selected.append(site) sitelist = selected # Fall through elif switch == "-t": # Dump the sitelist in HTML table form map(lambda x: sys.stdout.write(x.tableprint()), sitelist) sys.exit(0) elif switch == "-i": # Dump the ids of the sitelist map(lambda x: sys.stdout.write(x.id() + "\n"), sitelist) sys.exit(0) elif switch == "-g": i = 1 for site in sitelist: print "Sending test mail to " + site.id() site.testmail(i) i+= 1 # Send test mail to each site sys.stdout.write("Delaying to give the test mail time to land...") time.sleep(delay) sys.stdout.write("here we go:\n") # Fall through elif switch == "-v": # Display the test output verbose = 1 elif switch == "-s": # Dump recognition strings of all tested servers as a Python tuple print "(" + ",\n".join(map(lambda x: repr(x.recognition), filter(lambda x: x.recognition, sitelist))) + ")" sys.exit(0) elif switch == "-e": TortureGUI() sys.exit(0) # If no options, run the torture test try: failures = successes = 0 os.system("fetchmail -q") for site in sitelist: print "Testing " + site.id() site.fetch(True) if verbose: print site.output if site.failed(): failures += 1 else: successes += 1 # OK, summarize results print "\n%d successes and %d failures out of %d tests" \ % (successes, failures, len(sitelist)) if failures: print "Bad status was returned on the following sites:" for site in sitelist: if site.failed(): sys.stdout.write(site.explain() + "\n") except KeyboardInterrupt: print "Interrupted." # end