You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

299 lines
11 KiB

#!/usr/bin/env python3
"""
Raspberry freeze-frame
"""
__author__ = "Hendrik Langer <hendrik.langer@iul.tu-dortmund.de>"
import multiprocessing
import os.path
import signal
import sys
import threading
import time
import subprocess
import glob
# GPIO pin assignments. The main block selects BCM numbering (GPIO.setmode(GPIO.BCM)),
# so these are BCM channel numbers.
# Output pin of the "ready" LED; switched on the first time the change callback runs.
# NOTE(review): the trailing "27" looks like an alternative pin number - confirm.
pin_led_ready=12 # 27
# Output pin of the "busy" LED; high while a changed image is being processed.
# NOTE(review): the trailing "5" looks like an alternative pin number - confirm.
pin_led_busy=18 # 5
# Input pin of the push button (configured with pull-up; a falling edge sends a key press).
pin_switch=13
# Third-party dependencies are imported defensively so that a missing package
# or insufficient permissions produce a readable message instead of a traceback.
try:
    import RPi.GPIO as GPIO
except RuntimeError:
    print("Error importing RPi.GPIO! Please check of you're root or permissions are set correctly.")
    # Without the GPIO module every later GPIO call would fail with NameError,
    # so stop here exactly like the ImportError branch does.
    sys.exit(1)
except ImportError:
    print('Python GPIO package missing. Are you running this on a Raspberry Pi?')
    # apt install python3-rpi.gpio
    sys.exit(1)

try:
    import pyinotify
except ImportError:
    print('Python pyinotify package is missing.')
    # apt install python3-pyinotify
    sys.exit(1)
def watch_delay_call(
        base_directory,
        callback,
        delay=0.5,
        call_once_initially=True,
        mask=pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE |
        pyinotify.IN_MODIFY | pyinotify.IN_MOVED_TO):
    """Watch all files below a directory and execute a command on changes.

    Add some delay so that multiple save operations trigger a single execution.
    The call blocks the calling thread until a KeyboardInterrupt arrives; the
    callback itself runs on a separate daemon thread.

    Example:
        def filechanged(paths):
            # TODO: Do something useful.
            print(paths)
        watch_delay_call('.', filechanged)

    Args:
        base_directory: Directory (or single file) to monitor, recursively.
        callback: Function to call on file change, with a list of paths.
            The optional initial call passes None instead of a list.
        delay: Time in seconds to delay.
        call_once_initially: Set to true to call the callback once initially.
        mask: File system changes to listen for (by default any file change).
    """

    class Process(pyinotify.ProcessEvent):
        """Forward the path of every inotify event to ``immediate_callback``."""

        def __init__(self, immediate_callback):
            self.immediate_callback = immediate_callback

        def process_default(self, event):
            # When the watch target is a single file, event.name is empty and
            # os.path.join() would append a bogus trailing slash to the path.
            if event.name:
                target = os.path.join(event.path, event.name)
            else:
                target = event.path
            self.immediate_callback(target)

    def delay_call(pipe, delayed_callback, delay, call_once_initially):
        """Collect paths from ``pipe`` and invoke the callback once they settle."""
        if call_once_initially:
            delayed_callback(None)
        path_list = []
        while True:
            # Wait until there is a change, then drain everything already queued.
            path_list.append(pipe.recv())
            while pipe.poll():
                path_list.append(pipe.recv())
            # Delay
            time.sleep(delay)
            # If there are more changes, restart the timer (keeping the paths
            # gathered so far).
            if pipe.poll():
                continue
            # Execute the callback.
            delayed_callback(path_list)
            path_list = []

    # One-way pipe: the inotify handler sends paths, the delay thread receives.
    receiver, sender = multiprocessing.Pipe(False)
    delay_callback_thread = threading.Thread(
        target=delay_call,
        args=(receiver, callback, delay, call_once_initially))
    delay_callback_thread.daemon = True  # dies with this program.
    delay_callback_thread.start()

    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, Process(sender.send))
    try:
        wm.add_watch(base_directory, mask, rec=True, auto_add=True)
        while True:
            notifier.process_events()
            if notifier.check_events():
                notifier.read_events()
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to end the watch; return to the caller.
        pass
    finally:
        # Release the inotify descriptor on every exit path, including
        # SystemExit raised from a signal handler.
        notifier.stop()
def os_echo(filename, string):
    """Replace the contents of ``filename`` with the text ``string``.

    Nothing is appended: unlike the shell's ``echo`` no newline is added.
    """
    with open(filename, mode='w') as handle:
        handle.write(string)
def os_echo_bin(filename, string):
    """Replace the contents of ``filename`` with the raw bytes ``string``."""
    with open(filename, mode='wb') as handle:
        handle.write(string)
def cleanup():
    """Best-effort teardown of everything the script set up.

    Switches both LEDs off, stops the image viewer, unmounts /mnt, unbinds
    and dismantles the "freezer" USB gadget in configfs and unloads the
    gadget kernel modules. Individual failures are tolerated so that the
    remaining steps still run (the function may be called after a partial
    setup, or twice).
    """
    g = "/sys/kernel/config/usb_gadget/freezer/"
    GPIO.output(pin_led_busy, GPIO.LOW)
    GPIO.output(pin_led_ready, GPIO.LOW)
    subprocess.run(['killall', 'fim'], shell=False, check=False)
    subprocess.run(['umount', '/mnt'], shell=False, check=False)
    # Unbind the gadget from the UDC. A bare newline is written because an
    # empty string never reaches the kernel: with nothing buffered, no
    # write() system call is issued at all and the gadget would stay bound.
    try:
        os_echo(g + "UDC", "\n")
    except OSError as err:
        # Gadget directory missing or gadget not bound: keep tearing down.
        print('could not unbind USB gadget: {0}'.format(err))
    time.sleep(0.5)
    # configfs has to be dismantled in the reverse order of its creation.
    teardown_commands = (
        ['rmdir', g + "configs/c.1/strings/0x409"],
        ['rm', g + "configs/c.1/mass_storage.usb0"],
        ['rm', g + "configs/c.1/hid.usb0"],
        ['rmdir', g + "configs/c.1"],
        ['rmdir', g + "functions/mass_storage.usb0"],
        ['rmdir', g + "functions/hid.usb0"],
        ['rmdir', g + "strings/0x409"],
        ['rmdir', g],
        ['modprobe', '-r', 'usb_f_mass_storage'],
        ['modprobe', '-r', 'usb_f_hid'],
        ['modprobe', '-r', 'libcomposite'],
    )
    for command in teardown_commands:
        subprocess.run(command, shell=False, check=False)
def signal_term_handler(signal, frame):
    """SIGTERM handler: report the signal, tear everything down, exit with 0.

    Args:
        signal: Number of the received signal (unused).
        frame: Stack frame that was interrupted (unused).
    """
    print('got SIGTERM')
    cleanup()
    raise SystemExit(0)
def send_keypress(device='/dev/hidg0', hold=0.05):
    """Send one Alt+X key stroke through the HID gadget device.

    Two 8-byte keyboard reports are written: the first presses the keys,
    the second releases everything again.

    Args:
        device: Path of the HID gadget character device to write to.
        hold: Seconds to keep the keys pressed before releasing them.
    """
    # Report layout: modifier byte, reserved byte, up to six key codes.
    # b'\x00\x00\x04\x00\x00\x00\x00\x00' would press the A-button instead.
    press_alt_x = b'\x04\x00\x1B\x00\x00\x00\x00\x00'  # press Alt+X
    release_all = b'\x00\x00\x00\x00\x00\x00\x00\x00'  # release all keys
    # Unbuffered on purpose: with a buffered file both reports would only be
    # flushed when the file is closed, so the hold time would never elapse
    # between press and release.
    with open(device, 'wb', buffering=0) as f:
        f.write(press_alt_x)
        time.sleep(hold)
        f.write(release_all)
def _create_disk_image():
    """Create the 96 MiB FAT image backing the USB drive and copy the freezer files onto it."""
    subprocess.run(['dd', 'if=/dev/zero', 'of=/tmp/usbdisk.img', 'bs=1M', 'count=96'], shell=False, check=True)
    # The image is formatted directly, without a partition table
    # (no sfdisk / loop-mount step).
    subprocess.run(['mkfs', '-t', 'vfat', '-n', 'SCREENSHOTS', '/tmp/usbdisk.img'], shell=False, check=True)
    subprocess.run(['mount', '-o', 'loop,rw', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], shell=False, check=True)
    subprocess.run(['cp', '/opt/freezer/background.png', '/mnt/'], shell=False, check=True)
    subprocess.run(['cp', '/opt/freezer/freeze.exe', '/mnt/'], shell=False, check=True)
    subprocess.run(['umount', '/mnt'], shell=False, check=True)


def _setup_usb_gadget(g):
    """Build the composite USB gadget (mass storage + HID keyboard) below ``g`` and bind it."""
    # http://irq5.io/2016/12/22/raspberry-pi-zero-as-multiple-usb-gadgets/
    # http://isticktoit.net/?p=1383
    subprocess.run(['modprobe', 'libcomposite'], shell=False, check=True)
    subprocess.run(['mkdir', g], shell=False, check=True)
    os_echo(g+"idVendor", "0x1d6b") # Linux Foundation
    os_echo(g+"idProduct", "0x0104") # Multifunction Composite Gadget
    os_echo(g+"bcdDevice", "0x0100") # v1.0.0
    os_echo(g+"bcdUSB", "0x0200") # USB 2.0
    os_echo(g+"bDeviceClass", "0xEF") # windows enumeration install
    os_echo(g+"bDeviceSubClass", "0x02") # correct bus driver
    os_echo(g+"bDeviceProtocol", "0x01") # for multi function
    subprocess.run(['mkdir', g+"strings/0x409"], shell=False, check=True) # english
    os_echo(g+"strings/0x409/serialnumber", "34812ee8aa112377") # random
    os_echo(g+"strings/0x409/manufacturer", "IUL TU Dortmund")
    os_echo(g+"strings/0x409/product", "Raspberry Screenshot Gadget")
    subprocess.run(['mkdir', g+"configs/c.1"], shell=False, check=True)
    subprocess.run(['mkdir', g+"configs/c.1/strings/0x409"], shell=False, check=True) # english
    os_echo(g+"configs/c.1/bmAttributes", "0x80") # bus powered
    os_echo(g+"configs/c.1/MaxPower", "500")
    os_echo(g+"configs/c.1/strings/0x409/configuration", "Config 1")
    # config usb_mass_storage
    subprocess.run(['mkdir', g+"functions/mass_storage.usb0"], shell=False, check=True)
    os_echo(g+"functions/mass_storage.usb0/stall", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/cdrom", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/ro", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/nofua", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/removable", "1")
    os_echo(g+"functions/mass_storage.usb0/lun.0/file", "/tmp/usbdisk.img")
    subprocess.run(['ln', '-s', g+"functions/mass_storage.usb0", g+"configs/c.1"], shell=False, check=True)
    # config keyboard
    subprocess.run(['mkdir', g+"functions/hid.usb0"], shell=False, check=True)
    time.sleep(0.2)
    os_echo(g+"functions/hid.usb0/protocol", "1")
    os_echo(g+"functions/hid.usb0/subclass", "1")
    os_echo(g+"functions/hid.usb0/report_length", "8")
    # HID report descriptor of the keyboard function (matches the 8-byte reports).
    os_echo_bin(g+"functions/hid.usb0/report_desc", b'\x05\x01\x09\x06\xa1\x01\x05\x07\x19\xe0\x29\xe7\x15\x00\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x03\x95\x05\x75\x01\x05\x08\x19\x01\x29\x05\x91\x02\x95\x01\x75\x03\x91\x03\x95\x06\x75\x08\x15\x00\x25\x65\x05\x07\x19\x00\x29\x65\x81\x00\xc0')
    subprocess.run(['ln', '-s', g+"functions/hid.usb0", g+"configs/c.1"], shell=False, check=True)
    # subprocess.run(['ln', '-s', g+"configs/c.1", g+"os_desc"], shell=False, check=True)
    # Bind the gadget to the USB device controller; this makes it visible to the host.
    subprocess.run('ls /sys/class/udc/ > '+g+'UDC', shell=True, check=True)
    # RNDIS OS descriptors (disabled)
    #os_echo(g+"os_desc/use", "1")
    #os_echo(g+"os_desc/b_vendor_code", "0xcd")
    #os_echo(g+"os_desc/qw_sign", "MSFT100")
    #subprocess.run(['mkdir', g+"functions/rndis.usb0"], shell=False, check=True)
    #os_echo(g+"functions/rndis.usb0/os_desc/interface.rndis/compatible_id", "RNDIS")
    #os_echo(g+"functions/rndis.usb0/os_desc/interface.rndis/sub_compatible_id", "5162001")


def _setup_display():
    """Wake the TV via HDMI-CEC and force a 1080p mode if only the fallback mode was negotiated."""
    # send hdmi-cec commands: turn on tv, make this active source
    subprocess.run('echo "on 0" | cec-client RPI -s -d 1', shell=True, check=False)
    subprocess.run('echo "as" | cec-client RPI -s -d 1', shell=True, check=False)
    fallback_str = b'state 0x12000a [HDMI DMT (4) RGB full 4:3], 640x480 @ 60.00Hz, progressive\n'
    status = subprocess.run(['tvservice', '-s'], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if status.stdout == fallback_str:
        # NOTE(review): the mode switch and the framebuffer resize are treated
        # as one unit that only runs in the 640x480 fallback case - confirm.
        subprocess.run('tvservice --explicit="CEA 16 HDMI"', shell=True, check=False)
        #subprocess.run(['fbset', '-fb', '/dev/fb0', '-depth', '16'], shell=False, check=False)
        time.sleep(0.5)
        subprocess.run('fbset -fb /dev/fb0 -a -g 1920 1080 1920 1080 32', shell=True, check=False)
        time.sleep(0.5)


if __name__ == '__main__':
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(pin_led_ready, GPIO.OUT)
    GPIO.setup(pin_led_busy, GPIO.OUT)
    GPIO.setup(pin_switch, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    GPIO.output(pin_led_ready, GPIO.LOW)
    GPIO.output(pin_led_busy, GPIO.LOW)
    g="/sys/kernel/config/usb_gadget/freezer/"
    try:
        _create_disk_image()
        signal.signal(signal.SIGTERM, signal_term_handler)
        _setup_usb_gadget(g)
        _setup_display()

        def callback(paths):
            """Show the most recently changed image found on the USB disk image.

            Runs on the watcher's delay thread whenever the backing image was
            modified (``paths`` is not used). Failures are reported and
            swallowed so that one bad update does not end the thread.
            """
            GPIO.output(pin_led_ready, GPIO.HIGH)
            GPIO.output(pin_led_busy, GPIO.HIGH)
            print('file changed')
            try:
                subprocess.run(['mount', '-o', 'loop,ro', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], shell=False, check=True)
                try:
                    list_of_files = []
                    for pattern in ('/mnt/*.png', '/mnt/*.jpg', '/mnt/*.bmp',
                                    '/mnt/*.PNG', '/mnt/*.JPG', '/mnt/*.BMP'):
                        list_of_files.extend(glob.glob(pattern))
                    if list_of_files:
                        latest_file = max(list_of_files, key=os.path.getctime)
                        extension = os.path.splitext(latest_file)[1]
                        tmpfilename = '/tmp/frame'+extension
                        print(latest_file)
                        subprocess.run(['killall', 'fim'], shell=False, check=False)
                        # The file name is chosen by the USB host, so it must
                        # never pass through a shell: use an argument list.
                        subprocess.run(['cp', latest_file, tmpfilename], shell=False, check=True)
                        # tmpfilename only varies in one of the six extensions
                        # matched above, so the shell string is safe; the
                        # trailing '&' keeps the viewer running in background.
                        subprocess.run('fim -d /dev/fb0 -T 1 --autozoom --quiet "'+tmpfilename+'" &', shell=True)
                finally:
                    subprocess.run(['umount', '/mnt'], shell=False, check=False)
            except (subprocess.CalledProcessError, OSError) as err:
                print('could not update frame: {0}'.format(err))
            finally:
                GPIO.output(pin_led_busy, GPIO.LOW)

        def callback_switch(channel):
            """Button handler: send the key stroke to the host."""
            send_keypress()

        GPIO.add_event_detect(pin_switch, GPIO.FALLING, callback=callback_switch, bouncetime=250)
        watch_delay_call('/tmp/usbdisk.img', callback, 0.2, True, pyinotify.IN_MODIFY)
    finally:
        # Only clean up here. Exiting from inside the finally clause would
        # replace any in-flight exception and hide setup failures behind
        # exit status 0.
        cleanup()
    sys.exit(0)