diff --git a/src/labthings_picamera2/thing.py b/src/labthings_picamera2/thing.py index 2b7554f..e0d47e6 100644 --- a/src/labthings_picamera2/thing.py +++ b/src/labthings_picamera2/thing.py @@ -24,7 +24,7 @@ from typing import Annotated, Any, Iterator, Literal, Mapping, Optional, Self from contextlib import contextmanager import piexif -from scipy.ndimage import zoom +from scipy.ndimage import zoom, convolve from scipy.interpolate import interp1d from PIL import Image from threading import RLock @@ -134,6 +134,7 @@ class ImageProcessingInputs(BaseModel): @dataclass class ImageProcessingCache: white_norm: np.ndarray + white_norm_bl: np.ndarray gamma: interp1d ccm: np.ndarray @@ -172,6 +173,73 @@ def from_arrays(cls, arrays: Mapping[str, np.ndarray]) -> Self: ) +GREEN_KERNEL = np.asarray( + [[0, 0.25, 0], + [0.25, 1, 0.25], + [0, 0.25, 0]] +) + +RED_BLUE_KERNEL = np.asarray( + [[0.25, 0.5, 0.25], + [0.5, 1, 0.5], + [0.25, 0.5, 0.25]] +) + +def raw_to_8bit_bayer(raw: np.ndarray, size: tuple[int, int]) -> np.ndarray: + """Convert packed 10 bit raw to 8 bit Raw bayer data""" + raw = np.asarray(raw) # ensure it's an array + output_shape = (size[1], size[0]) + bayer8bit = np.empty(output_shape, dtype=np.uint8) + # raw_w is Raw data width in bytes which is: + # pixel width * bits_per_pixel / bits_per_byte + # This is calculated as below because the data is saved as: + # For each red line + # [8-bit R pixel, 8-bit G pixel, 8-bit R pixel, 8-bit G pixel, extra bits, ...] + # For each blue line + # [8-bit G pixel, 8-bit B pixel, 8-bit G pixel, 8-bit B pixel, extra bits, ...] + # where the extra bits are the 2 bits for the previous 4 pixels + raw_w = bayer8bit.shape[1] // 4 * 5 + # First pixel in block of 5 bytes + bayer8bit[:,::4] = raw[:, : raw_w : 5] + # 2nd pixel in block of 5 bytes + bayer8bit[:,1::4] = raw[:, 1: raw_w+1 : 5] + # 3rd pixel in block of 5 bytes + bayer8bit[:,2::4] = raw[:, 2: raw_w+2 : 5] + # 4th pixel in block of 5 bytes + bayer8bit[:,3::4] = raw[:, 3: raw_w+3 : 5] + return bayer8bit + +def bayer_masks( + shape: tuple[int, int], +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Return the Bayer red, green and blue masks + """ + + r = np.zeros(shape) + r[1::2,1::2] = 1 + g = np.zeros(shape) + g[1::2,::2] = 1 + g[::2,1::2] = 1 + b = np.zeros(shape) + b[::2,::2] = 1 + + return r,g,b + +def demosaicing_bilinear(bayer8bit: np.ndarray) -> np.ndarray: + """ + Demosaic using a bilinear algorithm taken from the library + colour_demosaicing + """ + bayer8bit.astype(np.double) + + r_mask, g_mask, b_mask = bayer_masks(bayer8bit.shape) + + r = convolve(bayer8bit * r_mask, RED_BLUE_KERNEL) + g = convolve(bayer8bit * g_mask, GREEN_KERNEL) + b = convolve(bayer8bit * b_mask, RED_BLUE_KERNEL) + + return np.dstack((r,g,b)).astype(np.uint8) def raw2rggb(raw: np.ndarray, size: tuple[int, int]) -> np.ndarray: """Convert packed 10 bit raw to RGGB 8 bit""" @@ -614,10 +682,17 @@ def generate_image_processing_cache( white_norm = zoom(p.white_norm_lores, zoom_factors, order=1)[ : (p.raw_size[1]//2), : (p.raw_size[0]//2), : ] + zoom_factors_bl = [ + i / n for i, n in zip(p.raw_size[::-1], p.white_norm_lores.shape[:2]) + ] + [1] + white_norm_bl = zoom(p.white_norm_lores, zoom_factors_bl, order=1)[ + : (p.raw_size[1]), : (p.raw_size[0]), : + ] ccm = np.array(p.colour_correction_matrix).reshape((3,3)) gamma = interp1d(p.gamma[:, 0] / 255, p.gamma[:, 1] / 255) return ImageProcessingCache( white_norm=white_norm, + white_norm_bl=white_norm_bl, ccm = ccm, gamma = gamma, ) @@ -642,6 +717,7 @@ def process_raw_array( self, raw: RawImageModel, use_cache: bool = False, + bilinear_demosaic: bool = True, )->NDArray: """Convert a raw image to a processed array""" if not use_cache: @@ -659,8 +735,12 @@ def process_raw_array( assert raw.format == "SBGGR10_CSI2P" buffer = np.frombuffer(raw.image_data.content, dtype=np.uint8) packed = buffer.reshape((-1, raw.stride)) - rgb = rggb2rgb(raw2rggb(packed, raw.size)) - normed = rgb / p.white_norm + if bilinear_demosaic: + rgb = demosaicing_bilinear(raw_to_8bit_bayer(packed, raw.size)) + normed = rgb / p.white_norm_bl + else: + rgb = rggb2rgb(raw2rggb(packed, raw.size)) + normed = rgb / p.white_norm corrected = np.dot( p.ccm, normed.reshape((-1, 3)).T ).T.reshape(normed.shape) @@ -670,9 +750,18 @@ def process_raw_array( return processed_image.astype(np.uint8) @thing_action - def raw_to_png(self, raw: RawImageModel, use_cache: bool = False)->PNGBlob: + def raw_to_png( + self, + raw: RawImageModel, + use_cache: bool = False, + bilinear_demosaic: bool = True, + )->PNGBlob: """Process a raw image to a PNG""" - arr = self.process_raw_array(raw=raw, use_cache=use_cache) + arr = self.process_raw_array( + raw=raw, + use_cache=use_cache, + bilinear_demosaic=bilinear_demosaic + ) image = Image.fromarray(arr.astype(np.uint8), mode="RGB") out = io.BytesIO() image.save(out, format="png")