114 lines
4.0 KiB
Python
114 lines
4.0 KiB
Python
import ipaddress
|
|
import subprocess
|
|
import time
|
|
|
|
def get_available_subnets() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
|
|
"""Uses the 'ip' command to get a list of available subnets on the system."""
|
|
r = subprocess.run(['ip', 'route', 'show'], stdout=subprocess.PIPE)
|
|
output = r.stdout.decode('utf-8')
|
|
return [ipaddress.ip_network(line.split()[0]) for line in output.splitlines() if 'scope link' in line]
|
|
|
|
|
|
def get_device_ips(serial: str, adb_path='adb') -> list[ipaddress.IPv4Address]:
|
|
"""Get all the IP addresses of the connected ADB device."""
|
|
r = subprocess.run([
|
|
adb_path, '-s', serial,
|
|
'shell', f"ip -f inet addr show | grep inet"
|
|
], stdout=subprocess.PIPE)
|
|
output = r.stdout.decode('utf-8')
|
|
ips = []
|
|
for line in output.splitlines():
|
|
parts = line.strip().split()
|
|
if len(parts) >= 2:
|
|
addr = ipaddress.ip_address(parts[1].split('/')[0])
|
|
|
|
if isinstance(addr, ipaddress.IPv6Address): # Skip IPv6 addresses
|
|
continue
|
|
|
|
if addr.is_loopback or addr.is_link_local or not addr.is_private: # Skip unwanted addresses
|
|
continue
|
|
|
|
ips.append(addr)
|
|
|
|
return ips
|
|
|
|
|
|
def get_best_connections(serial: str, adb_path='adb') -> list[ipaddress.IPv4Address]:
|
|
"""Get a list of the best IP addresses to connect to the device.
|
|
|
|
They are ordered by preference, with the most preferred first.
|
|
The 'best' IPs are defined as those that are in the first available routed subnet on the host machine.
|
|
"""
|
|
device_ips = get_device_ips(serial, adb_path)
|
|
if not device_ips:
|
|
return []
|
|
|
|
available_subnets = get_available_subnets()
|
|
best_ips = []
|
|
for subnet in available_subnets:
|
|
for ip in device_ips:
|
|
if ip in subnet:
|
|
best_ips.append(ip)
|
|
|
|
return best_ips
|
|
|
|
|
|
def get_available_port(serial: str, adb_path='adb') -> int | None:
|
|
"""Get an available TCP port on the device for ADB connection."""
|
|
for i in range(5555, 5586):
|
|
r = subprocess.run([
|
|
adb_path, '-s', serial,
|
|
'shell', f"netstat -an | grep :{i} "
|
|
f"| grep LISTEN"
|
|
], stdout=subprocess.PIPE)
|
|
output = r.stdout.decode('utf-8')
|
|
if not output.strip():
|
|
return i
|
|
|
|
return None
|
|
|
|
|
|
def reconnect_as_tcpip(serial: str, adb_path='adb') -> str:
|
|
"""Reconnect the ADB device as TCP/IP. Returns the new connection string."""
|
|
|
|
# Find the best IP address to connect to
|
|
best_ips = get_best_connections(serial, adb_path)
|
|
if not best_ips:
|
|
raise RuntimeError("No suitable IP address found on device for TCP connection.")
|
|
|
|
# Find an available port on the device
|
|
port = get_available_port(serial, adb_path)
|
|
if port is None:
|
|
raise RuntimeError("No available TCP port found on device for ADB connection.")
|
|
|
|
# Restart ADB in TCP mode
|
|
r = subprocess.run([
|
|
adb_path, '-s', serial,
|
|
'tcpip', str(port)
|
|
], stdout=subprocess.PIPE)
|
|
output = r.stdout.decode('utf-8')
|
|
if 'restarting in TCP mode' not in output:
|
|
# If this doesn't work the first time, usually a second attempt after a few seconds will work
|
|
time.sleep(2)
|
|
r = subprocess.run([
|
|
adb_path, '-s', serial,
|
|
'tcpip', str(port)
|
|
], stdout=subprocess.PIPE)
|
|
output = r.stdout.decode('utf-8')
|
|
if 'restarting in TCP mode' not in output:
|
|
raise RuntimeError(f"Failed to restart device in TCP mode: {output.strip()}")
|
|
|
|
# A slight delay is needed to allow the device to switch modes
|
|
time.sleep(2)
|
|
|
|
# Connect to the device over TCP/IP
|
|
r = subprocess.run([
|
|
adb_path, 'connect', f"{best_ips[0]}:{port}"
|
|
], stdout=subprocess.PIPE, timeout=5)
|
|
output = r.stdout.decode('utf-8')
|
|
if 'connected to' not in output:
|
|
raise RuntimeError(f"Failed to connect to device over TCP/IP: {output.strip()}")
|
|
|
|
connection_string = f"{best_ips[0]}:{port}"
|
|
return connection_string
|