You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
299 lines
11 KiB
299 lines
11 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Raspberry freeze-frame
|
|
"""
|
|
|
|
__author__ = "Hendrik Langer <hendrik.langer@iul.tu-dortmund.de>"
|
|
|
|
import multiprocessing
|
|
import os.path
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
import subprocess
|
|
import glob
|
|
|
|
# GPIO pin assignment. The main script selects BCM numbering via
# GPIO.setmode(GPIO.BCM).
# NOTE(review): the trailing numbers are kept from the original; they look
# like alternative pin choices -- confirm against the wiring.
pin_led_ready=12 # 27
pin_led_busy=18 # 5
pin_switch=13

# Both third-party packages are mandatory; without either of them the script
# cannot do anything useful, so every import failure terminates the program.
try:
    import RPi.GPIO as GPIO
except RuntimeError:
    print("Error importing RPi.GPIO! Please check of you're root or permissions are set correctly.")
    # Continuing would only fail later with a NameError on GPIO.
    sys.exit(1)
except ImportError:
    print('Python GPIO package missing. Are you running this on a Raspberry Pi?')
    #apt install python3-rpi.gpio
    sys.exit(1)


try:
    import pyinotify
except ImportError:
    print('Python pyinotify package is missing.')
    #python3-pyinotify
    sys.exit(1)
|
|
|
|
|
|
def watch_delay_call(
        base_directory,
        callback,
        delay=0.5,
        call_once_initially=True,
        mask=pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE |
        pyinotify.IN_MODIFY | pyinotify.IN_MOVED_TO):
    """Watch all files below a directory and execute a command on changes.

    Add some delay so that multiple save operations trigger a single execution.
    The function blocks and only returns after a KeyboardInterrupt; the
    callback itself runs on a daemon thread.

    Example:
        def filechanged(paths):
            # TODO: Do something useful.
            print(paths)

        watch_delay_call('.', filechanged)

    Args:
        base_directory: Directory (or single file) to monitor, recursively.
        callback: Function to call on file change, with a list of paths.
            The initial call (see call_once_initially) passes None instead.
        delay: Time in seconds to delay.
        call_once_initially: Set to true to call the callback once initially.
        mask: File system changes to listen for (by default any file change).
    """

    class Process(pyinotify.ProcessEvent):
        """Forward the path of every inotify event to ``immediate_callback``."""

        def __init__(self, immediate_callback):
            super().__init__()
            self.immediate_callback = immediate_callback

        def process_default(self, event):
            # Events raised on the watched item itself carry no name; joining
            # an empty name would append a bogus trailing separator.
            if event.name:
                target = os.path.join(event.path, event.name)
            else:
                target = event.path
            self.immediate_callback(target)

    def delay_call(pipe, delayed_callback, delay, call_once_initially):
        """Collect paths from ``pipe`` and call back once changes settle."""
        if call_once_initially:
            delayed_callback(None)

        path_list = []

        while True:
            # Wait until there is a change.
            path_list.append(pipe.recv())
            while pipe.poll():
                path_list.append(pipe.recv())

            # Delay
            time.sleep(delay)

            # If there are more changes, restart the timer.
            if pipe.poll():
                continue

            # Execute the callback.
            delayed_callback(path_list)

            path_list = []

    # One-way pipe: the notifier side sends paths, the delay thread receives.
    receiver, sender = multiprocessing.Pipe(False)

    delay_callback_thread = threading.Thread(
        target=delay_call,
        args=(receiver, callback, delay, call_once_initially))
    delay_callback_thread.daemon = True  # dies with this program.
    delay_callback_thread.start()

    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, Process(sender.send))
    try:
        wm.add_watch(base_directory, mask, rec=True, auto_add=True)
        while True:
            notifier.process_events()
            if notifier.check_events():
                notifier.read_events()
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to end the watch loop.
        pass
    finally:
        # Stop the notifier on every exit path, not only on Ctrl-C.
        notifier.stop()
|
|
|
|
|
|
|
|
def os_echo(filename, string):
    """Write the text ``string`` to ``filename``, replacing its content.

    Works like ``echo -n string > filename``: the file is opened in text
    write mode and nothing is appended to ``string``.
    """
    with open(filename, mode='w') as sink:
        sink.write(string)
|
|
|
|
def os_echo_bin(filename, string):
    """Write the bytes-like ``string`` to ``filename``, replacing its content.

    Binary counterpart of ``os_echo``; despite its name the ``string``
    argument must be bytes because the file is opened in binary mode.
    """
    with open(filename, mode='wb') as sink:
        sink.write(string)
|
|
|
|
def cleanup():
    """Undo everything the script set up; every step is best effort.

    Switches both LEDs off, stops the image viewer, unmounts the disk image,
    unbinds and dismantles the ``freezer`` USB gadget in configfs and finally
    unloads the gadget kernel modules. Failures of individual steps are
    tolerated so that the remaining steps still run (the function may be
    called when setup only got half way, or a second time).
    """
    g="/sys/kernel/config/usb_gadget/freezer/"
    GPIO.output(pin_led_busy, GPIO.LOW)
    GPIO.output(pin_led_ready, GPIO.LOW)
    subprocess.run(['killall', 'fim'], shell=False, check=False)
    subprocess.run(['umount', '/mnt'], shell=False, check=False)

    # Unbind the gadget from its UDC. An empty string would never reach the
    # kernel (a buffered file issues no write() for zero bytes), so write the
    # newline that `echo "" > UDC` sends.
    try:
        os_echo(g+"UDC", "\n")
    except OSError as err:
        # Gadget directory missing or gadget not bound: nothing to unbind.
        print('could not unbind UDC: {0}'.format(err), file=sys.stderr)
    time.sleep(0.5)

    # configfs requires tearing the gadget down in reverse order of creation:
    # config strings, function links, config, functions, strings, gadget.
    teardown = (
        ['rmdir', g+"configs/c.1/strings/0x409"],
        ['rm', g+"configs/c.1/mass_storage.usb0"],
        ['rm', g+"configs/c.1/hid.usb0"],
        ['rmdir', g+"configs/c.1"],
        ['rmdir', g+"functions/mass_storage.usb0"],
        ['rmdir', g+"functions/hid.usb0"],
        ['rmdir', g+"strings/0x409"],
        ['rmdir', g],
        ['modprobe', '-r', 'usb_f_mass_storage'],
        ['modprobe', '-r', 'usb_f_hid'],
        ['modprobe', '-r', 'libcomposite'],
    )
    for command in teardown:
        subprocess.run(command, shell=False, check=False)
|
|
|
|
def signal_term_handler(signal, frame):
    """SIGTERM handler: log the signal, run cleanup() and exit with status 0.

    Both arguments are the standard signal-handler arguments and are unused.
    """
    print('got SIGTERM')
    cleanup()
    raise SystemExit(0)
|
|
|
|
|
|
def send_keypress(device='/dev/hidg0', hold=0.05):
    """Send one Alt+X key stroke through the HID gadget device.

    Two 8-byte keyboard reports are written: the key press, then, after
    ``hold`` seconds, a report releasing all keys.

    Args:
        device: Path of the HID gadget character device.
        hold: Seconds to keep the key pressed before releasing it.

    Raises:
        OSError: If the device cannot be opened or written.
    """
    # Report layout: modifier byte, reserved byte, up to six key codes.
    # (b'\x00\x00\x04\x00\x00\x00\x00\x00' would press the A-button instead.)
    press_alt_x = b'\x04\x00\x1B\x00\x00\x00\x00\x00'  # press Alt+X
    release_all = b'\x00\x00\x00\x00\x00\x00\x00\x00'  # release all keys

    # Unbuffered on purpose: with a buffered file both reports would only be
    # flushed on close, i.e. after the sleep, and the hold time would be lost.
    with open(device, 'wb', buffering=0) as hid:
        hid.write(press_alt_x)
        time.sleep(hold)
        hid.write(release_all)
|
|
|
|
|
|
def _create_disk_image():
    """Create /tmp/usbdisk.img (96 MiB, FAT) and seed it with the stock files."""
    subprocess.run(['dd', 'if=/dev/zero', 'of=/tmp/usbdisk.img', 'bs=1M', 'count=96'], shell=False, check=True)
    # No partition table (sfdisk) is created; the image is formatted directly.
    subprocess.run(['mkfs', '-t', 'vfat', '-n', 'SCREENSHOTS', '/tmp/usbdisk.img'], shell=False, check=True)
    subprocess.run(['mount', '-o', 'loop,rw', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], shell=False, check=True)
    for seed_file in ('/opt/freezer/background.png', '/opt/freezer/freeze.exe'):
        subprocess.run(['cp', seed_file, '/mnt/'], shell=False, check=True)
    subprocess.run(['umount', '/mnt'], shell=False, check=True)


def _configure_usb_gadget(g):
    """Create the composite gadget (mass storage + HID keyboard) below ``g`` and bind it."""
    # http://irq5.io/2016/12/22/raspberry-pi-zero-as-multiple-usb-gadgets/
    # http://isticktoit.net/?p=1383
    subprocess.run(['modprobe', 'libcomposite'], shell=False, check=True)

    subprocess.run(['mkdir', g], shell=False, check=True)

    os_echo(g+"idVendor", "0x1d6b") # Linux Foundation
    os_echo(g+"idProduct", "0x0104") # Multifunction Composite Gadget
    os_echo(g+"bcdDevice", "0x0100") # v1.0.0
    os_echo(g+"bcdUSB", "0x0200") # USB 2.0

    os_echo(g+"bDeviceClass", "0xEF") # windows enumeration install
    os_echo(g+"bDeviceSubClass", "0x02") # correct bus driver
    os_echo(g+"bDeviceProtocol", "0x01") # for multi function

    subprocess.run(['mkdir', g+"strings/0x409"], shell=False, check=True) # english
    os_echo(g+"strings/0x409/serialnumber", "34812ee8aa112377") # random
    os_echo(g+"strings/0x409/manufacturer", "IUL TU Dortmund")
    os_echo(g+"strings/0x409/product", "Raspberry Screenshot Gadget")

    subprocess.run(['mkdir', g+"configs/c.1"], shell=False, check=True)
    subprocess.run(['mkdir', g+"configs/c.1/strings/0x409"], shell=False, check=True) # english
    os_echo(g+"configs/c.1/bmAttributes", "0x80") # bus powered
    os_echo(g+"configs/c.1/MaxPower", "500")
    os_echo(g+"configs/c.1/strings/0x409/configuration", "Config 1")

    # config usb_mass_storage
    subprocess.run(['mkdir', g+"functions/mass_storage.usb0"], shell=False, check=True)
    os_echo(g+"functions/mass_storage.usb0/stall", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/cdrom", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/ro", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/nofua", "0")
    os_echo(g+"functions/mass_storage.usb0/lun.0/removable", "1")
    os_echo(g+"functions/mass_storage.usb0/lun.0/file", "/tmp/usbdisk.img")

    subprocess.run(['ln', '-s', g+"functions/mass_storage.usb0", g+"configs/c.1"], shell=False, check=True)

    # config keyboard
    subprocess.run(['mkdir', g+"functions/hid.usb0"], shell=False, check=True)
    time.sleep(0.2)
    os_echo(g+"functions/hid.usb0/protocol", "1")
    os_echo(g+"functions/hid.usb0/subclass", "1")
    os_echo(g+"functions/hid.usb0/report_length", "8")
    # HID report descriptor matching the 8-byte reports configured above.
    os_echo_bin(g+"functions/hid.usb0/report_desc", (
        b'\x05\x01\x09\x06\xa1\x01\x05\x07'
        b'\x19\xe0\x29\xe7\x15\x00\x25\x01'
        b'\x75\x01\x95\x08\x81\x02\x95\x01'
        b'\x75\x08\x81\x03\x95\x05\x75\x01'
        b'\x05\x08\x19\x01\x29\x05\x91\x02'
        b'\x95\x01\x75\x03\x91\x03\x95\x06'
        b'\x75\x08\x15\x00\x25\x65\x05\x07'
        b'\x19\x00\x29\x65\x81\x00\xc0'))

    subprocess.run(['ln', '-s', g+"functions/hid.usb0", g+"configs/c.1"], shell=False, check=True)

    # subprocess.run(['ln', '-s', g+"configs/c.1", g+"os_desc"], shell=False, check=True)
    # Bind the gadget to the UDC listed in /sys/class/udc (constant command,
    # no external input, so the shell redirection is safe here).
    subprocess.run('ls /sys/class/udc/ > '+g+'UDC', shell=True, check=True)

    # RNDIS OS descriptors
    #os_echo(g+"os_desc/use", "1")
    #os_echo(g+"os_desc/b_vendor_code", "0xcd")
    #os_echo(g+"os_desc/qw_sign", "MSFT100")

    #subprocess.run(['mkdir', g+"functions/rndis.usb0"], shell=False, check=True)
    #os_echo(g+"functions/rndis.usb0/os_desc/interface.rndis/compatible_id", "RNDIS")
    #os_echo(g+"functions/rndis.usb0/os_desc/interface.rndis/sub_compatible_id", "5162001")


def _prepare_display():
    """Wake the TV via HDMI-CEC and force 1080p if only the fallback mode is active."""
    # send hdmi-cec commands: turn on tv, make this active source
    subprocess.run('echo "on 0" | cec-client RPI -s -d 1', shell=True, check=False)
    subprocess.run('echo "as" | cec-client RPI -s -d 1', shell=True, check=False)

    fallback_str = b'state 0x12000a [HDMI DMT (4) RGB full 4:3], 640x480 @ 60.00Hz, progressive\n'
    status = subprocess.run(['tvservice', '-s'], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the mode switch and both fbset/sleep steps are assumed to
    # belong to this condition -- confirm against the upstream file.
    if status.stdout == fallback_str:
        subprocess.run('tvservice --explicit="CEA 16 HDMI"', shell=True, check=False)
        #subprocess.run(['fbset', '-fb', '/dev/fb0', '-depth', '16'], shell=False, check=False)
        time.sleep(0.5)
        subprocess.run('fbset -fb /dev/fb0 -a -g 1920 1080 1920 1080 32', shell=True, check=False)
        time.sleep(0.5)


def callback(paths):
    """Show the newest image stored on the USB disk image on the framebuffer.

    Called by watch_delay_call whenever /tmp/usbdisk.img was modified (and
    once initially with ``paths`` set to None); ``paths`` itself is unused.
    Errors are reported but not raised, so the watcher thread keeps running.
    """
    GPIO.output(pin_led_ready, GPIO.HIGH)
    GPIO.output(pin_led_busy, GPIO.HIGH)
    print('file changed')

    try:
        subprocess.run(['mount', '-o', 'loop,ro', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], shell=False, check=True)

        list_of_files = []
        for pattern in ('*.png', '*.jpg', '*.bmp', '*.PNG', '*.JPG', '*.BMP'):
            list_of_files.extend(glob.glob('/mnt/' + pattern))

        if list_of_files:
            latest_file = max(list_of_files, key=os.path.getctime)
            extension = os.path.splitext(latest_file)[1]
            tmpfilename = '/tmp/frame'+extension
            print(latest_file)
            subprocess.run(['killall', 'fim'], shell=False, check=False)
            # The file name comes from the host-writable disk: pass it as an
            # argument vector, never through a shell.
            subprocess.run(['cp', latest_file, tmpfilename], shell=False, check=True)
            # tmpfilename is '/tmp/frame' plus one of the six extensions
            # matched above, so building this shell command is safe; the
            # shell is only needed to start fim in the background.
            subprocess.run('fim -d /dev/fb0 -T 1 --autozoom --quiet "'+tmpfilename+'" &', shell=True)
    except (subprocess.CalledProcessError, OSError) as err:
        print('could not display latest image: {0}'.format(err), file=sys.stderr)
    finally:
        # Always release the mount point and the busy LED, even on errors.
        subprocess.run(['umount', '/mnt'], shell=False, check=False)
        GPIO.output(pin_led_busy, GPIO.LOW)


def callback_switch(channel):
    """GPIO edge callback for the push button: send the key stroke to the host."""
    send_keypress()


if __name__ == '__main__':
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(pin_led_ready, GPIO.OUT)
    GPIO.setup(pin_led_busy, GPIO.OUT)
    GPIO.setup(pin_switch, GPIO.IN, pull_up_down=GPIO.PUD_UP)
    GPIO.output(pin_led_ready, GPIO.LOW)
    GPIO.output(pin_led_busy, GPIO.LOW)

    g="/sys/kernel/config/usb_gadget/freezer/"

    exit_status = 0
    try:
        _create_disk_image()

        signal.signal(signal.SIGTERM, signal_term_handler)

        _configure_usb_gadget(g)
        _prepare_display()

        GPIO.add_event_detect(pin_switch, GPIO.FALLING, callback=callback_switch, bouncetime=250)
        # Blocks until interrupted with Ctrl-C.
        watch_delay_call('/tmp/usbdisk.img', callback, 0.2, True, pyinotify.IN_MODIFY)
    except Exception as err:
        # Top-level boundary: report the failure instead of silently
        # exiting with status 0.
        print('freezer failed: {0!r}'.format(err), file=sys.stderr)
        exit_status = 1
    finally:
        cleanup()

    sys.exit(exit_status)
|
|
|
|
|