root / trunk / twisted / internet / abstract.py

Revision 25452, 13.3 kB (checked in by exarkun, 8 months ago)

Merge writesomedata-docs-3455

Author: exarkun
Reviewer: mwhudson
Fixes: #3455

Change the API documentation for writeSomeData so that it does not claim negative
values are returned to indicate connection loss. Instead, document that an exception
instance will be returned to indicate that.

Line 
1 # -*- test-case-name: twisted.test.test_abstract -*-
2 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 Support for generic select()able objects.
8
9 Maintainer: Itamar Shtull-Trauring
10 """
11
12 from zope.interface import implements
13
14 # Twisted Imports
15 from twisted.python import log, reflect, failure
16 from twisted.persisted import styles
17 from twisted.internet import interfaces, main
18
19
class FileDescriptor(log.Logger, styles.Ephemeral, object):
    """
    An object which can be operated on by select().

    This is an abstract superclass of all objects which may be notified when
    they are readable or writable; e.g. they have a file-descriptor that is
    valid to be passed to select(2).

    Outgoing data is queued by L{write} and L{writeSequence} and flushed by
    L{doWrite} through L{writeSomeData}.  Subclasses must implement
    L{writeSomeData} and L{doRead}.
    """
    # Connection state flags.  connectionLost() sets ``disconnected`` and
    # clears ``connected``; write() and writeSequence() silently drop data
    # while ``connected`` is false.
    connected = 0
    # Set when write()/writeSequence() pause a streaming producer; cleared
    # when doWrite() resumes it.
    producerPaused = 0
    # The ``streaming`` flag given to registerProducer().
    streamingProducer = 0
    producer = None
    disconnected = 0
    # Set by loseConnection(); doWrite() finishes the close once every
    # buffered byte has been written.
    disconnecting = 0
    # Half-close bookkeeping: loseWriteConnection() sets the first flag,
    # doWrite() sets the second after calling _closeWriteConnection().
    _writeDisconnecting = False
    _writeDisconnected = False
    # Data currently being sent, and the index of its first unsent byte.
    dataBuffer = ""
    offset = 0

    # doWrite() only merges the queued chunks into dataBuffer when fewer than
    # this many unsent bytes remain in it.
    SEND_LIMIT = 128*1024

    implements(interfaces.IProducer, interfaces.IReadWriteDescriptor,
               interfaces.IConsumer, interfaces.ITransport, interfaces.IHalfCloseableDescriptor)

    def __init__(self, reactor=None):
        """
        @param reactor: The reactor this descriptor registers itself with for
            read and write notification.  If no (or a false) value is given,
            the global C{twisted.internet.reactor} is imported and used.
        """
        if not reactor:
            from twisted.internet import reactor
        self.reactor = reactor
        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
        self._tempDataLen = 0

    def connectionLost(self, reason):
        """The connection was lost.

        This is called when the connection on a selectable object has been
        lost.  It will be called whether the connection was closed explicitly,
        an exception occurred in an event handler, or the other end of the
        connection closed it first.

        Clean up state here, but make sure to call back up to FileDescriptor.

        @param reason: Why the connection was lost.  It is not inspected by
            this implementation.
        """

        self.disconnected = 1
        self.connected = 0
        if self.producer is not None:
            self.producer.stopProducing()
            self.producer = None
        self.stopReading()
        self.stopWriting()


    def writeSomeData(self, data):
        """
        Write as much as possible of the given data, immediately.

        This is called to invoke the lower-level writing functionality, such
        as a socket's send() method, or a file's write(); this method
        returns an integer or an exception.  If an integer, it is the number
        of bytes written (possibly zero); if an exception, it indicates the
        connection was lost.

        @raise NotImplementedError: Always, in this base class; subclasses
            must override this method.
        """
        raise NotImplementedError("%s does not implement writeSomeData" %
                                  reflect.qual(self.__class__))


    def doRead(self):
        """Called when data is available for reading.

        Subclasses must override this method. The result will be interpreted
        in the same way as a result of doWrite().

        @raise NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("%s does not implement doRead" %
                                  reflect.qual(self.__class__))

    def doWrite(self):
        """
        Called when data can be written.

        A result that is true (which will be a negative number or an
        exception instance) indicates that the connection was lost. A false
        result implies the connection is still there; a result of 0
        indicates no write was done, and a result of None indicates that a
        write was done.

        Once everything buffered has been written this also stops write
        notification and then does exactly one of: asks the producer for more
        data, completes a pending L{loseConnection} (returning whatever
        L{_postLoseConnection} returns), or completes a pending
        L{loseWriteConnection}.
        """
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
            # If there is currently less than SEND_LIMIT bytes left to send
            # in the string, extend it with the array data.
            self.dataBuffer = buffer(self.dataBuffer, self.offset) + "".join(self._tempDataBuffer)
            self.offset = 0
            self._tempDataBuffer = []
            self._tempDataLen = 0

        # Send as much data as you can.
        if self.offset:
            written = self.writeSomeData(buffer(self.dataBuffer, self.offset))
        else:
            written = self.writeSomeData(self.dataBuffer)

        # There is no writeSomeData implementation in Twisted which returns
        # < 0, but the documentation for writeSomeData used to claim negative
        # integers meant connection lost.  Keep supporting this here,
        # although it may be worth deprecating and removing at some point.
        # The isinstance() check comes first so that an exception instance is
        # never ordered against an integer.
        if isinstance(written, Exception) or written < 0:
            return written
        if written == 0 and self.dataBuffer:
            result = 0
        else:
            result = None
        self.offset += written
        # If there is nothing left to send,
        if self.offset == len(self.dataBuffer) and not self._tempDataLen:
            self.dataBuffer = ""
            self.offset = 0
            # stop writing.
            self.stopWriting()
            # If I've got a producer who is supposed to supply me with data,
            if self.producer is not None and ((not self.streamingProducer)
                                              or self.producerPaused):
                # tell them to supply some more.
                self.producerPaused = 0
                self.producer.resumeProducing()
            elif self.disconnecting:
                # But if I was previously asked to let the connection die, do
                # so.
                return self._postLoseConnection()
            elif self._writeDisconnecting:
                # I was previously asked to half-close the connection.
                result = self._closeWriteConnection()
                self._writeDisconnected = True
                return result
        return result

    def _postLoseConnection(self):
        """Called after a loseConnection(), when all data has been written.

        Whatever this returns is then returned by doWrite.
        """
        # default implementation, telling reactor we're finished
        return main.CONNECTION_DONE

    def _closeWriteConnection(self):
        """
        Hook called by L{doWrite} to half-close the write side once all
        buffered data has been written.  Does nothing here.
        """
        # override in subclasses
        pass

    def writeConnectionLost(self, reason):
        """
        Treat the loss of the write side as the loss of the whole connection.
        """
        # in current code should never be called
        self.connectionLost(reason)

    def readConnectionLost(self, reason):
        """
        Treat the loss of the read side as the loss of the whole connection.
        """
        # override in subclasses
        self.connectionLost(reason)

    def write(self, data):
        """Reliably write some data.

        The data is buffered until the underlying file descriptor is ready
        for writing. If there is more than C{self.bufferSize} data in the
        buffer and this descriptor has a registered streaming producer, its
        C{pauseProducing()} method will be called.

        Data is silently discarded if this descriptor is not connected or
        its write side has already been closed.

        @raise TypeError: If C{data} is a C{unicode} object.
        """
        if isinstance(data, unicode): # no, really, I mean it
            raise TypeError("Data must not be unicode")
        if not self.connected or self._writeDisconnected:
            return
        if data:
            self._tempDataBuffer.append(data)
            self._tempDataLen += len(data)
            # If we are responsible for pausing our producer,
            if self.producer is not None and self.streamingProducer:
                # and our buffer is full,
                if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
                    # pause it.
                    self.producerPaused = 1
                    self.producer.pauseProducing()
            self.startWriting()

    def writeSequence(self, iovec):
        """Reliably write a sequence of data.

        Currently, this is a convenience method roughly equivalent to::

            for chunk in iovec:
                fd.write(chunk)

        It may have a more efficient implementation at a later time or in a
        different reactor.

        As with the C{write()} method, if a buffer size limit is reached and a
        streaming producer is registered, it will be paused until the buffered
        data is written to the underlying file descriptor.

        @raise TypeError: If any element of C{iovec} is a C{unicode} object.
        """
        # Reject unicode up front, exactly as write() does; otherwise the bad
        # chunk would only blow up later, when doWrite() joins the buffers.
        for chunk in iovec:
            if isinstance(chunk, unicode): # no, really, I mean it
                raise TypeError("Data must not be unicode")
        if not self.connected or not iovec or self._writeDisconnected:
            return
        self._tempDataBuffer.extend(iovec)
        for i in iovec:
            self._tempDataLen += len(i)
        # If we are responsible for pausing our producer,
        if self.producer is not None and self.streamingProducer:
            # and our buffer is full,
            if len(self.dataBuffer) + self._tempDataLen > self.bufferSize:
                # pause it.
                self.producerPaused = 1
                self.producer.pauseProducing()
        self.startWriting()

    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
        """Close the connection at the next available opportunity.

        Call this to cause this FileDescriptor to lose its connection.  It will
        first write any data that it has buffered.

        If there is data buffered yet to be written, this method will cause the
        transport to lose its connection as soon as it's done flushing its
        write buffer.  If you have a producer registered, the connection won't
        be closed until the producer is finished. Therefore, make sure you
        unregister your producer when it's finished, or the connection will
        never close.

        @param _connDone: The reason passed to L{connectionLost} when the
            write side has already been closed and the connection is therefore
            dropped immediately.  The default is built once, when the method
            is defined.
        """

        if self.connected and not self.disconnecting:
            if self._writeDisconnected:
                # doWrite won't trigger the connection close anymore
                self.stopReading()
                self.stopWriting()
                self.connectionLost(_connDone)
            else:
                self.stopReading()
                self.startWriting()
                self.disconnecting = 1

    def loseWriteConnection(self):
        """
        Request that the write side be closed.

        This only sets a flag and starts write notification; L{doWrite}
        performs the half-close after the buffered data has been written.
        """
        self._writeDisconnecting = True
        self.startWriting()

    def stopReading(self):
        """Stop waiting for read availability.

        Call this to remove this selectable from being notified when it is
        ready for reading.
        """
        self.reactor.removeReader(self)

    def stopWriting(self):
        """Stop waiting for write availability.

        Call this to remove this selectable from being notified when it is ready
        for writing.
        """
        self.reactor.removeWriter(self)

    def startReading(self):
        """Start waiting for read availability.
        """
        self.reactor.addReader(self)

    def startWriting(self):
        """Start waiting for write availability.

        Call this to have this FileDescriptor be notified whenever it is ready for
        writing.
        """
        self.reactor.addWriter(self)

    # Producer/consumer implementation

    # first, the consumer stuff.  This requires no additional work, as
    # any object you can write to can be a consumer, really.

    producer = None
    # 2**2**2**2 == 2**16 == 65536 (exponentiation associates to the right).
    # write() and writeSequence() pause a streaming producer once more than
    # this many bytes are buffered.
    bufferSize = 2**2**2**2

    def registerProducer(self, producer, streaming):
        """Register to receive data from a producer.

        This sets this selectable to be a consumer for a producer.  When this
        selectable runs out of data on a write() call, it will ask the producer
        to resumeProducing(). When the FileDescriptor's internal data buffer is
        filled, it will ask the producer to pauseProducing(). If the connection
        is lost, FileDescriptor calls producer's stopProducing() method.

        If streaming is true, the producer should provide the IPushProducer
        interface. Otherwise, it is assumed that producer provides the
        IPullProducer interface. In this case, the producer won't be asked
        to pauseProducing(), but it has to be careful to write() data only
        when its resumeProducing() method is called.

        If this descriptor is already disconnected the producer is not
        registered; its stopProducing() method is called instead.

        @raise RuntimeError: If another producer is still registered.
        """
        if self.producer is not None:
            raise RuntimeError("Cannot register producer %s, because producer %s was never unregistered." % (producer, self.producer))
        if self.disconnected:
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()

    def unregisterProducer(self):
        """Stop consuming data from a producer, without disconnecting.
        """
        self.producer = None

    def stopConsuming(self):
        """Stop consuming data.

        This is called when a producer has lost its connection, to tell the
        consumer to go lose its connection (and break potential circular
        references).
        """
        self.unregisterProducer()
        self.loseConnection()

    # producer interface implementation

    def resumeProducing(self):
        """
        Resume producing by starting to wait for read availability again.

        Only valid while connected and not in the middle of disconnecting.
        """
        assert self.connected and not self.disconnecting
        self.startReading()

    def pauseProducing(self):
        """
        Pause producing by no longer waiting for read availability.
        """
        self.stopReading()

    def stopProducing(self):
        """
        Stop producing for good by closing the connection.
        """
        self.loseConnection()


    def fileno(self):
        """File Descriptor number for select().

        This method must be overridden or assigned in subclasses to
        indicate a valid file descriptor for the operating system.
        """
        return -1
350
351
def isIPAddress(addr):
    """
    Determine whether the given string represents an IPv4 address.

    The string must consist of exactly four dot-separated fields, each made
    up solely of ASCII decimal digits and each having a value between 0 and
    255 inclusive.  Fields carrying a sign or surrounding whitespace are
    rejected.

    @type addr: C{str}
    @param addr: A string which may or may not be the decimal dotted
    representation of an IPv4 address.

    @rtype: C{bool}
    @return: C{True} if C{addr} represents an IPv4 address, C{False}
    otherwise.
    """
    dottedParts = addr.split('.')
    if len(dottedParts) != 4:
        return False
    for octet in dottedParts:
        # int() alone is too forgiving: it accepts a leading sign and
        # surrounding whitespace, so "+1", " 1" and "1\n" would all pass.
        # Stripping every ASCII digit must leave nothing behind, and the
        # field itself must not be empty.
        if not octet or octet.strip('0123456789'):
            return False
        try:
            value = int(octet)
        except ValueError:
            return False
        # The field is digits only, so the value cannot be negative.
        if value > 255:
            return False
    return True
376
377
# Names exported by ``from twisted.internet.abstract import *``.  isIPAddress
# is a documented public helper defined in this module, so it is listed
# alongside FileDescriptor.
__all__ = ["FileDescriptor", "isIPAddress"]
Note: See TracBrowser for help on using the browser.