import base64
import warnings
from typing import Optional, Tuple, Union
import numpy as np
import rasterio
import rasterio.env
import rasterio.session
import xarray as xr
from ._interop import is_dask_collection
from ._rgba import replace_transparent_pixels
# Driver, Compress Option Name
_fmt_info = {
"png": ("PNG", "zlevel", "image/png"),
"jpeg": ("JPEG", "quality", "image/jpeg"),
"webp": ("WEBP", "quality", "image/webp"),
}
def _verify_can_compress(xx: xr.DataArray):
"""Returns error if array dimensions are not suitable for compress"""
if xx.ndim > 2:
xx = xx.squeeze()
if xx.ndim not in (2, 3):
raise ValueError(
f"Expected a 2 or 3 dimensional array; got {xx.ndim} dimensions {xx.dims}."
)
def _compress_image(im: np.ndarray, driver="PNG", **opts) -> bytes:
if im.ndim > 2:
im = np.squeeze(im)
if im.ndim == 3:
h, w, nc = im.shape # type: ignore
bands = np.transpose(im, axes=(2, 0, 1)) # Y,X,B -> B,Y,X
elif im.ndim == 2:
(h, w), nc = im.shape, 1
bands = im.reshape(nc, h, w)
else:
raise ValueError(
f"Expected a 2 or 3 dimensional array; got {im.ndim} dimensions."
)
rio_opts = {
"width": w,
"height": h,
"count": nc,
"driver": driver,
"dtype": im.dtype,
**opts,
}
with warnings.catch_warnings():
warnings.simplefilter("ignore", rasterio.errors.NotGeoreferencedWarning)
with rasterio.env.Env(session=rasterio.session.DummySession()):
with rasterio.MemoryFile() as mem:
with mem.open(**rio_opts) as dst:
dst.write(bands)
return mem.read()
[docs]
def compress(
xx,
/,
*args,
as_data_url=False,
transparent: Optional[Tuple[int, int, int]] = None,
**kw,
) -> Union[str, bytes]:
"""
Save image to RAM in jpeg/png/webp format.
.. code:: python
png_bytes = compress(xx) # default is PNG
png_bytes = compress(xx, "png", 9) # compression settings
# - Make data url with JPEG quality of 85
# - Use black for transparent pixels (JPEG doesn't support transparency)
# - Result is an ASCII string
url = compress(xx, "jpeg", 85, as_data_url=True, transparent=(0,0,0))
:param xx: DataArray to compress
:param transparent:
Pixel value to use for transparent pixels, useful for jpeg output.
"""
# Raise error early if xx has unsuitable dims
_verify_can_compress(xx)
fmt = "png"
opts = {}
if len(args) >= 1:
fmt, *_ = args
driver, opt_name, mime_type = _fmt_info.get(fmt.lower(), (None, "", ""))
if driver is None:
raise ValueError(f"Format '{fmt}', is not suported, try jpeg/png/webp")
if len(args) >= 2:
_, comp, *_ = args
opts[opt_name] = comp
if isinstance(xx, xr.DataArray):
xx = xx.data
if is_dask_collection(xx):
xx = xx.compute()
if transparent is not None:
xx = replace_transparent_pixels(xx, transparent)
_bytes = _compress_image(xx, driver, **opts, **kw)
if not as_data_url:
return _bytes
return f"data:{mime_type};base64," + base64.encodebytes(_bytes).decode("ascii")