This commit is contained in:
ABelliqueux 2022-11-05 20:05:09 +01:00
parent a3ca5174f3
commit 8ffa1bd0e3
1 changed files with 81 additions and 64 deletions

143
app.py
View File

@ -1,100 +1,117 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # pilpil-client 0.1
import os, time, subprocess, threading, toml # abelliqueux <contact@arthus.net>
from flask import Flask, flash, request, redirect, url_for from flask import Flask, flash, request, redirect, url_for
# HTTP auth
from flask_httpauth import HTTPBasicAuth from flask_httpauth import HTTPBasicAuth
import gettext
import os
import subprocess
import threading
import time
import toml
from waitress import serve
from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.security import generate_password_hash, check_password_hash
# FILE UPLOAD
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
app = Flask(__name__)
# l10n # l10n
import gettext
LOCALE = os.getenv('LANG', 'en') LOCALE = os.getenv('LANG', 'en')
_ = gettext.translation('template', localedir='locales', languages=[LOCALE]).gettext _ = gettext.translation('template', localedir='locales', languages=[LOCALE]).gettext
app = Flask(__name__)
app.config.from_file("defaults.toml", load=toml.load) app.config.from_file("defaults.toml", load=toml.load)
config_locations = ["./", "~/.", "~/.config/"] config_locations = ["./", "~/.", "~/.config/"]
for location in config_locations: for location in config_locations:
# Optional config files, ~ is expanded to $HOME on *nix, %USERPROFILE% on windows # Optional config files, ~ is expanded to $HOME on *nix, %USERPROFILE% on windows
# ~ app.config.from_file("videopi.toml", load=toml.load, silent=True)
if app.config.from_file(os.path.expanduser( location + "pilpil-client.toml"), load=toml.load, silent=True): if app.config.from_file(os.path.expanduser( location + "pilpil-client.toml"), load=toml.load, silent=True):
print(_("Found configuration file in {}").format( os.path.expanduser(location))) print(_("Found configuration file in {}").format( os.path.expanduser(location)))
#UPLOAD_FOLDER = os.path.expanduser('~/Videos') upload_folder = os.path.expanduser(app.config['DEFAULT']['media_folder_local'])
#ALLOWED_EXTENSIONS = {'avi', 'mkv', 'mp4'} media_exts = app.config['DEFAULT']['media_exts']
HTTP_secret = str(app.config['DEFAULT']['auth'])
debug = app.config['DEFAULT']['debug']
rssi_signal = 0
UPLOAD_FOLDER = os.path.expanduser(app.config['DEFAULT']['media_folder_local']) # HTTP upload settings
ALLOWED_EXTENSIONS = app.config['DEFAULT']['media_exts'] app.config['UPLOAD_FOLDER'] = upload_folder
HTTP_SECRET = str(app.config['DEFAULT']['auth'])
DEBUG = app.config['DEFAULT']['debug']
#HTTPS
#ASSETS_DIR = os.path.dirname(os.path.abspath(__file__))
# HTTP Serve
from waitress import serve
#app.secret_key = b'flafoudi'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Max upload size 100M ( see nginx default also ) # Max upload size 100M ( see nginx default also )
app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000 app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000
# HTTP auth settings
auth = HTTPBasicAuth() auth = HTTPBasicAuth()
users = { users = {
"": generate_password_hash(HTTP_SECRET), "": generate_password_hash(HTTP_secret),
} }
@auth.verify_password @auth.verify_password
def verify_password(username, password): def verify_password(username, password):
'''
Check HTTP auth username and password
'''
if username in users and check_password_hash(users.get(username), password): if username in users and check_password_hash(users.get(username), password):
return username return username
signal = 0 def allowed_ext(filename):
'''
# Check file extension is allowed Check if filename has an allowed extension.
def allowed_file(filename): '''
# Check for dot in filename # Check for dot in filename
if "." in filename: if "." in filename:
# Split from right at first dot to find ext and allow files with "." in name # Split from right at first dot to find ext and allow files with "." in name
if filename.rsplit(".",1)[-1] in ALLOWED_EXTENSIONS: if filename.rsplit(".",1)[-1] in media_exts:
return True return True
# Get Wifi signal level def get_RSSI():
def getRSSI(): '''
signal = subprocess.run( "./get_rssi.sh", capture_output=True) Get wireless signal level to AP
print(signal) '''
signal = str(signal.stdout, 'UTF-8').strip("-").strip("\n") with open("/proc/net/wireless", "r") as wl:
return signal for line in wl:
pass
last_line = line
rssi_signal = last_line.split(' ')[3][:-1].strip("-").strip("\n")
if debug:
print(rssi_signal)
return rssi_signal
def ledSetup(): def led_init():
'''
Set ACT and PWR leds trigger and turn them off
'''
os.system('echo none | sudo tee /sys/class/leds/led0/trigger') os.system('echo none | sudo tee /sys/class/leds/led0/trigger')
os.system('echo none | sudo tee /sys/class/leds/led1/trigger') os.system('echo none | sudo tee /sys/class/leds/led1/trigger')
led(0, 0) led_set(0, 0)
led(1, 0) led_set(1, 0)
def led( led_id, state ): def led_set(led_id, state):
# 0 : off, 1: on '''
os.system('echo ' + str(state) + ' | sudo tee /sys/class/leds/led' + str(led_id) +'/brightness') Set led with id led_id to state.
'''
# led_id : 0 = PWR, 1 = ACT
# state : 0 = off, 1 = on
os.system("echo {} | sudo tee /sys/class/leds/led{}/brightness".format(str(state), str(led_id)))
# Blink the Pi led to allow identification def blink_pi(n):
def blinkPy(): '''
# Blink 10 times Blink ACT and PWR leds altenatively n times to allow physical identification.
for j in range(16): '''
led(0, 1) for i in range(n):
led(1, 0) led_set(0, 1)
led_set(1, 0)
time.sleep(.2) time.sleep(.2)
led(0, 0) led_set(0, 0)
led(1, 1) led_set(1, 1)
time.sleep(.2) time.sleep(.2)
# restore default behavior # restore default behavior
led(0, 0) led_set(0, 0)
led(1, 0) led_set(1, 0)
return "OK" return "OK"
def threadBlink(): def thread_blink():
th=threading.Thread(target=blinkPy) '''
Blink leds as a thread to avoid blocking.
'''
th = threading.Thread(target=blink_pi, args=(16,))
th.start() th.start()
@app.route("/") @app.route("/")
@ -105,12 +122,12 @@ def main():
@app.route("/rssi") @app.route("/rssi")
@auth.login_required @auth.login_required
def rssi(): def rssi():
return getRSSI() return get_RSSI()
@app.route("/blink") @app.route("/blink")
@auth.login_required @auth.login_required
def blink(): def blink():
threadBlink() thread_blink()
return _("Blinkin") return _("Blinkin")
@app.route("/reboot") @app.route("/reboot")
@ -132,23 +149,23 @@ def shutdown():
@app.route('/upload', methods=['GET', 'POST']) @app.route('/upload', methods=['GET', 'POST'])
@auth.login_required @auth.login_required
def upload_file(): def upload_file():
if request.method == 'POST': if request.method == "POST":
# check if the post request has the file part # Check if the post request has the file part
if 'file' not in request.files: if "file" not in request.files:
return _("No file part: {}").format(str(request.files)) return _("No file part: {}").format(str(request.files))
file = request.files['file'] file = request.files["file"]
# If the user does not select a file, the browser submits an # If the user does not select a file, the browser submits an
# empty file without a filename. # empty file without a filename.
if file.filename == '': if file.filename == "":
return _("No selected file") return _("No selected file")
if file and allowed_file(file.filename): if file and allowed_ext(file.filename):
filename = secure_filename(file.filename) filename = secure_filename(file.filename)
file.save(os.path.join(UPLOAD_FOLDER, filename)) file.save(os.path.join(upload_folder, filename))
return _("File saved in {}").format(UPLOAD_FOLDER) return _("File saved in {}").format(upload_folder)
return "OK" return "OK"
if __name__ == '__main__': if __name__ == '__main__':
# Turn ACT and POW leds off on start # Turn ACT and POW leds off on start
# ~ ledSetup() # ~ led_init()
# app.run() # app.run()
serve(app, host='127.0.0.1', port=5000, url_scheme='https') serve(app, host='127.0.0.1', port=5000, url_scheme='https')