Ticket #5599: socket-family-from-fd-5599.patch

File socket-family-from-fd-5599.patch, 9.8 KB (added by rwall, 3 years ago)

Socket family and type detection functions plus tests

  • twisted/python/test/test_util.py

    === modified file 'twisted/python/test/test_util.py'
     
    1010
    1111import os.path, sys
    1212import shutil, errno, warnings
     13import socket
    1314try:
    1415    import pwd, grp
    1516except ImportError:
     
    1920from twisted.trial.util import suppress as SUPPRESS
    2021
    2122from twisted.python.compat import _PY3
     23from twisted.python.modules import namedAny
    2224from twisted.python import util
    2325from twisted.python.versions import Version
    2426from twisted.internet import reactor
     
    203205        self.assertEqual(self.mockos.actions, [])
    204206        warnings = self.flushWarnings([util.switchUID])
    205207        self.assertEqual(len(warnings), 1)
    206         self.assertIn('tried to drop privileges and setuid %i' % uid, 
     208        self.assertIn('tried to drop privileges and setuid %i' % uid,
    207209                      warnings[0]['message'])
    208210        self.assertIn('but uid is already %i' % uid, warnings[0]['message'])
    209211
     
    219221        self.assertEqual(self.mockos.seteuidCalls, [])
    220222        warnings = self.flushWarnings([util.switchUID])
    221223        self.assertEqual(len(warnings), 1)
    222         self.assertIn('tried to drop privileges and seteuid %i' % euid, 
     224        self.assertIn('tried to drop privileges and seteuid %i' % euid,
    223225                      warnings[0]['message'])
    224226        self.assertIn('but euid is already %i' % euid, warnings[0]['message'])
    225227
     
    11401142        self.assertEqual(str(Foo()), "<Foo first=1 2nd=2.1>")
    11411143
    11421144
     1145
    11431146    def test_fancybasename(self):
    11441147        """
    11451148        If C{fancybasename} is present, C{__str__} uses it instead of the class name.
     
    11601163        obj = Foo()
    11611164        self.assertEqual(str(obj), repr(obj))
    11621165
     1166
     1167
     1168class SocketTestBuilder(object):
     1169    """
     1170    Create L{unittest.TestCase} instances for combinations of
     1171    L{socket} families and types.
     1172
     1173    @ivar socketFamilies: A L{list} of 2tuple(familyName, familyValue)
     1174        which will be used when building test cases from this builder.
     1175    @ivar socketTypes: A L{list} of 2tuple(typeName, typeValue) which
     1176        will be used when building test cases from this builder.
     1177    @ivar socketFamily: The C{int} socket family value which will be
     1178        used when creating sockets for this test.
     1179    @ivar socketType: The C{int} socket family value which will be
     1180        used when creating sockets for this test.
     1181    """
     1182    socketFamilies = []
     1183    for n in dir(socket):
     1184        if n in ('AF_INET', 'AF_INET6', 'AF_UNIX'):
     1185            socketFamilies.append(('socket.%s' % (n,), getattr(socket, n)))
     1186    del n
     1187
     1188    socketTypes = []
     1189    for n in dir(socket):
     1190        if n in ('SOCK_DGRAM', 'SOCK_STREAM'):
     1191            socketTypes.append(('socket.%s' % (n,), getattr(socket, n)))
     1192    del n
     1193
     1194
     1195    def socketFactory(self):
     1196        """
     1197        Create a socket.
     1198
     1199        @returns: a L{socket.socket} built using C{self.socketFamily}
     1200            and C{self.socketType}.
     1201
     1202        @raises: L{unittest.SkipTest} if the L{socket.socket} raises
     1203            L{socket.error} eg because the socketType is unsupported
     1204            or due to permissions errors.
     1205        """
     1206        try:
     1207            s = socket.socket(self.socketFamily, self.socketType)
     1208            self.addCleanup(s.close)
     1209        except socket.error as e:
     1210            raise unittest.SkipTest(
     1211                'Unable to create socket for test. Reason: socket.%r' % (e,))
     1212        return s
     1213
     1214
     1215    @classmethod
     1216    def makeTestCaseClasses(cls):
     1217        """
     1218        @return: A L{dict} whose keys are test case names and whose
     1219            values are L{unittest.TestCase} instances. These should be
     1220            added to the globals() dict so that they can be discovered
     1221            by trial.
     1222        """
     1223        classes = {}
     1224
     1225        for familyName, familyValue in cls.socketFamilies:
     1226            for typeName, typeValue in cls.socketTypes:
     1227                shortFamilyName = familyName.split(".")[-1]
     1228                shortTypeName = typeName.split(".")[-1]
     1229                name = (
     1230                    cls.__name__ + "."
     1231                    + shortFamilyName + "."
     1232                    + shortTypeName).replace(".", "_")
     1233                class testcase(cls, unittest.SynchronousTestCase):
     1234                    __module__ = cls.__module__
     1235                    socketFamily = familyValue
     1236                    socketFamilyName = familyName
     1237                    socketType = typeValue
     1238                    socketTypeName = typeName
     1239                testcase.__name__ = name
     1240                classes[testcase.__name__] = testcase
     1241        return classes
     1242
     1243
     1244
     1245class SocketFamilyFromFdSupportedTestBuilder(SocketTestBuilder):
     1246    """
     1247    Success tests for L{util.socketFamilyFromFd}.
     1248    """
     1249    def test_supported(self):
     1250        """
     1251        L{util.socketFamilyFromFd} accepts a socket filedescriptor and
     1252        returns an C{int} corresponding to one of the following socket
     1253        family constants 'AF_INET', 'AF_INET6', 'AF_UNIX'.
     1254
     1255        Repeated for all supported socketTypes.
     1256        """
     1257        fd = self.socketFactory().fileno()
     1258        self.assertEqual(util.socketFamilyFromFd(fd), self.socketFamily)
     1259
     1260
     1261
     1262globals().update(SocketFamilyFromFdSupportedTestBuilder.makeTestCaseClasses())
     1263
     1264
     1265
     1266class SocketFamilyFromFdUnsupportedTestBuilder(SocketTestBuilder):
     1267    """
     1268    Failure tests for L{util.socketFamilyFromFd}.
     1269    """
     1270    socketFamilies = []
     1271    for n in dir(socket):
     1272        if n.startswith('AF_') and n not in ('AF_INET', 'AF_INET6', 'AF_UNIX'):
     1273            socketFamilies.append(('socket.%s' % (n,), getattr(socket, n)))
     1274    del n
     1275
     1276    def test_unsupported(self):
     1277        """
     1278        L{util.socketFamilyFromFd} returns C{None} if it cannot detect
     1279        the socket family from the provided socket file descriptor.
     1280
     1281        Repeated for all supported socketTypes.
     1282        """
     1283        fd = self.socketFactory().fileno()
     1284        self.assertIs(util.socketFamilyFromFd(fd), None)
     1285
     1286
     1287
     1288globals().update(SocketFamilyFromFdUnsupportedTestBuilder.makeTestCaseClasses())
     1289
     1290
     1291
     1292class SocketTypeFromFdTestBuilder(SocketTestBuilder):
     1293    """
     1294    Success tests for L{util.socketTypeFromFd}.
     1295    """
     1296    def test_type(self):
     1297        """
     1298        L{util.socketTypeFromFd} returns a C{int} corresponding to the
     1299        socket type of the socket filedescriptor.
     1300
     1301        Repeated for all supported socketFamilies.
     1302        """
     1303        fd = self.socketFactory().fileno()
     1304        self.assertEqual(util.socketTypeFromFd(fd), self.socketType)
     1305
     1306
     1307
     1308globals().update(SocketTypeFromFdTestBuilder.makeTestCaseClasses())
     1309
     1310
     1311
    11631312if _PY3:
    11641313    del (SwitchUIDTest, SearchUpwardsTest, RunAsEffectiveUserTests,
    11651314         OrderedDictTest, IntervalDifferentialTestCase, UtilTestCase,
  • twisted/python/util.py

    === modified file 'twisted/python/util.py'
     
    55from __future__ import division, absolute_import
    66
    77import os, sys, errno, warnings
     8import socket
    89try:
    910    import pwd, grp
    1011except ImportError:
     
    1718from twisted.python.deprecate import deprecated
    1819from twisted.python.versions import Version
    1920from twisted.python.compat import _PY3, unicode
     21
    2022if _PY3:
    2123    UserDict = object
    2224else:
     
    10661068
    10671069
    10681070
     1071def _socketFamilyFromName(name):
     1072    """
     1073    Attempt to detect the the socket family based on the type and
     1074    length of a name provided by L{socket.socket.getsocketname}.
     1075
     1076    A string is assumed to be AF_UNIX.
     1077    A 2tuple containing and IPv4 address is AF_INET.
     1078    A 4tuple containing an IPv6 address is AF_INET6.
     1079
     1080    @type name: mixed
     1081    @param name: Any return value of L{socket.socket.getsocketname}.
     1082
     1083    @return: An C{int} corresponding to the detected socket family
     1084        constant or C{None} if the family could not be detected.
     1085    """
     1086    family = None
     1087    if isinstance(name, str):
     1088        family = socket.AF_UNIX
     1089    elif isinstance(name, tuple):
     1090        if len(name) == 4:
     1091            family = socket.AF_INET6
     1092        elif len(name) == 2:
     1093            ip, port = name
     1094            if isinstance(ip, str):
     1095                try:
     1096                    socket.inet_aton(ip)
     1097                except:
     1098                    pass
     1099                else:
     1100                    family = socket.AF_INET
     1101    return family
     1102
     1103
     1104
     1105def socketFamilyFromFd(fd):
     1106    """
     1107    Attempt to detect the socket family of a socket filedescriptor.
     1108
     1109    @type fd: C{int}
     1110    @param fd: A filedescriptor number.
     1111
     1112    @return: An C{int} corresponding to the detected socket family
     1113        constant or C{None} if the family could not be detected.
     1114
     1115    @see: U{http://utcc.utoronto.ca/~cks/space/blog/python/SocketFromFdMistake}
     1116    """
     1117    probe = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_RAW)
     1118    try:
     1119        name = probe.getsockname()
     1120    except socket.error:
     1121        return
     1122    finally:
     1123        probe.close()
     1124
     1125    return _socketFamilyFromName(name)
     1126
     1127
     1128
     1129def socketTypeFromFd(fd):
     1130    """
     1131    Attempt to detect the socket type of a socket filedescriptor.
     1132
     1133    @type fd: C{int}
     1134    @param fd: A filedescriptor number.
     1135
     1136    @return: An C{int} corresponding to the detected socket type
     1137        constant.
     1138
     1139    @see: U{http://utcc.utoronto.ca/~cks/space/blog/python/SocketFromFdMistake}
     1140    """
     1141    probe = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_RAW)
     1142    try:
     1143        socketType = probe.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
     1144    finally:
     1145        probe.close()
     1146    return socketType
     1147
     1148
     1149
    10691150__all__ = [
    10701151    "uniquify", "padTo", "getPluginDirs", "addPluginDir", "sibpath",
    10711152    "getPassword", "println", "makeStatBar", "OrderedDict",
     
    10751156    "nameToLabel", "uidFromString", "gidFromString", "runAsEffectiveUser",
    10761157    "untilConcludes",
    10771158    "runWithWarningsSuppressed",
     1159    "socketFamilyFromFd", "socketTypeFromFd",
    10781160    ]
    10791161
    10801162