import os
import random
import time
from copy import copy
from copy import deepcopy
from glob import glob
import cv2
import numpy as np
from augraphy.base.augmentationresult import AugmentationResult
from augraphy.base.augmentationsequence import AugmentationSequence
from augraphy.utilities.overlaybuilder import OverlayBuilder
[docs]
class AugraphyPipeline:
"""Contains phases of image augmentations and their results.
:param pre_phase: Collection of Augmentations to apply
:param ink_phase: Collection of Augmentations to apply.
:type ink_phase: base.augmentationsequence or list
:param paper_phase: Collection of Augmentations to apply.
:type paper_phase: base.augmentationsequence or list
:param post_phase: Collection of Augmentations to apply.
:type post_phase: base.augmentationsequence or list
:param overlay_type: Blending method to print ink into paper.
:type overlay_type: string, optional.
:param overlay_alpha: The alpha value for certain overlay methods.
:type overlay_alpha: float, optional.
:param ink_color_range: Pair of ints determining the range from which to
sample the ink color.
:type ink_color_range: tuple, optional
:param paper_color_range: Pair of ints determining the range from which to
sample the paper color.
:type paper_color_range: tuple, optional
:param save_outputs: Flag to enable saving each phase output image.
:type save_outputs: bool, optional
:param log: Flag to enable logging.
:type log: bool, optional
:param random_seed: The initial value for PRNGs used in Augraphy.
:type random_seed: int, optional
"""
def __init__(
self,
ink_phase=[],
paper_phase=[],
post_phase=[],
pre_phase=[],
overlay_type="ink_to_paper",
overlay_alpha=0.3,
ink_color_range=(-1, -1),
paper_color_range=(255, 255),
save_outputs=False,
log=False,
random_seed=None,
):
"""Constructor method"""
self.pre_phase = self.wrapListMaybe(pre_phase)
self.ink_phase = self.wrapListMaybe(ink_phase)
self.paper_phase = self.wrapListMaybe(paper_phase)
self.post_phase = self.wrapListMaybe(post_phase)
self.overlay_type = overlay_type
self.overlay_alpha = overlay_alpha
self.ink_color_range = ink_color_range
self.paper_color_range = paper_color_range
self.save_outputs = save_outputs
self.log = log
self.random_seed = random_seed
# ensure determinism if random_seed set
if self.random_seed:
random.seed(self.random_seed)
np.random.seed(self.random_seed)
cv2.setRNGSeed(self.random_seed)
# create directory to store log files
if self.log:
self.log_prob_path = os.path.join(os.getcwd(), "logs/")
os.makedirs(self.log_prob_path, exist_ok=True)
if self.save_outputs:
# create each phase folder
self.save_paths = []
self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/pre/"))
self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/ink/"))
self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/paper/"))
self.save_paths.append(os.path.join(os.getcwd(), "augmentation_images/post/"))
for i in range(len(self.save_paths)):
os.makedirs(self.save_paths[i], exist_ok=True)
[docs]
def wrapListMaybe(self, augs):
"""Converts a bare list to an AugmentationSequence, or does nothing."""
if type(augs) is list:
return AugmentationSequence(augs)
else:
return augs
[docs]
def augment(self, image, return_dict=1):
"""Applies the Augmentations in each phase of the pipeline.
:param image: The image to apply Augmentations to. Minimum 30x30 pixels.
:type image: numpy.array or list
:return: 1. A dictionary of AugmentationResults representing the changes in each phase of the pipeline if the input is image.
2. A list contains output images if the input is list of images.
3. A four dimensional numpy array if the input is a four dimensional numpy array (batch size, channels, height, width).
:rtype: 1. dictionary
2. list
3. numpy array (B, C, H, W)
:param return_dict: Flag to return output in dictionary format.
Not applicable when input is 4 dimensional array.
When input is 4 dimensional numpy array, output will be a 4 dimensional array too.
:type return_dict: int
"""
# image is a list of images
if isinstance(image, list):
output = []
for single_image in image:
data = self.augment_single_image(single_image)
if return_dict:
output.append(data)
else:
output.append(data["output"])
# image is a 4 dimensional numpy array
elif len(image.shape) == 4:
batch_size, channels, height, width = image.shape
output = np.zeros((batch_size, channels, height, width), dtype=image.dtype)
for i in range(batch_size):
single_image = image[i].reshape(height, width, channels)
output_image = self.augment_single_image(single_image)["output"]
# output is color image but input is in grayscale, convert output to grayscale
if len(output_image.shape) > channels:
output_image = cv2.cvtColor(output_image, cv2.COLOR_BGR2GRAY)
# output is in grayscale but input is color image, convert output to color image
if len(output_image.shape) != channels:
output_image = cv2.cvtColor(output_image, cv2.COLOR_GRAY2BGR)
# rescale image size if the image size is changes after the augmentation
if output_image.shape[0] != height or output_image.shape[1] != width:
output_image = cv2.resize(output_image, (width, height), interpolation=cv2.INTER_AREA)
output[i] = output_image.reshape(channels, height, width)
# single image
else:
data = self.augment_single_image(image)
if return_dict:
output = data
else:
output = data["output"]
return output
[docs]
def augment_single_image(self, image):
"""Applies the Augmentations in each phase of the pipeline.
:param image: The image to apply Augmentations to. Minimum 30x30 pixels.
:type image: numpy.array
:return: A dictionary of AugmentationResults representing the changes
in each phase of the pipeline.
:rtype: dictionary
"""
# Check if image has correct channel
if len(image.shape) > 2 and (image.shape[2] != 3):
raise Exception(
"Image should have channel number of 3 (BGR), but actual dimensions were {}.".format(
image.shape,
),
)
# Check that image is the correct size.
if (image.shape[0] < 30) or (image.shape[1] < 30):
raise Exception(
"Image should have dimensions greater than 30x30, but actual dimensions were {}.".format(
image.shape,
),
)
# get and check valid image type ( uint or float)
image_type = str(image.dtype)
image_max_value = 255
if image_type[:5] == "float":
if np.max(image) <= 1:
image_max_value = 1
image = np.uint8(image * 255)
else:
image = np.uint8(image)
elif image_type[:4] != "uint":
raise Exception(
"Image type should be uint or float, but the image type is {}.".format(
image_type,
),
)
# create augraphy cache folder
cache_folder_path = os.path.join(os.getcwd() + "/augraphy_cache/")
os.makedirs(cache_folder_path, exist_ok=True)
cache_image_paths = glob(cache_folder_path + "*.png", recursive=True)
file_indices = []
modified_time = []
for image_path in cache_image_paths:
file_name = os.path.basename(image_path)
file_indices.append(int(file_name[file_name.index("_") + 1 : -4]))
modified_time.append(os.path.getmtime(image_path))
# store 30 cache image files
if len(cache_image_paths) >= 30:
oldest_index = np.argmin(modified_time)
outfilename = cache_folder_path + "image_" + str(file_indices[oldest_index]) + ".png"
cv2.imwrite(
outfilename,
image,
)
else:
current_image_index = len(cache_image_paths)
outfilename = cache_folder_path + "image_" + str(current_image_index) + ".png"
cv2.imwrite(
outfilename,
image,
)
data = dict()
# Store performance metadata and other logs here.
data["log"] = dict()
# For storing augmentation execution times.
data["log"]["time"] = list()
data["log"]["augmentation_name"] = list()
data["log"]["augmentation_status"] = list()
data["log"]["augmentation_parameters"] = list()
# This is useful.
data["log"]["image_shape"] = image.shape
data["image"] = image.copy()
data["pipeline"] = self
data["pre"] = list()
data["ink"] = list()
data["paper"] = list()
data["post"] = list()
if len(self.pre_phase) == 0:
self.pre_phase = AugmentationSequence([])
ink = data["image"].copy()
else:
# apply pre phase augmentation
pre = data["image"].copy()
data["pre"].append(AugmentationResult(None, pre))
self.apply_phase(data, layer="pre", phase=self.pre_phase)
if data["pre"][-1].result["rescaled_img"] is not None:
ink = data["pre"][-1].result["rescaled_img"]
else:
ink = data["image"].copy()
data["ink"].append(AugmentationResult(None, ink))
if (self.paper_color_range[0] != 0) | (self.paper_color_range[1] != 0):
paper_color = random.randint(
self.paper_color_range[0],
self.paper_color_range[1],
)
else:
paper_color = 255
data["log"]["paper_color"] = paper_color
data["paper"].append(
AugmentationResult(
None,
np.full(
(ink.shape[0], ink.shape[1], 3),
paper_color,
dtype=np.uint8,
),
),
)
# If phases were defined None or [] in a custom pipeline, they wouldn't
# be callable objects, so make them empty AugmentationSequences
if len(self.ink_phase) == 0:
self.ink_phase = AugmentationSequence([])
if len(self.paper_phase) == 0:
self.paper_phase = AugmentationSequence([])
if len(self.post_phase) == 0:
self.post_phase = AugmentationSequence([])
# apply ink phase augmentation
self.apply_phase(data, layer="ink", phase=self.ink_phase)
# apply paper phase augmentations
self.apply_phase(data, layer="paper", phase=self.paper_phase)
# ink and paper phases always have at least one result by now
data["post"].append(
AugmentationResult(
None,
self.print_ink_to_paper(
data,
data["ink"][-1].result.copy(),
data["paper"][-1].result.copy(),
),
),
)
# apply post phase augmentations
self.apply_phase(data, layer="post", phase=self.post_phase)
# revert to input image type
if image_type[:5] == "float":
if image_max_value == 1:
data["output"] = (data["post"][-1].result.astype(image_type)) / 255
else:
data["output"] = data["post"][-1].result.astype(image_type)
else:
data["output"] = data["post"][-1].result.astype("uint8")
# save each phase augmented images
if self.save_outputs:
self.save_images(data)
# log probability
if self.log:
self.write_log(data)
return data
[docs]
def save_images(self, data):
"""Save each augmented image in each phases to local disk.
:param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline.
:type data: dictionary
"""
layer_names = ["pre", "ink", "paper", "post"]
pre_layers = data["pre"]
ink_layers = data["ink"]
paper_layers = data["paper"]
post_layers = data["post"]
n = 0
for j, layers in enumerate([pre_layers, ink_layers, paper_layers, post_layers]):
# output path for each ink, paper and post phase images
save_path = self.save_paths[j]
# name of layer or phase
layer_name = layer_names[j]
for i, layer_data in enumerate(layers):
if layer_data.metadata is None:
result = layer_data.result
# input layer
if layer_data.augmentation is None:
augmentation_name = layer_name + "_layer_input"
cv2.imwrite(
save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png",
result,
)
n += 1
# one of
elif layer_data.augmentation.__class__.__name__ == "OneOf":
augmentation_name = "oneof_"
n = self.get_oneof_data(
layer_data.augmentation,
result,
save_path,
layer_name,
augmentation_name,
i,
n,
)
# sequence
elif layer_data.augmentation.__class__.__name__ == "AugmentationSequence":
augmentation_name = "sequence_"
n = self.get_sequence_data(
layer_data.augmentation,
result,
save_path,
layer_name,
augmentation_name,
i,
n,
)
# normal augmentations
else:
augmentation_name = layer_data.augmentation.__class__.__name__
cv2.imwrite(
save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png",
result,
)
n += 1
[docs]
def get_oneof_data(self, augmentation, result, save_path, layer_name, augmentation_name, i, n):
"""Get augmentation information from OneOf augmentation recursively or save the augmented image in disk.
:param augmentation: Augmentation object of OneOf augmentation.
:type augmentation: class instance
:param result: Augmentation output, it may be nested in a list.
:type result: list or numpy array.
:param save_path: Output path of result.
:type save_path: list
:param layer_name: Name of current layer.
:type layer_name: list
:param augmentation_name: A combined name of augmentations, seperated by _.
:type augmentation_name: list
:param i: Index of current augmentation in total number of augmentations.
:type i: int
:param n: Index of current augmented in total number of augmented images.
:type n: int
"""
current_augmentation = augmentation.augmentations[np.argmax(augmentation.augmentation_probabilities)]
# sequence inside oneof
if current_augmentation.__class__.__name__ == "AugmentationSequence":
augmentation_name += "sequence_"
n = self.get_sequence_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n)
# oneof inside oneof
elif current_augmentation.__class__.__name__ == "OneOf":
augmentation_name += "oneof_"
n = self.get_oneof_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n)
# augmentations inside oneof
else:
augmentation_name += current_augmentation.__class__.__name__
cv2.imwrite(save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png", result)
n += 1
return n
[docs]
def get_sequence_data(self, augmentation, result, save_path, layer_name, input_augmentation_name, i, n):
"""Get augmentation information from AugmentationSequence augmentation recursively or save the augmented image in disk.
:param augmentation: Augmentation object of OneOf augmentation.
:type augmentation: class instance
:param result: Augmentation output, it may be nested in a list.
:type result: list or numpy array.
:param save_path: Output path of result.
:type save_path: list
:param layer_name: Name of current layer.
:type layer_name: list
:param augmentation_name: A combined name of augmentations, seperated by _.
:type augmentation_name: list
:param i: Index of current augmentation in total number of augmentations.
:type i: int
:param n: Index of current augmented in total number of augmented images.
:type n: int
"""
s = 0
for current_augmentation, result in zip(augmentation.augmentations, augmentation.results):
augmentation_name = copy(input_augmentation_name) + str(s) + "_"
# sequence inside sequence
if current_augmentation.__class__.__name__ == "AugmentationSequence":
# sequence returns (result, self.augmentations), so get result only here
result = result[0]
augmentation_name += "sequence_"
n = self.get_sequence_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n)
# oneof inside sequence
elif current_augmentation.__class__.__name__ == "OneOf":
# oneof returns (image, [augmentation]), so get image only here
result = result[0]
augmentation_name += "oneof_"
n = self.get_oneof_data(current_augmentation, result, save_path, layer_name, augmentation_name, i, n)
# augmentations inside sequence
else:
augmentation_name += current_augmentation.__class__.__name__
if result is not None:
cv2.imwrite(
save_path + "p" + str(n) + "_" + layer_name + str(i) + "_" + augmentation_name + ".png",
result,
)
n += 1
s += 1
return n
[docs]
def write_log(self, data):
"""Save augmentations log to local disk.
:param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline.
:type data: dictionary
"""
# path to log file
log_file_name = "log_" + time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) + ".txt"
log_prob_file_path = self.log_prob_path + log_file_name
augmentation_names = data["log"]["augmentation_name"]
augmentation_status = data["log"]["augmentation_status"]
augmentation_parameters = deepcopy(data["log"]["augmentation_parameters"])
# remove image array and replace it with shape
for j, augmentation_parameter in enumerate(augmentation_parameters):
# check and convert from tuple to list
if isinstance(augmentation_parameter, tuple):
augmentation_parameter = list(augmentation_parameter)
augmentation_parameters[j] = augmentation_parameter
check_values = [augmentation_parameter]
while check_values:
value = check_values.pop(0)
if value:
if isinstance(value, list):
for i, nested_value in enumerate(value):
if hasattr(nested_value, "shape"):
value[i] = nested_value.shape
elif (
isinstance(nested_value, list)
or isinstance(nested_value, tuple)
or hasattr(nested_value, "shape")
):
# convert from tuple to list
if isinstance(nested_value, tuple):
nested_value = list(nested_value)
value[i] = nested_value
check_values.append(nested_value)
elif hasattr(value, "items"):
for parameter, nested_value in value.items():
if hasattr(nested_value, "shape"):
value[parameter] = nested_value.shape
elif (
isinstance(nested_value, list)
or isinstance(nested_value, tuple)
or hasattr(nested_value, "shape")
):
# convert from tuple to list
if isinstance(nested_value, tuple):
nested_value = list(nested_value)
value[parameter] = nested_value
check_values.append(nested_value)
with open(log_prob_file_path, "w+") as file:
for (name, status, parameters) in zip(
augmentation_names,
augmentation_status,
augmentation_parameters,
):
file.write("%s,%s,%s \n" % (name, status, parameters))
# put a space
file.write("\n")
file.close()
[docs]
def apply_phase(self, data, layer, phase):
"""Applies every augmentation in a phase.
:param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline.
:type data: dictionary
:param layer: The name of current layer or phase.
:type layer: string
:param phase: Collection of Augmentations to apply.
:type phase: base.augmentationsequence or list
"""
for augmentation in phase.augmentations:
result = data[layer][-1].result.copy()
if augmentation.should_run():
start = time.process_time() # time at start of execution
if (augmentation.__class__.__name__ == "Rescale") and layer == "post":
if len(data["pre"]):
result = augmentation(
result,
layer,
force=True,
doc_dims=data["pre"][1].result["doc_dimensions"],
original_dpi=data["pre"][1].result["original_dpi"],
)
else:
continue
else:
result = augmentation(result, layer, force=True)
end = time.process_time() # time at end of execution
elapsed = end - start # execution duration
data["log"]["time"].append((augmentation, elapsed))
else:
result = None
data["log"]["augmentation_name"].append(augmentation.__class__.__name__)
if result is None:
data["log"]["augmentation_status"].append(False)
data["log"]["augmentation_parameters"].append("")
data[layer].append(
AugmentationResult(
augmentation,
data[layer][-1].result.copy(),
'This augmentation did not run, its "result" is unchanged.',
),
)
else:
data["log"]["augmentation_status"].append(True)
data["log"]["augmentation_parameters"].append(augmentation.__dict__)
# for "OneOf" or "AugmentationSequence"
while isinstance(result, tuple) or isinstance(result, list):
result, augmentations = result
for nested_augmentation in augmentations:
data["log"]["augmentation_name"].append(
nested_augmentation.__class__.__name__,
)
data["log"]["augmentation_status"].append(True)
data["log"]["augmentation_parameters"].append(
nested_augmentation.__dict__,
)
data[layer].append(AugmentationResult(augmentation, result))
[docs]
def print_ink_to_paper(self, data, overlay, background):
"""Applies the ink layer to the paper layer.
:param data: A dictionary of AugmentationResults representing the changes in each phase of the pipeline.
:type data: dictionary
:param overlay: Foreground of overlay process, output from ink phase.
:type overlay: numpy array
:param background: Background of overlay process, output from paper phase.
:type background: numpy array
"""
if (self.ink_color_range[0] != -1) or (self.ink_color_range[1] != -1):
ink_color = random.randint(self.ink_color_range[0], self.ink_color_range[1])
else:
ink_color = -1
data["log"]["ink_color"] = ink_color
# prevent inconsistency in size between background and overlay
if overlay.shape[:2] != background.shape[:2]:
overlay_y, overlay_x = overlay.shape[:2]
background = cv2.resize(
background,
(overlay_x, overlay_y),
interpolation=cv2.INTER_AREA,
)
ink_to_paper_builder = OverlayBuilder(
overlay_types=self.overlay_type,
foreground=overlay,
background=background,
ntimes=1,
nscales=(1, 1),
edge="center",
edge_offset=0,
alpha=self.overlay_alpha,
ink_color=ink_color,
)
return ink_to_paper_builder.build_overlay()
def __repr__(self):
r = f"pre_phase = {repr(self.pre_phase)}\n\n"
r += f"ink_phase = {repr(self.ink_phase)}\n\n"
r += f"paper_phase = {repr(self.paper_phase)}\n\n"
r += f"post_phase = {repr(self.post_phase)}\n\n"
r += f"AugraphyPipeline(pre_phase , ink_phase, paper_phase, post_phase, overlay_type={self.overlay_type}, overlay_alpha={self.overlay_alpha}, ink_color_range={self.ink_color_range}, paper_color_range={self.paper_color_range}, save_outputs={self.save_outputs}, log={self.log}, random_seed={self.random_seed})"
return r
[docs]
def visualize(self):
print(repr(self))
def __call__(self, image):
return self.augment(image, return_dict=0)