# Copyright (c) 2014-present PlatformIO # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from time import sleep import click import serial from platformio import exception, util from platformio.commands.test.processor import TestProcessorBase from platformio.managers.platform import PlatformFactory class EmbeddedTestProcessor(TestProcessorBase): SERIAL_TIMEOUT = 600 def process(self): if not self.options["without_building"]: self.print_progress("Building...") target = ["__test"] if self.options["without_uploading"]: target.append("checkprogsize") if not self.build_or_upload(target): return False if not self.options["without_uploading"]: self.print_progress("Uploading...") target = ["upload"] if self.options["without_building"]: target.append("nobuild") else: target.append("__test") if not self.build_or_upload(target): return False if self.options["without_testing"]: return True self.print_progress("Testing...") return self.run() def run(self): click.echo( "If you don't see any output for the first 10 secs, " "please reset board (press reset button)" ) click.echo() try: ser = serial.Serial( baudrate=self.get_baudrate(), timeout=self.SERIAL_TIMEOUT ) ser.port = self.get_test_port() ser.rts = self.options["monitor_rts"] ser.dtr = self.options["monitor_dtr"] ser.open() except serial.SerialException as e: click.secho(str(e), fg="red", err=True) return False if not self.options["no_reset"]: ser.flushInput() ser.setDTR(False) ser.setRTS(False) sleep(0.1) ser.setDTR(True) ser.setRTS(True) sleep(0.1) while True: line = ser.readline().strip() # fix non-ascii output from device for i, c in enumerate(line[::-1]): if not isinstance(c, int): c = ord(c) if c > 127: line = line[-i:] break if not line: continue if isinstance(line, bytes): line = line.decode("utf8", "ignore") self.on_run_out(line) if all([l in line for l in ("Tests", "Failures", "Ignored")]): break ser.close() return not self._run_failed def get_test_port(self): # if test port is specified manually or in config if self.options.get("test_port"): return self.options.get("test_port") if self.env_options.get("test_port"): return self.env_options.get("test_port") assert set(["platform", "board"]) & set(self.env_options.keys()) p = PlatformFactory.newPlatform(self.env_options["platform"]) board_hwids = p.board_config(self.env_options["board"]).get("build.hwids", []) port = None elapsed = 0 while elapsed < 5 and not port: for item in util.get_serialports(): port = item["port"] for hwid in board_hwids: hwid_str = ("%s:%s" % (hwid[0], hwid[1])).replace("0x", "") if hwid_str in item["hwid"]: return port # check if port is already configured try: serial.Serial(port, timeout=self.SERIAL_TIMEOUT).close() except serial.SerialException: port = None if not port: sleep(0.25) elapsed += 0.25 if not port: raise exception.PlatformioException( "Please specify `test_port` for environment or use " "global `--test-port` option." ) return port