#!/usr/bin/python3
import math
import multiprocessing
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime as dt
from typing import Optional

import piexif
import yaml
from PIL import Image


@dataclass
class OutputDef:
    """Describes one output rendition written by resizeSaveOutput().

    An image is only ever shrunk, never enlarged. It is scaled so that its
    longest side does not exceed MaxDimension and its shortest side does not
    exceed MinDimension. MaxDimension == 0 disables scaling altogether.
    """
    MaxDimension: int   # limit for the longest side (0: never rescale)
    MinDimension: int   # limit for the shortest side
    OutputFormat: str   # format name handed to Image.save(), e.g. "JPEG"
    OutputQuality: int  # "quality" argument handed to Image.save()
    OutputPath: str     # sub-directory (below the working directory) for the output
    OutputSuffix: str   # appended to the output base name, includes the extension


SocialMediaDef = OutputDef(1600, 1080, "JPEG", 96, "sm", "_sm.jpg")
HqDef = OutputDef(2560, 1920, "JPEG", 95, "hq", "_hq.jpg")
FullDef = OutputDef(3600, 2700, "JPEG", 98, "full", "_full.jpg")

# --------

_verbose = False
WmScale = 1.6


class Config:
    """Settings read from a YAML file.

    The top-level keys "Copyright", "UserComment" and "WatermarkFile" are
    required. Any other top-level key whose value is a mapping is treated as
    per-file settings, looked up by name through getFileSpecs().
    """

    def __init__(self, yaml_filename: str):
        """Load yaml_filename; raises KeyError if a required key is missing."""
        with open(yaml_filename) as f:
            # SafeLoader: only plain YAML data types are constructed.
            cfg = yaml.load(f.read(), Loader=yaml.SafeLoader)
        self.CfgDict: dict = cfg
        self.Copyright: str = cfg["Copyright"]
        self.UserComment: str = cfg["UserComment"]
        self.WmFile: str = cfg["WatermarkFile"]

    def getFileSpecs(self, filename: str) -> dict:
        """Return the settings mapping stored under key `filename`.

        Returns an empty dict when the key is absent or its value is not a
        mapping (for example the global string settings).
        """
        cfg = self.CfgDict.get(filename)
        return cfg if isinstance(cfg, dict) else {}


# Assigned by main() once the configuration file has been read.
_config: Optional[Config] = None

# ------


class Watermark:
    """Pastes a semi-transparent, single-coloured watermark onto an image.

    The watermark colour (white or black) and its opacity are chosen from the
    brightness of the image area that the watermark is going to cover.
    """

    # Watermark width as a fraction of the weighted image dimension, see apply()
    WidthRatio = 0.25 / WmScale
    # Horizontal / vertical margin, in multiples of the scaled watermark height
    Xpos = 1.0 / WmScale
    Ypos = 0.9 / WmScale
    # NOTE(review): these two colours are not referenced by the code in this
    # file; the plan methods use pure 255 / 0 instead.
    WmColorWhite = 240
    WmColorBlack = 20
    # Tuning constants for _planWatermark()
    TranspBaseWhite = 1.2
    TranspBaseBlack = 1.2
    TranspStdevScale = 0.02
    TranspIntensScale = 0.0025
    # Minimum alpha factor produced by _planWatermark2()
    TranspBase2 = 0.17
    # When True, apply() uses the inverse of the watermark's alpha channel
    InvertWm = False

    def __init__(self, wm_black: str, wm_white: Optional[str] = None):
        """Remember the watermark file names and the watermark's pixel size.

        wm_black: image file used when a black watermark is drawn.
        wm_white: image file used when a white watermark is drawn; defaults
                  to wm_black when omitted.
        """
        self.WmBlack = wm_black
        # Fix: the original picked wm_black in both branches, so a separately
        # supplied white watermark file was never used.
        self.WmWhite = wm_black if wm_white is None else wm_white
        # Only the dimensions are needed here; close the file right away.
        with Image.open(wm_black) as wm_tmp:
            self.WmWidth, self.WmHeight = wm_tmp.size

    @staticmethod
    def _grayPixels(img: Image.Image, area) -> list:
        """Return the weighted gray value of every pixel inside `area`.

        The crop is converted to RGB first so that single-band (grayscale,
        palette) images are handled as well. Raises ValueError when the area
        contains no pixels.
        """
        cropped_area = img.crop(area).convert("RGB")
        gray_pixels = [0.21 * r + 0.72 * g + 0.07 * b
                       for (r, g, b) in cropped_area.getdata()]
        if not gray_pixels:
            raise ValueError("Watermark area is empty")
        return gray_pixels

    @staticmethod
    def _getIntensityVariance(img: Image.Image, area):
        """Return (rounded mean gray value, gray value variance) of `area`."""
        gray_pixels = Watermark._grayPixels(img, area)
        mean = sum(gray_pixels) / len(gray_pixels)
        var = sum((x - mean) ** 2 for x in gray_pixels) / len(gray_pixels)
        return (round(mean), var)

    @staticmethod
    def _getLoHiMean(img: Image.Image, area):
        """Return (low 10th percentile, high 10th percentile, rounded mean)
        of the gray values inside `area`."""
        gray_pixels = Watermark._grayPixels(img, area)
        gray_pixels.sort()
        count = len(gray_pixels)
        mean = sum(gray_pixels) / count
        i_low = count // 10
        # With fewer than 10 pixels i_low is 0 and "count - i_low" would point
        # one past the end of the list, so clamp to the last valid index.
        i_high = min(count - i_low, count - 1)
        return (gray_pixels[i_low], gray_pixels[i_high], round(mean))

    def _planWatermark(self, main_image, watermark_area):
        """Choose watermark colour and alpha factor from mean and std. deviation.

        Returns (wm_color, transparency): wm_color is 255 on a dark background
        and 0 on a bright one; transparency is the factor that a caller would
        multiply the watermark alpha with.
        NOTE(review): apply() uses _planWatermark2(); nothing in this file
        calls this variant.
        """
        # Check out image intensity behind watermark
        (gray_intens, gray_var) = self._getIntensityVariance(main_image, watermark_area)
        gray_stdev = math.sqrt(gray_var)
        if gray_intens < 128:
            wm_color = 255
            transparency = 1 - (1 / (self.TranspBaseWhite
                                     + (gray_intens * self.TranspIntensScale)
                                     + gray_stdev * self.TranspStdevScale))
        else:
            wm_color = 0
            transparency = 1 - (1 / (self.TranspBaseBlack
                                     + (255 - gray_intens) * self.TranspIntensScale
                                     + gray_stdev * self.TranspStdevScale))
        if _verbose:
            print("Watermark plan: intens: %d, st.dev.: %d, wm_color: %d, transp: %0.2f"
                  % (gray_intens, gray_stdev, wm_color, transparency))
        return (wm_color, transparency)

    def _planWatermark2(self, main_image, watermark_area):
        """Choose watermark colour and alpha factor from the 10th percentiles.

        Returns (wm_color, transparency). The factor starts at TranspBase2 and
        grows with the contrast (high - low) of the background and with how
        close the background is to the watermark colour.
        """
        (low, high, avg) = self._getLoHiMean(main_image, watermark_area)
        # Both terms can contribute at most half of the range above the base,
        # so the resulting factor never exceeds 1.0.
        f1 = (1 - self.TranspBase2) / 2 / 255
        f2 = (1 - self.TranspBase2) / 2 / 255
        if avg < 128:
            wm_color = 255
            transparency = self.TranspBase2 + (high - low) * f1 + high * f2
        else:
            wm_color = 0
            transparency = self.TranspBase2 + (high - low) * f1 + (255 - low) * f2
        if _verbose:
            print("Watermark plan: intens: %d, lo: %d, hi: %d, wm_color: %d, transp: %0.2f"
                  % (avg, low, high, wm_color, transparency))
        return (wm_color, transparency)

    def apply(self, main_image: Image.Image, pos: str = "bl"):
        """Paste the watermark onto main_image, modifying it in place.

        pos: corner to use: "tl", "tr", "bl" or "br" (top/bottom, left/right).
        Raises ValueError for any other value.
        """
        if pos not in ("tl", "tr", "bl", "br"):
            raise ValueError("Invalid watermark position")

        # Calculate an average image dimension by a weighted average,
        # weighing the smallest dimension double:
        max_dim = max(main_image.size)
        min_dim = min(main_image.size)
        avg_dimension = (max_dim + min_dim * 2) / 3

        # This is used to size the watermark
        wm_width = avg_dimension * self.WidthRatio
        wm_size = (int(wm_width), int(self.WmHeight * wm_width / self.WmWidth))

        # X and Y offsets are specified in multiples of the watermark height
        wm_x = round(wm_size[1] * self.Xpos)
        wm_y = round(wm_size[1] * self.Ypos)
        if "r" in pos:  # right (otherwise l for left)
            wm_x = main_image.size[0] - wm_size[0] - wm_x
        if "b" in pos:  # bottom (otherwise t for top)
            wm_y = main_image.size[1] - wm_size[1] - wm_y
        watermark_area = (wm_x, wm_y, wm_x + wm_size[0], wm_y + wm_size[1])

        # Plan color and transparency of watermark by checking out the watermark area
        (wm_color, transparency) = self._planWatermark2(main_image, watermark_area)

        wm_file = self.WmWhite if wm_color > 128 else self.WmBlack
        # The resized copy is independent of the file, so the file handle can
        # be released as soon as the resize is done.
        with Image.open(wm_file) as wm_source:
            watermark = wm_source.resize(wm_size, Image.LANCZOS)

        # Make watermark transparent: keep only its alpha channel (scaled by
        # the planned factor) and paint every pixel in the planned colour.
        watermark = watermark.convert("RGBA")
        pixels = watermark.getdata()
        if self.InvertWm:
            new_pixels = [(wm_color, wm_color, wm_color, round((255 - pixel[3]) * transparency))
                          for pixel in pixels]
        else:
            new_pixels = [(wm_color, wm_color, wm_color, round(pixel[3] * transparency))
                          for pixel in pixels]
        watermark.putdata(new_pixels)

        # Paste the watermark onto the main image, using it as its own mask
        main_image.paste(watermark, (wm_x, wm_y), watermark)


def makeExifDict(copyright, usercomment) -> bytes:
    """Build the EXIF block (as bytes) holding a copyright and a user comment.

    Both strings are encoded as ASCII, so a non-ASCII character raises
    UnicodeEncodeError.
    """
    exif_dict = {
        "0th": {
            piexif.ImageIFD.Copyright: copyright.encode("ascii")
        },
        "Exif": {
            # The comment is prefixed with the 8 byte "ASCII" character code id
            piexif.ExifIFD.UserComment: b"ASCII\0\0\0" + usercomment.encode('ascii')
        },
        "GPS": {},
        "1st": {},
        "thumbnail": None
    }
    return piexif.dump(exif_dict)


def resizeSaveOutput(input_filename: str, out_name: str, img: Image.Image,
                     exif_bytes: bytes, output_def: OutputDef):
    """Scale img down to the limits of output_def and save it.

    input_filename: only used for the progress message.
    out_name:       base name of the output file (suffix is appended).
    img:            source image; it is not modified.
    exif_bytes:     EXIF block to embed in the output.
    output_def:     size limits, format, quality and destination.

    The file is written to "./<OutputPath>/<out_name><OutputSuffix>", i.e.
    below the current working directory rather than next to the input file.
    The directory is created when missing.
    """
    scale = None
    if output_def.MaxDimension != 0:
        scale = output_def.MaxDimension / max(img.size)
        # A MinDimension of 0 means "no limit on the shortest side"; without
        # this check it would yield a scale of 0 and an invalid 0x0 resize.
        if output_def.MinDimension > 0:
            scale = min(scale, output_def.MinDimension / min(img.size))

    if scale is not None and scale < 1.0:
        # Never let an extreme aspect ratio round a side down to 0 pixels
        width = max(1, round(img.size[0] * scale))
        height = max(1, round(img.size[1] * scale))
        scale_msg = "scaled by %0.2f to: %dx%d" % (scale, width, height)
        img_scaled = img.resize((width, height), Image.LANCZOS)
    else:
        scale_msg = "not rescaled: %dx%d" % (img.size[0], img.size[1])
        img_scaled = img

    # JPEG cannot store an alpha channel or a palette; convert such images
    # (e.g. PNG input) instead of failing in save().
    if (output_def.OutputFormat.upper() == "JPEG"
            and img_scaled.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")):
        img_scaled = img_scaled.convert("RGB")

    out_path = "./" + output_def.OutputPath
    in_fname = os.path.basename(input_filename)
    out_fname = out_name + output_def.OutputSuffix
    out_full = out_path + "/" + out_fname

    if not os.path.isdir(out_path):
        print("Creating dir: " + out_path)
        # exist_ok: another worker process may create the directory meanwhile
        os.makedirs(out_path, exist_ok=True)

    img_scaled.save(out_full, output_def.OutputFormat,
                    quality=output_def.OutputQuality, exif=exif_bytes)
    size = os.path.getsize(out_full) // 1000
    print("%s %s, saved as %s (%d KB)"
          % (in_fname, scale_msg, output_def.OutputPath + "/" + out_fname, size))


def process_image(main_image_path: str, watermark: Watermark):
    """Produce all output renditions for one input image.

    Writes the "full" rendition without watermark, then the "sm" and "hq"
    renditions with the watermark applied (unless disabled for this file).
    Per-file settings "year", "name" and "watermark" are read from the global
    configuration, keyed by the file's base name up to its first dot.

    Any exception is caught and reported on stdout so that one failing image
    does not stop the remaining ones.
    """
    try:
        img_name = os.path.basename(main_image_path).split(".")[0]
        img_cfg = _config.getFileSpecs(img_name)

        if "year" in img_cfg:
            year = int(img_cfg["year"])
        else:
            # No year configured: fall back to the file's modification time
            mtime = os.path.getmtime(main_image_path)
            year = dt.fromtimestamp(mtime).year
        print("Input: %s, (year: %d to be used for copyright notice)" % (main_image_path, year))

        # The configured copyright text is a %-format taking the year
        exif = makeExifDict(_config.Copyright % year, _config.UserComment)

        # Load the main image; the context manager closes the file afterwards
        with Image.open(main_image_path) as main_image:
            # Save full output without watermark
            out_name = img_cfg.get("name", img_name)
            resizeSaveOutput(main_image_path, out_name, main_image, exif, FullDef)

            # Make reduced quality watermarked outputs
            wm_spec = img_cfg.get("watermark", "bl")
            if wm_spec != False and wm_spec != "off":
                watermark.apply(main_image, wm_spec)
            resizeSaveOutput(main_image_path, out_name, main_image, exif, SocialMediaDef)
            resizeSaveOutput(main_image_path, out_name, main_image, exif, HqDef)
    except Exception as e:
        print("ERROR: Processing %s failed: %s" % (main_image_path, str(e)))

# ----

def _init_worker(config, verbose):
    """Install the parent's settings as module globals in a pool worker.

    Worker processes that are started by spawning a fresh interpreter do not
    inherit values assigned to module globals at run time, so they are handed
    over explicitly when the pool is created.
    """
    global _config, _verbose
    _config = config
    _verbose = verbose


def process(num_processes: int, filenames: list):
    """Run process_image() for every file name on a pool of worker processes.

    num_processes: size of the process pool.
    filenames:     paths of the images to process.
    """
    print("Processes: %d" % (num_processes))

    watermark = Watermark(_config.WmFile)

    # Creating a pool of processes
    with multiprocessing.Pool(processes=num_processes,
                              initializer=_init_worker,
                              initargs=(_config, _verbose)) as pool:
        results = [pool.apply_async(process_image, args=(filename, watermark))
                   for filename in filenames]
        pool.close()
        pool.join()

        # process_image() reports its own errors; this surfaces failures that
        # happen outside of it (e.g. while handing a task to a worker).
        for filename, result in zip(filenames, results):
            try:
                result.get()
            except Exception as e:
                print("ERROR: Processing %s failed: %s" % (filename, str(e)))


# ----

def welcome():
    """Print the program banner."""
    print("masterimg")


def usage():
    """Print the command line help."""
    print("""
Usage: masterimg OPTIONS FILES ...

Options:
  -h        Show this help
  -v        More verbose
  -p NUM    Specify number of processes (default: 6)
""")


class ShowHelpException(Exception):
    """Raised when the help text should be shown instead of running."""
    pass


class Params:
    """Parsed command line: option values plus the remaining arguments."""

    def __init__(self):
        # One table per option type; the keys are the accepted option names
        # and the values their defaults.
        self.Flags = {"-v": False}
        self.Ints = {"-p": 6}
        self.Strings = {}
        self.Floats = {}
        self.Args = []     # positional arguments following the options
        self.ArgsMin = 1   # minimum number of positional arguments

    @staticmethod
    def parse(cmds: list):
        """Parse the argument list `cmds` and return a Params object.

        Leading arguments starting with "-" are options; everything from the
        first other argument on is positional. `cmds` itself is not modified.

        Raises ShowHelpException for -h / -H or when fewer than ArgsMin
        positional arguments are given, and ValueError for an unknown option,
        a missing option value or a value of the wrong type.
        """
        p = Params()
        remaining = list(cmds)  # work on a copy, leave the caller's list alone
        valued_options = ((p.Floats, float), (p.Ints, int), (p.Strings, str))

        while remaining and remaining[0].startswith("-"):
            option = remaining.pop(0)
            if option == "-h" or option == "-H":
                raise ShowHelpException
            if option in p.Flags:
                p.Flags[option] = True
                continue
            for table, convert in valued_options:
                if option in table:
                    if not remaining:
                        raise ValueError("Missing value for option: " + option)
                    raw_value = remaining.pop(0)
                    try:
                        table[option] = convert(raw_value)
                    except ValueError:
                        raise ValueError("Invalid value for option %s: %s"
                                         % (option, raw_value)) from None
                    break
            else:
                raise ValueError("Unknown option: " + option)

        if len(remaining) < p.ArgsMin:
            raise ShowHelpException
        p.Args = remaining
        return p


def main(args):
    """Program entry point; `args` is the command line without program name.

    Reads "masterimg.yaml" from the current working directory and processes
    the image files named on the command line.
    """
    global _verbose
    global _config

    welcome()
    try:
        if len(args) == 0:
            raise ShowHelpException
        p = Params.parse(args)
    except ShowHelpException:
        usage()
        return
    except ValueError as e:
        # Bad command line: explain what is wrong instead of a traceback
        print("ERROR: " + str(e))
        usage()
        return

    _verbose = p.Flags["-v"]
    _config = Config("masterimg.yaml")
    print("Read masterimg.yaml")
    process(p.Ints["-p"], p.Args)


if __name__ == "__main__":
    import sys
    main(sys.argv[1:])