masterimg.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. #!/usr/bin/python3
  2. import sys
  3. import os
  4. import multiprocessing
  5. from datetime import datetime as dt
  6. from PIL import Image
  7. import piexif
  8. import math
  9. import yaml
  10. from dataclasses import dataclass
  11. from typing import Optional
  12. # Requires pip install piexif, pillow, pyyaml
  13. @dataclass
  14. class OutputDef:
  15. MaxDimension:int # max dim. wins over min dim.
  16. MinDimension:int
  17. OutputFormat:str
  18. OutputQuality:int
  19. OutputPath:str
  20. OutputSuffix:str
  21. SocialMediaDef = OutputDef(1600, 1080, "JPEG", 96, "sm", "_sm.jpg")
  22. HqDef = OutputDef(2560, 1920, "JPEG", 98, "hq", "_hq.jpg")
  23. FullDef = OutputDef(3600, 2700, "JPEG", 98, "full", "_full.jpg")
  24. # --------
  25. _verbose = False
  26. WmScale = 1.6
  27. class Config:
  28. def __init__(self, yaml_filename:str):
  29. with open(yaml_filename) as f:
  30. cfg = yaml.safe_load(f)
  31. #cfg = yaml.safload(f.read(), Loader=yaml.SafeLoader)
  32. self.CfgDict:dict = cfg
  33. self.Copyright:str = cfg["Copyright"]
  34. self.UserComment:str = cfg["UserComment"]
  35. self.WmFile:str = cfg["WatermarkFile"]
  36. def getFileSpecs(self, filename:str) -> dict:
  37. cfg = self.CfgDict.get(filename)
  38. if type(cfg) is dict:
  39. return cfg
  40. else:
  41. return {}
  42. global _config
  43. _config:Config = None
  44. # ------
  45. class Watermark:
  46. WidthRatio = 0.25 / WmScale
  47. Xpos = 1.0 / WmScale
  48. Ypos = 0.9 / WmScale
  49. WmColorWhite = 240
  50. WmColorBlack = 20
  51. TranspBaseWhite = 1.2
  52. TranspBaseBlack = 1.2
  53. TranspStdevScale = 0.02
  54. TranspIntensScale = 0.0025
  55. TranspBase2 = 0.17
  56. InvertWm = False
  57. def __init__(self, wm_black:str, wm_white:Optional[str]=None):
  58. self.WmBlack = wm_black
  59. self.WmWhite = wm_black if wm_white is None else wm_black
  60. wm_tmp = Image.open(wm_black)
  61. self.WmWidth = wm_tmp.size[0]
  62. self.WmHeight = wm_tmp.size[1]
  63. @staticmethod
  64. def _getIntensityVariance(img:Image, area):
  65. # Calculate intensity and variance on area of img
  66. cropped_area = img.crop(area)
  67. gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
  68. mean = sum(gray_pixels) / len(gray_pixels)
  69. var = sum((x - mean) ** 2 for x in gray_pixels) / len(gray_pixels)
  70. return (round(mean), var)
  71. @staticmethod
  72. def _getLoHiMean(img:Image, area):
  73. # Calculate lo and high 10th percentile, and mean, on area of img
  74. cropped_area = img.crop(area)
  75. gray_pixels = [0.21 * pixel[0] + 0.72 * pixel[1] + 0.07 * pixel[2] for pixel in cropped_area.getdata()]
  76. gray_pixels.sort()
  77. mean = sum(gray_pixels) / len(gray_pixels)
  78. i_low = len(gray_pixels) // 10
  79. i_high = len(gray_pixels) - i_low
  80. return (gray_pixels[i_low], gray_pixels[i_high], round(mean))
  81. def _planWatermark(self, main_image, watermark_area):
  82. # Check out image intensity behind watermark
  83. (gray_intens, gray_var) = self._getIntensityVariance(main_image, watermark_area)
  84. gray_stdev = math.sqrt(gray_var)
  85. if gray_intens < 128:
  86. wm_color = 255
  87. transparency = 1 - (1 / (self.TranspBaseWhite +
  88. (gray_intens * self.TranspIntensScale) + gray_stdev * self.TranspStdevScale))
  89. else:
  90. wm_color = 0
  91. transparency = 1 - (1 / (self.TranspBaseBlack +
  92. (255 - gray_intens) * self.TranspIntensScale + gray_stdev * self.TranspStdevScale))
  93. if _verbose:
  94. print("Watermark plan: intens: %d, st.dev.: %d, wm_color: %d, transp: %0.2f" % (gray_intens, gray_stdev, wm_color, transparency))
  95. return (wm_color, transparency)
  96. def _planWatermark2(self, main_image, watermark_area):
  97. (low, high, avg) = self._getLoHiMean(main_image, watermark_area)
  98. f1 = (1 - self.TranspBase2) / 2 / 255
  99. f2 = (1 - self.TranspBase2) / 2 / 255
  100. if avg < 128:
  101. wm_color = 255
  102. transparency = self.TranspBase2 + (high - low) * f1 + high * f2
  103. else:
  104. wm_color = 0
  105. transparency = self.TranspBase2 + (high - low) * f1 + (255 - low) * f2
  106. if _verbose:
  107. print("Watermark plan: intens: %d, lo: %d, hi: %d, wm_color: %d, transp: %0.2f" % (avg, low, high, wm_color, transparency))
  108. return (wm_color, transparency)
  109. def apply(self, main_image:Image, pos:str="bl"):
  110. if not pos in ["tl", "tr", "bl", "br"]:
  111. raise Exception("Invalid watermark position")
  112. # Calculate an average image dimension by a weighted average,
  113. # weighing the smallest dimension double:
  114. max_dim = max(main_image.size[0], main_image.size[1])
  115. min_dim = min(main_image.size[0], main_image.size[1])
  116. avg_dimension = (max_dim + min_dim * 2) / 3
  117. # This is used to size the water mark
  118. wm_width = avg_dimension * self.WidthRatio
  119. wm_size = (int(wm_width), int(self.WmHeight * wm_width / self.WmWidth))
  120. # X and Y offset are specified in multiplum of watermark height
  121. wm_x = round(wm_size[1] * self.Xpos)
  122. wm_y = round(wm_size[1] * self.Ypos)
  123. if "r" in pos: # right (otherwise l for left)
  124. wm_x = main_image.size[0] - wm_size[0] - wm_x
  125. if "b" in pos: # bottom (otherwise t for top)
  126. wm_y = main_image.size[1] - wm_size[1] - wm_y
  127. watermark_area = (wm_x, wm_y, wm_x + wm_size[0], wm_y + wm_size[1])
  128. # Plan color and transparency of watermark by checking out the watermark area
  129. (wm_color, transparency) = self._planWatermark2(main_image, watermark_area)
  130. if wm_color > 128:
  131. watermark = Image.open(self.WmWhite).resize(wm_size, Image.LANCZOS)
  132. else:
  133. watermark = Image.open(self.WmBlack).resize(wm_size, Image.LANCZOS)
  134. # Make watermark transparent
  135. watermark = watermark.convert("RGBA")
  136. pixels = watermark.getdata()
  137. if self.InvertWm:
  138. new_pixels = [(wm_color, wm_color, wm_color, round((255 - pixel[3]) * transparency)) for pixel in pixels]
  139. else:
  140. new_pixels = [(wm_color, wm_color, wm_color, round(pixel[3] * transparency)) for pixel in pixels]
  141. watermark.putdata(new_pixels)
  142. # Paste the watermark onto the main image
  143. main_image.paste(watermark, (wm_x, wm_y), watermark)
  144. def makeExifBytes(copyright:Optional[str], user_comment:Optional[str], image_description:Optional[str]) -> bytes:
  145. exif = {
  146. "0th": {},
  147. "Exif": {},
  148. "GPS": {},
  149. "1st": {},
  150. "thumbnail": None
  151. }
  152. if copyright is not None:
  153. exif["0th"][piexif.ImageIFD.Copyright] = copyright.encode("ascii")
  154. if user_comment is not None:
  155. # = b'\0\0\0\0\0\0\0\0' + user_comment.encode('utf-8')
  156. exif["Exif"][piexif.ExifIFD.UserComment] = b"ASCII\0\0\0" + user_comment.encode('ascii')
  157. if image_description is not None:
  158. exif['0th'][piexif.ImageIFD.ImageDescription] = image_description.encode('utf-8')
  159. return piexif.dump(exif)
  160. def resizeSaveOutput(input_filename:str, out_name:str, img:Image, exif_bytes:bytes, output_def:OutputDef):
  161. if output_def.MaxDimension == 0:
  162. scale = None
  163. else:
  164. scale1 = output_def.MinDimension / min(img.size[0], img.size[1])
  165. scale2 = output_def.MaxDimension / max(img.size[0], img.size[1])
  166. scale = min(scale1, scale2)
  167. if scale is not None and scale < 1.0:
  168. width = round(img.size[0] * scale)
  169. height = round(img.size[1] * scale)
  170. scale_msg = "scaled by %0.2f to: %dx%d" % (scale, width, height)
  171. img_scaled = img.resize((width, height), Image.LANCZOS)
  172. else:
  173. scale_msg = "not rescaled: %dx%d" % (img.size[0], img.size[1])
  174. img_scaled = img
  175. # in_path = os.path.dirname(input_filename)
  176. # if in_path == "":
  177. # in_path = "."
  178. # out_path = in_path + "/" + output_def.OutputPath
  179. out_path = "./" + output_def.OutputPath
  180. in_fname = os.path.basename(input_filename)
  181. out_fname = out_name + output_def.OutputSuffix
  182. out_full = out_path + "/" + out_fname
  183. if not os.path.isdir(out_path):
  184. print("Creating dir: " + out_path)
  185. os.makedirs(out_path, exist_ok=True)
  186. img_scaled.save(out_full, output_def.OutputFormat, quality=output_def.OutputQuality, exif=exif_bytes)
  187. size = os.path.getsize(out_full) // 1000
  188. print("%s %s, saved as %s (%d KB)" % (in_fname, scale_msg,
  189. output_def.OutputPath + "/" + out_fname, size))
  190. def process_image(main_image_path:str, watermark:Watermark):
  191. try:
  192. img_name = os.path.basename(main_image_path).split(".")[0]
  193. img_cfg = _config.getFileSpecs(img_name)
  194. if "year" in img_cfg:
  195. year = int(img_cfg["year"])
  196. else:
  197. mtime = os.path.getmtime(main_image_path)
  198. year = dt.fromtimestamp(mtime).year
  199. print("Input: %s, (year: %d to be used for copyright notice)" % (main_image_path, year))
  200. exif = makeExifBytes(_config.Copyright % year, _config.UserComment, img_cfg.get("title"))
  201. # Load the main image
  202. main_image = Image.open(main_image_path)
  203. # Save full output without water mark
  204. out_name = img_cfg.get("name", img_name)
  205. resizeSaveOutput(main_image_path, out_name, main_image, exif, FullDef)
  206. # Make reduced quality watermarked outputs
  207. wm_spec = img_cfg.get("watermark", "bl")
  208. if wm_spec != False and wm_spec != "off":
  209. watermark.apply(main_image, wm_spec)
  210. resizeSaveOutput(main_image_path, out_name, main_image, exif, SocialMediaDef)
  211. resizeSaveOutput(main_image_path, out_name, main_image, exif, HqDef)
  212. except Exception as e:
  213. print("ERROR: Processing %s failed: %s" % (main_image_path, str(e)))
  214. # ----
  215. def process(num_processes:int, filenames:list):
  216. print("Processes: %d" % (num_processes))
  217. watermark = Watermark(_config.WmFile)
  218. # Creating a pool of processes
  219. with multiprocessing.Pool(processes=num_processes) as pool:
  220. for filename in filenames:
  221. pool.apply_async(process_image, args=(filename, watermark))
  222. pool.close()
  223. pool.join()
  224. # ----
  225. def welcome():
  226. print("masterimg")
  227. def usage():
  228. print("""
  229. Usage:
  230. masterimg OPTIONS FILES ...
  231. Options:
  232. -h Show this help
  233. -v More verbose
  234. -p NUM Specify number of processes (default: 6)
  235. """)
  236. class ShowHelpException(Exception):
  237. pass
  238. class Params:
  239. def __init__(self):
  240. self.Flags = {"-v": False}
  241. self.Ints = {"-p": 6}
  242. self.Strings = {}
  243. self.Floats = {}
  244. self.Args = []
  245. self.ArgsMin = 1
  246. @staticmethod
  247. def parse(cmds:list):
  248. p = Params()
  249. while len(cmds) > 0 and cmds[0].startswith("-"):
  250. if cmds[0] == "-h" or cmds[0] == "-H":
  251. raise ShowHelpException
  252. elif cmds[0] in p.Flags:
  253. p.Flags[cmds[0]] = True
  254. elif cmds[0] in p.Floats:
  255. p.Floats[cmds[0]] = float(cmds[1])
  256. cmds.pop(0)
  257. elif cmds[0] in p.Ints:
  258. p.Ints[cmds[0]] = int(cmds[1])
  259. cmds.pop(0)
  260. elif cmds[0] in p.Strings:
  261. p.Strings[cmds[0]] = cmds[1]
  262. cmds.pop(0)
  263. else:
  264. raise ValueError("Unknown option: " + cmds[0])
  265. cmds.pop(0)
  266. if len(cmds) < p.ArgsMin:
  267. raise ShowHelpException
  268. p.Args = cmds
  269. return p
  270. def main(args):
  271. welcome()
  272. try:
  273. if len(args) == 0:
  274. raise ShowHelpException
  275. p = Params.parse(args)
  276. global _verbose
  277. _verbose = p.Flags["-v"]
  278. global _config
  279. _config = Config("masterimg.yaml")
  280. print("Read masterimg.yaml")
  281. process(p.Ints["-p"], p.Args)
  282. except ShowHelpException as e:
  283. usage()
  284. #except Exception as e:
  285. # print(str(e))
  286. if __name__ == "__main__":
  287. import sys
  288. main(sys.argv[1:])