Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 94 additions & 5 deletions src/labthings_picamera2/thing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Annotated, Any, Iterator, Literal, Mapping, Optional, Self
from contextlib import contextmanager
import piexif
from scipy.ndimage import zoom
from scipy.ndimage import zoom, convolve
from scipy.interpolate import interp1d
from PIL import Image
from threading import RLock
Expand Down Expand Up @@ -134,6 +134,7 @@ class ImageProcessingInputs(BaseModel):
@dataclass
class ImageProcessingCache:
white_norm: np.ndarray
white_norm_bl: np.ndarray
gamma: interp1d
ccm: np.ndarray

Expand Down Expand Up @@ -172,6 +173,73 @@ def from_arrays(cls, arrays: Mapping[str, np.ndarray]) -> Self:
)


GREEN_KERNEL = np.asarray(
[[0, 0.25, 0],
[0.25, 1, 0.25],
[0, 0.25, 0]]
)

RED_BLUE_KERNEL = np.asarray(
[[0.25, 0.5, 0.25],
[0.5, 1, 0.5],
[0.25, 0.5, 0.25]]
)

def raw_to_8bit_bayer(raw: np.ndarray, size: tuple[int, int]) -> np.ndarray:
"""Convert packed 10 bit raw to 8 bit Raw bayer data"""
raw = np.asarray(raw) # ensure it's an array
output_shape = (size[1], size[0])
bayer8bit = np.empty(output_shape, dtype=np.uint8)
# raw_w is Raw data width in bytes which is:
# pixel width * bits_per_pixel / bits_per_byte
# This is calculated as below because the data is saved as:
# For each red line
# [8-bit R pixel, 8-bit G pixel, 8-bit R pixel, 8-bit G pixel, extra bits, ...]
# For each blue line
# [8-bit G pixel, 8-bit B pixel, 8-bit G pixel, 8-bit B pixel, extra bits, ...]
# where the extra bits are the 2 bits for the previous 4 pixels
raw_w = bayer8bit.shape[1] // 4 * 5
# First pixel in block of 5 bytes
bayer8bit[:,::4] = raw[:, : raw_w : 5]
# 2nd pixel in block of 5 bytes
bayer8bit[:,1::4] = raw[:, 1: raw_w+1 : 5]
# 3rd pixel in block of 5 bytes
bayer8bit[:,2::4] = raw[:, 2: raw_w+2 : 5]
# 4th pixel in block of 5 bytes
bayer8bit[:,3::4] = raw[:, 3: raw_w+3 : 5]
return bayer8bit

def bayer_masks(
shape: tuple[int, int],
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Return the Bayer red, green and blue masks
"""

r = np.zeros(shape)
r[1::2,1::2] = 1
g = np.zeros(shape)
g[1::2,::2] = 1
g[::2,1::2] = 1
b = np.zeros(shape)
b[::2,::2] = 1

return r,g,b

def demosaicing_bilinear(bayer8bit: np.ndarray) -> np.ndarray:
"""
Demosaic using a bilinear algorithm taken from the library
colour_demosaicing
"""
bayer8bit.astype(np.double)

r_mask, g_mask, b_mask = bayer_masks(bayer8bit.shape)

r = convolve(bayer8bit * r_mask, RED_BLUE_KERNEL)
g = convolve(bayer8bit * g_mask, GREEN_KERNEL)
b = convolve(bayer8bit * b_mask, RED_BLUE_KERNEL)

return np.dstack((r,g,b)).astype(np.uint8)

def raw2rggb(raw: np.ndarray, size: tuple[int, int]) -> np.ndarray:
"""Convert packed 10 bit raw to RGGB 8 bit"""
Expand Down Expand Up @@ -614,10 +682,17 @@ def generate_image_processing_cache(
white_norm = zoom(p.white_norm_lores, zoom_factors, order=1)[
: (p.raw_size[1]//2), : (p.raw_size[0]//2), :
]
zoom_factors_bl = [
i / n for i, n in zip(p.raw_size[::-1], p.white_norm_lores.shape[:2])
] + [1]
white_norm_bl = zoom(p.white_norm_lores, zoom_factors_bl, order=1)[
: (p.raw_size[1]), : (p.raw_size[0]), :
]
ccm = np.array(p.colour_correction_matrix).reshape((3,3))
gamma = interp1d(p.gamma[:, 0] / 255, p.gamma[:, 1] / 255)
return ImageProcessingCache(
white_norm=white_norm,
white_norm_bl=white_norm_bl,
ccm = ccm,
gamma = gamma,
)
Expand All @@ -642,6 +717,7 @@ def process_raw_array(
self,
raw: RawImageModel,
use_cache: bool = False,
bilinear_demosaic: bool = True,
)->NDArray:
"""Convert a raw image to a processed array"""
if not use_cache:
Expand All @@ -659,8 +735,12 @@ def process_raw_array(
assert raw.format == "SBGGR10_CSI2P"
buffer = np.frombuffer(raw.image_data.content, dtype=np.uint8)
packed = buffer.reshape((-1, raw.stride))
rgb = rggb2rgb(raw2rggb(packed, raw.size))
normed = rgb / p.white_norm
if bilinear_demosaic:
rgb = demosaicing_bilinear(raw_to_8bit_bayer(packed, raw.size))
normed = rgb / p.white_norm_bl
else:
rgb = rggb2rgb(raw2rggb(packed, raw.size))
normed = rgb / p.white_norm
corrected = np.dot(
p.ccm, normed.reshape((-1, 3)).T
).T.reshape(normed.shape)
Expand All @@ -670,9 +750,18 @@ def process_raw_array(
return processed_image.astype(np.uint8)

@thing_action
def raw_to_png(self, raw: RawImageModel, use_cache: bool = False)->PNGBlob:
def raw_to_png(
self,
raw: RawImageModel,
use_cache: bool = False,
bilinear_demosaic: bool = True,
)->PNGBlob:
"""Process a raw image to a PNG"""
arr = self.process_raw_array(raw=raw, use_cache=use_cache)
arr = self.process_raw_array(
raw=raw,
use_cache=use_cache,
bilinear_demosaic=bilinear_demosaic
)
image = Image.fromarray(arr.astype(np.uint8), mode="RGB")
out = io.BytesIO()
image.save(out, format="png")
Expand Down