# Copyright 2023-2024 Blue Brain Project / EPFL
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sonata circuit access implementation for BlueCelluLab."""
from __future__ import annotations
import hashlib
from functools import lru_cache
import logging
from pathlib import Path
from typing import Mapping, Optional
from bluepysnap.bbp import Cell as SnapCell
from bluepysnap.circuit_ids import CircuitNodeId, CircuitEdgeIds
from bluepysnap.exceptions import BluepySnapError
from bluepysnap import Circuit as SnapCircuit
import neuron
import pandas as pd
from bluecellulab import circuit
from bluecellulab.circuit.circuit_access.definition import CircuitAccess, EmodelProperties
from bluecellulab.circuit import CellId, SynapseProperty
from bluecellulab.circuit.config import SimulationConfig
from bluecellulab.circuit.synapse_properties import SynapseProperties
from bluecellulab.circuit.config import SonataSimulationConfig
from bluecellulab.circuit.synapse_properties import (
properties_from_snap,
properties_to_snap,
)
logger = logging.getLogger(__name__)
[docs]
class SonataCircuitAccess(CircuitAccess):
"""Sonata implementation of CircuitAccess protocol."""
def __init__(self, simulation_config: str | Path | SimulationConfig) -> None:
"""Initialize SonataCircuitAccess object."""
if isinstance(simulation_config, (str, Path)) and not Path(simulation_config).exists():
raise FileNotFoundError(f"Circuit config file {simulation_config} not found.")
if isinstance(simulation_config, SonataSimulationConfig):
self.config = simulation_config
else:
self.config = SonataSimulationConfig(simulation_config)
circuit_config = self.config.impl.config["network"]
self._circuit = SnapCircuit(circuit_config)
self._inner_edge_pop_names = {
name for name, epop in self._circuit.edges.items()
if getattr(epop.source, "type", None) != "virtual"
}
@property
def available_cell_properties(self) -> set:
return self._circuit.nodes.property_names
def get_emodel_properties(self, cell_id: CellId) -> Optional[EmodelProperties]:
cell_properties = self._circuit.nodes[cell_id.population_name].get(cell_id.id)
if "@dynamics:AIS_scaler" in cell_properties:
AIS_scaler = cell_properties["@dynamics:AIS_scaler"]
else:
AIS_scaler = 1.0
if "@dynamics:soma_scaler" in cell_properties:
soma_scaler = cell_properties["@dynamics:soma_scaler"]
else:
soma_scaler = 1.0
return EmodelProperties(
cell_properties["@dynamics:threshold_current"],
cell_properties["@dynamics:holding_current"],
AIS_scaler,
soma_scaler,
)
def get_template_format(self) -> Optional[str]:
return 'v6'
def get_cell_properties(
self, cell_id: CellId, properties: list[str] | str
) -> pd.Series:
if isinstance(properties, str):
properties = [properties]
return self._circuit.nodes[cell_id.population_name].get(
cell_id.id, properties=properties
)
@staticmethod
def _compute_pop_ids(source: str, target: str) -> tuple[int, int]:
"""Compute the population ids from the population names."""
def make_id(node_pop: str) -> int:
pop_hash = hashlib.md5(node_pop.encode()).digest()
return ((pop_hash[1] & 0x0f) << 8) + pop_hash[0] # id: 12bit hash
source_popid = make_id(source)
target_popid = make_id(target)
return source_popid, target_popid
def get_population_ids(
self, source_population_name: str, target_population_name: str
) -> tuple[int, int]:
source_popid, target_popid = self._compute_pop_ids(
source_population_name, target_population_name)
return source_popid, target_popid
def _select_edge_pop_names(self, projections) -> list[str]:
"""Select the SONATA edge populations to use for synapse extraction.
Intrinsic connectivity is always included. The ``projections`` argument only controls whether
edges originating from SONATA *virtual* node populations are added.
Args:
projections:
* ``False`` – intrinsic edges only.
* ``True`` – intrinsic edges plus all projection edges.
* ``str`` or ``list[str]`` – intrinsic edges plus the specified
projection edge population name(s).
Names must be valid edge population names (``SnapCircuit.edges`` keys).
Returns:
list[str]: Ordered edge population names, intrinsic first and without duplicates.
Raises:
ValueError: If a requested projection name is unknown.
"""
edges = self._circuit.edges
all_names = list(edges.keys())
# intrinsic connectivity within the simulated circuit
inner = [n for n in all_names if n in self._inner_edge_pop_names]
# edges whose source is a SONATA virtual node population
proj = [n for n in all_names if getattr(edges[n].source, "type", None) == "virtual"]
if projections is False or projections is None:
return inner
elif projections is True:
# intrinsic + all projections
# out: ordered list of edge population names to return (intrinsic first, then projections)
# seen: helper set to avoid adding the same population more than once
out, seen = [], set()
for n in inner + proj:
if n not in seen:
out.append(n)
seen.add(n)
return out
else: # str / list[str]: intrinsic + requested
requested = [projections] if isinstance(projections, str) else list(projections or [])
out, seen = [], set()
by_source: dict[str, list[str]] = {}
for n in all_names:
by_source.setdefault(edges[n].source.name, []).append(n)
for n in inner:
if n not in seen:
out.append(n)
seen.add(n)
for token in requested:
if token not in edges:
raise ValueError(
f"Unknown projection '{token}'. Expected an edge population name. "
f"Available edge populations: {sorted(all_names)}"
)
if token not in seen:
out.append(token)
seen.add(token)
return out
def target_contains_cell(self, target: str, cell_id: CellId) -> bool:
return cell_id in self.get_target_cell_ids(target)
@lru_cache(maxsize=1000)
def is_valid_group(self, group: str) -> bool:
return group in self._circuit.node_sets
[docs]
@lru_cache(maxsize=16)
def get_target_cell_ids(self, target: str) -> set[CellId]:
"""Resolve a node set name into a set of CellIds."""
node_sets = self.config.get_node_sets()
return self._resolve_node_set_to_cell_ids(target, node_sets)
def _resolve_node_set_to_cell_ids(
self,
target: str,
node_sets: Mapping[str, object],
) -> set[CellId]:
if target not in node_sets:
raise KeyError(f"Unknown node set: {target}")
node_set_def = node_sets[target]
# Alias/composite node set, e.g. "All": ["L4_SBC", "L5_TPC:B", ...]
if isinstance(node_set_def, list):
result: set[CellId] = set()
for item in node_set_def:
if isinstance(item, str) and item in node_sets:
result.update(self._resolve_node_set_to_cell_ids(item, node_sets))
else:
raise ValueError(
f"Unsupported composite node set entry {item!r} in node set {target!r}"
)
return result
# Concrete single-population node set
if isinstance(node_set_def, dict) and "population" in node_set_def:
population = str(node_set_def["population"])
ids = self._circuit.nodes[population].ids(node_set_def)
return {CellId(population, int(x)) for x in ids}
# Fallback: let BluePySnap resolve it
ids = self._circuit.nodes.ids(node_set_def)
return {CellId(x.population, x.id) for x in ids}
@lru_cache(maxsize=100)
def fetch_cell_info(self, cell_id: CellId) -> pd.Series:
return self._circuit.nodes[cell_id.population_name].get(cell_id.id)
def fetch_mini_frequencies(self, cell_id: CellId) -> tuple[float | None, float | None]:
cell_info = self.fetch_cell_info(cell_id)
exc_mini_frequency = cell_info['exc-mini_frequency'] \
if 'exc-mini_frequency' in cell_info else None
inh_mini_frequency = cell_info['inh-mini_frequency'] \
if 'inh-mini_frequency' in cell_info else None
return exc_mini_frequency, inh_mini_frequency
@property
def node_properties_available(self) -> bool:
return True
def get_gids_of_mtypes(self, mtypes: list[str]) -> set[CellId]:
all_cell_ids = set()
all_population_names: list[str] = list(self._circuit.nodes)
for population_name in all_population_names:
try:
cell_ids = self._circuit.nodes[population_name].ids(
{SnapCell.MTYPE: mtypes})
except BluepySnapError:
continue
all_cell_ids |= {CellId(population_name, id) for id in cell_ids}
return all_cell_ids
def get_cell_ids_of_targets(self, targets: list[str]) -> set[CellId]:
cell_ids = set()
for target in targets:
cell_ids |= self.get_target_cell_ids(target)
return cell_ids
[docs]
def morph_filepath(self, cell_id: CellId) -> str:
"""Returns the morphology path from 'alternate_morphologies' based on
available formats."""
node_population = self._circuit.nodes[cell_id.population_name]
# Get the alternate morphologies configuration
alternate_morphologies = node_population.config.get("alternate_morphologies")
# Check for H5v1 format first (highest priority for H5 containers)
if alternate_morphologies and "h5v1" in alternate_morphologies:
h5_container_path = alternate_morphologies["h5v1"]
# Get the morphology name for this cell
cell_properties = node_population.get(cell_id.id)
morphology_name = cell_properties.get("morphology", "")
if morphology_name:
# Return the H5 container path with cell name and .h5 extension
# This format works with os.path.split() in NeuronTemplate.get_cell()
# which splits it into (container.h5, CellName.h5) for the template
return f"{h5_container_path}/{morphology_name}.h5"
# Check for neurolucida-asc format
try: # if asc defined in alternate morphology
return str(node_population.morph.get_filepath(cell_id.id, extension="asc"))
except BluepySnapError:
logger.debug(f"No asc morphology found for {cell_id}, trying swc.")
# Fallback to default morphology handling
try:
return str(node_population.morph.get_filepath(cell_id.id))
except BluepySnapError as e:
raise BluepySnapError(f"Could not determine morphology path for cell {cell_id}: {e}")
def emodel_path(self, cell_id: CellId) -> str:
node_population = self._circuit.nodes[cell_id.population_name]
# Try to use the models helper if available
try:
return str(node_population.models.get_filepath(cell_id.id))
except BluepySnapError:
# If models helper is not available, construct path from model_template
# model_template format is typically "hoc:template_name"
cell_properties = node_population.get(cell_id.id)
model_template = cell_properties.get("model_template", "")
if model_template.startswith("hoc:"):
template_name = model_template.split(":", 1)[1]
# Get the biophysical_neuron_models_dir from config
models_dir = node_population.config.get("biophysical_neuron_models_dir", "")
if models_dir:
return str(Path(models_dir) / f"{template_name}.hoc")
raise BluepySnapError(f"Could not determine emodel path for cell {cell_id}")
def node_population_sizes(self) -> dict[str, int]:
out: dict[str, int] = {}
for pop_name, node_pop in self._circuit.nodes.items():
out[str(pop_name)] = node_pop.size
return out
def virtual_population_sizes(self) -> dict[str, int]:
out: dict[str, int] = {}
for pop_name, node_pop in self._circuit.nodes.items():
if getattr(node_pop, "type", None) == "virtual":
out[str(pop_name)] = int(node_pop.size)
return out