Python API Usage¶
The SPIDB package provides a Python API for interacting with the database using SQLAlchemy syntax. This guide demonstrates how to use the API for common tasks.
Quick Start¶
Opening a Database¶
from spidb import spidb
# Open existing database
db = spidb.Database("data/spi.db")
# Or create a new database
db = spidb.Database("my_new_database.db")
Basic Queries¶
Querying Events¶
# Get all events
events = db.session.query(spidb.Event).all()
# Get events with specific subject
cowpea = db.session.query(spidb.Subject).filter(
spidb.Subject.scientific_name == "Callosobruchus maculatus"
).first()
events = db.session.query(spidb.Event).filter(
spidb.Event.subject == cowpea
).all()
# Get events with specific material
rice_events = db.session.query(spidb.Event).join(spidb.Material).filter(
spidb.Material.name == "rice"
).all()
# Get events in a time range
from datetime import datetime
start_date = datetime(2023, 5, 1)
end_date = datetime(2023, 6, 1)
events = db.session.query(spidb.Event).filter(
spidb.Event.start >= start_date,
spidb.Event.end <= end_date
).all()
Querying Samples¶
# Get all samples
samples = db.session.query(spidb.Sample).all()
# Get samples for a specific subject
mealworm_samples = db.session.query(spidb.Sample).join(spidb.Subject).filter(
spidb.Subject.common_name == "Mealworm"
).all()
# Get samples from a specific channel
channel_0_samples = db.session.query(spidb.Sample).join(spidb.Channel).filter(
spidb.Channel.number == 0
).all()
# Get samples with specific noise level
quiet_samples = db.session.query(spidb.Sample).filter(
spidb.Sample.noise == "Silence"
).all()
Querying Records¶
# Get all records
records = db.session.query(spidb.Record).all()
# Get records for a specific event
event = db.session.query(spidb.Event).first()
records = db.session.query(spidb.Record).filter(
spidb.Record.event == event
).all()
# Get records with specific material
wheat_records = db.session.query(spidb.Record).join(spidb.Material).filter(
spidb.Material.name == "wheat_groats"
).all()
Querying Subjects and Materials¶
# Get all subjects
subjects = db.session.query(spidb.Subject).all()
# Get subject by scientific name
subject = db.session.query(spidb.Subject).filter(
spidb.Subject.scientific_name == "Tribolium confusum"
).first()
# Get all materials
materials = db.session.query(spidb.Material).all()
# Get material by name
rice = db.session.query(spidb.Material).filter(
spidb.Material.name == "rice"
).first()
Adding Data¶
Adding a New Event¶
from datetime import datetime
# Get required relationships
sensor = db.session.query(spidb.Sensor).filter(
spidb.Sensor.name == "A-SPIDS"
).first()
material = db.session.query(spidb.Material).filter(
spidb.Material.name == "rice"
).first()
subject = db.session.query(spidb.Subject).filter(
spidb.Subject.common_name == "Mealworm"
).first()
# Create new event
event = spidb.Event(
start=datetime(2024, 10, 18, 0, 4, 20),
end=datetime(2024, 10, 18, 0, 5, 20),
description="Mealworm placed in center",
noise="Silence",
material=material,
subject=subject,
sensor=sensor
)
# Add and commit
db.session.add(event)
db.session.commit()
Adding a New Subject¶
# Create new subject
new_subject = spidb.Subject(
name="Rice Weevil Adult",
scientific_name="Sitophilus oryzae",
common_name="Rice Weevil",
life_stage="Adult",
length=3.0, # mm
width=1.0, # mm
height=1.0, # mm
weight=0.002, # grams
volume=None,
density=None
)
db.session.add(new_subject)
db.session.commit()
Adding a New Material¶
# Create new material
new_material = spidb.Material(
name="corn_kernels",
scientific_name="Zea mays",
common_name="Corn",
density=1.2 # g/cm³
)
db.session.add(new_material)
db.session.commit()
Extracting Audio Data¶
Get Audio for an Event¶
# Get event
event = db.session.query(spidb.Event).filter(
spidb.Event.id == 42
).first()
# Extract audio
audio = db.get_audio(
start=event.start,
end=event.end,
sensor=event.sensor,
channel_number=0
)
# Audio is a numpy array
print(f"Audio shape: {audio.shape}")
print(f"Sample rate: {event.sensor.channels[0].sample_rate}")
Get Audio for Multiple Channels¶
# Get audio from all piezoelectric channels (0-3)
channels = []
for ch_num in range(4):
audio = db.get_audio(
start=event.start,
end=event.end,
sensor=event.sensor,
channel_number=ch_num
)
channels.append(audio)
import numpy as np
multichannel_audio = np.stack(channels) # Shape: (4, n_samples)
Get Audio from Sample¶
# Get sample
sample = db.session.query(spidb.Sample).first()
# Get associated file
audio_file = sample.file
# Load audio
from sonicdb import utilities
audio, sr = utilities.read_audio(audio_file.filepath)
Building Databases Programmatically¶
Populate from Dataset¶
from spidb.build import populate_db, generate_records, generate_samples
# Create new database
db = spidb.Database("my_database.db")
# Populate from A-SPIDS dataset
populate_db(
db=db,
file_directory="data/aspids",
deck="data/aspids/metadata.json"
)
print("Database populated!")
Generate Records¶
from spidb.build import generate_records
# Generate 60-second records
generate_records(db, duration=60, overwrite=True)
# Generate 30-second records
generate_records(db, duration=30, overwrite=True)
Generate Samples¶
from spidb.build import generate_samples
# Generate samples from records
generate_samples(db, overwrite=True)
Complete Database Build¶
from spidb.build import populate_db, generate_records, generate_samples
from spidb import spidb
# Create database
db = spidb.Database("complete_database.db")
# Populate with both datasets
for dataset in ["aspids", "mspids"]:
populate_db(
db=db,
file_directory=f"data/{dataset}",
deck=f"data/{dataset}/metadata.json"
)
# Generate records and samples
generate_records(db, duration=60, overwrite=True)
generate_samples(db, overwrite=True)
print("Complete database built!")
Advanced Queries¶
Statistics¶
# Count samples by subject
from sqlalchemy import func
subject_counts = db.session.query(
spidb.Subject.common_name,
func.count(spidb.Sample.id)
).join(spidb.Sample).group_by(spidb.Subject.common_name).all()
for subject, count in subject_counts:
print(f"{subject}: {count} samples")
# Count events by material
material_counts = db.session.query(
spidb.Material.name,
func.count(spidb.Event.id)
).join(spidb.Event).group_by(spidb.Material.name).all()
for material, count in material_counts:
print(f"{material}: {count} events")
Complex Joins¶
# Get all samples for cowpea beetle in rice
samples = db.session.query(spidb.Sample).join(
spidb.Subject
).join(
spidb.Material
).filter(
spidb.Subject.scientific_name == "Callosobruchus maculatus",
spidb.Material.name == "rice"
).all()
# Get samples with no noise
quiet_samples = db.session.query(spidb.Sample).filter(
spidb.Sample.noise.in_(["Silence", "Ambient"])
).all()
Exporting Data¶
import pandas as pd
# Export events to DataFrame
events = db.session.query(spidb.Event).all()
events_data = [{
'id': e.id,
'start': e.start,
'end': e.end,
'subject': e.subject.common_name if e.subject else None,
'material': e.material.name if e.material else None,
'noise': e.noise
} for e in events]
df = pd.DataFrame(events_data)
df.to_csv("events.csv", index=False)
Updating Records¶
Update Event Information¶
# Get event
event = db.session.query(spidb.Event).filter(
spidb.Event.id == 1
).first()
# Update description
event.description = "Updated description"
event.noise = "90dBA"
# Commit changes
db.session.commit()
Update Subject Information¶
# Get subject
subject = db.session.query(spidb.Subject).filter(
spidb.Subject.name == "Mealworm"
).first()
# Update measurements
subject.weight = 0.15
subject.length = 25.0
db.session.commit()
Deleting Records¶
# Delete a specific event
event = db.session.query(spidb.Event).filter(
spidb.Event.id == 999
).first()
if event:
db.session.delete(event)
db.session.commit()
# Delete all samples with specific noise level
samples = db.session.query(spidb.Sample).filter(
spidb.Sample.noise == "100dBA"
).all()
for sample in samples:
db.session.delete(sample)
db.session.commit()
Best Practices¶
Session Management¶
# Always commit after modifications
db.session.add(new_object)
db.session.commit()
# Use try/except for error handling
try:
db.session.add(new_object)
db.session.commit()
except Exception as e:
db.session.rollback()
print(f"Error: {e}")
Efficient Querying¶
# Use filters instead of loading all data
# Bad: Load everything then filter in Python
all_samples = db.session.query(spidb.Sample).all()
filtered = [s for s in all_samples if s.noise == "Silence"]
# Good: Filter in database
filtered = db.session.query(spidb.Sample).filter(
spidb.Sample.noise == "Silence"
).all()
Relationships¶
# Access related objects through relationships
sample = db.session.query(spidb.Sample).first()
# Access subject through relationship
subject = sample.subject
print(subject.common_name)
# Access material through relationship
material = sample.material
print(material.name)
# Access event through record
event = sample.record.event
print(event.description)