# Immich Wallpaper Rotator (Windows, Multi-Monitor) - Enhanced with Metadata Caching # Save as: immich_wallpaper_enhanced.pyw (no console window) # # What's new in this enhanced build: # - METADATA CACHING: Pre-analyzes all images and saves orientation metadata to file # - IMPROVED RANDOMIZATION: Uses proper pseudo-random number generator for selection # - PERSISTENT CACHE: Image metadata persists between runs for faster startup # - OPTIMIZED PERFORMANCE: Only downloads images when setting wallpaper # - All previous features and UI preserved import os, sys, io, json, base64, time, random, threading, tempfile, ctypes, winreg, secrets import tkinter as tk from tkinter import ttk, messagebox from typing import List, Iterable, Dict, Optional from urllib.parse import urlparse, urlunparse import ipaddress from concurrent.futures import ThreadPoolExecutor, as_completed import requests class _RunSettings: def __init__(self, **kw): self.__dict__.update(kw) def get(self, k, d=None): return self.__dict__.get(k, d) from PIL import Image _rng = secrets.SystemRandom() # Optional modern theme (Sun Valley) try: import sv_ttk # pip install sv-ttk HAVE_SV_TTK = True except Exception: HAVE_SV_TTK = False # ---------- Defaults (overridable in GUI) ---------- DEFAULT_CACHE_TARGET = 5000 # images to cache per run DEFAULT_HISTORY_LIMIT = 10000 # unique IDs to remember across runs DEFAULT_PAGE_TAKE = 5000 # requested page size (server may cap) DEFAULT_MAX_PAGES = 400 # safety cap (≈100k) MAX_CHECKER_THREADS = 10 # Number of parallel threads for checking images # ---------- Windows COM (multi-monitor) ---------- import comtypes from comtypes import GUID from ctypes import POINTER, HRESULT, c_uint, c_int, wintypes from comtypes import COMMETHOD CLSID_DesktopWallpaper = GUID('{C2CF3110-460E-4FC1-B9D0-8A1C0C9D6AF3}') IID_IDesktopWallpaper = GUID('{B92B56A9-8B55-4E14-9A89-0199BBB6F93B}') DWPOS_CENTER=0; DWPOS_TILE=1; DWPOS_STRETCH=2; DWPOS_FIT=3; DWPOS_FILL=4; DWPOS_SPAN=5 class IDesktopWallpaper(comtypes.IUnknown): _iid_ = IID_IDesktopWallpaper _methods_ = [ COMMETHOD([], HRESULT, 'SetWallpaper', (['in'], wintypes.LPCWSTR, 'monitorId'), (['in'], wintypes.LPCWSTR, 'wallpaper')), COMMETHOD([], HRESULT, 'GetMonitorDevicePathAt', (['in'], c_uint, 'monitorIndex'), (['out'], POINTER(wintypes.LPWSTR), 'monitorId')), COMMETHOD([], HRESULT, 'GetMonitorDevicePathCount', (['out'], POINTER(c_uint), 'count')), COMMETHOD([], HRESULT, 'SetPosition', (['in'], c_int, 'position')), ] def get_desktop_wallpaper_com() -> IDesktopWallpaper: comtypes.CoInitialize() import comtypes.client return comtypes.client.CreateObject(CLSID_DesktopWallpaper, interface=IDesktopWallpaper) def list_monitor_ids(dw: IDesktopWallpaper) -> List[str]: count = c_uint() hr = dw.GetMonitorDevicePathCount(ctypes.byref(count)) if hr != 0: raise OSError(f"GetMonitorDevicePathCount failed: 0x{hr:08X}") ids = [] for i in range(count.value): p = wintypes.LPWSTR() hr = dw.GetMonitorDevicePathAt(i, ctypes.byref(p)) if hr != 0: raise OSError(f"GetMonitorDevicePathAt({i}) failed: 0x{hr:08X}") try: ids.append(ctypes.wstring_at(p)) finally: ctypes.windll.ole32.CoTaskMemFree(p) return ids # ---------- URL normalization ---------- def normalize_base_url(url: str) -> str: if not url: return url p = urlparse(url.strip()) host = p.hostname or "" try: host = str(ipaddress.IPv4Address(host)) # strips leading zeros, etc. except Exception: pass netloc = host + (f":{p.port}" if p.port else "") return urlunparse((p.scheme or "http", netloc, p.path or "", "", "", "")) # ---------- Persistence (DPAPI + files) ---------- APP_DIR = os.path.join(os.environ.get("APPDATA", os.path.expanduser("~")), "ImmichWallpaperRotator") CONFIG_PATH = os.path.join(APP_DIR, "config.json") HISTORY_PATH = os.path.join(APP_DIR, "history.json") METADATA_PATH = os.path.join(APP_DIR, "image_metadata.json") # New metadata cache file class DATA_BLOB(ctypes.Structure): _fields_=[("cbData",wintypes.DWORD),("pbData",ctypes.POINTER(ctypes.c_byte))] CryptProtectData = ctypes.windll.crypt32.CryptProtectData CryptUnprotectData = ctypes.windll.crypt32.CryptUnprotectData LocalFree = ctypes.windll.kernel32.LocalFree def _to_blob(b: bytes): buf = ctypes.create_string_buffer(b, len(b)) return DATA_BLOB(len(b), ctypes.cast(buf, ctypes.POINTER(ctypes.c_byte))) def _from_blob(blob: DATA_BLOB) -> bytes: ptr = ctypes.cast(blob.pbData, ctypes.POINTER(ctypes.c_ubyte)) try: return bytes(bytearray(ptr[i] for i in range(blob.cbData))) finally: LocalFree(blob.pbData) def dpapi_protect(s: str) -> str: if not s: return "" inb=_to_blob(s.encode()); out=DATA_BLOB() if not CryptProtectData(ctypes.byref(inb), None, None, None, None, 0x1, ctypes.byref(out)): raise OSError("CryptProtectData failed") return base64.b64encode(_from_blob(out)).decode() def dpapi_unprotect(s: str) -> str: if not s: return "" inb=_to_blob(base64.b64decode(s)); out=DATA_BLOB() if not CryptUnprotectData(ctypes.byref(inb), None, None, None, None, 0x1, ctypes.byref(out)): raise OSError("CryptUnprotectData failed") return _from_blob(out).decode() def ensure_appdir(): os.makedirs(APP_DIR, exist_ok=True) def load_config(): ensure_appdir() if not os.path.exists(CONFIG_PATH): return {} try: data = json.load(open(CONFIG_PATH,"r",encoding="utf-8")) except Exception: return {} try: data["api_key"] = dpapi_unprotect(data.get("api_key_protected","")) except Exception: data["api_key"] = "" return data def save_config(d: dict): ensure_appdir() out = dict(d) out["api_key_protected"] = dpapi_protect(out.pop("api_key","")) json.dump(out, open(CONFIG_PATH,"w",encoding="utf-8"), indent=2) class SeenHistory: def __init__(self, path: str, limit: int): self.path = path self.limit = limit self.order: List[str] = [] if os.path.exists(path): try: self.order = json.load(open(path,"r")) except Exception: self.order = [] self._set = set(self.order) def add_many(self, ids: Iterable[str]): for i in ids: if i in self._set: try: self.order.remove(i) # move to end (most recent) except ValueError: pass else: self._set.add(i) self.order.append(i) if len(self.order) > self.limit: drop = len(self.order) - self.limit for _ in range(drop): old = self.order.pop(0) self._set.discard(old) def contains(self, id_: str) -> bool: return id_ in self._set def save(self): ensure_appdir() try: json.dump(self.order, open(self.path,"w")) except Exception: pass # ---------- Image Metadata Cache ---------- class ImageMetadataCache: """ Persistent cache for image metadata including orientation, dimensions, and other properties. """ def __init__(self, path: str): self.path = path self.cache: Dict[str, Dict] = {} self._lock = threading.Lock() self.load() def load(self): """Load metadata cache from file""" ensure_appdir() if os.path.exists(self.path): try: with open(self.path, 'r', encoding='utf-8') as f: data = json.load(f) # Validate cache structure with self._lock: if isinstance(data, dict) and 'metadata' in data and 'version' in data: self.cache = data['metadata'] else: self.cache = {} except Exception: with self._lock: self.cache = {} def save(self): """Save metadata cache to file""" ensure_appdir() with self._lock: # Copy data to avoid holding lock during file I/O metadata_copy = self.cache.copy() cache_data = { 'version': '1.0', 'last_updated': time.time(), 'metadata': metadata_copy } try: with open(self.path, 'w', encoding='utf-8') as f: json.dump(cache_data, f, indent=2) except Exception: pass def get_metadata(self, asset_id: str) -> Optional[Dict]: """Get cached metadata for an asset""" with self._lock: return self.cache.get(asset_id) def set_metadata(self, asset_id: str, width: int, height: int, is_landscape: bool, file_size: Optional[int] = None, last_modified: Optional[str] = None): """Cache metadata for an asset""" with self._lock: self.cache[asset_id] = { 'width': width, 'height': height, 'is_landscape': is_landscape, 'file_size': file_size, 'last_modified': last_modified, 'cached_at': time.time() } def is_cached(self, asset_id: str) -> bool: """Check if asset metadata is cached""" with self._lock: return asset_id in self.cache and self.cache[asset_id] is not None def get_cached_orientation(self, asset_id: str) -> Optional[bool]: """Get cached orientation (True for landscape, False for portrait)""" with self._lock: metadata = self.cache.get(asset_id) return metadata['is_landscape'] if metadata else None def clear_cache(self): """Clear all cached metadata""" with self._lock: self.cache = {} self.save() def get_cache_stats(self) -> Dict: """Get statistics about the cache""" with self._lock: return { 'total_entries': len(self.cache), 'landscape_count': sum(1 for m in self.cache.values() if m.get('is_landscape', False)), 'portrait_count': sum(1 for m in self.cache.values() if not m.get('is_landscape', True)) } # ---------- Enhanced Pseudo-Random Number Generator ---------- class EnhancedRandomSelector: """ Enhanced pseudo-random selector with better distribution and weighting options. """ def __init__(self, seed: Optional[int] = None): self.rng = random.Random(seed) self.system_rng = secrets.SystemRandom() # For cryptographically secure randomness self.selection_weights = {} # Track how often items have been selected def select_items(self, items: List, count: int, exclude_set: set = None, use_secure_random: bool = True, use_weighted_selection: bool = True) -> List: """ Select random items with optional exclusion set and weighted selection. Uses cryptographically secure randomness by default. """ if not items: return [] available_items = [item for item in items if not exclude_set or get_asset_id(item) not in exclude_set] if not available_items: # If no items available after exclusion, reset and use all items available_items = items count = min(count, len(available_items)) # Use weighted selection to favor less recently used items if use_weighted_selection and len(available_items) > count: return self._weighted_sample(available_items, count, use_secure_random) if use_secure_random: return self.system_rng.sample(available_items, count) else: return self.rng.sample(available_items, count) def _weighted_sample(self, items: List, count: int, use_secure_random: bool = True) -> List: """ Sample items with weights favoring less frequently selected items. """ # Calculate weights - items selected less frequently get higher weights weights = [] max_weight = 100 for item in items: asset_id = get_asset_id(item) selection_count = self.selection_weights.get(asset_id, 0) # Higher weight for items selected less frequently weight = max(1, max_weight - (selection_count * 10)) weights.append(weight) # Use weighted random selection selected = [] items_copy = items.copy() weights_copy = weights.copy() for _ in range(count): if not items_copy: break # Calculate cumulative weights total_weight = sum(weights_copy) if total_weight == 0: # Fallback to uniform selection if use_secure_random: idx = self.system_rng.randrange(len(items_copy)) else: idx = self.rng.randrange(len(items_copy)) else: # Weighted selection if use_secure_random: target = self.system_rng.uniform(0, total_weight) else: target = self.rng.uniform(0, total_weight) cumsum = 0 idx = 0 for i, weight in enumerate(weights_copy): cumsum += weight if cumsum >= target: idx = i break selected_item = items_copy.pop(idx) weights_copy.pop(idx) selected.append(selected_item) # Update selection count asset_id = get_asset_id(selected_item) self.selection_weights[asset_id] = self.selection_weights.get(asset_id, 0) + 1 return selected def shuffle_list(self, items: List, use_secure_random: bool = True) -> List: """Shuffle a list in place or return shuffled copy""" items_copy = items.copy() if use_secure_random: self.system_rng.shuffle(items_copy) else: self.rng.shuffle(items_copy) return items_copy def reset_weights(self): """Reset selection weights for truly fresh randomization""" self.selection_weights.clear() def get_selection_stats(self) -> Dict: """Get statistics about selection frequency""" if not self.selection_weights: return {"total_selections": 0, "unique_items": 0, "most_selected": 0} total_selections = sum(self.selection_weights.values()) unique_items = len(self.selection_weights) most_selected = max(self.selection_weights.values()) if self.selection_weights else 0 return { "total_selections": total_selections, "unique_items": unique_items, "most_selected": most_selected, "average_selections": total_selections / unique_items if unique_items > 0 else 0 } # ---------- Immich Client ---------- class ImmichClient: """ x-api-key client with enhanced metadata support. """ def __init__(self, base_url: str, api_key: str, timeout=15, logger=None): self.base_url = base_url.rstrip("/") self.session = requests.Session() self.session.headers.update({ "x-api-key": api_key, "Accept": "application/json", "Content-Type": "application/json", }) self.timeout = timeout self.log = logger or (lambda *a, **k: None) self.asset_list_path = None self.asset_original_style = "assets" self._detect_legacy_endpoints() def _get(self, path, **kw): r = self.session.get(f"{self.base_url}{path}", timeout=self.timeout, **kw) r.raise_for_status() return r def _post_no_raise(self, path, json=None, **kw): return self.session.post(f"{self.base_url}{path}", json=json or {}, timeout=self.timeout, **kw) def _detect_legacy_endpoints(self): # Detect /api/asset vs /api/assets for downloads for p in ("/api/asset/TEST/original", "/api/assets/TEST/original"): try: r = self.session.get(f"{self.base_url}{p}", timeout=self.timeout) # A 400 or 401 proves the endpoint exists. 404 means it does not exist. if r.status_code < 404: self.asset_original_style = "asset" if "/api/asset/" in p else "assets" self.log(f"API style '{self.asset_original_style}' detected via {r.status_code} response.") return except Exception as e: self.log(f"Detection error for {p}: {e}") self.log(f"Could not definitively detect API style. Sticking with default '{self.asset_original_style}'.") def list_all_assets_simple(self) -> List[dict]: """Fetches all assets using the simpler GET /api/asset endpoint.""" self.log("Attempting to fetch assets via GET /api/asset...") try: r = self._get("/api/asset") data = r.json() if isinstance(data, list): self.log(f"Successfully fetched {len(data)} assets via GET /api/asset.") return data self.log(f"Warning: GET /api/asset did not return a list. Response: {str(data)[:200]}") return [] except Exception as e: self.log(f"Failed to use GET /api/asset: {e}. Falling back to original method.") return [] # Return empty list on failure to allow fallback def _search_metadata(self, take, skip, only_fav, exclude_archived, album_id=None): """POST /api/search/metadata with flexible parsing across Immich versions.""" def post_and_parse(payload): try: r = self._post_no_raise("/api/search/metadata", json={k:v for k,v in payload.items() if v is not None}) except Exception as e: self.log(f"/api/search/metadata -> {e}"); return None if r.status_code not in (200,400): self.log(f"/api/search/metadata -> HTTP {r.status_code}") return None try: data = r.json() except Exception as e: self.log(f"/api/search/metadata JSON error: {e}"); return None if isinstance(data, list): return data if isinstance(data, dict): assets = data.get("assets") if isinstance(assets, dict) and isinstance(assets.get("items"), list): return assets["items"] if isinstance(assets, list): return assets if isinstance(data.get("items"), list): return data["items"] sample = str(data)[:160].replace("\n"," ") self.log(f"/api/search/metadata responded but not a list (sample): {sample}") return None for take_try in (take, 200, 100, 50, 1): for key,val in (("type","IMAGE"), ("assetType","IMAGE")): base = {key: val, "take": take_try, "skip": skip} if only_fav: base["isFavorite"] = True if exclude_archived: base["isArchived"] = False if album_id: base["albumId"] = album_id items = post_and_parse(base) if items is not None: return items return None def list_assets_paginated(self, take, max_pages) -> Iterable[dict]: skip=0; pages=0 while pages < max_pages: batch = self._search_metadata(take, skip, False, False) if not batch: break for a in batch: yield a got = len(batch) skip += got pages += 1 def list_album_assets(self, album_id: str) -> List[dict]: if not album_id: return [] self.log(f"Using album: {album_id}") paths = (f"/api/album/{album_id}",) items = None for p in paths: try: r = self.session.get(f"{self.base_url}{p}", timeout=self.timeout) if r.status_code != 200: continue data = r.json() except Exception: continue if isinstance(data, dict) and isinstance(data.get("assets"), list): items = data["assets"] if items is not None: break if items is None: raise RuntimeError("Album not found or API did not return assets for this album ID.") return items def get_image_dimensions_from_thumbnail(self, asset_id: str) -> tuple[Optional[int], Optional[int]]: """ Get image dimensions from thumbnail (faster than downloading full image). Returns (width, height) or (None, None) if failed. """ thumbnail_path = f"/api/assets/{asset_id}/thumbnail" if self.asset_original_style=="assets" else f"/api/asset/{asset_id}/thumbnail" try: r = self.session.get(f"{self.base_url}{thumbnail_path}", stream=True, timeout=10) r.raise_for_status() # Read just enough to get image dimensions img_data = r.content with Image.open(io.BytesIO(img_data)) as img: # For thumbnails, we need to get the original dimensions from EXIF or asset metadata # This is a fallback approach - thumbnail dimensions aren't the original dimensions return img.size except Exception: return None, None def download_original(self, asset_id: str) -> bytes: path = f"/api/assets/{asset_id}/original" if self.asset_original_style=="assets" else f"/api/asset/{asset_id}/original" r = self.session.get(f"{self.base_url}{path}", stream=True, timeout=30) r.raise_for_status() return r.content # ---------- Windows wallpaper helpers ---------- SPI_SETDESKWALLPAPER = 20 SPIF_UPDATEINIFILE = 0x01 SPIF_SENDWININICHANGE= 0x02 def set_wallpaper_style(fit: bool): try: with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop", 0, winreg.KEY_SET_VALUE) as k: if fit: # Fit (no crop) winreg.SetValueEx(k, "WallpaperStyle", 0, winreg.REG_SZ, "6") winreg.SetValueEx(k, "TileWallpaper", 0, winreg.REG_SZ, "0") else: # Fill (crop) winreg.SetValueEx(k, "WallpaperStyle", 0, winreg.REG_SZ, "10") winreg.SetValueEx(k, "TileWallpaper", 0, winreg.REG_SZ, "0") except OSError: pass def apply_wallpaper_bmp_legacy(bmp_path: str, fit: bool): set_wallpaper_style(fit) ctypes.windll.user32.SystemParametersInfoW( SPI_SETDESKWALLPAPER, 0, bmp_path, SPIF_UPDATEINIFILE | SPIF_SENDWININICHANGE ) def ensure_img_file(image_bytes: bytes, suffix=".jpg") -> str: img = Image.open(io.BytesIO(image_bytes)) if img.mode in ("RGBA","LA"): bg = Image.new("RGB", img.size, (0,0,0)); bg.paste(img, mask=img.split()[-1]); img = bg elif img.mode != "RGB": img = img.convert("RGB") out = os.path.join(tempfile.gettempdir(), f"immich_wall_{random.randint(0,1_000_000)}{suffix}") img.save(out, format="JPEG", quality=92) return out def set_per_monitor_wallpapers(paths: List[str], same_for_all: bool, fit: bool): try: dw = get_desktop_wallpaper_com() dw.SetPosition(DWPOS_FIT if fit else DWPOS_FILL) ids = list_monitor_ids(dw) if not ids: bmp = paths[0] if not bmp.lower().endswith(".bmp"): tmp = os.path.join(tempfile.gettempdir(), "immich_wallpaper.bmp") Image.open(paths[0]).convert("RGB").save(tmp, "BMP"); bmp = tmp apply_wallpaper_bmp_legacy(bmp, fit) return if same_for_all: for mid in ids: dw.SetWallpaper(mid, paths[0]) else: for i, mid in enumerate(ids): dw.SetWallpaper(mid, paths[min(i, len(paths)-1)]) except Exception: bmp = paths[0] if not bmp.lower().endswith(".bmp"): tmp = os.path.join(tempfile.gettempdir(), "immich_wallpaper.bmp") Image.open(paths[0]).convert("RGB").save(tmp, "BMP"); bmp = tmp apply_wallpaper_bmp_legacy(bmp, fit) # ---------- Helpers ---------- def normalize_asset(rec: dict) -> dict: return rec["asset"] if isinstance(rec,dict) and isinstance(rec.get("asset"),dict) else rec def get_dims_from_asset(asset: dict): """Get dimensions from asset metadata with enhanced fallback logic""" exif = asset.get("exifInfo") or asset.get("exifinfo") or {} w = exif.get("exifImageWidth") or exif.get("imageWidth") or exif.get("width") h = exif.get("exifImageHeight") or exif.get("imageHeight") or exif.get("height") # Try alternative metadata locations if not w or not h: w = w or asset.get("originalWidth") or asset.get("width") h = h or asset.get("originalHeight") or asset.get("height") try: return int(w), int(h) except Exception: return None, None def orientation_match(asset: dict, landscape_only: bool, metadata_cache: ImageMetadataCache = None) -> bool: """Check orientation with metadata cache support""" asset_id = get_asset_id(asset) # Try to get from cache first if metadata_cache: cached_orientation = metadata_cache.get_cached_orientation(asset_id) if cached_orientation is not None: return cached_orientation if landscape_only else not cached_orientation # Fallback to asset metadata w, h = get_dims_from_asset(asset) if w and h: is_landscape = w >= h # Cache the result if we have a metadata cache if metadata_cache: metadata_cache.set_metadata(asset_id, w, h, is_landscape) return is_landscape if landscape_only else not is_landscape return True def is_image_asset(a: dict) -> bool: t = (a.get("type") or a.get("assetType") or "").upper() return t == "IMAGE" def is_archived(a: dict) -> bool: x = a.get("isArchived") if x is None: x = a.get("isTrashed") return bool(x) def is_favorite(a: dict) -> bool: return bool(a.get("isFavorite")) def get_asset_id(a: dict) -> str: return a.get("id") or a.get("assetId") or "" def get_asset_name(a: dict) -> str: return a.get("originalFileName") or a.get("fileName") or get_asset_id(a) # ---------- GUI ---------- class App(tk.Tk): def __init__(self): super().__init__() self.title("Immich Wallpaper Rotator (Windows, Multi-Monitor) - Enhanced") self.geometry("680x760") self.resizable(False, False) self._stop_event = threading.Event() self._runner_thread = None self._cached_assets: List[dict] = [] # Initialize enhanced components self._metadata_cache = ImageMetadataCache(METADATA_PATH) self._random_selector = EnhancedRandomSelector() cfg = load_config() # Core self.server_var = tk.StringVar(value=cfg.get("server","")) self.api_key_var = tk.StringVar(value=cfg.get("api_key","")) self.album_var = tk.StringVar(value=cfg.get("album_id","")) self.interval_var = tk.StringVar(value=str(cfg.get("interval_min","30"))) self.exclude_archived_var = tk.BooleanVar(value=bool(cfg.get("exclude_archived",True))) self.only_favorites_var = tk.BooleanVar(value=bool(cfg.get("only_favorites",False))) self.per_monitor_var = tk.BooleanVar(value=bool(cfg.get("per_monitor",True))) self.same_for_all_var = tk.BooleanVar(value=bool(cfg.get("same_for_all",True))) self.fit_image_var = tk.BooleanVar(value=bool(cfg.get("fit_image",False))) self.landscape_only_var = tk.BooleanVar(value=bool(cfg.get("landscape_only",False))) # Cache all images during build (ignoring current filters for metadata caching) cache_all_images = tk.BooleanVar(value=bool(cfg.get("cache_all_images", True))) # Advanced + Theme self.cache_target_var = tk.StringVar(value=str(cfg.get("cache_target", DEFAULT_CACHE_TARGET))) self.history_limit_var = tk.StringVar(value=str(cfg.get("history_limit", DEFAULT_HISTORY_LIMIT))) self.dark_mode_var = tk.BooleanVar(value=bool(cfg.get("dark_mode", False))) self.cache_all_images_var = cache_all_images # History after reading config hist_limit = self._get_int(self.history_limit_var, DEFAULT_HISTORY_LIMIT, 100, 200000) self._history = SeenHistory(HISTORY_PATH, hist_limit) # Build UI & Menu self._build_ui() self._build_menu() self._apply_theme(self.dark_mode_var.get()) self.protocol("WM_DELETE_WINDOW", self._on_close) # ----- Theme ----- def _apply_theme(self, dark: bool): if HAVE_SV_TTK: sv_ttk.set_theme("dark" if dark else "light") # Text widget manual colors for readability if hasattr(self,"status"): if dark: self.status.configure(bg="#1E1E1E", fg="#E6E6E6", insertbackground="#E6E6E6") else: self.status.configure(bg="#FFFFFF", fg="#000000", insertbackground="#000000") return # Fallback polished ttk styling style = ttk.Style(self) try: style.theme_use("clam") except Exception: pass if dark: palette = { "BG": "#121212", "SURF": "#1E1E1E", "TXT": "#E6E6E6", "MUT": "#C8C8C8", "ACC": "#3D6BE5", "BRD": "#2A2A2A", "FIELD": "#1A1A1A" } self.configure(bg=palette["BG"]) style.configure(".", background=palette["BG"], foreground=palette["TXT"]) style.configure("TFrame", background=palette["BG"]) style.configure("TLabelframe", background=palette["SURF"], bordercolor=palette["BRD"], relief="solid") style.configure("TLabelframe.Label", background=palette["SURF"], foreground=palette["TXT"]) style.configure("TLabel", background=palette["BG"], foreground=palette["TXT"]) style.configure("Small.TLabel", background=palette["BG"], foreground=palette["MUT"]) style.configure("TButton", background=palette["SURF"], foreground=palette["TXT"], bordercolor=palette["BRD"]) style.map("TButton", background=[("active", "#2A2A2A")]) style.configure("TEntry", fieldbackground=palette["FIELD"], background=palette["FIELD"], foreground=palette["TXT"], bordercolor=palette["BRD"]) style.configure("TCheckbutton", background=palette["SURF"], foreground=palette["TXT"]) style.configure("TNotebook", background=palette["BG"], borderwidth=0) style.configure("TNotebook.Tab", background=palette["BG"], foreground=palette["MUT"], borderwidth=1, padding=[8, 4]) style.map("TNotebook.Tab", background=[("selected", palette["SURF"])], foreground=[("selected", palette["TXT"])]) if hasattr(self,"status"): self.status.configure(bg=palette["FIELD"], fg=palette["TXT"], insertbackground=palette["TXT"]) self._apply_styles_to_widgets(True) else: self.configure(bg="") style.configure(".", background="", foreground="") style.configure("TFrame", background="") style.configure("TLabelframe", background="") style.configure("TLabelframe.Label", background="", foreground="") style.configure("TLabel", background="", foreground="") style.configure("Small.TLabel", foreground="gray") style.configure("TButton", background="", foreground="") style.configure("TEntry", fieldbackground="white", foreground="black") style.configure("TCheckbutton", background="", foreground="") style.configure("TNotebook", background="", borderwidth=1) style.configure("TNotebook.Tab", background="", foreground="", borderwidth=1) if hasattr(self,"status"): self.status.configure(bg="white", fg="black", insertbackground="black") self._apply_styles_to_widgets(False) def _apply_styles_to_widgets(self, dark: bool): # Only used by fallback theme for w in self._labels: w.configure(style="TLabel" if not dark else "TLabel") for w in self._small_labels: w.configure(style="Small.TLabel") for w in self._entries: w.configure(style="TEntry" if not dark else "TEntry") for w in self._buttons: w.configure(style="TButton" if not dark else "TButton") for w in self._groups: w.configure(style="TLabelframe" if not dark else "TLabelframe") for w in self._checks: w.configure(style="TCheckbutton" if not dark else "TCheckbutton") if hasattr(self, "_root_frame"): self._root_frame.configure(style="TFrame" if not dark else "TFrame") # ----- Menu ----- def _build_menu(self): menubar = tk.Menu(self); self.config(menu=menubar) view = tk.Menu(menubar, tearoff=False) view.add_checkbutton(label="Dark mode", variable=self.dark_mode_var, command=self._on_toggle_dark, accelerator="Ctrl+D") view.add_separator() view.add_command(label="Clear metadata cache", command=self._clear_metadata_cache) view.add_command(label="Cache statistics", command=self._show_cache_stats) menubar.add_cascade(label="View", menu=view) tools = tk.Menu(menubar, tearoff=False) tools.add_command(label="Rebuild metadata cache", command=self._rebuild_metadata_cache) tools.add_command(label="Export cache statistics", command=self._export_cache_stats) tools.add_separator() tools.add_command(label="Reset selection weights", command=self._reset_selection_weights) tools.add_command(label="Selection statistics", command=self._show_selection_stats) menubar.add_cascade(label="Tools", menu=tools) self.bind_all("", lambda e: self._toggle_dark()) def _toggle_dark(self): self.dark_mode_var.set(not self.dark_mode_var.get()) self._on_toggle_dark() def _clear_metadata_cache(self): if messagebox.askyesno("Clear Cache", "This will clear all cached image metadata and rebuild from scratch. Continue?"): self._metadata_cache.clear_cache() self._log("Metadata cache cleared completely. Next run will rebuild from scratch.") self._update_cache_info() def _show_cache_stats(self): stats = self._metadata_cache.get_cache_stats() msg = f"Metadata Cache Statistics:\n\nTotal entries: {stats['total_entries']}\nLandscape images: {stats['landscape_count']}\nPortrait images: {stats['portrait_count']}" messagebox.showinfo("Cache Statistics", msg) def _rebuild_metadata_cache(self): if messagebox.askyesno("Rebuild Cache", "This will clear and rebuild the metadata cache. This may take some time. Continue?"): self._metadata_cache.clear_cache() self._log("Metadata cache rebuild requested. Start the wallpaper rotation to rebuild.") self._update_cache_info() def _reset_selection_weights(self): if messagebox.askyesno("Reset Weights", "This will reset selection weights for truly random distribution. Continue?"): self._random_selector.reset_weights() self._log("Selection weights reset. Next selections will be completely random.") def _show_selection_stats(self): stats = self._random_selector.get_selection_stats() msg = (f"Selection Statistics:\n\n" f"Total selections: {stats['total_selections']}\n" f"Unique items selected: {stats['unique_items']}\n" f"Most selected count: {stats['most_selected']}\n" f"Average selections per item: {stats['average_selections']:.2f}") messagebox.showinfo("Selection Statistics", msg) def _export_cache_stats(self): try: stats = self._metadata_cache.get_cache_stats() selection_stats = self._random_selector.get_selection_stats() combined_stats = { "metadata_cache": stats, "selection_stats": selection_stats, "export_time": time.time() } stats_file = os.path.join(APP_DIR, "cache_stats.json") with open(stats_file, 'w') as f: json.dump(combined_stats, f, indent=2) self._log(f"Cache and selection statistics exported to: {stats_file}") except Exception as e: self._log(f"Failed to export statistics: {e}") # ----- UI ----- def _build_ui(self): self._labels, self._entries, self._buttons, self._groups, self._checks, self._small_labels = [], [], [], [], [], [] self._root_frame = ttk.Frame(self, padding=(12, 12, 12, 0)) self._root_frame.pack(fill="both", expand=True) # --- TABS --- notebook = ttk.Notebook(self._root_frame) notebook.pack(fill="x", pady=(0, 10)) tab_connect = ttk.Frame(notebook, padding=10) tab_options = ttk.Frame(notebook, padding=10) tab_advanced = ttk.Frame(notebook, padding=10) notebook.add(tab_connect, text="Connect & Run") notebook.add(tab_options, text="Filters & Display") notebook.add(tab_advanced, text="Advanced") # --- TAB 1: CONNECT & RUN --- connect_group = ttk.Frame(tab_connect) connect_group.pack(fill="x") connect_group.columnconfigure(1, weight=1) def L(parent, text, r, c, **kwargs): w = ttk.Label(parent, text=text, **kwargs); w.grid(row=r, column=c, sticky="w", padx=(0, 10), pady=6); self._labels.append(w); return w def SL(parent, text, r, c, **kwargs): w = ttk.Label(parent, text=text, style="Small.TLabel", **kwargs); w.grid(row=r, column=c, sticky="w", padx=0, pady=(0,6)); self._small_labels.append(w); return w def E(parent, var, r, c, **kwargs): w = ttk.Entry(parent, textvariable=var, **kwargs); w.grid(row=r, column=c, sticky="ew", pady=6); self._entries.append(w); return w def C(parent, text, var, r, c, **kwargs): cmd = kwargs.pop('command', self._save_now) w = ttk.Checkbutton(parent, text=text, variable=var, command=cmd, **kwargs) w.grid(row=r, column=c, sticky="w", padx=8, pady=4) self._checks.append(w) return w L(connect_group, "Immich Server URL", 0, 0); E(connect_group, self.server_var, 0, 1) L(connect_group, "API Key", 1, 0); E(connect_group, self.api_key_var, 1, 1, show="•") L(connect_group, "Album ID", 2, 0); E(connect_group, self.album_var, 2, 1) SL(connect_group, "(Leave blank to use all photos)", 3, 1) L(connect_group, "Interval (minutes)", 4, 0); E(connect_group, self.interval_var, 4, 1, width=12) # --- TAB 2: FILTERS & DISPLAY --- filter_group = ttk.Labelframe(tab_options, text="Image Filtering", padding=10) filter_group.pack(fill="x", pady=(0, 10)) self._groups.append(filter_group) C(filter_group, "Exclude archived", self.exclude_archived_var, 0, 0) C(filter_group, "Only favorites", self.only_favorites_var, 0, 1) C(filter_group, "Only landscape (instead of portrait)", self.landscape_only_var, 1, 0) display_group = ttk.Labelframe(tab_options, text="Display Options", padding=10) display_group.pack(fill="x") self._groups.append(display_group) C(display_group, "Use multi-monitor (IDesktopWallpaper)", self.per_monitor_var, 0, 0) C(display_group, "Same image on all monitors", self.same_for_all_var, 0, 1) C(display_group, "Show full image (Fit; no crop)", self.fit_image_var, 1, 0) C(display_group, "Dark mode", self.dark_mode_var, 2, 0, command=self._on_toggle_dark) # Use toggler command # --- TAB 3: ADVANCED --- adv_group = ttk.Frame(tab_advanced) adv_group.pack(fill="x") adv_group.columnconfigure(1, weight=1) L(adv_group, "Cache target", 0, 0); E(adv_group, self.cache_target_var, 0, 1, width=15) SL(adv_group, "(Number of image records to fetch per run)", 1, 1) L(adv_group, "History limit", 2, 0); E(adv_group, self.history_limit_var, 2, 1, width=15) SL(adv_group, "(Number of recent photos to avoid repeating)", 3, 1) # New option for complete metadata caching C(adv_group, "Cache all images (ignore current filters for metadata)", self.cache_all_images_var, 4, 0) SL(adv_group, "(Build complete metadata cache regardless of favorites/archive filters)", 5, 1) # Metadata cache info section cache_info_group = ttk.Labelframe(tab_advanced, text="Metadata Cache", padding=10) cache_info_group.pack(fill="x", pady=(10, 0)) self._groups.append(cache_info_group) cache_stats = self._metadata_cache.get_cache_stats() self.cache_info_label = ttk.Label(cache_info_group, text=f"Cached entries: {cache_stats['total_entries']} | Landscape: {cache_stats['landscape_count']} | Portrait: {cache_stats['portrait_count']}") self.cache_info_label.pack(anchor="w") self._labels.append(self.cache_info_label) # --- BUTTONS (below tabs) --- btn_frame = ttk.Frame(self._root_frame) btn_frame.pack(fill="x", pady=10) self.btn_start=ttk.Button(btn_frame,text="Start",command=self.start) self.btn_next =ttk.Button(btn_frame,text="Next now",command=self.next_now,state="disabled") self.btn_stop =ttk.Button(btn_frame,text="Stop",command=self.stop,state="disabled") self.btn_test =ttk.Button(btn_frame,text="Test",command=self.test_connection) for w in (self.btn_start, self.btn_next, self.btn_stop, self.btn_test): w.pack(side="left", padx=(0, 6)); self._buttons.append(w) # --- LOG (at the bottom) --- log_frame = ttk.Frame(self._root_frame) log_frame.pack(fill="both", expand=True, pady=(0, 12)) log_frame.rowconfigure(0, weight=1) log_frame.columnconfigure(0, weight=1) self.status=tk.Text(log_frame,height=10,wrap="word", relief="solid", borderwidth=1) self.status.grid(row=0,column=0,sticky="nsew") self._root_frame.rowconfigure(2,weight=1) self._log("Enhanced settings loaded. Metadata caching enabled for improved performance.") # Persist on change for v in (self.server_var, self.api_key_var, self.album_var, self.interval_var, self.cache_target_var, self.history_limit_var): v.trace_add("write", lambda *_: self._save_now()) # Also persist the cache all images setting self.cache_all_images_var.trace_add("write", lambda *_: self._save_now()) def _update_cache_info(self): """Update the cache info label with current statistics""" cache_stats = self._metadata_cache.get_cache_stats() self.cache_info_label.configure( text=f"Cached entries: {cache_stats['total_entries']} | Landscape: {cache_stats['landscape_count']} | Portrait: {cache_stats['portrait_count']}") # ----- persistence helpers ----- def _get_int(self, var: tk.StringVar, default: int, min_v: int, max_v: int) -> int: try: val = int(float(var.get().strip())) if val < min_v: val = min_v if val > max_v: val = max_v return val except Exception: return default def _collect_config(self): try: interval=float(self.interval_var.get().strip()) except Exception: interval=self.interval_var.get().strip() return { "server":self.server_var.get().strip(), "api_key":self.api_key_var.get(), "album_id":self.album_var.get().strip(), "interval_min":interval, "exclude_archived":self.exclude_archived_var.get(), "only_favorites":self.only_favorites_var.get(), "per_monitor":self.per_monitor_var.get(), "same_for_all":self.same_for_all_var.get(), "fit_image":self.fit_image_var.get(), "landscape_only":self.landscape_only_var.get(), "cache_target": self._get_int(self.cache_target_var, DEFAULT_CACHE_TARGET, 100, 200000), "history_limit": self._get_int(self.history_limit_var, DEFAULT_HISTORY_LIMIT, 100, 500000), "dark_mode": self.dark_mode_var.get(), "cache_all_images": self.cache_all_images_var.get(), } def _save_now(self, *args): cfg = self._collect_config() try: save_config(cfg) except Exception: pass # live-update history capacity self._history.limit = cfg["history_limit"] def _on_toggle_dark(self): self._apply_theme(self.dark_mode_var.get()) self._save_now() def _on_close(self): try: self.stop() except Exception: pass self._save_now() self._history.save() self._metadata_cache.save() self.destroy() # ----- logging ----- def _log(self, msg:str): self.status.configure(state="normal"); self.status.insert("end", msg+"\n") self.status.see("end"); self.status.configure(state="disabled") # ----- Test connection ----- def test_connection(self): server=normalize_base_url(self.server_var.get()); self.server_var.set(server) api_key=self.api_key_var.get().strip() if not server or not api_key: messagebox.showerror("Missing info","Server URL and API Key are required."); return self._log("Testing connection…") try: r=requests.get(server.rstrip("/")+"/api/docs",timeout=10) if r.status_code in (200,301,302): self._log("✓ /api/docs reachable.") else: self._log(f"… /api/docs returned HTTP {r.status_code} (may still be OK).") except Exception as e: self._log(f"… /api/docs not reachable: {e}") try: r=requests.post(server.rstrip("/")+"/api/search/metadata", json={"type":"IMAGE","take":1,"skip":0}, headers={"x-api-key":api_key,"Accept":"application/json","Content-Type":"application/json"}, timeout=10) if r.status_code in (200,400): self._log(f"✓ search/metadata available with x-api-key (HTTP {r.status_code}).") elif r.status_code in (401,403): self._log("✗ Auth error (401/403). Check your API key.") elif r.status_code==404: self._log("✗ search/metadata 404. If Immich is under a prefix (e.g., /immich), include it in Server URL.") else: self._log(f"… search/metadata HTTP {r.status_code}: {r.text[:200]}") except Exception as e: self._log(f"✗ search/metadata request failed: {e}") # ----- Start/Stop/Next ----- def start(self): server=normalize_base_url(self.server_var.get().strip()); self.server_var.set(server) api_key=self.api_key_var.get().strip() if not server or not api_key: messagebox.showerror("Missing info","Server URL and API Key are required."); return try: interval_min=float(self.interval_var.get().strip()) except ValueError: messagebox.showerror("Invalid interval","Please enter a number for minutes."); return if interval_min<=0: messagebox.showerror("Invalid interval","Interval must be greater than zero."); return self._save_now() self._stop_event.clear() self.btn_start.configure(state="disabled") self.btn_stop.configure(state="normal") self.btn_next.configure(state="disabled") # Keep 'Next' disabled until cache is ready t = threading.Thread(target=self._run_loop, args=(server, api_key, self.album_var.get().strip(), interval_min), daemon=True) t.start(); self._runner_thread=t self._log("Started enhanced wallpaper rotation with metadata caching.") def stop(self): self._stop_event.set() self.btn_stop.configure(state="disabled") self.btn_next.configure(state="disabled") self.btn_start.configure(state="normal") self._log("Stopping… (will stop after current cycle)") def next_now(self): threading.Thread(target=self._change_once, daemon=True).start() # ----- Enhanced Core loop with metadata caching ----- def _run_loop(self, server, api_key, album_id, interval_min, settings=None): if settings is None: settings = _RunSettings( landscape_only = bool(self.landscape_only_var.get()), only_favorites = bool(self.only_favorites_var.get()), exclude_archived = bool(self.exclude_archived_var.get()), same_for_all = bool(self.same_for_all_var.get()), fit_no_crop = bool(self.fit_image_var.get()), cache_target = self._get_int(self.cache_target_var, DEFAULT_CACHE_TARGET, 100, 200000), ) self._client = ImmichClient(server, api_key, logger=self._log) self._cached_assets = [] self._log("Connecting and building enhanced metadata cache...") cache_target = int(settings.get('cache_target', DEFAULT_CACHE_TARGET)) want_landscape = bool(settings.get('landscape_only', False)) only_favorites = bool(settings.get('only_favorites', False)) exclude_archived = bool(settings.get('exclude_archived', False)) self._log(f"Enhanced filters => landscape_only={want_landscape}, only_favorites={only_favorites}, exclude_archived={exclude_archived}") try: # Step 1: Get all assets from the server first self._log("Fetching full asset list from server...") initial_assets = self._client.list_all_assets_simple() if not initial_assets: if album_id: initial_assets = self._client.list_album_assets(album_id) else: initial_assets = list(self._client.list_assets_paginated( take=DEFAULT_PAGE_TAKE, max_pages=DEFAULT_MAX_PAGES )) self._log(f"Found {len(initial_assets)} total assets on server.") # Step 2: DISCOVERY PHASE - Find what needs to be cached (no threading here) self._log("Phase 1: Discovering uncached images...") assets_to_analyze = [] cache_hits = 0 filtered_out = 0 use_filters_for_caching = not bool(self.cache_all_images_var.get()) for asset in initial_assets: if self._stop_event.is_set(): break asset = normalize_asset(asset) if not is_image_asset(asset): continue # Apply filters if caching is not set to "all images" if use_filters_for_caching: if only_favorites and not is_favorite(asset): filtered_out += 1 continue if exclude_archived and is_archived(asset): filtered_out += 1 continue asset_id = get_asset_id(asset) if self._metadata_cache.is_cached(asset_id): cache_hits += 1 else: assets_to_analyze.append(asset) self._log(f"Discovery complete. Found {len(assets_to_analyze)} new images to cache. Hits: {cache_hits}. Filtered: {filtered_out}.") # Step 3: PROCESSING PHASE - Analyze and cache the discovered images using threads self._log("Phase 2: Caching new image metadata in parallel...") total_to_process = len(assets_to_analyze) images_cached_this_run = 0 with ThreadPoolExecutor(max_workers=MAX_CHECKER_THREADS) as executor: def analyze_asset(asset): try: asset_id = get_asset_id(asset) w, h = get_dims_from_asset(asset) if not (w and h): img_bytes = self._client.download_original(asset_id) with Image.open(io.BytesIO(img_bytes)) as im: w, h = im.size if w and h: is_landscape = w >= h self._metadata_cache.set_metadata(asset_id, w, h, is_landscape) return True except Exception as e: # Schedule the log message to be executed on the main GUI thread self.after(0, lambda: self._log(f"Skipping asset {get_asset_id(asset)} due to error: {e}")) return False futures = {executor.submit(analyze_asset, asset): asset for asset in assets_to_analyze} for i, future in enumerate(as_completed(futures)): if self._stop_event.is_set(): # Attempt to cancel remaining futures for f in futures: f.cancel() break if future.result(): images_cached_this_run += 1 # Log progress periodically if (i + 1) % 100 == 0 or (i + 1) == total_to_process: progress = int(((i + 1) / total_to_process) * 100) if total_to_process > 0 else 100 # Get stats directly from the thread-safe method current_stats = self._metadata_cache.get_cache_stats() current_cache_size = current_stats['total_entries'] self._log(f"Progress: {progress}% | Newly Cached: {images_cached_this_run}/{total_to_process} | Total in cache: {current_cache_size}") self.after(0, self._update_cache_info) # NEW termination condition based on TOTAL cache size if self._metadata_cache.get_cache_stats()['total_entries'] >= cache_target: self._log(f"Cache target of {cache_target} reached. Stopping analysis.") for f in futures: f.cancel() break except Exception as e: self._log(f"Error building metadata cache: {e}") self.after(0, self.stop) return # Step 4: Finalize and start rotation self._metadata_cache.save() self._metadata_cache.load() cache_stats = self._metadata_cache.get_cache_stats() self.after(0, self._update_cache_info) available_count = cache_stats['landscape_count'] if want_landscape else cache_stats['portrait_count'] self._log(f"Metadata cache saved and reloaded: {cache_stats['total_entries']} total images in file") self._log(f"Available for selection: {available_count} {'landscape' if want_landscape else 'portrait'} images") if available_count == 0: self._log("No images match your orientation preference. Check your filters.") self.after(0, self.stop) return self.after(0, lambda: self.btn_next.configure(state="normal")) self._change_once() seconds = max(10, int(interval_min * 60)) while not self._stop_event.is_set(): for _ in range(seconds): if self._stop_event.is_set(): break time.sleep(1) if self._stop_event.is_set(): break self._change_once() self._log("Enhanced rotation stopped.") def _get_random_assets_from_metadata(self, count: int, landscape_only: bool, exclude_set: set = None) -> List[str]: """ Select completely random asset IDs directly from the metadata cache file. """ try: # Read metadata file directly for each selection to ensure true randomness if not os.path.exists(METADATA_PATH): return [] with open(METADATA_PATH, 'r', encoding='utf-8') as f: data = json.load(f) if 'metadata' not in data: return [] metadata = data['metadata'] # Filter assets based on orientation preference eligible_assets = [] for asset_id, meta in metadata.items(): if exclude_set and asset_id in exclude_set: continue is_landscape = meta.get('is_landscape', True) if landscape_only == is_landscape: eligible_assets.append(asset_id) if not eligible_assets: # If no eligible assets after filtering, use all assets from metadata eligible_assets = [aid for aid in metadata.keys() if not exclude_set or aid not in exclude_set] if not eligible_assets: return [] # Use cryptographically secure random selection count = min(count, len(eligible_assets)) return self._random_selector.system_rng.sample(eligible_assets, count) except Exception as e: self._log(f"Error reading metadata for random selection: {e}") return [] def _change_once(self): # Check if we have metadata cache to use if not os.path.exists(METADATA_PATH): self._log("No metadata cache available. Please run initial scan first.") return per = self.per_monitor_var.get() n = 1 if per: try: n = max(1, len(list_monitor_ids(get_desktop_wallpaper_com()))) except Exception: n = 1 # Get orientation preference want_landscape = bool(self.landscape_only_var.get()) # Use minimal exclusion for truly random selection - only exclude last 20 selections exclude_set = set(self._history.order[-min(len(self._history.order), 20):]) if self._history.order else set() # Select completely random asset IDs directly from metadata file selected_asset_ids = self._get_random_assets_from_metadata( 1 if self.same_for_all_var.get() else n, want_landscape, exclude_set ) if not selected_asset_ids: # Try without exclusion if nothing found selected_asset_ids = self._get_random_assets_from_metadata( 1 if self.same_for_all_var.get() else n, want_landscape, None ) if not selected_asset_ids: self._log("Could not select any assets from metadata cache.") return saved_paths, names, used_ids = [], [], [] fit = self.fit_image_var.get() try: for asset_id in selected_asset_ids: # Download the image using the randomly selected asset ID img_bytes = self._client.download_original(asset_id) out = ensure_img_file(img_bytes, ".jpg") saved_paths.append(out) names.append(asset_id) # Use asset ID as name since we don't have full asset metadata used_ids.append(asset_id) if per and n > 1: paths = [saved_paths[0]] if self.same_for_all_var.get() else saved_paths set_per_monitor_wallpapers(paths, same_for_all=self.same_for_all_var.get(), fit=fit) self._log(f"Random wallpaper{'s' if not self.same_for_all_var.get() else ''} set on {n} monitor(s): " f"{names[0] if self.same_for_all_var.get() else 'multiple'}") else: bmp = os.path.join(tempfile.gettempdir(), "immich_wallpaper.bmp") Image.open(saved_paths[0]).convert("RGB").save(bmp, "BMP") apply_wallpaper_bmp_legacy(bmp, fit) self._log(f"Random wallpaper set: {names[0]}") # Add to history with minimal tracking self._history.add_many(used_ids) # Keep history very short for maximum randomness if len(self._history.order) > 50: self._history.order = self._history.order[-25:] # Keep only last 25 self._history._set = set(self._history.order) self._history.save() except requests.HTTPError as e: code = e.response.status_code if getattr(e, "response", None) is not None else "?" if code in (401, 403): self._log("Auth error (401/403). Check API key permissions.") elif code == 404: self._log(f"Original not found (404) for asset: {selected_asset_ids[0] if selected_asset_ids else 'unknown'}") else: self._log(f"HTTP error: {e}") except Exception as e: self._log(f"Failed to set wallpaper(s): {e}") # ---------- main ---------- def main(): if sys.platform != "win32": messagebox.showerror("Unsupported OS", "This script is intended for Windows.") return ensure_appdir() app = App() app.mainloop() if __name__ == "__main__": main()