diff --git a/twisted/protocols/ftp.py b/twisted/protocols/ftp.py
index b035f04..444accd 100644
--- a/twisted/protocols/ftp.py
+++ b/twisted/protocols/ftp.py
@@ -25,7 +25,7 @@ from zope.interface import Interface, implements
 
 # Twisted Imports
 from twisted import copyright
-from twisted.internet import reactor, interfaces, protocol, error, defer
+from twisted.internet import defer, error, interfaces, protocol, reactor, task
 from twisted.protocols import basic, policies
 
 from twisted.python import log, failure, filepath
@@ -72,7 +72,8 @@ REQ_FILE_ACTN_PENDING_FURTHER_INFO      = "350"
 
 SVC_NOT_AVAIL_CLOSING_CTRL_CNX          = "421.1"
 TOO_MANY_CONNECTIONS                    = "421.2"
-CANT_OPEN_DATA_CNX                      = "425"
+CANT_OPEN_DATA_CNX                      = "425.1"
+DTP_TIMEOUT                             = "425.2"
 CNX_CLOSED_TXFR_ABORTED                 = "426"
 REQ_ACTN_ABRTD_FILE_UNAVAIL             = "450"
 REQ_ACTN_ABRTD_LOCAL_ERR                = "451"
@@ -141,6 +142,7 @@ RESPONSE = {
     SVC_NOT_AVAIL_CLOSING_CTRL_CNX:     '421 Service not available, closing control connection.',
     TOO_MANY_CONNECTIONS:               '421 Too many users right now, try again in a few minutes.',
     CANT_OPEN_DATA_CNX:                 "425 Can't open data connection.",
+    DTP_TIMEOUT:                        '425 Data channel initialization timed out.',
     CNX_CLOSED_TXFR_ABORTED:            '426 Transfer aborted.  Data connection closed.',
 
     REQ_ACTN_ABRTD_FILE_UNAVAIL:        '450 Requested action aborted. File unavailable.',
@@ -777,6 +779,16 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             (self.passivePortRange,))
 
 
+    def _checkDataTransportStarted(self, command):
+        """Checks that data transport is ready.
+
+        If data transport was not requested using PORT, PASV etc it raises
+        L{BadCmdSequenceError}.
+        """
+        if self.dtpInstance is None:
+            raise BadCmdSequenceError(
+                'PORT or PASV required before %s' % (command,))
+
     def ftp_USER(self, username):
         """
         First part of login.  Get the username the peer wants to
@@ -829,7 +841,8 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
 
 
     def ftp_PASV(self):
-        """Request for a passive connection
+        """
+        Request for a passive connection
 
         from the rfc::
 
@@ -876,17 +889,19 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
 
 
     def ftp_LIST(self, path=''):
-        """ This command causes a list to be sent from the server to the
-        passive DTP.  If the pathname specifies a directory or other
+        """
+        This command causes a list to be sent from the server to the
+        passive DTP.
+
+        If the pathname specifies a directory or other
         group of files, the server should transfer a list of files
-        in the specified directory.  If the pathname specifies a
-        file then the server should send current information on the
-        file.  A null argument implies the user's current working or
-        default directory.
+        in the specified directory.
+        If the pathname specifies a file then the server should send current
+        information on the file.
+        A null argument implies the user's current working or default
+        directory.
         """
-        # Uh, for now, do this retarded thing.
-        if self.dtpInstance is None or not self.dtpInstance.isConnected:
-            return defer.fail(BadCmdSequenceError('must send PORT or PASV before RETR'))
+        self._checkDataTransportStarted('LIST')
 
         # bug in konqueror
         if path == "-a":
@@ -908,6 +923,10 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             self.dtpInstance.transport.loseConnection()
             return (TXFR_COMPLETE_OK,)
 
+        def ebDTPTimeout(failure):
+            failure.trap(defer.TimeoutError)
+            self.reply(DTP_TIMEOUT)
+
         try:
             segments = toSegments(self.workingDirectory, path)
         except InvalidPath:
@@ -917,7 +936,9 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             segments,
             ('size', 'directory', 'permissions', 'hardlinks',
              'modified', 'owner', 'group'))
+        d.addCallback(self._cbWaitDTPConnectionWithTimeout)
         d.addCallback(gotListing)
+        d.addErrback(ebDTPTimeout)
         return d
 
 
@@ -936,11 +957,7 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
         @return: a L{Deferred} which will be fired when the listing request
             is finished.
         """
-        # XXX: why is this check different from ftp_RETR/ftp_STOR? See #4180
-        if self.dtpInstance is None or not self.dtpInstance.isConnected:
-            return defer.fail(
-                BadCmdSequenceError('must send PORT or PASV before RETR'))
-
+        self._checkDataTransportStarted('NLST')
         try:
             segments = toSegments(self.workingDirectory, path)
         except InvalidPath:
@@ -995,14 +1012,46 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             '*' in segments[-1] or '?' in segments[-1] or
             ('[' in segments[-1] and ']' in segments[-1])):
             d = self.shell.list(segments[:-1])
+            d.addCallback(self._cbWaitDTPConnectionWithTimeout)
             d.addCallback(cbGlob)
         else:
             d = self.shell.list(segments)
+            d.addCallback(self._cbWaitDTPConnectionWithTimeout)
             d.addCallback(cbList)
-            # self.shell.list will generate an error if the path is invalid
-            d.addErrback(listErr)
+
+        # self.shell.list will generate an error if the path is invalid
+        d.addErrback(listErr)
         return d
 
+    def _cbWaitDTPConnectionWithTimeout(self, result):
+        """
+        Helper callback that waits for DTP instance to be connected.
+
+        It will raise a C{PortConnectionError} if DTP instance is not
+        connected after the interval defined by self.factory.timeOut.
+        """
+        def ebDTPTimeout(failure):
+            """
+            Called at data transport port timeout.
+            """
+            failure.trap(defer.CancelledError)
+            return defer.fail(defer.TimeoutError("DTP connection timeout"))
+
+        def cbContinueCommand(ignore, timeoutCall):
+            if timeoutCall is not None and timeoutCall.active():
+                timeoutCall.cancel()
+            return result
+
+        def cbCallTimeout(ignore):
+            self.dtpFactory.deferred.cancel()
+
+        timeoutCall = self.factory.callLater(
+            self.factory.timeOut, cbCallTimeout, None)
+
+        self.dtpFactory.deferred.addCallback(cbContinueCommand, timeoutCall)
+        self.dtpFactory.deferred.addErrback(ebDTPTimeout)
+
+        return self.dtpFactory.deferred
 
     def ftp_CWD(self, path):
         try:
@@ -1038,8 +1087,7 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
         @rtype: L{Deferred}
         @return: a L{Deferred} which will be fired when the transfer is done.
         """
-        if self.dtpInstance is None:
-            raise BadCmdSequenceError('PORT or PASV required before RETR')
+        self._checkDataTransportStarted('RETR')
 
         try:
             newsegs = toSegments(self.workingDirectory, path)
@@ -1100,8 +1148,23 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
 
 
     def ftp_STOR(self, path):
-        if self.dtpInstance is None:
-            raise BadCmdSequenceError('PORT or PASV required before STOR')
+        """
+        This command causes the server-DTP to accept the data
+        transferred via the data connection and to store the data as
+        a file at the server site.  If the file specified in the
+        pathname exists at the server site, then its contents shall
+        be replaced by the data being transferred.  A new file is
+        created at the server site if the file specified in the
+        pathname does not already exist.
+
+        @type path: C{str}
+        @param path: The file path where the content should be stored.
+
+        @rtype: L{Deferred}
+        @return: a L{Deferred} which will be fired when the transfer
+            is finished.
+        """
+        self._checkDataTransportStarted('STOR')
 
         try:
             newsegs = toSegments(self.workingDirectory, path)
@@ -1289,7 +1352,8 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
 
 
     def cleanupDTP(self):
-        """call when DTP connection exits
+        """
+        Called when DTP connection exits.
         """
         log.msg('cleanupDTP', debug=True)
 
@@ -1329,6 +1393,8 @@ class FTPFactory(policies.LimitTotalConnectionsFactory):
 
     passivePortRange = xrange(0, 1)
 
+    callLater = reactor.callLater
+
     def __init__(self, portal=None, userAnonymous='anonymous'):
         self.portal = portal
         self.userAnonymous = userAnonymous
diff --git a/twisted/test/test_ftp.py b/twisted/test/test_ftp.py
index 23ffcba..8789132 100644
--- a/twisted/test/test_ftp.py
+++ b/twisted/test/test_ftp.py
@@ -463,6 +463,85 @@ class BasicFTPServerTestCase(FTPServerTestCase):
         self.assertEqual(portRange, protocol.wrappedProtocol.passivePortRange)
 
 
+    def _startDataConnection(self):
+        """
+        Prepare data transport protocol to look like it was created by
+        a previous call to PASV or PORT
+        """
+        self.serverProtocol.dtpFactory = ftp.DTPFactory(self.serverProtocol)
+        self.serverProtocol.dtpFactory.buildProtocol('ignore_address')
+
+        self.serverProtocol.dtpPort = self.serverProtocol.listenFactory(
+            6000, self.serverProtocol.dtpFactory)
+
+        dtpTransport = proto_helpers.StringTransportWithDisconnection()
+        dtpTransport.protocol = ftp.DTP()
+        self.serverProtocol.dtpInstance.transport = dtpTransport
+
+
+    def test_LISTWithoutDataChannel(self):
+        """
+        Calling LIST without prior setup of data connection will result in a
+        incorrect sequence of commands error.
+        """
+        d = self._anonymousLogin()
+        self.assertCommandFailed(
+            'LIST .',
+            ["503 Incorrect sequence of commands: "
+             "PORT or PASV required before LIST"],
+            chainDeferred=d)
+        return d
+
+
+    def test_LISTTimeout(self):
+        """
+        LIST will timeout if setting up the DTP instance will take to long.
+        """
+        # Set timeout to a very small value to not slow down tests.
+        self.factory.timeOut = 0.01
+
+        d = self._anonymousLogin()
+        self._startDataConnection()
+
+        self.assertCommandFailed(
+            'LIST .',
+            ["425 Data channel initialization timed out."],
+            chainDeferred=d)
+        return d
+
+
+    def test_NLSTWithoutDataChannel(self):
+        """
+        Calling NLST without prior setup of data connection will result in a
+        incorrect sequence of commands error.
+        """
+        d = self._anonymousLogin()
+        self.assertCommandFailed(
+            'NLST .',
+            ["503 Incorrect sequence of commands: "
+             "PORT or PASV required before NLST"],
+            chainDeferred=d)
+        return d
+
+
+    def test_NLSTTimeout(self):
+        """
+        NLST will timeout if setting up the DTP instance will take to long,
+        but NLST will not return any error.
+        """
+        self.factory.timeOut = 0.01
+
+        d = self._anonymousLogin()
+        self._startDataConnection()
+
+        # Set timeout to a very small value to not slow down tests.
+        self.assertCommandResponse(
+            'NLST .',
+            ["226 Transfer Complete."],
+            chainDeferred=d)
+        return d
+
+
 
 class FTPServerTestCaseAdvancedClient(FTPServerTestCase):
     """
