Ticket #5566: ampssh.py

File ampssh.py, 2.5 KB (added by teratorn, 2 years ago)
Line 
1from twisted.application import internet
2from twisted.application.service import Application
3from twisted.protocols import amp
4
5from twisted.cred.portal import Portal
6from twisted.conch.ssh.factory import SSHFactory
7from twisted.conch.ssh.keys import Key
8from twisted.conch.interfaces import IConchUser
9from twisted.conch.avatar import ConchUser
10from twisted.internet.protocol import Protocol
11from twisted.conch.ssh.session import (
12    SSHSession, SSHSessionProcessProtocol, wrapProtocol)
13
14class Echoer(Protocol):
15    def connectionMade(self):
16        self.transport.write("Echo protocol connected\r\n")
17
18    def dataReceived(self, bytes):
19        self.transport.write("echo: " + repr(bytes) + "\r\n")
20
21    def connectionLost(self, reason):
22        print 'Connection lost', reason
23
24
25class AMPSession(SSHSession):
26    name = 'session'
27
28    def request_pty_req(self, data):
29        # ignore pty requests
30        return True
31
32    def request_shell(self, data):
33        protocol = amp.AMP()
34        #protocol = Echoer()
35
36        transport = SSHSessionProcessProtocol(self)
37        transport.getPeer = lambda:'FAKE'
38        transport.getHost = lambda:'FAKE'
39        protocol.makeConnection(transport)
40        transport.makeConnection(wrapProtocol(protocol))
41        self.client = transport
42        return True
43
44
45class SimpleRealm(object):
46    def requestAvatar(self, avatarId, mind, *interfaces):
47        u = ConchUser()
48        u.channelLookup['session'] = AMPSession
49        return IConchUser, u, lambda:None
50
51
52# generate you a key with: ckeygen -f id_rsa -t rsa
53with open('id_rsa') as privateBlobFile:
54    privateBlob = privateBlobFile.read()
55    privateKey = Key.fromString(data=privateBlob)
56
57with open('id_rsa.pub') as publicBlobFile:
58    publicBlob = publicBlobFile.read()
59    publicKey = Key.fromString(data=publicBlob)
60
61
62application = Application('testapp')
63
64factory = SSHFactory()
65factory.privateKeys = {'ssh-rsa': privateKey}
66factory.publicKeys = {'ssh-rsa': publicKey}
67factory.portal = Portal(SimpleRealm())
68
69from zope.interface import implements
70from twisted.cred.checkers import ICredentialsChecker
71from twisted.cred import credentials
72class TSAAgent:
73    implements(ICredentialsChecker)
74    credentialInterfaces = (credentials.IUsernamePassword,)
75    def requestAvatarId(self, c):
76        return 'OK'
77
78# checks keys against the authorized_keys file of the user
79# requesting login.
80factory.portal.registerChecker(TSAAgent())
81
82ssh = internet.TCPServer(2222, factory)
83ssh.setServiceParent(application)
84