| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
|
|---|
| 4 |
|
|---|
| 5 |
|
|---|
| 6 |
|
|---|
| 7 |
"""Hierarchical Token Bucket traffic shaping. |
|---|
| 8 |
|
|---|
| 9 |
Patterned after U{Martin Devera's Hierarchical Token Bucket traffic |
|---|
| 10 |
shaper for the Linux kernel<http://luxik.cdi.cz/~devik/qos/htb/>}. |
|---|
| 11 |
|
|---|
| 12 |
@seealso: U{HTB Linux queuing discipline manual - user guide |
|---|
| 13 |
<http://luxik.cdi.cz/~devik/qos/htb/manual/userg.htm>} |
|---|
| 14 |
@seealso: U{Token Bucket Filter in Linux Advanced Routing & Traffic Control |
|---|
| 15 |
HOWTO<http://lartc.org/howto/lartc.qdisc.classless.html#AEN682>} |
|---|
| 16 |
@author: Kevin Turner |
|---|
| 17 |
""" |
|---|
| 18 |
|
|---|
| 19 |
from __future__ import nested_scopes |
|---|
| 20 |
|
|---|
| 21 |
__version__ = '$Revision: 1.5 $'[11:-2] |
|---|
| 22 |
|
|---|
| 23 |
|
|---|
| 24 |
|
|---|
| 25 |
|
|---|
| 26 |
|
|---|
| 27 |
from time import time |
|---|
| 28 |
from zope.interface import implements, Interface |
|---|
| 29 |
|
|---|
| 30 |
from twisted.protocols import pcp |
|---|
| 31 |
|
|---|
| 32 |
|
|---|
| 33 |
class Bucket:
    """Token bucket, or something like it.

    I can hold up to a certain number of tokens, and I drain over time.

    @cvar maxburst: Size of the bucket, in bytes.  If None, the bucket is
        never full.
    @type maxburst: int
    @cvar rate: Rate the bucket drains, in bytes per second.  If None,
        the bucket drains instantaneously.
    @type rate: int
    @ivar content: Number of tokens currently held.
    @type content: int
    @ivar parentBucket: Bucket that must also accept any tokens added to
        me, or None if I have no parent.
    @ivar lastDrip: Timestamp (as returned by C{time()}) of the last
        rate-based drain.
    """

    maxburst = None
    rate = None

    # Count of outside users currently holding on to this bucket.  It is
    # adjusted from outside this class and starts at zero.
    _refcount = 0

    def __init__(self, parentBucket=None):
        """Create an empty bucket.

        @param parentBucket: Optional bucket which limits me in addition
            to my own C{maxburst}.
        @type parentBucket: L{Bucket} or None
        """
        self.content = 0
        self.parentBucket = parentBucket
        self.lastDrip = time()

    def add(self, amount):
        """Add tokens to me.

        I drain first, then accept as many of the offered tokens as fit
        in me and in every bucket above me.

        @param amount: A quantity of tokens to add.
        @type amount: int

        @returns: The number of tokens that fit.
        @returntype: int
        """
        self.drip()
        if self.maxburst is None:
            allowable = amount
        else:
            allowable = min(amount, self.maxburst - self.content)

        # The parent has the final say: it may accept fewer tokens than I
        # would, and only what it accepts is counted against me.
        if self.parentBucket is not None:
            allowable = self.parentBucket.add(allowable)
        self.content += allowable
        return allowable

    def drip(self):
        """Let some of the bucket drain.

        How much of the bucket drains depends on how long it has been
        since I was last called.  My parent bucket, if any, is drained
        as well.

        @returns: True if I am now empty.
        @returntype: bool
        """
        if self.parentBucket is not None:
            self.parentBucket.drip()

        if self.rate is None:
            # No rate limit: everything drains at once.
            self.content = 0
        else:
            now = time()
            deltaT = now - self.lastDrip
            # int() truncates the same way long() did, and unlike long()
            # it is available on every Python version.
            self.content = int(max(0, self.content - deltaT * self.rate))
            self.lastDrip = now
        # Report emptiness for rate-limited buckets too, so that callers
        # which discard empty buckets can actually see them become empty.
        return self.content == 0
|---|
| 97 |
|
|---|
| 98 |
|
|---|
| 99 |
class IBucketFilter(Interface):
    """Interface for objects that hand out buckets on request."""

    def getBucketFor(*somethings, **some_kw):
        """Return the bucket appropriate for the given arguments.

        Implementations decide how the positional and keyword arguments
        map onto a bucket.

        @returntype: L{Bucket}
        """
|---|
| 105 |
|
|---|
| 106 |
class HierarchicalBucketFilter:
    """I filter things into buckets, and I am nestable.

    @cvar bucketFactory: Class of buckets to make.
    @type bucketFactory: L{Bucket} class
    @cvar sweepInterval: Seconds between sweeping out the bucket cache.
        If None, the cache is never swept automatically.
    @type sweepInterval: int
    @ivar buckets: Cache mapping bucket keys to the buckets made for them.
    @type buckets: dict
    @ivar parentFilter: Filter consulted for the parent of each bucket I
        create, or None.
    @ivar lastSweep: Timestamp (as returned by C{time()}) of the last sweep.
    """

    implements(IBucketFilter)

    bucketFactory = Bucket
    sweepInterval = None

    def __init__(self, parentFilter=None):
        """Create a filter with an empty bucket cache.

        @param parentFilter: Optional filter whose buckets become the
            parents of the buckets I hand out.
        """
        self.buckets = {}
        self.parentFilter = parentFilter
        self.lastSweep = time()

    def getBucketFor(self, *a, **kw):
        """You want a bucket for that?  I'll give you a bucket.

        Any parameters are passed on to L{getBucketKey}, from them it
        decides which bucket you get.  If I have a parent filter, it is
        asked for a bucket too; it receives me as an extra leading
        argument followed by the same parameters.

        @returntype: L{Bucket}
        """
        if ((self.sweepInterval is not None)
            and ((time() - self.lastSweep) > self.sweepInterval)):
            self.sweep()

        if self.parentFilter:
            parentBucket = self.parentFilter.getBucketFor(self, *a, **kw)
        else:
            parentBucket = None

        key = self.getBucketKey(*a, **kw)
        bucket = self.buckets.get(key)
        if bucket is None:
            # First request for this key: make a bucket and remember it.
            # A cached bucket keeps the parent it was created with.
            bucket = self.bucketFactory(parentBucket)
            self.buckets[key] = bucket
        return bucket

    def getBucketKey(self, *a, **kw):
        """I determine who gets which bucket.

        Unless I'm overridden, everything gets the same bucket.

        @returns: something to be used as a key in the bucket cache.
        """
        return None

    def sweep(self):
        """I throw away references to empty buckets.

        Only buckets whose C{_refcount} is zero are considered, and they
        are dropped when their C{drip()} reports them empty.
        """
        # Iterate over a snapshot: entries are deleted inside the loop,
        # and a dictionary must not change size while it is iterated.
        for key, bucket in list(self.buckets.items()):
            if (bucket._refcount == 0) and bucket.drip():
                del self.buckets[key]

        self.lastSweep = time()
|---|
| 165 |
|
|---|
| 166 |
|
|---|
| 167 |
class FilterByHost(HierarchicalBucketFilter):
    """A bucket filter with a bucket for each host.
    """
    # Sweep the bucket cache every twenty minutes.
    sweepInterval = 60 * 20

    def getBucketKey(self, transport):
        """Key the bucket on the transport's peer.

        @param transport: Object providing C{getPeer()}.
        @returns: Element 1 of the peer address (presumably the host
            part -- confirm against the transport's address format).
        """
        peer = transport.getPeer()
        return peer[1]
|---|
| 174 |
|
|---|
| 175 |
|
|---|
| 176 |
class FilterByServer(HierarchicalBucketFilter):
    """A bucket filter with a bucket for each service.
    """
    # Never sweep the bucket cache automatically.
    sweepInterval = None

    def getBucketKey(self, transport):
        """Key the bucket on the transport's local address.

        @param transport: Object providing C{getHost()}.
        @returns: Element 2 of the host address (presumably the port
            identifying the service -- confirm against the transport's
            address format).
        """
        host = transport.getHost()
        return host[2]
|---|
| 183 |
|
|---|
| 184 |
|
|---|
| 185 |
class ShapedConsumer(pcp.ProducerConsumerProxy):
    """I wrap a Consumer and shape the rate at which it receives data.

    @ivar bucket: The bucket which decides how much of each write may
        pass through to the wrapped consumer.
    """

    # NOTE(review): presumably read by ProducerConsumerProxy to decide
    # how data is pulled from the producer -- confirm against pcp.
    iAmStreaming = False

    def __init__(self, consumer, bucket):
        """Wrap a consumer and register myself as a user of the bucket.

        @param consumer: The consumer to be shaped.
        @param bucket: The bucket limiting what I write.
        @type bucket: L{Bucket}
        """
        pcp.ProducerConsumerProxy.__init__(self, consumer)
        # Mark the bucket as in use for as long as I am producing.
        bucket._refcount += 1
        self.bucket = bucket

    def _writeSomeData(self, data):
        """Write as much of the data as the bucket has room for.

        @param data: The data waiting to be written.
        @returns: Whatever the proxy's own C{_writeSomeData} returns for
            the permitted leading portion of the data.
        """
        permitted = self.bucket.add(len(data))
        shaped = data[:permitted]
        return pcp.ProducerConsumerProxy._writeSomeData(self, shaped)

    def stopProducing(self):
        """Stop producing and release my hold on the bucket."""
        pcp.ProducerConsumerProxy.stopProducing(self)
        self.bucket._refcount -= 1
|---|
| 208 |
|
|---|
| 209 |
|
|---|
| 210 |
class ShapedTransport(ShapedConsumer):
    """I wrap a Transport and shape the rate at which it receives data.

    I am a L{ShapedConsumer} with a little bit of magic to provide for
    the case where the consumer I wrap is also a Transport and people
    will be attempting to access attributes I do not proxy as a
    Consumer (e.g. loseConnection).
    """

    iAmStreaming = False

    def __getattr__(self, name):
        """Look up attributes I lack on the wrapped consumer instead.

        Python calls this only after normal attribute lookup has failed.

        @param name: Name of the attribute being looked up.
        @returns: The attribute of that name on C{self.consumer}.
        """
        wrapped = self.consumer
        return getattr(wrapped, name)
|---|
| 225 |
|
|---|
| 226 |
|
|---|
| 227 |
class ShapedProtocolFactory:
    """I dispense Protocols with traffic shaping on their transports.

    Usage::

        myserver = SomeFactory()
        myserver.protocol = ShapedProtocolFactory(myserver.protocol,
                                                  bucketFilter)

    Where SomeFactory is a L{twisted.internet.protocol.Factory}, and
    bucketFilter is an instance of L{HierarchicalBucketFilter}.
    """
    def __init__(self, protoClass, bucketFilter):
        """Tell me what to wrap and where to get buckets.

        @param protoClass: The class of Protocol I will generate
            wrapped instances of.
        @type protoClass: L{Protocol<twisted.internet.interfaces.IProtocol>}
            class
        @param bucketFilter: The filter which will determine how
            traffic is shaped.
        @type bucketFilter: L{HierarchicalBucketFilter}.
        """
        self.bucketFilter = bucketFilter
        self.protocol = protoClass

    def __call__(self, *a, **kw):
        """Make a Protocol instance with a shaped transport.

        Any parameters will be passed on to the protocol's initializer.
        The instance's C{makeConnection} is replaced by a wrapper which
        swaps the real transport for a L{ShapedTransport}.

        @returns: a Protocol instance with a L{ShapedTransport}.
        """
        instance = self.protocol(*a, **kw)
        # Grab the bound method before it is shadowed on the instance.
        unshapedMakeConnection = instance.makeConnection

        def makeConnection(transport):
            # Choose the bucket per connection, then hand the protocol a
            # shaped stand-in for its transport.
            chosenBucket = self.bucketFilter.getBucketFor(transport)
            return unshapedMakeConnection(
                ShapedTransport(transport, chosenBucket))

        instance.makeConnection = makeConnection
        return instance
|---|