pilpil-client/app.py

219 lines
6.3 KiB
Python
Executable File

#!/usr/bin/env python
# pilpil-client 0.1
# abelliqueux <contact@arthus.net>
from flask import Flask, flash, request, redirect, url_for
from flask_httpauth import HTTPBasicAuth
import gettext
import os
import subprocess
import threading
import time
import toml
from waitress import serve
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
# l10n
LOCALE = os.getenv('LANG', 'en')
_ = gettext.translation('template', localedir='locales', languages=[LOCALE]).gettext
app = Flask(__name__)
app.config.from_file("defaults.toml", load=toml.load)
config_locations = ["./", "~/.", "~/.config/"]
for location in config_locations:
# Optional config files, ~ is expanded to $HOME on *nix, %USERPROFILE% on windows
if app.config.from_file(os.path.expanduser( location + "pilpil-client.toml"), load=toml.load, silent=True):
print(_("Found configuration file in {}").format( os.path.expanduser(location)))
upload_folder = os.path.expanduser(app.config['DEFAULT']['media_folder_local'])
media_exts = app.config['DEFAULT']['media_exts']
HTTP_secret = str(app.config['DEFAULT']['auth'])
debug = app.config['DEFAULT']['debug']
useSSL = app.config['DEFAULT']['useSSL']
rssi_signal = 0
# HTTP upload settings
app.config['UPLOAD_FOLDER'] = upload_folder
# Max upload size 100M ( see nginx default also )
app.config['MAX_CONTENT_LENGTH'] = 100 * 1000 * 1000
# HTTP auth settings
auth = HTTPBasicAuth()
users = {
"": generate_password_hash(HTTP_secret),
}
HTTP_url_scheme = "http"
if useSSL:
HTTP_url_scheme += "s"
if debug:
print(HTTP_url_scheme)
@auth.verify_password
def verify_password(username, password):
'''
Check HTTP auth username and password
'''
if username in users and check_password_hash(users.get(username), password):
return username
def allowed_ext(filename):
'''
Check if filename has an allowed extension.
'''
# Check for dot in filename
if "." in filename:
# Split from right at first dot to find ext and allow files with "." in name
if filename.rsplit(".",1)[-1] in media_exts:
return True
def XMLify(string, child_node_name="child"):
'''
'''
root_element = "root"
if debug:
print("<{0}><{1}>{2}</{1}></{0}>".format(root_element, child_node_name, string))
return "<{0}><{1}>{2}</{1}></{0}>".format(root_element, child_node_name, string)
def get_RSSI():
'''
Get wireless signal level to AP
'''
with open("/proc/net/wireless", "r") as wl:
for line in wl:
pass
last_line = line
rssi_signal = last_line.split(' ')[3][:-1].strip("-").strip("\n")
if debug:
print(rssi_signal)
return XMLify(rssi_signal, child_node_name="rssi")
def running_on_rpi():
with open("/proc/cpuinfo", "r") as wl:
for line in wl:
if line.lower().find("raspberry") > 0 :
return True
return False
def led_init():
'''
Set ACT and PWR leds trigger and turn them off
'''
os.system('echo none | sudo tee /sys/class/leds/led0/trigger')
os.system('echo none | sudo tee /sys/class/leds/led1/trigger')
led_set(0, 0)
led_set(1, 0)
def led_set(led_id, state):
'''
Set led with id led_id to state.
'''
# led_id : 0 = PWR, 1 = ACT
# state : 0 = off, 1 = on
os.system("echo {} | sudo tee /sys/class/leds/led{}/brightness".format(str(state), str(led_id)))
def blink_pi(n):
'''
Blink ACT and PWR leds altenatively n times to allow physical identification.
'''
for i in range(n):
led_set(0, 1)
led_set(1, 0)
time.sleep(.2)
led_set(0, 0)
led_set(1, 1)
time.sleep(.2)
# restore default behavior
led_set(0, 0)
led_set(1, 0)
return "OK"
def thread_blink():
'''
Blink leds as a thread to avoid blocking.
'''
th = threading.Thread(target=blink_pi, args=(16,))
th.start()
def list_media_files(folder):
'''
List files in folder which extension is allowed (exists in media_exts).
'''
if os.path.exists(folder):
files = os.listdir(folder);
medias = []
for fd in files:
if len(fd.split('.')) > 1:
if fd.split('.')[1] in media_exts:
medias.append(fd)
return medias
else:
return []
@app.route("/")
@auth.login_required
def main():
return _("Nothing to see here !")
@app.route("/rssi")
@auth.login_required
def rssi():
return get_RSSI()
@app.route("/blink")
@auth.login_required
def blink():
thread_blink()
return _("Blinkin")
@app.route("/reboot")
@auth.login_required
def reboot():
stdout = subprocess.run(["sudo", "/usr/sbin/reboot"], capture_output=True)
print(stdout)
return _("Rebooting...")
@app.route("/poweroff")
@auth.login_required
def shutdown():
stdout = subprocess.run(["sudo", "/usr/sbin/poweroff"], capture_output=True)
print(stdout)
return _("Shutting down...")
# File upload
@app.route('/upload', methods=['GET', 'POST'])
@auth.login_required
def upload_file(over_write=1):
if debug:
print("Existing files : {}".format(str(list_media_files(upload_folder))))
if request.method == "POST":
# Check if the post request has the file part
if "file" not in request.files:
return _("No file part: {}").format(str(request.files))
file = request.files["file"]
# If the user does not select a file, the browser submits an
# empty file without a filename.
if file.filename == "":
return _("No selected file")
if file and allowed_ext(file.filename):
if debug:
print("Uploading file {} in {}.".format(str(file.filename.strip("/")), upload_folder))
if (file.filename.strip("/") not in list_media_files(upload_folder)) and over_write:
filename = secure_filename(file.filename)
file.save(os.path.join(upload_folder, filename))
return _("File saved in {}").format(upload_folder)
if debug:
print("File exists, skipping...")
return "File exists, skipping..."
return "OK"
if __name__ == '__main__':
# Turn ACT and POW leds off on start
if running_on_rpi():
led_init()
# ~ app.run()
serve(app, host='127.0.0.1', port=5000, url_scheme=HTTP_url_scheme)