[PATCH] hwsim tests: use rfkill python module

Johannes Berg johannes at sipsolutions.net
Thu Jan 8 08:59:16 EST 2015


From: Johannes Berg <johannes.berg at intel.com>

Instead of spawning the external rfkill binary through subprocess, use
the RFKill class from the rfkill Python module.

Change-Id: Ie92f82a52e0732a00f10147949322dc7c42e24bd
Signed-off-by: Johannes Berg <johannes.berg at intel.com>
---
 tests/hwsim/test_rfkill.py | 81 ++++++++++++++++++++++------------------------
 tests/hwsim/test_wext.py   | 13 ++++----
 2 files changed, 45 insertions(+), 49 deletions(-)

diff --git a/tests/hwsim/test_rfkill.py b/tests/hwsim/test_rfkill.py
index a0b5b2aca23d..27c620f4e5b2 100644
--- a/tests/hwsim/test_rfkill.py
+++ b/tests/hwsim/test_rfkill.py
@@ -6,39 +6,36 @@
 
 import logging
 logger = logging.getLogger()
-import subprocess
 import time
 
 import hostapd
 from hostapd import HostapdGlobal
 import hwsim_utils
 from wpasupplicant import WpaSupplicant
+from rfkill import RFKill
 
-def get_rfkill_id(dev):
+def get_rfkill(dev):
+    phy = dev.get_driver_status_field("phyname")
     try:
-        cmd = subprocess.Popen(["rfkill", "list"], stdout=subprocess.PIPE)
+        for r, s, h in RFKill.list():
+            if r.name == phy:
+                return r
     except Exception, e:
         logger.info("No rfkill available: " + str(e))
-        return None
-    res = cmd.stdout.read()
-    cmd.stdout.close()
-    phy = dev.get_driver_status_field("phyname")
-    matches = [ line for line in res.splitlines() if phy + ':' in line ]
-    if len(matches) != 1:
-        return None
-    return matches[0].split(':')[0]
+
+    return None
 
 def test_rfkill_open(dev, apdev):
     """rfkill block/unblock during open mode connection"""
-    id = get_rfkill_id(dev[0])
-    if id is None:
+    rfk = get_rfkill(dev[0])
+    if rfk is None:
         return "skip"
 
     hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
     dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
     try:
         logger.info("rfkill block")
-        subprocess.call(['sudo', 'rfkill', 'block', id])
+        rfk.block()
         dev[0].wait_disconnected(timeout=10,
                                  error="Missing disconnection event on rfkill block")
 
@@ -52,17 +49,17 @@ def test_rfkill_open(dev, apdev):
             raise Exception("FETCH_OSU accepted while disabled")
 
         logger.info("rfkill unblock")
-        subprocess.call(['sudo', 'rfkill', 'unblock', id])
+        rfk.unblock()
         dev[0].wait_connected(timeout=10,
                               error="Missing connection event on rfkill unblock")
         hwsim_utils.test_connectivity(dev[0], hapd)
     finally:
-        subprocess.call(['sudo', 'rfkill', 'unblock', id])
+        rfk.unblock()
 
 def test_rfkill_wpa2_psk(dev, apdev):
     """rfkill block/unblock during WPA2-PSK connection"""
-    id = get_rfkill_id(dev[0])
-    if id is None:
+    rfk = get_rfkill(dev[0])
+    if rfk is None:
         return "skip"
 
     ssid = "test-wpa2-psk"
@@ -72,25 +69,25 @@ def test_rfkill_wpa2_psk(dev, apdev):
     dev[0].connect(ssid, psk=passphrase, scan_freq="2412")
     try:
         logger.info("rfkill block")
-        subprocess.call(['sudo', 'rfkill', 'block', id])
+        rfk.block()
         dev[0].wait_disconnected(timeout=10,
                                  error="Missing disconnection event on rfkill block")
 
         logger.info("rfkill unblock")
-        subprocess.call(['sudo', 'rfkill', 'unblock', id])
+        rfk.unblock()
         dev[0].wait_connected(timeout=10,
                               error="Missing connection event on rfkill unblock")
         hwsim_utils.test_connectivity(dev[0], hapd)
     finally:
-        subprocess.call(['sudo', 'rfkill', 'unblock', id])
+        rfk.unblock()
 
 def test_rfkill_autogo(dev, apdev):
     """rfkill block/unblock for autonomous P2P GO"""
-    id0 = get_rfkill_id(dev[0])
-    if id0 is None:
+    rfk0 = get_rfkill(dev[0])
+    if rfk0 is None:
         return "skip"
-    id1 = get_rfkill_id(dev[1])
-    if id1 is None:
+    rfk1 = get_rfkill(dev[1])
+    if rfk1 is None:
         return "skip"
 
     dev[0].p2p_start_go()
@@ -99,7 +96,7 @@ def test_rfkill_autogo(dev, apdev):
 
     try:
         logger.info("rfkill block 0")
-        subprocess.call(['sudo', 'rfkill', 'block', id0])
+        rfk0.block()
         ev = dev[0].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
         if ev is None:
             raise Exception("Group removal not reported")
@@ -111,7 +108,7 @@ def test_rfkill_autogo(dev, apdev):
             raise Exception("P2P_LISTEN accepted unexpectedly")
 
         logger.info("rfkill block 1")
-        subprocess.call(['sudo', 'rfkill', 'block', id1])
+        rfk1.block()
         ev = dev[1].wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
         if ev is None:
             raise Exception("Group removal not reported")
@@ -119,28 +116,28 @@ def test_rfkill_autogo(dev, apdev):
             raise Exception("Unexpected group removal reason: " + ev)
 
         logger.info("rfkill unblock 0")
-        subprocess.call(['sudo', 'rfkill', 'unblock', id0])
+        rfk0.unblock()
         logger.info("rfkill unblock 1")
-        subprocess.call(['sudo', 'rfkill', 'unblock', id1])
+        rfk1.unblock()
         time.sleep(1)
     finally:
-        subprocess.call(['sudo', 'rfkill', 'unblock', id0])
-        subprocess.call(['sudo', 'rfkill', 'unblock', id1])
+        rfk0.unblock()
+        rfk1.unblock()
 
 def test_rfkill_hostapd(dev, apdev):
     """rfkill block/unblock during and prior to hostapd operations"""
     hapd = hostapd.add_ap(apdev[0]['ifname'], { "ssid": "open" })
 
-    id = get_rfkill_id(hapd)
-    if id is None:
+    rfk = get_rfkill(hapd)
+    if rfk is None:
         return "skip"
 
     try:
-        subprocess.call(['rfkill', 'block', id])
+        rfk.block()
         ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5)
         if ev is None:
             raise Exception("INTERFACE-DISABLED event not seen")
-        subprocess.call(['rfkill', 'unblock', id])
+        rfk.unblock()
         ev = hapd.wait_event(["INTERFACE-ENABLED"], timeout=5)
         if ev is None:
             raise Exception("INTERFACE-ENABLED event not seen")
@@ -148,7 +145,7 @@ def test_rfkill_hostapd(dev, apdev):
         hapd.disable()
         hapd.enable()
         dev[0].connect("open", key_mgmt="NONE", scan_freq="2412")
-        subprocess.call(['rfkill', 'block', id])
+        rfk.block()
         ev = hapd.wait_event(["INTERFACE-DISABLED"], timeout=5)
         if ev is None:
             raise Exception("INTERFACE-DISABLED event not seen")
@@ -165,27 +162,27 @@ def test_rfkill_hostapd(dev, apdev):
         if "FAIL" not in hapd.request("ENABLE"):
             raise Exception("ENABLE succeeded unexpectedly (rfkill)")
     finally:
-        subprocess.call(['rfkill', 'unblock', id])
+        rfk.unblock()
 
 def test_rfkill_wpas(dev, apdev):
     """rfkill block prior to wpa_supplicant start"""
     wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
     wpas.interface_add("wlan5")
-    id = get_rfkill_id(wpas)
-    if id is None:
+    rfk = get_rfkill(wpas)
+    if rfk is None:
         return "skip"
     wpas.interface_remove("wlan5")
     try:
-        subprocess.call(['rfkill', 'block', id])
+        rfk.block()
         wpas.interface_add("wlan5")
         time.sleep(0.5)
         state = wpas.get_status_field("wpa_state")
         if state != "INTERFACE_DISABLED":
             raise Exception("Unexpected state with rfkill blocked: " + state)
-        subprocess.call(['rfkill', 'unblock', id])
+        rfk.unblock()
         time.sleep(0.5)
         state = wpas.get_status_field("wpa_state")
         if state == "INTERFACE_DISABLED":
             raise Exception("Unexpected state with rfkill unblocked: " + state)
     finally:
-        subprocess.call(['rfkill', 'unblock', id])
+        rfk.unblock()
diff --git a/tests/hwsim/test_wext.py b/tests/hwsim/test_wext.py
index 1efb5c91c33d..97f33284930d 100644
--- a/tests/hwsim/test_wext.py
+++ b/tests/hwsim/test_wext.py
@@ -7,12 +7,11 @@
 import logging
 logger = logging.getLogger()
 import os
-import subprocess
 
 import hostapd
 import hwsim_utils
 from wpasupplicant import WpaSupplicant
-from test_rfkill import get_rfkill_id
+from test_rfkill import get_rfkill
 
 def get_wext_interface():
     if not os.path.exists("/proc/net/wireless"):
@@ -244,8 +243,8 @@ def test_wext_rfkill(dev, apdev):
     """WEXT and rfkill block/unblock"""
     wpas = WpaSupplicant(global_iface='/tmp/wpas-wlan5')
     wpas.interface_add("wlan5")
-    id = get_rfkill_id(wpas)
-    if id is None:
+    rfk = get_rfkill(wpas)
+    if rfk is None:
         return "skip"
     wpas.interface_remove("wlan5")
 
@@ -257,14 +256,14 @@ def test_wext_rfkill(dev, apdev):
     wpas.connect("open", key_mgmt="NONE", scan_freq="2412")
     try:
         logger.info("rfkill block")
-        subprocess.call(['rfkill', 'block', id])
+        rfk.block()
         wpas.wait_disconnected(timeout=10,
                                error="Missing disconnection event on rfkill block")
 
         logger.info("rfkill unblock")
-        subprocess.call(['rfkill', 'unblock', id])
+        rfk.unblock()
         wpas.wait_connected(timeout=20,
                             error="Missing connection event on rfkill unblock")
         hwsim_utils.test_connectivity(wpas, hapd)
     finally:
-        subprocess.call(['rfkill', 'unblock', id])
+        rfk.unblock()
-- 
2.1.1



More information about the HostAP mailing list