| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- #!/usr/bin/python3
- import sys
- import os
- import multiprocessing
- from datetime import datetime as dt
- from PIL import Image
- import piexif
- import math
- import yaml
- from dataclasses import dataclass
- from typing import Optional
- # Requires pip install piexif, pillow, pyyaml
- @dataclass
- class OutputDef:
- MaxDimension:int # max dim. wins over min dim.
- MinDimension:int
- OutputFormat:str
- OutputQuality:int
- OutputPath:str
- OutputSuffix:str
- SocialMediaDef = OutputDef(1600, 1080, "JPEG", 96, "sm", "_sm.jpg")
- HqDef = OutputDef(2560, 1920, "JPEG", 98, "hq", "_hq.jpg")
- FullDef = OutputDef(3600, 2700, "JPEG", 98, "full", "_full.jpg")
- # --------
- _verbose = False
- WmScale = 1.6
- class Config:
- def __init__(self, yaml_filename:str):
- with open(yaml_filename) as f:
- cfg = yaml.safe_load(f)
- #cfg = yaml.safload(f.read(), Loader=yaml.SafeLoader)
- self.CfgDict:dict = cfg
- self.Copyright:str = cfg["Copyright"]
- self.UserComment:str = cfg["UserComment"]
- self.WmFile:str = cfg["WatermarkFile"]
- def getFileSpecs(self, filename:str) -> dict:
- cfg = self.CfgDict.get(filename)
- if type(cfg) is dict:
- return cfg
- else:
- return {}
- global _config
- _config:Config = None
- # ------
- class Watermark:
- WidthRatio = 0.25 / WmScale
- Xpos = 1.0 / WmScale
- Ypos = 0.9 / WmScale
- WmColorWhite = 240
- WmColorBlack = 20
- TranspBaseWhite = 1.2
- TranspBaseBlack = 1.2
- TranspStdevScale = 0.02
- TranspIntensScale = 0.0025
- TranspBase2 = 0.17
- InvertWm = False
- def __init__(self, wm_black:str, wm_white:Optional[str]=None):
- self.WmBlack = wm_black
- self.WmWhite = wm_black if wm_white is None else wm_black
- wm_tmp = Image.open(wm_black)
- self.WmWidth = wm_tmp.size[0]
- self.WmHeight = wm_tmp.size[1]
- @staticmethod
- def _getIntensityVariance(img:Image, area):
- # Calculate intensity and variance on area of img
- cropped_area = img.crop(area)
- gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
- mean = sum(gray_pixels) / len(gray_pixels)
- var = sum((x - mean) ** 2 for x in gray_pixels) / len(gray_pixels)
- return (round(mean), var)
- @staticmethod
- def _getLoHiMean(img:Image, area):
- # Calculate lo and high 10th percentile, and mean, on area of img
- cropped_area = img.crop(area)
- gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
- gray_pixels.sort()
- mean = sum(gray_pixels) / len(gray_pixels)
- i_low = len(gray_pixels) // 10
- i_high = len(gray_pixels) - i_low
- return (gray_pixels[i_low], gray_pixels[i_high], round(mean))
- def _planWatermark(self, main_image, watermark_area):
- # Check out image intensity behind watermark
- (gray_intens, gray_var) = self._getIntensityVariance(main_image, watermark_area)
-
- gray_stdev = math.sqrt(gray_var)
-
- if gray_intens < 128:
- wm_color = 255
- transparency = 1 - (1 / (self.TranspBaseWhite +
- (gray_intens * self.TranspIntensScale) + gray_stdev * self.TranspStdevScale))
- else:
- wm_color = 0
- transparency = 1 - (1 / (self.TranspBaseBlack +
- (255 - gray_intens) * self.TranspIntensScale + gray_stdev * self.TranspStdevScale))
- if _verbose:
- print("Watermark plan: intens: %d, st.dev.: %d, wm_color: %d, transp: %0.2f" % (gray_intens, gray_stdev, wm_color, transparency))
-
- return (wm_color, transparency)
- def _planWatermark2(self, main_image, watermark_area):
- (low, high, avg) = self._getLoHiMean(main_image, watermark_area)
- f1 = (1 - self.TranspBase2) / 2 / 255
- f2 = (1 - self.TranspBase2) / 2 / 255
- if avg < 128:
- wm_color = 255
- transparency = self.TranspBase2 + (high - low) * f1 + high * f2
- else:
- wm_color = 0
- transparency = self.TranspBase2 + (high - low) * f1 + (255 - low) * f2
- if _verbose:
- print("Watermark plan: intens: %d, lo: %d, hi: %d, wm_color: %d, transp: %0.2f" % (avg, low, high, wm_color, transparency))
-
- return (wm_color, transparency)
- def apply(self, main_image:Image, pos:str="bl"):
- if not pos in ["tl", "tr", "bl", "br"]:
- raise Exception("Invalid watermark position")
-
- # Calculate an average image dimension by a weighted average,
- # weighing the smallest dimension double:
- max_dim = max(main_image.size[0], main_image.size[1])
- min_dim = min(main_image.size[0], main_image.size[1])
- avg_dimension = (max_dim + min_dim * 2) / 3
- # This is used to size the water mark
- wm_width = avg_dimension * self.WidthRatio
- wm_size = (int(wm_width), int(self.WmHeight * wm_width / self.WmWidth))
- # X and Y offset are specified in multiplum of watermark height
- wm_x = round(wm_size[1] * self.Xpos)
- wm_y = round(wm_size[1] * self.Ypos)
- if "r" in pos: # right (otherwise l for left)
- wm_x = main_image.size[0] - wm_size[0] - wm_x
- if "b" in pos: # bottom (otherwise t for top)
- wm_y = main_image.size[1] - wm_size[1] - wm_y
- watermark_area = (wm_x, wm_y, wm_x + wm_size[0], wm_y + wm_size[1])
- # Plan color and transparency of watermark by checking out the watermark area
- (wm_color, transparency) = self._planWatermark2(main_image, watermark_area)
- if wm_color > 128:
- watermark = Image.open(self.WmWhite).resize(wm_size, Image.LANCZOS)
- else:
- watermark = Image.open(self.WmBlack).resize(wm_size, Image.LANCZOS)
-
- # Make watermark transparent
- watermark = watermark.convert("RGBA")
- pixels = watermark.getdata()
- if self.InvertWm:
- new_pixels = [(wm_color, wm_color, wm_color, round((255 - pixel[3]) * transparency)) for pixel in pixels]
- else:
- new_pixels = [(wm_color, wm_color, wm_color, round(pixel[3] * transparency)) for pixel in pixels]
- watermark.putdata(new_pixels)
- # Paste the watermark onto the main image
- main_image.paste(watermark, (wm_x, wm_y), watermark)
- def makeExifBytes(copyright:Optional[str], user_comment:Optional[str], image_description:Optional[str]) -> bytes:
- exif = {
- "0th": {},
- "Exif": {},
- "GPS": {},
- "1st": {},
- "thumbnail": None
- }
- if copyright is not None:
- exif["0th"][piexif.ImageIFD.Copyright] = copyright.encode("ascii")
- if user_comment is not None:
- # = b'\0\0\0\0\0\0\0\0' + user_comment.encode('utf-8')
- exif["Exif"][piexif.ExifIFD.UserComment] = b"ASCII\0\0\0" + user_comment.encode('ascii')
- if image_description is not None:
- exif['0th'][piexif.ImageIFD.ImageDescription] = image_description.encode('utf-8')
- return piexif.dump(exif)
- def resizeSaveOutput(input_filename:str, out_name:str, img:Image, exif_bytes:bytes, output_def:OutputDef):
-
- if output_def.MaxDimension == 0:
- scale = None
- else:
- scale1 = output_def.MinDimension / min(img.size[0], img.size[1])
- scale2 = output_def.MaxDimension / max(img.size[0], img.size[1])
- scale = min(scale1, scale2)
- if scale is not None and scale < 1.0:
- width = round(img.size[0] * scale)
- height = round(img.size[1] * scale)
- scale_msg = "scaled by %0.2f to: %dx%d" % (scale, width, height)
- img_scaled = img.resize((width, height), Image.LANCZOS)
- else:
- scale_msg = "not rescaled: %dx%d" % (img.size[0], img.size[1])
- img_scaled = img
- # in_path = os.path.dirname(input_filename)
- # if in_path == "":
- # in_path = "."
- # out_path = in_path + "/" + output_def.OutputPath
- out_path = "./" + output_def.OutputPath
- in_fname = os.path.basename(input_filename)
- out_fname = out_name + output_def.OutputSuffix
- out_full = out_path + "/" + out_fname
- if not os.path.isdir(out_path):
- print("Creating dir: " + out_path)
- os.makedirs(out_path, exist_ok=True)
- img_scaled.save(out_full, output_def.OutputFormat, quality=output_def.OutputQuality, exif=exif_bytes)
- size = os.path.getsize(out_full) // 1000
- print("%s %s, saved as %s (%d KB)" % (in_fname, scale_msg,
- output_def.OutputPath + "/" + out_fname, size))
- def process_image(main_image_path:str, watermark:Watermark):
- try:
- img_name = os.path.basename(main_image_path).split(".")[0]
- img_cfg = _config.getFileSpecs(img_name)
- if "year" in img_cfg:
- year = int(img_cfg["year"])
- else:
- mtime = os.path.getmtime(main_image_path)
- year = dt.fromtimestamp(mtime).year
-
- print("Input: %s, (year: %d to be used for copyright notice)" % (main_image_path, year))
- exif = makeExifBytes(_config.Copyright % year, _config.UserComment, img_cfg.get("title"))
- # Load the main image
- main_image = Image.open(main_image_path)
- # Save full output without water mark
- out_name = img_cfg.get("name", img_name)
- resizeSaveOutput(main_image_path, out_name, main_image, exif, FullDef)
- # Make reduced quality watermarked outputs
- wm_spec = img_cfg.get("watermark", "bl")
- if wm_spec != False and wm_spec != "off":
- watermark.apply(main_image, wm_spec)
- resizeSaveOutput(main_image_path, out_name, main_image, exif, SocialMediaDef)
- resizeSaveOutput(main_image_path, out_name, main_image, exif, HqDef)
- except Exception as e:
- print("ERROR: Processing %s failed: %s" % (main_image_path, str(e)))
- # ----
- def process(num_processes:int, filenames:list):
- print("Processes: %d" % (num_processes))
- watermark = Watermark(_config.WmFile)
-
- # Creating a pool of processes
- with multiprocessing.Pool(processes=num_processes) as pool:
- for filename in filenames:
- pool.apply_async(process_image, args=(filename, watermark))
- pool.close()
- pool.join()
- # ----
-
- def welcome():
- print("masterimg")
- def usage():
- print("""
- Usage:
- masterimg OPTIONS FILES ...
-
- Options:
- -h Show this help
- -v More verbose
- -p NUM Specify number of processes (default: 6)
- """)
- class ShowHelpException(Exception):
- pass
- class Params:
- def __init__(self):
- self.Flags = {"-v": False}
- self.Ints = {"-p": 6}
- self.Strings = {}
- self.Floats = {}
- self.Args = []
- self.ArgsMin = 1
- @staticmethod
- def parse(cmds:list):
- p = Params()
- while len(cmds) > 0 and cmds[0].startswith("-"):
- if cmds[0] == "-h" or cmds[0] == "-H":
- raise ShowHelpException
- elif cmds[0] in p.Flags:
- p.Flags[cmds[0]] = True
- elif cmds[0] in p.Floats:
- p.Floats[cmds[0]] = float(cmds[1])
- cmds.pop(0)
- elif cmds[0] in p.Ints:
- p.Ints[cmds[0]] = int(cmds[1])
- cmds.pop(0)
- elif cmds[0] in p.Strings:
- p.Strings[cmds[0]] = cmds[1]
- cmds.pop(0)
- else:
- raise ValueError("Unknown option: " + cmds[0])
- cmds.pop(0)
- if len(cmds) < p.ArgsMin:
- raise ShowHelpException
- p.Args = cmds
- return p
- def main(args):
- welcome()
- try:
- if len(args) == 0:
- raise ShowHelpException
- p = Params.parse(args)
- global _verbose
- _verbose = p.Flags["-v"]
- global _config
- _config = Config("masterimg.yaml")
- print("Read masterimg.yaml")
- process(p.Ints["-p"], p.Args)
- except ShowHelpException as e:
- usage()
- #except Exception as e:
- # print(str(e))
- if __name__ == "__main__":
- import sys
- main(sys.argv[1:])
|