Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/get-convex/convex-backend/llms.txt

Use this file to discover all available pages before exploring further.

The database engine is the heart of Convex’s data management, providing ACID transactions, snapshot isolation, reactive subscriptions, and automatic index maintenance.

Overview

Path: crates/database/ The database crate implements:
  • Transactional ACID database with snapshot isolation
  • Table and schema management
  • Index coordination and query optimization
  • Reactive subscription tracking
  • Read and write set management
  • Strong consistency guarantees

Architecture

Core components

Database struct

Main database interface:
pub struct Database<RT: Runtime> {
    /// Runtime for async operations
    runtime: RT,
    
    /// Persistence layer
    persistence: Arc<dyn Persistence>,
    
    /// Snapshot manager for MVCC
    snapshot_manager: SnapshotManager,
    
    /// Subscription tracking
    subscription_manager: SubscriptionManager,
    
    /// Index workers
    index_workers: Arc<IndexWorkers>,
    
    /// Table registry
    tables: Arc<RwLock<TableRegistry>>,
    
    /// Schema registry
    schemas: Arc<RwLock<SchemaRegistry>>,
}

impl<RT: Runtime> Database<RT> {
    pub async fn new(
        runtime: RT,
        persistence: Arc<dyn Persistence>,
    ) -> Result<Self> {
        // Initialize components
        let snapshot_manager = SnapshotManager::new();
        let subscription_manager = SubscriptionManager::new();
        let tables = Arc::new(RwLock::new(TableRegistry::new()));
        let schemas = Arc::new(RwLock::new(SchemaRegistry::new()));
        
        // Start index workers
        let index_workers = Arc::new(IndexWorkers::start(
            runtime.clone(),
            persistence.clone(),
        ));
        
        Ok(Self {
            runtime,
            persistence,
            snapshot_manager,
            subscription_manager,
            index_workers,
            tables,
            schemas,
        })
    }
}

Transaction

Path: crates/database/src/transaction.rs Transactional interface:
pub struct Transaction<RT: Runtime> {
    /// Transaction ID
    id: TransactionId,
    
    /// Begin timestamp (snapshot)
    begin_ts: Timestamp,
    
    /// Commit timestamp (assigned at commit)
    commit_ts: Option<Timestamp>,
    
    /// Read set for conflict detection
    reads: ReadSet,
    
    /// Write set (buffered writes)
    writes: WriteSet,
    
    /// Table registry snapshot
    tables: TableRegistry,
    
    /// Schema registry snapshot
    schemas: SchemaRegistry,
    
    /// Index registry snapshot
    indexes: IndexRegistry,
}

impl<RT: Runtime> Transaction<RT> {
    /// Read a document by ID
    pub async fn get(
        &mut self,
        id: DocumentId,
    ) -> Result<Option<ResolvedDocument>> {
        // Track read for subscriptions
        self.reads.insert(id);
        
        // Check write set first
        if let Some(doc) = self.writes.get(&id) {
            return Ok(Some(doc.clone()));
        }
        
        // Read from persistence at snapshot
        let doc = self.persistence
            .get_at_timestamp(id, self.begin_ts)
            .await?;
        
        Ok(doc)
    }
    
    /// Insert a new document
    pub fn insert(
        &mut self,
        table: TableName,
        document: ConvexObject,
    ) -> Result<DocumentId> {
        // Validate against schema
        self.schemas.validate(table, &document)?;
        
        // Generate ID
        let id = self.id_generator.next_id(table)?;
        
        // Add to write set
        self.writes.insert(id, document);
        
        Ok(id)
    }
    
    /// Update a document
    pub fn patch(
        &mut self,
        id: DocumentId,
        patches: ConvexObject,
    ) -> Result<()> {
        // Get current version
        let mut doc = self.get(id).await?
            .ok_or_else(|| anyhow!("Document not found"))?;
        
        // Apply patches
        for (field, value) in patches {
            doc.insert(field, value);
        }
        
        // Validate
        self.schemas.validate(id.table(), &doc)?;
        
        // Update write set
        self.writes.update(id, doc);
        
        Ok(())
    }
    
    /// Delete a document
    pub fn delete(&mut self, id: DocumentId) -> Result<()> {
        // Mark as deleted in write set
        self.writes.delete(id);
        Ok(())
    }
    
    /// Query a table
    pub fn query(
        &mut self,
        table: TableName,
    ) -> Query<RT> {
        Query::new(self, table)
    }
}

Transaction lifecycle

Begin transaction

impl<RT: Runtime> Database<RT> {
    pub async fn begin(&self) -> Result<Transaction<RT>> {
        // Acquire transaction ID
        let id = self.id_generator.next_transaction_id();
        
        // Get current timestamp (snapshot point)
        let begin_ts = self.snapshot_manager.current_timestamp();
        
        // Create transaction with snapshot of metadata
        let tx = Transaction {
            id,
            begin_ts,
            commit_ts: None,
            reads: ReadSet::new(),
            writes: WriteSet::new(),
            tables: self.tables.read().clone(),
            schemas: self.schemas.read().clone(),
            indexes: self.indexes.read().clone(),
        };
        
        Ok(tx)
    }
}

Commit transaction

impl<RT: Runtime> Transaction<RT> {
    pub async fn commit(mut self) -> Result<CommitResult> {
        // Validate writes
        self.validate_writes()?;
        
        // Detect conflicts
        self.check_conflicts().await?;
        
        // Acquire commit timestamp
        let commit_ts = self.snapshot_manager.next_timestamp();
        self.commit_ts = Some(commit_ts);
        
        // Write to persistence
        self.persistence.write_batch(
            &self.writes,
            commit_ts,
        ).await?;
        
        // Update indexes asynchronously
        self.index_workers.schedule_updates(
            &self.writes,
            commit_ts,
        );
        
        // Notify subscribers
        self.subscription_manager.notify(
            &self.writes,
            commit_ts,
        );
        
        Ok(CommitResult {
            timestamp: commit_ts,
            writes: self.writes.len(),
        })
    }
}

Conflict detection

impl<RT: Runtime> Transaction<RT> {
    async fn check_conflicts(&self) -> Result<()> {
        // Check if any read documents were modified
        for doc_id in &self.reads {
            let modified = self.persistence
                .was_modified_since(doc_id, self.begin_ts)
                .await?;
            
            if modified {
                return Err(anyhow!("Transaction conflict: document {} was modified", doc_id));
            }
        }
        
        Ok(())
    }
}

Query execution

Query builder

Path: crates/database/src/query.rs
pub struct Query<RT: Runtime> {
    tx: &mut Transaction<RT>,
    table: TableName,
    index: Option<IndexName>,
    filter: Option<Filter>,
    limit: Option<usize>,
    order: Option<Order>,
}

impl<RT: Runtime> Query<RT> {
    /// Specify index to use
    pub fn with_index(mut self, index: IndexName) -> Self {
        self.index = Some(index);
        self
    }
    
    /// Add filter
    pub fn filter(mut self, filter: Filter) -> Self {
        self.filter = Some(filter);
        self
    }
    
    /// Limit results
    pub fn take(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }
    
    /// Collect all results
    pub async fn collect(self) -> Result<Vec<ResolvedDocument>> {
        let mut results = Vec::new();
        
        // Choose index
        let index = self.choose_index()?;
        
        // Scan index
        let mut cursor = index.scan(self.filter.as_ref())?;
        
        while let Some(doc_id) = cursor.next().await? {
            // Track read
            self.tx.reads.insert(doc_id);
            
            // Get document
            let doc = self.tx.get(doc_id).await?;
            
            // Apply filter
            if let Some(doc) = doc {
                if self.matches_filter(&doc) {
                    results.push(doc);
                    
                    // Check limit
                    if let Some(limit) = self.limit {
                        if results.len() >= limit {
                            break;
                        }
                    }
                }
            }
        }
        
        Ok(results)
    }
    
    /// Stream results
    pub fn stream(self) -> impl Stream<Item = Result<ResolvedDocument>> {
        // Return async stream of documents
        // ...
    }
}

Index selection

impl<RT: Runtime> Query<RT> {
    fn choose_index(&self) -> Result<&Index> {
        // If index specified, use it
        if let Some(index_name) = &self.index {
            return self.tx.indexes.get(index_name)
                .ok_or_else(|| anyhow!("Index not found: {}", index_name));
        }
        
        // Otherwise, find best index for filter
        if let Some(filter) = &self.filter {
            if let Some(index) = self.tx.indexes.find_best_index(filter) {
                return Ok(index);
            }
        }
        
        // Fall back to table scan (creation index)
        Ok(self.tx.indexes.get_creation_index(self.table)?)
    }
}

Table registry

Path: crates/database/src/table_registry.rs
pub struct TableRegistry {
    tables: BTreeMap<TableName, TableMetadata>,
    table_mapping: TableMapping,
}

pub struct TableMetadata {
    name: TableName,
    id: TableId,
    created_at: Timestamp,
    document_count: u64,
}

impl TableRegistry {
    pub fn create_table(&mut self, name: TableName) -> Result<TableId> {
        // Check if table exists
        if self.tables.contains_key(&name) {
            return Err(anyhow!("Table already exists: {}", name));
        }
        
        // Generate table ID
        let id = self.table_mapping.next_id();
        
        // Create metadata
        let metadata = TableMetadata {
            name: name.clone(),
            id,
            created_at: Timestamp::now(),
            document_count: 0,
        };
        
        self.tables.insert(name, metadata);
        Ok(id)
    }
    
    pub fn get_table(&self, name: &TableName) -> Option<&TableMetadata> {
        self.tables.get(name)
    }
}

Schema registry

Path: crates/database/src/schema_registry.rs
pub struct SchemaRegistry {
    schemas: BTreeMap<TableName, TableSchema>,
}

pub struct TableSchema {
    fields: BTreeMap<FieldName, FieldValidator>,
    indexes: Vec<IndexDefinition>,
}

impl SchemaRegistry {
    pub fn validate(
        &self,
        table: &TableName,
        document: &ConvexObject,
    ) -> Result<()> {
        let schema = self.schemas.get(table)
            .ok_or_else(|| anyhow!("No schema for table: {}", table))?;
        
        // Validate each field
        for (field, value) in document.iter() {
            if let Some(validator) = schema.fields.get(field) {
                validator.validate(value)?;
            }
        }
        
        Ok(())
    }
}

Subscription management

Path: crates/database/src/subscription.rs
pub struct SubscriptionManager {
    subscriptions: Arc<RwLock<BTreeMap<SubscriptionId, Subscription>>>,
}

pub struct Subscription {
    id: SubscriptionId,
    query_set: ReadSet,
    sender: mpsc::Sender<Update>,
}

impl SubscriptionManager {
    pub fn register(
        &self,
        read_set: ReadSet,
    ) -> (SubscriptionId, mpsc::Receiver<Update>) {
        let id = SubscriptionId::new();
        let (tx, rx) = mpsc::channel(100);
        
        let subscription = Subscription {
            id,
            query_set: read_set,
            sender: tx,
        };
        
        self.subscriptions.write().insert(id, subscription);
        (id, rx)
    }
    
    pub fn notify(
        &self,
        writes: &WriteSet,
        timestamp: Timestamp,
    ) {
        let subscriptions = self.subscriptions.read();
        
        for subscription in subscriptions.values() {
            // Check if write affects this subscription
            if writes.intersects(&subscription.query_set) {
                let update = Update {
                    timestamp,
                    changes: writes.clone(),
                };
                
                // Send update (non-blocking)
                subscription.sender.try_send(update).ok();
            }
        }
    }
}

Index workers

Path: crates/database/src/database_index_workers.rs
pub struct IndexWorkers {
    workers: Vec<IndexWorker>,
}

pub struct IndexWorker {
    update_queue: mpsc::Receiver<IndexUpdate>,
    index_writer: IndexWriter,
}

impl IndexWorker {
    pub async fn run(mut self) -> Result<()> {
        while let Some(update) = self.update_queue.recv().await {
            match update {
                IndexUpdate::Document { id, change } => {
                    self.update_indexes(id, change).await?;
                }
                IndexUpdate::Backfill { index_id } => {
                    self.backfill_index(index_id).await?;
                }
            }
        }
        Ok(())
    }
    
    async fn update_indexes(
        &mut self,
        doc_id: DocumentId,
        change: DocumentChange,
    ) -> Result<()> {
        match change {
            DocumentChange::Insert(doc) => {
                self.index_writer.insert(doc_id, &doc).await?;
            }
            DocumentChange::Update { old, new } => {
                self.index_writer.update(doc_id, &old, &new).await?;
            }
            DocumentChange::Delete(doc) => {
                self.index_writer.delete(doc_id, &doc).await?;
            }
        }
        Ok(())
    }
}

Read and write sets

Path: crates/database/src/reads.rs and crates/database/src/writes.rs
pub struct ReadSet {
    /// Documents read during transaction
    documents: BTreeSet<DocumentId>,
    
    /// Tables scanned
    tables: BTreeSet<TableName>,
    
    /// Index ranges scanned
    index_ranges: Vec<IndexRange>,
}

pub struct WriteSet {
    /// Documents inserted
    inserts: BTreeMap<DocumentId, ConvexObject>,
    
    /// Documents updated
    updates: BTreeMap<DocumentId, ConvexObject>,
    
    /// Documents deleted
    deletes: BTreeSet<DocumentId>,
}

impl ReadSet {
    pub fn intersects(&self, writes: &WriteSet) -> bool {
        // Check if any written documents were read
        for doc_id in &self.documents {
            if writes.contains(doc_id) {
                return true;
            }
        }
        
        // Check if any scanned tables were written
        for table in &self.tables {
            if writes.affects_table(table) {
                return true;
            }
        }
        
        false
    }
}

Testing

Transaction tests

#[tokio::test]
async fn test_transaction_isolation() {
    let db = Database::new_test().await;
    
    // Start two transactions
    let mut tx1 = db.begin().await.unwrap();
    let mut tx2 = db.begin().await.unwrap();
    
    // Insert in tx1
    let id = tx1.insert("tasks", doc).unwrap();
    
    // tx2 should not see it
    assert!(tx2.get(id).await.unwrap().is_none());
    
    // Commit tx1
    tx1.commit().await.unwrap();
    
    // Start tx3 - should see the insert
    let mut tx3 = db.begin().await.unwrap();
    assert!(tx3.get(id).await.unwrap().is_some());
}

Next steps