root / trunk / twisted / protocols / pcp.py

Revision 17451, 7.1 kB (checked in by foom, 3 years ago)

Remove twisted.components.Interface completely.

Merges: killtpc-1636-2
Authors: glyph, therve, jknight
Reviewer: exarkun

Remove deprecated twisted.python.components functionality:
MetaInterface?, Interface, getAdapterClass, and
getAdapterClassWithInheritance.

Also convert all twisted interfaces to use zope.interface directly
(including removing "self" argument), and remove redundant tests.

Line 
1 # -*- test-case-name: twisted.test.test_pcp -*-
2 #
3 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6
7 """Producer-Consumer Proxy."""
8
9 __version__ = '$Revision: 1.4 $'[11:-2]
10
11 import operator
12
13 from zope.interface import implements
14
15 from twisted.internet import interfaces
16
17
18 class BasicProducerConsumerProxy:
19     """ I can act as a man in the middle between any Producer and Consumer.
20
21     @ivar producer: the Producer I subscribe to.
22     @type producer: L{IProducer<interfaces.IProducer>}
23     @ivar consumer: the Consumer I publish to.
24     @type consumer: L{IConsumer<interfaces.IConsumer>}
25     @ivar paused: As a Producer, am I paused?
26     @type paused: bool
27     """
28     implements(interfaces.IProducer, interfaces.IConsumer)
29
30     consumer = None
31     producer = None
32     producerIsStreaming = None
33     iAmStreaming = True
34     outstandingPull = False
35     paused = False
36     stopped = False
37
38     def __init__(self, consumer):
39         self._buffer = []
40         if consumer is not None:
41             self.consumer = consumer
42             consumer.registerProducer(self, self.iAmStreaming)
43
44     # Producer methods:
45
46     def pauseProducing(self):
47         self.paused = True
48         if self.producer:
49             self.producer.pauseProducing()
50
51     def resumeProducing(self):
52         self.paused = False
53         if self._buffer:
54             # TODO: Check to see if consumer supports writeSeq.
55             self.consumer.write(''.join(self._buffer))
56             self._buffer[:] = []
57         else:
58             if not self.iAmStreaming:
59                 self.outstandingPull = True
60
61         if self.producer is not None:
62             self.producer.resumeProducing()
63
64     def stopProducing(self):
65         if self.producer is not None:
66             self.producer.stopProducing()
67         if self.consumer is not None:
68             del self.consumer
69
70     # Consumer methods:
71
72     def write(self, data):
73         if self.paused or (not self.iAmStreaming and not self.outstandingPull):
74             # We could use that fifo queue here.
75             self._buffer.append(data)
76
77         elif self.consumer is not None:
78             self.consumer.write(data)
79             self.outstandingPull = False
80
81     def finish(self):
82         if self.consumer is not None:
83             self.consumer.finish()
84         self.unregisterProducer()
85
86     def registerProducer(self, producer, streaming):
87         self.producer = producer
88         self.producerIsStreaming = streaming
89
90     def unregisterProducer(self):
91         if self.producer is not None:
92             del self.producer
93             del self.producerIsStreaming
94         if self.consumer:
95             self.consumer.unregisterProducer()
96
97     def __repr__(self):
98         return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
99
100
101 class ProducerConsumerProxy(BasicProducerConsumerProxy):
102     """ProducerConsumerProxy with a finite buffer.
103
104     When my buffer fills up, I have my parent Producer pause until my buffer
105     has room in it again.
106     """
107     # Copies much from abstract.FileDescriptor
108     bufferSize = 2**2**2**2
109
110     producerPaused = False
111     unregistered = False
112
113     def pauseProducing(self):
114         # Does *not* call up to ProducerConsumerProxy to relay the pause
115         # message through to my parent Producer.
116         self.paused = True
117
118     def resumeProducing(self):
119         self.paused = False
120         if self._buffer:
121             data = ''.join(self._buffer)
122             bytesSent = self._writeSomeData(data)
123             if bytesSent < len(data):
124                 unsent = data[bytesSent:]
125                 assert not self.iAmStreaming, (
126                     "Streaming producer did not write all its data.")
127                 self._buffer[:] = [unsent]
128             else:
129                 self._buffer[:] = []
130         else:
131             bytesSent = 0
132
133         if (self.unregistered and bytesSent and not self._buffer and
134             self.consumer is not None):
135             self.consumer.unregisterProducer()
136
137         if not self.iAmStreaming:
138             self.outstandingPull = not bytesSent
139
140         if self.producer is not None:
141             bytesBuffered = reduce(operator.add,
142                                    [len(s) for s in self._buffer], 0)
143             # TODO: You can see here the potential for high and low
144             # watermarks, where bufferSize would be the high mark when we
145             # ask the upstream producer to pause, and we wouldn't have
146             # it resume again until it hit the low mark.  Or if producer
147             # is Pull, maybe we'd like to pull from it as much as necessary
148             # to keep our buffer full to the low mark, so we're never caught
149             # without something to send.
150             if self.producerPaused and (bytesBuffered < self.bufferSize):
151                 # Now that our buffer is empty,
152                 self.producerPaused = False
153                 self.producer.resumeProducing()
154             elif self.outstandingPull:
155                 # I did not have any data to write in response to a pull,
156                 # so I'd better pull some myself.
157                 self.producer.resumeProducing()
158
159     def write(self, data):
160         if self.paused or (not self.iAmStreaming and not self.outstandingPull):
161             # We could use that fifo queue here.
162             self._buffer.append(data)
163
164         elif self.consumer is not None:
165             assert not self._buffer, (
166                 "Writing fresh data to consumer before my buffer is empty!")
167             # I'm going to use _writeSomeData here so that there is only one
168             # path to self.consumer.write.  But it doesn't actually make sense,
169             # if I am streaming, for some data to not be all data.  But maybe I
170             # am not streaming, but I am writing here anyway, because there was
171             # an earlier request for data which was not answered.
172             bytesSent = self._writeSomeData(data)
173             self.outstandingPull = False
174             if not bytesSent == len(data):
175                 assert not self.iAmStreaming, (
176                     "Streaming producer did not write all its data.")
177                 self._buffer.append(data[bytesSent:])
178
179         if (self.producer is not None) and self.producerIsStreaming:
180             bytesBuffered = reduce(operator.add,
181                                    [len(s) for s in self._buffer], 0)
182             if bytesBuffered >= self.bufferSize:
183
184                 self.producer.pauseProducing()
185                 self.producerPaused = True
186
187     def registerProducer(self, producer, streaming):
188         self.unregistered = False
189         BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
190         if not streaming:
191             producer.resumeProducing()
192
193     def unregisterProducer(self):
194         if self.producer is not None:
195             del self.producer
196             del self.producerIsStreaming
197         self.unregistered = True
198         if self.consumer and not self._buffer:
199             self.consumer.unregisterProducer()
200
201     def _writeSomeData(self, data):
202         """Write as much of this data as possible.
203
204         @returns: The number of bytes written.
205         """
206         if self.consumer is None:
207             return 0
208         self.consumer.write(data)
209         return len(data)
Note: See TracBrowser for help on using the browser.