Ticket #2157: t-conch-test.patch

File t-conch-test.patch, 23.2 KB (added by John Popplewell, 6 years ago)

lots of minor tweaks to the conch tests

  • twisted/conch/test/test_cftp.py

     
    88
    99import locale
    1010import time, sys, os, operator, getpass, struct
     11import posixpath
    1112from StringIO import StringIO
    1213
    1314from twisted.conch.test.test_ssh import Crypto, pyasn1
     15from twisted.python.runtime import platform
    1416
    1517_reason = None
    1618if Crypto and pyasn1:
    1719    try:
    18         from twisted.conch import unix
     20        if platform.isWindows():
     21            from twisted.conch import windows as unix
     22        else:
     23            from twisted.conch import unix
    1924        from twisted.conch.scripts import cftp
    2025        from twisted.conch.test.test_filetransfer import FileTransferForTestAvatar
    2126    except ImportError as e:
     
    216221        # which uses features not provided by our dumb Connection fake.
    217222        self.client.transport = StringTransport()
    218223
     224        if platform.isWindows():
     225            os.getuid = lambda: 0
    219226
     227    def tearDown(self):
     228        if platform.isWindows():
     229            del os.getuid
     230
    220231    def test_exec(self):
    221232        """
    222233        The I{exec} command runs its arguments locally in a child process
     
    227238            sys.executable)
    228239
    229240        d = self.client._dispatchCommand("exec print 1 + 2")
    230         d.addCallback(self.assertEqual, "3\n")
     241        d.addCallback(self.assertEqual, "3"+os.linesep)
    231242        return d
    232243
    233244
     
    240251            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', '')
    241252
    242253        d = self.client._dispatchCommand("exec echo hello")
    243         d.addCallback(self.assertEqual, "hello\n")
     254        d.addCallback(self.assertEqual, "hello"+os.linesep)
    244255        return d
    245256
    246257
     
    248259        """
    249260        The I{exec} command is run for lines which start with C{"!"}.
    250261        """
     262        if platform.isWindows():
     263            shell = os.environ['COMSPEC']
     264        else:
     265            shell = '/bin/sh'
    251266        self.database.addUser(
    252             getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
    253             '/bin/sh')
     267            getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', shell)
    254268
    255269        d = self.client._dispatchCommand("!echo hello")
    256         d.addCallback(self.assertEqual, "hello\n")
     270        d.addCallback(self.assertEqual, "hello"+os.linesep)
    257271        return d
    258272
    259273
     
    267281        @param height: the height in characters
    268282        @type height: C{int}
    269283        """
    270         import tty # local import to avoid win32 issues
     284        try:
     285            from tty import TIOCGWINSZ
     286        except ImportError:
     287            TIOCGWINSZ = 0x5413
    271288        class FakeFcntl(object):
    272289            def ioctl(self, fd, opt, mutate):
    273                 if opt != tty.TIOCGWINSZ:
     290                if opt != TIOCGWINSZ:
    274291                    self.fail("Only window-size queries supported.")
    275292                return struct.pack("4H", height, width, 0, 0)
    276293        self.patch(cftp, "fcntl", FakeFcntl())
     
    404421        """
    405422        self._expectingCommand = defer.Deferred()
    406423        self.clearBuffer()
    407         self.transport.write(command + '\n')
     424        self.transport.write(command + os.linesep)
    408425        return self._expectingCommand
    409426
    410427    def runScript(self, commands):
     
    449466
    450467class CFTPClientTestBase(SFTPTestBase):
    451468    def setUp(self):
    452         f = open('dsa_test.pub','w')
     469        f = open('dsa_test.pub','wb')
    453470        f.write(test_ssh.publicDSA_openssh)
    454471        f.close()
    455         f = open('dsa_test','w')
     472        f = open('dsa_test','wb')
    456473        f.write(test_ssh.privateDSA_openssh)
    457474        f.close()
    458         os.chmod('dsa_test', 33152)
    459         f = open('kh_test','w')
     475        if not platform.isWindows():
     476            os.chmod('dsa_test', 33152)
     477        f = open('kh_test','wb')
    460478        f.write('127.0.0.1 ' + test_ssh.publicRSA_openssh)
    461479        f.close()
    462480        return SFTPTestBase.setUp(self)
     
    544562        """
    545563        return self.processProtocol.runScript(commands)
    546564
     565    def getCwd(self):
     566        cwd = os.getcwd()
     567        if platform.isWindows():
     568            cwd = os.path.splitdrive(cwd)[1]    # remove drive spec.
     569            return cwd.replace('\\', '/')
     570        return cwd
     571
    547572    def testCdPwd(self):
    548573        """
    549574        Test that 'pwd' reports the current remote directory, that 'lpwd'
     
    552577        remote directory.
    553578        """
    554579        # XXX - not actually a unit test, see docstring.
    555         homeDir = os.path.join(os.getcwd(), self.testDir)
     580        cwd = self.getCwd()
     581        homeDir = posixpath.join(cwd, self.testDir)
    556582        d = self.runScript('pwd', 'lpwd', 'cd testDirectory', 'cd ..', 'pwd')
    557583        d.addCallback(lambda xs: xs[:3] + xs[4:])
    558584        d.addCallback(self.assertEqual,
     
    566592        """
    567593        def _check(results):
    568594            self.flushLoggedErrors()
    569             self.assertTrue(results[0].startswith('-rw-r--r--'))
     595            if platform.isWindows():
     596                self.assertTrue(results[0].startswith('-rw-rw-rw-'))
     597            else:
     598                self.assertTrue(results[0].startswith('-rw-r--r--'))
    570599            self.assertEqual(results[1], '')
    571             self.assertTrue(results[2].startswith('----------'), results[2])
     600            if platform.isWindows():
     601                self.assertTrue(results[2].startswith('-r--r--r--'), results[2])
     602            else:
     603                self.assertTrue(results[2].startswith('----------'), results[2])
    572604            self.assertEqual(results[3], '')
    573605
    574606        d = self.runScript('ls -l testfile1', 'chmod 0 testfile1',
     
    611643        Assert that the files at C{name1} and C{name2} contain exactly the
    612644        same data.
    613645        """
    614         f1 = file(name1).read()
    615         f2 = file(name2).read()
     646        f1 = file(name1, "rb").read()
     647        f2 = file(name2, "rb").read()
    616648        self.assertEqual(f1, f2, msg)
    617649
    618650
     
    624656        """
    625657        # XXX - not actually a unit test
    626658        expectedOutput = ("Transferred %s/%s/testfile1 to %s/test file2"
    627                           % (os.getcwd(), self.testDir, self.testDir))
     659                          % (self.getCwd(), self.testDir, self.testDir))
    628660        def _checkGet(result):
    629661            self.assertTrue(result.endswith(expectedOutput))
    630662            self.assertFilesEqual(self.testDir + '/testfile1',
     
    661693        successfully removed. Also check the output of the put command.
    662694        """
    663695        # XXX - not actually a unit test
    664         expectedOutput = ('Transferred %s/testfile1 to %s/%s/test"file2'
    665                           % (self.testDir, os.getcwd(), self.testDir))
     696        if platform.isWindows():
     697            testfile2 = 'testfile2'
     698            escaped_testfile2 = 'testfile2'
     699        else:
     700            testfile2 = 'test"file2'
     701            escaped_testfile2 = 'test\\"file2'
     702
     703        expectedOutput = ('Transferred %s/testfile1 to %s/%s/%s'
     704                          % (self.testDir, self.getCwd(), self.testDir, testfile2))
    666705        def _checkPut(result):
    667706            self.assertFilesEqual(self.testDir + '/testfile1',
    668                                   self.testDir + '/test"file2')
     707                                  self.testDir + '/'+testfile2)
    669708            self.failUnless(result.endswith(expectedOutput))
    670             return self.runCommand('rm "test\\"file2"')
     709            return self.runCommand('rm "%s"' % escaped_testfile2)
    671710
    672         d = self.runCommand('put %s/testfile1 "test\\"file2"'
    673                             % (self.testDir,))
     711        d = self.runCommand('put %s/testfile1 "%s"'
     712                            % (self.testDir, escaped_testfile2))
    674713        d.addCallback(_checkPut)
    675714        d.addCallback(lambda _: self.failIf(
    676             os.path.exists(self.testDir + '/test"file2')))
     715            os.path.exists(self.testDir + testfile2)))
    677716        return d
    678717
    679718
     
    683722        file.
    684723        """
    685724        # XXX - not actually a unit test
    686         f = file(os.path.join(self.testDir, 'shorterFile'), 'w')
     725        f = file(os.path.join(self.testDir, 'shorterFile'), 'wb')
    687726        f.write("a")
    688727        f.close()
    689         f = file(os.path.join(self.testDir, 'longerFile'), 'w')
     728        f = file(os.path.join(self.testDir, 'longerFile'), 'wb')
    690729        f.write("bb")
    691730        f.close()
    692731        def _checkPut(result):
     
    706745        """
    707746        # XXX - not actually a unit test
    708747        os.mkdir(os.path.join(self.testDir, 'dir'))
    709         f = file(os.path.join(self.testDir, 'dir', 'file'), 'w')
     748        f = file(os.path.join(self.testDir, 'dir', 'file'), 'wb')
    710749        f.write("a")
    711750        f.close()
    712         f = file(os.path.join(self.testDir, 'file'), 'w')
     751        f = file(os.path.join(self.testDir, 'file'), 'wb')
    713752        f.write("bb")
    714753        f.close()
    715754        def _checkPut(result):
     
    760799        d.addCallback(_check)
    761800        d.addCallback(self.assertEqual, '')
    762801        return d
     802    if platform.isWindows():
     803        testLink.skip = "Windows sftp server doesn't support symbolic links."
    763804
    764805
    765806    def testRemoteDirectory(self):
     
    781822    def test_existingRemoteDirectory(self):
    782823        """
    783824        Test that a C{mkdir} on an existing directory fails with the
    784         appropriate error, and doesn't log an useless error server side.
     825        appropriate error, and doesn't log a useless error server side.
    785826        """
    786827        def _check(results):
    787828            self.assertEqual(results[0], '')
     
    9711012    from twisted.python.procutils import which
    9721013    if not which('sftp'):
    9731014        TestOurServerSftpClient.skip = "no sftp command-line client available"
     1015
  • twisted/conch/test/test_checkers.py

     
    175175    """
    176176    Tests for L{SSHPublicKeyDatabase}.
    177177    """
    178     skip = euidSkip or dependencySkip
     178    skip = dependencySkip
    179179
    180180    def setUp(self):
    181181        self.checker = checkers.SSHPublicKeyDatabase()
     
    253253        self.assertEqual(self.mockos.seteuidCalls, [0, 1, 0, 2345])
    254254        self.assertEqual(self.mockos.setegidCalls, [2, 1234])
    255255
     256    if euidSkip:
     257        test_checkKeyAsRoot.skip = "Not supported on Windows"
    256258
    257259    def test_requestAvatarId(self):
    258260        """
  • twisted/conch/test/test_conch.py

     
    526526    """
    527527    Connection forwarding tests run against the Conch command line client.
    528528    """
    529     if runtime.platformType == 'win32':
    530         skip = "can't run cmdline client on win32"
    531 
    532529    def execute(self, remoteCommand, process, sshArgs=''):
    533530        """
    534531        As for L{OpenSSHClientTestCase.execute}, except it runs the 'conch'
     
    550547        env['PYTHONPATH'] = os.pathsep.join(sys.path)
    551548        reactor.spawnProcess(process, sys.executable, cmds, env=env)
    552549        return process.deferred
     550
  • twisted/conch/test/test_filetransfer.py

     
    1010import re
    1111import struct
    1212import sys
     13import posixpath
    1314
    1415from twisted.trial import unittest
    15 try:
    16     from twisted.conch import unix
    17     unix # shut up pyflakes
    18 except ImportError:
    19     unix = None
     16from twisted.python.runtime import platform
    2017
     18if platform.isWindows():
     19    from twisted.conch.windows import SFTPServerForWindowsConchUser as SFTPServerForConchUser
     20else:
     21    from twisted.conch.unix import SFTPServerForUnixConchUser as SFTPServerForConchUser
     22
    2123from twisted.conch import avatar
    2224from twisted.conch.ssh import common, connection, filetransfer, session
    2325from twisted.internet import defer
     
    4446        return r
    4547
    4648
     49def posixGetCWD():
     50    basepath = os.getcwd()
     51    path = os.path.splitdrive(basepath)[1]    # remove drive spec.
     52    return path.replace('\\','/')
     53   
     54
    4755class FileTransferTestAvatar(TestAvatar):
    4856
    4957    def __init__(self, homeDir):
     
    5159        self.homeDir = homeDir
    5260
    5361    def getHomeDir(self):
    54         return os.path.join(os.getcwd(), self.homeDir)
     62        return posixpath.join(posixGetCWD(), self.homeDir)
    5563
    5664
    5765class ConchSessionForTestAvatar:
     
    5967    def __init__(self, avatar):
    6068        self.avatar = avatar
    6169
    62 if unix:
    63     if not hasattr(unix, 'SFTPServerForUnixConchUser'):
    64         # unix should either be a fully working module, or None.  I'm not sure
    65         # how this happens, but on win32 it does.  Try to cope.  --spiv.
    66         import warnings
    67         warnings.warn(("twisted.conch.unix imported %r, "
    68                        "but doesn't define SFTPServerForUnixConchUser'")
    69                       % (unix,))
    70         unix = None
    71     else:
    72         class FileTransferForTestAvatar(unix.SFTPServerForUnixConchUser):
     70class FileTransferForTestAvatar(SFTPServerForConchUser):
    7371
    74             def gotVersion(self, version, otherExt):
    75                 return {'conchTest' : 'ext data'}
     72    def gotVersion(self, version, otherExt):
     73        return {'conchTest' : 'ext data'}
    7674
    77             def extendedRequest(self, extName, extData):
    78                 if extName == 'testExtendedRequest':
    79                     return 'bar'
    80                 raise NotImplementedError
     75    def extendedRequest(self, extName, extData):
     76        if extName == 'testExtendedRequest':
     77            return 'bar'
     78        raise NotImplementedError
    8179
    82         components.registerAdapter(FileTransferForTestAvatar,
    83                                    TestAvatar,
    84                                    filetransfer.ISFTPServer)
     80components.registerAdapter(FileTransferForTestAvatar,
     81                           TestAvatar,
     82                           filetransfer.ISFTPServer)
    8583
     84
    8685class SFTPTestBase(unittest.TestCase):
    8786
    8887    def setUp(self):
    89         self.testDir = self.mktemp()
     88        basepath = self.mktemp()
     89        path = os.path.splitdrive(basepath)[1]    # remove drive spec.
     90        self.testDir = path.replace('\\','/')+"/"
    9091        # Give the testDir another level so we can safely "cd .." from it in
    9192        # tests.
    92         self.testDir = os.path.join(self.testDir, 'extra')
    93         os.makedirs(os.path.join(self.testDir, 'testDirectory'))
     93        self.testDir = posixpath.join(self.testDir, 'extra')
     94        os.makedirs(posixpath.join(self.testDir, 'testDirectory'))
    9495
    95         f = file(os.path.join(self.testDir, 'testfile1'),'w')
     96        f = file(posixpath.join(self.testDir, 'testfile1'),'wb')
    9697        f.write('a'*10+'b'*10)
    97         f.write(file('/dev/urandom').read(1024*64)) # random data
    98         os.chmod(os.path.join(self.testDir, 'testfile1'), 0644)
    99         file(os.path.join(self.testDir, 'testRemoveFile'), 'w').write('a')
    100         file(os.path.join(self.testDir, 'testRenameFile'), 'w').write('a')
    101         file(os.path.join(self.testDir, '.testHiddenFile'), 'w').write('a')
     98        f.write(os.urandom(1024*64)) # random data
     99        if not platform.isWindows():
     100            os.chmod(os.path.join(self.testDir, 'testfile1'), 0644)
     101        file(posixpath.join(self.testDir, 'testRemoveFile'), 'wb').write('a')
     102        file(posixpath.join(self.testDir, 'testRenameFile'), 'wb').write('a')
     103        file(posixpath.join(self.testDir, '.testHiddenFile'), 'wb').write('a')
    102104
    103105
    104106class TestOurServerOurClient(SFTPTestBase):
    105107
    106     if not unix:
    107         skip = "can't run on non-posix computers"
    108 
    109108    def setUp(self):
    110109        SFTPTestBase.setUp(self)
    111110
     
    438437            d = self.client.readLink('testLink')
    439438            self._emptyBuffers()
    440439            d.addCallback(self.assertEqual,
    441                           os.path.join(os.getcwd(), self.testDir, 'testfile1'))
     440                          posixpath.join(posixGetCWD(), self.testDir, 'testfile1'))
    442441            return d
    443442        def _realPath(_):
    444443            d = self.client.realPath('testLink')
    445444            self._emptyBuffers()
    446445            d.addCallback(self.assertEqual,
    447                           os.path.join(os.getcwd(), self.testDir, 'testfile1'))
     446                          posixpath.join(posixGetCWD(), self.testDir, 'testfile1'))
    448447            return d
    449448        d.addCallback(_readLink)
    450449        d.addCallback(_realPath)
    451450        return d
    452451
     452    if platform.isWindows():
     453        testLinkSharesAttrs.skip = "Not supported on Windows"
     454        testLinkPath.skip = "Not supported on Windows"
     455
    453456    def testExtendedRequest(self):
    454457        d = self.client.extendedRequest('testExtendedRequest', 'foo')
    455458        self._emptyBuffers()
     
    470473
    471474class TestFileTransferClose(unittest.TestCase):
    472475
    473     if not unix:
    474         skip = "can't run on non-posix computers"
    475 
    476476    def setUp(self):
    477477        self.avatar = TestAvatar()
    478478
     
    760760        """
    761761        self.assertEqual(result[0], 'msg')
    762762        self.assertEqual(result[1], '')
     763
     764
     765class TestFileTransferClientMakeConnection(SFTPTestBase):
     766    """
     767    Test for L{filetransfer.FileTransferClient} makeConnection().
     768    """
     769
     770    def setUp(self):
     771        SFTPTestBase.setUp(self)
     772        self.avatar = FileTransferTestAvatar(self.testDir)
     773        self.server = filetransfer.FileTransferServer(avatar=self.avatar)
     774        self.clientTransport = loopback.LoopbackRelay(self.server)
     775        extData = {"test_key":"test_value"}
     776        self.client = filetransfer.FileTransferClient(extData)
     777        self.assertEqual(self.client.extData, extData)
     778        self.serverTransport = loopback.LoopbackRelay(self.client)
     779 
     780    def tearDown(self):
     781        self.serverTransport.loseConnection()
     782        self.clientTransport.loseConnection()
     783        self.serverTransport.clearBuffer()
     784        self.clientTransport.clearBuffer()
     785 
     786    def test_makeConnection(self):
     787        self.client.makeConnection(self.clientTransport)
     788        self.server.makeConnection(self.serverTransport)
     789 
  • twisted/conch/test/test_openssh_compat.py

     
    2626    """
    2727    Tests for L{OpenSSHFactory}.
    2828    """
    29     if getattr(os, "geteuid", None) is None:
    30         skip = "geteuid/seteuid not available"
    31     elif OpenSSHFactory is None:
     29    if OpenSSHFactory is None:
    3230        skip = "Cannot run without PyCrypto or PyASN1"
    3331
    3432    def setUp(self):
     
    5048            keydata.publicRSA_openssh)
    5149
    5250        self.mockos = MockOS()
    53         self.patch(os, "seteuid", self.mockos.seteuid)
    54         self.patch(os, "setegid", self.mockos.setegid)
     51        if getattr(os, "geteuid", None) is not None:
     52            self.patch(os, "seteuid", self.mockos.seteuid)
     53            self.patch(os, "setegid", self.mockos.setegid)
    5554
    5655
    5756    def test_getPublicKeys(self):
     
    9998        self.assertEqual(set(keyTypes), set(['ssh-rsa', 'ssh-dss']))
    10099        self.assertEqual(self.mockos.seteuidCalls, [0, os.geteuid()])
    101100        self.assertEqual(self.mockos.setegidCalls, [0, os.getegid()])
     101
     102    if getattr(os, "geteuid", None) is None:
     103        test_getPrivateKeysAsRoot.skip = "Not supported on windows"
     104
  • twisted/conch/test/test_recvline.py

     
    1717from twisted.trial import unittest
    1818from twisted.cred import portal
    1919from twisted.test.proto_helpers import StringTransport
     20from twisted.python.runtime import platform
    2021
    2122class Arrows(unittest.TestCase):
    2223    def setUp(self):
     
    566567        env["PYTHONPATH"] = os.pathsep.join(sys.path)
    567568
    568569        from twisted.internet import reactor
     570        usePTY = not platform.isWindows()
    569571        clientTransport = reactor.spawnProcess(processClient, exe, args,
    570                                                env=env, usePTY=True)
     572                                               env=env, usePTY=usePTY)
    571573
    572574        self.recvlineClient = self.testTerminal = testTerminal
    573575        self.processClient = processClient
     
    588590            pass
    589591        def trap(failure):
    590592            failure.trap(error.ProcessTerminated)
    591             self.assertEqual(failure.value.exitCode, None)
    592             self.assertEqual(failure.value.status, 9)
     593            if platform.isWindows():
     594                self.assertEqual(failure.value.exitCode, 1)
     595                self.assertEqual(failure.value.status, None)
     596            else:
     597                self.assertEqual(failure.value.exitCode, None)
     598                self.assertEqual(failure.value.status, 9)
    593599        return self.testTerminal.onDisconnection.addErrback(trap)
    594600
    595601    def _testwrite(self, bytes):
     
    704710class HistoricRecvlineLoopbackStdio(_StdioMixin, unittest.TestCase, HistoricRecvlineLoopbackMixin):
    705711    if stdio is None:
    706712        skip = "Terminal requirements missing, can't run historic recvline tests over stdio"
     713
  • twisted/conch/test/test_scripts.py

     
    5353
    5454    def test_conch(self):
    5555        self.scriptTest("conch/conch")
    56     test_conch.skip = ttySkip or skip
     56    test_conch.skip = skip
    5757
    5858
    5959    def test_cftp(self):
    6060        self.scriptTest("conch/cftp")
    61     test_cftp.skip = ttySkip or skip
     61    test_cftp.skip = skip
    6262
    6363
    6464    def test_ckeygen(self):
  • twisted/conch/test/test_tap.py

     
    1515except ImportError:
    1616    pyasn1 = None
    1717
    18 try:
    19     from twisted.conch import unix
    20 except ImportError:
    21     unix = None
    22 
    23 if Crypto and pyasn1 and unix:
     18if Crypto and pyasn1:
    2419    from twisted.conch import tap
    2520    from twisted.conch.openssh_compat.factory import OpenSSHFactory
    2621
     
    4540    if not pyasn1:
    4641        skip = "Cannot run without PyASN1"
    4742
    48     if not unix:
    49         skip = "can't run on non-posix computers"
    50 
    5143    usernamePassword = ('iamuser', 'thisispassword')
    5244
    5345    def setUp(self):
  • twisted/conch/test/test_userauth.py

     
    281281
    282282
    283283    if keys is None:
    284         skip = "cannot run w/o PyCrypto"
     284        skip = "cannot run without PyCrypto and PyASN1"
    285285
    286286
    287287    def setUp(self):
     
    724724
    725725
    726726    if keys is None:
    727         skip = "cannot run w/o PyCrypto"
     727        skip = "cannot run without PyCrypto and PyASN1"
    728728
    729729
    730730    def setUp(self):
     
    998998
    999999
    10001000    if keys is None:
    1001         skip = "cannot run w/o PyCrypto or PyASN1"
     1001        skip = "cannot run without PyCrypto and PyASN1"
    10021002
    10031003
    10041004    class Factory:
     
    10651065
    10661066class ModuleInitializationTestCase(unittest.TestCase):
    10671067    if keys is None:
    1068         skip = "cannot run w/o PyCrypto or PyASN1"
     1068        skip = "cannot run without PyCrypto and PyASN1"
    10691069
    10701070
    10711071    def test_messages(self):