#!/usr/bin/env python # pilpil-client 0.1 # abelliqueux from flask import Flask, flash, request, redirect, url_for, send_file from flask_httpauth import HTTPBasicAuth from io import BytesIO import gettext import os import subprocess import threading import time import toml from waitress import serve from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename # l10n LOCALE = os.getenv('LANG', 'en') _ = gettext.translation('template', localedir='locales', languages=[LOCALE]).gettext app = Flask(__name__) app.config.from_file("defaults.toml", load=toml.load) config_locations = ["./", "~/.", "~/.config/"] for location in config_locations: # Optional config files, ~ is expanded to $HOME on *nix, %USERPROFILE% on windows if app.config.from_file(os.path.expanduser(location + "pilpil-client.toml"), load=toml.load, silent=True): print(_("Found configuration file in {}").format(os.path.expanduser(location))) upload_folder = os.path.join(os.path.expanduser(app.config['DEFAULT']['media_folder_local']), "") media_exts = app.config['DEFAULT']['media_exts'] HTTP_secret = str(app.config['DEFAULT']['auth']) debug = app.config['DEFAULT']['debug'] useSSL = app.config['DEFAULT']['useSSL'] rssi_signal = 0 thumbnails_folder = "thumb" # HTTP upload settings app.config['UPLOAD_FOLDER'] = upload_folder # Max upload size 100M ( see nginx default also ) app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000 # HTTP auth settings auth = HTTPBasicAuth() users = { "": generate_password_hash(HTTP_secret), } HTTP_url_scheme = "http" if useSSL: HTTP_url_scheme += "s" if debug: print(HTTP_url_scheme) upload_candidates = {} @auth.verify_password def verify_password(username, password): ''' Check HTTP auth username and password ''' if username in users and check_password_hash(users.get(username), password): return username def allowed_ext(filename): ''' Check if filename has an allowed extension. ''' # Check for dot in filename if "." in filename: # Split from right at first dot to find ext and allow files with "." in name if filename.rsplit(".", 1)[-1] in media_exts: return True def XMLify(string, child_node_name="child"): ''' ''' root_element = "root" if debug: print("<{0}><{1}>{2}".format(root_element, child_node_name, string)) return "<{0}><{1}>{2}".format(root_element, child_node_name, string) def get_RSSI(): ''' Get wireless signal level to AP ''' with open("/proc/net/wireless", "r") as wl: for line in wl: pass last_line = line rssi_signal = last_line.split(' ')[3][:-1].strip("-").strip("\n") if debug: print(rssi_signal) return XMLify(rssi_signal, child_node_name="rssi") def running_on_rpi(): with open("/proc/cpuinfo", "r") as wl: for line in wl: if line.lower().find("raspberry") > 0: return True return False def led_init(): ''' Set ACT and PWR leds trigger and turn them off ''' os.system('echo none | sudo tee /sys/class/leds/led0/trigger') os.system('echo none | sudo tee /sys/class/leds/led1/trigger') led_set(0, 0) led_set(1, 0) def led_set(led_id, state): ''' Set led with id led_id to state. ''' # led_id : 0 = PWR, 1 = ACT # state : 0 = off, 1 = on os.system("echo {} | sudo tee /sys/class/leds/led{}/brightness".format(str(state), str(led_id))) def blink_pi(n): ''' Blink ACT and PWR leds altenatively n times to allow physical identification. ''' for i in range(n): led_set(0, 1) led_set(1, 0) time.sleep(.2) led_set(0, 0) led_set(1, 1) time.sleep(.2) # restore default behavior led_set(0, 0) led_set(1, 0) return "OK" def thread_blink(): ''' Blink leds as a thread to avoid blocking. ''' th = threading.Thread(target=blink_pi, args=(16,)) th.start() def list_media_files(folder): ''' List files in folder which extension is allowed (exists in media_exts). ''' if os.path.exists(folder): files = os.listdir(folder) medias = [] for fd in files: if len(fd.split('.')) > 1: if fd.split('.')[1] in media_exts: # TODO : Return a dict list [{"filename":"foo", "size":-1}, {...}] fd_size = os.stat(folder + fd).st_size file_dict = {"filename": fd, "size": fd_size} medias.append(file_dict) return medias else: return [] def generate_thumbnails(): media_files = list_media_files(upload_folder) for media in media_files: subprocess.call(['ffmpeg', '-i', upload_folder + "/" + media, '-q:v', '30', '-s', '160x120', '-vf', 'boxblur=2', '-ss', '00:00:01.000', '-vframes', '1', thumbnails_folder + "/" + media + ".jpg", "-y"]) @app.route("/") @auth.login_required def main(): return _("Nothing to see here !") @app.route("/rssi") @auth.login_required def rssi(): return get_RSSI() @app.route("/blink") @auth.login_required def blink(): if running_on_rpi(): thread_blink() return _("Blinkin") @app.route("/reboot") @auth.login_required def reboot(): stdout = subprocess.run(["sudo", "/usr/sbin/reboot"], capture_output=True) print(stdout) return _("Rebooting...") @app.route("/poweroff") @auth.login_required def shutdown(): stdout = subprocess.run(["sudo", "/usr/sbin/poweroff"], capture_output=True) print(stdout) return _("Shutting down...") # File upload @app.route('/upload', methods=['GET', 'POST']) @auth.login_required def upload_file(over_write=1): global upload_candidates media_files = list_media_files(upload_folder) if debug: print("Upload candidates : {}".format(str(list_media_files(upload_folder)))) if request.method == "GET": # Send candidates list if debug: print("GET :" + str(upload_candidates)) return upload_candidates if request.method == "POST": if request.is_json: print("HERE") # JSON received, parse it upload_candidates = request.get_json() # ~ upload_candidates = [candidate for candidate in upload_candidates if candidate not in media_files] upload_laureates = [] for candidate in upload_candidates: if candidate not in media_files: upload_laureates.append(candidate) # TODO : Compare existing file size to new file and upload if different # received data of type list of dict [{"filename": "foo", "size": 0}, {...}] elif candidate in media_files: duplicate_index = media_files.index(candidate) if candidate["size"] != media_files[duplicate_index]["size"]: upload_laureates.append(candidate) upload_candidates = upload_laureates if debug: print("POST :" + str(upload_candidates)) return upload_candidates else: # Check if the post request has the file part if "file" not in request.files: return _("No file part: {}").format(str(request.files)) file = request.files["file"] # If the user does not select a file, the browser submits an # empty file without a filename. if file.filename == "": return _("No selected file") if file and allowed_ext(file.filename): if debug: print("Uploading file {} in {}.".format(str(file.filename.strip("/")), upload_folder)) if (file.filename.strip("/") not in media_files) or over_write: filename = secure_filename(file.filename) file.save(os.path.join(upload_folder, filename)) return _("File saved in {}").format(upload_folder) if debug: print("File exists, skipping...") return "File exists, skipping..." return "OK" @app.route("/thumb/") def get_thumbnail(media): if media: media_full_path = thumbnails_folder + os.sep + media + ".jpg" if os.path.exists(media_full_path): return send_file(media_full_path, mimetype='image/jpg') else: # Return a svg file as bytes, so that we can use gettext on its content svg = '{}'.format(_("MEDIA NOT FOUND")) buf = BytesIO(bytes(svg, "UTF-8")) return send_file(buf, mimetype="image/svg+xml") if __name__ == '__main__': generate_thumbnails() # Turn ACT and POW leds off on start if running_on_rpi(): led_init() # ~ app.run() serve(app, host='127.0.0.1', port=5000, url_scheme=HTTP_url_scheme)