|
@@ -0,0 +1,366 @@
|
|
|
|
|
+#!/usr/bin/python3
|
|
|
|
|
+
|
|
|
|
|
+import sys
|
|
|
|
|
+import os
|
|
|
|
|
+import multiprocessing
|
|
|
|
|
+from datetime import datetime as dt
|
|
|
|
|
+from PIL import Image
|
|
|
|
|
+import piexif
|
|
|
|
|
+import math
|
|
|
|
|
+import yaml
|
|
|
|
|
+from dataclasses import dataclass
|
|
|
|
|
+
|
|
|
|
|
+from typing import Optional
|
|
|
|
|
+
|
|
|
|
|
+# Requires pip install piexif, pillow, pyyaml
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class OutputDef:
|
|
|
|
|
+ MaxDimension:int # max dim. wins over min dim.
|
|
|
|
|
+ MinDimension:int
|
|
|
|
|
+ OutputFormat:str
|
|
|
|
|
+ OutputQuality:int
|
|
|
|
|
+ OutputPath:str
|
|
|
|
|
+ OutputSuffix:str
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+SocialMediaDef = OutputDef(1600, 1080, "JPEG", 96, "sm", "_sm.jpg")
|
|
|
|
|
+HqDef = OutputDef(2560, 1920, "JPEG", 98, "hq", "_hq.jpg")
|
|
|
|
|
+FullDef = OutputDef(3600, 2700, "JPEG", 98, "full", "_full.jpg")
|
|
|
|
|
+
|
|
|
|
|
+# --------
|
|
|
|
|
+
|
|
|
|
|
+_verbose = False
|
|
|
|
|
+
|
|
|
|
|
+WmScale = 1.6
|
|
|
|
|
+
|
|
|
|
|
+class Config:
|
|
|
|
|
+ def __init__(self, yaml_filename:str):
|
|
|
|
|
+ with open(yaml_filename) as f:
|
|
|
|
|
+ cfg = yaml.safe_load(f)
|
|
|
|
|
+ #cfg = yaml.safload(f.read(), Loader=yaml.SafeLoader)
|
|
|
|
|
+ self.CfgDict:dict = cfg
|
|
|
|
|
+ self.Copyright:str = cfg["Copyright"]
|
|
|
|
|
+ self.UserComment:str = cfg["UserComment"]
|
|
|
|
|
+ self.WmFile:str = cfg["WatermarkFile"]
|
|
|
|
|
+
|
|
|
|
|
+ def getFileSpecs(self, filename:str) -> dict:
|
|
|
|
|
+ cfg = self.CfgDict.get(filename)
|
|
|
|
|
+ if type(cfg) is dict:
|
|
|
|
|
+ return cfg
|
|
|
|
|
+ else:
|
|
|
|
|
+ return {}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+global _config
|
|
|
|
|
+_config:Config = None
|
|
|
|
|
+
|
|
|
|
|
+# ------
|
|
|
|
|
+
|
|
|
|
|
+class Watermark:
|
|
|
|
|
+ WidthRatio = 0.25 / WmScale
|
|
|
|
|
+ Xpos = 1.0 / WmScale
|
|
|
|
|
+ Ypos = 0.9 / WmScale
|
|
|
|
|
+ WmColorWhite = 240
|
|
|
|
|
+ WmColorBlack = 20
|
|
|
|
|
+ TranspBaseWhite = 1.2
|
|
|
|
|
+ TranspBaseBlack = 1.2
|
|
|
|
|
+ TranspStdevScale = 0.02
|
|
|
|
|
+ TranspIntensScale = 0.0025
|
|
|
|
|
+ TranspBase2 = 0.17
|
|
|
|
|
+ InvertWm = False
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, wm_black:str, wm_white:Optional[str]=None):
|
|
|
|
|
+ self.WmBlack = wm_black
|
|
|
|
|
+ self.WmWhite = wm_black if wm_white is None else wm_black
|
|
|
|
|
+ wm_tmp = Image.open(wm_black)
|
|
|
|
|
+ self.WmWidth = wm_tmp.size[0]
|
|
|
|
|
+ self.WmHeight = wm_tmp.size[1]
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _getIntensityVariance(img:Image, area):
|
|
|
|
|
+ # Calculate intensity and variance on area of img
|
|
|
|
|
+ cropped_area = img.crop(area)
|
|
|
|
|
+
|
|
|
|
|
+ gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
|
|
|
|
|
+
|
|
|
|
|
+ mean = sum(gray_pixels) / len(gray_pixels)
|
|
|
|
|
+ var = sum((x - mean) ** 2 for x in gray_pixels) / len(gray_pixels)
|
|
|
|
|
+ return (round(mean), var)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _getLoHiMean(img:Image, area):
|
|
|
|
|
+ # Calculate lo and high 10th percentile, and mean, on area of img
|
|
|
|
|
+ cropped_area = img.crop(area)
|
|
|
|
|
+ gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
|
|
|
|
|
+ gray_pixels.sort()
|
|
|
|
|
+
|
|
|
|
|
+ mean = sum(gray_pixels) / len(gray_pixels)
|
|
|
|
|
+ i_low = len(gray_pixels) // 10
|
|
|
|
|
+ i_high = len(gray_pixels) - i_low
|
|
|
|
|
+
|
|
|
|
|
+ return (gray_pixels[i_low], gray_pixels[i_high], round(mean))
|
|
|
|
|
+
|
|
|
|
|
+ def _planWatermark(self, main_image, watermark_area):
|
|
|
|
|
+ # Check out image intensity behind watermark
|
|
|
|
|
+ (gray_intens, gray_var) = self._getIntensityVariance(main_image, watermark_area)
|
|
|
|
|
+
|
|
|
|
|
+ gray_stdev = math.sqrt(gray_var)
|
|
|
|
|
+
|
|
|
|
|
+ if gray_intens < 128:
|
|
|
|
|
+ wm_color = 255
|
|
|
|
|
+ transparency = 1 - (1 / (self.TranspBaseWhite +
|
|
|
|
|
+ (gray_intens * self.TranspIntensScale) + gray_stdev * self.TranspStdevScale))
|
|
|
|
|
+ else:
|
|
|
|
|
+ wm_color = 0
|
|
|
|
|
+ transparency = 1 - (1 / (self.TranspBaseBlack +
|
|
|
|
|
+ (255 - gray_intens) * self.TranspIntensScale + gray_stdev * self.TranspStdevScale))
|
|
|
|
|
+ if _verbose:
|
|
|
|
|
+ print("Watermark plan: intens: %d, st.dev.: %d, wm_color: %d, transp: %0.2f" % (gray_intens, gray_stdev, wm_color, transparency))
|
|
|
|
|
+
|
|
|
|
|
+ return (wm_color, transparency)
|
|
|
|
|
+
|
|
|
|
|
+ def _planWatermark2(self, main_image, watermark_area):
|
|
|
|
|
+ (low, high, avg) = self._getLoHiMean(main_image, watermark_area)
|
|
|
|
|
+
|
|
|
|
|
+ f1 = (1 - self.TranspBase2) / 2 / 255
|
|
|
|
|
+ f2 = (1 - self.TranspBase2) / 2 / 255
|
|
|
|
|
+
|
|
|
|
|
+ if avg < 128:
|
|
|
|
|
+ wm_color = 255
|
|
|
|
|
+ transparency = self.TranspBase2 + (high - low) * f1 + high * f2
|
|
|
|
|
+ else:
|
|
|
|
|
+ wm_color = 0
|
|
|
|
|
+ transparency = self.TranspBase2 + (high - low) * f1 + (255 - low) * f2
|
|
|
|
|
+
|
|
|
|
|
+ if _verbose:
|
|
|
|
|
+ print("Watermark plan: intens: %d, lo: %d, hi: %d, wm_color: %d, transp: %0.2f" % (avg, low, high, wm_color, transparency))
|
|
|
|
|
+
|
|
|
|
|
+ return (wm_color, transparency)
|
|
|
|
|
+
|
|
|
|
|
+ def apply(self, main_image:Image, pos:str="bl"):
|
|
|
|
|
+ if not pos in ["tl", "tr", "bl", "br"]:
|
|
|
|
|
+ raise Exception("Invalid watermark position")
|
|
|
|
|
+
|
|
|
|
|
+ # Calculate an average image dimension by a weighted average,
|
|
|
|
|
+ # weighing the smallest dimension double:
|
|
|
|
|
+ max_dim = max(main_image.size[0], main_image.size[1])
|
|
|
|
|
+ min_dim = min(main_image.size[0], main_image.size[1])
|
|
|
|
|
+ avg_dimension = (max_dim + min_dim * 2) / 3
|
|
|
|
|
+
|
|
|
|
|
+ # This is used to size the water mark
|
|
|
|
|
+ wm_width = avg_dimension * self.WidthRatio
|
|
|
|
|
+ wm_size = (int(wm_width), int(self.WmHeight * wm_width / self.WmWidth))
|
|
|
|
|
+
|
|
|
|
|
+ # X and Y offset are specified in multiplum of watermark height
|
|
|
|
|
+ wm_x = round(wm_size[1] * self.Xpos)
|
|
|
|
|
+ wm_y = round(wm_size[1] * self.Ypos)
|
|
|
|
|
+
|
|
|
|
|
+ if "r" in pos: # right (otherwise l for left)
|
|
|
|
|
+ wm_x = main_image.size[0] - wm_size[0] - wm_x
|
|
|
|
|
+
|
|
|
|
|
+ if "b" in pos: # bottom (otherwise t for top)
|
|
|
|
|
+ wm_y = main_image.size[1] - wm_size[1] - wm_y
|
|
|
|
|
+
|
|
|
|
|
+ watermark_area = (wm_x, wm_y, wm_x + wm_size[0], wm_y + wm_size[1])
|
|
|
|
|
+
|
|
|
|
|
+ # Plan color and transparency of watermark by checking out the watermark area
|
|
|
|
|
+ (wm_color, transparency) = self._planWatermark2(main_image, watermark_area)
|
|
|
|
|
+
|
|
|
|
|
+ if wm_color > 128:
|
|
|
|
|
+ watermark = Image.open(self.WmWhite).resize(wm_size, Image.LANCZOS)
|
|
|
|
|
+ else:
|
|
|
|
|
+ watermark = Image.open(self.WmBlack).resize(wm_size, Image.LANCZOS)
|
|
|
|
|
+
|
|
|
|
|
+ # Make watermark transparent
|
|
|
|
|
+ watermark = watermark.convert("RGBA")
|
|
|
|
|
+ pixels = watermark.getdata()
|
|
|
|
|
+
|
|
|
|
|
+ if self.InvertWm:
|
|
|
|
|
+ new_pixels = [(wm_color, wm_color, wm_color, round((255 - pixel[3]) * transparency)) for pixel in pixels]
|
|
|
|
|
+ else:
|
|
|
|
|
+ new_pixels = [(wm_color, wm_color, wm_color, round(pixel[3] * transparency)) for pixel in pixels]
|
|
|
|
|
+
|
|
|
|
|
+ watermark.putdata(new_pixels)
|
|
|
|
|
+
|
|
|
|
|
+ # Paste the watermark onto the main image
|
|
|
|
|
+ main_image.paste(watermark, (wm_x, wm_y), watermark)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def makeExifBytes(copyright:Optional[str], user_comment:Optional[str], image_description:Optional[str]) -> bytes:
|
|
|
|
|
+ exif = {
|
|
|
|
|
+ "0th": {},
|
|
|
|
|
+ "Exif": {},
|
|
|
|
|
+ "GPS": {},
|
|
|
|
|
+ "1st": {},
|
|
|
|
|
+ "thumbnail": None
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if copyright is not None:
|
|
|
|
|
+ exif["0th"][piexif.ImageIFD.Copyright] = copyright.encode("ascii")
|
|
|
|
|
+ if user_comment is not None:
|
|
|
|
|
+ # = b'\0\0\0\0\0\0\0\0' + user_comment.encode('utf-8')
|
|
|
|
|
+ exif["Exif"][piexif.ExifIFD.UserComment] = b"ASCII\0\0\0" + user_comment.encode('ascii')
|
|
|
|
|
+ if image_description is not None:
|
|
|
|
|
+ exif['0th'][piexif.ImageIFD.ImageDescription] = image_description.encode('utf-8')
|
|
|
|
|
+
|
|
|
|
|
+ return piexif.dump(exif)
|
|
|
|
|
+
|
|
|
|
|
+def resizeSaveOutput(input_filename:str, out_name:str, img:Image, exif_bytes:bytes, output_def:OutputDef):
|
|
|
|
|
+
|
|
|
|
|
+ if output_def.MaxDimension == 0:
|
|
|
|
|
+ scale = None
|
|
|
|
|
+ else:
|
|
|
|
|
+ scale1 = output_def.MinDimension / min(img.size[0], img.size[1])
|
|
|
|
|
+ scale2 = output_def.MaxDimension / max(img.size[0], img.size[1])
|
|
|
|
|
+ scale = min(scale1, scale2)
|
|
|
|
|
+
|
|
|
|
|
+ if scale is not None and scale < 1.0:
|
|
|
|
|
+ width = round(img.size[0] * scale)
|
|
|
|
|
+ height = round(img.size[1] * scale)
|
|
|
|
|
+ scale_msg = "scaled by %0.2f to: %dx%d" % (scale, width, height)
|
|
|
|
|
+ img_scaled = img.resize((width, height), Image.LANCZOS)
|
|
|
|
|
+ else:
|
|
|
|
|
+ scale_msg = "not rescaled: %dx%d" % (img.size[0], img.size[1])
|
|
|
|
|
+ img_scaled = img
|
|
|
|
|
+
|
|
|
|
|
+ # in_path = os.path.dirname(input_filename)
|
|
|
|
|
+ # if in_path == "":
|
|
|
|
|
+ # in_path = "."
|
|
|
|
|
+ # out_path = in_path + "/" + output_def.OutputPath
|
|
|
|
|
+ out_path = "./" + output_def.OutputPath
|
|
|
|
|
+ in_fname = os.path.basename(input_filename)
|
|
|
|
|
+ out_fname = out_name + output_def.OutputSuffix
|
|
|
|
|
+ out_full = out_path + "/" + out_fname
|
|
|
|
|
+
|
|
|
|
|
+ if not os.path.isdir(out_path):
|
|
|
|
|
+ print("Creating dir: " + out_path)
|
|
|
|
|
+ os.makedirs(out_path, exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ img_scaled.save(out_full, output_def.OutputFormat, quality=output_def.OutputQuality, exif=exif_bytes)
|
|
|
|
|
+
|
|
|
|
|
+ size = os.path.getsize(out_full) // 1000
|
|
|
|
|
+ print("%s %s, saved as %s (%d KB)" % (in_fname, scale_msg,
|
|
|
|
|
+ output_def.OutputPath + "/" + out_fname, size))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def process_image(main_image_path:str, watermark:Watermark):
|
|
|
|
|
+ try:
|
|
|
|
|
+ img_name = os.path.basename(main_image_path).split(".")[0]
|
|
|
|
|
+ img_cfg = _config.getFileSpecs(img_name)
|
|
|
|
|
+
|
|
|
|
|
+ if "year" in img_cfg:
|
|
|
|
|
+ year = int(img_cfg["year"])
|
|
|
|
|
+ else:
|
|
|
|
|
+ mtime = os.path.getmtime(main_image_path)
|
|
|
|
|
+ year = dt.fromtimestamp(mtime).year
|
|
|
|
|
+
|
|
|
|
|
+ print("Input: %s, (year: %d to be used for copyright notice)" % (main_image_path, year))
|
|
|
|
|
+
|
|
|
|
|
+ exif = makeExifBytes(_config.Copyright % year, _config.UserComment, img_cfg.get("title"))
|
|
|
|
|
+
|
|
|
|
|
+ # Load the main image
|
|
|
|
|
+ main_image = Image.open(main_image_path)
|
|
|
|
|
+
|
|
|
|
|
+ # Save full output without water mark
|
|
|
|
|
+ out_name = img_cfg.get("name", img_name)
|
|
|
|
|
+ resizeSaveOutput(main_image_path, out_name, main_image, exif, FullDef)
|
|
|
|
|
+
|
|
|
|
|
+ # Make reduced quality watermarked outputs
|
|
|
|
|
+ wm_spec = img_cfg.get("watermark", "bl")
|
|
|
|
|
+ if wm_spec != False and wm_spec != "off":
|
|
|
|
|
+ watermark.apply(main_image, wm_spec)
|
|
|
|
|
+
|
|
|
|
|
+ resizeSaveOutput(main_image_path, out_name, main_image, exif, SocialMediaDef)
|
|
|
|
|
+ resizeSaveOutput(main_image_path, out_name, main_image, exif, HqDef)
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print("ERROR: Processing %s failed: %s" % (main_image_path, str(e)))
|
|
|
|
|
+
|
|
|
|
|
+# ----
|
|
|
|
|
+
|
|
|
|
|
+def process(num_processes:int, filenames:list):
|
|
|
|
|
+ print("Processes: %d" % (num_processes))
|
|
|
|
|
+ watermark = Watermark(_config.WmFile)
|
|
|
|
|
+
|
|
|
|
|
+ # Creating a pool of processes
|
|
|
|
|
+ with multiprocessing.Pool(processes=num_processes) as pool:
|
|
|
|
|
+ for filename in filenames:
|
|
|
|
|
+ pool.apply_async(process_image, args=(filename, watermark))
|
|
|
|
|
+ pool.close()
|
|
|
|
|
+ pool.join()
|
|
|
|
|
+
|
|
|
|
|
+# ----
|
|
|
|
|
+
|
|
|
|
|
+def welcome():
|
|
|
|
|
+ print("masterimg")
|
|
|
|
|
+
|
|
|
|
|
+def usage():
|
|
|
|
|
+ print("""
|
|
|
|
|
+Usage:
|
|
|
|
|
+ masterimg OPTIONS FILES ...
|
|
|
|
|
+
|
|
|
|
|
+Options:
|
|
|
|
|
+ -h Show this help
|
|
|
|
|
+ -v More verbose
|
|
|
|
|
+ -p NUM Specify number of processes (default: 6)
|
|
|
|
|
+""")
|
|
|
|
|
+
|
|
|
|
|
+class ShowHelpException(Exception):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+class Params:
|
|
|
|
|
+ def __init__(self):
|
|
|
|
|
+ self.Flags = {"-v": False}
|
|
|
|
|
+ self.Ints = {"-p": 6}
|
|
|
|
|
+ self.Strings = {}
|
|
|
|
|
+ self.Floats = {}
|
|
|
|
|
+ self.Args = []
|
|
|
|
|
+ self.ArgsMin = 1
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def parse(cmds:list):
|
|
|
|
|
+ p = Params()
|
|
|
|
|
+ while len(cmds) > 0 and cmds[0].startswith("-"):
|
|
|
|
|
+ if cmds[0] == "-h" or cmds[0] == "-H":
|
|
|
|
|
+ raise ShowHelpException
|
|
|
|
|
+ elif cmds[0] in p.Flags:
|
|
|
|
|
+ p.Flags[cmds[0]] = True
|
|
|
|
|
+ elif cmds[0] in p.Floats:
|
|
|
|
|
+ p.Floats[cmds[0]] = float(cmds[1])
|
|
|
|
|
+ cmds.pop(0)
|
|
|
|
|
+ elif cmds[0] in p.Ints:
|
|
|
|
|
+ p.Ints[cmds[0]] = int(cmds[1])
|
|
|
|
|
+ cmds.pop(0)
|
|
|
|
|
+ elif cmds[0] in p.Strings:
|
|
|
|
|
+ p.Strings[cmds[0]] = cmds[1]
|
|
|
|
|
+ cmds.pop(0)
|
|
|
|
|
+ else:
|
|
|
|
|
+ raise ValueError("Unknown option: " + cmds[0])
|
|
|
|
|
+ cmds.pop(0)
|
|
|
|
|
+ if len(cmds) < p.ArgsMin:
|
|
|
|
|
+ raise ShowHelpException
|
|
|
|
|
+ p.Args = cmds
|
|
|
|
|
+ return p
|
|
|
|
|
+
|
|
|
|
|
+def main(args):
|
|
|
|
|
+ welcome()
|
|
|
|
|
+ try:
|
|
|
|
|
+ if len(args) == 0:
|
|
|
|
|
+ raise ShowHelpException
|
|
|
|
|
+ p = Params.parse(args)
|
|
|
|
|
+ global _verbose
|
|
|
|
|
+ _verbose = p.Flags["-v"]
|
|
|
|
|
+ global _config
|
|
|
|
|
+ _config = Config("masterimg.yaml")
|
|
|
|
|
+ print("Read masterimg.yaml")
|
|
|
|
|
+ process(p.Ints["-p"], p.Args)
|
|
|
|
|
+ except ShowHelpException as e:
|
|
|
|
|
+ usage()
|
|
|
|
|
+ #except Exception as e:
|
|
|
|
|
+ # print(str(e))
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ import sys
|
|
|
|
|
+ main(sys.argv[1:])
|
|
|
|
|
+
|