aboutsummaryrefslogtreecommitdiffstats
path: root/protocols/skype/test.py
blob: 233c41c8542ca47215008847639fad933ee73a34 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python2.7

import subprocess
import sys
import pexpect
import unittest
import shutil
import os
import hashlib

def openssl(args):
	with open(os.devnull, "w") as devnull:
		proc = subprocess.Popen(['openssl'] + args, stdin=subprocess.PIPE, stderr=devnull)
		for i in range(6):
			proc.stdin.write("\n")
		proc.stdin.close()
		proc.communicate()
def setupSkyped():
	try:
		shutil.rmtree("t/skyped")
	except OSError:
		pass
	os.makedirs("t/skyped")
	cwd = os.getcwd()
	os.chdir("t/skyped")
	try:
		shutil.copyfile("../../skyped.cnf", "skyped.cnf")
		openssl(['req', '-new', '-x509', '-days', '365', '-nodes', '-config', 'skyped.cnf', '-out', 'skyped.cert.pem', '-keyout', 'skyped.key.pem'])
		with open("skyped.conf", "w") as sock:
			sock.write("[skyped]\n")
			sock.write("username = alice\n")
			sock.write("password = %s\n" % hashlib.sha1("foo").hexdigest())
			sock.write("cert = %s/skyped.cert.pem\n" % os.getcwd())
			sock.write("key = %s/skyped.key.pem\n" % os.getcwd())
			sock.write("port = 2727\n")
	finally:
		os.chdir(cwd)

class Test(unittest.TestCase):
	def mock(self, name):
		with open("t/skyped.log", "w") as skyped_log,\
				open("t/pexpect.log", "w") as pexpect_log:
			skyped = subprocess.Popen([sys.executable, "skyped.py",
				"-c", "t/skyped/skyped.conf", "-n", "-d", "-m", "t/%s-skyped.mock" % name],
				stdout=skyped_log, stderr=subprocess.STDOUT)
			try:
				bitlbee = pexpect.spawn('../../bitlbee', ['-d', 't/bitlbee'], logfile=pexpect_log)
				if os.environ.get('ATTACH_GDB'):
					subprocess.Popen(['gdb', '-batch-silent',
						'-ex', 'set logging overwrite on',
						'-ex', 'set logging file t/gdb-%s.log' % bitlbee.pid,
						'-ex', 'set logging on',
						'-ex', 'handle all pass nostop noprint',
						'-ex', 'handle SIGSEGV pass stop print',
						'-ex', 'set pagination 0',
						'-ex', 'continue',
						'-ex', 'backtrace full',
						'-ex', 'info registers',
						'-ex', 'thread apply all backtrace',
						'-ex', 'quit',
						'../../bitlbee', str(bitlbee.pid) ])
				bitlbee_mock = open("t/%s-bitlbee.mock" % name)
				for i in bitlbee_mock.readlines():
					line = i.strip()
					if line.startswith(">> "):
						bitlbee.expect_exact(line[3:], timeout=10)
					elif line.startswith("<< "):
						bitlbee.sendline(line[3:])
				bitlbee_mock.close()
				bitlbee.close()
			finally:
				skyped.terminate()
				skyped.communicate()

	def setUp(self):
		try:
			shutil.rmtree("t/bitlbee")
		except OSError:
			pass
		os.makedirs("t/bitlbee")

	def testMsg(self):
		self.mock("msg")

	def testLogin(self):
		self.mock("login")

	def testInfo(self):
		self.mock("info")

	def testCall(self):
		self.mock("call")

	def testCallFailed(self):
		self.mock("call-failed")

	def testAddYes(self):
		self.mock("add-yes")

	def testAddedYes(self):
		self.mock("added-yes")

	def testAddedNo(self):
		self.mock("added-no")

	def testGroupchatInvited(self):
		self.mock("groupchat-invited")

	def testGroupchatInvite(self):
		self.mock("groupchat-invite")

	def testGroupchatLeave(self):
		self.mock("groupchat-leave")
	
	def testGroupchatMsg(self):
		self.mock("groupchat-msg")
	
	def testGroupchatTopic(self):
		self.mock("groupchat-topic")
	
	def testCalledYes(self):
		self.mock("called-yes")

	def testCalledNo(self):
		self.mock("called-no")

	def testFiletransfer(self):
		self.mock("filetransfer")

	def testGroupRead(self):
		self.mock("group-read")

	def testGroupAdd(self):
		self.mock("group-add")

	def testCtcpHelp(self):
		self.mock("ctcp-help")

	def testSetMoodText(self):
		self.mock("set-mood-text")
	
	def testAwaySet(self):
		self.mock("away-set")

if __name__ == '__main__':
	setupSkyped()
	unittest.main()
class="w"> [:filename] before_destroy :delete_cached_file! BODY_MAX_TRIES = 3 BODY_MAX_DELAY = 5 def directory base_dir = File.expand_path(File.join(File.dirname(__FILE__), "../../cache", "attachments_#{Rails.env}")) return File.join(base_dir, self.hexdigest[0..2]) end def filepath File.join(self.directory, self.hexdigest) end def delete_cached_file! begin @cached_body = nil File.delete(self.filepath) rescue end end def body=(d) self.hexdigest = Digest::MD5.hexdigest(d) if !File.exists?(self.directory) FileUtils.mkdir_p self.directory end File.open(self.filepath, "wb") { |file| file.write d } update_display_size! @cached_body = d end def body if @cached_body.nil? tries = 0 delay = 1 begin binary_data = File.open(self.filepath, "rb" ).read if self.content_type =~ /^text/ @cached_body = convert_string_to_utf8_or_binary(binary_data, 'UTF-8') else @cached_body = binary_data end rescue Errno::ENOENT # we've lost our cached attachments for some reason. Reparse them. if tries > BODY_MAX_TRIES raise else sleep delay end tries += 1 delay *= 2 delay = BODY_MAX_DELAY if delay > BODY_MAX_DELAY force = true self.incoming_message.parse_raw_email!(force) retry end end return @cached_body end # List of DSN codes taken from RFC 3463 # http://tools.ietf.org/html/rfc3463 DsnToMessage = { 'X.1.0' => 'Other address status', 'X.1.1' => 'Bad destination mailbox address', 'X.1.2' => 'Bad destination system address', 'X.1.3' => 'Bad destination mailbox address syntax', 'X.1.4' => 'Destination mailbox address ambiguous', 'X.1.5' => 'Destination mailbox address valid', 'X.1.6' => 'Mailbox has moved', 'X.1.7' => 'Bad sender\'s mailbox address syntax', 'X.1.8' => 'Bad sender\'s system address', 'X.2.0' => 'Other or undefined mailbox status', 'X.2.1' => 'Mailbox disabled, not accepting messages', 'X.2.2' => 'Mailbox full', 'X.2.3' => 'Message length exceeds administrative limit.', 'X.2.4' => 'Mailing list expansion problem', 'X.3.0' => 'Other or undefined mail system status', 'X.3.1' => 'Mail system full', 'X.3.2' => 'System not accepting network messages', 'X.3.3' => 'System not capable of selected features', 'X.3.4' => 'Message too big for system', 'X.4.0' => 'Other or undefined network or routing status', 'X.4.1' => 'No answer from host', 'X.4.2' => 'Bad connection', 'X.4.3' => 'Routing server failure', 'X.4.4' => 'Unable to route', 'X.4.5' => 'Network congestion', 'X.4.6' => 'Routing loop detected', 'X.4.7' => 'Delivery time expired', 'X.5.0' => 'Other or undefined protocol status', 'X.5.1' => 'Invalid command', 'X.5.2' => 'Syntax error', 'X.5.3' => 'Too many recipients', 'X.5.4' => 'Invalid command arguments', 'X.5.5' => 'Wrong protocol version', 'X.6.0' => 'Other or undefined media error', 'X.6.1' => 'Media not supported', 'X.6.2' => 'Conversion required and prohibited', 'X.6.3' => 'Conversion required but not supported', 'X.6.4' => 'Conversion with loss performed', 'X.6.5' => 'Conversion failed', 'X.7.0' => 'Other or undefined security status', 'X.7.1' => 'Delivery not authorized, message refused', 'X.7.2' => 'Mailing list expansion prohibited', 'X.7.3' => 'Security conversion required but not possible', 'X.7.4' => 'Security features not supported', 'X.7.5' => 'Cryptographic failure', 'X.7.6' => 'Cryptographic algorithm not supported', 'X.7.7' => 'Message integrity failure' } # Returns HTML, of extra comment to put by attachment def extra_note # For delivery status notification attachments, extract the status and # look up what it means in the DSN table. if @content_type == 'message/delivery-status' if !@body.match(/Status:\s+([0-9]+\.([0-9]+\.[0-9]+))\s+/) return "" end dsn = $1 dsn_part = 'X.' + $2 dsn_message = "" if DsnToMessage.include?(dsn_part) dsn_message = " (" + DsnToMessage[dsn_part] + ")" end return "<br><em>DSN: " + dsn + dsn_message + "</em>" end return "" end # Called by controller so old filenames still work def old_display_filename filename = self.filename # Convert weird spaces (e.g. \n) to normal ones filename = filename.gsub(/\s/, " ") # Remove slashes, they mess with URLs filename = filename.gsub(/\//, "-") return filename end # XXX changing this will break existing URLs, so have a care - maybe # make another old_display_filename see above def display_filename filename = self.filename if !self.incoming_message.nil? self.incoming_message.info_request.apply_censor_rules_to_text!(filename) end # Sometimes filenames have e.g. %20 in - no point butchering that # (without unescaping it, this would remove the % and leave 20s in there) filename = CGI.unescape(filename) # Remove weird spaces filename = filename.gsub(/\s+/, " ") # Remove non-alphabetic characters filename = filename.gsub(/[^A-Za-z0-9.]/, " ") # Remove spaces near dots filename = filename.gsub(/\s*\.\s*/, ".") # Compress adjacent spaces down to a single one filename = filename.gsub(/\s+/, " ") filename = filename.strip return filename end def ensure_filename! if self.filename.blank? calc_ext = AlaveteliFileTypes.mimetype_to_extension(self.content_type) if !calc_ext calc_ext = "bin" end if !self.within_rfc822_subject.nil? computed = self.within_rfc822_subject + "." + calc_ext else computed = "attachment." + calc_ext end self.filename = computed end end def filename=(filename) calc_ext = AlaveteliFileTypes.mimetype_to_extension(self.content_type) # Put right extension on if missing if !filename.nil? && !filename.match(/\.#{calc_ext}$/) && calc_ext computed = filename + "." + calc_ext else computed = filename end write_attribute('filename', computed) end # Size to show next to the download link for the attachment def update_display_size! s = self.body.size if s > 1024 * 1024 self.display_size = sprintf("%.1f", s.to_f / 1024 / 1024) + 'M' else self.display_size = (s / 1024).to_s + 'K' end end # Whether this type can be shown in the Google Docs Viewer. # The full list of supported types can be found at # https://docs.google.com/support/bin/answer.py?hl=en&answer=1189935 def has_google_docs_viewer? return !! { "application/pdf" => true, # .pdf "image/tiff" => true, # .tiff "application/vnd.ms-word" => true, # .doc "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => true, # .docx "application/vnd.ms-powerpoint" => true, # .ppt "application/vnd.openxmlformats-officedocument.presentationml.presentation" => true, # .pptx "application/vnd.ms-excel" => true, # .xls "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => true, # .xlsx } [self.content_type] end # Whether this type has a "View as HTML" def has_body_as_html? return ( !!{ "text/plain" => true, "application/rtf" => true, }[self.content_type] or self.has_google_docs_viewer? ) end # Name of type of attachment type - only valid for things that has_body_as_html? def name_of_content_type return { "text/plain" => "Text file", 'application/rtf' => "RTF file", 'application/pdf' => "PDF file", 'image/tiff' => "TIFF image", 'application/vnd.ms-word' => "Word document", 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' => "Word document", 'application/vnd.ms-powerpoint' => "PowerPoint presentation", 'application/vnd.openxmlformats-officedocument.presentationml.presentation' => "PowerPoint presentation", 'application/vnd.ms-excel' => "Excel spreadsheet", 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' => "Excel spreadsheet", }[self.content_type] end # For "View as HTML" of attachment def body_as_html(dir) html = nil wrapper_id = "wrapper" # simple cases, can never fail if self.content_type == 'text/plain' text = self.body.strip text = CGI.escapeHTML(text) text = MySociety::Format.make_clickable(text) html = text.gsub(/\n/, '<br>') return '<!DOCTYPE html><html><head><title></title></head><body>' + html + "</body></html>", wrapper_id end # the extractions will also produce image files, which go in the # current directory, so change to the directory the function caller # wants everything in html = nil if ['application/pdf', 'application/rtf'].include?(self.content_type) text = self.body Dir.chdir(dir) do if RUBY_VERSION.to_f >= 1.9 tempfile = Tempfile.new('foiextract', '.', :encoding => text.encoding) else tempfile = Tempfile.new('foiextract', '.') end tempfile.print text tempfile.flush if self.content_type == 'application/pdf' # We set a timeout here, because pdftohtml can spiral out of control # on some PDF files and we don't want to crash the whole server. html = AlaveteliExternalCommand.run("pdftohtml", "-nodrm", "-zoom", "1.0", "-stdout", "-enc", "UTF-8", "-noframes", tempfile.path, :timeout => 30) elsif self.content_type == 'application/rtf' html = AlaveteliExternalCommand.run("unrtf", "--html", tempfile.path, :timeout => 120) end tempfile.close tempfile.delete end end if html.nil? if self.has_google_docs_viewer? html = '' # force error and using Google docs viewer else raise "No HTML conversion available for type " + self.content_type end end # We need to look at: # a) Any error code # b) The output size, as pdftohtml does not return an error code upon error. # c) For cases when there is no text in the body of the HTML, or # images, so nothing will be rendered. This is to detect some bug in # pdftohtml, which sometimes makes it return just <hr>s and no other # content. html.match(/(\<body[^>]*\>.*)/mi) body = $1.to_s body_without_tags = body.gsub(/\s+/,"").gsub(/\<[^\>]*\>/, "") contains_images = html.match(/<img/mi) ? true : false if html.size == 0 || !$?.success? || (body_without_tags.size == 0 && !contains_images) ret = "<html><head></head><body>"; if self.has_google_docs_viewer? wrapper_id = "wrapper_google_embed" ret = ret + "<iframe src='http://docs.google.com/viewer?url=<attachment-url-here>&embedded=true' width='100%' height='100%' style='border: none;'></iframe>"; else ret = ret + "<p>Sorry, we were unable to convert this file to HTML. Please use the download link at the top right.</p>" end ret = ret + "</body></html>" return ret, wrapper_id end return html, wrapper_id end end