# Immich Wallpaper Rotator (Windows, Multi-Monitor) - Enhanced with Metadata Caching
# Save as: immich_wallpaper_enhanced.pyw (no console window)
#
# What's new in this enhanced build:
# - METADATA CACHING: Pre-analyzes all images and saves orientation metadata to file
# - IMPROVED RANDOMIZATION: Uses proper pseudo-random number generator for selection
# - PERSISTENT CACHE: Image metadata persists between runs for faster startup
# - OPTIMIZED PERFORMANCE: Only downloads images when setting wallpaper
# - FIXED API USAGE: Uses proper Immich API endpoints for better performance
# - All previous features and UI preserved

import os, sys, io, json, base64, time, random, threading, tempfile, ctypes, winreg, secrets
import tkinter as tk
from tkinter import ttk, messagebox
from typing import List, Iterable, Dict, Optional
from urllib.parse import urlparse, urlunparse
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests


class _RunSettings:
    """Lightweight attribute bag capturing the settings snapshot for one run."""

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def get(self, k, d=None):
        # dict-like access so callers can use .get() with a default
        return self.__dict__.get(k, d)


from PIL import Image

# Module-wide cryptographically strong RNG
_rng = secrets.SystemRandom()

# Optional modern theme (Sun Valley)
try:
    import sv_ttk  # pip install sv-ttk
    HAVE_SV_TTK = True
except Exception:
    HAVE_SV_TTK = False

# ---------- Defaults (overridable in GUI) ----------
DEFAULT_CACHE_TARGET = 50000    # images to cache per run (increased from 5000)
DEFAULT_HISTORY_LIMIT = 10000   # unique IDs to remember across runs
DEFAULT_PAGE_TAKE = 5000        # requested page size (server may cap)
DEFAULT_MAX_PAGES = 400         # safety cap (~100k)
MAX_CHECKER_THREADS = 20        # parallel threads for checking images (increased)

# ---------- Windows COM (multi-monitor) ----------
import comtypes
from comtypes import GUID
from ctypes import POINTER, HRESULT, c_uint, c_int, wintypes
from comtypes import COMMETHOD

CLSID_DesktopWallpaper = GUID('{C2CF3110-460E-4FC1-B9D0-8A1C0C9D6AF3}')
IID_IDesktopWallpaper = GUID('{B92B56A9-8B55-4E14-9A89-0199BBB6F93B}')
# IDesktopWallpaper wallpaper-position values (DESKTOP_WALLPAPER_POSITION enum)
DWPOS_CENTER = 0
DWPOS_TILE = 1
DWPOS_STRETCH = 2
DWPOS_FIT = 3
DWPOS_FILL = 4
DWPOS_SPAN = 5


class IDesktopWallpaper(comtypes.IUnknown):
    """Minimal COM interface for per-monitor wallpaper control (Windows 8+)."""
    _iid_ = IID_IDesktopWallpaper
    _methods_ = [
        COMMETHOD([], HRESULT, 'SetWallpaper',
                  (['in'], wintypes.LPCWSTR, 'monitorId'),
                  (['in'], wintypes.LPCWSTR, 'wallpaper')),
        COMMETHOD([], HRESULT, 'GetMonitorDevicePathAt',
                  (['in'], c_uint, 'monitorIndex'),
                  (['out'], POINTER(wintypes.LPWSTR), 'monitorId')),
        COMMETHOD([], HRESULT, 'GetMonitorDevicePathCount',
                  (['out'], POINTER(c_uint), 'count')),
        COMMETHOD([], HRESULT, 'SetPosition',
                  (['in'], c_int, 'position')),
    ]


def get_desktop_wallpaper_com() -> IDesktopWallpaper:
    """Initialize COM on this thread and create the DesktopWallpaper object."""
    comtypes.CoInitialize()
    import comtypes.client
    return comtypes.client.CreateObject(CLSID_DesktopWallpaper,
                                        interface=IDesktopWallpaper)


def list_monitor_ids(dw: IDesktopWallpaper) -> List[str]:
    """Return the monitor device-path ID for every attached display."""
    count = c_uint()
    hr = dw.GetMonitorDevicePathCount(ctypes.byref(count))
    if hr != 0:
        raise OSError(f"GetMonitorDevicePathCount failed: 0x{hr:08X}")
    ids = []
    for i in range(count.value):
        p = wintypes.LPWSTR()
        hr = dw.GetMonitorDevicePathAt(i, ctypes.byref(p))
        if hr != 0:
            raise OSError(f"GetMonitorDevicePathAt({i}) failed: 0x{hr:08X}")
        try:
            ids.append(ctypes.wstring_at(p))
        finally:
            # The COM server allocates the string; the caller must free it.
            ctypes.windll.ole32.CoTaskMemFree(p)
    return ids


# ---------- URL normalization ----------
def normalize_base_url(url: str) -> str:
    """Canonicalize a server URL: normalize IPv4 hosts, default the scheme to
    http, and drop params/query/fragment."""
    if not url:
        return url
    p = urlparse(url.strip())
    host = p.hostname or ""
    try:
        host = str(ipaddress.IPv4Address(host))  # strips leading zeros, etc.
    except Exception:
        pass
    netloc = host + (f":{p.port}" if p.port else "")
    return urlunparse((p.scheme or "http", netloc, p.path or "", "", "", ""))
# ---------- Persistence (DPAPI + files) ----------
APP_DIR = os.path.join(os.environ.get("APPDATA", os.path.expanduser("~")), "ImmichWallpaperRotator")
CONFIG_PATH = os.path.join(APP_DIR, "config.json")
HISTORY_PATH = os.path.join(APP_DIR, "history.json")
METADATA_PATH = os.path.join(APP_DIR, "image_metadata.json")  # New metadata cache file


class DATA_BLOB(ctypes.Structure):
    # Win32 DATA_BLOB used by the DPAPI Crypt*Data functions.
    _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]


CryptProtectData = ctypes.windll.crypt32.CryptProtectData
CryptUnprotectData = ctypes.windll.crypt32.CryptUnprotectData
LocalFree = ctypes.windll.kernel32.LocalFree


def _to_blob(b: bytes) -> DATA_BLOB:
    """Wrap *b* in a DATA_BLOB whose backing buffer lives as long as the blob."""
    buf = ctypes.create_string_buffer(b, len(b))
    blob = DATA_BLOB(len(b), ctypes.cast(buf, ctypes.POINTER(ctypes.c_byte)))
    # BUGFIX: keep a reference to the buffer on the blob. Previously the buffer
    # was a local that could be garbage-collected as soon as this function
    # returned, leaving pbData pointing at freed memory.
    blob._buf = buf
    return blob


def _from_blob(blob: DATA_BLOB) -> bytes:
    """Copy a DPAPI-allocated DATA_BLOB into bytes, then free the OS allocation."""
    ptr = ctypes.cast(blob.pbData, ctypes.POINTER(ctypes.c_ubyte))
    try:
        return bytes(bytearray(ptr[i] for i in range(blob.cbData)))
    finally:
        LocalFree(blob.pbData)


def dpapi_protect(s: str) -> str:
    """Encrypt *s* with DPAPI (current-user scope); returns base64 text.

    Raises OSError if the Win32 call fails. Empty input round-trips to "".
    """
    if not s:
        return ""
    inb = _to_blob(s.encode())
    out = DATA_BLOB()
    # 0x1 = CRYPTPROTECT_UI_FORBIDDEN (never show UI)
    if not CryptProtectData(ctypes.byref(inb), None, None, None, None, 0x1, ctypes.byref(out)):
        raise OSError("CryptProtectData failed")
    return base64.b64encode(_from_blob(out)).decode()


def dpapi_unprotect(s: str) -> str:
    """Decrypt a base64 string previously produced by dpapi_protect()."""
    if not s:
        return ""
    inb = _to_blob(base64.b64decode(s))
    out = DATA_BLOB()
    if not CryptUnprotectData(ctypes.byref(inb), None, None, None, None, 0x1, ctypes.byref(out)):
        raise OSError("CryptUnprotectData failed")
    return _from_blob(out).decode()


def ensure_appdir():
    """Create the per-user application data directory if it does not exist."""
    os.makedirs(APP_DIR, exist_ok=True)


def load_config() -> dict:
    """Load config.json and decrypt the stored API key.

    Returns {} when the file is missing or unreadable; a failed decryption
    degrades to an empty api_key rather than raising.
    """
    ensure_appdir()
    if not os.path.exists(CONFIG_PATH):
        return {}
    try:
        data = json.load(open(CONFIG_PATH, "r", encoding="utf-8"))
    except Exception:
        return {}
    try:
        data["api_key"] = dpapi_unprotect(data.get("api_key_protected", ""))
    except Exception:
        data["api_key"] = ""
    return data
def save_config(d: dict):
    """Persist config to disk, storing the API key DPAPI-encrypted."""
    ensure_appdir()
    out = dict(d)
    out["api_key_protected"] = dpapi_protect(out.pop("api_key", ""))
    json.dump(out, open(CONFIG_PATH, "w", encoding="utf-8"), indent=2)


class SeenHistory:
    """Bounded, ordered record of asset IDs already shown (persisted as JSON).

    `order` is oldest-first; `_set` mirrors it for O(1) membership tests.
    """

    def __init__(self, path: str, limit: int):
        self.path = path
        self.limit = limit
        self.order: List[str] = []
        if os.path.exists(path):
            try:
                self.order = json.load(open(path, "r"))
            except Exception:
                self.order = []
        self._set = set(self.order)

    def add_many(self, ids: Iterable[str]):
        """Mark IDs as most-recently seen, trimming the oldest past the limit."""
        for i in ids:
            if i in self._set:
                try:
                    self.order.remove(i)  # move to end (most recent)
                except ValueError:
                    pass
            else:
                self._set.add(i)
            self.order.append(i)
        if len(self.order) > self.limit:
            # FIX: trim with one slice instead of repeated pop(0) (O(n^2)).
            dropped = self.order[:len(self.order) - self.limit]
            self.order = self.order[len(self.order) - self.limit:]
            for old in dropped:
                self._set.discard(old)

    def contains(self, id_: str) -> bool:
        return id_ in self._set

    def save(self):
        """Best-effort persist; failures are intentionally swallowed."""
        ensure_appdir()
        try:
            json.dump(self.order, open(self.path, "w"))
        except Exception:
            pass


# ---------- Image Metadata Cache ----------
class ImageMetadataCache:
    """
    Persistent cache for image metadata including orientation, dimensions,
    and other properties.
    """

    def __init__(self, path: str):
        self.path = path
        self.cache: Dict[str, Dict] = {}
        self._lock = threading.Lock()
        self.load()

    def load(self):
        """Load metadata cache from file; any problem resets to an empty cache."""
        ensure_appdir()
        if os.path.exists(self.path):
            try:
                with open(self.path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                with self._lock:
                    # Only accept the expected {'version': ..., 'metadata': {...}} shape.
                    if isinstance(data, dict) and 'metadata' in data and 'version' in data:
                        self.cache = data['metadata']
                    else:
                        self.cache = {}
            except Exception:
                with self._lock:
                    self.cache = {}

    def save(self):
        """Save the cache to file: backup, atomic replace, then verification."""
        # BUGFIX: shutil was previously imported inside one conditional branch
        # only, so the restore paths could hit a NameError when that branch
        # had not run. Import it once for the whole method.
        import shutil
        ensure_appdir()
        backup_path = self.path + ".backup"
        if os.path.exists(self.path):
            try:
                shutil.copy2(self.path, backup_path)
            except Exception:
                pass
        with self._lock:
            # Copy data to avoid holding the lock during file I/O.
            metadata_copy = self.cache.copy()
        cache_data = {
            'version': '1.0',
            'last_updated': time.time(),
            'metadata': metadata_copy
        }
        try:
            # Write to a temporary file first, then atomically swap it in.
            temp_path = self.path + ".tmp"
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, indent=2)
            # BUGFIX: os.replace() is atomic even when the target exists;
            # the old remove()+rename() left a window with no cache file.
            os.replace(temp_path, self.path)
            # Verify the save worked.
            try:
                with open(self.path, 'r', encoding='utf-8') as f:
                    verify_data = json.load(f)
                saved_count = len(verify_data.get('metadata', {}))
                expected_count = len(metadata_copy)
                if saved_count != expected_count:
                    print(f"WARNING: Cache save verification failed! Expected {expected_count}, got {saved_count}")
                    if os.path.exists(backup_path):
                        shutil.copy2(backup_path, self.path)
                        print("Restored from backup")
                else:
                    print(f"Cache save verified: {saved_count} entries successfully saved")
            except Exception as e:
                print(f"Cache save verification failed: {e}")
        except Exception as e:
            print(f"Failed to save cache: {e}")
            if os.path.exists(backup_path):
                try:
                    shutil.copy2(backup_path, self.path)
                    print("Restored cache from backup after save failure")
                except Exception:
                    pass

    def get_metadata(self, asset_id: str) -> Optional[Dict]:
        """Get cached metadata for an asset (None if unknown)."""
        with self._lock:
            return self.cache.get(asset_id)

    def set_metadata(self, asset_id: str, width: int, height: int, is_landscape: bool,
                     file_size: Optional[int] = None, last_modified: Optional[str] = None):
        """Cache metadata for an asset."""
        with self._lock:
            self.cache[asset_id] = {
                'width': width,
                'height': height,
                'is_landscape': is_landscape,
                'file_size': file_size,
                'last_modified': last_modified,
                'cached_at': time.time()
            }

    def is_cached(self, asset_id: str) -> bool:
        """Check whether non-null metadata for the asset is present."""
        with self._lock:
            return asset_id in self.cache and self.cache[asset_id] is not None

    def get_cached_orientation(self, asset_id: str) -> Optional[bool]:
        """Return True (landscape) / False (portrait), or None if unknown."""
        with self._lock:
            metadata = self.cache.get(asset_id)
            return metadata['is_landscape'] if metadata else None

    def clear_cache(self):
        """Drop all cached metadata and persist the empty cache."""
        with self._lock:
            self.cache = {}
        # Called outside the lock: save() re-acquires the (non-reentrant) lock.
        self.save()

    def get_cache_stats(self) -> Dict:
        """Summary counts.

        NOTE: entries missing 'is_landscape' are counted in neither bucket
        (landscape defaults to False, portrait test defaults to True) —
        preserved from the original accounting.
        """
        with self._lock:
            return {
                'total_entries': len(self.cache),
                'landscape_count': sum(1 for m in self.cache.values() if m.get('is_landscape', False)),
                'portrait_count': sum(1 for m in self.cache.values() if not m.get('is_landscape', True))
            }
# ---------- Enhanced Pseudo-Random Number Generator ----------
class EnhancedRandomSelector:
    """
    Enhanced pseudo-random selector with better distribution and weighting options.

    Tracks how often each asset has been picked so that weighted selection can
    favor the least-recently-used items.
    """

    def __init__(self, seed: Optional[int] = None):
        self.rng = random.Random(seed)
        self.system_rng = secrets.SystemRandom()  # cryptographically secure randomness
        self.selection_weights = {}  # asset id -> number of times selected

    def select_items(self, items: List, count: int, exclude_set: Optional[set] = None,
                     use_secure_random: bool = True,
                     use_weighted_selection: bool = True) -> List:
        """
        Select up to *count* random items, optionally skipping IDs in
        *exclude_set* and favoring rarely-selected items.

        FIX: exclude_set is annotated Optional (the default is None).
        """
        if not items:
            return []
        available_items = [item for item in items
                           if not exclude_set or get_asset_id(item) not in exclude_set]
        if not available_items:
            # Everything excluded: reset and draw from the full pool.
            available_items = items
        count = min(count, len(available_items))
        # Weighted selection only matters when there is an actual choice.
        if use_weighted_selection and len(available_items) > count:
            return self._weighted_sample(available_items, count, use_secure_random)
        if use_secure_random:
            return self.system_rng.sample(available_items, count)
        return self.rng.sample(available_items, count)

    def _weighted_sample(self, items: List, count: int, use_secure_random: bool = True) -> List:
        """
        Sample items with weights favoring less frequently selected items.
        """
        # Items selected less often get proportionally higher weights.
        max_weight = 100
        weights = []
        for item in items:
            asset_id = get_asset_id(item)
            selection_count = self.selection_weights.get(asset_id, 0)
            weights.append(max(1, max_weight - (selection_count * 10)))

        selected = []
        items_copy = items.copy()
        weights_copy = weights.copy()
        for _ in range(count):
            if not items_copy:
                break
            total_weight = sum(weights_copy)
            if total_weight == 0:
                # Degenerate weights: fall back to uniform selection.
                if use_secure_random:
                    idx = self.system_rng.randrange(len(items_copy))
                else:
                    idx = self.rng.randrange(len(items_copy))
            else:
                # Roulette-wheel selection over the cumulative weights.
                if use_secure_random:
                    target = self.system_rng.uniform(0, total_weight)
                else:
                    target = self.rng.uniform(0, total_weight)
                cumsum = 0
                idx = 0
                for i, weight in enumerate(weights_copy):
                    cumsum += weight
                    if cumsum >= target:
                        idx = i
                        break
            selected_item = items_copy.pop(idx)
            weights_copy.pop(idx)
            selected.append(selected_item)
            # Remember that this asset was picked once more.
            asset_id = get_asset_id(selected_item)
            self.selection_weights[asset_id] = self.selection_weights.get(asset_id, 0) + 1
        return selected

    def shuffle_list(self, items: List, use_secure_random: bool = True) -> List:
        """Return a shuffled copy of *items* (the input is not modified)."""
        items_copy = items.copy()
        if use_secure_random:
            self.system_rng.shuffle(items_copy)
        else:
            self.rng.shuffle(items_copy)
        return items_copy

    def reset_weights(self):
        """Reset selection weights for truly fresh randomization."""
        self.selection_weights.clear()

    def get_selection_stats(self) -> Dict:
        """Get statistics about selection frequency."""
        if not self.selection_weights:
            return {"total_selections": 0, "unique_items": 0, "most_selected": 0}
        total_selections = sum(self.selection_weights.values())
        unique_items = len(self.selection_weights)
        most_selected = max(self.selection_weights.values()) if self.selection_weights else 0
        return {
            "total_selections": total_selections,
            "unique_items": unique_items,
            "most_selected": most_selected,
            "average_selections": total_selections / unique_items if unique_items > 0 else 0
        }
# ---------- Immich Client ----------
class ImmichClient:
    """
    x-api-key client with enhanced metadata support and proper API usage.
    """

    def __init__(self, base_url: str, api_key: str, timeout=15, logger=None):
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "x-api-key": api_key,
            "Accept": "application/json",
            "Content-Type": "application/json",
        })
        self.timeout = timeout
        self.log = logger or (lambda *a, **k: None)

    def _get(self, path, **kw):
        """GET base_url+path; raises requests.HTTPError on non-2xx."""
        r = self.session.get(f"{self.base_url}{path}", timeout=self.timeout, **kw)
        r.raise_for_status()
        return r

    def _post(self, path, json=None, **kw):
        """POST a JSON body to base_url+path; raises requests.HTTPError on non-2xx."""
        r = self.session.post(f"{self.base_url}{path}", json=json or {}, timeout=self.timeout, **kw)
        r.raise_for_status()
        return r

    def list_all_assets_with_metadata(self, page_size: int = 5000) -> List[dict]:
        """
        Fetch all assets, trying each known endpoint strategy in order and
        keeping the first non-empty result.
        """
        all_assets = []
        methods = [
            self._try_search_assets_endpoint,
            self._try_search_metadata_paginated,
            self._try_assets_endpoint,
            self._try_simple_assets_endpoint,
        ]
        for method in methods:
            try:
                self.log(f"Trying {method.__name__}...")
                assets = method(page_size)
                if assets and len(assets) > len(all_assets):
                    all_assets = assets
                    self.log(f"Successfully fetched {len(all_assets)} assets using {method.__name__}")
                    break
            except Exception as e:
                self.log(f"{method.__name__} failed: {e}")
                continue
        if not all_assets:
            self.log("All asset fetching methods failed!")
        return all_assets

    def _try_search_assets_endpoint(self, page_size: int) -> List[dict]:
        """Page through POST /api/search/assets (modern endpoint)."""
        all_assets = []
        page = 1
        while True:
            # Search without filters to get all assets.
            payload = {"page": page, "size": page_size, "withExif": True}
            try:
                r = self._post("/api/search/assets", json=payload)
                data = r.json()
                # Servers return the items in several shapes; probe each.
                if isinstance(data, list):
                    assets = data
                elif isinstance(data, dict):
                    if "assets" in data:
                        if isinstance(data["assets"], dict) and "items" in data["assets"]:
                            assets = data["assets"]["items"]
                        else:
                            assets = data["assets"]
                    elif "items" in data:
                        assets = data["items"]
                    elif "results" in data:
                        assets = data["results"]
                    else:
                        assets = []
                else:
                    assets = []
                if not assets:
                    self.log(f"No assets on page {page}, stopping")
                    break
                all_assets.extend(assets)
                self.log(f"Search page {page}: {len(assets)} assets (total: {len(all_assets)})")
                # A short page means we've reached the end.
                if len(assets) < page_size:
                    self.log(f"Got {len(assets)} < {page_size} requested, search complete")
                    break
                page += 1
                if len(all_assets) > 200000:
                    self.log("Safety limit reached - stopping search")
                    break
            except Exception as e:
                self.log(f"Error on search page {page}: {e}")
                break
        return all_assets

    def _try_assets_endpoint(self, page_size: int) -> List[dict]:
        """Page through GET /api/assets with cursor-based pagination."""
        all_assets = []
        cursor = None
        while True:
            params = {"size": page_size, "withExif": True}
            if cursor:
                params["cursor"] = cursor
            r = self._get("/api/assets", params=params)
            data = r.json()
            if isinstance(data, list):
                assets = data
                cursor = None  # bare list carries no pagination info
            elif isinstance(data, dict):
                if "assets" in data:
                    assets = data["assets"]
                elif "items" in data:
                    assets = data["items"]
                else:
                    assets = []
                cursor = data.get("nextCursor") or data.get("cursor")
            else:
                break
            if not assets:
                break
            all_assets.extend(assets)
            self.log(f"Fetched {len(assets)} assets (total: {len(all_assets)})")
            # No cursor, or a short page, means we're done.
            if not cursor or len(assets) < page_size:
                break
            if len(all_assets) > 200000:
                self.log("Safety limit reached (200k assets)")
                break
        return all_assets

    def _try_search_metadata_paginated(self, page_size: int) -> List[dict]:
        """Page through POST /api/search/metadata, deduplicating to survive
        servers that loop or repeat pages."""
        all_assets = []
        skip = 0
        page_num = 1
        seen_asset_ids = set()
        while True:
            payload = {"type": "IMAGE", "take": page_size, "skip": skip, "withExif": True}
            self.log(f"DEBUG: Page {page_num} request: skip={skip}, take={page_size}")
            try:
                r = self._post("/api/search/metadata", json=payload)
                data = r.json()
                if isinstance(data, dict) and "assets" in data:
                    assets_obj = data["assets"]
                    if isinstance(assets_obj, dict):
                        assets = assets_obj.get("items", [])
                        server_total = assets_obj.get("total", 0)
                    else:
                        assets = assets_obj if isinstance(assets_obj, list) else []
                        server_total = len(assets)
                else:
                    assets = []
                    server_total = 0
                if not assets:
                    self.log(f"No assets on page {page_num}, stopping")
                    break
                # Detect duplicate assets (server bug workaround).
                new_assets = []
                duplicates = 0
                for asset in assets:
                    asset_id = get_asset_id(asset)
                    if asset_id not in seen_asset_ids:
                        seen_asset_ids.add(asset_id)
                        new_assets.append(asset)
                    else:
                        duplicates += 1
                self.log(f"Page {page_num}: {len(assets)} returned, {len(new_assets)} new, {duplicates} duplicates")
                if len(new_assets) == 0:
                    self.log("All assets on this page are duplicates - server is looping, stopping")
                    break
                all_assets.extend(new_assets)
                self.log(f"Search page {page_num}: {len(new_assets)} new assets (total unique: {len(all_assets)})")
                # Stop once we've collected everything the server claims to have.
                if len(all_assets) >= server_total and server_total > 0:
                    self.log(f"Collected {len(all_assets)} assets, server reports {server_total} total - stopping")
                    break
                # A high duplicate rate is another looping signal.
                if duplicates > 0 and (len(new_assets) / len(assets)) < 0.25:
                    self.log("High duplicate rate detected - server appears to be looping, stopping")
                    break
                skip += len(assets)  # use the original count for server compatibility
                page_num += 1
                if len(all_assets) > 150000:  # more than any reasonable personal collection
                    self.log("Reasonable asset limit reached - stopping")
                    break
                if page_num > 1000:  # prevent infinite loops
                    self.log("Page limit reached - stopping to prevent infinite loop")
                    break
            except Exception as e:
                self.log(f"Error on search page {page_num}: {e}")
                break
        self.log(f"Pagination complete. Total unique assets fetched: {len(all_assets)}")
        return all_assets

    def _try_simple_assets_endpoint(self, page_size: int) -> List[dict]:
        """Last resort: GET /api/asset (legacy, unpaginated)."""
        try:
            r = self._get("/api/asset")
            data = r.json()
            if isinstance(data, list):
                self.log(f"Legacy endpoint returned {len(data)} assets")
                return data
        except Exception:
            pass
        return []

    def _fallback_asset_listing(self) -> List[dict]:
        """Fallback method using the search/metadata endpoint in one big request."""
        self.log("Using fallback search/metadata method...")
        try:
            r = self._post("/api/search/metadata", json={
                "type": "IMAGE",
                "take": 50000,  # large number to get as many as possible
                "skip": 0,
                "withExif": True
            })
            data = r.json()
            if isinstance(data, list):
                return data
            elif isinstance(data, dict):
                if "assets" in data:
                    if isinstance(data["assets"], dict) and "items" in data["assets"]:
                        return data["assets"]["items"]
                    elif isinstance(data["assets"], list):
                        return data["assets"]
                elif "items" in data:
                    return data["items"]
            return []
        except Exception as e:
            self.log(f"Fallback method also failed: {e}")
            return []

    def list_album_assets(self, album_id: str) -> List[dict]:
        """Get assets from a specific album with metadata; [] on any failure."""
        if not album_id:
            return []
        self.log(f"Fetching assets from album: {album_id}")
        try:
            r = self._get(f"/api/albums/{album_id}")
            data = r.json()
            if isinstance(data, dict) and "assets" in data:
                assets = data["assets"]
                self.log(f"Found {len(assets)} assets in album {album_id}")
                return assets
            self.log(f"Unexpected album response format: {type(data)}")
            return []
        except requests.HTTPError as e:
            # BUGFIX: e.response can be None for transport-level failures.
            status = e.response.status_code if e.response is not None else None
            if status == 404:
                self.log(f"Album {album_id} not found")
            else:
                self.log(f"Error fetching album {album_id}: HTTP {status}")
            return []
        except Exception as e:
            self.log(f"Error fetching album {album_id}: {e}")
            return []

    def download_original(self, asset_id: str) -> bytes:
        """Download the original image bytes (modern endpoint, legacy on 404)."""
        try:
            # stream=True removed: .content buffers the whole body anyway.
            r = self.session.get(f"{self.base_url}/api/assets/{asset_id}/original", timeout=30)
            r.raise_for_status()
            return r.content
        except requests.HTTPError as e:
            # BUGFIX: guard e.response against None before reading status_code.
            if e.response is not None and e.response.status_code == 404:
                try:
                    r = self.session.get(f"{self.base_url}/api/asset/{asset_id}/original", timeout=30)
                    r.raise_for_status()
                    return r.content
                except Exception:
                    pass
            raise
# ---------- Windows wallpaper helpers ----------
SPI_SETDESKWALLPAPER = 20
SPIF_UPDATEINIFILE = 0x01
SPIF_SENDWININICHANGE = 0x02


def set_wallpaper_style(fit: bool):
    """Write the registry wallpaper style: Fit (no crop) when *fit*, else Fill."""
    try:
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop",
                            0, winreg.KEY_SET_VALUE) as k:
            style = "6" if fit else "10"  # 6 = Fit, 10 = Fill
            winreg.SetValueEx(k, "WallpaperStyle", 0, winreg.REG_SZ, style)
            winreg.SetValueEx(k, "TileWallpaper", 0, winreg.REG_SZ, "0")
    except OSError:
        pass  # best effort; wallpaper still applies with the previous style


def apply_wallpaper_bmp_legacy(bmp_path: str, fit: bool):
    """Set a single wallpaper for all monitors via SystemParametersInfo."""
    set_wallpaper_style(fit)
    ctypes.windll.user32.SystemParametersInfoW(
        SPI_SETDESKWALLPAPER, 0, bmp_path,
        SPIF_UPDATEINIFILE | SPIF_SENDWININICHANGE
    )


def ensure_img_file(image_bytes: bytes, suffix=".jpg") -> str:
    """Decode image bytes, flatten transparency onto black, and write a JPEG.

    Returns the path of the temporary file written.
    """
    img = Image.open(io.BytesIO(image_bytes))
    if img.mode in ("RGBA", "LA"):
        # Composite the alpha channel over a black background.
        bg = Image.new("RGB", img.size, (0, 0, 0))
        bg.paste(img, mask=img.split()[-1])
        img = bg
    elif img.mode != "RGB":
        img = img.convert("RGB")
    out = os.path.join(tempfile.gettempdir(), f"immich_wall_{random.randint(0, 1_000_000)}{suffix}")
    img.save(out, format="JPEG", quality=92)
    return out


def _legacy_single_wallpaper(path: str, fit: bool):
    """Fallback: convert to BMP if needed and apply one wallpaper everywhere.

    Extracted helper — this logic was previously duplicated verbatim in two
    places inside set_per_monitor_wallpapers.
    """
    bmp = path
    if not bmp.lower().endswith(".bmp"):
        tmp = os.path.join(tempfile.gettempdir(), "immich_wallpaper.bmp")
        Image.open(path).convert("RGB").save(tmp, "BMP")
        bmp = tmp
    apply_wallpaper_bmp_legacy(bmp, fit)


def set_per_monitor_wallpapers(paths: List[str], same_for_all: bool, fit: bool):
    """Apply wallpapers per monitor via COM; fall back to the legacy API."""
    try:
        dw = get_desktop_wallpaper_com()
        dw.SetPosition(DWPOS_FIT if fit else DWPOS_FILL)
        ids = list_monitor_ids(dw)
        if not ids:
            _legacy_single_wallpaper(paths[0], fit)
            return
        if same_for_all:
            for mid in ids:
                dw.SetWallpaper(mid, paths[0])
        else:
            for i, mid in enumerate(ids):
                dw.SetWallpaper(mid, paths[min(i, len(paths) - 1)])
    except Exception:
        # COM unavailable/failed: best effort with the legacy API.
        _legacy_single_wallpaper(paths[0], fit)


# ---------- Helpers ----------
def normalize_asset(rec: dict) -> dict:
    """Unwrap search results of the form {'asset': {...}} to the inner asset."""
    return rec["asset"] if isinstance(rec, dict) and isinstance(rec.get("asset"), dict) else rec


def get_dims_from_asset(asset: dict) -> tuple[Optional[int], Optional[int]]:
    """
    Get dimensions from asset metadata with enhanced fallback logic.
    Fixed for Immich v1.143.1 API format based on debug data.

    Returns (width, height), or (None, None) when no usable values exist.
    """
    # Based on debug output, dimensions live in exifInfo.exifImageWidth/Height.
    exif = asset.get("exifInfo")
    if exif and isinstance(exif, dict):
        w = exif.get("exifImageWidth")
        h = exif.get("exifImageHeight")
        if w is not None and h is not None:
            try:
                return int(w), int(h)
            except (ValueError, TypeError):
                pass
    # Other top-level keys as fallback.
    w = asset.get("originalWidth") or asset.get("width")
    h = asset.get("originalHeight") or asset.get("height")
    if w is not None and h is not None:
        try:
            return int(w), int(h)
        except (ValueError, TypeError):
            pass
    # Resized thumbnail data as a last resort.
    resized = asset.get('resized')
    if resized and isinstance(resized, dict):
        w = resized.get('width')
        h = resized.get('height')
        if w is not None and h is not None:
            try:
                return int(w), int(h)
            except (ValueError, TypeError):
                pass
    return None, None


def orientation_match(asset: dict, landscape_only: bool,
                      metadata_cache: "ImageMetadataCache" = None) -> bool:
    """Check orientation with metadata cache support.

    FIX: the cache annotation is a lazy forward-ref string so this module
    section has no hard evaluation-order dependency on ImageMetadataCache.
    Unknown dimensions count as a match (permissive default).
    """
    asset_id = get_asset_id(asset)
    # Try the persistent cache first.
    if metadata_cache:
        cached_orientation = metadata_cache.get_cached_orientation(asset_id)
        if cached_orientation is not None:
            return cached_orientation if landscape_only else not cached_orientation
    # Fall back to the asset's own metadata.
    w, h = get_dims_from_asset(asset)
    if w and h:
        is_landscape = w >= h  # square counts as landscape
        if metadata_cache:
            metadata_cache.set_metadata(asset_id, w, h, is_landscape)
        return is_landscape if landscape_only else not is_landscape
    return True
def is_image_asset(a: dict) -> bool:
    """True when the asset record is typed as an image."""
    t = (a.get("type") or a.get("assetType") or "").upper()
    return t == "IMAGE"


def is_archived(a: dict) -> bool:
    """True when the asset is archived (falls back to the trashed flag)."""
    x = a.get("isArchived")
    if x is None:
        x = a.get("isTrashed")
    return bool(x)


def is_favorite(a: dict) -> bool:
    """True when the asset is marked as a favorite."""
    return bool(a.get("isFavorite"))


def get_asset_id(a: dict) -> str:
    """Asset ID under either key convention; "" when absent."""
    return a.get("id") or a.get("assetId") or ""


def get_asset_name(a: dict) -> str:
    """Display name for an asset, defaulting to its ID."""
    return a.get("originalFileName") or a.get("fileName") or get_asset_id(a)


# ---------- GUI ----------
class App(tk.Tk):
    def __init__(self):
        """Build the main window, load persisted settings, and wire up the UI."""
        super().__init__()
        self.title("Immich Wallpaper Rotator (Windows, Multi-Monitor) - Enhanced")
        self.geometry("680x760")
        self.resizable(False, False)

        self._stop_event = threading.Event()
        self._runner_thread = None
        self._cached_assets: List[dict] = []

        # Initialize enhanced components
        self._metadata_cache = ImageMetadataCache(METADATA_PATH)
        self._random_selector = EnhancedRandomSelector()

        cfg = load_config()

        # Core settings, seeded from the saved config
        self.server_var = tk.StringVar(value=cfg.get("server", ""))
        self.api_key_var = tk.StringVar(value=cfg.get("api_key", ""))
        self.album_var = tk.StringVar(value=cfg.get("album_id", ""))
        self.interval_var = tk.StringVar(value=str(cfg.get("interval_min", "30")))
        self.exclude_archived_var = tk.BooleanVar(value=bool(cfg.get("exclude_archived", True)))
        self.only_favorites_var = tk.BooleanVar(value=bool(cfg.get("only_favorites", False)))
        self.per_monitor_var = tk.BooleanVar(value=bool(cfg.get("per_monitor", True)))
        self.same_for_all_var = tk.BooleanVar(value=bool(cfg.get("same_for_all", True)))
        self.fit_image_var = tk.BooleanVar(value=bool(cfg.get("fit_image", False)))
        self.landscape_only_var = tk.BooleanVar(value=bool(cfg.get("landscape_only", False)))

        # Cache all images during build (ignoring current filters for metadata caching)
        cache_all_images = tk.BooleanVar(value=bool(cfg.get("cache_all_images", True)))

        # Advanced + Theme
        self.cache_target_var = tk.StringVar(value=str(cfg.get("cache_target", DEFAULT_CACHE_TARGET)))
        self.history_limit_var = tk.StringVar(value=str(cfg.get("history_limit", DEFAULT_HISTORY_LIMIT)))
        self.dark_mode_var = tk.BooleanVar(value=bool(cfg.get("dark_mode", False)))
        self.cache_all_images_var = cache_all_images

        # History must be created after the config is read (needs the limit)
        hist_limit = self._get_int(self.history_limit_var, DEFAULT_HISTORY_LIMIT, 100, 200000)
        self._history = SeenHistory(HISTORY_PATH, hist_limit)

        # Build UI & Menu
        self._build_ui()
        self._build_menu()
        self._apply_theme(self.dark_mode_var.get())
        self.protocol("WM_DELETE_WINDOW", self._on_close)
self.fit_image_var = tk.BooleanVar(value=bool(cfg.get("fit_image",False))) self.landscape_only_var = tk.BooleanVar(value=bool(cfg.get("landscape_only",False))) # Cache all images during build (ignoring current filters for metadata caching) cache_all_images = tk.BooleanVar(value=bool(cfg.get("cache_all_images", True))) # Advanced + Theme self.cache_target_var = tk.StringVar(value=str(cfg.get("cache_target", DEFAULT_CACHE_TARGET))) self.history_limit_var = tk.StringVar(value=str(cfg.get("history_limit", DEFAULT_HISTORY_LIMIT))) self.dark_mode_var = tk.BooleanVar(value=bool(cfg.get("dark_mode", False))) self.cache_all_images_var = cache_all_images # History after reading config hist_limit = self._get_int(self.history_limit_var, DEFAULT_HISTORY_LIMIT, 100, 200000) self._history = SeenHistory(HISTORY_PATH, hist_limit) # Build UI & Menu self._build_ui() self._build_menu() self._apply_theme(self.dark_mode_var.get()) self.protocol("WM_DELETE_WINDOW", self._on_close) # ----- Theme ----- def _apply_theme(self, dark: bool): if HAVE_SV_TTK: sv_ttk.set_theme("dark" if dark else "light") # Text widget manual colors for readability if hasattr(self,"status"): if dark: self.status.configure(bg="#1E1E1E", fg="#E6E6E6", insertbackground="#E6E6E6") else: self.status.configure(bg="#FFFFFF", fg="#000000", insertbackground="#000000") return # Fallback polished ttk styling style = ttk.Style(self) try: style.theme_use("clam") except Exception: pass if dark: palette = { "BG": "#121212", "SURF": "#1E1E1E", "TXT": "#E6E6E6", "MUT": "#C8C8C8", "ACC": "#3D6BE5", "BRD": "#2A2A2A", "FIELD": "#1A1A1A" } self.configure(bg=palette["BG"]) style.configure(".", background=palette["BG"], foreground=palette["TXT"]) style.configure("TFrame", background=palette["BG"]) style.configure("TLabelframe", background=palette["SURF"], bordercolor=palette["BRD"], relief="solid") style.configure("TLabelframe.Label", background=palette["SURF"], foreground=palette["TXT"]) style.configure("TLabel", 
background=palette["BG"], foreground=palette["TXT"]) style.configure("Small.TLabel", background=palette["BG"], foreground=palette["MUT"]) style.configure("TButton", background=palette["SURF"], foreground=palette["TXT"], bordercolor=palette["BRD"]) style.map("TButton", background=[("active", "#2A2A2A")]) style.configure("TEntry", fieldbackground=palette["FIELD"], background=palette["FIELD"], foreground=palette["TXT"], bordercolor=palette["BRD"]) style.configure("TCheckbutton", background=palette["SURF"], foreground=palette["TXT"]) style.configure("TNotebook", background=palette["BG"], borderwidth=0) style.configure("TNotebook.Tab", background=palette["BG"], foreground=palette["MUT"], borderwidth=1, padding=[8, 4]) style.map("TNotebook.Tab", background=[("selected", palette["SURF"])], foreground=[("selected", palette["TXT"])]) if hasattr(self,"status"): self.status.configure(bg=palette["FIELD"], fg=palette["TXT"], insertbackground=palette["TXT"]) self._apply_styles_to_widgets(True) else: self.configure(bg="") style.configure(".", background="", foreground="") style.configure("TFrame", background="") style.configure("TLabelframe", background="") style.configure("TLabelframe.Label", background="", foreground="") style.configure("TLabel", background="", foreground="") style.configure("Small.TLabel", foreground="gray") style.configure("TButton", background="", foreground="") style.configure("TEntry", fieldbackground="white", foreground="black") style.configure("TCheckbutton", background="", foreground="") style.configure("TNotebook", background="", borderwidth=1) style.configure("TNotebook.Tab", background="", foreground="", borderwidth=1) if hasattr(self,"status"): self.status.configure(bg="white", fg="black", insertbackground="black") self._apply_styles_to_widgets(False) def _apply_styles_to_widgets(self, dark: bool): # Only used by fallback theme for w in self._labels: w.configure(style="TLabel" if not dark else "TLabel") for w in self._small_labels: 
w.configure(style="Small.TLabel") for w in self._entries: w.configure(style="TEntry" if not dark else "TEntry") for w in self._buttons: w.configure(style="TButton" if not dark else "TButton") for w in self._groups: w.configure(style="TLabelframe" if not dark else "TLabelframe") for w in self._checks: w.configure(style="TCheckbutton" if not dark else "TCheckbutton") if hasattr(self, "_root_frame"): self._root_frame.configure(style="TFrame" if not dark else "TFrame") # ----- Menu ----- def _build_menu(self): menubar = tk.Menu(self); self.config(menu=menubar) view = tk.Menu(menubar, tearoff=False) view.add_checkbutton(label="Dark mode", variable=self.dark_mode_var, command=self._on_toggle_dark, accelerator="Ctrl+D") view.add_separator() view.add_command(label="Clear metadata cache", command=self._clear_metadata_cache) view.add_command(label="Cache statistics", command=self._show_cache_stats) menubar.add_cascade(label="View", menu=view) tools = tk.Menu(menubar, tearoff=False) tools.add_command(label="Rebuild metadata cache", command=self._rebuild_metadata_cache) tools.add_command(label="Export cache statistics", command=self._export_cache_stats) tools.add_separator() tools.add_command(label="Reset selection weights", command=self._reset_selection_weights) tools.add_command(label="Selection statistics", command=self._show_selection_stats) menubar.add_cascade(label="Tools", menu=tools) self.bind_all("", lambda e: self._toggle_dark()) def _toggle_dark(self): self.dark_mode_var.set(not self.dark_mode_var.get()) self._on_toggle_dark() def _clear_metadata_cache(self): if messagebox.askyesno("Clear Cache", "This will clear all cached image metadata and rebuild from scratch. Continue?"): self._metadata_cache.clear_cache() self._log("Metadata cache cleared completely. 
Next run will rebuild from scratch.") self._update_cache_info() def _show_cache_stats(self): stats = self._metadata_cache.get_cache_stats() msg = f"Metadata Cache Statistics:\n\nTotal entries: {stats['total_entries']}\nLandscape images: {stats['landscape_count']}\nPortrait images: {stats['portrait_count']}" messagebox.showinfo("Cache Statistics", msg) def _rebuild_metadata_cache(self): if messagebox.askyesno("Rebuild Cache", "This will clear and rebuild the metadata cache. This may take some time. Continue?"): self._metadata_cache.clear_cache() self._log("Metadata cache rebuild requested. Start the wallpaper rotation to rebuild.") self._update_cache_info() def _reset_selection_weights(self): if messagebox.askyesno("Reset Weights", "This will reset selection weights for truly random distribution. Continue?"): self._random_selector.reset_weights() self._log("Selection weights reset. Next selections will be completely random.") def _show_selection_stats(self): stats = self._random_selector.get_selection_stats() msg = (f"Selection Statistics:\n\n" f"Total selections: {stats['total_selections']}\n" f"Unique items selected: {stats['unique_items']}\n" f"Most selected count: {stats['most_selected']}\n" f"Average selections per item: {stats['average_selections']:.2f}") messagebox.showinfo("Selection Statistics", msg) def _export_cache_stats(self): try: stats = self._metadata_cache.get_cache_stats() selection_stats = self._random_selector.get_selection_stats() combined_stats = { "metadata_cache": stats, "selection_stats": selection_stats, "export_time": time.time() } stats_file = os.path.join(APP_DIR, "cache_stats.json") with open(stats_file, 'w') as f: json.dump(combined_stats, f, indent=2) self._log(f"Cache and selection statistics exported to: {stats_file}") except Exception as e: self._log(f"Failed to export statistics: {e}") # ----- UI ----- def _build_ui(self): self._labels, self._entries, self._buttons, self._groups, self._checks, self._small_labels = [], [], [], [], 
[], [] self._root_frame = ttk.Frame(self, padding=(12, 12, 12, 0)) self._root_frame.pack(fill="both", expand=True) # --- TABS --- notebook = ttk.Notebook(self._root_frame) notebook.pack(fill="x", pady=(0, 10)) tab_connect = ttk.Frame(notebook, padding=10) tab_options = ttk.Frame(notebook, padding=10) tab_advanced = ttk.Frame(notebook, padding=10) notebook.add(tab_connect, text="Connect & Run") notebook.add(tab_options, text="Filters & Display") notebook.add(tab_advanced, text="Advanced") # --- TAB 1: CONNECT & RUN --- connect_group = ttk.Frame(tab_connect) connect_group.pack(fill="x") connect_group.columnconfigure(1, weight=1) def L(parent, text, r, c, **kwargs): w = ttk.Label(parent, text=text, **kwargs); w.grid(row=r, column=c, sticky="w", padx=(0, 10), pady=6); self._labels.append(w); return w def SL(parent, text, r, c, **kwargs): w = ttk.Label(parent, text=text, style="Small.TLabel", **kwargs); w.grid(row=r, column=c, sticky="w", padx=0, pady=(0,6)); self._small_labels.append(w); return w def E(parent, var, r, c, **kwargs): w = ttk.Entry(parent, textvariable=var, **kwargs); w.grid(row=r, column=c, sticky="ew", pady=6); self._entries.append(w); return w def C(parent, text, var, r, c, **kwargs): cmd = kwargs.pop('command', self._save_now) w = ttk.Checkbutton(parent, text=text, variable=var, command=cmd, **kwargs) w.grid(row=r, column=c, sticky="w", padx=8, pady=4) self._checks.append(w) return w L(connect_group, "Immich Server URL", 0, 0); E(connect_group, self.server_var, 0, 1) L(connect_group, "API Key", 1, 0); E(connect_group, self.api_key_var, 1, 1, show="•") L(connect_group, "Album ID", 2, 0); E(connect_group, self.album_var, 2, 1) SL(connect_group, "(Leave blank to use all photos)", 3, 1) L(connect_group, "Interval (minutes)", 4, 0); E(connect_group, self.interval_var, 4, 1, width=12) # --- TAB 2: FILTERS & DISPLAY --- filter_group = ttk.Labelframe(tab_options, text="Image Filtering", padding=10) filter_group.pack(fill="x", pady=(0, 10)) 
self._groups.append(filter_group) C(filter_group, "Exclude archived", self.exclude_archived_var, 0, 0) C(filter_group, "Only favorites", self.only_favorites_var, 0, 1) C(filter_group, "Only landscape (instead of portrait)", self.landscape_only_var, 1, 0) display_group = ttk.Labelframe(tab_options, text="Display Options", padding=10) display_group.pack(fill="x") self._groups.append(display_group) C(display_group, "Use multi-monitor (IDesktopWallpaper)", self.per_monitor_var, 0, 0) C(display_group, "Same image on all monitors", self.same_for_all_var, 0, 1) C(display_group, "Show full image (Fit; no crop)", self.fit_image_var, 1, 0) C(display_group, "Dark mode", self.dark_mode_var, 2, 0, command=self._on_toggle_dark) # Use toggler command # --- TAB 3: ADVANCED --- adv_group = ttk.Frame(tab_advanced) adv_group.pack(fill="x") adv_group.columnconfigure(1, weight=1) L(adv_group, "Cache target", 0, 0); E(adv_group, self.cache_target_var, 0, 1, width=15) SL(adv_group, "(Number of image records to fetch per run)", 1, 1) L(adv_group, "History limit", 2, 0); E(adv_group, self.history_limit_var, 2, 1, width=15) SL(adv_group, "(Number of recent photos to avoid repeating)", 3, 1) # New option for complete metadata caching C(adv_group, "Cache all images (ignore current filters for metadata)", self.cache_all_images_var, 4, 0) SL(adv_group, "(Build complete metadata cache regardless of favorites/archive filters)", 5, 1) # Metadata cache info section cache_info_group = ttk.Labelframe(tab_advanced, text="Metadata Cache", padding=10) cache_info_group.pack(fill="x", pady=(10, 0)) self._groups.append(cache_info_group) cache_stats = self._metadata_cache.get_cache_stats() self.cache_info_label = ttk.Label(cache_info_group, text=f"Cached entries: {cache_stats['total_entries']} | Landscape: {cache_stats['landscape_count']} | Portrait: {cache_stats['portrait_count']}") self.cache_info_label.pack(anchor="w") self._labels.append(self.cache_info_label) # --- BUTTONS (below tabs) --- btn_frame = 
ttk.Frame(self._root_frame) btn_frame.pack(fill="x", pady=10) self.btn_start=ttk.Button(btn_frame,text="Start",command=self.start) self.btn_next =ttk.Button(btn_frame,text="Next now",command=self.next_now,state="disabled") self.btn_stop =ttk.Button(btn_frame,text="Stop",command=self.stop,state="disabled") self.btn_test =ttk.Button(btn_frame,text="Test",command=self.test_connection) for w in (self.btn_start, self.btn_next, self.btn_stop, self.btn_test): w.pack(side="left", padx=(0, 6)); self._buttons.append(w) # --- LOG (at the bottom) --- log_frame = ttk.Frame(self._root_frame) log_frame.pack(fill="both", expand=True, pady=(0, 12)) log_frame.rowconfigure(0, weight=1) log_frame.columnconfigure(0, weight=1) self.status=tk.Text(log_frame,height=10,wrap="word", relief="solid", borderwidth=1) self.status.grid(row=0,column=0,sticky="nsew") self._root_frame.rowconfigure(2,weight=1) self._log("Enhanced settings loaded. Metadata caching enabled for improved performance.") # Persist on change for v in (self.server_var, self.api_key_var, self.album_var, self.interval_var, self.cache_target_var, self.history_limit_var): v.trace_add("write", lambda *_: self._save_now()) # Also persist the cache all images setting self.cache_all_images_var.trace_add("write", lambda *_: self._save_now()) def _update_cache_info(self): """Update the cache info label with current statistics""" cache_stats = self._metadata_cache.get_cache_stats() self.cache_info_label.configure( text=f"Cached entries: {cache_stats['total_entries']} | Landscape: {cache_stats['landscape_count']} | Portrait: {cache_stats['portrait_count']}") # ----- persistence helpers ----- def _get_int(self, var: tk.StringVar, default: int, min_v: int, max_v: int) -> int: try: val = int(float(var.get().strip())) if val < min_v: val = min_v if val > max_v: val = max_v return val except Exception: return default def _collect_config(self): try: interval=float(self.interval_var.get().strip()) except Exception: 
interval=self.interval_var.get().strip() return { "server":self.server_var.get().strip(), "api_key":self.api_key_var.get(), "album_id":self.album_var.get().strip(), "interval_min":interval, "exclude_archived":self.exclude_archived_var.get(), "only_favorites":self.only_favorites_var.get(), "per_monitor":self.per_monitor_var.get(), "same_for_all":self.same_for_all_var.get(), "fit_image":self.fit_image_var.get(), "landscape_only":self.landscape_only_var.get(), "cache_target": self._get_int(self.cache_target_var, DEFAULT_CACHE_TARGET, 100, 500000), "history_limit": self._get_int(self.history_limit_var, DEFAULT_HISTORY_LIMIT, 100, 500000), "dark_mode": self.dark_mode_var.get(), "cache_all_images": self.cache_all_images_var.get(), } def _save_now(self, *args): cfg = self._collect_config() try: save_config(cfg) except Exception: pass # live-update history capacity self._history.limit = cfg["history_limit"] def _on_toggle_dark(self): self._apply_theme(self.dark_mode_var.get()) self._save_now() def _on_close(self): try: self.stop() except Exception: pass self._save_now() self._history.save() self._metadata_cache.save() self.destroy() # ----- logging ----- def _log(self, msg:str): self.status.configure(state="normal"); self.status.insert("end", msg+"\n") self.status.see("end"); self.status.configure(state="disabled") # ----- Test connection ----- def test_connection(self): server=normalize_base_url(self.server_var.get()); self.server_var.set(server) api_key=self.api_key_var.get().strip() if not server or not api_key: messagebox.showerror("Missing info","Server URL and API Key are required."); return self._log("Testing external library access specifically...") headers = {"x-api-key":api_key,"Accept":"application/json","Content-Type":"application/json"} # Test 1: Get libraries try: r = requests.get(server.rstrip("/")+"/api/libraries", headers=headers, timeout=10) if r.status_code == 200: libraries = r.json() self._log(f"✓ Found {len(libraries)} libraries") for lib in 
libraries: lib_id = lib.get('id') lib_name = lib.get('name', 'unnamed') self._log(f" - {lib_name} (ID: {lib_id})") else: self._log(f"✗ Libraries failed: HTTP {r.status_code}") return except Exception as e: self._log(f"✗ Libraries failed: {e}") return # Test 2: Search default library (no libraryId) try: payload = {"type": "IMAGE", "take": 10, "skip": 0} r = requests.post(server.rstrip("/")+"/api/search/metadata", json=payload, headers=headers, timeout=15) if r.status_code == 200: data = r.json() if isinstance(data, dict) and 'assets' in data: assets_obj = data['assets'] if isinstance(assets_obj, dict): count = len(assets_obj.get('items', [])) total = assets_obj.get('total', 0) self._log(f"✓ Default library: {count} returned, {total} total") else: self._log(f"✗ Default library search failed: HTTP {r.status_code}") except Exception as e: self._log(f"✗ Default library search failed: {e}") # Test 3: Search external library specifically external_lib_id = "8855617e-b0a1-4324-87f9-a1b10de11c27" try: payload = { "type": "IMAGE", "take": 10, "skip": 0, "libraryId": external_lib_id } r = requests.post(server.rstrip("/")+"/api/search/metadata", json=payload, headers=headers, timeout=15) if r.status_code == 200: data = r.json() if isinstance(data, dict) and 'assets' in data: assets_obj = data['assets'] if isinstance(assets_obj, dict): count = len(assets_obj.get('items', [])) total = assets_obj.get('total', 0) self._log(f"✓ External library: {count} returned, {total} total") if total > 250: self._log("🎉 FOUND THE MISSING ASSETS! 
External library has the photos!") else: self._log("⚠️ External library also limited to 250") else: self._log(f"✗ External library search failed: HTTP {r.status_code}") self._log(f"Response: {r.text[:200]}") except Exception as e: self._log(f"✗ External library search failed: {e}") # Test 4: Try without type filter on external library try: payload = { "take": 10, "skip": 0, "libraryId": external_lib_id } r = requests.post(server.rstrip("/")+"/api/search/metadata", json=payload, headers=headers, timeout=15) if r.status_code == 200: data = r.json() if isinstance(data, dict) and 'assets' in data: assets_obj = data['assets'] if isinstance(assets_obj, dict): count = len(assets_obj.get('items', [])) total = assets_obj.get('total', 0) self._log(f"✓ External library (no type filter): {count} returned, {total} total") else: self._log(f"✗ External library (no type) failed: HTTP {r.status_code}") except Exception as e: self._log(f"✗ External library (no type) failed: {e}") self._log("\n📋 EXTERNAL LIBRARY TEST COMPLETE") self._log("If external library shows high totals, the wallpaper app should be updated to use libraryId parameter.") # ----- Start/Stop/Next ----- def start(self): server=normalize_base_url(self.server_var.get().strip()); self.server_var.set(server) api_key=self.api_key_var.get().strip() if not server or not api_key: messagebox.showerror("Missing info","Server URL and API Key are required."); return try: interval_min=float(self.interval_var.get().strip()) except ValueError: messagebox.showerror("Invalid interval","Please enter a number for minutes."); return if interval_min<=0: messagebox.showerror("Invalid interval","Interval must be greater than zero."); return self._save_now() self._stop_event.clear() self.btn_start.configure(state="disabled") self.btn_stop.configure(state="normal") self.btn_next.configure(state="disabled") # Keep 'Next' disabled until cache is ready t = threading.Thread(target=self._run_loop, args=(server, api_key, 
self.album_var.get().strip(), interval_min), daemon=True) t.start(); self._runner_thread=t self._log("Started enhanced wallpaper rotation with metadata caching.") def stop(self): self._stop_event.set() self.btn_stop.configure(state="disabled") self.btn_next.configure(state="disabled") self.btn_start.configure(state="normal") self._log("Stopping… (will stop after current cycle)") def next_now(self): threading.Thread(target=self._change_once, daemon=True).start() # ----- Enhanced Core loop with metadata caching ----- def _run_loop(self, server, api_key, album_id, interval_min, settings=None): if settings is None: settings = _RunSettings( landscape_only = bool(self.landscape_only_var.get()), only_favorites = bool(self.only_favorites_var.get()), exclude_archived = bool(self.exclude_archived_var.get()), same_for_all = bool(self.same_for_all_var.get()), fit_no_crop = bool(self.fit_image_var.get()), cache_target = self._get_int(self.cache_target_var, DEFAULT_CACHE_TARGET, 100, 500000), ) self._client = ImmichClient(server, api_key, logger=self._log) self._cached_assets = [] self._log("Connecting and building enhanced metadata cache...") cache_target = int(settings.get('cache_target', DEFAULT_CACHE_TARGET)) want_landscape = bool(settings.get('landscape_only', False)) only_favorites = bool(settings.get('only_favorites', False)) exclude_archived = bool(settings.get('exclude_archived', False)) self._log(f"Enhanced filters => landscape_only={want_landscape}, only_favorites={only_favorites}, exclude_archived={exclude_archived}") self._log(f"Cache target set to: {cache_target} images") try: # Step 1: Get all assets from the server first self._log("Fetching full asset list from server...") if album_id: initial_assets = self._client.list_album_assets(album_id) else: initial_assets = self._client.list_all_assets_with_metadata(page_size=10000) if not initial_assets: self._log("No assets found on server or API error. 
Check your connection and permissions.") self.after(0, self.stop) return self._log(f"Found {len(initial_assets)} total assets on server.") # Step 2: DISCOVERY PHASE - Find what needs to be cached self._log("Phase 1: Discovering uncached images...") assets_to_analyze = [] cache_hits = 0 filtered_out = 0 use_filters_for_caching = not bool(self.cache_all_images_var.get()) for asset in initial_assets: if self._stop_event.is_set(): break asset = normalize_asset(asset) if not is_image_asset(asset): continue # Apply filters if caching is not set to "all images" if use_filters_for_caching: if only_favorites and not is_favorite(asset): filtered_out += 1 continue if exclude_archived and is_archived(asset): filtered_out += 1 continue asset_id = get_asset_id(asset) cached_metadata = self._metadata_cache.get_metadata(asset_id) # Only consider it cached if it has valid dimensions if (cached_metadata and cached_metadata.get('width') and cached_metadata.get('height')): cache_hits += 1 else: assets_to_analyze.append(asset) # REMOVED: Cache target check during discovery phase # This allows all assets to be processed self._log(f"Discovery complete. Found {len(assets_to_analyze)} new images to cache. Hits: {cache_hits}. 
Filtered: {filtered_out}.") # Step 3: PROCESSING PHASE - Analyze and cache the discovered images self._log("Phase 2: Caching new image metadata...") total_to_process = len(assets_to_analyze) images_cached_this_run = 0 # Process assets sequentially to avoid threading issues with cache for i, asset in enumerate(assets_to_analyze): if self._stop_event.is_set(): break try: asset_id = get_asset_id(asset) w, h = get_dims_from_asset(asset) if w and h: is_landscape = w >= h self._metadata_cache.set_metadata(asset_id, w, h, is_landscape) images_cached_this_run += 1 # Progress reporting every 1000 images if (i + 1) % 1000 == 0 or (i + 1) == total_to_process: progress = int(((i + 1) / total_to_process) * 100) if total_to_process > 0 else 100 current_stats = self._metadata_cache.get_cache_stats() self._log(f"Progress: {progress}% | Processed: {i + 1}/{total_to_process} | " f"Successfully cached: {images_cached_this_run} | Total in cache: {current_stats['total_entries']}") self.after(0, self._update_cache_info) else: # Debug first few failures if images_cached_this_run < 5: self._log(f"No dimensions for asset {asset_id[:8]}...") except Exception as e: if images_cached_this_run < 3: self._log(f"Error processing asset {get_asset_id(asset)}: {e}") self._log(f"Processing complete. 
Successfully cached {images_cached_this_run} out of {total_to_process} assets") except Exception as e: self._log(f"Error building metadata cache: {e}") self.after(0, self.stop) return # Step 4: Finalize and start rotation self._log(f"DEBUG: Before save - in-memory cache has {len(self._metadata_cache.cache)} entries") self._metadata_cache.save() self._log(f"DEBUG: After save - file should contain the same number of entries") self._metadata_cache.load() cache_stats = self._metadata_cache.get_cache_stats() self._log(f"DEBUG: After reload - cache stats show {cache_stats['total_entries']} entries") self.after(0, self._update_cache_info) available_count = cache_stats['landscape_count'] if want_landscape else cache_stats['portrait_count'] self._log(f"Metadata cache saved and reloaded: {cache_stats['total_entries']} total images") self._log(f"Available for selection: {available_count} {'landscape' if want_landscape else 'portrait'} images") # Debug: Check if cache file actually contains what we expect self._log(f"DEBUG: Cache stats breakdown - Total: {cache_stats['total_entries']}, " f"Landscape: {cache_stats['landscape_count']}, Portrait: {cache_stats['portrait_count']}") # Debug: Sample some cache entries sample_entries = 0 with open(METADATA_PATH, 'r') as f: cache_data = json.load(f) if 'metadata' in cache_data: for asset_id, metadata in list(cache_data['metadata'].items())[:3]: sample_entries += 1 self._log(f"DEBUG: Sample cache entry {sample_entries}: {asset_id[:8]}... = " f"w={metadata.get('width')}, h={metadata.get('height')}, " f"landscape={metadata.get('is_landscape')}") self._log(f"DEBUG: Images cached this run: {images_cached_this_run}") if available_count == 0: self._log("No images match your orientation preference. 
Check your filters.") self.after(0, self.stop) return self.after(0, lambda: self.btn_next.configure(state="normal")) self._change_once() seconds = max(10, int(interval_min * 60)) while not self._stop_event.is_set(): for _ in range(seconds): if self._stop_event.is_set(): break time.sleep(1) if self._stop_event.is_set(): break self._change_once() self._log("Enhanced rotation stopped.") def _get_random_assets_from_metadata(self, count: int, landscape_only: bool, exclude_set: set = None) -> List[str]: """ Select completely random asset IDs directly from the metadata cache file. """ try: # Read metadata file directly for each selection to ensure true randomness if not os.path.exists(METADATA_PATH): return [] with open(METADATA_PATH, 'r', encoding='utf-8') as f: data = json.load(f) if 'metadata' not in data: return [] metadata = data['metadata'] # Filter assets based on orientation preference eligible_assets = [] for asset_id, meta in metadata.items(): if exclude_set and asset_id in exclude_set: continue is_landscape = meta.get('is_landscape', True) if landscape_only == is_landscape: eligible_assets.append(asset_id) if not eligible_assets: # If no eligible assets after filtering, use all assets from metadata eligible_assets = [aid for aid in metadata.keys() if not exclude_set or aid not in exclude_set] if not eligible_assets: return [] # Use cryptographically secure random selection count = min(count, len(eligible_assets)) return self._random_selector.system_rng.sample(eligible_assets, count) except Exception as e: self._log(f"Error reading metadata for random selection: {e}") return [] def _change_once(self): # Check if we have metadata cache to use if not os.path.exists(METADATA_PATH): self._log("No metadata cache available. 
Please run initial scan first.") return per = self.per_monitor_var.get() n = 1 if per: try: n = max(1, len(list_monitor_ids(get_desktop_wallpaper_com()))) except Exception: n = 1 # Get orientation preference want_landscape = bool(self.landscape_only_var.get()) # Use minimal exclusion for truly random selection - only exclude last 20 selections exclude_set = set(self._history.order[-min(len(self._history.order), 20):]) if self._history.order else set() # Select completely random asset IDs directly from metadata file selected_asset_ids = self._get_random_assets_from_metadata( 1 if self.same_for_all_var.get() else n, want_landscape, exclude_set ) if not selected_asset_ids: # Try without exclusion if nothing found selected_asset_ids = self._get_random_assets_from_metadata( 1 if self.same_for_all_var.get() else n, want_landscape, None ) if not selected_asset_ids: self._log("Could not select any assets from metadata cache.") return saved_paths, names, used_ids = [], [], [] fit = self.fit_image_var.get() try: for asset_id in selected_asset_ids: # Download the image using the randomly selected asset ID img_bytes = self._client.download_original(asset_id) out = ensure_img_file(img_bytes, ".jpg") saved_paths.append(out) names.append(asset_id) # Use asset ID as name since we don't have full asset metadata used_ids.append(asset_id) if per and n > 1: paths = [saved_paths[0]] if self.same_for_all_var.get() else saved_paths set_per_monitor_wallpapers(paths, same_for_all=self.same_for_all_var.get(), fit=fit) self._log(f"Random wallpaper{'s' if not self.same_for_all_var.get() else ''} set on {n} monitor(s): " f"{names[0] if self.same_for_all_var.get() else 'multiple'}") else: bmp = os.path.join(tempfile.gettempdir(), "immich_wallpaper.bmp") Image.open(saved_paths[0]).convert("RGB").save(bmp, "BMP") apply_wallpaper_bmp_legacy(bmp, fit) self._log(f"Random wallpaper set: {names[0]}") # Add to history with minimal tracking self._history.add_many(used_ids) # Keep history very short for 
maximum randomness if len(self._history.order) > 50: self._history.order = self._history.order[-25:] # Keep only last 25 self._history._set = set(self._history.order) self._history.save() except requests.HTTPError as e: code = e.response.status_code if getattr(e, "response", None) is not None else "?" if code in (401, 403): self._log("Auth error (401/403). Check API key permissions.") elif code == 404: self._log(f"Original not found (404) for asset: {selected_asset_ids[0] if selected_asset_ids else 'unknown'}") else: self._log(f"HTTP error: {e}") except Exception as e: self._log(f"Failed to set wallpaper(s): {e}") # ---------- main ---------- def main(): if sys.platform != "win32": messagebox.showerror("Unsupported OS", "This script is intended for Windows.") return ensure_appdir() app = App() app.mainloop() if __name__ == "__main__": main()