Python API Usage

The SPIDB package provides a Python API for interacting with the database through SQLAlchemy's ORM query syntax. This guide demonstrates how to use the API for common tasks.

Quick Start

Opening a Database

from spidb import spidb

# Open existing database
db = spidb.Database("data/spi.db")

# Or create a new database
db = spidb.Database("my_new_database.db")

Basic Queries

Querying Events

# Get all events
events = db.session.query(spidb.Event).all()

# Get events with specific subject
cowpea = db.session.query(spidb.Subject).filter(
    spidb.Subject.scientific_name == "Callosobruchus maculatus"
).first()

events = db.session.query(spidb.Event).filter(
    spidb.Event.subject == cowpea
).all()

# Get events with specific material
rice_events = db.session.query(spidb.Event).join(spidb.Material).filter(
    spidb.Material.name == "rice"
).all()

# Get events that fall entirely within a time range
from datetime import datetime

start_date = datetime(2023, 5, 1)
end_date = datetime(2023, 6, 1)

events = db.session.query(spidb.Event).filter(
    spidb.Event.start >= start_date,
    spidb.Event.end <= end_date
).all()

Querying Samples

# Get all samples
samples = db.session.query(spidb.Sample).all()

# Get samples for a specific subject
mealworm_samples = db.session.query(spidb.Sample).join(spidb.Subject).filter(
    spidb.Subject.common_name == "Mealworm"
).all()

# Get samples from a specific channel
channel_0_samples = db.session.query(spidb.Sample).join(spidb.Channel).filter(
    spidb.Channel.number == 0
).all()

# Get samples with specific noise level
quiet_samples = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise == "Silence"
).all()

Querying Records

# Get all records
records = db.session.query(spidb.Record).all()

# Get records for a specific event
event = db.session.query(spidb.Event).first()
records = db.session.query(spidb.Record).filter(
    spidb.Record.event == event
).all()

# Get records with specific material
wheat_records = db.session.query(spidb.Record).join(spidb.Material).filter(
    spidb.Material.name == "wheat_groats"
).all()

Querying Subjects and Materials

# Get all subjects
subjects = db.session.query(spidb.Subject).all()

# Get subject by scientific name
subject = db.session.query(spidb.Subject).filter(
    spidb.Subject.scientific_name == "Tribolium confusum"
).first()

# Get all materials
materials = db.session.query(spidb.Material).all()

# Get material by name
rice = db.session.query(spidb.Material).filter(
    spidb.Material.name == "rice"
).first()

Adding Data

Adding a New Event

from datetime import datetime

# Get required relationships
sensor = db.session.query(spidb.Sensor).filter(
    spidb.Sensor.name == "A-SPIDS"
).first()

material = db.session.query(spidb.Material).filter(
    spidb.Material.name == "rice"
).first()

subject = db.session.query(spidb.Subject).filter(
    spidb.Subject.common_name == "Mealworm"
).first()

# Create new event
event = spidb.Event(
    start=datetime(2024, 10, 18, 0, 4, 20),
    end=datetime(2024, 10, 18, 0, 5, 20),
    description="Mealworm placed in center",
    noise="Silence",
    material=material,
    subject=subject,
    sensor=sensor
)

# Add and commit
db.session.add(event)
db.session.commit()

Adding a New Subject

# Create new subject
new_subject = spidb.Subject(
    name="Rice Weevil Adult",
    scientific_name="Sitophilus oryzae",
    common_name="Rice Weevil",
    life_stage="Adult",
    length=3.0,  # mm
    width=1.0,   # mm
    height=1.0,  # mm
    weight=0.002,  # grams
    volume=None,
    density=None
)

db.session.add(new_subject)
db.session.commit()

Adding a New Material

# Create new material
new_material = spidb.Material(
    name="corn_kernels",
    scientific_name="Zea mays",
    common_name="Corn",
    density=1.2  # g/cm³
)

db.session.add(new_material)
db.session.commit()

Extracting Audio Data

Get Audio for an Event

# Get event
event = db.session.query(spidb.Event).filter(
    spidb.Event.id == 42
).first()

# Extract audio
audio = db.get_audio(
    start=event.start,
    end=event.end,
    sensor=event.sensor,
    channel_number=0
)

# Audio is a NumPy array
print(f"Audio shape: {audio.shape}")
print(f"Sample rate: {event.sensor.channels[0].sample_rate}")

Get Audio for Multiple Channels

# Get audio from all piezoelectric channels (0-3), reusing `event` from the previous example
channels = []
for ch_num in range(4):
    audio = db.get_audio(
        start=event.start,
        end=event.end,
        sensor=event.sensor,
        channel_number=ch_num
    )
    channels.append(audio)

import numpy as np
multichannel_audio = np.stack(channels)  # Shape: (4, n_samples)

Get Audio from Sample

# Get sample
sample = db.session.query(spidb.Sample).first()

# Get associated file
audio_file = sample.file

# Load audio
from sonicdb import utilities
audio, sr = utilities.read_audio(audio_file.filepath)

Building Databases Programmatically

Populate from Dataset

from spidb.build import populate_db, generate_records, generate_samples

# Create new database
db = spidb.Database("my_database.db")

# Populate from A-SPIDS dataset
populate_db(
    db=db,
    file_directory="data/aspids",
    deck="data/aspids/metadata.json"
)

print("Database populated!")

Generate Records

from spidb.build import generate_records

# Generate 60-second records
generate_records(db, duration=60, overwrite=True)

# Generate 30-second records
generate_records(db, duration=30, overwrite=True)

Generate Samples

from spidb.build import generate_samples

# Generate samples from records
generate_samples(db, overwrite=True)

Complete Database Build

from spidb.build import populate_db, generate_records, generate_samples
from spidb import spidb

# Create database
db = spidb.Database("complete_database.db")

# Populate with both datasets
for dataset in ["aspids", "mspids"]:
    populate_db(
        db=db,
        file_directory=f"data/{dataset}",
        deck=f"data/{dataset}/metadata.json"
    )

# Generate records and samples
generate_records(db, duration=60, overwrite=True)
generate_samples(db, overwrite=True)

print("Complete database built!")

Advanced Queries

Statistics

# Count samples by subject
from sqlalchemy import func

subject_counts = db.session.query(
    spidb.Subject.common_name,
    func.count(spidb.Sample.id)
).join(spidb.Sample).group_by(spidb.Subject.common_name).all()

for subject, count in subject_counts:
    print(f"{subject}: {count} samples")

# Count events by material
material_counts = db.session.query(
    spidb.Material.name,
    func.count(spidb.Event.id)
).join(spidb.Event).group_by(spidb.Material.name).all()

for material, count in material_counts:
    print(f"{material}: {count} events")

Complex Joins

# Get all samples for cowpea beetle in rice
samples = db.session.query(spidb.Sample).join(
    spidb.Subject
).join(
    spidb.Material
).filter(
    spidb.Subject.scientific_name == "Callosobruchus maculatus",
    spidb.Material.name == "rice"
).all()

# Get samples whose noise level is either "Silence" or "Ambient"
quiet_samples = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise.in_(["Silence", "Ambient"])
).all()

Exporting Data

import pandas as pd

# Export events to DataFrame
events = db.session.query(spidb.Event).all()

events_data = [{
    'id': e.id,
    'start': e.start,
    'end': e.end,
    'subject': e.subject.common_name if e.subject else None,
    'material': e.material.name if e.material else None,
    'noise': e.noise
} for e in events]

df = pd.DataFrame(events_data)
df.to_csv("events.csv", index=False)

Updating Records

Update Event Information

# Get event
event = db.session.query(spidb.Event).filter(
    spidb.Event.id == 1
).first()

# Update description and noise level
event.description = "Updated description"
event.noise = "90dBA"

# Commit changes
db.session.commit()

Update Subject Information

# Get subject
subject = db.session.query(spidb.Subject).filter(
    spidb.Subject.common_name == "Mealworm"
).first()

# Update measurements
subject.weight = 0.15
subject.length = 25.0

db.session.commit()

Deleting Records

# Delete a specific event
event = db.session.query(spidb.Event).filter(
    spidb.Event.id == 999
).first()

if event:
    db.session.delete(event)
    db.session.commit()

# Delete all samples with specific noise level
samples = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise == "100dBA"
).all()

for sample in samples:
    db.session.delete(sample)
db.session.commit()

Best Practices

Session Management

# Always commit after modifications
db.session.add(new_object)
db.session.commit()

# Wrap modifications in try/except and roll back the session on failure
try:
    db.session.add(new_object)
    db.session.commit()
except Exception as e:
    db.session.rollback()
    print(f"Error: {e}")

Efficient Querying

# Use filters instead of loading all data
# Bad: Load everything then filter in Python
all_samples = db.session.query(spidb.Sample).all()
filtered = [s for s in all_samples if s.noise == "Silence"]

# Good: Filter in database
filtered = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise == "Silence"
).all()

Relationships

# Access related objects through relationships
sample = db.session.query(spidb.Sample).first()

# Access subject through relationship
subject = sample.subject
print(subject.common_name)

# Access material through relationship
material = sample.material
print(material.name)

# Access the event through the sample's record
event = sample.record.event
print(event.description)

Next Steps