Procházet zdrojové kódy

masterimg.py added

Lars před 1 rokem
rodič
revize
db04b4c911
1 změnil soubory, kde provedl 361 přidání a 0 odebrání
  1. 361 0
      masterimg.py

+ 361 - 0
masterimg.py

@@ -0,0 +1,361 @@
+#!/usr/bin/python3
+
+import sys
+import os
+import multiprocessing
+from datetime import datetime as dt
+from PIL import Image
+import piexif
+import re
+import math
+import yaml
+from dataclasses import dataclass
+
+from typing import Optional
+
+
+@dataclass
+class OutputDef:
+    MaxDimension:int # max dim. wins over min dim.
+    MinDimension:int
+    OutputFormat:str
+    OutputQuality:int
+    OutputPath:str
+    OutputSuffix:str
+
+
+SocialMediaDef = OutputDef(1600, 1080, "JPEG", 96, "sm", "_sm.jpg")
+HqDef = OutputDef(2560, 1920, "JPEG", 95, "hq", "_hq.jpg")
+FullDef = OutputDef(3600, 2700, "JPEG", 98, "full", "_full.jpg")
+
+# --------
+
+_verbose = False
+
+WmScale = 1.6
+
+class Config:
+    def __init__(self, yaml_filename:str):        
+        with open(yaml_filename) as f:            
+            cfg = yaml.load(f.read(), Loader=yaml.SafeLoader)
+        self.CfgDict:dict = cfg
+        self.Copyright:str = cfg["Copyright"]
+        self.UserComment:str = cfg["UserComment"]
+        self.WmFile:str = cfg["WatermarkFile"]
+
+    def getFileSpecs(self, filename:str) -> dict:
+        cfg = self.CfgDict.get(filename)
+        if type(cfg) is dict:
+            return cfg
+        else:
+            return {}
+
+
+global _config
+_config:Config = None
+
+# ------
+
+class Watermark:
+    WidthRatio = 0.25 / WmScale
+    Xpos = 1.0 / WmScale
+    Ypos = 0.9 / WmScale
+    WmColorWhite = 240
+    WmColorBlack = 20
+    TranspBaseWhite = 1.2
+    TranspBaseBlack = 1.2
+    TranspStdevScale = 0.02
+    TranspIntensScale = 0.0025
+    TranspBase2 = 0.17
+    InvertWm = False
+
+    def __init__(self, wm_black:str, wm_white:Optional[str]=None):
+        self.WmBlack = wm_black
+        self.WmWhite = wm_black if wm_white is None else wm_black
+        wm_tmp = Image.open(wm_black)
+        self.WmWidth = wm_tmp.size[0]
+        self.WmHeight = wm_tmp.size[1]
+
+    @staticmethod
+    def _getIntensityVariance(img:Image, area):
+        # Calculate intensity and variance on area of img
+        cropped_area = img.crop(area)
+
+        gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
+
+        mean = sum(gray_pixels) / len(gray_pixels)
+        var = sum((x - mean) ** 2 for x in gray_pixels) / len(gray_pixels)
+        return (round(mean), var)
+
+    @staticmethod
+    def _getLoHiMean(img:Image, area):
+        # Calculate lo and high 10th percentile, and mean, on area of img
+        cropped_area = img.crop(area)
+        gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
+        gray_pixels.sort()
+
+        mean = sum(gray_pixels) / len(gray_pixels)
+        i_low = len(gray_pixels) // 10
+        i_high = len(gray_pixels) - i_low
+
+        return (gray_pixels[i_low], gray_pixels[i_high], round(mean))
+
+    def _planWatermark(self, main_image, watermark_area):
+        # Check out image intensity behind watermark        
+        (gray_intens, gray_var) = self._getIntensityVariance(main_image, watermark_area)
+        
+        gray_stdev = math.sqrt(gray_var)
+        
+        if gray_intens < 128:
+            wm_color = 255
+            transparency = 1 - (1 / (self.TranspBaseWhite + 
+                                     (gray_intens * self.TranspIntensScale) + gray_stdev * self.TranspStdevScale))
+        else:
+            wm_color = 0
+            transparency = 1 - (1 / (self.TranspBaseBlack + 
+                                     (255 - gray_intens) * self.TranspIntensScale + gray_stdev * self.TranspStdevScale))
+        if _verbose:
+            print("Watermark plan: intens: %d, st.dev.: %d, wm_color: %d, transp: %0.2f" % (gray_intens, gray_stdev, wm_color, transparency))
+        
+        return (wm_color, transparency)
+
+    def _planWatermark2(self, main_image, watermark_area):
+        (low, high, avg) = self._getLoHiMean(main_image, watermark_area)
+
+        f1 = (1 - self.TranspBase2) / 2 / 255
+        f2 = (1 - self.TranspBase2) / 2 / 255
+
+        if avg < 128:
+            wm_color = 255
+            transparency = self.TranspBase2 + (high - low) * f1 + high * f2
+        else:
+            wm_color = 0
+            transparency = self.TranspBase2 + (high - low) * f1 + (255 - low) * f2
+
+        if _verbose:
+            print("Watermark plan: intens: %d, lo: %d, hi: %d, wm_color: %d, transp: %0.2f" % (avg, low, high, wm_color, transparency))
+        
+        return (wm_color, transparency)
+
+    def apply(self, main_image:Image, pos:str="bl"):
+        if not pos in ["tl", "tr", "bl", "br"]:
+            raise Exception("Invalid watermark position")
+        
+        # Calculate an average image dimension by a weighted average,
+        # weighing the smallest dimension double:
+        max_dim = max(main_image.size[0], main_image.size[1])
+        min_dim = min(main_image.size[0], main_image.size[1])
+        avg_dimension = (max_dim + min_dim * 2) / 3
+
+        # This is used to size the water mark
+        wm_width = avg_dimension * self.WidthRatio
+        wm_size = (int(wm_width), int(self.WmHeight * wm_width / self.WmWidth))
+
+        # X and Y offset are specified in multiplum of watermark height
+        wm_x = round(wm_size[1] * self.Xpos)
+        wm_y = round(wm_size[1] * self.Ypos)
+
+        if "r" in pos: # right (otherwise l for left)
+            wm_x = main_image.size[0] - wm_size[0] - wm_x
+
+        if "b" in pos: # bottom (otherwise t for top)
+            wm_y = main_image.size[1] - wm_size[1] - wm_y
+
+        watermark_area = (wm_x, wm_y, wm_x + wm_size[0], wm_y + wm_size[1])
+
+        # Plan color and transparency of watermark by checking out the watermark area
+        (wm_color, transparency) = self._planWatermark2(main_image, watermark_area)
+
+        if wm_color > 128:
+            watermark = Image.open(self.WmWhite).resize(wm_size, Image.LANCZOS)
+        else:
+            watermark = Image.open(self.WmBlack).resize(wm_size, Image.LANCZOS)
+        
+        # Make watermark transparent
+        watermark = watermark.convert("RGBA")
+        pixels = watermark.getdata()
+
+        if self.InvertWm:
+            new_pixels = [(wm_color, wm_color, wm_color, round((255 - pixel[3]) * transparency)) for pixel in pixels]
+        else:
+            new_pixels = [(wm_color, wm_color, wm_color, round(pixel[3] * transparency)) for pixel in pixels]
+
+        watermark.putdata(new_pixels)
+
+        # Paste the watermark onto the main image
+        main_image.paste(watermark, (wm_x, wm_y), watermark)
+
+
+def makeExifDict(copyright, usercomment) -> bytes:
+    exif_dict = {
+        "0th": {
+            piexif.ImageIFD.Copyright: copyright.encode("ascii")
+        },
+        "Exif": {
+            piexif.ExifIFD.UserComment: b"ASCII\0\0\0" + usercomment.encode('ascii')
+        },
+        "GPS": {},
+        "1st": {},
+        "thumbnail": None
+    }
+
+    return piexif.dump(exif_dict)
+
+def resizeSaveOutput(input_filename:str, out_name:str, img:Image, exif_bytes:bytes, output_def:OutputDef):
+    
+    if output_def.MaxDimension == 0:
+        scale = None
+    else:
+        scale1 = output_def.MinDimension / min(img.size[0], img.size[1])
+        scale2 = output_def.MaxDimension / max(img.size[0], img.size[1])
+        scale = min(scale1, scale2)
+
+    if scale is not None and scale < 1.0:
+        width = round(img.size[0] * scale)
+        height = round(img.size[1] * scale)
+        scale_msg = "scaled by %0.2f to: %dx%d" % (scale, width, height)
+        img_scaled = img.resize((width, height), Image.LANCZOS)
+    else:
+        scale_msg = "not rescaled: %dx%d" % (img.size[0], img.size[1])
+        img_scaled = img
+
+    # in_path = os.path.dirname(input_filename)
+    # if in_path == "":
+    #     in_path = "."
+    # out_path = in_path + "/" + output_def.OutputPath
+    out_path = "./" + output_def.OutputPath
+    in_fname = os.path.basename(input_filename)
+    out_fname = out_name + output_def.OutputSuffix
+    out_full = out_path + "/" + out_fname
+
+    if not os.path.isdir(out_path):
+        print("Creating dir: " + out_path)
+        os.makedirs(out_path, exist_ok=True)
+
+    img_scaled.save(out_full, output_def.OutputFormat, quality=output_def.OutputQuality, exif=exif_bytes)
+
+    size = os.path.getsize(out_full) // 1000
+    print("%s %s, saved as %s (%d KB)" % (in_fname, scale_msg, 
+        output_def.OutputPath + "/" + out_fname, size))
+
+
+def process_image(main_image_path:str, watermark:Watermark):
+    try:
+        img_name = os.path.basename(main_image_path).split(".")[0]
+        img_cfg = _config.getFileSpecs(img_name)
+
+        if "year" in img_cfg:
+            year = int(img_cfg["year"])
+        else:
+            mtime = os.path.getmtime(main_image_path)
+            year = dt.fromtimestamp(mtime).year
+        
+        print("Input: %s, (year: %d to be used for copyright notice)" % (main_image_path, year))
+
+        exif = makeExifDict(_config.Copyright % year, _config.UserComment)
+
+        # Load the main image
+        main_image = Image.open(main_image_path)
+
+        # Save full output without water mark
+        out_name = img_cfg.get("name", img_name)
+        resizeSaveOutput(main_image_path, out_name, main_image, exif, FullDef)
+
+        # Make reduced quality watermarked outputs
+        wm_spec = img_cfg.get("watermark", "bl")
+        if wm_spec != False and wm_spec != "off":
+            watermark.apply(main_image, wm_spec)
+
+        resizeSaveOutput(main_image_path, out_name, main_image, exif, SocialMediaDef)
+        resizeSaveOutput(main_image_path, out_name, main_image, exif, HqDef)
+
+    except Exception as e:
+        print("ERROR: Processing %s failed: %s" % (main_image_path, str(e)))
+
+# ----
+
+def process(num_processes:int, filenames:list):
+    print("Processes: %d" % (num_processes))
+    watermark = Watermark(_config.WmFile)
+   
+    # Creating a pool of processes
+    with multiprocessing.Pool(processes=num_processes) as pool:
+        for filename in filenames:
+            pool.apply_async(process_image, args=(filename, watermark))
+        pool.close()
+        pool.join()
+
+# ----
+        
+def welcome():
+    print("masterimg")
+
+def usage():
+    print("""
+Usage:
+    masterimg OPTIONS FILES ...
+          
+Options:
+    -h          Show this help
+    -v          More verbose
+    -p NUM      Specify number of processes (default: 6)
+""")
+
+class ShowHelpException(Exception):
+    pass
+
+class Params:
+    def __init__(self):
+        self.Flags = {"-v": False}
+        self.Ints = {"-p": 6}
+        self.Strings = {}
+        self.Floats = {}
+        self.Args = []
+        self.ArgsMin = 1
+
+    @staticmethod
+    def parse(cmds:list):
+        p = Params()
+        while len(cmds) > 0 and cmds[0].startswith("-"):
+            if cmds[0] == "-h" or cmds[0] == "-H":
+                raise ShowHelpException
+            elif cmds[0] in p.Flags:
+                p.Flags[cmds[0]] = True
+            elif cmds[0] in p.Floats:
+                p.Floats[cmds[0]] = float(cmds[1])
+                cmds.pop(0)
+            elif cmds[0] in p.Ints:
+                p.Ints[cmds[0]] = int(cmds[1])
+                cmds.pop(0)
+            elif cmds[0] in p.Strings:
+                p.Strings[cmds[0]] = cmds[1]
+                cmds.pop(0)
+            else:
+                raise ValueError("Unknown option: " + cmds[0])
+            cmds.pop(0)
+        if len(cmds) < p.ArgsMin:
+            raise ShowHelpException
+        p.Args = cmds
+        return p
+
+def main(args):
+    welcome()
+    try:
+        if len(args) == 0:
+            raise ShowHelpException
+        p = Params.parse(args)
+        global _verbose
+        _verbose = p.Flags["-v"]
+        global _config
+        _config = Config("masterimg.yaml")
+        print("Read masterimg.yaml")
+        process(p.Ints["-p"], p.Args)
+    except ShowHelpException as e:
+        usage()
+    #except Exception as e:
+    #    print(str(e))
+
+if __name__ == "__main__":
+    import sys
+    main(sys.argv[1:])
+