#!/usr/bin/env python3
#
import collections
import locale
import os
import sys
import threading
import time
import tkinter as tk
from tkinter import filedialog
from PIL import Image, ImageTk, ImageFilter
# DSLR support
import gphoto2 as gp
# async FFMPEG export support
import asyncio
from ffmpeg.asyncio import FFmpeg
from itertools import count
from send2trash import send2trash

# TODO
#
# X wait event mode
# X remove frames
# X keep images in memory (odict > list ?)
# o resize images upon shooting / crop output video ?
# o project workflow
#   X use a different folder for each project (A, B, C, etc...)
#   X startup : check for existing folder with name A, B, C, etc.
#   - startup : offer to continue previous project or new project
#   - if continue, find last frame in folder X else, find next letter and create folder
# o notify when export ends
# X colored onion skin frame (pillow filters)
# V startup frame
# o open existing project screen

# NOTE: despite its name this holds the path of this script file, not of its folder.
running_from_folder = os.path.realpath(__file__)

# Valid session folder names, in the order sessions are created.
alphabet = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
            'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']

project_settings = {
    'file_extension': 'JPG',
    # 'event' = wait for the camera to report new files; anything else = capture on key press
    'trigger_mode': '',
    # Sub-folder of the user's home used when no folder is picked at start-up
    'projects_folder': '',
    # ~ 'project_letter': 'A'
}


class KISStopmo(tk.Tk):
    """Stop-motion capture application driven through the module-level ``root`` window.

    Frames are stored in a per-session folder (``A``, ``B``, ...) as
    ``<letter>.<4-digit index>.<extension>`` and cached in ``self.img_list``,
    a dict mapping file name -> resized PIL image (or ``None`` when not loaded yet).

    NOTE(review): the class derives from ``tk.Tk`` but never calls
    ``tk.Tk.__init__``; all widget work goes through the global ``root``.
    Attributes must therefore be assigned before they are read.
    """

    def __init__(self, *args, **kargs):
        """Configure the window, the session folder, the camera and the key bindings.

        Positional and keyword arguments are accepted but not used.

        Raises:
            RuntimeError: when no session folder could be found or created.
        """
        # Default config
        # TODO : Import value from config file
        # Script settings
        self.onion_skin = False
        self.onionskin_alpha = 0.4
        self.onionskin_fx = False
        self.fullscreen_bool = True
        self.screen_w, self.screen_h = 640, 480
        self.framerate = 16
        self.photo = None
        self.end_thread = False

        # Camera settings: widget name -> index of the choice to select
        self.camera_settings = {
            'capturemode': 3,
            'imagesize': 2,
            'whitebalance': 5,
            'capturetarget': 0,
            'nocfcardrelease': 0,
            'recordingmedia': 1,
        }
        # Camera status: widget name -> index of the recommended choice
        self.camera_status = {
            'acpower': 0,  # we'd rather have this set to 0 which means we're running on AC
        }

        # Window setup
        self.screen_w, self.screen_h = root.winfo_screenwidth(), root.winfo_screenheight()
        root.wm_attributes("-fullscreen", self.fullscreen_bool)
        root.title('KISStopmotion')
        try:
            root.iconbitmap('@' + os.path.join(os.getcwd(), 'kisstopmo.xbm'))
        except tk.TclError as exc:
            # A missing icon file should not prevent the application from starting
            print("Could not set window icon: {}".format(exc))

        # Image container creation
        self.label = tk.Label(root)
        self.label.pack()

        # Savepath setup
        self.projects_folder = filedialog.askdirectory()
        self.savepath = self.get_session_folder()
        if not self.savepath:
            # get_session_folder() returns False when the projects folder is unusable
            # or when every letter of the alphabet is already taken
            raise RuntimeError("Could not find or create a session folder.")
        self.project_letter = os.path.basename(self.savepath)

        # Export settings
        self.input_filename = "{folder}{sep}{letter}.%04d.{ext}".format(
            folder=self.savepath, sep=os.sep, letter=self.project_letter,
            ext=project_settings["file_extension"])
        self.input_options = {"f": "image2", "r": str(self.framerate)}
        self.output_filename = "{filename}.mp4".format(filename=self.project_letter)
        self.output_options = {"vf": "scale=1920:-1,crop=1920:1080:0:102"}

        # Get frames list
        self.img_list = {}
        self.img_list = self.get_frames_list(self.savepath)
        self.img_index = self.check_range(len(self.img_list) - 1, False)
        self.splash_text = 'Stopmo!'

        # Camera setup
        self.camera = gp.check_result(gp.gp_camera_new())
        try:
            gp.check_result(gp.gp_camera_init(self.camera))
            # get configuration tree
            self.current_camera_config = gp.check_result(gp.gp_camera_get_config(self.camera))
            self.apply_camera_settings(self.camera, self.current_camera_config)
            if self.check_status(self.camera, self.current_camera_config) is False:
                print("Warning: Some settings are not set to the recommended value!")
        except Exception as exc:
            # Best effort: the application still starts (splash screen only) without a camera
            print("Camera setup failed: {}".format(exc))
            self.splash_text = 'Camera not found or busy.'

        self.timeout = 3000  # milliseconds
        self.splashscreen = self.generate_splashscreen()

        # update_image() loads the current frame (or the splash screen) and shows it
        self.update_image()

        if project_settings['trigger_mode'] == 'event':
            root.after(1000, self.trigger_bg_loop)

        # Key binding
        # NOTE(review): the event sequences were lost in the copy of this file under
        # review (every bind() call carried an empty sequence, which Tk rejects).
        # "<j>" matches the "j pressed" debug note in capture_image(); the other
        # sequences are reconstructed guesses — confirm against the intended keymap.
        key_bindings = [
            ("<Escape>", lambda event: root.attributes("-fullscreen", False)),
            ("<f>", lambda event: root.attributes("-fullscreen", True)),
            ("<n>", self.next_frame),
            ("<b>", self.previous_frame),
            ("<o>", self.toggle_onionskin),
            ("<p>", self.preview_animation),
            ("<e>", self.trigger_export_animation),
            ("<d>", self.remove_frame),
            ("<a>", self.print_imglist),
        ]
        if project_settings['trigger_mode'] != 'event':
            key_bindings.append(("<j>", self.capture_image))
        for sequence, handler in key_bindings:
            root.bind(sequence, handler)

    def _placeholder_name(self):
        """Return the fake frame name (index -1) used while a session has no frame."""
        return "{}.{:04d}.{}".format(self.project_letter, -1, project_settings['file_extension'])

    def generate_splashscreen(self):
        """Return a grey screen-sized PIL image with ``self.splash_text`` drawn in red."""
        from PIL import Image, ImageDraw, ImageFont
        img = Image.new('RGB', (self.screen_w, self.screen_h), (128, 128, 128))
        if self.splash_text is not None:
            draw = ImageDraw.Draw(img)
            try:
                fnt = ImageFont.truetype("FreeMono", 100)
            except OSError:
                # Font not installed: fall back to Pillow's built-in font
                fnt = ImageFont.load_default()
            draw.text((900, 500), self.splash_text, fill=(255, 0, 0), font=fnt)
        return img

    def find_letter_after(self, letter: str):
        """Return the letter following ``letter`` in ``alphabet``, or False if there is none."""
        if letter in alphabet and alphabet.index(letter) < len(alphabet) - 1:
            return alphabet[alphabet.index(letter) + 1]
        return False

    def get_projects_folder(self):
        """Return the folder holding all sessions, creating it when missing.

        Uses the folder picked at start-up when there is one; otherwise a
        sub-folder of the user's home directory (``project_settings['projects_folder']``
        or ``'Stopmotion Projects'``).

        Returns:
            The folder path, or False when the path exists but is not a directory.
        """
        if self.projects_folder:
            project_folder = self.projects_folder
        else:
            subfolder = project_settings['projects_folder'] or 'Stopmotion Projects'
            project_folder = os.path.join(os.path.expanduser('~'), subfolder)
        if not os.path.exists(project_folder):
            os.mkdir(project_folder)
        elif not os.path.isdir(project_folder):
            # If file exists but is not a folder, can't create it, abort
            return False
        return project_folder

    def get_session_folder(self):
        """Create (if needed) and return the folder of the next session.

        The next session is the letter following the last single-letter folder
        found in the projects folder, or ``'A'`` when there is none.

        Returns:
            The session folder path, or False when the projects folder is not
            usable or the last letter of the alphabet is already taken.
        """
        project_folder = self.get_projects_folder()
        if not project_folder:
            return False
        # Keep only entries named with a single letter of the alphabet
        sessions_list = sorted(
            entry for entry in os.listdir(project_folder)
            if len(entry) == 1 and entry in alphabet)
        if sessions_list:
            next_letter = self.find_letter_after(sessions_list[-1])
            if next_letter is False:
                return False
        else:
            next_letter = 'A'
        session_folder = os.path.join(project_folder, next_letter)
        if not os.path.exists(session_folder):
            os.mkdir(session_folder)
        return session_folder

    def trigger_bg_loop(self):
        """Start the background thread that waits for camera capture events."""
        # Daemon thread: nothing sets end_thread when the window closes, so a
        # non-daemon thread would keep the process alive forever.
        self.event_thread = threading.Thread(target=self.wait_for_capture, daemon=True)
        self.event_thread.start()

    def wait_for_event(self):
        """Wait once for a camera event and save a newly added file to the current directory.

        NOTE(review): not referenced anywhere in this file and saves to
        ``os.getcwd()`` rather than ``self.savepath`` — looks like a leftover of
        ``wait_for_capture``; confirm before relying on it.
        """
        event_type, event_data = self.camera.wait_for_event(self.timeout)
        if event_type == gp.GP_EVENT_FILE_ADDED:
            cam_file = self.camera.file_get(
                event_data.folder, event_data.name, gp.GP_FILE_TYPE_NORMAL)
            next_filename = self.return_next_frame_number(self.get_last_frame(os.getcwd()))
            target_path = os.path.join(os.getcwd(), next_filename)
            print("Image is being saved to {}".format(target_path))
            cam_file.save(target_path)

    def print_imglist(self, event):
        """Debug helper: print the frame cache."""
        print(self.img_list)

    def get_frames_list(self, folder: str):
        """Return the known frames of ``folder`` as a name-sorted ordered dict.

        Files of ``folder`` whose name starts with the project letter and ends
        with the configured extension are added to ``self.img_list`` (in place,
        with a ``None`` image) when missing, so already cached images are kept.

        Returns:
            An ``OrderedDict`` name -> cached image or None, sorted by name; or a
            plain dict holding only the placeholder name when no frame is known.
        """
        known_frames = self.img_list
        extension = project_settings['file_extension']
        for file_name in os.listdir(folder):
            if file_name.startswith(self.project_letter) and file_name.endswith(extension):
                known_frames.setdefault(file_name, None)
        if not known_frames:
            # If no images were found, return fake name set to -001 to init file count to 000
            return {self._placeholder_name(): None}
        return collections.OrderedDict(sorted(known_frames.items()))

    def clean_img_list(self, folder_path):
        """Drop from ``self.img_list`` every entry whose file is not in ``folder_path``."""
        files_on_disk = set(os.listdir(folder_path))
        # Iterate over a copy of the keys: the dict is modified inside the loop
        for file_name in list(self.img_list):
            if file_name not in files_on_disk:
                self.img_list.pop(file_name)

    # TODO : check this works with new workflow
    def batch_rename(self, folder: str):
        """Renumber the frames of ``folder`` from 0000 without gaps.

        Returns:
            The refreshed frame list (see ``get_frames_list``).
        """
        frame_list = self.get_frames_list(folder)
        counter = (".%04i." % x for x in count(0))
        for frame_name in frame_list:
            # Frame names are bare file names: resolve them inside the session
            # folder, not against the current working directory.
            frame_path = os.path.join(folder, frame_name)
            if os.path.exists(frame_path):
                new_name = "{}{}{}".format(
                    self.project_letter, next(counter), project_settings['file_extension'])
                os.rename(frame_path, os.path.join(folder, new_name))
            else:
                print(str(frame_name) + " does not exist")
        return self.get_frames_list(folder)

    def offset_dictvalues(self, from_index=0):
        """Shift cached images one position towards the start, from ``from_index`` on.

        Each entry takes the cached image of the following entry; the last entry
        is reset to ``None``.
        """
        names = list(self.img_list.keys())
        cached = list(self.img_list.values())
        last = len(names) - 1
        for i in range(from_index, len(names)):
            self.img_list[names[i]] = cached[i + 1] if i < last else None

    # FIXME
    def remove_frame(self, event):
        """Send the displayed frame to the trash, renumber the rest and refresh the display."""
        if not self.img_list:
            return None
        folder_path = os.path.realpath(self.savepath)
        frame_name = list(self.img_list.keys())[self.img_index]
        # Resolve inside the session folder, not against the current working directory
        frame_path = os.path.join(folder_path, frame_name)
        if not os.path.exists(frame_path):
            return 0
        print("removing {}".format(frame_path))
        # trash file
        send2trash(frame_path)
        # remove entry from dict
        self.img_list.pop(frame_name)
        # offset cached images
        self.offset_dictvalues(self.img_index)
        # rename files and get new list
        self.img_list = self.batch_rename(folder_path)
        print(self.img_list)
        self.clean_img_list(folder_path)
        # update index if possible
        self.img_index = self.check_range(self.img_index, False)
        # update display
        self.update_image(None, self.img_index)
        return None

    def open_jpg(self, filetuple: tuple, w: int, h: int):
        """Return the image of a ``(name, cached_image)`` entry resized to ``w`` x ``h``.

        When the entry holds no cached image the file is read from
        ``self.savepath``, resized and stored in ``self.img_list``.

        Returns:
            The PIL image, or False when the file cannot be opened.
        """
        file_name, cached = filetuple[0], filetuple[-1]
        if cached is not None:
            return cached
        try:
            # The context manager closes the source file once the resized copy exists
            with Image.open(os.path.join(self.savepath, file_name)) as source:
                # TODO : Keep aspect ratio
                image = source.resize((w, h))
        except OSError:
            # Missing or unreadable file
            return False
        self.img_list[file_name] = image
        return image

    def check_range(self, x, loop=True):
        """Bring index ``x`` back into ``[0, len(self.img_list) - 1]``.

        With ``loop`` an out-of-range index wraps to the other end; without it
        the index is clamped to the nearest end.
        """
        last = len(self.img_list) - 1
        if x < 0:
            return last if loop else 0
        if x > last:
            return 0 if loop else last
        return x

    def apply_onionskin(self, image, alpha, fx=False):
        """Return a PhotoImage blending the frame before ``self.img_index`` with ``image``.

        Args:
            image: PIL image currently displayed.
            alpha: blend factor passed to ``Image.blend`` (0 = previous frame only).
            fx: when True, the previous frame goes through an edge-detection filter first.
        """
        entry = list(self.img_list.items())[self.img_index - 1]
        prev_image = entry[-1]
        if prev_image is None:
            prev_image = self.open_jpg(entry, self.screen_w, self.screen_h)
        if not prev_image:
            # Previous frame could not be loaded: show the plain image instead
            return ImageTk.PhotoImage(image)
        if fx:
            prev_image = prev_image.filter(ImageFilter.FIND_EDGES)
        composite = Image.blend(prev_image, image, alpha)
        return ImageTk.PhotoImage(composite)

    def update_image(self, event=None, index=None):
        """Display the frame at ``index`` (default: the current index).

        When ``event`` is given the current index is first advanced by one
        (wrapping). The splash screen is shown for the placeholder entry.

        Returns:
            The displayed PIL image, or False when the frame could not be loaded.
        """
        if event is not None:
            self.img_index = self.check_range(self.img_index + 1)
        if index is None:
            index = self.check_range(self.img_index)
        frames = list(self.img_list.items())
        # TODO: better approach
        if not frames or frames[index][0] == self._placeholder_name():
            new_image = self.splashscreen
        else:
            new_image = self.open_jpg(frames[index], self.screen_w, self.screen_h)
        if not new_image:
            return False
        if self.onion_skin and index:
            photo = self.apply_onionskin(new_image, self.onionskin_alpha, self.onionskin_fx)
        else:
            photo = ImageTk.PhotoImage(new_image)
        self.label.configure(image=photo)
        # Keep a reference on the widget so the PhotoImage is not garbage collected
        self.label.image = photo
        return new_image

    def next_frame(self, event):
        """Display the next frame, wrapping to the first one."""
        self.img_index = self.check_range(self.img_index + 1)
        self.update_image(None, self.img_index)

    def previous_frame(self, event):
        """Display the previous frame, wrapping to the last one."""
        self.img_index = self.check_range(self.img_index - 1)
        self.update_image(None, self.img_index)

    def preview_animation(self, event):
        """Play every frame at ``self.framerate``, then show the last frame again."""
        # save onion skin state and disable it during playback
        onion_skin_was_on = self.onion_skin
        self.onion_skin = False
        # playback
        for index in range(len(self.img_list)):
            self.update_image(None, index)
            root.update_idletasks()
            time.sleep(1 / self.framerate)
        self.display_last_frame()
        # restore onion skin state
        if onion_skin_was_on:
            self.toggle_onionskin()
        return 0

    def toggle_onionskin(self, event=None):
        """Switch onion skin on/off and refresh the display."""
        self.onion_skin = not self.onion_skin
        self.update_image()

    def get_last_frame(self, folder: str):
        """Return the last frame name of ``folder`` split on ``'.'``.

        Filename pattern is ``A.0001.JPG``, so the result is
        ``[letter, index, extension]``.
        """
        last_name = next(reversed(self.get_frames_list(folder).keys()))
        print(last_name)
        return last_name.split('.')

    def display_last_frame(self):
        """Refresh the frame list and display its last frame."""
        self.img_list = self.get_frames_list(self.savepath)
        self.img_index = len(self.img_list) - 1
        self.update_image(None, self.img_index)

    def return_next_frame_number(self, last_frame_name):
        """Return the file name following ``last_frame_name``.

        Args:
            last_frame_name: ``(prefix, index, extension)`` as returned by ``get_last_frame``.
        """
        prefix, filecount, ext = last_frame_name
        return prefix + '.{:04d}.'.format(int(filecount) + 1) + ext

    def set_config_value(self, config, setting, new_value):
        """Select choice number ``new_value`` of widget ``setting`` in the ``config`` tree."""
        widget = gp.check_result(gp.gp_widget_get_child_by_name(config, setting))
        # find corresponding choice
        choice = gp.check_result(gp.gp_widget_get_choice(widget, new_value))
        # set config value
        gp.check_result(gp.gp_widget_set_value(widget, choice))

    def apply_camera_settings(self, camera, config):
        """Apply every entry of ``self.camera_settings`` to ``config`` and send it to ``self.camera``."""
        for setting, choice_index in self.camera_settings.items():
            self.set_config_value(config, setting, choice_index)
        # validate config
        gp.check_result(gp.gp_camera_set_config(self.camera, config))

    def check_status_value(self, config, value, optimal_value=None):
        """Read the current value of widget ``value`` in ``config``.

        Returns:
            The current value; or ``[current value, recommended choice]`` when
            ``optimal_value`` (a dict widget name -> choice index) is given.
        """
        widget = gp.check_result(gp.gp_widget_get_child_by_name(config, value))
        current = gp.check_result(gp.gp_widget_get_value(widget))
        if optimal_value is None:
            return current
        recommended = gp.check_result(gp.gp_widget_get_choice(widget, optimal_value[value]))
        return [current, recommended]

    def check_status(self, camera, config):
        """Return True when every widget of ``self.camera_status`` has its recommended value."""
        for value in self.camera_status:
            current, recommended = self.check_status_value(config, value, self.camera_status)
            if current != recommended:
                # Some values are not optimal
                return False
        return True

    def _apply_shutterspeed(self, camera, speed):
        """Select shutter speed choice ``speed``; return the re-read (lightmeter, shutterspeed)."""
        config = gp.check_result(gp.gp_camera_get_config(camera))
        self.set_config_value(config, 'shutterspeed2', speed)  # 0 to 46
        gp.check_result(gp.gp_camera_set_config(camera, config))
        time.sleep(1)
        config = gp.check_result(gp.gp_camera_get_config(camera))
        return (self.check_status_value(config, 'lightmeter'),
                self.check_status_value(config, 'shutterspeed2'))

    def find_shutterspeed(self, camera):
        """Step the shutter speed choice until the light meter reads within -7..7.

        Starts from choice 30, moving up while the reading is below -7 and down
        while it is above 7. Always returns True.
        """
        # get exposure using /main/status/lightmeter > should be 0
        config = gp.check_result(gp.gp_camera_get_config(camera))
        lightmeter = self.check_status_value(config, 'lightmeter')
        shutterspeed = self.check_status_value(config, 'shutterspeed2')
        print(str(lightmeter) + " - " + str(shutterspeed))
        speed = 30
        if lightmeter < 0:
            print("speed too high")
            while lightmeter < -7:
                lightmeter, shutterspeed = self._apply_shutterspeed(camera, speed)
                speed += 1
                print(speed)
                print(str(lightmeter) + " - " + str(shutterspeed))
        if lightmeter > 0:
            print("speed too low")
            while lightmeter > 7:
                lightmeter, shutterspeed = self._apply_shutterspeed(camera, speed)
                speed -= 1
                print(speed)
                print(str(lightmeter) + " - " + str(shutterspeed))
        return True

    def capture_image(self, event=None, event_data=None):
        """Save a new frame into the session folder and display it.

        Args:
            event: Tk event when triggered by a key press (unused).
            event_data: camera file descriptor of an already shot picture; when
                None a capture is triggered on the camera.
        """
        # get next file name based on prefix, file count and extension
        next_filename = self.return_next_frame_number(self.get_last_frame(self.savepath))
        # build full path to file
        target_path = os.path.join(self.savepath, next_filename)
        print(target_path)
        # Get file from camera
        try:
            if event_data is None:
                new_frame_path = self.camera.capture(gp.GP_CAPTURE_IMAGE)
                new_frame = self.camera.file_get(
                    new_frame_path.folder, new_frame_path.name, gp.GP_FILE_TYPE_NORMAL)
                print("saving " + new_frame_path.folder + new_frame_path.name)
            else:
                print("getting file {}".format(event_data.name))
                new_frame = self.camera.file_get(
                    event_data.folder, event_data.name, gp.GP_FILE_TYPE_NORMAL)
            new_frame.save(target_path)
        except gp.GPhoto2Error as exc:
            # Camera missing/busy: report and keep the application (or thread) alive
            print("Capture failed: {}".format(exc))
            return
        print(self.img_list)
        # Remove the fake file name as soon as we have a real picture
        self.img_list.pop(self._placeholder_name(), None)
        # Display new frame
        self.display_last_frame()

    def wait_for_capture(self):
        """Poll the camera for new files until ``self.end_thread`` is set.

        Runs in the background thread started by ``trigger_bg_loop``. Implemented
        as a loop: the earlier self-recursive form hit Python's recursion limit
        after enough polling rounds.
        """
        while not self.end_thread:
            event_type, event_data = self.camera.wait_for_event(self.timeout)
            if event_type == gp.GP_EVENT_FILE_ADDED:
                self.capture_image(None, event_data)
        print("ending thread")

    async def export_animation(self, output_path=None):
        """Encode the session frames to a video with FFmpeg.

        Args:
            output_path: destination file; defaults to ``self.output_filename``
                (relative to the current working directory).
        """
        if output_path is None:
            output_path = self.output_filename
        ffmpeg = (
            FFmpeg()
            # overwrite file
            .option("y")
            .input(self.input_filename, self.input_options)
            .output(output_path, self.output_options)
        )
        await ffmpeg.execute()

    def trigger_export_animation(self, event):
        """Ask for a destination folder and export the animation into it."""
        output_folder = filedialog.askdirectory()
        if not output_folder:
            # Dialog cancelled
            print("export cancelled")
            return
        output_path = os.path.join(output_folder, self.output_filename)
        print("exporting to " + output_path)
        # NOTE: asyncio.run() blocks until FFmpeg is done and returns the
        # coroutine's result (None), not a task object.
        # check with self.export_task.done() == True ?
        # https://stackoverflow.com/questions/69350645/proper-way-to-retrieve-the-result-of-tasks-in-asyncio
        self.export_task = asyncio.run(self.export_animation(output_path))


def end_bg_loop(KISStopmo):
    """Ask the background capture loop of the given application instance to stop."""
    KISStopmo.end_thread = True


root = tk.Tk()
toot = KISStopmo(root)
# ~ root.protocol('WM_DELETE_WINDOW', end_bg_loop(toot))
root.mainloop()