From 9e4571ba3133211cad5d25451a1285c8aa1cb21a Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Sat, 4 Apr 2026 11:39:45 +0000 Subject: [PATCH 1/2] perf: reduce screenshot encoding time and XML parsing overhead - Scale down screenshots to 0.5x (configurable via SCREENSHOT_SCALE env) before annotation and encoding, cutting pixel count by 75% - Set PNG compress_level=1 in screenshot_in_bytes and as_base64 for 3-5x faster encoding with slightly larger output - Pre-compile bounds regex in tree/utils.py so each node skips the re module's pattern-cache lookup during XML tree parsing (re.search already caches compiled patterns, so this saves the lookup, not a recompilation) - Move threading import to module level in mobile/service.py - Remove asyncio.sleep(1) from lifespan to eliminate startup delay Co-authored-by: Sanjar Afaq --- src/android_mcp/__main__.py | 2 -- src/android_mcp/mobile/service.py | 13 +++++++++---- src/android_mcp/tree/utils.py | 4 +++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/android_mcp/__main__.py b/src/android_mcp/__main__.py index bc3f454..58401aa 100644 --- a/src/android_mcp/__main__.py +++ b/src/android_mcp/__main__.py @@ -3,7 +3,6 @@ from dataclasses import dataclass from textwrap import dedent from typing import Literal, Optional -import asyncio import os from fastmcp import FastMCP @@ -185,7 +184,6 @@ def _connect_preferred_device() -> None: @asynccontextmanager async def lifespan(app: FastMCP): """Runs initialization code before the server starts and cleanup code after it shuts down.""" - await asyncio.sleep(1) yield diff --git a/src/android_mcp/mobile/service.py b/src/android_mcp/mobile/service.py index c45da89..7652946 100644 --- a/src/android_mcp/mobile/service.py +++ b/src/android_mcp/mobile/service.py @@ -4,6 +4,7 @@ from io import BytesIO from PIL import Image import subprocess +import threading import base64 import os from typing import Optional @@ -90,7 +91,6 @@ def get_device(self): return self.device def capture_data(self, use_vision: bool = True): - import threading 
data = {} def get_xml(): @@ -130,8 +130,13 @@ def get_state(self, use_vision=False, as_bytes: bool = False, as_base64: bool = if use_vision: nodes = tree_state.interactive_elements + scale = float(os.getenv("SCREENSHOT_SCALE", "0.5")) + w, h = screenshot_data.size + screenshot_data = screenshot_data.resize( + (int(w * scale), int(h * scale)), Image.Resampling.LANCZOS + ) if use_annotation: - screenshot = tree.annotated_screenshot(nodes=nodes, scale=1.0, screenshot=screenshot_data) + screenshot = tree.annotated_screenshot(nodes=nodes, scale=scale, screenshot=screenshot_data) else: screenshot = screenshot_data if os.getenv("SCREENSHOT_QUANTIZED") in ["1", "yes", "true", True]: @@ -172,7 +177,7 @@ def screenshot_in_bytes(self,screenshot:Image.Image)->bytes: if screenshot is None: raise ValueError("Screenshot is None") io=BytesIO() - screenshot.save(io,format='PNG') + screenshot.save(io,format='PNG',compress_level=1) bytes=io.getvalue() if len(bytes) == 0: raise ValueError("Screenshot conversion resulted in empty bytes.") @@ -185,7 +190,7 @@ def as_base64(self,screenshot:Image.Image)->str: if screenshot is None: raise ValueError("Screenshot is None") io=BytesIO() - screenshot.save(io,format='PNG') + screenshot.save(io,format='PNG',compress_level=1) bytes=io.getvalue() if len(bytes) == 0: raise ValueError("Screenshot conversion resulted in empty bytes.") diff --git a/src/android_mcp/tree/utils.py b/src/android_mcp/tree/utils.py index 3dc630e..939e8ee 100644 --- a/src/android_mcp/tree/utils.py +++ b/src/android_mcp/tree/utils.py @@ -1,9 +1,11 @@ import re +_BOUNDS_RE = re.compile(r'\[(\d+),(\d+)]\[(\d+),(\d+)]') + def extract_cordinates(node): attributes = node.attrib bounds=attributes.get('bounds') - match = re.search(r'\[(\d+),(\d+)]\[(\d+),(\d+)]', bounds) + match = _BOUNDS_RE.search(bounds) if match: x1, y1, x2, y2 = map(int, match.groups()) return x1, y1, x2, y2 From 48029ae7ecf1aacee58e5ca8261d45208aca2abe Mon Sep 17 00:00:00 2001 From: "claude[bot]" 
<41898282+claude[bot]@users.noreply.github.com> Date: Sat, 4 Apr 2026 12:24:06 +0000 Subject: [PATCH 2/2] perf: add timing instrumentation to log performance metrics - Add perf_log.py with timed() context manager writing to {branch-name}.log - Instrument capture_data: dump_hierarchy, screenshot, total parallel time - Instrument get_state: capture, tree parsing, resize, annotation, encoding - Instrument get_element_tree, get_interactive_elements, annotated_screenshot - Add *.log to .gitignore so log files are not tracked Co-authored-by: Sanjar Afaq --- .gitignore | 5 +++- src/android_mcp/mobile/service.py | 41 ++++++++++++++++---------- src/android_mcp/perf_log.py | 48 +++++++++++++++++++++++++++++++ src/android_mcp/tree/service.py | 48 +++++++++++++++++-------------- 4 files changed, 105 insertions(+), 37 deletions(-) create mode 100644 src/android_mcp/perf_log.py diff --git a/.gitignore b/.gitignore index fa1d20d..d0dd712 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,7 @@ wheels/ # Virtual environments .venv *.mcpb -notebook.ipynb \ No newline at end of file +notebook.ipynb + +# Performance log files (branch-name.log) +*.log \ No newline at end of file diff --git a/src/android_mcp/mobile/service.py b/src/android_mcp/mobile/service.py index 7652946..9f9a25a 100644 --- a/src/android_mcp/mobile/service.py +++ b/src/android_mcp/mobile/service.py @@ -1,5 +1,6 @@ from android_mcp.mobile.views import MobileState from android_mcp.tree.service import Tree +from android_mcp.perf_log import timed, log_separator import uiautomator2 as u2 from io import BytesIO from PIL import Image @@ -95,14 +96,15 @@ def capture_data(self, use_vision: bool = True): def get_xml(): try: - data['xml'] = self.device.dump_hierarchy() + with timed("capture_data.dump_hierarchy"): + data['xml'] = self.device.dump_hierarchy() except Exception as e: data['xml_error'] = e def get_img(): try: - # Use format="pillow" to ensure we get a PIL image immediately - data['img'] = 
self.device.screenshot(format="pillow") + with timed("capture_data.screenshot"): + data['img'] = self.device.screenshot(format="pillow") except Exception as e: data['img_error'] = e @@ -112,8 +114,9 @@ def get_img(): for t in threads: t.start() - for t in threads: - t.join() + with timed("capture_data.total_parallel"): + for t in threads: + t.join() if 'xml_error' in data: raise data['xml_error'] @@ -124,23 +127,29 @@ def get_img(): def get_state(self, use_vision=False, as_bytes: bool = False, as_base64: bool = False, use_annotation: bool = True): try: - xml_data, screenshot_data = self.capture_data(use_vision=use_vision) - tree = Tree(self) - tree_state = tree.get_state(xml_data=xml_data) + log_separator(f"get_state use_vision={use_vision}") + with timed("get_state.capture_data"): + xml_data, screenshot_data = self.capture_data(use_vision=use_vision) + with timed("get_state.tree_state"): + tree = Tree(self) + tree_state = tree.get_state(xml_data=xml_data) if use_vision: nodes = tree_state.interactive_elements scale = float(os.getenv("SCREENSHOT_SCALE", "0.5")) w, h = screenshot_data.size - screenshot_data = screenshot_data.resize( - (int(w * scale), int(h * scale)), Image.Resampling.LANCZOS - ) + with timed("get_state.screenshot_resize"): + screenshot_data = screenshot_data.resize( + (int(w * scale), int(h * scale)), Image.Resampling.LANCZOS + ) if use_annotation: - screenshot = tree.annotated_screenshot(nodes=nodes, scale=scale, screenshot=screenshot_data) + with timed("get_state.annotated_screenshot"): + screenshot = tree.annotated_screenshot(nodes=nodes, scale=scale, screenshot=screenshot_data) else: screenshot = screenshot_data if os.getenv("SCREENSHOT_QUANTIZED") in ["1", "yes", "true", True]: - screenshot = self.quantized_screenshot(screenshot) + with timed("get_state.quantize"): + screenshot = self.quantized_screenshot(screenshot) if as_base64: screenshot = self.as_base64(screenshot) @@ -177,7 +186,8 @@ def 
screenshot_in_bytes(self,screenshot:Image.Image)->bytes: if screenshot is None: raise ValueError("Screenshot is None") io=BytesIO() - screenshot.save(io,format='PNG',compress_level=1) + with timed("screenshot_in_bytes.png_save"): + screenshot.save(io,format='PNG',compress_level=1) bytes=io.getvalue() if len(bytes) == 0: raise ValueError("Screenshot conversion resulted in empty bytes.") @@ -190,7 +200,8 @@ def as_base64(self,screenshot:Image.Image)->str: if screenshot is None: raise ValueError("Screenshot is None") io=BytesIO() - screenshot.save(io,format='PNG',compress_level=1) + with timed("as_base64.png_save"): + screenshot.save(io,format='PNG',compress_level=1) bytes=io.getvalue() if len(bytes) == 0: raise ValueError("Screenshot conversion resulted in empty bytes.") diff --git a/src/android_mcp/perf_log.py b/src/android_mcp/perf_log.py new file mode 100644 index 0000000..8b87c27 --- /dev/null +++ b/src/android_mcp/perf_log.py @@ -0,0 +1,48 @@ +""" +Performance instrumentation utility. +Timing data is appended to {branch-name}.log in the working directory. +""" +import subprocess +import time +from contextlib import contextmanager + + +def _get_branch_name() -> str: + try: + result = subprocess.run( + ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], + capture_output=True, text=True, timeout=5 + ) + branch = result.stdout.strip() + if branch and branch != 'HEAD': + return branch.replace('/', '-') + except Exception: + pass + return 'unknown' + + +_LOG_FILE = f"{_get_branch_name()}.log" + + +def _write(line: str) -> None: + try: + with open(_LOG_FILE, 'a') as f: + f.write(line + '\n') + except Exception: + pass + + +def log_separator(label: str = "") -> None: + """Write a section separator, e.g. 
at the start of a Snapshot call.""" + import datetime + ts = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] + _write(f"\n--- {label} [{ts}] ---") + + +@contextmanager +def timed(label: str): + """Context manager that times a block and appends the result to the log file.""" + t0 = time.perf_counter() + yield + elapsed_ms = (time.perf_counter() - t0) * 1000 + _write(f" {label}: {elapsed_ms:.1f}ms") diff --git a/src/android_mcp/tree/service.py b/src/android_mcp/tree/service.py index 6f0909d..4754d3e 100644 --- a/src/android_mcp/tree/service.py +++ b/src/android_mcp/tree/service.py @@ -1,6 +1,7 @@ from android_mcp.tree.views import TreeState, ElementNode, CenterCord, BoundingBox from android_mcp.tree.utils import extract_cordinates,get_center_cordinates from android_mcp.tree.config import INTERACTIVE_CLASSES +from android_mcp.perf_log import timed from PIL import Image, ImageFont, ImageDraw from xml.etree.ElementTree import Element from xml.etree import ElementTree @@ -20,7 +21,8 @@ def __init__(self,mobile:'Mobile'): def get_element_tree(self, xml_data=None)->'Element': tree_string = xml_data if xml_data else self.mobile.device.dump_hierarchy() - return ElementTree.fromstring(tree_string) + with timed("get_element_tree.fromstring"): + return ElementTree.fromstring(tree_string) def get_state(self, xml_data=None)->TreeState: interactive_elements=self.get_interactive_elements(xml_data=xml_data) @@ -29,23 +31,25 @@ def get_state(self, xml_data=None)->TreeState: def get_interactive_elements(self, xml_data=None)->list: interactive_elements=[] element_tree = self.get_element_tree(xml_data=xml_data) - nodes=element_tree.findall('.//node[@enabled="true"]') - for node in nodes: - if self.is_interactive(node): - x1,y1,x2,y2 = extract_cordinates(node) - name=self.get_element_name(node) - if not name: - continue - x_center,y_center = get_center_cordinates((x1,y1,x2,y2)) - raw_id=node.get('resource-id','') - short_id=raw_id.split('/')[-1] if '/' in raw_id else raw_id - 
interactive_elements.append(ElementNode(**{ - 'name':name, - 'class_name':node.get('class'), - 'coordinates':CenterCord(x=x_center,y=y_center), - 'bounding_box':BoundingBox(x1=x1,y1=y1,x2=x2,y2=y2), - 'resource_id':short_id - })) + with timed("get_interactive_elements.findall"): + nodes=element_tree.findall('.//node[@enabled="true"]') + with timed("get_interactive_elements.filter_loop"): + for node in nodes: + if self.is_interactive(node): + x1,y1,x2,y2 = extract_cordinates(node) + name=self.get_element_name(node) + if not name: + continue + x_center,y_center = get_center_cordinates((x1,y1,x2,y2)) + raw_id=node.get('resource-id','') + short_id=raw_id.split('/')[-1] if '/' in raw_id else raw_id + interactive_elements.append(ElementNode(**{ + 'name':name, + 'class_name':node.get('class'), + 'coordinates':CenterCord(x=x_center,y=y_center), + 'bounding_box':BoundingBox(x1=x1,y1=y1,x2=x2,y2=y2), + 'resource_id':short_id + })) return interactive_elements def get_element_name(self, node) -> str: @@ -95,7 +99,8 @@ def is_interactive(self, node) -> bool: def annotated_screenshot(self, nodes: list[ElementNode],scale:float=0.7, screenshot=None) -> Image.Image: if screenshot is None: - screenshot = self.mobile.get_screenshot(scale=scale) + with timed("annotated_screenshot.get_screenshot"): + screenshot = self.mobile.get_screenshot(scale=scale) draw = ImageDraw.Draw(screenshot) font_size = 12 @@ -135,7 +140,8 @@ def draw_annotation(label, node: ElementNode): draw.rectangle([(label_x1, label_y1), (label_x2, label_y2)], fill=color) draw.text((label_x1 + 2, label_y1 + 2), str(label), fill=(255, 255, 255), font=font) - for i, node in enumerate(nodes): - draw_annotation(i, node) + with timed("annotated_screenshot.draw_all"): + for i, node in enumerate(nodes): + draw_annotation(i, node) return screenshot