#!/usr/bin/env python3
"""
Raspberry freeze-frame

Exposes the Raspberry Pi to a USB host as a composite gadget consisting of a
FAT-formatted mass-storage disk and a HID keyboard.  Pressing the hardware
switch sends Alt+X to the host; whenever the backing disk image is modified,
the newest image file found on it is shown on the framebuffer with ``fim``.
"""

__author__ = "Hendrik Langer "

import multiprocessing
import os.path
import sys
import threading
import time
import subprocess
import glob

# Pin numbers use the BCM scheme (see GPIO.setmode(GPIO.BCM) in main()).
# NOTE(review): the trailing numbers are kept from the original source;
# presumably earlier/alternative pin assignments - confirm.
pin_led_ready=12 # 27
pin_led_busy=18 # 5
pin_switch=13

try:
    import RPi.GPIO as GPIO
except RuntimeError:
    print("Error importing RPi.GPIO! Please check of you're root or permissions are set correctly.")
    # Without GPIO nothing below can work; exit instead of failing later
    # with a NameError.
    sys.exit(1)
except ImportError:
    print('Python GPIO package missing. Are you running this on a Raspberry Pi?')
    # apt install python3-rpi.gpio
    sys.exit(1)

try:
    import pyinotify
except ImportError:
    print('Python pyinotify package is missing.')
    # apt install python3-pyinotify
    sys.exit(1)

# Boot-keyboard HID report descriptor (63 bytes), split by report field.
_HID_KEYBOARD_REPORT_DESC = (
    b'\x05\x01\x09\x06\xa1\x01'
    b'\x05\x07\x19\xe0\x29\xe7\x15\x00\x25\x01\x75\x01\x95\x08\x81\x02'
    b'\x95\x01\x75\x08\x81\x03'
    b'\x95\x05\x75\x01\x05\x08\x19\x01\x29\x05\x91\x02'
    b'\x95\x01\x75\x03\x91\x03'
    b'\x95\x06\x75\x08\x15\x00\x25\x65\x05\x07\x19\x00\x29\x65\x81\x00'
    b'\xc0'
)

# Searched in this order below /mnt when looking for the newest picture.
_IMAGE_PATTERNS = ('*.png', '*.jpg', '*.bmp', '*.PNG', '*.JPG', '*.BMP')

# Output of `tvservice -s` for which the display mode is forced explicitly.
_TVSERVICE_FALLBACK = (
    b'state 0x12000a [HDMI DMT (4) RGB full 4:3], 640x480 @ 60.00Hz, progressive\n'
)


def watch_delay_call(
        base_directory,
        callback,
        delay=0.5,
        call_once_initially=True,
        mask=pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE |
        pyinotify.IN_MODIFY | pyinotify.IN_MOVED_TO):
    """Watch all files below a directory and execute a command on changes.

    Add some delay so that multiple save operations trigger a single
    execution.  Blocks until interrupted with Ctrl+C (KeyboardInterrupt),
    then stops the notifier and returns.

    Example:
        def filechanged(paths):
            # TODO: Do something useful.
            print(paths)

        watch_delay_call('.', filechanged)

    Args:
        base_directory: Directory to monitor, recursively.
        callback: Function to call on file change, with a list of paths.
            It runs on a background daemon thread.
        delay: Time in seconds to delay.
        call_once_initially: Set to true to call the callback once initially
            (it is then called with None instead of a list).
        mask: File system changes to listen for (by default any file change).
    """

    class Process(pyinotify.ProcessEvent):
        """Forward the path of every received event to a callable."""

        def __init__(self, immediate_callback):
            super().__init__()
            self.immediate_callback = immediate_callback

        def process_default(self, event):
            target = os.path.join(event.path, event.name)
            self.immediate_callback(target)

    def delay_call(pipe, delayed_callback, delay, call_once_initially):
        if call_once_initially:
            delayed_callback(None)

        path_list = []

        while True:
            # Wait until there is a change.
            path_list.append(pipe.recv())
            while pipe.poll():
                path_list.append(pipe.recv())

            # Delay
            time.sleep(delay)

            # If there are more changes, restart the timer.
            if pipe.poll():
                continue

            # Execute the callback.
            delayed_callback(path_list)

            path_list = []

    receiver, sender = multiprocessing.Pipe(False)

    delay_callback_thread = threading.Thread(
        target=delay_call,
        args=(receiver, callback, delay, call_once_initially))
    delay_callback_thread.daemon = True  # dies with this program.
    delay_callback_thread.start()

    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, Process(sender.send))
    wm.add_watch(base_directory, mask, rec=True, auto_add=True)
    try:
        while True:
            notifier.process_events()
            if notifier.check_events():
                notifier.read_events()
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the notifier on every exit path, not only on Ctrl+C.
        notifier.stop()


def os_echo(filename, string):
    """Write the text `string` to `filename`, replacing its content."""
    with open(filename, 'w') as f:
        f.write(string)


def os_echo_bin(filename, string):
    """Write the bytes `string` to `filename`, replacing its content."""
    with open(filename, 'wb') as f:
        f.write(string)


def send_keypress():
    """Send an Alt+X key press followed by a release through /dev/hidg0."""
    # Unbuffered: each write() must reach the device immediately.  With a
    # buffered file both reports are only flushed at close, so the 50 ms
    # hold time below would never be seen by the host.
    with open('/dev/hidg0', 'wb', buffering=0) as f:
        # b'\x00\x00\x04\x00\x00\x00\x00\x00' would press the A-button.
        f.write(b'\x04\x00\x1B\x00\x00\x00\x00\x00')  # press Alt+X
        time.sleep(0.05)
        f.write(b'\x00\x00\x00\x00\x00\x00\x00\x00')  # release all keys


def _create_disk_image():
    """Create the 96 MiB FAT image and copy the shipped files onto it."""
    subprocess.run(['dd', 'if=/dev/zero', 'of=/tmp/usbdisk.img', 'bs=1M', 'count=96'], check=True)
    subprocess.run(['mkfs', '-t', 'vfat', '-n', 'SCREENSHOTS', '/tmp/usbdisk.img'], check=True)
    subprocess.run(['mount', '-o', 'loop,rw', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], check=True)
    subprocess.run(['cp', '/opt/freezer/background.png', '/mnt/'], check=True)
    subprocess.run(['cp', '/opt/freezer/freeze.exe', '/mnt/'], check=True)
    subprocess.run(['umount', '/mnt'], check=True)


def _setup_gadget(g):
    """Create the composite USB gadget below configfs directory `g` and bind it.

    See http://irq5.io/2016/12/22/raspberry-pi-zero-as-multiple-usb-gadgets/
    and http://isticktoit.net/?p=1383
    """
    subprocess.run(['modprobe', 'libcomposite'], check=True)
    subprocess.run(['mkdir', g], check=True)
    os_echo(g+"idVendor", "0x1d6b")  # Linux Foundation
    os_echo(g+"idProduct", "0x0104")  # Multifunction Composite Gadget
    os_echo(g+"bcdDevice", "0x0100")  # v1.0.0
    os_echo(g+"bcdUSB", "0x0200")  # USB 2.0

    os_echo(g+"bDeviceClass", "0xEF")  # windows enumeration install
    os_echo(g+"bDeviceSubClass", "0x02")  # correct bus driver
    os_echo(g+"bDeviceProtocol", "0x01")  # for multi function

    subprocess.run(['mkdir', g+"strings/0x409"], check=True)  # english
    os_echo(g+"strings/0x409/serialnumber", "34812ee8aa112377")  # random
    os_echo(g+"strings/0x409/manufacturer", "IUL TU Dortmund")
    os_echo(g+"strings/0x409/product", "Raspberry Screenshot Gadget")

    subprocess.run(['mkdir', g+"configs/c.1"], check=True)
    subprocess.run(['mkdir', g+"configs/c.1/strings/0x409"], check=True)  # english
    os_echo(g+"configs/c.1/bmAttributes", "0x80")  # bus powered
    os_echo(g+"configs/c.1/MaxPower", "500")
    os_echo(g+"configs/c.1/strings/0x409/configuration", "Config 1")

    # config usb_mass_storage
    subprocess.run(['mkdir', g+"functions/mass_storage.usb0"], check=True)
    os_echo(g+"functions/mass_storage.usb0/stall", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/cdrom", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/ro", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/nofua", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/removable", "1")
    os_echo(g+"functions/mass_storage.usb0/lun.0/file", "/tmp/usbdisk.img")
    subprocess.run(['ln', '-s', g+"functions/mass_storage.usb0", g+"configs/c.1"], check=True)

    # config keyboard
    subprocess.run(['mkdir', g+"functions/hid.usb0"], check=True)
    # NOTE(review): presumably gives configfs time to populate the
    # attribute files of the new function - confirm.
    time.sleep(0.2)
    os_echo(g+"functions/hid.usb0/protocol", "1")
    os_echo(g+"functions/hid.usb0/subclass", "1")
    os_echo(g+"functions/hid.usb0/report_length", "8")
    os_echo_bin(g+"functions/hid.usb0/report_desc", _HID_KEYBOARD_REPORT_DESC)
    subprocess.run(['ln', '-s', g+"functions/hid.usb0", g+"configs/c.1"], check=True)

    # Bind the gadget to the UDC listed in /sys/class/udc.
    subprocess.run('ls /sys/class/udc/ > '+g+'UDC', shell=True, check=True)

    # RNDIS OS descriptors (disabled):
    #   ln -s g+"configs/c.1" g+"os_desc"
    #   os_echo(g+"os_desc/use", "1")
    #   os_echo(g+"os_desc/b_vendor_code", "0xcd")
    #   os_echo(g+"os_desc/qw_sign", "MSFT100")
    #   mkdir g+"functions/rndis.usb0"
    #   os_echo(g+"functions/rndis.usb0/os_desc/interface.rndis/compatible_id", "RNDIS")
    #   os_echo(g+"functions/rndis.usb0/os_desc/interface.rndis/sub_compatible_id", "5162001")


def _teardown_gadget(g):
    """Best-effort removal of the gadget created by _setup_gadget()."""
    try:
        # An empty string would result in no write() at all; the newline is
        # what `echo "" > UDC` sends and makes the kernel unbind the gadget.
        os_echo(g+"UDC", "\n")
    except OSError as err:
        # E.g. the gadget was never created or bound; continue cleaning up.
        print('Could not unbind USB gadget: {0}'.format(err))
    time.sleep(0.5)

    # The function symlinks have to go before configs/c.1 can be removed.
    subprocess.run(['rm', g+"configs/c.1/mass_storage.usb0"], check=False)
    subprocess.run(['rm', g+"configs/c.1/hid.usb0"], check=False)
    subprocess.run(['rmdir', g+"configs/c.1/strings/0x409"], check=False)
    subprocess.run(['rmdir', g+"configs/c.1"], check=False)
    subprocess.run(['rmdir', g+"functions/mass_storage.usb0"], check=False)
    subprocess.run(['rmdir', g+"functions/hid.usb0"], check=False)
    subprocess.run(['rmdir', g+"strings/0x409"], check=False)
    subprocess.run(['rmdir', g], check=False)

    subprocess.run(['modprobe', '-r', 'usb_f_mass_storage'], check=False)
    subprocess.run(['modprobe', '-r', 'usb_f_hid'], check=False)
    subprocess.run(['modprobe', '-r', 'libcomposite'], check=False)


def _configure_display():
    """Switch the TV on via HDMI-CEC and leave the 640x480 fallback mode."""
    # send hdmi-cec commands: turn on tv, make this active source
    subprocess.run('echo "on 0" | cec-client RPI -s -d 1', shell=True, check=False)
    subprocess.run('echo "as" | cec-client RPI -s -d 1', shell=True, check=False)

    status = subprocess.run(['tvservice', '-s'],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if status.stdout == _TVSERVICE_FALLBACK:
        # No shell is involved, so the value must not carry literal quotes.
        subprocess.run(['tvservice', '--explicit=CEA 16 HDMI'], check=False)
        subprocess.run(['fbset', '-fb', '/dev/fb0', '-depth', '16'], check=False)
        subprocess.run(['fbset', '-fb', '/dev/fb0', '-depth', '32'], check=False)


def _show_latest_image():
    """Display the most recently changed picture below /mnt, if any."""
    list_of_files = []
    for pattern in _IMAGE_PATTERNS:
        list_of_files.extend(glob.glob('/mnt/' + pattern))
    if not list_of_files:
        return

    latest_file = max(list_of_files, key=os.path.getctime)
    extension = os.path.splitext(latest_file)[1]
    tmpfilename = '/tmp/frame'+extension
    print(latest_file)

    subprocess.run(['killall', 'fim'], check=False)
    # The file name comes from a disk the USB host writes to, so it is
    # passed as an argument vector and never interpreted by a shell.
    subprocess.run(['cp', latest_file, tmpfilename], check=True)
    try:
        # Not waited for: the viewer keeps running until the next picture.
        subprocess.Popen(
            ['fim', '-d', '/dev/fb0', '-T', '1', '--autozoom', '--quiet', tmpfilename],
            stdin=subprocess.DEVNULL)
    except OSError as err:
        print('Could not start fim: {0}'.format(err))


def callback(paths):
    """Handle a change of the disk image: show the newest picture on it.

    Args:
        paths: Changed paths as passed by watch_delay_call() (unused).
    """
    GPIO.output(pin_led_ready, GPIO.HIGH)
    GPIO.output(pin_led_busy, GPIO.HIGH)
    print('file changed')
    try:
        subprocess.run(['mount', '-o', 'loop,ro', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], check=True)
        try:
            _show_latest_image()
        finally:
            subprocess.run(['umount', '/mnt'], check=False)
    except subprocess.CalledProcessError as err:
        # Keep the watcher thread alive; the next change triggers a retry.
        print('Could not update the displayed image: {0}'.format(err))
    finally:
        GPIO.output(pin_led_busy, GPIO.LOW)


def callback_switch(channel):
    """GPIO edge callback for the switch: send the key press to the host."""
    send_keypress()


def main():
    """Set up GPIO, disk image, USB gadget and display, then watch the image."""
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(pin_led_ready, GPIO.OUT)
    GPIO.setup(pin_led_busy, GPIO.OUT)
    GPIO.setup(pin_switch, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    GPIO.output(pin_led_ready, GPIO.LOW)
    GPIO.output(pin_led_busy, GPIO.LOW)

    g="/sys/kernel/config/usb_gadget/freezer/"

    try:
        _create_disk_image()
        _setup_gadget(g)
        _configure_display()

        GPIO.add_event_detect(pin_switch, GPIO.FALLING, callback=callback_switch, bouncetime=250)

        # Blocks until Ctrl+C.
        watch_delay_call('/tmp/usbdisk.img', callback, 0.2, True, pyinotify.IN_MODIFY)
    finally:
        GPIO.output(pin_led_busy, GPIO.LOW)
        GPIO.output(pin_led_ready, GPIO.LOW)
        subprocess.run(['killall', 'fim'], check=False)
        subprocess.run(['umount', '/mnt'], check=False)
        _teardown_gadget(g)
        GPIO.cleanup()

    # Only reached after a clean shutdown; an exception above propagates
    # and yields a non-zero exit status.
    sys.exit(0)


if __name__ == '__main__':
    main()