diff --git a/twisted/protocols/ftp.py b/twisted/protocols/ftp.py
index b035f04..38740d8 100644
--- a/twisted/protocols/ftp.py
+++ b/twisted/protocols/ftp.py
@@ -25,7 +25,7 @@ from zope.interface import Interface, implements
 
 # Twisted Imports
 from twisted import copyright
-from twisted.internet import reactor, interfaces, protocol, error, defer
+from twisted.internet import defer, error, interfaces, protocol, reactor, task
 from twisted.protocols import basic, policies
 
 from twisted.python import log, failure, filepath
@@ -72,7 +72,8 @@ REQ_FILE_ACTN_PENDING_FURTHER_INFO      = "350"
 
 SVC_NOT_AVAIL_CLOSING_CTRL_CNX          = "421.1"
 TOO_MANY_CONNECTIONS                    = "421.2"
-CANT_OPEN_DATA_CNX                      = "425"
+CANT_OPEN_DATA_CNX                      = "425.1"
+DTP_TIMEOUT                             = "425.2"
 CNX_CLOSED_TXFR_ABORTED                 = "426"
 REQ_ACTN_ABRTD_FILE_UNAVAIL             = "450"
 REQ_ACTN_ABRTD_LOCAL_ERR                = "451"
@@ -141,6 +142,7 @@ RESPONSE = {
     SVC_NOT_AVAIL_CLOSING_CTRL_CNX:     '421 Service not available, closing control connection.',
     TOO_MANY_CONNECTIONS:               '421 Too many users right now, try again in a few minutes.',
     CANT_OPEN_DATA_CNX:                 "425 Can't open data connection.",
+    DTP_TIMEOUT:                        '425 Data channel initialization timed out.',
     CNX_CLOSED_TXFR_ABORTED:            '426 Transfer aborted.  Data connection closed.',
 
     REQ_ACTN_ABRTD_FILE_UNAVAIL:        '450 Requested action aborted. File unavailable.',
@@ -777,6 +779,16 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             (self.passivePortRange,))
 
 
+    def _checkDataTransportStarted(self, command):
+        """Checks that data transport is ready.
+
+        If data transport was not requested using PORT, PASV etc it raises
+        L{BadCmdSequenceError}.
+        """
+        if self.dtpInstance is None:
+            raise BadCmdSequenceError(
+                'PORT or PASV required before %s' % (command,))
+
     def ftp_USER(self, username):
         """
         First part of login.  Get the username the peer wants to
@@ -884,9 +896,7 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
         file.  A null argument implies the user's current working or
         default directory.
         """
-        # Uh, for now, do this retarded thing.
-        if self.dtpInstance is None or not self.dtpInstance.isConnected:
-            return defer.fail(BadCmdSequenceError('must send PORT or PASV before RETR'))
+        self._checkDataTransportStarted('LIST')
 
         # bug in konqueror
         if path == "-a":
@@ -917,6 +927,7 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             segments,
             ('size', 'directory', 'permissions', 'hardlinks',
              'modified', 'owner', 'group'))
+        d.addCallback(self._cbWaitDTPConnectionWithTimeout)
         d.addCallback(gotListing)
         return d
 
@@ -936,11 +947,7 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
         @return: a L{Deferred} which will be fired when the listing request
             is finished.
         """
-        # XXX: why is this check different from ftp_RETR/ftp_STOR? See #4180
-        if self.dtpInstance is None or not self.dtpInstance.isConnected:
-            return defer.fail(
-                BadCmdSequenceError('must send PORT or PASV before RETR'))
-
+        self._checkDataTransportStarted('NLST')
         try:
             segments = toSegments(self.workingDirectory, path)
         except InvalidPath:
@@ -995,14 +1002,40 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
             '*' in segments[-1] or '?' in segments[-1] or
             ('[' in segments[-1] and ']' in segments[-1])):
             d = self.shell.list(segments[:-1])
+            d.addCallback(self._cbWaitDTPConnectionWithTimeout)
             d.addCallback(cbGlob)
         else:
             d = self.shell.list(segments)
+            d.addCallback(self._cbWaitDTPConnectionWithTimeout)
             d.addCallback(cbList)
-            # self.shell.list will generate an error if the path is invalid
-            d.addErrback(listErr)
+
+        # self.shell.list will generate an error if the path is invalid
+        d.addErrback(listErr)
         return d
 
+    def _cbWaitDTPConnectionWithTimeout(self, result):
+        """Helper callback that waits for DTP instance to be connected.
+
+        It will raise a {PortConnectionError} if DTP instance is not
+        connected after the interval defined by self.factory.timeOut.
+        """
+        def cbTimeout(ignore):
+            self.reply(DTP_TIMEOUT)
+            return defer.fail(
+                     PortConnectionError(
+                         defer.TimeoutError("DTP connection timeout")))
+
+        def cbContinueCommand(ignore, timeoutDeferred):
+            if timeoutDeferred is not None and timeoutDeferred.active():
+                timeoutDeferred.cancel()
+            return result
+
+        deferred = self.factory._reactor.callLater(
+            self.factory.timeOut, cbTimeout, None)
+
+        self.dtpFactory.deferred.addCallback(
+            cbContinueCommand, deferred)
+        return self.dtpFactory.deferred
 
     def ftp_CWD(self, path):
         try:
@@ -1038,8 +1071,7 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
         @rtype: L{Deferred}
         @return: a L{Deferred} which will be fired when the transfer is done.
         """
-        if self.dtpInstance is None:
-            raise BadCmdSequenceError('PORT or PASV required before RETR')
+        self._checkDataTransportStarted('RETR')
 
         try:
             newsegs = toSegments(self.workingDirectory, path)
@@ -1100,8 +1132,23 @@ class FTP(object, basic.LineReceiver, policies.TimeoutMixin):
 
 
     def ftp_STOR(self, path):
-        if self.dtpInstance is None:
-            raise BadCmdSequenceError('PORT or PASV required before STOR')
+        """
+        This command causes the server-DTP to accept the data
+        transferred via the data connection and to store the data as
+        a file at the server site.  If the file specified in the
+        pathname exists at the server site, then its contents shall
+        be replaced by the data being transferred.  A new file is
+        created at the server site if the file specified in the
+        pathname does not already exist.
+
+        @type path: C{str}
+        @param path: The file path where the content should be stored.
+
+        @rtype: L{Deferred}
+        @return: a L{Deferred} which will be fired when the transfer
+            is finished.
+        """
+        self._checkDataTransportStarted('STOR')
 
         try:
             newsegs = toSegments(self.workingDirectory, path)
@@ -1329,10 +1376,14 @@ class FTPFactory(policies.LimitTotalConnectionsFactory):
 
     passivePortRange = xrange(0, 1)
 
-    def __init__(self, portal=None, userAnonymous='anonymous'):
+    def __init__(self, portal=None, userAnonymous='anonymous', reactor=None):
         self.portal = portal
         self.userAnonymous = userAnonymous
         self.instances = []
+        if reactor is None:
+            from twisted.internet import reactor
+        self._reactor = reactor
+
 
     def buildProtocol(self, addr):
         p = policies.LimitTotalConnectionsFactory.buildProtocol(self, addr)
diff --git a/twisted/test/test_ftp.py b/twisted/test/test_ftp.py
index 23ffcba..1883507 100644
--- a/twisted/test/test_ftp.py
+++ b/twisted/test/test_ftp.py
@@ -463,6 +463,91 @@ class BasicFTPServerTestCase(FTPServerTestCase):
         self.assertEqual(portRange, protocol.wrappedProtocol.passivePortRange)
 
 
+    def _startDataConnection(self):
+        """
+        Prepare data transport protocol to look like it was created by
+        a previous call to PASV or PORT
+        """
+        self.serverProtocol.dtpFactory = ftp.DTPFactory(self.serverProtocol)
+        self.serverProtocol.dtpFactory.buildProtocol('ignore_address')
+
+        self.serverProtocol.dtpPort = self.serverProtocol.listenFactory(
+            6000, self.serverProtocol.dtpFactory)
+
+        dtpTransport = proto_helpers.StringTransportWithDisconnection()
+        dtpTransport.protocol = ftp.DTP()
+        self.serverProtocol.dtpInstance.transport = dtpTransport
+
+
+    def test_LISTWithoutDataChannel(self):
+        """
+        Calling LIST without prior setup of data connection will result in a
+        incorrect sequence of commands error.
+        """
+        d = self._anonymousLogin()
+        self.assertCommandFailed(
+            'LIST .',
+            ["503 Incorrect sequence of commands: "
+             "PORT or PASV required before LIST"],
+            chainDeferred=d)
+        return d
+
+
+    def test_LISTTimeout(self):
+        """
+        LIST will timeout if setting up the DTP instance will take to long.
+        """
+        d = self._anonymousLogin()
+        self._startDataConnection()
+
+        # Set timeout to a very small value to not slow down tests.
+        self.serverProtocol.factory.timeOut = 0.1
+
+        self.assertCommandFailed(
+            'LIST .',
+            ["425 Data channel initialization timed out."],
+            chainDeferred=d)
+        return d
+
+
+    def test_NLSTWithoutDataChannel(self):
+        """
+        Calling NLST without prior setup of data connection will result in a
+        incorrect sequence of commands error.
+        """
+        d = self._anonymousLogin()
+        self.assertCommandFailed(
+            'NLST .',
+            ["503 Incorrect sequence of commands: "
+             "PORT or PASV required before NLST"],
+            chainDeferred=d)
+        return d
+
+
+    def test_NLSTTimeout(self):
+        """
+        NLST will timeout if setting up the DTP instance will take to long.
+        """
+
+        def cbAdvanceClock(result, clock):
+            clock.advance(6)
+            return result
+
+        self.factory.timeOut = 5
+        self.factory._reactor = task.Clock()
+
+        d = self._anonymousLogin()
+        self._startDataConnection()
+
+        # Set timeout to a very small value to not slow down tests.
+        self.assertCommandFailed(
+            'NLST .',
+            ["425 Data channel initialization timed out."],
+            chainDeferred=d)
+        d.addCallback(cbAdvanceClock, self.factory._reactor)
+        return d
+
+
 
 class FTPServerTestCaseAdvancedClient(FTPServerTestCase):
     """
