[PATCH] TCP: Reassemble TCP PDU segments for complete NFS message processing

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Previously, nfstest was unable to process NFS messages split across
multiple TCP segments, resulting in missed NFS operations
and numerous test failures.

For example:

4   0.000816 192.168.122.198 → 192.168.122.199 NFS 110 V4 NULL Call
5   0.001073 192.168.122.199 → 192.168.122.198 TCP 66 2049 → 775 [ACK]
		Seq=1 Ack=45 Win=65152 Len=0 TSval=3032720633 TSecr=443583529
6   0.001155 192.168.122.199 → 192.168.122.198 TCP 70 2049 → 775 [PSH, ACK]
		Seq=1 Ack=45 Win=65152 Len=4 TSval=3032720634 TSecr=443583529 [TCP segment of a reassembled PDU]
7   0.001155 192.168.122.199 → 192.168.122.198 NFS 90 V4 NULL Reply (Call In 4)

This patch introduces functionality to reassemble TCP segments,
allowing nfstest to accurately decode complete NFS messages,
similar to Wireshark's "TCP segment of a reassembled PDU".

Signed-off-by: Chen Hanxiao <chenhx.fnst@xxxxxxxxxxx>
---
 packet/pktt.py          |   6 +++
 packet/transport/tcp.py | 104 +++++++++++++++++++++++++++++++++++++++-
 packet/unpack.py        |   6 +++
 3 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/packet/pktt.py b/packet/pktt.py
index b9420f9..3609fdc 100644
--- a/packet/pktt.py
+++ b/packet/pktt.py
@@ -368,6 +368,10 @@ class Pktt(BaseObj):
         # IPv4 fragments used in reassembly
         self._ipv4_fragments = {}
 
+        # TCP PDU segments used in reassembly
+        self._tcp_pdu_map = {}
+        self._tcp_pdu_pkt = {}
+
         # RDMA reassembly object
         self._rdma_info = RDMAinfo()
 
@@ -419,6 +423,8 @@ class Pktt(BaseObj):
         del self._tcp_stream_map
         del self._rpc_xid_map
         del self._rdma_info
+        del self._tcp_pdu_map
+        del self._tcp_pdu_pkt
 
     def __del__(self):
         """Destructor
diff --git a/packet/transport/tcp.py b/packet/transport/tcp.py
index e7fadf4..432b9ff 100644
--- a/packet/transport/tcp.py
+++ b/packet/transport/tcp.py
@@ -20,6 +20,7 @@ RFC  793 TRANSMISSION CONTROL PROTOCOL
 RFC 2018 TCP Selective Acknowledgment Options
 RFC 7323 TCP Extensions for High Performance
 """
+import copy
 import nfstest_config as c
 from baseobj import BaseObj
 from packet.unpack import Unpack
@@ -27,7 +28,9 @@ from packet.transport.mpa import MPA
 from packet.application.dns import DNS
 from packet.application.rpc import RPC
 from packet.application.krb5 import KRB5
-from packet.utils import OptionFlags, ShortHex
+from packet.application.rpc_const import *
+from packet.application.rpc_creds import rpc_credential
+from packet.utils import OptionFlags, ShortHex, IntHex
 
 # Module constants
 __author__    = "Jorge Mora (%s)" % c.NFSTEST_AUTHOR_EMAIL
@@ -275,6 +278,35 @@ class TCP(BaseObj):
             # This is a re-transmission, do not process
             return
 
+        if self.flags.PSH:
+            if streamid not in pktt._tcp_pdu_map:
+                pktt._tcp_pdu_map[streamid] = unpack.getbytes()
+                pktt._tcp_pdu_pkt[streamid] = pktt.pkt
+            else:
+                # TCP PDU reassemble
+                pktt._tcp_pdu_map[streamid] += unpack.getbytes()
+                pktt._tcp_pdu_pkt[streamid] = pktt.pkt
+
+            pktt.unpack.replace_data(pktt._tcp_pdu_map[streamid])
+            pktt.pkt = pktt._tcp_pdu_pkt[streamid]
+
+            try:
+                self._has_rpc_header(pktt)
+                self._decode_payload(pktt, stream)
+                del pktt._tcp_pdu_map[streamid]
+                del pktt._tcp_pdu_pkt[streamid]
+
+                if self.length > 0:
+                    stream.last_seq = seq
+                    stream.next_seq = seq + self.length
+                    if self.seq_number + self.length > UINT32_MAX:
+                        # Next sequence number will wrap around
+                        stream.seq_wrap += UINT32_MAX + 1
+                        return
+            except:
+                # looks like parts of PDU?
+                return
+
         self._decode_payload(pktt, stream)
 
         if self.length > 0:
@@ -284,6 +316,76 @@ class TCP(BaseObj):
                 # Next sequence number will wrap around
                 stream.seq_wrap += UINT32_MAX + 1
 
+    def _has_rpc_header(self, pktt):
+        """Trial-decode the RPC record marker and message header.
+        Works on a shallow copy of pktt.unpack and saves nothing. The result is
+        always falsy; the caller treats any exception here as a partial PDU.
+        """
+        pktt_try_unpack = copy.copy(pktt.unpack)
+        try_init_size = pktt_try_unpack.size()
+        save_data = ''
+        while True:
+            # Decode fragment header
+            try_psize = pktt_try_unpack.unpack_uint()
+            size = (try_psize & 0x7FFFFFFF) + len(save_data)
+            last_fragment = (try_psize >> 31)
+            if size == 0:
+                return False
+            if last_fragment == 0 and size < pktt_try_unpack.size():
+                # Save RPC fragment
+                save_data += pktt_try_unpack.read(size)
+            else:
+                if len(save_data):
+                    # Concatenate RPC fragments
+                    pktt_try_unpack.insert(save_data)
+                break
+
+        # Decode XID and RPC type
+        pktt_try_xid  = IntHex(pktt_try_unpack.unpack_uint())
+        pktt_try_type = pktt_try_unpack.unpack_uint()
+        if pktt_try_type == CALL:
+            # RPC call
+            rpc_version = pktt_try_unpack.unpack_uint()
+            program     = pktt_try_unpack.unpack_uint()
+            version     = pktt_try_unpack.unpack_uint()
+            procedure   = pktt_try_unpack.unpack_uint()
+            credential  = rpc_credential(pktt_try_unpack)
+            if not credential:
+                return
+            verifier = rpc_credential(pktt_try_unpack, True)
+            if rpc_version != 2 or (credential.flavor in [0,1] and not verifier):
+                return
+        elif pktt_try_type == REPLY and pktt.rpc_replies:
+            # RPC reply
+            reply_status = pktt_try_unpack.unpack_uint()
+            if reply_status == MSG_ACCEPTED:
+                verifier = rpc_credential(pktt_try_unpack, True)
+                if verifier:
+                    return
+                accepted_status = accept_stat_enum(pktt_try_unpack)
+                if accepted_status == PROG_MISMATCH:
+                    prog_mismatch = Prog(pktt_try_unpack)
+                elif accept_stat.get(accepted_status) is None:
+                    # Invalid accept_stat
+                    return
+            elif reply_status == MSG_DENIED:
+                rejected_status = reject_stat_enum(pktt_try_unpack)
+                if rejected_status == RPC_MISMATCH:
+                    rpc_mismatch = Prog(pktt_try_unpack)
+                elif rejected_status == AUTH_ERROR:
+                    auth_status = auth_stat_enum(pktt_try_unpack)
+                    if auth_stat.get(auth_status) is None:
+                        # Invalid auth_status
+                        return
+                elif reject_stat.get(rejected_status) is None:
+                    # Invalid rejected status
+                    return
+            elif reply_stat.get(reply_status) is None:
+                # Invalid reply status
+                return
+        else:
+            return
+
     def __str__(self):
         """String representation of object
 
diff --git a/packet/unpack.py b/packet/unpack.py
index 223cf60..603102e 100644
--- a/packet/unpack.py
+++ b/packet/unpack.py
@@ -194,6 +194,12 @@ class Unpack(object):
         self._state.append([sid, self._offset])
         return sid
 
+    def replace_data(self, data):
+        """Replace the working buffer with data; reset offset and saved states"""
+        self._data = data
+        self._offset = 0
+        self._state = []  # saved states hold offsets into the old buffer
+
     def restore_state(self, sid):
         """Restore state given by the state id"""
         max = len(self._state)
-- 
2.47.1





[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux