# Python API Usage

The SPIDB package provides a Python API for interacting with the database using `SQLAlchemy` syntax. This guide demonstrates how to use the API for common tasks.

## Quick Start

### Opening a Database

```python
from spidb import spidb

# Open existing database
db = spidb.Database("data/spi.db")

# Or create a new database
db = spidb.Database("my_new_database.db")
```

## Basic Queries

### Querying Events

```python
# Get all events
events = db.session.query(spidb.Event).all()

# Get events with specific subject
cowpea = db.session.query(spidb.Subject).filter(
    spidb.Subject.scientific_name == "Callosobruchus maculatus"
).first()
events = db.session.query(spidb.Event).filter(
    spidb.Event.subject == cowpea
).all()

# Get events with specific material
rice_events = db.session.query(spidb.Event).join(spidb.Material).filter(
    spidb.Material.name == "rice"
).all()

# Get events in a time range
from datetime import datetime
start_date = datetime(2023, 5, 1)
end_date = datetime(2023, 6, 1)
events = db.session.query(spidb.Event).filter(
    spidb.Event.start >= start_date,
    spidb.Event.end <= end_date
).all()
```

### Querying Samples

```python
# Get all samples
samples = db.session.query(spidb.Sample).all()

# Get samples for a specific subject
mealworm_samples = db.session.query(spidb.Sample).join(spidb.Subject).filter(
    spidb.Subject.common_name == "Mealworm"
).all()

# Get samples from a specific channel
channel_0_samples = db.session.query(spidb.Sample).join(spidb.Channel).filter(
    spidb.Channel.number == 0
).all()

# Get samples with specific noise level
quiet_samples = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise == "Silence"
).all()
```

### Querying Records

```python
# Get all records
records = db.session.query(spidb.Record).all()

# Get records for a specific event
event = db.session.query(spidb.Event).first()
records = db.session.query(spidb.Record).filter(
    spidb.Record.event == event
).all()

# Get records with specific material
wheat_records = db.session.query(spidb.Record).join(spidb.Material).filter(
    spidb.Material.name == "wheat_groats"
).all()
```

### Querying Subjects and Materials

```python
# Get all subjects
subjects = db.session.query(spidb.Subject).all()

# Get subject by scientific name
subject = db.session.query(spidb.Subject).filter(
    spidb.Subject.scientific_name == "Tribolium confusum"
).first()

# Get all materials
materials = db.session.query(spidb.Material).all()

# Get material by name
rice = db.session.query(spidb.Material).filter(
    spidb.Material.name == "rice"
).first()
```

## Adding Data

### Adding a New Event

```python
from datetime import datetime

# Get required relationships
sensor = db.session.query(spidb.Sensor).filter(
    spidb.Sensor.name == "A-SPIDS"
).first()
material = db.session.query(spidb.Material).filter(
    spidb.Material.name == "rice"
).first()
subject = db.session.query(spidb.Subject).filter(
    spidb.Subject.common_name == "Mealworm"
).first()

# Create new event
event = spidb.Event(
    start=datetime(2024, 10, 18, 0, 4, 20),
    end=datetime(2024, 10, 18, 0, 5, 20),
    description="Mealworm placed in center",
    noise="Silence",
    material=material,
    subject=subject,
    sensor=sensor
)

# Add and commit
db.session.add(event)
db.session.commit()
```

### Adding a New Subject

```python
# Create new subject
new_subject = spidb.Subject(
    name="Rice Weevil Adult",
    scientific_name="Sitophilus oryzae",
    common_name="Rice Weevil",
    life_stage="Adult",
    length=3.0,  # mm
    width=1.0,  # mm
    height=1.0,  # mm
    weight=0.002,  # grams
    volume=None,
    density=None
)
db.session.add(new_subject)
db.session.commit()
```

### Adding a New Material

```python
# Create new material
new_material = spidb.Material(
    name="corn_kernels",
    scientific_name="Zea mays",
    common_name="Corn",
    density=1.2  # g/cm³
)
db.session.add(new_material)
db.session.commit()
```

## Extracting Audio Data

### Get Audio for an Event

```python
# Get event
event = db.session.query(spidb.Event).filter(
    spidb.Event.id == 42
).first()

# Extract audio
audio = db.get_audio(
    start=event.start,
    end=event.end,
    sensor=event.sensor,
    channel_number=0
)

# Audio is a NumPy array
print(f"Audio shape: {audio.shape}")
print(f"Sample rate: {event.sensor.channels[0].sample_rate}")
```

### Get Audio for Multiple Channels

```python
# Get audio from all piezoelectric channels (0-3)
channels = []
for ch_num in range(4):
    audio = db.get_audio(
        start=event.start,
        end=event.end,
        sensor=event.sensor,
        channel_number=ch_num
    )
    channels.append(audio)

import numpy as np
multichannel_audio = np.stack(channels)  # Shape: (4, n_samples)
```

### Get Audio from Sample

```python
# Get sample
sample = db.session.query(spidb.Sample).first()

# Get associated file
audio_file = sample.file

# Load audio
from sonicdb import utilities
audio, sr = utilities.read_audio(audio_file.filepath)
```

## Building Databases Programmatically

### Populate from Dataset

```python
from spidb.build import populate_db, generate_records, generate_samples

# Create new database
db = spidb.Database("my_database.db")

# Populate from A-SPIDS dataset
populate_db(
    db=db,
    file_directory="data/aspids",
    deck="data/aspids/metadata.json"
)
print("Database populated!")
```

### Generate Records

```python
from spidb.build import generate_records

# Generate 60-second records
generate_records(db, duration=60, overwrite=True)

# Generate 30-second records
generate_records(db, duration=30, overwrite=True)
```

### Generate Samples

```python
from spidb.build import generate_samples

# Generate samples from records
generate_samples(db, overwrite=True)
```

### Complete Database Build

```python
from spidb.build import populate_db, generate_records, generate_samples
from spidb import spidb

# Create database
db = spidb.Database("complete_database.db")

# Populate with both datasets
for dataset in ["aspids", "mspids"]:
    populate_db(
        db=db,
        file_directory=f"data/{dataset}",
        deck=f"data/{dataset}/metadata.json"
    )

# Generate records and samples
generate_records(db, duration=60, overwrite=True)
generate_samples(db, overwrite=True)
print("Complete database built!")
```

## Advanced Queries

### Statistics

```python
# Count samples by subject
from sqlalchemy import func

subject_counts = db.session.query(
    spidb.Subject.common_name,
    func.count(spidb.Sample.id)
).join(spidb.Sample).group_by(spidb.Subject.common_name).all()

for subject, count in subject_counts:
    print(f"{subject}: {count} samples")

# Count events by material
material_counts = db.session.query(
    spidb.Material.name,
    func.count(spidb.Event.id)
).join(spidb.Event).group_by(spidb.Material.name).all()

for material, count in material_counts:
    print(f"{material}: {count} events")
```

### Complex Joins

```python
# Get all samples for cowpea beetle in rice
samples = db.session.query(spidb.Sample).join(
    spidb.Subject
).join(
    spidb.Material
).filter(
    spidb.Subject.scientific_name == "Callosobruchus maculatus",
    spidb.Material.name == "rice"
).all()

# Get samples with little or no background noise
quiet_samples = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise.in_(["Silence", "Ambient"])
).all()
```

### Exporting Data

```python
import pandas as pd

# Export events to DataFrame
events = db.session.query(spidb.Event).all()
events_data = [{
    'id': e.id,
    'start': e.start,
    'end': e.end,
    'subject': e.subject.common_name if e.subject else None,
    'material': e.material.name if e.material else None,
    'noise': e.noise
} for e in events]

df = pd.DataFrame(events_data)
df.to_csv("events.csv", index=False)
```

## Updating Records

### Update Event Information

```python
# Get event
event = db.session.query(spidb.Event).filter(
    spidb.Event.id == 1
).first()

# Update description
event.description = "Updated description"
event.noise = "90dBA"

# Commit changes
db.session.commit()
```

### Update Subject Information

```python
# Get subject
subject = db.session.query(spidb.Subject).filter(
    spidb.Subject.name == "Mealworm"
).first()

# Update measurements
subject.weight = 0.15
subject.length = 25.0
db.session.commit()
```

## Deleting Records

```python
# Delete a specific event
event = db.session.query(spidb.Event).filter(
    spidb.Event.id == 999
).first()
if event:
    db.session.delete(event)
    db.session.commit()

# Delete all samples with specific noise level
samples = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise == "100dBA"
).all()
for sample in samples:
    db.session.delete(sample)
db.session.commit()
```

## Best Practices

### Session Management

```python
# Always commit after modifications
db.session.add(new_object)
db.session.commit()

# Use try/except for error handling
try:
    db.session.add(new_object)
    db.session.commit()
except Exception as e:
    db.session.rollback()
    print(f"Error: {e}")
```

### Efficient Querying

```python
# Use filters instead of loading all data

# Bad: Load everything then filter in Python
all_samples = db.session.query(spidb.Sample).all()
filtered = [s for s in all_samples if s.noise == "Silence"]

# Good: Filter in database
filtered = db.session.query(spidb.Sample).filter(
    spidb.Sample.noise == "Silence"
).all()
```

### Relationships

```python
# Access related objects through relationships
sample = db.session.query(spidb.Sample).first()

# Access subject through relationship
subject = sample.subject
print(subject.common_name)

# Access material through relationship
material = sample.material
print(material.name)

# Access event through record
event = sample.record.event
print(event.description)
```

## Next Steps

- See [CLI Guide](cli_guide.md) for command-line usage
- See [Database](database.md) for schema details
- See [Models](models.md) for complete model reference
- Check [examples/](../examples/) for more code samples