[Twisted-Python] Re: SSL: Getting the client certificate

David Bolen db3l.net at gmail.com
Mon Aug 27 15:00:19 EDT 2007


Dirk Loss <lists at dirk-loss.de> writes:

> Jean-Paul Calderone wrote:
(...)
>> Another possible solution might be to do your verification using the
>> SSL context object.  
>
> Could you elaborate on this? I think I am already using the SSL
> context object to do the verification:

Not sure if it helps, but here's some old code of mine where I
experimented with the echo SSL examples to add symmetric certificate
checking.  Just checked and it seems ok with Python 2.5.1 and Twisted
2.5.0 (pyOpenSSL 0.6).

It uses direct SSL context objects rather than the Twisted
wrapper versions.  To be honest, at the time it was because I was
still feeling my way around the SSL support and found using the direct
context easier, but I believe you do have full access to the
certificate in the context's _verify method.

Returning 0/False from _verify, rather than just propagating ok, will
reject the handshake.  (Note that, I believe, _verify can be called
multiple times during the handshake, including in cases where ok is
already 0.)

There are still a bunch of debugging prints in the code from when I was
checking what sort of information was available to the context
factory/verification callback.

-- David


echoserv_ssl.py:
---------------

# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
# 
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
# 
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from OpenSSL import SSL, crypto

class ServerContextFactory:
    """Context factory for an SSL server that requires a client certificate.

    getContext() builds the OpenSSL context and registers _verify() as the
    certificate verification callback.
    """

    def _verify(self, connection, x509, errnum, errdepth, ok):
        """Verification callback passed to ctx.set_verify().

        Prints the certificate being examined together with the values
        handed in by the SSL layer, then propagates the incoming verdict.

        connection -- the connection being negotiated (not used here)
        x509       -- certificate under inspection; get_subject() and
                      get_issuer() are called on it
        errnum     -- error number reported for this certificate
        errdepth   -- depth reported for this certificate
        ok         -- verdict reached so far (printed with %d)

        Returns ok unchanged.  Returning a false value instead rejects the
        handshake, which is the hook for adding extra checks here.
        """
        print('_verify (ok=%d):' % ok)
        print('  subject: %s' % (x509.get_subject(),))
        print('  issuer: %s' % (x509.get_issuer(),))
        print('  errnum %s, errdepth %d' % (errnum, errdepth))
        # Propagate the verdict.  A hard-coded False here refuses every
        # client, including those presenting a perfectly valid certificate.
        return ok

    def getContext(self):
        """Create an SSL context.

        This is a sample implementation that loads a certificate from a file
        called 'server.pem'.  The private key is read from the same file, the
        CA files under 'ca/' are loaded, and peer verification is switched on
        with _verify as the callback.

        Returns the configured SSL.Context.
        """
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.use_certificate_file('server.pem')
        ctx.use_privatekey_file('server.pem')
        print('Context additions')
        ctx.load_client_ca('ca/all-cas.cert')
        ctx.load_verify_locations('ca/ca.cert')
        # Ask for a peer certificate and fail the handshake if none is sent.
        ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
                       self._verify)
        # Debug output: show the verify depth before and after raising it.
        print('verify depth: %s' % (ctx.get_verify_depth(),))
        ctx.set_verify_depth(10)
        print('verify depth: %s' % (ctx.get_verify_depth(),))
        return ctx

import echoserv

class MyProtocol(echoserv.Echo):

    def connectionMade(self):
        print 'connectionMade', self.transport.getPeerCertificate()
        return echoserv.Echo.connectionMade(self)

    def dataReceived(self, data):
        print 'dataReceived', self.transport.getPeerCertificate()
        return echoserv.Echo.dataReceived(self, data)


if __name__ == '__main__':
    # Script entry point: log to stdout and serve MyProtocol over SSL on
    # port 9000 using the context built by ServerContextFactory.
    import echoserv
    import sys
    from twisted.internet.protocol import Factory
    from twisted.internet import ssl, reactor
    from twisted.python import log

    log.startLogging(sys.stdout)

    echo_factory = Factory()
    echo_factory.protocol = MyProtocol

    reactor.listenSSL(9000, echo_factory, ServerContextFactory())
    reactor.run()


echoclient_ssl.py:
-----------------

# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
# 
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
# 
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

from OpenSSL import SSL
import sys

from twisted.internet.protocol import ClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import ssl, reactor

import inspect


class ClientContextFactory(ssl.ClientContextFactory):

    def _verify(self, connection, x509, errnum, errdepth, ok):
        print '_verify (ok=%d):' % ok
        print '  subject:', x509.get_subject()
        print '  issuer:', x509.get_issuer()
        print '  errnum %s, errdepth %d' % (errnum, errdepth)
        return ok

    def getContext(self):
        ctx = ssl.ClientContextFactory.getContext(self)
        ctx.use_certificate_file('client.pem')
        ctx.use_privatekey_file('client.pem')

        ctx.load_verify_locations('ca/ca.cert')
        ctx.set_verify(SSL.VERIFY_PEER|SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
                       self._verify)

        return ctx

class EchoClient(LineReceiver):
    end="Bye-bye!"

    def connectionMade(self):
        self.sendLine("Hello, world!")
        self.sendLine("What a fine day it is.")
        self.sendLine(self.end)

    def connectionLost(self, reason):
        print 'connection lost (protocol)'

    def lineReceived(self, line):
        x509 = self.transport.getPeerCertificate()
        methods = [x for x in dir(x509)
                   if callable(getattr(x509,x)) and
                   not (x.startswith('set_') or
                        x.startswith('add_') or
                        x.startswith('gmtime_') or
                        x in ('sign','digest'))]
        for m in methods:
            print m, getattr(x509,m)()
        print "receive:", line
        if line==self.end:
            self.transport.loseConnection()

class EchoClientFactory(ClientFactory):
    protocol = EchoClient

    def clientConnectionFailed(self, connector, reason):
        print 'connection failed:', reason.getErrorMessage()
        reactor.stop()

    def clientConnectionLost(self, connector, reason):
        print 'connection lost:', reason.getErrorMessage()
        reactor.stop()

def main():
    """Connect the echo client over SSL to port 9000 and run the reactor.

    The host is taken from the first command line argument when one is
    given; otherwise 'localhost' is used.
    """
    try:
        host = sys.argv[1]
    except IndexError:
        host = 'localhost'

    reactor.connectSSL(host, 9000, EchoClientFactory(), ClientContextFactory())
    reactor.run()

if __name__ == '__main__':
    main()





More information about the Twisted-Python mailing list