[Twisted-Python] streaming producer

Andrea Arcangeli andrea at cpushare.com
Wed Mar 9 01:27:21 MST 2005


It wasn't difficult to throttle the process stdout after
understanding the registerProducer mechanism. This patch did
the trick just fine. I didn't find docs covering the details,
but the transport code was very readable.

I should be OK with the streaming version, since I don't need
resumeProducing to be called again every time the socket becomes writable
(I only need resumeProducing to be called when the socket becomes
writable while the process protocol is in the paused state).

Index: cpushare/seccomp.py
===================================================================
RCS file: /home/andrea/crypto/cvs/cpushare/client/cpushare/cpushare/seccomp.py,v
retrieving revision 1.11
diff -u -p -r1.11 seccomp.py
--- cpushare/seccomp.py	3 Mar 2005 05:56:50 -0000	1.11
+++ cpushare/seccomp.py	9 Mar 2005 07:38:01 -0000
@@ -29,6 +29,8 @@ class seccomp_protocol_class(protocol.Pr
 		self.d = deferred
 		self.outReceived = self.enable_seccomp_mode
 	def connectionMade(self):
+		self.seccomp.cpushare_protocol.seccomp_protocols.append(self)
+		self.seccomp.cpushare_protocol.transport.registerProducer(self, 1)
 		self.transport.closeChildFD(2) # close stderr right away
 		self.transport.writeToChild(0, self.seccomp.header + self.seccomp.text_data)
 	def enable_seccomp_mode(self, data):
@@ -50,12 +52,14 @@ class seccomp_protocol_class(protocol.Pr
 		self.outReceived = self.send_to_server
 		self.transport.writeToChild(0, MAGIC_GOT_SECCOMP)
 	def send_to_server(self, data):
-		self.seccomp.state_machine.protocol.sendString(PROTO_SECCOMP_FORWARD + data)
+		self.seccomp.cpushare_protocol.sendString(PROTO_SECCOMP_FORWARD + data)
 	def recv_from_server(self, data):
 		self.transport.writeToChild(0, data)
 	def errReceived(self, data):
 		raise "shouldn't happen"
 	def processEnded(self, status):
+		self.seccomp.cpushare_protocol.seccomp_protocols.remove(self)
+		self.seccomp.cpushare_protocol.transport.unregisterProducer()
 		if status.value.exitCode:
 			if status.value.exitCode == 4:
 				print 'Failure in setting the stack size to %d bytes.' % self.seccomp.stack
@@ -70,11 +74,19 @@ class seccomp_protocol_class(protocol.Pr
 		if self.transport.pid is not None:
 			os.kill(self.transport.pid, signal.SIGKILL)
 
+	def resumeProducing(self):
+		self.transport.resumeProducing()
+	def pauseProducing(self):
+		self.transport.pauseProducing()
+	def stopProducing(self):
+		self.transport.loseConnection()
+
 class seccomp_class(object):
 	header_fmt = 'iiiIIiiI'
 
 	def __init__(self, header, state_machine):
 		self.state_machine = state_machine
+		self.cpushare_protocol = state_machine.protocol
 
 		size = struct.calcsize(self.header_fmt)
 		assert size + struct.calcsize('I') + 16 == len(header), "corrupted header"

> I will probably make the max buffer size configurable dynamically and

It should be enough to override the bufferSize in the cpushare_protocol
implementation if I want to enlarge the buffer (default is 64k).




More information about the Twisted-Python mailing list