Ticket #2060: iaddress-connect-listen.diff
| File iaddress-connect-listen.diff, 5.4 KB (added by rwall, 7 years ago) |
|---|
-
twisted/test/test_unix.py
299 299 300 300 return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening) 301 301 302 class UNIXAddressTestCase(PortCleanerUpper): 303 socketPath = "/tmp/UNIXAddressTestCase.socket" 304 def testConnect(self): 305 """Test that UNIXAddress.connect returns an IConnector""" 306 listeningPort = reactor.listenUNIX(self.socketPath, Factory(self, self.socketPath)) 307 self.ports.append(listeningPort) 308 addr = address.UNIXAddress(self.socketPath) 309 conn = addr.connect(TestClientFactory(self, self.socketPath)) 310 self.assertTrue(interfaces.IConnector.providedBy(conn)) 302 311 312 def testListen(self): 313 """Test that UNIXAddress.listen returns an IListeningPort""" 314 addr = address.UNIXAddress(self.socketPath) 315 listeningPort = addr.listen(Factory(self, self.socketPath)) 316 self.ports.append(listeningPort) 317 self.assertTrue(interfaces.IListeningPort.providedBy(listeningPort)) 318 303 319 if not interfaces.IReactorUNIX(reactor, None): 304 320 UnixSocketTestCase.skip = "This reactor does not support UNIX domain sockets" 305 321 if not interfaces.IReactorUNIXDatagram(reactor, None): -
twisted/test/test_tcp.py
15 15 from twisted.internet import protocol, reactor, defer, interfaces 16 16 from twisted.internet import error 17 17 from twisted.internet.address import IPv4Address 18 from twisted.internet.interfaces import IHalfCloseableProtocol 18 from twisted.internet.interfaces import (IConnector, IHalfCloseableProtocol, 19 IListeningPort) 19 20 from twisted.protocols import policies 20 21 21 22 … … 1142 1143 d.addCallback(lambda _: log.flushErrors(RuntimeError)) 1143 1144 return d 1144 1145 1146 class IPv4AddressTestCase(PortCleanerUpper): 1147 def testConnect(self): 1148 """Test that IPv4Address.connect returns an IConnector""" 1149 factory = ClosingFactory() 1150 listeningPort = reactor.listenTCP(0, factory) 1151 self.ports.append(listeningPort) 1152 factory.port = listeningPort 1153 portNo = listeningPort.getHost().port 1154 addr = IPv4Address("TCP", "127.0.0.1", portNo) 1155 conn = addr.connect(MyClientFactory()) 1156 self.assertTrue(IConnector.providedBy(conn)) 1145 1157 1158 def testListen(self): 1159 """Test that IPv4Address.listen returns an IListeningPort""" 1160 factory = ClosingFactory() 1161 addr = IPv4Address("TCP", "0.0.0.0", 0) 1162 listeningPort = addr.listen(factory) 1163 self.ports.append(listeningPort) 1164 factory.port = listeningPort 1165 self.assertTrue(IListeningPort.providedBy(listeningPort)) 1166 1146 1167 try: 1147 1168 import resource 1148 1169 except ImportError: -
twisted/internet/interfaces.py
19 19 Default implementations are in L{twisted.internet.address}. 20 20 """ 21 21 22 22 def connect(factory, timeout=30): 23 """Attempt to connect to my address 24 25 @param factory: A protocol factory 26 @return: An object which provides L{IConnector}. 27 """ 28 29 def listen(factory, backlog=50): 30 """Attempt to listen at my address 31 32 @param factory: A protocol factory 33 @return: An object which provides L{IListeningPort}. 34 """ 23 35 ### Reactor Interfaces 24 36 25 37 class IConnector(Interface): -
twisted/internet/address.py
34 34 self.port = port 35 35 self._bwHack = _bwHack 36 36 37 def connect(self, factory, timeout=30): 38 from twisted.internet import reactor 39 return reactor.connectTCP(self.host, self.port, factory, timeout) 40 41 def listen(self, factory, backlog=50): 42 from twisted.internet import reactor 43 return reactor.listenTCP(self.port, factory, backlog, interface=self.host) 44 37 45 def __getitem__(self, index): 38 46 warnings.warn("IPv4Address.__getitem__ is deprecated. Use attributes instead.", 39 47 category=DeprecationWarning, stacklevel=2) … … 70 78 def __init__(self, name, _bwHack='UNIX'): 71 79 self.name = name 72 80 self._bwHack = _bwHack 73 81 82 def connect(self, factory, timeout=30): 83 from twisted.internet import reactor 84 return reactor.connectUNIX(self.name, factory, timeout) 85 86 def listen(self, factory, backlog=50): 87 from twisted.internet import reactor 88 return reactor.listenUNIX(self.name, factory, backlog) 89 74 90 def __getitem__(self, index): 75 91 warnings.warn("UNIXAddress.__getitem__ is deprecated. Use attributes instead.", 76 92 category=DeprecationWarning, stacklevel=2) … … 92 108 return False 93 109 94 110 def __str__(self): 95 return 'UNIX Socket(%r)' % (self.name,)111 return 'UNIXAddress(%r)' % (self.name,) 96 112 97 113 98 114 # These are for buildFactory backwards compatability due to
