TCP/IP reconnect
This commit is contained in:
113
tcpip.py
Normal file
113
tcpip.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import ipaddress
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
def get_available_subnets() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
|
||||
"""Uses the 'ip' command to get a list of available subnets on the system."""
|
||||
r = subprocess.run(['ip', 'route', 'show'], stdout=subprocess.PIPE)
|
||||
output = r.stdout.decode('utf-8')
|
||||
return [ipaddress.ip_network(line.split()[0]) for line in output.splitlines() if 'scope link' in line]
|
||||
|
||||
|
||||
def get_device_ips(serial: str, adb_path='adb') -> list[ipaddress.IPv4Address]:
|
||||
"""Get all the IP addresses of the connected ADB device."""
|
||||
r = subprocess.run([
|
||||
adb_path, '-s', serial,
|
||||
'shell', f"ip -f inet addr show | grep inet"
|
||||
], stdout=subprocess.PIPE)
|
||||
output = r.stdout.decode('utf-8')
|
||||
ips = []
|
||||
for line in output.splitlines():
|
||||
parts = line.strip().split()
|
||||
if len(parts) >= 2:
|
||||
addr = ipaddress.ip_address(parts[1].split('/')[0])
|
||||
|
||||
if isinstance(addr, ipaddress.IPv6Address): # Skip IPv6 addresses
|
||||
continue
|
||||
|
||||
if addr.is_loopback or addr.is_link_local or not addr.is_private: # Skip unwanted addresses
|
||||
continue
|
||||
|
||||
ips.append(addr)
|
||||
|
||||
return ips
|
||||
|
||||
|
||||
def get_best_connections(serial: str, adb_path='adb') -> list[ipaddress.IPv4Address]:
|
||||
"""Get a list of the best IP addresses to connect to the device.
|
||||
|
||||
They are ordered by preference, with the most preferred first.
|
||||
The 'best' IPs are defined as those that are in the first available routed subnet on the host machine.
|
||||
"""
|
||||
device_ips = get_device_ips(serial, adb_path)
|
||||
if not device_ips:
|
||||
return []
|
||||
|
||||
available_subnets = get_available_subnets()
|
||||
best_ips = []
|
||||
for subnet in available_subnets:
|
||||
for ip in device_ips:
|
||||
if ip in subnet:
|
||||
best_ips.append(ip)
|
||||
|
||||
return best_ips
|
||||
|
||||
|
||||
def get_available_port(serial: str, adb_path='adb') -> int | None:
|
||||
"""Get an available TCP port on the device for ADB connection."""
|
||||
for i in range(5555, 5586):
|
||||
r = subprocess.run([
|
||||
adb_path, '-s', serial,
|
||||
'shell', f"netstat -an | grep :{i} "
|
||||
f"| grep LISTEN"
|
||||
], stdout=subprocess.PIPE)
|
||||
output = r.stdout.decode('utf-8')
|
||||
if not output.strip():
|
||||
return i
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def reconnect_as_tcpip(serial: str, adb_path='adb') -> str:
|
||||
"""Reconnect the ADB device as TCP/IP. Returns the new connection string."""
|
||||
|
||||
# Find the best IP address to connect to
|
||||
best_ips = get_best_connections(serial, adb_path)
|
||||
if not best_ips:
|
||||
raise RuntimeError("No suitable IP address found on device for TCP connection.")
|
||||
|
||||
# Find an available port on the device
|
||||
port = get_available_port(serial, adb_path)
|
||||
if port is None:
|
||||
raise RuntimeError("No available TCP port found on device for ADB connection.")
|
||||
|
||||
# Restart ADB in TCP mode
|
||||
r = subprocess.run([
|
||||
adb_path, '-s', serial,
|
||||
'tcpip', str(port)
|
||||
], stdout=subprocess.PIPE)
|
||||
output = r.stdout.decode('utf-8')
|
||||
if 'restarting in TCP mode' not in output:
|
||||
# If this doesn't work the first time, usually a second attempt after a few seconds will work
|
||||
time.sleep(2)
|
||||
r = subprocess.run([
|
||||
adb_path, '-s', serial,
|
||||
'tcpip', str(port)
|
||||
], stdout=subprocess.PIPE)
|
||||
output = r.stdout.decode('utf-8')
|
||||
if 'restarting in TCP mode' not in output:
|
||||
raise RuntimeError(f"Failed to restart device in TCP mode: {output.strip()}")
|
||||
|
||||
# A slight delay is needed to allow the device to switch modes
|
||||
time.sleep(2)
|
||||
|
||||
# Connect to the device over TCP/IP
|
||||
r = subprocess.run([
|
||||
adb_path, 'connect', f"{best_ips[0]}:{port}"
|
||||
], stdout=subprocess.PIPE, timeout=5)
|
||||
output = r.stdout.decode('utf-8')
|
||||
if 'connected to' not in output:
|
||||
raise RuntimeError(f"Failed to connect to device over TCP/IP: {output.strip()}")
|
||||
|
||||
connection_string = f"{best_ips[0]}:{port}"
|
||||
return connection_string
|
||||
Reference in New Issue
Block a user