[Twisted-Python] Re: How to make a secure connection between two computers

Noam Raphael noamraph at gmail.com
Tue Feb 12 18:04:28 EST 2008

2008/2/12, Drew Smathers <drew.smathers at gmail.com>:
> I think you could have written the equivalent program in less lines of code
> using facilities provided by Twisted.  And you'd have the benefits of a
> number of things your program doesn't provide including:
> * transport/protocol separation
> * ability to handle multiple clients efficiently
As it happens, I don't really need to handle multiple clients
efficiently. I started to write the program using twisted, and stopped
when I realized that I had to write a fairly complex state machine. If
you have a simple way to write this protocol using twisted, it would
be nice to know.

> It is not my area expertise by any means, but shared-secret message hashing
> is a rather crude (read : easy to sniff and replay traffic) security
> mechanism.
Well, in TLS, after handshaking, both sides have a shared secret they
use for communication. I just skip the handshaking. (I do have a
challenge-response in the protocol, so I don't think replaying will
work.) But I did forget about hashing the data, so here's a better

#!/usr/bin/env python

import sys
import struct
import socket

from Crypto.Hash import SHA
from Crypto.Cipher import ARC4


def get_random():
    return open('/dev/urandom').read(RANDOM_LEN)

class SocketEOFError(socket.error):

def recv(sock, n):
    buf = ''
    while len(buf) < n:
        r = sock.recv(n - len(buf))
        if not r:
            if not buf:
                raise SocketEOFError("Connection closed")
                raise socket.error("Couldn't read enough bytes")
        buf += r
    return buf

def handshake(sock, secret):
    my_random = get_random()
    my_rc4 = ARC4.new(SHA.new(secret + my_random).digest())
    peer_random = recv(sock, RANDOM_LEN)
    peer_rc4 = ARC4.new(SHA.new(secret + peer_random).digest())
    peer_response = recv(sock, RANDOM_LEN)
    if my_rc4.decrypt(peer_response) != '\0' * RANDOM_LEN:
        raise socket.error('Failed peer authentication')
    return my_rc4, peer_rc4

def send_data(sock, peer_rc4, secret, data):
    length_s = struct.pack('<h', len(data))
    mac = SHA.new(secret + data + length_s).digest()
    sock.sendall(length_s + peer_rc4.encrypt(data + mac))

def recv_data(sock, my_rc4, secret):
        length_s = recv(sock, 2)
    except SocketEOFError:
        return ''
    length = struct.unpack('<h', length_s)[0]
    msg = my_rc4.decrypt(recv(sock, length + SHA.digest_size))
    data = msg[:length]
    mac = SHA.new(secret + data + length_s).digest()
    if mac != msg[length:]:
        raise socket.error('Failed hash')
    return data

def server(host, port, secret):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    while True:
        conn, addr = s.accept()
        print 'Connected by', addr
            my_rc4, peer_rc4 = handshake(conn, secret)
            while True:
                data = recv_data(conn, my_rc4, secret)
                if not data: break
                print 'Received', repr(data)
                send_data(conn, peer_rc4, secret, data)
        except socket.error, e:
            print 'Error:', e
            print 'Closed connection'

def client(host, port, secret):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, port))
    my_rc4, peer_rc4 = handshake(s, secret)
    send_data(s, peer_rc4, secret, 'Hello, world!')
    data = recv_data(s, my_rc4, secret)
    print 'Received', repr(data)

def main():
    if len(sys.argv) != 5 or sys.argv[1] not in ('server', 'client'):
        print "Usage: %s server/client host port secret"
    is_server = sys.argv[1] == 'server'
    host = sys.argv[2]
    port = int(sys.argv[3])
    secret = sys.argv[4]
    if is_server:
        server(host, port, secret)
        client(host, port, secret)

if __name__ == '__main__':

More information about the Twisted-Python mailing list