[Twisted-Python] Re: Running commands (ssh) from a GUI client

Marcin Kasperski Marcin.Kasperski at softax.com.pl
Thu Oct 11 10:31:54 MDT 2007


"Raúl Gómez C." <nachogomez at gmail.com> writes:

> Well Marcin, I think that looks pretty much close to what I want to
> achieve, so I haven't found any working example of this (yet
> :s). Can you share your code with us?

Well. You wanted it. 

a) This is messy.
b) I created it by randomly hacking here and there and I do not quite
   understand what is going on.
c) Error handling... Well. Is there any error handling?
d) It is to some degree polluted by the GUI window which I spawn to
   monitor what is going on.
e) It works (although reconnecting is not handled). I tested it by running
   the client on Windows and logging in to a remote Linux machine, using
   SSH key login without a password or passphrase (since Fritz seemed to
   expect an .exe file, I ran 'exemaker runme.py').

--------------------------------------------------
-- runme.py
--------------------------------------------------

#!/usr/bin/env python

import os, os.path, sys

# All paths used by the launcher are resolved relative to the directory
# that contains the script being run.
RUN_DIR = os.path.dirname(os.path.abspath(sys.argv[0]))
LOG_DIR = os.path.join(RUN_DIR, 'log')
SSH_KEY = os.path.join(RUN_DIR, 'id_dsa')

# Create the log directory on first run.
if not os.path.exists(LOG_DIR):
    os.mkdir(LOG_DIR)

# Let modules located in the script's directory be imported.
import sys
sys.path.append(RUN_DIR)

import RemoteEngine

# Start the bridge between local stdin/stdout and the remote command.
RemoteEngine.RunRemoteEngine(
    ssh_key=SSH_KEY,
    log_dir=LOG_DIR,
    show_output=False,
    remote_user='marcink',
    remote_host='myserver.home.local',
    remote_port=22,
    remote_cmd='Szachy/Programy/Rybka/Rybka.sh',
)

--------------------------------------------------
-- RemoteEngine.py
--------------------------------------------------

#!/usr/bin/env python
# -*- coding: utf8 -*-

# Must come first, because it replaces the reactor
from LogWindow import LogBuffer, LogWindow

from twisted.internet import defer, protocol, reactor, stdio
from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
from twisted.protocols import basic
import struct, sys, getpass, os, os.path, logging

# Logger named 'reng', used for all logging calls in this module.
logger = logging.getLogger('reng')

class CallbackHandler(logging.Handler):
    """Logging handler that passes every formatted record to a callable.

    The callable supplied at construction time is invoked with the
    formatted message text of each record this handler emits.
    """

    def __init__(self, callback):
        logging.Handler.__init__(self)
        self.callback = callback

    def emit(self, record):
        """Format ``record`` and hand the resulting text to the callback."""
        self.callback(self.format(record))

def setupLogging(log_dir, logbuf = None):
    """Configure stdlib logging and Twisted logging to write into ``log_dir``.

    The directory is created when it does not exist.  The root logger is set
    to DEBUG and gets a rotating ``debug.log`` file handler (4 MiB per file,
    10 backups).  Twisted logging is started into ``twisted.log`` in the same
    directory (rotated at 16 MiB).

    log_dir -- directory that receives the log files.
    logbuf  -- optional object providing ``on_debug_log(text)`` and
               ``on_twisted_log(event)``.  When given, root-logger records of
               level INFO and above, and Twisted log events, are forwarded
               to it as well.
    """
    import os, os.path, logging, logging.handlers
    # twisted.python.log is imported explicitly: the code below uses it, and
    # relying on some other module having imported it first is fragile.
    import twisted.python, twisted.python.log, twisted.python.logfile
    if not os.path.exists(log_dir):
        os.mkdir(log_dir)
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    debug_file = logging.handlers.RotatingFileHandler( os.path.join(log_dir, 'debug.log'), 'a', 4*1024*1024, 10)
    debug_file.setLevel(logging.DEBUG)
    debug_file.setFormatter( logging.Formatter('[%(asctime)s] [%(name)s/%(levelname)s] %(message)s') )
    root_logger.addHandler(debug_file)
    if logbuf:
        # Mirror INFO-and-above records into the supplied buffer.
        cbl = CallbackHandler(logbuf.on_debug_log)
        cbl.setLevel(logging.INFO)
        root_logger.addHandler( cbl )
    # And the Twisted logging as well
    twistedLogFile = twisted.python.logfile.LogFile('twisted.log', log_dir, 16*1024*1024)
    twisted.python.log.startLogging(twistedLogFile)
    twisted.python.log.msg("Twisted log started")
    if logbuf:
        twisted.python.log.addObserver(logbuf.on_twisted_log)

class DataConsumer:
    """Intermediary used to pass data along, on either side of the connection.

    Everything received is buffered until registerConsumer is called; after
    that, data is handed directly to the registered consumer.
    """

    def __init__(self, name):
        self.name = name
        self.consumer = None
        self.buffered = []

    def handleData(self, data):
        """Deliver ``data`` to the consumer, or buffer it when none is set."""
        if not self.consumer:
            logger.debug("[%s] buffering (%s)" % (self.name, data))
            self.buffered.append(data)
            return
        logger.debug("[%s] write (%s)" % (self.name, data))
        self.consumer(data)

    def registerConsumer(self, consumer):
        """Install ``consumer`` and flush whatever was buffered so far.

        ``consumer`` is a function called for all data received.
        """
        self.consumer = consumer
        if not self.buffered:
            return
        # Swap the buffer out before delivering its joined contents.
        pending = "".join(self.buffered)
        self.buffered = []
        logger.debug("[%s] write-buffer (%s)" % (self.name, pending))
        consumer(pending)

def restartRemoteConnection():
    """Placeholder for re-establishing the remote connection.

    Restarting is not implemented; calling this always raises.

    Raises:
        NotImplementedError: always.
    """
    # A bare string is not a valid exception object (raising one is itself a
    # TypeError), so a real exception class carries the message instead.
    raise NotImplementedError("I do not know how to restart (yet)")

# Data read locally is passed to remoteConsumer; the SSH side registers
# its writer on it once the remote command has started.
remoteConsumer = DataConsumer('remote')
# Data received from the remote command is passed to localConsumer; the
# local protocol registers its writer on it when its connection is made.
localConsumer = DataConsumer('local')

class LocalProtocol(protocol.Protocol):
    """Protocol for the local side (attached to standard I/O by the caller).

    Data received locally is handed to ``remoteConsumer``; data arriving
    through ``localConsumer`` is written to this protocol's transport.
    Line delimiters are translated between ``delimiter`` (local side) and
    a newline character (remote side).
    """
    #from os import linesep as delimiter
    delimiter = '\n'

    def connectionMade(self):
        """Start receiving remote data by registering with localConsumer."""
        logger.info("[LOCAL] ConnectionMade")
        localConsumer.registerConsumer(self.forwardData)

    def connectionLost(self, reason=protocol.connectionDone):
        """Schedule a reactor stop once the local connection is gone."""
        logger.warning("[LOCAL] ConnectionLost")
        reactor.callLater(0, reactor.stop)

    def dataReceived(self, line):
        """Pass locally received data on to the remote side."""
        logger.info("[LOCAL] dataReceived(%s)" % line)
        # str.replace returns a new string; the result must be kept,
        # otherwise the delimiter translation is silently lost.
        line = line.replace(self.delimiter, '\n')
        remoteConsumer.handleData(line)

    def forwardData(self, data):
        """Write data coming from the remote side to the local transport."""
        # Same as above: keep the translated string.
        data = data.replace('\n', self.delimiter)
        #logger.debug("[LOCAL] writing(%s)" % data)
        self.transport.write(data)

class RemoteProtocol(transport.SSHClientTransport):
    """SSH client transport that requests key-based user authentication.

    Once the transport is secure, a RemoteConnection for the configured
    command is created and requested through a RemoteUserAuth service.
    """

    def __init__(self, ssh_key, remote_user, remote_cmd):
        # The base-class initializer is not invoked here.
        self.connection = None
        self.remote_cmd = remote_cmd
        self.remote_user = remote_user
        self.ssh_key = ssh_key

    def verifyHostKey(self, hostKey, fingerprint):
        """Log the host key fingerprint and accept the key."""
        # NOTE(review): every host key is accepted unconditionally; the
        # fingerprint is only logged, never compared against anything.
        logger.debug('[REMOTE] host key fingerprint: %s' % fingerprint)
        return defer.succeed(1)

    def connectionSecure(self):
        """Create the connection object and request user authentication."""
        logger.debug('[REMOTE] ssh connection established')
        conn = RemoteConnection(self.remote_cmd)
        conn.protocol = self
        self.connection = conn
        auth = RemoteUserAuth(self.ssh_key, self.remote_user, conn)
        self.requestService(auth)

class RemoteUserAuth(userauth.SSHUserAuthClient):
    """User-auth client that offers the key pair stored at ``ssh_key``.

    The public key is read from ``ssh_key + '.pub'`` and the private key
    from ``ssh_key`` itself (after ``~`` expansion).
    """

    def __init__(self, ssh_key, user, connection):
        userauth.SSHUserAuthClient.__init__(self, user, connection)
        self.ssh_key = ssh_key

    def getPublicKey(self):
        """Return the public key string, or None when no key is offered.

        Nothing is offered when the key file is missing or when
        ``lastPublicKey`` is already set.
        """
        key_path = os.path.expanduser(self.ssh_key)
        if not os.path.exists(key_path):
            return
        if self.lastPublicKey:
            return
        return keys.getPublicKeyString(key_path + '.pub')

    def getPrivateKey(self):
        """Return a fired Deferred carrying the private key object."""
        key_path = os.path.expanduser(self.ssh_key)
        private_key = keys.getPrivateKeyObject(key_path)
        return defer.succeed(private_key)

class RemoteConnection(connection.SSHConnection):
    """SSH connection service that opens one EngineChannel for the command."""

    def __init__(self, remote_cmd):
        connection.SSHConnection.__init__(self)
        self.remote_cmd = remote_cmd
        self.engine = None

    def serviceStarted(self):
        """Create the engine channel, link it back here, and open it."""
        engine = EngineChannel(self, self.remote_cmd)
        engine.connection = self
        self.engine = engine
        self.openChannel(engine)

    def startupDataExchange(self):
        """Begin delivering locally received data to the engine channel."""
        remoteConsumer.registerConsumer(self.writeDataToEngine)

    def writeDataToEngine(self, data):
        """Write ``data`` to the engine channel."""
        #logger.debug("[REMOTE] writing(%s)" % data)
        self.engine.write(data)

class EngineChannel(channel.SSHChannel):
    """SSH 'session' channel that runs the remote command via an exec request.

    Output received from the channel is passed to ``localConsumer``.
    """
    name = 'session'

    def __init__(self, connection, remote_cmd):
        # NOTE(review): ``connection`` is passed as the third positional
        # argument -- confirm that this is the parameter SSHChannel.__init__
        # expects in the Twisted version in use.
        channel.SSHChannel.__init__(self, 2**16, 2**15, connection)
        self.remote_cmd = remote_cmd

    def openFailed(self, reason):
        """Log the failure and close the channel."""
        logger.warn("[REMOTE] failed (%s)" % reason)
        self.loseConnection()

    def channelOpen(self, ignoredData):
        """Send the 'exec' request for the remote command."""
        logger.info("[REMOTE] running (%s)" % self.remote_cmd)
        command = common.NS(self.remote_cmd)
        started = self.conn.sendRequest(self, 'exec', command, wantReply = 1)
        started.addCallback(self._cbRequest)

    def _cbRequest(self, ignored):
        """Called when the exec request is answered; start the data exchange."""
        logger.debug("[REMOTE] Remote cmd started")
        self.connection.startupDataExchange()

    def dataReceived(self, data):
        """Log remote output and pass it on to the local side."""
        logger.info('[REMOTE] dataReceived(%s)' % str(data))
        # Skip the unfortunate
        # "err:reg:SCSI_getprocentry SCSI type line scan count error" output
        if not data.startswith('err:reg:'):
            localConsumer.handleData(data)

    def closed(self):
        """Log the close, drop the channel and schedule a restart attempt."""
        logger.info('[REMOTE] Closed.')
        self.loseConnection()
        reactor.callLater(0, restartRemoteConnection)

class RemoteFactory(protocol.ClientFactory):
    """Client factory that always hands out one shared RemoteProtocol.

    The protocol instance is created in the constructor and reused by every
    later buildProtocol call.  Connection failure or loss schedules a
    reactor stop.
    """

    def __init__(self, ssh_key, remote_user, remote_cmd):
        self.ssh_key = ssh_key
        self.remote_user = remote_user
        self.remote_cmd = remote_cmd
        # failed on windows without this:
        self.protocol_instance = None
        self.buildProtocol(None)

    def buildProtocol(self, addr):
        """Return the cached protocol, creating it on the first call."""
        if not self.protocol_instance:
            proto = RemoteProtocol(self.ssh_key, self.remote_user, self.remote_cmd)
            proto.factory = self
            self.protocol_instance = proto
        return self.protocol_instance

    def clientConnectionFailed(self, connector, reason):
        """Log the failed connection attempt and schedule a reactor stop."""
        logger.warn('[REMOTE] connection failed:' + reason.getErrorMessage())
        reactor.callLater(0, reactor.stop)

    def clientConnectionLost(self, connector, reason):
        """Log the lost connection and schedule a reactor stop."""
        logger.warn('[REMOTE] connection lost:' + reason.getErrorMessage())
        reactor.callLater(0, reactor.stop)
  
def RunRemoteEngine(ssh_key, remote_user, remote_host, remote_port, remote_cmd, log_dir, show_output):
    """Wire local standard I/O to a command run over SSH, then run the reactor.

    ssh_key     -- path of the private key file ('.pub' is appended for the
                   public key).
    remote_user -- user name used for SSH authentication.
    remote_host -- host to connect to.
    remote_port -- TCP port to connect to.
    remote_cmd  -- command executed on the remote side.
    log_dir     -- directory receiving the log files.
    show_output -- passed to LogBuffer; controls whether remote output is
                   shown in the log window.
    """
    logbuf = LogBuffer(show_output)
    window_title = remote_host + ':' + remote_cmd
    # Constructing the window is enough to display it (it shows itself).
    logwin = LogWindow(logbuf, window_title)
    setupLogging(log_dir, logbuf)

    remote_factory = RemoteFactory(ssh_key, remote_user, remote_cmd)
    stdio.StandardIO(LocalProtocol())
    reactor.connectTCP(remote_host, remote_port, remote_factory)
    reactor.run()

--------------------------------------------------
-- LogWindow.py
--------------------------------------------------

#!/usr/bin/env python
# -*- coding: utf8 -*-

from twisted.internet import gtk2reactor
gtk2reactor.install()

from twisted.internet import reactor
import pygtk, gtk
import re

# Matches log text of the form "[LOCAL] dataReceived(...)" or
# "[REMOTE] dataReceived(...)"; group 'dir' is the side, group 'data' the
# text inside the parentheses (DOTALL lets it span several lines).
re_received = re.compile(r'^\[(?P<dir>LOCAL|REMOTE)\]\s*dataReceived\((?P<data>.*)\)\s*$', re.DOTALL)

# See /usr/share/doc/python-gtk2-tutorial/html/examples/testtext.py
class LogBuffer(gtk.TextBuffer):
    """Text buffer that collects colour-tagged log output.

    Input, output, error and debug text are each appended with their own
    non-editable tag (dark green, brown, red and black respectively).
    When ``show_output`` is false, output text is not added at all.
    """
    def __init__(self, show_output=True):
        gtk.TextBuffer.__init__(self)
        self.show_output = show_output
        self.input_tag = self.create_tag(editable = False, foreground = "darkgreen")
        self.output_tag = self.create_tag(editable= False, foreground = "brown")
        self.error_tag = self.create_tag(editable= False, foreground = "red")
        self.debug_tag = self.create_tag(editable = False, foreground = "black")
    def add_input(self, text):
        """Append ``text`` with the input tag and move the cursor to the end."""
        #self.insert(self.get_end_iter(), ">>> " + text + "\n")
        self.insert_with_tags(self.get_end_iter(), text, self.input_tag)
        self.place_cursor(self.get_end_iter())
    def add_output(self, text):
        """Append ``text`` with the output tag, unless output is hidden."""
        if self.show_output:
            #self.insert(self.get_end_iter(), "<<< " + text + "\n")
            self.insert_with_tags(self.get_end_iter(), text, self.output_tag)
            self.place_cursor(self.get_end_iter())
    def add_error(self, text):
        """Append ``text`` as an "[ERR] " line with the error tag."""
        self.insert_with_tags(self.get_end_iter(), "[ERR] " + text + "\n", self.error_tag)
        self.place_cursor(self.get_end_iter())
    def add_debug(self, text):
        """Append ``text`` as a "[DBG] " line with the debug tag."""
        self.insert_with_tags(self.get_end_iter(), "[DBG] " + text + "\n", self.debug_tag)
        self.place_cursor(self.get_end_iter())
    def on_twisted_log(self, data):
        """Show one Twisted log event.

        ``data`` is the event dictionary; its 'message' parts are joined
        with newlines.  Events flagged 'isError' are shown as errors, all
        others as debug lines prefixed with "[TWISTED] ".
        """
        isError = data['isError']
        message = "\n".join(data['message'])
        # The joined text lives in ``message``; using any other (undefined)
        # name here would raise NameError for every Twisted log event.
        if isError:
            self.add_error(message)
        else:
            self.add_debug("[TWISTED] " + message)
    def on_debug_log(self, text):
        """Show one formatted log message.

        Messages matching ``re_received`` are shown as input (LOCAL) or
        output (REMOTE) using only the received data; anything else is
        shown as a debug line.
        """
        m = re_received.match(text)
        if m:
            if m.group('dir') == "LOCAL":
                self.add_input(m.group('data'))
            else:
                self.add_output(m.group('data'))
        else:
            self.add_debug(text)

class LogWindow(gtk.Window):
    """Top-level window displaying a text buffer in a scrollable, read-only view.

    The window shows itself on construction; destroying it stops the reactor.
    """

    def __init__(self, buffer, title):
        gtk.Window.__init__(self, gtk.WINDOW_TOPLEVEL)
        self.set_title(title)
        self.connect('destroy', self.close)

        # Text panel
        view = gtk.TextView(buffer)
        view.set_editable(False)
        view.set_wrap_mode(gtk.WRAP_NONE)
        view.set_justification(gtk.JUSTIFY_LEFT)
        self.textview = view

        # Scrollbars appear only when needed.
        scroller = gtk.ScrolledWindow()
        scroller.set_policy(gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)

        layout = gtk.VBox(False, 0)
        self.add(layout)
        layout.pack_start(scroller, True, True, 0)
        scroller.add(view)

        # Main window
        self.set_default_size(500, 500)
        view.grab_focus()
        self.show_all()

    def close(self, widget, data=None):
        """'destroy' signal handler: stop the reactor."""
        reactor.stop()





More information about the Twisted-Python mailing list