pilpil-client/app.py

331 lines
11 KiB
Python
Raw Normal View History

2022-10-06 11:47:21 +02:00
#!/usr/bin/env python
2022-11-24 20:15:22 +01:00
# pilpil-client 0.1
2022-11-05 20:05:09 +01:00
# abelliqueux <contact@arthus.net>
2022-11-24 20:15:22 +01:00
from flask import Flask, flash, request, redirect, url_for, send_file
2022-10-06 11:47:21 +02:00
from flask_httpauth import HTTPBasicAuth
2022-11-25 14:36:09 +01:00
from io import BytesIO
2022-11-05 20:05:09 +01:00
import gettext
import os
import subprocess
import threading
import time
import toml
from waitress import serve
2022-10-06 11:47:21 +02:00
from werkzeug.security import generate_password_hash, check_password_hash
2022-10-19 15:14:43 +02:00
from werkzeug.utils import secure_filename
2022-10-19 16:22:05 +02:00
2022-11-03 14:42:17 +01:00
# l10n
LOCALE = os.getenv('LANG', 'en')
_ = gettext.translation('template', localedir='locales', languages=[LOCALE]).gettext
2022-11-05 20:05:09 +01:00
app = Flask(__name__)
2022-10-19 16:22:05 +02:00
app.config.from_file("defaults.toml", load=toml.load)
config_locations = ["./", "~/.", "~/.config/"]
for location in config_locations:
# Optional config files, ~ is expanded to $HOME on *nix, %USERPROFILE% on windows
2022-11-24 20:15:22 +01:00
if app.config.from_file(os.path.expanduser(location + "pilpil-client.toml"), load=toml.load, silent=True):
print(_("Found configuration file in {}").format(os.path.expanduser(location)))
2022-10-19 16:22:05 +02:00
2022-12-05 13:19:43 +01:00
upload_folder = os.path.join(os.path.expanduser(app.config['DEFAULT']['media_folder_local']), "")
2023-01-06 14:18:29 +01:00
thumbnails_folder = os.path.join(os.path.expanduser(app.config['DEFAULT']['thumbnails_folder']), "")
2022-11-05 20:05:09 +01:00
media_exts = app.config['DEFAULT']['media_exts']
HTTP_secret = str(app.config['DEFAULT']['auth'])
debug = app.config['DEFAULT']['debug']
useSSL = app.config['DEFAULT']['useSSL']
2022-11-05 20:05:09 +01:00
rssi_signal = 0
2023-01-06 14:18:29 +01:00
2022-10-06 11:47:21 +02:00
2022-11-05 20:05:09 +01:00
# HTTP upload settings
app.config['UPLOAD_FOLDER'] = upload_folder
2022-10-19 15:14:43 +02:00
# Max upload size 100M ( see nginx default also )
app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000
2022-10-06 11:47:21 +02:00
2022-11-05 20:05:09 +01:00
# HTTP auth settings
2022-10-06 11:47:21 +02:00
auth = HTTPBasicAuth()
users = {
2022-11-05 20:05:09 +01:00
"": generate_password_hash(HTTP_secret),
2022-10-06 11:47:21 +02:00
}
HTTP_url_scheme = "http"
if useSSL:
HTTP_url_scheme += "s"
if debug:
print(HTTP_url_scheme)
2022-12-05 11:57:29 +01:00
upload_candidates = {}
2022-11-24 20:15:22 +01:00
2022-12-05 13:19:43 +01:00
2022-10-06 11:47:21 +02:00
@auth.verify_password
def verify_password(username, password):
2022-11-05 20:05:09 +01:00
'''
Check HTTP auth username and password
'''
2022-10-06 11:47:21 +02:00
if username in users and check_password_hash(users.get(username), password):
return username
2022-11-24 20:15:22 +01:00
2022-11-05 20:05:09 +01:00
def allowed_ext(filename):
'''
Check if filename has an allowed extension.
'''
2022-10-19 15:14:43 +02:00
# Check for dot in filename
if "." in filename:
# Split from right at first dot to find ext and allow files with "." in name
2022-11-24 20:15:22 +01:00
if filename.rsplit(".", 1)[-1] in media_exts:
2022-10-19 15:14:43 +02:00
return True
2022-11-08 15:28:53 +01:00
2022-11-24 20:15:22 +01:00
2022-11-07 12:53:26 +01:00
def XMLify(string, child_node_name="child"):
'''
'''
root_element = "root"
if debug:
print("<{0}><{1}>{2}</{1}></{0}>".format(root_element, child_node_name, string))
return "<{0}><{1}>{2}</{1}></{0}>".format(root_element, child_node_name, string)
2022-11-24 20:15:22 +01:00
2022-11-05 20:05:09 +01:00
def get_RSSI():
'''
Get wireless signal level to AP
'''
with open("/proc/net/wireless", "r") as wl:
for line in wl:
pass
last_line = line
rssi_signal = last_line.split(' ')[3][:-1].strip("-").strip("\n")
if debug:
print(rssi_signal)
2022-11-07 12:53:26 +01:00
return XMLify(rssi_signal, child_node_name="rssi")
2022-11-05 20:05:09 +01:00
2022-11-24 20:15:22 +01:00
2022-11-15 15:57:59 +01:00
def running_on_rpi():
with open("/proc/cpuinfo", "r") as wl:
for line in wl:
2022-11-24 20:15:22 +01:00
if line.lower().find("raspberry") > 0:
2022-11-15 15:57:59 +01:00
return True
return False
2022-11-24 20:15:22 +01:00
2022-11-05 20:05:09 +01:00
def led_init():
'''
Set ACT and PWR leds trigger and turn them off
'''
os.system('echo none | sudo tee /sys/class/leds/led0/trigger 1> /dev/null')
os.system('echo none | sudo tee /sys/class/leds/led1/trigger 1> /dev/null')
2022-11-05 20:05:09 +01:00
led_set(0, 0)
led_set(1, 0)
2022-11-24 20:15:22 +01:00
2022-11-05 20:05:09 +01:00
def led_set(led_id, state):
'''
Set led with id led_id to state.
'''
# led_id : 0 = PWR, 1 = ACT
# state : 0 = off, 1 = on
os.system("echo {} | sudo tee /sys/class/leds/led{}/brightness 1> /dev/null".format(str(state), str(led_id)))
2022-11-05 20:05:09 +01:00
2022-11-24 20:15:22 +01:00
2022-11-05 20:05:09 +01:00
def blink_pi(n):
'''
Blink ACT and PWR leds altenatively n times to allow physical identification.
'''
for i in range(n):
led_set(0, 1)
led_set(1, 0)
2022-10-06 11:47:21 +02:00
time.sleep(.2)
2022-11-05 20:05:09 +01:00
led_set(0, 0)
led_set(1, 1)
2022-10-06 11:47:21 +02:00
time.sleep(.2)
2022-10-24 19:42:11 +02:00
# restore default behavior
2022-11-05 20:05:09 +01:00
led_set(0, 0)
led_set(1, 0)
2022-10-06 11:47:21 +02:00
return "OK"
2022-11-24 20:15:22 +01:00
2022-11-05 20:05:09 +01:00
def thread_blink():
'''
Blink leds as a thread to avoid blocking.
'''
th = threading.Thread(target=blink_pi, args=(16,))
2022-10-25 12:50:56 +02:00
th.start()
2022-11-08 15:28:53 +01:00
2022-11-24 20:15:22 +01:00
2022-11-08 15:28:53 +01:00
def list_media_files(folder):
'''
List files in folder which extension is allowed (exists in media_exts).
'''
if os.path.exists(folder):
2022-11-24 20:15:22 +01:00
files = os.listdir(folder)
2022-11-08 15:28:53 +01:00
medias = []
for fd in files:
if len(fd.split('.')) > 1:
if fd.split('.')[1] in media_exts:
2022-12-05 11:57:29 +01:00
fd_size = os.stat(folder + fd).st_size
file_dict = {"filename": fd, "size": fd_size}
medias.append(file_dict)
2022-11-08 15:28:53 +01:00
return medias
else:
return []
2022-11-24 20:15:22 +01:00
2023-01-09 16:08:06 +01:00
def generate_thumbnails(filename=''):
2023-01-06 14:18:29 +01:00
if not os.path.exists(thumbnails_folder):
2023-01-09 16:08:06 +01:00
os.mkdir(thumbnails_folder)
if filename:
media_files = [{"filename": filename}]
else:
media_files = list_media_files(upload_folder)
2022-11-24 20:15:22 +01:00
for media in media_files:
2022-12-14 17:04:44 +01:00
if not os.path.exists(os.path.join(thumbnails_folder, media["filename"]) + ".jpg"):
print("Generating thumbnail for {}...".format(media["filename"]))
subprocess.call(['ffmpeg', '-v', '-8','-i', os.path.join(upload_folder, media["filename"]), '-q:v', '20', '-s', '320x240', '-vf', 'boxblur=2', '-ss', '00:00:01.000', '-vframes', '1', os.path.join(thumbnails_folder, media["filename"]) + ".jpg", "-y"])
else:
print("Thumbnail exists for {}...".format(media["filename"]))
2022-11-24 20:15:22 +01:00
2022-12-14 15:02:11 +01:00
def value_in_dict_list(value, dict_list, key):
for dictionary in dict_list:
if dictionary["filename"] == value:
return dictionary
return False
2022-11-24 20:15:22 +01:00
def pre_run():
generate_thumbnails()
# Turn ACT and POW leds off on start
if running_on_rpi():
led_init()
with app.app_context():
pre_run()
2022-10-06 11:47:21 +02:00
@app.route("/")
@auth.login_required
def main():
2022-11-03 14:42:17 +01:00
return _("Nothing to see here !")
2022-10-06 11:47:21 +02:00
2022-11-24 20:15:22 +01:00
2022-10-06 11:47:21 +02:00
@app.route("/rssi")
@auth.login_required
2022-10-25 12:50:56 +02:00
def rssi():
2022-11-05 20:05:09 +01:00
return get_RSSI()
2022-10-06 11:47:21 +02:00
2022-11-24 20:15:22 +01:00
2022-10-06 11:47:21 +02:00
@app.route("/blink")
@auth.login_required
def blink():
2022-11-15 17:02:38 +01:00
if running_on_rpi():
thread_blink()
2022-11-03 14:42:17 +01:00
return _("Blinkin")
2022-10-06 11:47:21 +02:00
2022-11-24 20:15:22 +01:00
2022-10-06 11:47:21 +02:00
@app.route("/reboot")
@auth.login_required
def reboot():
stdout = subprocess.run(["sudo", "/usr/sbin/reboot"], capture_output=True)
print(stdout)
2022-11-03 14:42:17 +01:00
return _("Rebooting...")
2022-10-06 11:47:21 +02:00
2022-11-24 20:15:22 +01:00
2022-10-06 11:47:21 +02:00
@app.route("/poweroff")
@auth.login_required
def shutdown():
stdout = subprocess.run(["sudo", "/usr/sbin/poweroff"], capture_output=True)
print(stdout)
2022-11-03 14:42:17 +01:00
return _("Shutting down...")
2022-10-06 11:47:21 +02:00
2022-10-19 15:14:43 +02:00
# File upload
2022-11-24 20:15:22 +01:00
2022-12-14 15:02:11 +01:00
@app.route('/upload/', defaults={"filename": "null", "chunk_size" : 102400}, methods=['GET', 'POST'])
@app.route('/upload/<filename>/<chunk_size>', methods=['GET', 'POST'])
2022-10-19 15:14:43 +02:00
@auth.login_required
2022-12-14 15:02:11 +01:00
def upload_file(filename, chunk_size, over_write=1):
2022-12-05 11:57:29 +01:00
global upload_candidates
media_files = list_media_files(upload_folder)
2022-11-08 16:55:32 +01:00
if debug:
2022-12-05 13:19:43 +01:00
print("Upload candidates : {}".format(str(list_media_files(upload_folder))))
2022-12-05 11:57:29 +01:00
if request.method == "GET":
# Send candidates list
if debug:
print("GET :" + str(upload_candidates))
return upload_candidates
2022-11-05 20:05:09 +01:00
if request.method == "POST":
2022-12-14 15:02:11 +01:00
# JSON received, parse it
2022-12-05 13:19:43 +01:00
if request.is_json:
2022-12-05 11:57:29 +01:00
upload_candidates = request.get_json()
# ~ upload_candidates = [candidate for candidate in upload_candidates if candidate not in media_files]
upload_laureates = []
for candidate in upload_candidates:
if candidate not in media_files:
upload_laureates.append(candidate)
elif candidate in media_files:
2022-12-05 13:19:43 +01:00
duplicate_index = media_files.index(candidate)
2022-12-05 11:57:29 +01:00
if candidate["size"] != media_files[duplicate_index]["size"]:
upload_laureates.append(candidate)
upload_candidates = upload_laureates
if debug:
print("POST :" + str(upload_candidates))
return upload_candidates
2022-12-14 15:02:11 +01:00
# File received, get it
2022-12-05 13:19:43 +01:00
else:
2022-12-14 15:02:11 +01:00
print("Sending " + filename)
2022-12-05 13:19:43 +01:00
# Check if the post request has the file part
2022-12-14 15:02:11 +01:00
# ~ if "file" not in request.files:
# ~ return _("No file part: {}").format(str(request.files))
# ~ upload = request.files["file"]
2022-12-05 13:19:43 +01:00
# If the user does not select a file, the browser submits an
# empty file without a filename.
2022-12-14 15:02:11 +01:00
if filename == "":
2022-12-05 13:19:43 +01:00
return _("No selected file")
2022-12-14 15:02:11 +01:00
if filename and allowed_ext(filename):
2022-12-05 13:19:43 +01:00
if debug:
2022-12-14 15:02:11 +01:00
print(media_files)
print(upload_candidates)
print("File {} is allowed.".format(str(filename.strip("/"))))
local_file_exists = value_in_dict_list(filename, media_files, "filename")
remote_file_infos = value_in_dict_list(filename, upload_candidates, "filename")
filename_l = secure_filename(filename)
file_path = os.path.join(upload_folder, filename_l)
# TODO : Find a way to remove incomplete uploads
# ~ if file_exists and over_write:
#if local_file_exists["size"] < remote_file_infos["size"]/10):
# ~ fd = open(file_path, "wb")
# ~ fd.write(b'')
# ~ over_write = 0
# ~ if debug:
# ~ print("Overwriting file {} in {}.".format(filename_l, file_path))
# ~ if filename not in media_files:
# ~ if (not local_file_exists) or (local_file_exists["size"] < remote_file_infos["size"]/10):
2022-12-05 13:19:43 +01:00
if debug:
2022-12-14 15:02:11 +01:00
print("Uploading file {} in {}.".format(filename_l, file_path))
with open(file_path, "ab") as dl_file:
part_size = int(chunk_size)
chunk = request.stream.read(part_size)
if debug:
print(len(chunk))
dl_file.write(chunk)
# ~ file.save(os.path.join(upload_folder, filename))
2023-01-09 16:08:06 +01:00
generate_thumbnails(filename_l)
2022-12-14 15:02:11 +01:00
return _("File saved as {}").format(file_path)
2022-10-19 15:14:43 +02:00
return "OK"
2022-11-24 20:15:22 +01:00
@app.route("/thumb/<media>")
def get_thumbnail(media):
2022-11-25 14:36:09 +01:00
if media:
media_full_path = thumbnails_folder + os.sep + media + ".jpg"
if os.path.exists(media_full_path):
return send_file(media_full_path, mimetype='image/jpg')
else:
# Return a svg file as bytes, so that we can use gettext on its content
svg = '<svg width="320" height="240" viewBox="0 0 84.6 63.5" version="1.1" id="not_found" xmlns="http://www.w3.org/2000/svg" xmlns:svg="http://www.w3.org/2000/svg"><path id="not_found_x" style="fill:#f00;" d="M 3.076298,0.04960937 H 0.04960937 V 1.5394409 L 40.0389,31.278194 0.04960937,61.017464 v 2.432927 H 1.8076416 L 42.553475,33.148364 83.299825,63.450391 h 1.317232 V 60.691386 L 45.067533,31.278194 84.617057,1.8655192 V 0.04960937 H 82.031169 L 42.553475,29.408541 Z" /><text xml:space="preserve" style="font-size:8px;stroke:#000;stroke-width:2px;" x="50%" y="50%" id="not_found_txt"><tspan id="not_found_span" style="text-align:center;text-anchor:middle;stroke:none;">{}</tspan></text></svg>'.format(_("MEDIA NOT FOUND"))
buf = BytesIO(bytes(svg, "UTF-8"))
return send_file(buf, mimetype="image/svg+xml")
2022-10-06 11:47:21 +02:00
if __name__ == '__main__':
# ~ app.run()
serve(app, host='127.0.0.1', port=5000, url_scheme=HTTP_url_scheme)