Skip to content

chla module

Module for training and evaluating the VAE model for Chl-a concentration estimation.

VAE (Module)

Source code in hypercoast/chla.py
class VAE(nn.Module):
    """Variational autoencoder used for Chl-a concentration estimation.

    The encoder compresses an input spectrum into a 32-dimensional latent
    Gaussian (mean and log variance); the decoder maps a latent sample back
    to the output space, with a final Softplus keeping outputs non-negative.
    """

    def __init__(self, input_dim: int, output_dim: int) -> None:
        """Builds the encoder and decoder stacks.

        Args:
            input_dim (int): Dimension of the input features.
            output_dim (int): Dimension of the output features.
        """
        super().__init__()

        hidden, latent = 64, 32

        def _block(n_in: int, n_out: int) -> list:
            # Linear -> BatchNorm -> LeakyReLU: the repeated unit of both stacks.
            return [nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.LeakyReLU(0.2)]

        # Encoder trunk shared by the two latent heads.
        self.encoder_layer = nn.Sequential(
            *_block(input_dim, hidden), *_block(hidden, hidden)
        )

        # Latent heads: fc1 -> mean, fc2 -> log variance.
        self.fc1 = nn.Linear(hidden, latent)
        self.fc2 = nn.Linear(hidden, latent)

        # Decoder; the trailing Softplus keeps reconstructions non-negative.
        self.decoder = nn.Sequential(
            *_block(latent, hidden),
            *_block(hidden, hidden),
            nn.Linear(hidden, output_dim),
            nn.Softplus(),
        )

    def encode(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Encodes the input into latent mean and log variance.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            tuple[torch.Tensor, torch.Tensor]: Mean and log variance tensors.
        """
        features = self.encoder_layer(x)
        return self.fc1(features), self.fc2(features)

    def reparameterize(self, mu: torch.Tensor, log_var: torch.Tensor) -> torch.Tensor:
        """Samples a latent vector with the reparameterization trick.

        Args:
            mu (torch.Tensor): Mean tensor.
            log_var (torch.Tensor): Log variance tensor.

        Returns:
            torch.Tensor: Sampled latent vector.
        """
        sigma = (0.5 * log_var).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Decodes a latent vector back to the output space.

        Args:
            z (torch.Tensor): Latent vector.

        Returns:
            torch.Tensor: Reconstructed tensor.
        """
        return self.decoder(z)

    def forward(
        self, x: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Runs encode -> sample -> decode.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            tuple[torch.Tensor, torch.Tensor, torch.Tensor]: Reconstructed
                tensor, mean, and log variance.
        """
        mu, log_var = self.encode(x)
        return self.decode(self.reparameterize(mu, log_var)), mu, log_var

__init__(self, input_dim, output_dim) special

Initializes the VAE model with encoder and decoder layers.

Parameters:

Name Type Description Default
input_dim int

Dimension of the input features.

required
output_dim int

Dimension of the output features.

required
Source code in hypercoast/chla.py
def __init__(self, input_dim: int, output_dim: int) -> None:
    """Builds the encoder and decoder stacks of the VAE.

    Args:
        input_dim (int): Dimension of the input features.
        output_dim (int): Dimension of the output features.
    """
    super().__init__()

    hidden, latent = 64, 32

    def _block(n_in: int, n_out: int) -> list:
        # Linear -> BatchNorm -> LeakyReLU: the repeated unit of both stacks.
        return [nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.LeakyReLU(0.2)]

    # Encoder trunk shared by the two latent heads.
    self.encoder_layer = nn.Sequential(
        *_block(input_dim, hidden), *_block(hidden, hidden)
    )

    # Latent heads: fc1 -> mean, fc2 -> log variance.
    self.fc1 = nn.Linear(hidden, latent)
    self.fc2 = nn.Linear(hidden, latent)

    # Decoder; the trailing Softplus keeps outputs non-negative.
    self.decoder = nn.Sequential(
        *_block(latent, hidden),
        *_block(hidden, hidden),
        nn.Linear(hidden, output_dim),
        nn.Softplus(),
    )

decode(self, z)

Decodes the latent vector back to the original space.

Parameters:

Name Type Description Default
z torch.Tensor

Latent vector.

required

Returns:

Type Description
torch.Tensor

Reconstructed tensor.

Source code in hypercoast/chla.py
def decode(self, z: torch.Tensor) -> torch.Tensor:
    """Maps a latent vector back to the original space.

    Args:
        z (torch.Tensor): Latent vector.

    Returns:
        torch.Tensor: Reconstructed tensor.
    """
    reconstruction = self.decoder(z)
    return reconstruction

encode(self, x)

Encodes the input tensor into mean and log variance.

Parameters:

Name Type Description Default
x torch.Tensor

Input tensor.

required

Returns:

Type Description
tuple[torch.Tensor, torch.Tensor]

Mean and log variance tensors.

Source code in hypercoast/chla.py
def encode(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
    """Encodes the input tensor into latent mean and log variance.

    Args:
        x (torch.Tensor): Input tensor.

    Returns:
        tuple[torch.Tensor, torch.Tensor]: Mean and log variance tensors.
    """
    # Shared trunk, then the two latent heads (fc1 -> mean, fc2 -> log var).
    features = self.encoder_layer(x)
    return self.fc1(features), self.fc2(features)

forward(self, x)

Performs a forward pass through the VAE model.

Parameters:

Name Type Description Default
x torch.Tensor

Input tensor.

required

Returns:

Type Description
tuple[torch.Tensor, torch.Tensor, torch.Tensor]

Reconstructed tensor, mean, and log variance.

Source code in hypercoast/chla.py
def forward(
    self, x: torch.Tensor
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """Runs a full pass: encode, sample a latent vector, decode.

    Args:
        x (torch.Tensor): Input tensor.

    Returns:
        tuple[torch.Tensor, torch.Tensor, torch.Tensor]: Reconstructed tensor,
            mean, and log variance.
    """
    mu, log_var = self.encode(x)
    latent = self.reparameterize(mu, log_var)
    return self.decode(latent), mu, log_var

reparameterize(self, mu, log_var)

Applies the reparameterization trick to sample from the latent space.

Parameters:

Name Type Description Default
mu torch.Tensor

Mean tensor.

required
log_var torch.Tensor

Log variance tensor.

required

Returns:

Type Description
torch.Tensor

Sampled latent vector.

Source code in hypercoast/chla.py
def reparameterize(self, mu: torch.Tensor, log_var: torch.Tensor) -> torch.Tensor:
    """Samples from the latent Gaussian via the reparameterization trick.

    Args:
        mu (torch.Tensor): Mean tensor.
        log_var (torch.Tensor): Log variance tensor.

    Returns:
        torch.Tensor: Sampled latent vector (mu + eps * sigma).
    """
    sigma = (0.5 * log_var).exp()
    noise = torch.randn_like(sigma)
    return mu + noise * sigma

calculate_metrics(predictions, actuals, threshold=0.8)

Calculates epsilon, beta, and additional metrics (RMSE, RMSLE, MAPE, Bias, MAE).

Parameters:

Name Type Description Default
predictions np.ndarray

Predicted values.

required
actuals np.ndarray

Actual values.

required
threshold float

Relative error threshold. Defaults to 0.8.

0.8

Returns:

Type Description
tuple[float, float, float, float, float, float, float]

epsilon, beta, rmse, rmsle, mape, bias, mae.

Source code in hypercoast/chla.py
def calculate_metrics(
    predictions: np.ndarray, actuals: np.ndarray, threshold: float = 0.8
) -> tuple[float, float, float, float, float, float, float]:
    """Calculates epsilon, beta, and additional metrics (RMSE, RMSLE, MAPE, Bias, MAE).

    Samples whose relative error is at or above ``threshold`` are excluded
    from every metric.

    Args:
        predictions (np.ndarray): Predicted values.
        actuals (np.ndarray): Actual values.
        threshold (float, optional): Relative error threshold. Defaults to 0.8.

    Returns:
        tuple[float, float, float, float, float, float, float]: epsilon, beta,
            rmse, rmsle, mape, bias, mae.
    """
    # Keep only samples whose relative error is below the threshold
    # (small epsilon in the denominator guards against division by zero).
    rel_err = np.abs(predictions - actuals) / np.abs(actuals + 1e-10)
    keep = rel_err < threshold
    pred = predictions[keep]
    act = actuals[keep]

    # epsilon / beta are derived from the median of the log10 ratios.
    ratios = np.log10(pred / act)
    abs_median = np.median(np.abs(ratios))
    signed_median = np.median(ratios)
    epsilon = 50 * (10**abs_median - 1)
    beta = 50 * np.sign(signed_median) * (10 ** np.abs(signed_median) - 1)

    # Additional error metrics on the filtered samples.
    rmse = np.sqrt(np.mean(np.square(pred - act)))
    rmsle = np.sqrt(np.mean(np.square(np.log10(pred + 1) - np.log10(act + 1))))
    mape = 50 * np.median(np.abs((pred - act) / act))
    log_diff = np.log10(pred) - np.log10(act)
    bias = 10 ** np.mean(log_diff)
    mae = 10 ** np.mean(np.abs(log_diff))

    return epsilon, beta, rmse, rmsle, mape, bias, mae

chla_predict(pace_filepath, best_model_path, chla_data_file=None, device=None)

Predicts chlorophyll-a concentration using a pre-trained VAE model.

Parameters:

Name Type Description Default
pace_filepath str

Path to the PACE dataset file.

required
best_model_path str

Path to the pre-trained VAE model file.

required
chla_data_file str

Path to save the predicted chlorophyll-a data. Defaults to None.

None
device torch.device

Device to perform inference on. Defaults to None.

None
Source code in hypercoast/chla.py
def chla_predict(
    pace_filepath: str,
    best_model_path: str,
    chla_data_file: str = None,
    device: torch.device = None,
) -> None:
    """Predicts chlorophyll-a concentration using a pre-trained VAE model.

    Pipeline: read the PACE file, keep the 400-703 nm Rrs bands, min-max
    scale each valid pixel spectrum to [1, 10], run the VAE in batches, and
    save the result as [lat, lon, chla] rows in a ``.npy`` file.

    Args:
        pace_filepath (str): Path to the PACE dataset file.
        best_model_path (str): Path to the pre-trained VAE model file.
        chla_data_file (str, optional): Path to save the predicted chlorophyll-a data. Defaults to None.
        device (torch.device, optional): Device to perform inference on. Defaults to None.
    """

    from .pace import read_pace

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load PACE dataset and prepare data
    PACE_dataset = read_pace(pace_filepath)
    da = PACE_dataset["Rrs"]
    wl = da.wavelength.values
    Rrs = da.values
    latitude = da.latitude.values
    longitude = da.longitude.values

    # Filter wavelengths between 400 and 703 nm
    indices = np.where((wl >= 400) & (wl <= 703))[0]
    filtered_Rrs = Rrs[:, :, indices]

    # NOTE(review): filtered_wl is computed but never used below.
    filtered_wl = wl[indices]

    # Create a mask that is 1 where all wavelengths for a given pixel have non-NaN values, and 0 otherwise
    mask = np.all(~np.isnan(filtered_Rrs), axis=2).astype(int)

    # Define input and output dimensions
    # NOTE(review): input_dim is hard-coded to 148 — presumably the band count
    # the 400-703 nm filter yields for PACE files. Confirm it equals
    # len(indices) for the file being processed; otherwise the reshape below
    # will fail.
    input_dim = 148
    output_dim = 1

    # Load test data and mask
    test_data = filtered_Rrs
    mask_data = mask

    # Filter valid data using the mask
    # (`mask` is rebound here from the 0/1 int array to a boolean array)
    mask = mask_data == 1
    N = np.sum(mask)
    valid_test_data = test_data[mask]

    # Normalize data: each pixel spectrum is min-max scaled to [1, 10]
    # independently (per-row scalers, matching the training-time scaling).
    valid_test_data = np.array(
        [
            (
                MinMaxScaler(feature_range=(1, 10))
                .fit_transform(row.reshape(-1, 1))
                .flatten()
                if not np.isnan(row).any()
                else row
            )
            for row in valid_test_data
        ]
    )
    valid_test_data = valid_test_data.reshape(N, input_dim)

    # Create DataLoader for test data
    test_tensor = TensorDataset(torch.tensor(valid_test_data).float())
    test_loader = DataLoader(test_tensor, batch_size=2048, shuffle=False)

    # Load the pre-trained VAE model
    model = VAE(input_dim, output_dim).to(device)
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    model.eval()

    # Perform inference
    # NOTE(review): model.forward samples noise in reparameterize even in
    # eval mode, so predictions are stochastic across runs.
    predictions_all = []
    with torch.no_grad():
        for batch in test_loader:
            batch = batch[0].to(device)
            predictions, _, _ = model(batch)  # VAE model inference
            predictions_all.append(predictions.cpu().numpy())

    # Concatenate all batch predictions
    predictions_all = np.vstack(predictions_all)

    # Drop the trailing singleton dimension (output_dim == 1) so the
    # predictions align with the 1-D boolean-mask assignment below.
    if predictions_all.shape[-1] == 1:
        predictions_all = predictions_all.squeeze(-1)

    # Initialize output array with NaNs
    outputs = np.full((test_data.shape[0], test_data.shape[1]), np.nan)

    # Fill in the valid mask positions with predictions
    outputs[mask] = predictions_all

    # Flatten latitude, longitude, and predictions for output
    lat_flat = latitude.flatten()
    lon_flat = longitude.flatten()
    output_flat = outputs.flatten()

    # Combine latitude, longitude, and predictions into [lat, lon, chla] rows
    final_output = np.column_stack((lat_flat, lon_flat, output_flat))

    # Save the final output including latitude and longitude
    if chla_data_file is None:
        chla_data_file = pace_filepath.replace(".nc", ".npy")
    np.save(chla_data_file, final_output)

chla_viz(rgb_image_tif_file, chla_data_file, output_file, title='PACE', figsize=(12, 8), cmap='jet')

Visualizes the chlorophyll-a concentration over an RGB image.

Parameters:

Name Type Description Default
rgb_image_tif_file str

Path to the RGB image file.

required
chla_data_file str

Path to the chlorophyll-a data file.

required
output_file str

Path to save the output visualization.

required
title str

Title of the plot. Defaults to "PACE".

'PACE'
figsize tuple

Figure size for the plot. Defaults to (12, 8).

(12, 8)
cmap str

Colormap for the chlorophyll-a concentration. Defaults to "jet".

'jet'
Source code in hypercoast/chla.py
def chla_viz(
    rgb_image_tif_file: str,
    chla_data_file: str,
    output_file: str,
    title: str = "PACE",
    figsize: tuple = (12, 8),
    cmap: str = "jet",
) -> None:
    """Visualizes the chlorophyll-a concentration over an RGB image.

    Loads the [lat, lon, chla] rows saved by ``chla_predict``, resamples
    them onto the RGB image grid with linear interpolation, and saves a
    figure with the concentration overlaid semi-transparently on the image.

    Args:
        rgb_image_tif_file (str): Path to the RGB image file.
        chla_data_file (str): Path to the chlorophyll-a data file.
        output_file (str): Path to save the output visualization.
        title (str, optional): Title of the plot. Defaults to "PACE".
        figsize (tuple, optional): Figure size for the plot. Defaults to (12, 8).
        cmap (str, optional): Colormap for the chlorophyll-a concentration. Defaults to "jet".
    """

    # Read the RGB background image.
    with rasterio.open(rgb_image_tif_file) as dataset:
        # Read the R, G, B bands.
        R = dataset.read(1)
        G = dataset.read(2)
        B = dataset.read(3)

        # Geographic extent as [left, right, bottom, top], i.e.
        # [lon_min, lon_max, lat_min, lat_max].
        extent = [
            dataset.bounds.left,
            dataset.bounds.right,
            dataset.bounds.bottom,
            dataset.bounds.top,
        ]
        # NOTE(review): transform is read but unused below; only width/height
        # are needed (for the resampling grid).
        transform = dataset.transform
        width, height = dataset.width, dataset.height

    # Combine the R, G, B bands into a 3D array.
    rgb_image = np.stack((R, G, B), axis=-1)

    # Load Chla data as [lat, lon, value] rows.
    chla_data = np.load(chla_data_file)

    # Extract the latitude, longitude, and concentration values of the chlorophyll-a data.
    latitude = chla_data[:, 0]
    longitude = chla_data[:, 1]
    chla_values = chla_data[:, 2]

    # Keep only the points that fall within the geographic extent of the RGB image.
    mask = (
        (latitude >= extent[2])
        & (latitude <= extent[3])
        & (longitude >= extent[0])
        & (longitude <= extent[1])
    )
    latitude = latitude[mask]
    longitude = longitude[mask]
    chla_values = chla_values[mask]

    # Create a grid with the same resolution as the RGB image.
    # The latitude axis runs top -> bottom to match the image's row order.
    grid_lon = np.linspace(extent[0], extent[1], width)
    grid_lat = np.linspace(extent[3], extent[2], height)
    grid_lon, grid_lat = np.meshgrid(grid_lon, grid_lat)

    # Resample the chlorophyll-a data to the size of the RGB image using interpolation.
    chla_resampled = griddata(
        (longitude, latitude), chla_values, (grid_lon, grid_lat), method="linear"
    )

    # Keep NaN values as transparent regions.
    chla_resampled = np.ma.masked_invalid(chla_resampled)

    plt.figure(figsize=figsize)

    # Background: the RGB image (assumed 8-bit, hence /255 — TODO confirm).
    plt.imshow(rgb_image / 255.0, extent=extent, origin="upper")

    # Overlay: chlorophyll-a, colored on a fixed 0-35 range (values outside
    # this range are clipped to the colormap ends).
    vmin, vmax = 0, 35
    im = plt.imshow(
        chla_resampled,
        extent=extent,
        cmap=cmap,
        alpha=0.6,
        origin="upper",
        vmin=vmin,
        vmax=vmax,
    )

    cbar = plt.colorbar(im, orientation="horizontal")
    cbar.set_label("Chlorophyll-a Concentration (mg/m³)")

    plt.title(title)
    plt.xlabel("Longitude")
    plt.ylabel("Latitude")

    plt.savefig(output_file, dpi=300, bbox_inches="tight", pad_inches=0.1)
    print(f"Saved overlay image to {output_file}")

    plt.show()

evaluate(model, test_dl, device=None)

Evaluates the VAE model.

Parameters:

Name Type Description Default
model VAE

VAE model to be evaluated.

required
test_dl DataLoader

DataLoader for test data.

required
device torch.device

Device to evaluate on. Defaults to None.

None

Returns:

Type Description
tuple[np.ndarray, np.ndarray]

Predictions and actual values.

Source code in hypercoast/chla.py
def evaluate(
    model: VAE, test_dl: DataLoader, device: torch.device = None
) -> tuple[np.ndarray, np.ndarray]:
    """Runs the VAE over a test DataLoader and collects predictions.

    Args:
        model (VAE): VAE model to be evaluated.
        test_dl (DataLoader): DataLoader for test data.
        device (torch.device, optional): Device to evaluate on. Defaults to None.

    Returns:
        tuple[np.ndarray, np.ndarray]: Stacked predictions and actual values.
    """

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for features, labels in test_dl:
            features = features.to(device)
            labels = labels.to(device)
            # Only the reconstruction is kept; mu / log_var are discarded.
            outputs, _, _ = model(features)
            preds.append(outputs.cpu().numpy())
            targets.append(labels.cpu().numpy())
    return np.vstack(preds), np.vstack(targets)

load_real_data(aphy_file_path, rrs_file_path)

Loads and preprocesses real data for training and testing.

Parameters:

Name Type Description Default
aphy_file_path str

Path to the aphy file.

required
rrs_file_path str

Path to the rrs file.

required

Returns:

Type Description
tuple[DataLoader, DataLoader, int, int]

Training DataLoader, testing DataLoader, input dimension, output dimension.

Source code in hypercoast/chla.py
def load_real_data(
    aphy_file_path: str, rrs_file_path: str
) -> tuple[DataLoader, DataLoader, int, int]:
    """Loads and preprocesses real data for training and testing.

    Args:
        aphy_file_path (str): Path to the aphy file.
        rrs_file_path (str): Path to the rrs file.

    Returns:
        tuple[DataLoader, DataLoader, int, int]: Training DataLoader, testing
            DataLoader, input dimension, output dimension.
    """
    # Targets become a single-feature column vector; spectra stay 2-D.
    chl = np.loadtxt(aphy_file_path, delimiter=",", dtype=float).reshape(-1, 1)
    rrs = np.loadtxt(rrs_file_path, delimiter=",", dtype=float)

    input_dim = rrs.shape[1]
    output_dim = chl.shape[1]

    # Min-max scale each Rrs spectrum independently into [1, 10].
    rrs_scaled = np.array(
        [
            MinMaxScaler(feature_range=(1, 10))
            .fit_transform(spectrum.reshape(-1, 1))
            .flatten()
            for spectrum in rrs
        ]
    )

    dataset = TensorDataset(
        torch.tensor(rrs_scaled, dtype=torch.float32),
        torch.tensor(chl, dtype=torch.float32),
    )

    # 70/30 random split into train and test subsets.
    n_train = int(0.7 * len(dataset))
    train_subset, test_subset = random_split(
        dataset, [n_train, len(dataset) - n_train]
    )

    train_real_dl = DataLoader(
        train_subset, batch_size=1024, shuffle=True, num_workers=12
    )
    test_real_dl = DataLoader(
        test_subset, batch_size=1024, shuffle=False, num_workers=12
    )

    return train_real_dl, test_real_dl, input_dim, output_dim

load_real_test(aphy_file_path, rrs_file_path)

Loads and preprocesses real data for testing.

Parameters:

Name Type Description Default
aphy_file_path str

Path to the aphy file.

required
rrs_file_path str

Path to the rrs file.

required

Returns:

Type Description
tuple[DataLoader, int, int]

Testing DataLoader, input dimension, output dimension.

Source code in hypercoast/chla.py
def load_real_test(
    aphy_file_path: str, rrs_file_path: str
) -> tuple[DataLoader, int, int]:
    """Loads and preprocesses real data for testing.

    Args:
        aphy_file_path (str): Path to the aphy file.
        rrs_file_path (str): Path to the rrs file.

    Returns:
        tuple[DataLoader, int, int]: Testing DataLoader (one full-size batch),
            input dimension, output dimension.
    """
    # Targets become a single-feature column vector; spectra stay 2-D.
    chl = np.loadtxt(aphy_file_path, delimiter=",", dtype=float).reshape(-1, 1)
    rrs = np.loadtxt(rrs_file_path, delimiter=",", dtype=float)

    input_dim = rrs.shape[1]
    output_dim = chl.shape[1]

    # Min-max scale each Rrs spectrum independently into [1, 10].
    rrs_scaled = np.array(
        [
            MinMaxScaler(feature_range=(1, 10))
            .fit_transform(spectrum.reshape(-1, 1))
            .flatten()
            for spectrum in rrs
        ]
    )

    dataset = TensorDataset(
        torch.tensor(rrs_scaled, dtype=torch.float32),
        torch.tensor(chl, dtype=torch.float32),
    )

    # One batch holding the entire dataset, in original order.
    test_real_dl = DataLoader(
        dataset, batch_size=int(len(dataset)), shuffle=False, num_workers=12
    )

    return test_real_dl, input_dim, output_dim

loss_function(recon_x, x, mu, log_var)

Computes the VAE loss function.

Parameters:

Name Type Description Default
recon_x torch.Tensor

Reconstructed tensor.

required
x torch.Tensor

Original input tensor.

required
mu torch.Tensor

Mean tensor.

required
log_var torch.Tensor

Log variance tensor.

required

Returns:

Type Description
torch.Tensor

Computed loss.

Source code in hypercoast/chla.py
def loss_function(
    recon_x: torch.Tensor, x: torch.Tensor, mu: torch.Tensor, log_var: torch.Tensor
) -> torch.Tensor:
    """Computes the training loss: mean L1 reconstruction error.

    The original implementation also computed an MSE term and the KL
    divergence but discarded both and returned only the L1 term; the dead
    computations are removed here (same return value, less wasted work).
    ``mu`` and ``log_var`` are kept in the signature for interface
    compatibility and so a KL term can be re-enabled easily.

    Args:
        recon_x (torch.Tensor): Reconstructed tensor.
        x (torch.Tensor): Original input tensor.
        mu (torch.Tensor): Mean tensor (currently unused).
        log_var (torch.Tensor): Log variance tensor (currently unused).

    Returns:
        torch.Tensor: Mean L1 reconstruction loss.
    """
    # NOTE(review): to train as a canonical VAE, add back
    #   F.mse_loss(recon_x, x, reduction="mean")
    #   -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    # (suitably weighted) to the returned loss.
    return F.l1_loss(recon_x, x, reduction="mean")

npy_to_geotiff(npy_path, out_tif, resolution_m=1000, method='linear', nodata_val=-9999.0, bbox_padding=0.0, lat_col=0, lon_col=1, band_cols=None, band_names=None, wavelengths=None, crs='EPSG:4326', compress='deflate', bigtiff='IF_SAFER')

Convert [lat, lon, band1, band2, ...] scattered points in .npy into a multi-band GeoTIFF (EPSG:4326).

Parameters:

Name Type Description Default
npy_path str

Path to the .npy file.

required
out_tif str

Path to the output GeoTIFF file.

required
resolution_m int

Resolution in meters per pixel. Defaults to 1000.

1000
method str

Method for interpolation. Defaults to "linear".

'linear'
nodata_val float

Value to fill NaN values. Defaults to -9999.0.

-9999.0
bbox_padding float

Padding around the bounding box. Defaults to 0.0.

0.0
lat_col int

Column index for latitude. Defaults to 0.

0
lon_col int

Column index for longitude. Defaults to 1.

1
band_cols list

Columns to rasterize as bands. Defaults to None.

None
band_names list

Band descriptions. Defaults to None.

None
wavelengths list

Wavelengths. Defaults to None.

None
crs str

Coordinate reference system. Defaults to "EPSG:4326".

'EPSG:4326'
compress str

Compression method. Defaults to "deflate".

'deflate'
bigtiff str

BigTIFF mode. Defaults to "IF_SAFER".

'IF_SAFER'
Source code in hypercoast/chla.py
def npy_to_geotiff(
    npy_path,
    out_tif,
    resolution_m=1000,  # meters per pixel (lat exact, lon adjusted by cos(lat))
    method="linear",  # 'linear' | 'nearest' | 'cubic'
    nodata_val=-9999.0,
    bbox_padding=0.0,  # degrees padding around bbox
    lat_col=0,
    lon_col=1,
    band_cols=None,  # None=use all columns after lat/lon; or list like [2,3,5,...]
    band_names=None,  # Optional list of per-band descriptions, same length as output bands
    wavelengths=None,  # Optional list; if given, will be used in descriptions like "aphy_440"
    crs="EPSG:4326",
    compress="deflate",
    bigtiff="IF_SAFER",  # 'YES'|'NO'|'IF_NEEDED'|'IF_SAFER'
):
    """
    Convert [lat, lon, band1, band2, ...] scattered points in .npy into a multi-band GeoTIFF (EPSG:4326).

    The scattered points are interpolated onto a regular lat/lon grid whose
    spacing approximates ``resolution_m`` meters (exact in latitude, scaled by
    cos(center latitude) in longitude), then written as one float32 band per
    selected value column. Prints a summary of the written raster.

    Args:
        npy_path (str): Path to the .npy file.
        out_tif (str): Path to the output GeoTIFF file.
        resolution_m (int, optional): Resolution in meters per pixel. Defaults to 1000.
        method (str, optional): Method for interpolation. Defaults to "linear".
        nodata_val (float, optional): Value to fill NaN values. Defaults to -9999.0.
        bbox_padding (float, optional): Padding around the bounding box. Defaults to 0.0.
        lat_col (int, optional): Column index for latitude. Defaults to 0.
        lon_col (int, optional): Column index for longitude. Defaults to 1.
        band_cols (list, optional): Columns to rasterize as bands. Defaults to None.
        band_names (list, optional): Band descriptions. Defaults to None.
        wavelengths (list, optional): Wavelengths. Defaults to None.
        crs (str, optional): Coordinate reference system. Defaults to "EPSG:4326".
        compress (str, optional): Compression method. Defaults to "deflate".
        bigtiff (str, optional): BigTIFF mode. Defaults to "IF_SAFER".

    Raises:
        ValueError: If the array is not 2-D with at least 3 columns, or if no
            value columns remain after excluding lat/lon.
    """

    # 1) Load & basic checks
    arr = np.load(npy_path)
    if arr.ndim != 2 or arr.shape[1] < 3:
        raise ValueError("Must be 2D with >=3 columns.")

    lat = arr[:, lat_col].astype(float)
    lon = arr[:, lon_col].astype(float)

    # Decide which value columns become bands
    if band_cols is None:
        band_cols = [i for i in range(arr.shape[1]) if i not in (lat_col, lon_col)]
    if isinstance(band_cols, (int, np.integer)):
        band_cols = [int(band_cols)]
    if len(band_cols) == 0:
        raise ValueError("No value columns selected for bands.")

    # 2) Bounds (+ padding)
    lat_min, lat_max = np.nanmin(lat), np.nanmax(lat)
    lon_min, lon_max = np.nanmin(lon), np.nanmax(lon)
    lat_min -= bbox_padding
    lat_max += bbox_padding
    lon_min -= bbox_padding
    lon_max += bbox_padding

    # 3) Meter -> degree resolution (lat exact; lon scaled at center latitude)
    lat_center = (lat_min + lat_max) / 2.0
    deg_per_m_lat = 1.0 / 111000.0
    deg_per_m_lon = 1.0 / (111000.0 * np.cos(np.radians(lat_center)))
    res_lat_deg = resolution_m * deg_per_m_lat
    res_lon_deg = resolution_m * deg_per_m_lon

    # 4) Build grid (lon fastest axis -> width; lat -> height)
    # NOTE(review): the `+ res` in the arange stop means the grid can extend
    # one step past the (padded) max bound — presumably intentional to cover
    # the bbox edge; confirm if exact bounds matter downstream.
    lon_axis = np.arange(lon_min, lon_max + res_lon_deg, res_lon_deg)
    lat_axis = np.arange(lat_min, lat_max + res_lat_deg, res_lat_deg)
    Lon, Lat = np.meshgrid(lon_axis, lat_axis)

    # Precompute transform (top-left origin, consistent with the flipud below)
    transform = from_origin(lon_axis.min(), lat_axis.max(), res_lon_deg, res_lat_deg)

    # 5) Interpolate each band onto the same grid
    grids = []
    for idx in band_cols:
        vals = arr[:, idx].astype(float)

        # Primary interpolation
        g = griddata(points=(lon, lat), values=vals, xi=(Lon, Lat), method=method)

        # Fill NaNs with nearest as a safety net (griddata leaves NaN outside
        # the convex hull of the input points for 'linear'/'cubic')
        if np.isnan(g).any():
            g_near = griddata(
                points=(lon, lat), values=vals, xi=(Lon, Lat), method="nearest"
            )
            g = np.where(np.isnan(g), g_near, g)

        # Flip vertically because raster origin is top-left
        grids.append(np.flipud(g).astype(np.float32))

    data_stack = np.stack(grids, axis=0)  # shape: (bands, height, width)

    # 6) Write GeoTIFF
    profile = {
        "driver": "GTiff",
        "height": data_stack.shape[1],
        "width": data_stack.shape[2],
        "count": data_stack.shape[0],
        "dtype": rasterio.float32,
        "crs": crs,
        "transform": transform,
        "nodata": nodata_val,
        "compress": compress,
        "tiled": True,
        "blockxsize": 256,
        "blockysize": 256,
        "BIGTIFF": bigtiff,
    }

    os.makedirs(os.path.dirname(out_tif) or ".", exist_ok=True)
    with rasterio.open(out_tif, "w", **profile) as dst:
        # Write bands (any remaining non-finite cells become nodata)
        for b in range(data_stack.shape[0]):
            band = data_stack[b]
            band[~np.isfinite(band)] = nodata_val
            dst.write(band, b + 1)

        # Add band descriptions: explicit names win, then wavelengths,
        # then a column-index fallback
        descriptions = []
        n_bands = data_stack.shape[0]
        if band_names is not None and len(band_names) == n_bands:
            descriptions = list(map(str, band_names))
        elif wavelengths is not None and len(wavelengths) == n_bands:
            # e.g., "aphy_440"
            descriptions = [f"aphy_{int(wl)}" for wl in wavelengths]
        else:
            # Fallback: use column indices
            descriptions = [f"band_{band_cols[b]}" for b in range(n_bands)]

        for b in range(1, n_bands + 1):
            dst.set_band_description(b, descriptions[b - 1])

    # 7) Log
    print(f"✅ GeoTIFF:{out_tif}")
    print(f"   Size:{profile['width']} x {profile['height']} pixels")
    print(f"   Bands:{profile['count']}")
    print(
        f"   Resolution:{resolution_m} m/px (≈ {res_lon_deg:.6f}° × {res_lat_deg:.6f}° @ {lat_center:.2f}°)"
    )
    print(
        f"   Extent:lon[{lon_min:.6f}, {lon_max:.6f}], lat[{lat_min:.6f}, {lat_max:.6f}]"
    )
    print(f"   Descriptions:{descriptions}")

plot_results(predictions_rescaled, actuals_rescaled, save_dir, threshold=0.5, mode='test')

Plots the results of the predictions against the actual values.

Parameters:

Name Type Description Default
predictions_rescaled np.ndarray

Rescaled predicted values.

required
actuals_rescaled np.ndarray

Rescaled actual values.

required
save_dir str

Directory to save the plot.

required
threshold float

Relative error threshold. Defaults to 0.5.

0.5
mode str

Mode of the plot (e.g., "test"). Defaults to "test".

'test'
Source code in hypercoast/chla.py
def plot_results(
    predictions_rescaled: np.ndarray,
    actuals_rescaled: np.ndarray,
    save_dir: str,
    threshold: float = 0.5,
    mode: str = "test",
) -> None:
    """Plots the results of the predictions against the actual values.

    Draws a log10-log10 scatter of predicted vs. actual Chl-a values with a
    1:1 line, a least-squares fit through the filtered points, a KDE contour,
    and a legend summarizing the regression metrics. The figure is saved to
    ``save_dir`` as ``{mode}_plot.pdf``.

    Args:
        predictions_rescaled (np.ndarray): Rescaled predicted values.
        actuals_rescaled (np.ndarray): Rescaled actual values.
        save_dir (str): Directory to save the plot.
        threshold (float, optional): Relative error threshold passed to
            ``calculate_metrics``. Defaults to 0.5.
        mode (str, optional): Mode of the plot (e.g., "test"); used in the
            output filename. Defaults to "test".
    """

    actuals = actuals_rescaled.flatten()
    predictions = predictions_rescaled.flatten()

    # Keep only points with relative error < 100% for the metrics and the
    # fitted line; the scatter plot still shows all points.
    mask = np.abs(predictions - actuals) / np.abs(actuals + 1e-10) < 1
    filtered_predictions = predictions[mask]
    filtered_actuals = actuals[mask]

    # Guard against log10(0) by substituting a tiny positive value.
    log_actual = np.log10(np.where(actuals == 0, 1e-10, actuals))
    log_prediction = np.log10(np.where(predictions == 0, 1e-10, predictions))

    filtered_log_actual = np.log10(
        np.where(filtered_actuals == 0, 1e-10, filtered_actuals)
    )
    filtered_log_prediction = np.log10(
        np.where(filtered_predictions == 0, 1e-10, filtered_predictions)
    )

    epsilon, beta, rmse, rmsle, mape, bias, mae = calculate_metrics(
        filtered_predictions, filtered_actuals, threshold
    )

    # Fit the regression line only on finite log values (negative inputs
    # produce NaNs in log10 and would poison polyfit).
    valid_mask = np.isfinite(filtered_log_actual) & np.isfinite(filtered_log_prediction)
    slope, intercept = np.polyfit(
        filtered_log_actual[valid_mask], filtered_log_prediction[valid_mask], 1
    )
    x = np.array([-2, 4])
    y = slope * x + intercept

    _ = plt.figure(figsize=(6, 6))

    plt.plot(x, y, linestyle="--", color="blue", linewidth=0.8)
    lims = [-2, 4]
    plt.plot(lims, lims, linestyle="-", color="black", linewidth=0.8)

    sns.scatterplot(x=log_actual, y=log_prediction, alpha=0.5)

    sns.kdeplot(
        x=filtered_log_actual,
        y=filtered_log_prediction,
        levels=3,
        color="black",
        fill=False,
        linewidths=0.8,
    )

    plt.xlabel("Actual $Chla$ Values", fontsize=16)
    plt.ylabel("Predicted $Chla$ Values", fontsize=16)
    plt.xlim(-2, 4)
    plt.ylim(-2, 4)
    plt.grid(True, which="both", ls="--")

    plt.legend(
        title=(
            f"MAE = {mae:.2f}, RMSE = {rmse:.2f}, RMSLE = {rmsle:.2f} \n"
            f"Bias = {bias:.2f}, Slope = {slope:.2f} \n"
            f"MAPE = {mape:.2f}%, ε = {epsilon:.2f}%, β = {beta:.2f}%"
        ),
        fontsize=16,
        title_fontsize=12,
    )

    plt.xticks(fontsize=20)
    plt.yticks(fontsize=20)

    # Save BEFORE plt.show(): with many backends show() clears the current
    # figure, so saving afterwards produces an empty PDF. Also make sure the
    # output directory exists.
    os.makedirs(save_dir, exist_ok=True)
    plt.savefig(os.path.join(save_dir, f"{mode}_plot.pdf"), bbox_inches="tight")
    plt.show()

save_to_csv(data, file_path)

Saves data to a CSV file.

Parameters:

Name Type Description Default
data np.ndarray

Data to be saved.

required
file_path str

Path to the CSV file.

required
Source code in hypercoast/chla.py
def save_to_csv(data: np.ndarray, file_path: str) -> None:
    """Persists an array to disk in CSV format.

    Args:
        data (np.ndarray): Array to write out.
        file_path (str): Destination path for the CSV file.
    """
    # Wrap in a DataFrame and write without the row-index column.
    pd.DataFrame(data).to_csv(file_path, index=False)

train(model, train_dl, epochs=200, device=None, opt=None, model_path='model/vae_model_PACE.pth', best_model_path='model/vae_trans_model_best_Chl_PACE.pth')

Trains the VAE model.

Parameters:

Name Type Description Default
model VAE

VAE model to be trained.

required
train_dl DataLoader

DataLoader for training data.

required
epochs int

Number of epochs to train. Defaults to 200.

200
device torch.device

Device to train on. Defaults to None.

None
opt torch.optim.Optimizer

Optimizer. Defaults to None.

None
model_path str

Path to save the model. Defaults to "model/vae_model_PACE.pth".

'model/vae_model_PACE.pth'
best_model_path str

Path to save the best model. Defaults to "model/vae_trans_model_best_Chl_PACE.pth".

'model/vae_trans_model_best_Chl_PACE.pth'
Source code in hypercoast/chla.py
def train(
    model: VAE,
    train_dl: DataLoader,
    epochs: int = 200,
    device: torch.device = None,
    opt: torch.optim.Optimizer = None,
    model_path: str = "model/vae_model_PACE.pth",
    best_model_path: str = "model/vae_trans_model_best_Chl_PACE.pth",
) -> None:
    """Trains the VAE model.

    Tracks the lowest average epoch loss seen so far and checkpoints that
    model to ``best_model_path``; the final-epoch weights are always saved
    to ``model_path``.

    Args:
        model (VAE): VAE model to be trained.
        train_dl (DataLoader): DataLoader for training data.
        epochs (int, optional): Number of epochs to train. Defaults to 200.
        device (torch.device, optional): Device to train on. Defaults to
            None (auto-selects CUDA if available).
        opt (torch.optim.Optimizer, optional): Optimizer. Defaults to None
            (Adam, lr=0.001, weight_decay=1e-3).
        model_path (str, optional): Path to save the model. Defaults to
            "model/vae_model_PACE.pth".
        best_model_path (str, optional): Path to save the best model. Defaults
            to "model/vae_trans_model_best_Chl_PACE.pth".
    """

    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Move the model to the training device BEFORE building the optimizer.
    # Previously only the batches were moved, which fails when CUDA is
    # selected but the model parameters are still on the CPU.
    model.to(device)

    if opt is None:
        opt = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-3)

    model.train()

    # Create checkpoint directories up front so torch.save cannot fail
    # mid-training because "model/" does not exist.
    for path in (model_path, best_model_path):
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    min_total_loss = float("inf")
    # Save the optimal model

    for epoch in range(epochs):
        total_loss = 0.0
        for x, y in train_dl:
            x, y = x.to(device), y.to(device)
            y_pred, mu, log_var = model(x)
            loss = loss_function(y_pred, y, mu, log_var)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()

        avg_total_loss = total_loss / len(train_dl)
        print(f"epoch = {epoch + 1}, total_loss = {avg_total_loss:.4f}")

        # Checkpoint whenever the average epoch loss improves.
        if avg_total_loss < min_total_loss:
            min_total_loss = avg_total_loss
            torch.save(model.state_dict(), best_model_path)
    # Save the model from the last epoch.
    torch.save(model.state_dict(), model_path)