Ticket #1335: twisted-words-ctcp-quoting.2.diff
| File twisted-words-ctcp-quoting.2.diff, 8.0 KB (added by nullie, 5 years ago) |
|---|
-
D:/work/eclipse-workspace/twisted/twisted/words/test/test_irc.py
20 20 "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL }, 21 21 "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR, 22 22 'NL': irc.NL}, 23 "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE, 24 'M': irc.M_QUOTE} 23 "escape!escape!%(M)s %(M)s0" % {'M': irc.M_QUOTE} 25 24 ] 26 25 27 26 … … 31 30 for s in stringSubjects: 32 31 self.failUnlessEqual(s, irc.lowDequote(irc.lowQuote(s))) 33 32 34 def test_ctcpquoteSanity(self):35 """Testing CTCP message level quote/dequote"""36 for s in stringSubjects:37 self.failUnlessEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))38 33 39 40 34 class IRCClientWithoutLogin(irc.IRCClient): 41 35 performLogin = 0 42 36 … … 297 291 user="sender!ident@host", 298 292 channel="recipient", 299 293 message=msg) 294 295 def test_ACTION(self): 296 """Testing CTCP ACTION. 297 298 This imitates behavior of wide-spread IRC clients for ACTION CTCP 299 query. 300 """ 301 302 actionQuery = (r":nick!guy@over.there PRIVMSG #theChan :" 303 "%(X)cACTION \o/%(X)c%(EOL)s" 304 % {'X': irc.X_DELIM, 305 'EOL': irc.CR + irc.LF}) 306 307 self.client.dataReceived(actionQuery) 308 self.assertEquals(self.client.calls, 309 [("action", dict(user="nick!guy@over.there", 310 channel="#theChan", 311 data=r"\o/"))]) 300 312 301 313 class BasicServerFunctionalityTestCase(unittest.TestCase): 302 314 def setUp(self): -
D:/work/eclipse-workspace/twisted/twisted/words/protocols/irc.py
1028 1028 1029 1029 if not message: return # don't raise an exception if some idiot sends us a blank message 1030 1030 1031 # If message starts with X_DELIM, it's CTCP message. 1031 1032 if message[0]==X_DELIM: 1032 m = ctcpExtract(message) 1033 if m['extended']: 1034 self.ctcpQuery(user, channel, m['extended']) 1033 # Trailing X_DELIM is optional. 1034 tag, data = ctcpParse(message) 1035 self.ctcpQuery(user, channel, tag, data) 1036 return 1035 1037 1036 if not m['normal']:1037 return1038 1039 message = string.join(m['normal'], ' ')1040 1041 1038 self.privmsg(user, channel, message) 1042 1039 1043 1040 def irc_NOTICE(self, prefix, params): … … 1045 1042 channel = params[0] 1046 1043 message = params[-1] 1047 1044 1045 # If message starts with X_DELIM, it's CTCP message. 1048 1046 if message[0]==X_DELIM: 1049 m = ctcpExtract(message) 1050 if m['extended']: 1051 self.ctcpReply(user, channel, m['extended']) 1047 # Trailing X_DELIM is optional. 1048 tag, data = ctcpParse(message) 1049 self.ctcpReply(user, channel, tag, data) 1050 return 1052 1051 1053 if not m['normal']:1054 return1055 1056 message = string.join(m['normal'], ' ')1057 1058 1052 self.noticed(user, channel, message) 1059 1053 1060 1054 def irc_NICK(self, prefix, params): … … 1155 1149 ### Receiving a CTCP query from another party 1156 1150 ### It is safe to leave these alone. 1157 1151 1158 def ctcpQuery(self, user, channel, messages):1152 def ctcpQuery(self, user, channel, tag, data): 1159 1153 """Dispatch method for any CTCP queries received. 1160 1154 """ 1161 for m in messages: 1162 method = getattr(self, "ctcpQuery_%s" % m[0], None) 1163 if method: 1164 method(user, channel, m[1]) 1165 else: 1166 self.ctcpUnknownQuery(user, channel, m[0], m[1]) 1155 method = getattr(self, "ctcpQuery_%s" % tag, None) 1156 if method: 1157 method(user, channel, data) 1158 else: 1159 self.ctcpUnknownQuery(user, channel, tag, data) 1167 1160 1168 1161 def ctcpQuery_ACTION(self, user, channel, data): 1169 1162 self.action(user, channel, data) … … 1419 1412 ### Receiving a response to a CTCP query (presumably to one we made) 1420 1413 ### You may want to add methods here, or override UnknownReply. 1421 1414 1422 def ctcpReply(self, user, channel, messages):1415 def ctcpReply(self, user, channel, tag, data): 1423 1416 """Dispatch method for any CTCP replies received. 1424 1417 """ 1425 for m in messages:1426 method = getattr(self, "ctcpReply_%s" % m[0], None)1427 if method:1428 method(user, channel, m[1])1429 else:1430 self.ctcpUnknownReply(user, channel, m[0], m[1])1431 1418 1419 method = getattr(self, "ctcpReply_%s" % tag, None) 1420 if method: 1421 method(user, channel, data) 1422 else: 1423 self.ctcpUnknownReply(user, channel, tag, data) 1424 1432 1425 def ctcpReply_PING(self, user, channel, data): 1433 1426 nick = user.split('!', 1)[0] 1434 1427 if (not self._pings) or (not self._pings.has_key((nick, data))): … … 1908 1901 1909 1902 X_DELIM = chr(001) 1910 1903 1911 def ctcp Extract(message):1912 """ Extract CTCP data from a string.1904 def ctcpParse(message): 1905 """Basic CTCP message parsing, data decoding is done by handlers""" 1913 1906 1914 Returns a dictionary with two items: 1907 # Trailing X_DELIM is optional. 1908 if message[-1] == X_DELIM: 1909 message = message[1:-1] 1910 else: 1911 message = message[1:] 1912 1913 tag, data = message.split(" ", 1) 1914 return (tag, data) 1915 1915 1916 - C{'extended'}: a list of CTCP (tag, data) tuples1917 - C{'normal'}: a list of strings which were not inside a CTCP delimeter1918 """1919 1920 extended_messages = []1921 normal_messages = []1922 retval = {'extended': extended_messages,1923 'normal': normal_messages }1924 1925 messages = string.split(message, X_DELIM)1926 odd = 01927 1928 # X1 extended data X2 nomal data X3 extended data X4 normal...1929 while messages:1930 if odd:1931 extended_messages.append(messages.pop(0))1932 else:1933 normal_messages.append(messages.pop(0))1934 odd = not odd1935 1936 extended_messages[:] = filter(None, extended_messages)1937 normal_messages[:] = filter(None, normal_messages)1938 1939 extended_messages[:] = map(ctcpDequote, extended_messages)1940 for i in xrange(len(extended_messages)):1941 m = string.split(extended_messages[i], SPC, 1)1942 tag = m[0]1943 if len(m) > 1:1944 data = m[1]1945 else:1946 data = None1947 1948 extended_messages[i] = (tag, data)1949 1950 return retval1951 1952 1916 # CTCP escaping 1953 1917 1954 1918 M_QUOTE= chr(020) … … 1983 1947 1984 1948 return mEscape_re.sub(sub, s) 1985 1949 1986 X_QUOTE = '\\'1987 1988 xQuoteTable = {1989 X_DELIM: X_QUOTE + 'a',1990 X_QUOTE: X_QUOTE + X_QUOTE1991 }1992 1993 xDequoteTable = {}1994 1995 for k, v in xQuoteTable.items():1996 xDequoteTable[v[-1]] = k1997 1998 xEscape_re = re.compile('%s.' % (re.escape(X_QUOTE),), re.DOTALL)1999 2000 def ctcpQuote(s):2001 for c in (X_QUOTE, X_DELIM):2002 s = string.replace(s, c, xQuoteTable[c])2003 return s2004 2005 def ctcpDequote(s):2006 def sub(matchobj, xDequoteTable=xDequoteTable):2007 s = matchobj.group()[1]2008 try:2009 s = xDequoteTable[s]2010 except KeyError:2011 s = s2012 return s2013 2014 return xEscape_re.sub(sub, s)2015 2016 1950 def ctcpStringify(messages): 2017 1951 """ 2018 1952 @type messages: a list of extended messages. An extended … … 2034 1968 m = "%s %s" % (tag, data) 2035 1969 else: 2036 1970 m = str(tag) 2037 m = ctcpQuote(m) 1971 2038 1972 m = "%s%s%s" % (X_DELIM, m, X_DELIM) 2039 1973 coded_messages.append(m) 2040 1974
