#!/usr/bin/env python
# pilpil-client 0.1
# abelliqueux

from flask import Flask, flash, request, redirect, url_for, send_file
from flask_httpauth import HTTPBasicAuth
from io import BytesIO
import gettext
import os
import subprocess
import threading
import time
import toml
from waitress import serve
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename

# l10n
LOCALE = os.getenv('LANG', 'en')
# fallback=True keeps the untranslated strings instead of raising when no
# catalog exists for LOCALE in ./locales.
_ = gettext.translation('template', localedir='locales', languages=[LOCALE], fallback=True).gettext

app = Flask(__name__)

# Mandatory defaults first, then optional per-user overrides.
app.config.from_file("defaults.toml", load=toml.load)
config_locations = ["./", "~/.", "~/.config/"]
for location in config_locations:
    # Optional config files, ~ is expanded to $HOME on *nix, %USERPROFILE% on windows
    if app.config.from_file(os.path.expanduser(location + "pilpil-client.toml"), load=toml.load, silent=True):
        print(_("Found configuration file in {}").format(os.path.expanduser(location)))

# Joining with "" guarantees both folders end with a path separator.
upload_folder = os.path.join(os.path.expanduser(app.config['DEFAULT']['media_folder_local']), "")
thumbnails_folder = os.path.join(os.path.expanduser(app.config['DEFAULT']['thumbnails_folder']), "")
media_exts = app.config['DEFAULT']['media_exts']
HTTP_secret = str(app.config['DEFAULT']['auth'])
debug = app.config['DEFAULT']['debug']
useSSL = app.config['DEFAULT']['useSSL']
rssi_signal = 0

# HTTP upload settings
app.config['UPLOAD_FOLDER'] = upload_folder
# Max upload size 100M ( see nginx default also )
app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000

# HTTP auth settings
auth = HTTPBasicAuth()
# Single account with an empty username; only the password is configured.
users = {
    "": generate_password_hash(HTTP_secret),
}

HTTP_url_scheme = "http"
if useSSL:
    HTTP_url_scheme += "s"

if debug:
    print(HTTP_url_scheme)

# Last candidates list computed by a JSON POST on /upload/ ; served back on GET.
upload_candidates = {}


@auth.verify_password
def verify_password(username, password):
    '''
        Check HTTP auth username and password.
        Returns the username on success, None otherwise.
    '''
    if username in users and check_password_hash(users.get(username), password):
        return username


def allowed_ext(filename):
    '''
        Check if filename has an allowed extension (listed in media_exts).
        Returns True or False.
    '''
    # Split from right at first dot to find ext and allow files with "." in name
    return "." in filename and filename.rsplit(".", 1)[-1] in media_exts


def XMLify(string, child_node_name="child"):
    '''
        Wrap string in a <child_node_name> element inside a <root> element
        and return the resulting XML as a string.
    '''
    root_element = "root"
    # Both elements are closed so that the output is well-formed XML.
    xml = "<{0}><{1}>{2}</{1}></{0}>".format(root_element, child_node_name, string)
    if debug:
        print(xml)
    return xml


def get_RSSI():
    '''
        Get wireless signal level to AP, as XML (see XMLify).
    '''
    with open("/proc/net/wireless", "r") as wl:
        # Only the last line of the file is used.
        for line in wl:
            pass
        last_line = line
    # Fourth space-separated token, minus its last character, minus any "-"
    # and newline at the ends.
    # NOTE(review): presumably the signal level column with its trailing "." - confirm
    signal_level = last_line.split(' ')[3][:-1].strip("-").strip("\n")
    if debug:
        print(signal_level)
    return XMLify(signal_level, child_node_name="rssi")


def running_on_rpi():
    '''
        Return True if /proc/cpuinfo mentions "raspberry" (case-insensitive),
        False otherwise, including when the file cannot be read.
    '''
    try:
        with open("/proc/cpuinfo", "r") as cpuinfo:
            # "in" also catches a match at the very start of a line.
            return any("raspberry" in line.lower() for line in cpuinfo)
    except OSError:
        # No readable /proc/cpuinfo : not a platform we drive leds on.
        return False


def led_init():
    '''
        Set ACT and PWR leds trigger and turn them off
    '''
    os.system('echo none | sudo tee /sys/class/leds/led0/trigger 1> /dev/null')
    os.system('echo none | sudo tee /sys/class/leds/led1/trigger 1> /dev/null')
    led_set(0, 0)
    led_set(1, 0)


def led_set(led_id, state):
    '''
        Set led with id led_id to state.
    '''
    # led_id : 0 = PWR, 1 = ACT
    # state : 0 = off, 1 = on
    os.system("echo {} | sudo tee /sys/class/leds/led{}/brightness 1> /dev/null".format(str(state), str(led_id)))


def blink_pi(n):
    '''
        Blink ACT and PWR leds alternatively n times to allow physical identification.
        Returns "OK" once both leds are switched off again.
    '''
    for _i in range(n):
        led_set(0, 1)
        led_set(1, 0)
        time.sleep(.2)
        led_set(0, 0)
        led_set(1, 1)
        time.sleep(.2)
    # Leave both leds off when done
    led_set(0, 0)
    led_set(1, 0)
    return "OK"


def thread_blink():
    '''
        Blink leds as a thread to avoid blocking.
    '''
    th = threading.Thread(target=blink_pi, args=(16,))
    th.start()


def list_media_files(folder):
    '''
        List files in folder which extension is allowed (exists in media_exts).
        Returns a list of {"filename": ..., "size": ...} dicts, empty if
        folder does not exist.
    '''
    if not os.path.exists(folder):
        return []
    medias = []
    for fd in os.listdir(folder):
        # Same extension rule as uploads : the part after the LAST dot counts
        if allowed_ext(fd):
            fd_size = os.stat(os.path.join(folder, fd)).st_size
            medias.append({"filename": fd, "size": fd_size})
    return medias


def generate_thumbnails(filename=''):
    '''
        Generate a jpg thumbnail with ffmpeg for filename, or for every media
        file in upload_folder when filename is empty. Existing thumbnails are
        kept as they are.
    '''
    os.makedirs(thumbnails_folder, exist_ok=True)
    if filename:
        media_files = [{"filename": filename}]
    else:
        media_files = list_media_files(upload_folder)
    for media in media_files:
        thumbnail_path = os.path.join(thumbnails_folder, media["filename"]) + ".jpg"
        if not os.path.exists(thumbnail_path):
            print("Generating thumbnail for {}...".format(media["filename"]))
            # One blurred 320x240 frame, taken one second into the media
            subprocess.call(['ffmpeg', '-v', '-8', '-i', os.path.join(upload_folder, media["filename"]),
                             '-q:v', '20', '-s', '320x240', '-vf', 'boxblur=2',
                             '-ss', '00:00:01.000', '-vframes', '1', thumbnail_path, "-y"])
        else:
            print("Thumbnail exists for {}...".format(media["filename"]))


def value_in_dict_list(value, dict_list, key):
    '''
        Return the first dict of dict_list whose key entry equals value,
        False if there is none. Entries that are not dicts or lack key are
        skipped.
    '''
    for dictionary in dict_list:
        if isinstance(dictionary, dict) and key in dictionary and dictionary[key] == value:
            return dictionary
    return False


def pre_run():
    '''
        Startup tasks : generate missing thumbnails, switch leds off on a Pi.
    '''
    generate_thumbnails()
    # Turn ACT and POW leds off on start
    if running_on_rpi():
        led_init()


with app.app_context():
    pre_run()


@app.route("/")
@auth.login_required
def main():
    ''' Placeholder index page. '''
    return _("Nothing to see here !")


@app.route("/rssi")
@auth.login_required
def rssi():
    ''' Return the wireless signal level as XML. '''
    return get_RSSI()


@app.route("/blink")
@auth.login_required
def blink():
    ''' Start blinking the leds when running on a Pi. '''
    if running_on_rpi():
        thread_blink()
    return _("Blinkin")


@app.route("/reboot")
@auth.login_required
def reboot():
    ''' Run "sudo /usr/sbin/reboot" and print the completed process. '''
    stdout = subprocess.run(["sudo", "/usr/sbin/reboot"], capture_output=True)
    print(stdout)
    return _("Rebooting...")


@app.route("/poweroff")
@auth.login_required
def shutdown():
    ''' Run "sudo /usr/sbin/poweroff" and print the completed process. '''
    stdout = subprocess.run(["sudo", "/usr/sbin/poweroff"], capture_output=True)
    print(stdout)
    return _("Shutting down...")


# File upload
@app.route('/upload/', defaults={"filename": "null", "chunk_size": 102400}, methods=['GET', 'POST'])
@app.route('/upload/<filename>/<chunk_size>', methods=['GET', 'POST'])
@auth.login_required
def upload_file(filename, chunk_size, over_write=1):
    '''
        Upload endpoint.
        GET : return the current upload candidates list.
        POST with a JSON body : store and return the received entries that
        are not already present, identical, in the local media list.
        POST without JSON : append up to chunk_size bytes of the request body
        to filename in upload_folder, then generate its thumbnail.
        over_write is currently unused.
    '''
    global upload_candidates
    media_files = list_media_files(upload_folder)
    if debug:
        print("Upload candidates : {}".format(str(media_files)))
    if request.method == "GET":
        # Send candidates list
        if debug:
            print("GET :" + str(upload_candidates))
        return upload_candidates
    if request.method == "POST":
        # JSON received, parse it
        if request.is_json:
            # Entries are compared as whole dicts, so a file whose size
            # differs from the local copy stays a candidate.
            upload_candidates = [candidate for candidate in request.get_json()
                                 if candidate not in media_files]
            if debug:
                print("POST :" + str(upload_candidates))
            return upload_candidates
        # File received, get it
        print("Sending " + filename)
        if filename == "":
            return _("No selected file")
        if filename and allowed_ext(filename):
            if debug:
                print(media_files)
                print(upload_candidates)
                print("File {} is allowed.".format(str(filename.strip("/"))))
            filename_l = secure_filename(filename)
            if not filename_l:
                # Nothing usable left after sanitizing : do not open the folder itself
                return _("No selected file")
            file_path = os.path.join(upload_folder, filename_l)
            # Parse the size before opening, so a bad value does not create an empty file
            part_size = int(chunk_size)
            # TODO : Find a way to remove incomplete uploads
            if debug:
                print("Uploading file {} in {}.".format(filename_l, file_path))
            # Append mode : each request adds one chunk to the file
            with open(file_path, "ab") as dl_file:
                chunk = request.stream.read(part_size)
                if debug:
                    print(len(chunk))
                dl_file.write(chunk)
            generate_thumbnails(filename_l)
            return _("File saved as {}").format(file_path)
    return "OK"


@app.route("/thumb/<media>")
def get_thumbnail(media):
    '''
        Send the jpg thumbnail of media, or a svg placeholder holding a
        translated "MEDIA NOT FOUND" message when there is none.
    '''
    if media:
        media_full_path = os.path.join(thumbnails_folder, media + ".jpg")
        if os.path.exists(media_full_path):
            return send_file(media_full_path, mimetype='image/jpeg')
    # Return a svg file as bytes, so that we can use gettext on its content
    # NOTE(review): minimal placeholder markup, sized like the thumbnails - adjust styling as needed
    svg = ('<svg xmlns="http://www.w3.org/2000/svg" width="320" height="240" viewBox="0 0 320 240">'
           '<text x="50%" y="50%" text-anchor="middle" dominant-baseline="middle">{}</text>'
           '</svg>').format(_("MEDIA NOT FOUND"))
    buf = BytesIO(bytes(svg, "UTF-8"))
    return send_file(buf, mimetype="image/svg+xml")


if __name__ == '__main__':
    # ~ app.run()
    serve(app, host='127.0.0.1', port=5000, url_scheme=HTTP_url_scheme)