You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

169 lines
5.0 KiB

#!/usr/bin/env python3
"""
Raspberry freeze-frame
"""
__author__ = "Hendrik Langer <hendrik.langer@iul.tu-dortmund.de>"
import multiprocessing
import os.path
import sys
import threading
import time
import subprocess
import glob
try:
import RPi.GPIO as GPIO
except RuntimeError:
print("Error importing RPi.GPIO! Please check of you're root or permissions are set correctly.")
except ImportError:
print('Python GPIO package missing. Are you running this on a Raspberry Pi?')
#apt install python3-rpi.gpio
sys.exit(1)
try:
import pyinotify
except ImportError:
print('Python pyinotify package is missing.')
#python3-pyinotify
sys.exit(1)
def watch_delay_call(
base_directory,
callback,
delay=0.5,
call_once_initially=True,
mask=pyinotify.IN_DELETE | pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE |
pyinotify.IN_MODIFY | pyinotify.IN_MOVED_TO):
"""Watch all files below a directory and execute a command on changes.
Add some delay so that multiple save operations trigger a single execution.
Example:
def filechanged(paths):
# TODO: Do something useful.
print(paths)
_watch_delay_call('.', filechanged)
Args:
base_directory: Directory to monitor, recursively.
callback: Function to call on file change, with a list of paths.
delay: Time in seconds to delay.
call_once_initially: Set to true to call the callback once initially.
mask: File system changes to listen for (by default any file change).
"""
class Process(pyinotify.ProcessEvent):
def __init__(self, immediate_callback):
self.immediate_callback = immediate_callback
def process_default(self, event):
target = os.path.join(event.path, event.name)
self.immediate_callback(target)
def delay_call(pipe, delayed_callback, delay, call_once_initially):
if call_once_initially:
delayed_callback(None)
path_list = []
while True:
# Wait until there is a change.
path_list.append(pipe.recv())
while pipe.poll():
path_list.append(pipe.recv())
# Delay
time.sleep(delay)
# If there are more changes, restart the timer.
if pipe.poll():
continue
# Execute the callback.
delayed_callback(path_list)
path_list = []
receiver, sender = multiprocessing.Pipe(False)
delay_callback_thread = threading.Thread(
target=delay_call,
args=(receiver, callback, delay, call_once_initially))
delay_callback_thread.daemon = True # dies with this program.
delay_callback_thread.start()
while True:
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm, Process(sender.send))
wm.add_watch(base_directory, mask, rec=True, auto_add=True)
try:
while True:
notifier.process_events()
if notifier.check_events():
notifier.read_events()
except KeyboardInterrupt:
notifier.stop()
break
if __name__ == '__main__':
GPIO.setmode(GPIO.BCM)
GPIO.setup(18, GPIO.OUT)
GPIO.setup(23, GPIO.OUT)
GPIO.output(18, GPIO.LOW)
GPIO.output(23, GPIO.LOW)
try:
subprocess.run(['dd', 'if=/dev/zero', 'of=/tmp/usbdisk.img', 'bs=1M', 'count=96'], shell=False, check=True)
#sfdisk
#loop-mount
#mkfs
subprocess.run(['mkfs', '-t', 'vfat', '-n', 'SCREENSHOTS', '/tmp/usbdisk.img'], shell=False, check=True)
subprocess.run(['mount', '-o', 'loop,rw', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], shell=False, check=True)
subprocess.run('cp /opt/freezer/background.png /mnt/', shell=True, check=True)
subprocess.run('cp /opt/freezer/freeze.exe /mnt/', shell=True, check=True)
subprocess.run(['umount', '/mnt'], shell=False, check=True)
subprocess.run(['modprobe', 'g_mass_storage', 'file=/tmp/usbdisk.img', 'stall=0', 'removable=1', 'iSerialNumber=1234567890'], shell=False, check=True)
#tvservice -p
# tvservice -e "DMT 82"
# fbset -g 1920 1080 1920 1080 16
def callback(paths):
# if paths:
# for path in paths:
# print(' {0}'.format(path))
GPIO.output(18, GPIO.HIGH)
GPIO.output(23, GPIO.HIGH)
print('file changed')
subprocess.run(['killall', 'fim'], shell=False)
subprocess.run(['umount', '/mnt'], shell=False, check=False)
subprocess.run(['mount', '-o', 'loop,ro', '-t', 'vfat', '/tmp/usbdisk.img', '/mnt'], shell=False, check=True)
list_of_files = glob.glob('/mnt/*.png')
list_of_files.extend(glob.glob('/mnt/*.jpg'))
list_of_files.extend(glob.glob('/mnt/*.bmp'))
if list_of_files:
latest_file = max(list_of_files, key=os.path.getctime)
print(latest_file)
subprocess.run('fim -d /dev/fb0 -T 1 --autozoom --quiet "'+latest_file+'" &', shell=True)
GPIO.output(23, GPIO.LOW)
watch_delay_call('/tmp/usbdisk.img', callback, 0.5, True, pyinotify.IN_MODIFY)
finally:
GPIO.output(23, GPIO.LOW)
GPIO.output(18, GPIO.LOW)
subprocess.run(['killall', 'fim'], shell=False, check=False)
subprocess.run(['umount', '/mnt'], shell=False, check=False)
subprocess.run(['modprobe', '-r', 'g_mass_storage'], shell=False, check=False)
sys.exit(0)