root / trunk / twisted / mail / maildir.py

Revision 26821, 15.1 kB (checked in by exarkun, 2 months ago)

Merge mailbox-order-3812

Author: exarkun
Reviewer: glyph
Fixes: #3812

Fix an intermittent failure in the message ordering of the maildir-based
mailbox implementation. Previously, messages delivered within the same
second had a small chance of being delivered in such a way that they
would appear to have been delivered in the opposite order from the one in
which they really were.

Line 
1 # -*- test-case-name: twisted.mail.test.test_mail -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5
6 """
7 Maildir-style mailbox support
8 """
9
10 import os
11 import stat
12 import socket
13 import time
14
15 from zope.interface import implements
16
17 try:
18     import cStringIO as StringIO
19 except ImportError:
20     import StringIO
21
22 from twisted.python.compat import set
23 from twisted.mail import pop3
24 from twisted.mail import smtp
25 from twisted.protocols import basic
26 from twisted.persisted import dirdbm
27 from twisted.python import log, failure
28 from twisted.python.hashlib import md5
29 from twisted.mail import mail
30 from twisted.internet import interfaces, defer, reactor
31
32 from twisted import cred
33 import twisted.cred.portal
34 import twisted.cred.credentials
35 import twisted.cred.checkers
36 import twisted.cred.error
37
# Canned RFC 822-style message (two headers, a blank line and a short body).
# It is the sole content of the in-memory mailbox that
# MaildirDirdbmDomain.requestAvatar hands to anonymous avatars.
INTERNAL_ERROR = '''\
From: Twisted.mail Internals
Subject: An Error Occurred

  An internal server error has occurred.  Please contact the
  server administrator.
'''
45
46 class _MaildirNameGenerator:
47     """
48     Utility class to generate a unique maildir name
49
50     @ivar _clock: An L{IReactorTime} provider which will be used to learn
51         the current time to include in names returned by L{generate} so that
52         they sort properly.
53     """
54     n = 0
55     p = os.getpid()
56     s = socket.gethostname().replace('/', r'\057').replace(':', r'\072')
57
58     def __init__(self, clock):
59         self._clock = clock
60
61     def generate(self):
62         """
63         Return a string which is intended to unique across all calls to this
64         function (across all processes, reboots, etc).
65
66         Strings returned by earlier calls to this method will compare less
67         than strings returned by later calls as long as the clock provided
68         doesn't go backwards.
69         """
70         self.n = self.n + 1
71         t = self._clock.seconds()
72         seconds = str(int(t))
73         microseconds = '%07d' % (int((t - int(t)) * 10e6),)
74         return '%s.M%sP%sQ%s.%s' % (seconds, microseconds,
75                                     self.p, self.n, self.s)
76
# Module-wide name source: the bound ``generate`` method of a single generator
# that reads the time from the global reactor.  Calling it returns a fresh
# maildir file name.
_generateMaildirName = _MaildirNameGenerator(reactor).generate
78
def initializeMaildir(dir):
    """
    Create an empty maildir at C{dir} unless a directory already exists there.

    The layout created is C{new/}, C{cur/}, C{tmp/} and a C{.Trash/} folder
    which itself contains C{new/}, C{cur/}, C{tmp/} and an empty
    C{maildirfolder} marker file.  Every directory is created with
    owner-only permissions (subject to the process umask).

    If C{dir} is already a directory nothing is done; its contents are not
    inspected or repaired.

    @param dir: Path of the maildir to create.

    @raise OSError: If any of the directories cannot be created.
    """
    if os.path.isdir(dir):
        return

    # stat.S_IRWXU is read/write/execute for the owner only (octal 700).  The
    # symbolic constant is used because the bare ``0700`` literal is only
    # valid syntax on Python 2.
    ownerOnly = stat.S_IRWXU
    trash = os.path.join(dir, '.Trash')

    os.mkdir(dir, ownerOnly)
    for subdir in ['new', 'cur', 'tmp', '.Trash']:
        os.mkdir(os.path.join(dir, subdir), ownerOnly)
    for subdir in ['new', 'cur', 'tmp']:
        os.mkdir(os.path.join(trash, subdir), ownerOnly)
    # touch
    open(os.path.join(trash, 'maildirfolder'), 'w').close()
88
89
class MaildirMessage(mail.FileMessage):
    """
    A L{mail.FileMessage} which starts the message with a C{Delivered-To}
    header and appends the accumulated size to the final file name.

    @ivar size: Running count of the message size: the length of the
        C{Delivered-To} header plus, for every received line, the length of
        the line plus one.
    """
    size = None

    def __init__(self, address, fp, *a, **kw):
        """
        Write the C{Delivered-To} header to C{fp}, then initialize the base
        class.

        @param address: The address placed in the C{Delivered-To} header.
        @param fp: Writable file-like object receiving the message.
        @param a: Further positional arguments for L{mail.FileMessage}.
        @param kw: Further keyword arguments for L{mail.FileMessage}.
        """
        deliveredTo = "Delivered-To: %s\n" % address
        fp.write(deliveredTo)
        self.size = len(deliveredTo)
        mail.FileMessage.__init__(self, fp, *a, **kw)

    def lineReceived(self, line):
        """
        Pass C{line} to the base class and add it to the running size.
        """
        mail.FileMessage.lineReceived(self, line)
        # One extra per line -- presumably the line terminator the base class
        # writes; verify against mail.FileMessage.
        self.size = self.size + len(line) + 1

    def eomReceived(self):
        """
        Append C{,S=<size>} to the final name, then let the base class finish
        the message.

        @return: Whatever L{mail.FileMessage.eomReceived} returns.
        """
        self.finalName = '%s,S=%d' % (self.finalName, self.size)
        return mail.FileMessage.eomReceived(self)
106
class AbstractMaildirDomain:
    """
    Abstract maildir-backed domain.

    Subclasses override L{userDirectory} to say where each user's maildir
    lives, and L{addUser} / L{getCredentialsCheckers} to manage accounts.

    @ivar alias: The alias mapping installed by L{setAliasGroup}, or C{None}
        if none has been installed.
    @ivar root: The C{root} value passed to the initializer.
    """
    alias = None
    root = None

    def __init__(self, service, root):
        """
        Initialize.

        @param service: Accepted but not used by this base class.
        @param root: Stored as C{self.root}.
        """
        self.root = root

    def userDirectory(self, user):
        """
        Get the maildir directory for a given user.

        Override to specify where to save mails for users.
        Return None for non-existing users.

        @param user: The local part of the address.
        @return: This base implementation always returns C{None}.
        """
        return None

    ##
    ## IAliasableDomain
    ##

    def setAliasGroup(self, alias):
        """
        Install the alias mapping consulted by L{exists}.

        @param alias: A mapping from local parts to alias objects.
        """
        self.alias = alias

    ##
    ## IDomain
    ##
    def exists(self, user, memo=None):
        """
        Check for existence of user in the domain.

        @param user: An object whose C{dest.local} is the local part of the
            recipient address.
        @param memo: Passed through to the alias object's C{resolve} method.

        @return: A no-argument callable.  For a user with a maildir it calls
            L{startMessage}; for an alias it returns the resolved aliases.

        @raise smtp.SMTPBadRcpt: If the user has no maildir and either has no
            alias or the alias resolves to nothing.
        """
        localPart = user.dest.local
        if self.userDirectory(localPart) is not None:
            return lambda: self.startMessage(user)
        try:
            a = self.alias[localPart]
        except (KeyError, TypeError):
            # KeyError: no such alias.  TypeError: no alias mapping has been
            # installed (self.alias is still None).  Anything else is a real
            # error and is allowed to propagate instead of being reported as
            # a bad recipient.
            raise smtp.SMTPBadRcpt(user)
        else:
            aliases = a.resolve(self.alias, memo)
            if aliases:
                return lambda: aliases
            log.err("Bad alias configuration: " + str(user))
            raise smtp.SMTPBadRcpt(user)

    def startMessage(self, user):
        """
        Save a message for a given user.

        A new file is opened under the user's C{tmp/} directory; the returned
        message carries the matching path under C{new/} as its final name.

        @param user: Either a C{"local@domain"} string or an object with
            C{dest.local} and C{dest.domain} attributes.

        @return: A L{MaildirMessage} writing to the new temporary file.
        """
        if isinstance(user, str):
            name, domain = user.split('@', 1)
        else:
            name, domain = user.dest.local, user.dest.domain
        dir = self.userDirectory(name)
        fname = _generateMaildirName()
        filename = os.path.join(dir, 'tmp', fname)
        fp = open(filename, 'w')
        return MaildirMessage('%s@%s' % (name, domain), fp, filename,
                              os.path.join(dir, 'new', fname))

    def willRelay(self, user, protocol):
        """
        Maildir domains never relay.

        @return: Always C{False}.
        """
        return False

    def addUser(self, user, password):
        """
        Add a user to the domain.  Must be implemented by subclasses.

        @raise NotImplementedError: Always.
        """
        raise NotImplementedError

    def getCredentialsCheckers(self):
        """
        Return the credentials checkers for this domain.  Must be implemented
        by subclasses.

        @raise NotImplementedError: Always.
        """
        raise NotImplementedError
    ##
    ## end of IDomain
    ##
177
class _MaildirMailboxAppendMessageTask:
    """
    A consumer which copies one message into a mailbox: the data is written
    to a new file under the mailbox's C{tmp/} directory, which is then renamed
    into C{new/}.

    @ivar mbox: The mailbox being appended to; its C{path} and C{list}
        attributes are used.
    @ivar defer: A L{defer.Deferred} fired with C{None} once the message has
        been moved into C{new/}, or with a failure if that could not be done.
        Set to C{None} after it has fired.
    @ivar openCall: The pending delayed call to L{prodProducer}, or C{None}.
    @ivar msg: A file-like object holding the message.
    @ivar fh: The descriptor of the temporary file, or C{-1} if none is open.
    @ivar tmpname: Path of the temporary file.
    """
    implements(interfaces.IConsumer)

    # The os-level primitives are reached through the instance rather than
    # called directly (presumably so that tests can substitute them --
    # confirm).  staticmethod keeps them from being bound to the instance.
    osopen = staticmethod(os.open)
    oswrite = staticmethod(os.write)
    osclose = staticmethod(os.close)
    osrename = staticmethod(os.rename)

    def __init__(self, mbox, msg):
        """
        @param mbox: The mailbox to append to.
        @param msg: The message, either as an object with a C{read} attribute
            or as a string, which is wrapped in a C{StringIO}.
        """
        self.mbox = mbox
        self.defer = defer.Deferred()
        self.openCall = None
        if not hasattr(msg, "read"):
            msg = StringIO.StringIO(msg)
        self.msg = msg
        # This is needed, as this startup phase might call defer.errback and zero out self.defer
        # By doing it on the reactor iteration appendMessage is able to use .defer without problems.
        reactor.callLater(0, self.startUp)

    def startUp(self):
        """
        Create the temporary file and, if that succeeded, start streaming the
        message into this consumer with a L{basic.FileSender}.
        """
        self.createTempFile()
        # fh is still -1 when createTempFile gave up; it has already fired
        # the errback in that case.
        if self.fh != -1:
            self.filesender = basic.FileSender()
            self.filesender.beginFileTransfer(self.msg, self)

    def registerProducer(self, producer, streaming):
        """
        Remember the producer; a non-streaming producer is prodded right away.
        """
        self.myproducer = producer
        self.streaming = streaming
        if not streaming:
            self.prodProducer()

    def prodProducer(self):
        """
        Ask the producer for more data and reschedule this method for the next
        reactor iteration, for as long as a producer is registered.
        """
        self.openCall = None
        if self.myproducer is not None:
            # Schedule the next prod before resuming, so that fail() -- which
            # may run from inside resumeProducing via write() -- can cancel it.
            self.openCall = reactor.callLater(0, self.prodProducer)
            self.myproducer.resumeProducing()

    def unregisterProducer(self):
        """
        Forget the producer, close the temporary file and move it into
        C{new/}.
        """
        self.myproducer = None
        self.streaming = None
        self.osclose(self.fh)
        self.moveFileToNew()

    def write(self, data):
        """
        Write C{data} to the temporary file; any exception fails the task.
        """
        try:
            self.oswrite(self.fh, data)
        except:
            # NOTE(review): fail() does not close self.fh or remove the
            # temporary file, so it looks like both are left behind on this
            # path -- confirm.
            self.fail()

    def fail(self, err=None):
        """
        Fire the errback and cancel any pending L{prodProducer} call.

        @param err: The failure to report.  If C{None}, a L{failure.Failure}
            is built from the exception currently being handled.
        """
        if err is None:
            err = failure.Failure()
        if self.openCall is not None:
            self.openCall.cancel()
        self.defer.errback(err)
        self.defer = None

    def moveFileToNew(self):
        """
        Rename the temporary file to a freshly generated name under C{new/}.

        On success the new path is appended to the mailbox's list and the
        Deferred is fired with C{None}.  If the rename fails with C{EEXIST}
        another name is tried; any other C{OSError} fails the task.
        """
        while True:
            newname = os.path.join(self.mbox.path, "new", _generateMaildirName())
            try:
                self.osrename(self.tmpname, newname)
                break
            except OSError, (err, estr):
                import errno
                # if the newname exists, retry with a new newname.
                if err != errno.EEXIST:
                    self.fail()
                    # None marks the failure for the check after the loop.
                    newname = None
                    break
        if newname is not None:
            self.mbox.list.append(newname)
            self.defer.callback(None)
            self.defer = None

    def createTempFile(self):
        """
        Open a new, exclusively created file under C{tmp/}, setting
        C{self.fh} and C{self.tmpname}.

        Every C{OSError} from the open is retried with a new name.  Once more
        than 500 attempts have failed, the Deferred is errbacked with a
        C{RuntimeError} and C{self.fh} is left at C{-1}.
        """
        # O_NOINHERIT and O_NOFOLLOW are not available on every platform, so
        # they contribute 0 where the os module lacks them.
        attr = (os.O_RDWR | os.O_CREAT | os.O_EXCL
                | getattr(os, "O_NOINHERIT", 0)
                | getattr(os, "O_NOFOLLOW", 0))
        tries = 0
        self.fh = -1
        while True:
            self.tmpname = os.path.join(self.mbox.path, "tmp", _generateMaildirName())
            try:
                self.fh = self.osopen(self.tmpname, attr, 0600)
                return None
            except OSError:
                tries += 1
                if tries > 500:
                    self.defer.errback(RuntimeError("Could not create tmp file for %s" % self.mbox.path))
                    self.defer = None
                    return None
270
271 class MaildirMailbox(pop3.Mailbox):
272     """Implement the POP3 mailbox semantics for a Maildir mailbox
273     """
274     AppendFactory = _MaildirMailboxAppendMessageTask
275
276     def __init__(self, path):
277         """Initialize with name of the Maildir mailbox
278         """
279         self.path = path
280         self.list = []
281         self.deleted = {}
282         initializeMaildir(path)
283         for name in ('cur', 'new'):
284             for file in os.listdir(os.path.join(path, name)):
285                 self.list.append((file, os.path.join(path, name, file)))
286         self.list.sort()
287         self.list = [e[1] for e in self.list]
288
289     def listMessages(self, i=None):
290         """Return a list of lengths of all files in new/ and cur/
291         """
292         if i is None:
293             ret = []
294             for mess in self.list:
295                 if mess:
296                     ret.append(os.stat(mess)[stat.ST_SIZE])
297                 else:
298                     ret.append(0)
299             return ret
300         return self.list[i] and os.stat(self.list[i])[stat.ST_SIZE] or 0
301
302     def getMessage(self, i):
303         """Return an open file-pointer to a message
304         """
305         return open(self.list[i])
306
307     def getUidl(self, i):
308         """Return a unique identifier for a message
309
310         This is done using the basename of the filename.
311         It is globally unique because this is how Maildirs are designed.
312         """
313         # Returning the actual filename is a mistake.  Hash it.
314         base = os.path.basename(self.list[i])
315         return md5(base).hexdigest()
316
317     def deleteMessage(self, i):
318         """Delete a message
319
320         This only moves a message to the .Trash/ subfolder,
321         so it can be undeleted by an administrator.
322         """
323         trashFile = os.path.join(
324             self.path, '.Trash', 'cur', os.path.basename(self.list[i])
325         )
326         os.rename(self.list[i], trashFile)
327         self.deleted[self.list[i]] = trashFile
328         self.list[i] = 0
329
330     def undeleteMessages(self):
331         """Undelete any deleted messages it is possible to undelete
332
333         This moves any messages from .Trash/ subfolder back to their
334         original position, and empties out the deleted dictionary.
335         """
336         for (real, trash) in self.deleted.items():
337             try:
338                 os.rename(trash, real)
339             except OSError, (err, estr):
340                 import errno
341                 # If the file has been deleted from disk, oh well!
342                 if err != errno.ENOENT:
343                     raise
344                 # This is a pass
345             else:
346                 try:
347                     self.list[self.list.index(0)] = real
348                 except ValueError:
349                     self.list.append(real)
350         self.deleted.clear()
351
352     def appendMessage(self, txt):
353         """Appends a message into the mailbox."""
354         task = self.AppendFactory(self, txt)
355         return task.defer
356
class StringListMailbox:
    """
    L{StringListMailbox} is a mailbox held entirely in memory.

    @ivar msgs: A C{list} of C{str}, one per message, giving the message
        contents.

    @ivar _delete: A C{set} holding the indexes of the messages deleted since
        the last call to C{sync}.
    """
    implements(pop3.IMailbox)

    def __init__(self, msgs):
        """
        @param msgs: The list of message strings; it is used as-is, not
            copied.
        """
        self._delete = set()
        self.msgs = msgs


    def listMessages(self, i=None):
        """
        Return the length of one message, or of every message.

        @param i: The offset of a message, or C{None} for all messages.
        @return: The length of message C{i} (0 if it is marked deleted), or a
            list with one such length per message.
        """
        if i is not None:
            if i in self._delete:
                return 0
            return len(self.msgs[i])
        sizes = []
        for offset in range(len(self.msgs)):
            sizes.append(self.listMessages(offset))
        return sizes


    def getMessage(self, i):
        """
        Return a new in-memory file-like object wrapping the content of the
        message at the given offset.
        """
        content = self.msgs[i]
        return StringIO.StringIO(content)


    def getUidl(self, i):
        """
        Return the hex MD5 digest of the content of the message at the given
        offset.
        """
        digest = md5(self.msgs[i])
        return digest.hexdigest()


    def deleteMessage(self, i):
        """
        Record the message at the given offset as deleted.
        """
        self._delete.add(i)


    def undeleteMessages(self):
        """
        Forget all deletions recorded since the last call to C{sync}.
        """
        self._delete = set()


    def sync(self):
        """
        Replace the content of every message marked for deletion with the
        empty string, then forget the deletions.
        """
        for doomed in self._delete:
            self.msgs[doomed] = ""
        self._delete = set()
424
425
426
class MaildirDirdbmDomain(AbstractMaildirDomain):
    """
    A Maildir domain whose membership is checked against a dirdbm file.

    @ivar dbm: The dirdbm opened at C{<root>/passwd}; it maps user names to
        passwords.
    @ivar postmaster: If true, mail for unknown users is stored in the
        C{postmaster} mailbox.
    """

    implements(cred.portal.IRealm, mail.IAliasableDomain)

    portal = None
    _credcheckers = None

    def __init__(self, service, root, postmaster=0):
        """
        Initialize.

        The directory structure of a MaildirDirdbmDomain is:

        /passwd <-- a dirdbm file
        /USER/{new,cur,tmp,.Trash} <-- each user's maildir

        @param service: Passed on to L{AbstractMaildirDomain.__init__}.
        @param root: The directory in which the domain is rooted.
        @param postmaster: Whether mail for non-existing addresses is
            delivered to postmaster instead of being bounced outright.
        """
        AbstractMaildirDomain.__init__(self, service, root)
        passwdPath = os.path.join(root, 'passwd')
        if not os.path.exists(passwdPath):
            os.makedirs(passwdPath)
        self.dbm = dirdbm.open(passwdPath)
        self.postmaster = postmaster

    def userDirectory(self, name):
        """
        Get the directory for a user.

        If the user exists in the dirdbm file, return the directory
        os.path.join(root, name), creating it if necessary.
        Otherwise return postmaster's mailbox if bounces go to postmaster,
        or None if they do not.

        @param name: The user name (local part of the address).
        """
        known = self.dbm.has_key(name)
        if not known and not self.postmaster:
            return None
        if not known:
            name = 'postmaster'
        mailboxPath = os.path.join(self.root, name)
        if not os.path.exists(mailboxPath):
            initializeMaildir(mailboxPath)
        return mailboxPath

    ##
    ## IDomain
    ##
    def addUser(self, user, password):
        """
        Store the user's password in the dirdbm and make sure the user's
        maildir exists.
        """
        self.dbm[user] = password
        # Looking the directory up creates it when missing.
        self.userDirectory(user)

    def getCredentialsCheckers(self):
        """
        Return a list holding a single L{DirdbmDatabase} checker for this
        domain's dirdbm.  The list is created on first use and reused.
        """
        checkers = self._credcheckers
        if checkers is None:
            checkers = self._credcheckers = [DirdbmDatabase(self.dbm)]
        return checkers

    ##
    ## IRealm
    ##
    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return the mailbox for an avatar.

        The anonymous avatar gets an in-memory mailbox containing only the
        internal error message; any other avatar gets the L{MaildirMailbox}
        at C{<root>/<avatarId>}.

        @return: A tuple of C{pop3.IMailbox}, the mailbox, and a logout
            callable which does nothing.
        @raise NotImplementedError: If C{pop3.IMailbox} was not requested.
        """
        if pop3.IMailbox not in interfaces:
            raise NotImplementedError("No interface")

        if avatarId != cred.checkers.ANONYMOUS:
            mbox = MaildirMailbox(os.path.join(self.root, avatarId))
        else:
            mbox = StringListMailbox([INTERNAL_ERROR])

        return (pop3.IMailbox, mbox, lambda: None)
501
class DirdbmDatabase:
    """
    A credentials checker which looks user names up in a dirdbm mapping.

    @ivar dirdbm: The mapping; the value stored under a user name is what the
        credentials' C{checkPassword} is called with.
    """
    implements(cred.checkers.ICredentialsChecker)

    credentialInterfaces = (
        cred.credentials.IUsernamePassword,
        cred.credentials.IUsernameHashedPassword
    )

    def __init__(self, dbm):
        """
        @param dbm: The mapping to check credentials against.
        """
        self.dirdbm = dbm

    def requestAvatarId(self, c):
        """
        Check the given credentials.

        @param c: Credentials with a C{username} attribute and a
            C{checkPassword} method.
        @return: The user name, if it is present in the mapping and
            C{checkPassword} accepts the stored value.
        @raise cred.error.UnauthorizedLogin: Otherwise.
        """
        known = c.username in self.dirdbm
        if known and c.checkPassword(self.dirdbm[c.username]):
            return c.username
        raise cred.error.UnauthorizedLogin()
Note: See TracBrowser for help on using the browser.