From: Benjamin Berg <benjamin.berg@xxxxxxxxx> Add a test that injects various problematic ML elements and verify that MLO is disabled in that case. Signed-off-by: Benjamin Berg <benjamin.berg@xxxxxxxxx> --- tests/hwsim/test_eht.py | 117 +++++++++++++++++++++++++++++++++++ tests/hwsim/wpasupplicant.py | 44 +++++++++++++ 2 files changed, 161 insertions(+) diff --git a/tests/hwsim/test_eht.py b/tests/hwsim/test_eht.py index f254d00b3f..42904a9401 100644 --- a/tests/hwsim/test_eht.py +++ b/tests/hwsim/test_eht.py @@ -2411,6 +2411,123 @@ def test_eht_mlo_color_change(dev, apdev): hapd0.dump_monitor() hapd1.dump_monitor() +def test_eht_mld_invalid_ml_elem(dev, apdev): + """EHT AP MLD with various errors in its beacon""" + + cases = { + '2nd link misses ML elem': { + 'hapd1_ml_elem': '', + 'hapd1_parse_error': True, + }, + '2nd link has wrong MLD capa with 3 maximum links' : { + 'hapd1_ml_elem': 'ff106bb0010d__MLD_ADDR__010181000200', + }, + '2nd link has incorrect MLD addr': { + 'hapd1_ml_elem': 'ff106bb0010d112233445566010181000100', + }, + '2nd link missing EMLSR': { + 'hapd1_ml_elem': 'ff106bb0010d__MLD_ADDR__010101000100', + }, + '2nd link missing MLD capa': { + 'hapd1_ml_elem': 'ff0e6bb0000b__MLD_ADDR__01018100', + 'hapd1_parse_error': True + }, + '2nd link added/different extended MLD capa': { + 'hapd1_ml_elem': 'ff126bb0050f__MLD_ADDR__0101810001000100', + 'hapd1_parse_error': True + }, + '1st link missing MLD capa': { + 'hapd0_ml_elem': 'ff0e6bb0000b__MLD_ADDR__00018100', + 'hapd0_parse_error': True + }, + } + + with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ + HWSimRadio(use_mlo=True) as (wpas_radio, wpas_iface): + + wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5') + wpas.interface_add(wpas_iface) + + passphrase = 'qwertyuiop' + ssid = "mld_ap_two_links" + + params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe="1") + hapd0 = eht_mld_enable_ap(hapd_iface, 0, params) + + params = eht_mld_ap_wpa2_params(ssid, passphrase, key_mgmt="SAE", mfp="2", pwe="1") + params['channel'] = '6' + hapd1 = eht_mld_enable_ap(hapd_iface, 1, params) + + # hapd0 as the primary link + wpas.set("mld_connect_bssid_pref", hapd0.own_addr()) + wpas.set("sae_pwe", "1") + wpas.connect(ssid, scan_freq="2412 2437", psk=passphrase, key_mgmt="SAE", ieee80211w="2") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=3, active_links=3) + eht_verify_wifi_version(wpas) + + # Grab the BSS information + hapd0_bss = wpas.get_bss(hapd0.own_addr()) + hapd1_bss = wpas.get_bss(hapd1.own_addr()) + + assert wpas.request('DISCONNECT').strip() == 'OK' + + # Iterate the testcases + for name, case in cases.items(): + logger.info(f"Testing case: {name}") + wpas.scan() + time.sleep(5) + + new_hapd1_bss = hapd1_bss.copy() + if 'beacon_ie' in new_hapd1_bss: + del new_hapd1_bss['beacon_ie'] + if 'hapd1_ml_elem' in case: + ml_addr = hapd0.own_mld_addr().replace(":", "") + orig_ml_elem = f'ff106bb0010d{ml_addr}010181000100' + new_ml_elem = case['hapd1_ml_elem'].replace('__MLD_ADDR__', ml_addr) + new_hapd1_bss['ie'] = \ + hapd1_bss['ie'].replace(orig_ml_elem, new_ml_elem) + + new_hapd1_bss['est_throughput'] = 1 + + # Always override hapd1 to lower est_throughput + override = { + new_hapd1_bss['bssid']: new_hapd1_bss + } + + if 'hapd0_ml_elem' in case: + new_hapd0_bss = hapd0_bss.copy() + if 'beacon_ie' in new_hapd0_bss: + del new_hapd0_bss['beacon_ie'] + + ml_addr = hapd0.own_mld_addr().replace(":", "") + orig_ml_elem = f'ff106bb0010d{ml_addr}000181000100' + new_ml_elem = case['hapd0_ml_elem'].replace('__MLD_ADDR__', ml_addr) + new_hapd0_bss['ie'] = \ + hapd0_bss['ie'].replace(orig_ml_elem, new_ml_elem) + + override[new_hapd0_bss['bssid']] = new_hapd0_bss + + wpas.override_scan(override) + + # Ensure we could parse the ML information + if not case.get('hapd0_parse_error', False): + assert 'ap_mld_addr' in wpas.get_bss(hapd0_bss['bssid']) + else: + assert 'ap_mld_addr' not in wpas.get_bss(hapd0_bss['bssid']) + # Nothing else to test, not connecting in MLO mode + continue + if not case.get('hapd1_parse_error', False): + assert 'ap_mld_addr' in wpas.get_bss(hapd1_bss['bssid']) + + # This will connect to hapd0, but the second link is ignored + wpas.connect(ssid, psk=passphrase, key_mgmt="SAE", ieee80211w="2") + + eht_verify_status(wpas, hapd0, 2412, 20, is_ht=True, mld=True, valid_links=1, active_links=1) + eht_verify_wifi_version(wpas) + + assert wpas.request('DISCONNECT').strip() == 'OK' + def test_eht_mld_control_socket_connectivity(dev, apdev): """AP MLD control socket connectivity""" with HWSimRadio(use_mlo=True) as (hapd_radio, hapd_iface), \ diff --git a/tests/hwsim/wpasupplicant.py b/tests/hwsim/wpasupplicant.py index 4ea27a771b..6798748ca8 100644 --- a/tests/hwsim/wpasupplicant.py +++ b/tests/hwsim/wpasupplicant.py @@ -1242,6 +1242,50 @@ class WpaSupplicant: if len(res.splitlines()) > 1: logger.info("flush_scan_cache: Could not clear all BSS entries. These remain:\n" + res) + def override_scan(self, inject): + """ + Simulate a scan with all existing BSSs overriding/adding any BSS listed + in the inject dictionary. + """ + attrs = { + 'bssid': 'bssid', + 'freq': 'freq', + 'beacon_int': 'beacon_int', + 'caps': 'capabilities', + 'qual': 'qual', + 'noise': 'noise', + 'level': 'level', + 'est_throughput': 'est_throughput', + 'snr': 'snr', + 'ie': 'ie', + 'beacon_ie': 'beacon_ie', + } + + all_aps = {} + + # Grab all existing BSSs, so that they are part of the "last" scan + for bssid in re.findall('bssid=(?P<bssid>.*?)\n', self.request("BSS RANGE=ALL MASK=0x2")): + scan_bss = self.get_bss(bssid) + all_aps[bssid] = scan_bss + + # Override the one we want to inject + all_aps.update(inject) + + assert self.request('DRIVER_EVENT SCAN_RES START').strip() == 'OK' + + # And, insert them all + for bss in all_aps.values(): + bss_info = ' '.join(f'{k}={bss[v]}' for k, v in attrs.items() if v in bss) + + if 'tsf' in bss: + bss_info += (f' tsf={int(bss["tsf"]):x}') + + if bss['bssid'] in inject: + logger.info(f'Injecting BSS: {bss_info}') + assert self.request(f'DRIVER_EVENT SCAN_RES BSS {bss_info}').strip() == 'OK' + + assert self.request('DRIVER_EVENT SCAN_RES END').strip() == 'OK' + def disconnect_and_stop_scan(self): self.request("DISCONNECT") res = self.request("ABORT_SCAN") -- 2.49.0 _______________________________________________ Hostap mailing list Hostap@xxxxxxxxxxxxxxxxxxx http://lists.infradead.org/mailman/listinfo/hostap