masterimg.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. #!/usr/bin/python3
  2. import sys
  3. import os
  4. import multiprocessing
  5. from datetime import datetime as dt
  6. from PIL import Image
  7. import piexif
  8. import re
  9. import math
  10. import yaml
  11. from dataclasses import dataclass
  12. from typing import Optional
  13. @dataclass
  14. class OutputDef:
  15. MaxDimension:int # max dim. wins over min dim.
  16. MinDimension:int
  17. OutputFormat:str
  18. OutputQuality:int
  19. OutputPath:str
  20. OutputSuffix:str
  21. SocialMediaDef = OutputDef(1600, 1080, "JPEG", 96, "sm", "_sm.jpg")
  22. HqDef = OutputDef(2560, 1920, "JPEG", 95, "hq", "_hq.jpg")
  23. FullDef = OutputDef(3600, 2700, "JPEG", 98, "full", "_full.jpg")
  24. # --------
  25. _verbose = False
  26. WmScale = 1.6
  27. class Config:
  28. def __init__(self, yaml_filename:str):
  29. with open(yaml_filename) as f:
  30. cfg = yaml.load(f.read(), Loader=yaml.SafeLoader)
  31. self.CfgDict:dict = cfg
  32. self.Copyright:str = cfg["Copyright"]
  33. self.UserComment:str = cfg["UserComment"]
  34. self.WmFile:str = cfg["WatermarkFile"]
  35. def getFileSpecs(self, filename:str) -> dict:
  36. cfg = self.CfgDict.get(filename)
  37. if type(cfg) is dict:
  38. return cfg
  39. else:
  40. return {}
  41. global _config
  42. _config:Config = None
  43. # ------
  44. class Watermark:
  45. WidthRatio = 0.25 / WmScale
  46. Xpos = 1.0 / WmScale
  47. Ypos = 0.9 / WmScale
  48. WmColorWhite = 240
  49. WmColorBlack = 20
  50. TranspBaseWhite = 1.2
  51. TranspBaseBlack = 1.2
  52. TranspStdevScale = 0.02
  53. TranspIntensScale = 0.0025
  54. TranspBase2 = 0.17
  55. InvertWm = False
  56. def __init__(self, wm_black:str, wm_white:Optional[str]=None):
  57. self.WmBlack = wm_black
  58. self.WmWhite = wm_black if wm_white is None else wm_black
  59. wm_tmp = Image.open(wm_black)
  60. self.WmWidth = wm_tmp.size[0]
  61. self.WmHeight = wm_tmp.size[1]
  62. @staticmethod
  63. def _getIntensityVariance(img:Image, area):
  64. # Calculate intensity and variance on area of img
  65. cropped_area = img.crop(area)
  66. gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
  67. mean = sum(gray_pixels) / len(gray_pixels)
  68. var = sum((x - mean) ** 2 for x in gray_pixels) / len(gray_pixels)
  69. return (round(mean), var)
  70. @staticmethod
  71. def _getLoHiMean(img:Image, area):
  72. # Calculate lo and high 10th percentile, and mean, on area of img
  73. cropped_area = img.crop(area)
  74. gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
  75. gray_pixels.sort()
  76. mean = sum(gray_pixels) / len(gray_pixels)
  77. i_low = len(gray_pixels) // 10
  78. i_high = len(gray_pixels) - i_low
  79. return (gray_pixels[i_low], gray_pixels[i_high], round(mean))
  80. def _planWatermark(self, main_image, watermark_area):
  81. # Check out image intensity behind watermark
  82. (gray_intens, gray_var) = self._getIntensityVariance(main_image, watermark_area)
  83. gray_stdev = math.sqrt(gray_var)
  84. if gray_intens < 128:
  85. wm_color = 255
  86. transparency = 1 - (1 / (self.TranspBaseWhite +
  87. (gray_intens * self.TranspIntensScale) + gray_stdev * self.TranspStdevScale))
  88. else:
  89. wm_color = 0
  90. transparency = 1 - (1 / (self.TranspBaseBlack +
  91. (255 - gray_intens) * self.TranspIntensScale + gray_stdev * self.TranspStdevScale))
  92. if _verbose:
  93. print("Watermark plan: intens: %d, st.dev.: %d, wm_color: %d, transp: %0.2f" % (gray_intens, gray_stdev, wm_color, transparency))
  94. return (wm_color, transparency)
  95. def _planWatermark2(self, main_image, watermark_area):
  96. (low, high, avg) = self._getLoHiMean(main_image, watermark_area)
  97. f1 = (1 - self.TranspBase2) / 2 / 255
  98. f2 = (1 - self.TranspBase2) / 2 / 255
  99. if avg < 128:
  100. wm_color = 255
  101. transparency = self.TranspBase2 + (high - low) * f1 + high * f2
  102. else:
  103. wm_color = 0
  104. transparency = self.TranspBase2 + (high - low) * f1 + (255 - low) * f2
  105. if _verbose:
  106. print("Watermark plan: intens: %d, lo: %d, hi: %d, wm_color: %d, transp: %0.2f" % (avg, low, high, wm_color, transparency))
  107. return (wm_color, transparency)
  108. def apply(self, main_image:Image, pos:str="bl"):
  109. if not pos in ["tl", "tr", "bl", "br"]:
  110. raise Exception("Invalid watermark position")
  111. # Calculate an average image dimension by a weighted average,
  112. # weighing the smallest dimension double:
  113. max_dim = max(main_image.size[0], main_image.size[1])
  114. min_dim = min(main_image.size[0], main_image.size[1])
  115. avg_dimension = (max_dim + min_dim * 2) / 3
  116. # This is used to size the water mark
  117. wm_width = avg_dimension * self.WidthRatio
  118. wm_size = (int(wm_width), int(self.WmHeight * wm_width / self.WmWidth))
  119. # X and Y offset are specified in multiplum of watermark height
  120. wm_x = round(wm_size[1] * self.Xpos)
  121. wm_y = round(wm_size[1] * self.Ypos)
  122. if "r" in pos: # right (otherwise l for left)
  123. wm_x = main_image.size[0] - wm_size[0] - wm_x
  124. if "b" in pos: # bottom (otherwise t for top)
  125. wm_y = main_image.size[1] - wm_size[1] - wm_y
  126. watermark_area = (wm_x, wm_y, wm_x + wm_size[0], wm_y + wm_size[1])
  127. # Plan color and transparency of watermark by checking out the watermark area
  128. (wm_color, transparency) = self._planWatermark2(main_image, watermark_area)
  129. if wm_color > 128:
  130. watermark = Image.open(self.WmWhite).resize(wm_size, Image.LANCZOS)
  131. else:
  132. watermark = Image.open(self.WmBlack).resize(wm_size, Image.LANCZOS)
  133. # Make watermark transparent
  134. watermark = watermark.convert("RGBA")
  135. pixels = watermark.getdata()
  136. if self.InvertWm:
  137. new_pixels = [(wm_color, wm_color, wm_color, round((255 - pixel[3]) * transparency)) for pixel in pixels]
  138. else:
  139. new_pixels = [(wm_color, wm_color, wm_color, round(pixel[3] * transparency)) for pixel in pixels]
  140. watermark.putdata(new_pixels)
  141. # Paste the watermark onto the main image
  142. main_image.paste(watermark, (wm_x, wm_y), watermark)
  143. def makeExifDict(copyright, usercomment) -> bytes:
  144. exif_dict = {
  145. "0th": {
  146. piexif.ImageIFD.Copyright: copyright.encode("ascii")
  147. },
  148. "Exif": {
  149. piexif.ExifIFD.UserComment: b"ASCII\0\0\0" + usercomment.encode('ascii')
  150. },
  151. "GPS": {},
  152. "1st": {},
  153. "thumbnail": None
  154. }
  155. return piexif.dump(exif_dict)
  156. def resizeSaveOutput(input_filename:str, out_name:str, img:Image, exif_bytes:bytes, output_def:OutputDef):
  157. if output_def.MaxDimension == 0:
  158. scale = None
  159. else:
  160. scale1 = output_def.MinDimension / min(img.size[0], img.size[1])
  161. scale2 = output_def.MaxDimension / max(img.size[0], img.size[1])
  162. scale = min(scale1, scale2)
  163. if scale is not None and scale < 1.0:
  164. width = round(img.size[0] * scale)
  165. height = round(img.size[1] * scale)
  166. scale_msg = "scaled by %0.2f to: %dx%d" % (scale, width, height)
  167. img_scaled = img.resize((width, height), Image.LANCZOS)
  168. else:
  169. scale_msg = "not rescaled: %dx%d" % (img.size[0], img.size[1])
  170. img_scaled = img
  171. # in_path = os.path.dirname(input_filename)
  172. # if in_path == "":
  173. # in_path = "."
  174. # out_path = in_path + "/" + output_def.OutputPath
  175. out_path = "./" + output_def.OutputPath
  176. in_fname = os.path.basename(input_filename)
  177. out_fname = out_name + output_def.OutputSuffix
  178. out_full = out_path + "/" + out_fname
  179. if not os.path.isdir(out_path):
  180. print("Creating dir: " + out_path)
  181. os.makedirs(out_path, exist_ok=True)
  182. img_scaled.save(out_full, output_def.OutputFormat, quality=output_def.OutputQuality, exif=exif_bytes)
  183. size = os.path.getsize(out_full) // 1000
  184. print("%s %s, saved as %s (%d KB)" % (in_fname, scale_msg,
  185. output_def.OutputPath + "/" + out_fname, size))
  186. def process_image(main_image_path:str, watermark:Watermark):
  187. try:
  188. img_name = os.path.basename(main_image_path).split(".")[0]
  189. img_cfg = _config.getFileSpecs(img_name)
  190. if "year" in img_cfg:
  191. year = int(img_cfg["year"])
  192. else:
  193. mtime = os.path.getmtime(main_image_path)
  194. year = dt.fromtimestamp(mtime).year
  195. print("Input: %s, (year: %d to be used for copyright notice)" % (main_image_path, year))
  196. exif = makeExifDict(_config.Copyright % year, _config.UserComment)
  197. # Load the main image
  198. main_image = Image.open(main_image_path)
  199. # Save full output without water mark
  200. out_name = img_cfg.get("name", img_name)
  201. resizeSaveOutput(main_image_path, out_name, main_image, exif, FullDef)
  202. # Make reduced quality watermarked outputs
  203. wm_spec = img_cfg.get("watermark", "bl")
  204. if wm_spec != False and wm_spec != "off":
  205. watermark.apply(main_image, wm_spec)
  206. resizeSaveOutput(main_image_path, out_name, main_image, exif, SocialMediaDef)
  207. resizeSaveOutput(main_image_path, out_name, main_image, exif, HqDef)
  208. except Exception as e:
  209. print("ERROR: Processing %s failed: %s" % (main_image_path, str(e)))
  210. # ----
  211. def process(num_processes:int, filenames:list):
  212. print("Processes: %d" % (num_processes))
  213. watermark = Watermark(_config.WmFile)
  214. # Creating a pool of processes
  215. with multiprocessing.Pool(processes=num_processes) as pool:
  216. for filename in filenames:
  217. pool.apply_async(process_image, args=(filename, watermark))
  218. pool.close()
  219. pool.join()
  220. # ----
  221. def welcome():
  222. print("masterimg")
  223. def usage():
  224. print("""
  225. Usage:
  226. masterimg OPTIONS FILES ...
  227. Options:
  228. -h Show this help
  229. -v More verbose
  230. -p NUM Specify number of processes (default: 6)
  231. """)
  232. class ShowHelpException(Exception):
  233. pass
  234. class Params:
  235. def __init__(self):
  236. self.Flags = {"-v": False}
  237. self.Ints = {"-p": 6}
  238. self.Strings = {}
  239. self.Floats = {}
  240. self.Args = []
  241. self.ArgsMin = 1
  242. @staticmethod
  243. def parse(cmds:list):
  244. p = Params()
  245. while len(cmds) > 0 and cmds[0].startswith("-"):
  246. if cmds[0] == "-h" or cmds[0] == "-H":
  247. raise ShowHelpException
  248. elif cmds[0] in p.Flags:
  249. p.Flags[cmds[0]] = True
  250. elif cmds[0] in p.Floats:
  251. p.Floats[cmds[0]] = float(cmds[1])
  252. cmds.pop(0)
  253. elif cmds[0] in p.Ints:
  254. p.Ints[cmds[0]] = int(cmds[1])
  255. cmds.pop(0)
  256. elif cmds[0] in p.Strings:
  257. p.Strings[cmds[0]] = cmds[1]
  258. cmds.pop(0)
  259. else:
  260. raise ValueError("Unknown option: " + cmds[0])
  261. cmds.pop(0)
  262. if len(cmds) < p.ArgsMin:
  263. raise ShowHelpException
  264. p.Args = cmds
  265. return p
  266. def main(args):
  267. welcome()
  268. try:
  269. if len(args) == 0:
  270. raise ShowHelpException
  271. p = Params.parse(args)
  272. global _verbose
  273. _verbose = p.Flags["-v"]
  274. global _config
  275. _config = Config("masterimg.yaml")
  276. print("Read masterimg.yaml")
  277. process(p.Ints["-p"], p.Args)
  278. except ShowHelpException as e:
  279. usage()
  280. #except Exception as e:
  281. # print(str(e))
  282. if __name__ == "__main__":
  283. import sys
  284. main(sys.argv[1:])