Rust Guide

The Rust crate zippy_data is the foundation of ZDS. It provides zero-copy memory-mapped access, high-throughput writers, and full control over the storage layer. Use it when you need maximum performance or want to embed ZDS in your Rust application.

Table of contents
  1. Installation
  2. Quick Start
  3. Core Concepts
    1. Store Types
    2. Memory-Mapped I/O
    3. Batch Writes
  4. FastStore
    1. Opening a Store
    2. Adding Documents
    3. Retrieving Documents
    4. Updating and Deleting
    5. Scanning Documents
    6. Flushing and Consistency
  5. Engine (File-per-Document)
    1. Opening
    2. Reading Documents
    3. Scanning with Filters
    4. Statistics
  6. Writers
    1. SyncWriter
    2. BufferedWriter
  7. Recipes
    1. Recipe: ML Dataset Ingestion
    2. Recipe: Parallel Processing
    3. Recipe: Streaming Export
    4. Recipe: Custom Serialization
  8. Pack/Unpack Archives
  9. Index Operations
    1. Rebuild Index
    2. Load and Query Index
  10. Layout Utilities
  11. Error Handling
    1. Error Types
  12. Performance Tips
    1. 1. Choose the Right Store
    2. 2. Tune Batch Size
    3. 3. Use Raw JSONL for Bulk Operations
    4. 4. Parallel Reads
  13. Benchmarking
    1. Benchmark Suites
    2. Example Results (M1 MacBook Pro)
  14. API Reference
    1. FastStore
    2. BufferedWriter
  15. Next Steps

Installation

Add to your Cargo.toml:

[dependencies]
zippy_data = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

For async support:

[dependencies]
zippy_data = { version = "0.1", features = ["async"] }
tokio = { version = "1", features = ["full"] }

Quick Start

use zippy_data::{FastStore, Layout, Result};
use serde_json::json;

fn main() -> Result<()> {
    // Initialize the store directory
    Layout::init_root("./my_dataset")?;
    
    // Open a collection (created if it doesn't exist)
    let mut store = FastStore::open("./my_dataset", "train", 100)?;
    
    // Add documents
    store.put("user_001", json!({
        "name": "Alice",
        "email": "alice@example.com",
        "role": "admin"
    }))?;
    
    store.put("user_002", json!({
        "name": "Bob",
        "email": "bob@example.com",
        "role": "user"
    }))?;
    
    // Retrieve by ID
    let doc = store.get("user_001")?;
    println!("Found: {}", doc["name"]);  // "Alice"
    
    // Iterate all documents
    for doc in store.scan_all()? {
        println!("{}: {}", doc["_id"], doc["name"]);
    }
    
    // Flush to ensure durability
    store.flush()?;
    
    println!("Stored {} documents", store.len());
    Ok(())
}

Core Concepts

Store Types

ZDS provides two storage backends:

Type        Storage                            Best For
FastStore   Single JSONL file + binary index   High throughput, bulk operations
Engine      One JSON file per document         Git-friendly, manual editing
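
A minimal sketch of the two constructors, using the calls shown later in this guide (the root path and the "train" collection are placeholders, and whether one collection can be read by both backends depends on how it was written):

use zippy_data::{Engine, FastStore, Result};

fn open_backends(root: &str) -> Result<()> {
    // Single JSONL file + binary index: optimized for throughput
    let fast = FastStore::open(root, "train", 100)?;
    println!("FastStore: {} documents", fast.len());

    // One JSON file per document: easy to diff and edit by hand
    let engine = Engine::open(root, "train")?;
    println!("Engine: {} documents", engine.len());
    Ok(())
}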

Memory-Mapped I/O

FastStore uses memory-mapped files for zero-copy reads. The binary index provides O(1) lookups by document ID without loading the entire dataset into memory.
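
For example, a point lookup touches only the index entry and the mapped bytes of that one document rather than scanning the file (a small sketch; the document ID and field name are carried over from the Quick Start):

use zippy_data::{FastStore, Result};

fn point_lookup(root: &str) -> Result<()> {
    let store = FastStore::open(root, "train", 100)?;

    // Index probe only; the data file is not scanned
    if store.exists("user_001") {
        // Reads just this document through the mapped file
        let doc = store.get("user_001")?;
        println!("{}", doc["name"]);
    }
    Ok(())
}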

Batch Writes

Both stores support batched writes. Documents are buffered in memory and flushed to disk when the batch size is reached or when flush() is called explicitly.
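
A minimal sketch of that behavior, assuming the batch size passed to FastStore::open is the flush threshold described above (the "./batch_demo" path and "events" collection are placeholders):

use serde_json::json;
use zippy_data::{FastStore, Layout, Result};

fn buffered_puts() -> Result<()> {
    Layout::init_root("./batch_demo")?;

    // With a batch size of 3, every third put flushes the buffer to disk
    let mut store = FastStore::open("./batch_demo", "events", 3)?;
    for i in 0..10 {
        store.put(&format!("event_{i}"), json!({ "seq": i }))?;
    }

    // The final partial batch stays in memory until an explicit flush
    store.flush()?;
    Ok(())
}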


FastStore

The primary high-performance store for most use cases.

Opening a Store

use zippy_data::{FastStore, Layout};

// First-time setup: create directory structure
Layout::init_root("./data")?;

// Open with default batch size (100)
let mut store = FastStore::open("./data", "train", 100)?;

// Check store info
println!("Path: {}", store.root());
println!("Collection: {}", store.collection());
println!("Documents: {}", store.len());

Adding Documents

use serde_json::json;

// Simple document
store.put("product_001", json!({
    "name": "Widget Pro",
    "price": 29.99,
    "in_stock": true
}))?;

// Nested structures
store.put("order_001", json!({
    "customer": {
        "id": "cust_123",
        "email": "alice@example.com"
    },
    "items": [
        {"sku": "WIDGET-001", "qty": 2, "price": 29.99},
        {"sku": "GADGET-002", "qty": 1, "price": 49.99}
    ],
    "total": 109.97
}))?;

// With typed structs
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct User {
    name: String,
    email: String,
    role: String,
}

let user = User {
    name: "Charlie".into(),
    email: "charlie@example.com".into(),
    role: "user".into(),
};

store.put("user_003", serde_json::to_value(&user)?)?;

Retrieving Documents

// Get by ID (returns serde_json::Value)
let doc = store.get("order_001")?;
println!("Total: {}", doc["total"]);

// Check existence without loading
if store.exists("order_001") {
    println!("Order exists!");
}

// Get all document IDs
let ids = store.doc_ids();
for id in ids {
    println!("ID: {}", id);
}

// Deserialize to typed struct
let user: User = serde_json::from_value(store.get("user_003")?)?;
println!("User: {}", user.name);

Updating and Deleting

// Update (put with same ID replaces)
store.put("product_001", json!({
    "name": "Widget Pro",
    "price": 24.99,  // Updated price
    "in_stock": true,
    "on_sale": true  // New field
}))?;

// Delete
store.delete("product_001")?;

// Verify deletion
assert!(!store.exists("product_001"));

Scanning Documents

// Scan all documents
let docs = store.scan_all()?;
for doc in docs {
    println!("{}", doc);
}

// Read raw JSONL (fastest for export)
let blob = store.read_jsonl_blob()?;
println!("Read {} bytes", blob.len());

// Write to file
std::fs::write("export.jsonl", &blob)?;

Flushing and Consistency

// Explicit flush writes pending changes to disk
store.flush()?;

// After external modifications, refresh the mmap
store.refresh_mmap()?;

// Drop flushes automatically
drop(store);

Engine (File-per-Document)

For workflows that benefit from individual files per document.

Opening

use zippy_data::Engine;

let engine = Engine::open("./data", "train")?;
println!("Documents: {}", engine.len());

Reading Documents

// Get by ID
let doc = engine.get_document("doc_001")?;

// Get by index position (0-based)
let first = engine.get_document_at(0)?;
let last = engine.get_document_at(engine.len() - 1)?;

// List all IDs
for id in engine.doc_ids() {
    println!("{}", id);
}

Scanning with Filters

use zippy_data::Predicate;

// Scan all documents
let mut scanner = engine.scan(None, None)?;
while let Some(doc) = scanner.next()? {
    println!("{}", doc);
}

// Filter with predicate
let pred = Predicate::eq("category", "electronics");
let mut scanner = engine.scan(Some(&pred), None)?;

// Project specific fields only
let fields = ["name", "price"];
let mut scanner = engine.scan(None, Some(&fields))?;

// Combine filter and projection
let mut scanner = engine.scan(Some(&pred), Some(&fields))?;

// Collect matching documents into a Vec
let mut docs = Vec::new();
while let Some(doc) = scanner.next()? {
    docs.push(doc);
}
println!("Found {} matching documents", docs.len());

Statistics

let stats = engine.stats();
println!("Collection: {}", stats.collection);
println!("Documents: {}", stats.doc_count);
println!("Schemas: {}", stats.schema_count);
println!("Total size: {} bytes", stats.total_size);
println!("Strict mode: {}", stats.strict_mode);

Writers

SyncWriter

Immediate writes for small datasets, or when per-write durability is required:

use zippy_data::writer::SyncWriter;
use serde_json::json;

let mut writer = SyncWriter::new("./data", "train")?;

// Each put writes immediately to disk
writer.put("doc_001", &json!({"text": "hello"}))?;
writer.put("doc_002", &json!({"text": "world"}))?;

BufferedWriter

High-throughput batched writes:

use zippy_data::writer::{BufferedWriter, WriteConfig};
use serde_json::json;

let config = WriteConfig {
    max_pending_ops: 1000,           // Flush after 1000 documents
    max_pending_bytes: 100 << 20,    // Or after 100MB
    flush_interval_ms: 60_000,       // Or after 60 seconds
};

let mut writer = BufferedWriter::new("./data", "train", config)?;

// Bulk ingestion
for i in 0..100_000 {
    writer.put(
        format!("doc_{:06}", i),
        json!({
            "id": i,
            "value": rand::random::<f64>(),
            "category": ["A", "B", "C"][i % 3]
        })
    )?;
}

// Final flush
writer.flush()?;
println!("Wrote {} documents", writer.count());

Recipes

Recipe: ML Dataset Ingestion

use zippy_data::{FastStore, Layout, Result};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Deserialize)]
struct RawSample {
    text: String,
    label: i32,
}

fn ingest_dataset(input_path: &str, output_path: &str) -> Result<()> {
    Layout::init_root(output_path)?;
    let mut store = FastStore::open(output_path, "train", 500)?;
    
    let file = File::open(input_path)?;
    let reader = BufReader::new(file);
    
    for (i, line) in reader.lines().enumerate() {
        let line = line?;
        let sample: RawSample = serde_json::from_str(&line)?;
        
        store.put(
            format!("sample_{:08}", i),
            json!({
                "text": sample.text,
                "label": sample.label,
                "split": "train"
            })
        )?;
        
        if i % 10_000 == 0 {
            println!("Processed {} samples...", i);
        }
    }
    
    store.flush()?;
    println!("Ingested {} samples", store.len());
    Ok(())
}

Recipe: Parallel Processing

use zippy_data::{FastStore, Result};
use rayon::prelude::*;
use serde_json::Value;

fn process_in_parallel(store_path: &str) -> Result<Vec<Value>> {
    let store = FastStore::open(store_path, "train", 100)?;
    
    // Get all document IDs
    let ids: Vec<String> = store.doc_ids().into_iter().map(String::from).collect();
    
    // Process in parallel with rayon
    let results: Vec<Value> = ids
        .par_iter()
        .filter_map(|id| {
            // Open a separate store handle inside the closure so nothing is shared across threads
            let local_store = FastStore::open(store_path, "train", 100).ok()?;
            let doc = local_store.get(id).ok()?;
            
            // Your processing logic here
            if doc["score"].as_f64()? > 0.8 {
                Some(doc)
            } else {
                None
            }
        })
        .collect();
    
    Ok(results)
}

Recipe: Streaming Export

use zippy_data::{FastStore, Result};
use std::io::{Write, BufWriter};
use std::fs::File;

fn export_to_jsonl(store_path: &str, output_path: &str) -> Result<()> {
    let store = FastStore::open(store_path, "train", 100)?;
    
    let file = File::create(output_path)?;
    let mut writer = BufWriter::new(file);
    
    for doc in store.scan_all()? {
        serde_json::to_writer(&mut writer, &doc)?;
        writeln!(writer)?;
    }
    
    writer.flush()?;
    println!("Exported {} documents to {}", store.len(), output_path);
    Ok(())
}

Recipe: Custom Serialization

use zippy_data::{FastStore, Layout, Result};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug)]
struct Embedding {
    id: String,
    vector: Vec<f32>,
    metadata: EmbeddingMeta,
}

#[derive(Serialize, Deserialize, Debug)]
struct EmbeddingMeta {
    source: String,
    model: String,
    timestamp: i64,
}

fn store_embeddings(embeddings: Vec<Embedding>, path: &str) -> Result<()> {
    Layout::init_root(path)?;
    let mut store = FastStore::open(path, "embeddings", 100)?;
    
    for emb in embeddings {
        store.put(&emb.id, serde_json::to_value(&emb)?)?;
    }
    
    store.flush()?;
    Ok(())
}

fn load_embedding(path: &str, id: &str) -> Result<Embedding> {
    let store = FastStore::open(path, "embeddings", 100)?;
    let value = store.get(id)?;
    let emb: Embedding = serde_json::from_value(value)?;
    Ok(emb)
}

Pack/Unpack Archives

Create portable .zds archives:

use zippy_data::container::{pack, unpack};

// Pack a store directory into a single archive
pack("./my_dataset", "./my_dataset.zds")?;

// Unpack an archive
unpack("./my_dataset.zds", "./extracted")?;

// The archive is a standard ZIP file
// Recipients can extract with any ZIP tool
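
Because the archive is a plain ZIP container, it can also be inspected with any ZIP library. A hedged sketch using the third-party zip crate (not part of zippy_data; add it to Cargo.toml first):

use std::fs::File;

fn list_archive(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut archive = zip::ZipArchive::new(File::open(path)?)?;
    for i in 0..archive.len() {
        let entry = archive.by_index(i)?;
        println!("{} ({} bytes)", entry.name(), entry.size());
    }
    Ok(())
}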

Index Operations

Rebuild Index

use zippy_data::IndexRegistry;

// Rebuild from JSONL data
let index = IndexRegistry::rebuild("./data", "train")?;
index.save("./data", "train")?;
println!("Indexed {} documents", index.len());

Load and Query Index

let index = IndexRegistry::load("./data", "train")?;

// Get document ID at position
if let Some(id) = index.get_doc_id_at(0) {
    println!("First document: {}", id);
}

// Check if ID exists
if index.contains("doc_001") {
    println!("Document exists in index");
}

Layout Utilities

use zippy_data::Layout;

// Initialize store structure
Layout::init_root("./data")?;

// Initialize a collection
Layout::init_collection("./data", "train")?;

// Validate structure
Layout::validate("./data")?;
Layout::validate_collection("./data", "train")?;

// Get paths
let meta_dir = Layout::meta_dir("./data", "train");
let manifest = Layout::manifest_file("./data", "train");
let data_file = Layout::data_file("./data", "train");

Error Handling

use zippy_data::{Error, FastStore, Result};

fn safe_get(store: &FastStore, id: &str) -> Result<Option<serde_json::Value>> {
    match store.get(id) {
        Ok(doc) => Ok(Some(doc)),
        Err(Error::DocumentNotFound(_)) => Ok(None),
        Err(e) => Err(e),
    }
}

fn process() -> Result<()> {
    let store = FastStore::open("./data", "train", 100)?;
    
    match safe_get(&store, "maybe_exists")? {
        Some(doc) => println!("Found: {}", doc),
        None => println!("Not found"),
    }
    
    Ok(())
}

Error Types

Error                              Description
Error::DocumentNotFound(id)        Document with given ID doesn’t exist
Error::CollectionNotFound(name)    Collection doesn’t exist
Error::InvalidPath(path)           Path is invalid or inaccessible
Error::CorruptedIndex              Binary index is corrupted
Error::IoError(e)                  Underlying I/O error
Error::JsonError(e)                JSON serialization/deserialization error
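
For example, you can branch on these variants to separate expected conditions from hard failures. A sketch assuming the variants and payloads listed above; panicking on unexpected errors is a deliberately simple choice here:

use serde_json::Value;
use zippy_data::{Error, FastStore};

fn get_or_null(store: &FastStore, id: &str) -> Value {
    match store.get(id) {
        Ok(doc) => doc,
        // A missing document is an expected condition in this example
        Err(Error::DocumentNotFound(_)) => Value::Null,
        // Corrupted index, I/O, or JSON errors are real failures
        Err(e) => panic!("unexpected store error: {e}"),
    }
}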

Performance Tips

1. Choose the Right Store

Use Case                   Recommended
Bulk ingestion             FastStore with BufferedWriter
Random access reads        FastStore (mmap)
Git-versioned data         Engine (file-per-doc)
Streaming large datasets   FastStore + scan_all()

2. Tune Batch Size

// Small documents, high throughput: larger batch
let store = FastStore::open("./data", "logs", 1000)?;

// Large documents: smaller batch
let store = FastStore::open("./data", "embeddings", 50)?;

3. Use Raw JSONL for Bulk Operations

// Fastest: read raw bytes
let blob = store.read_jsonl_blob()?;

// Process line by line without full parse
for line in blob.split(|&b| b == b'\n') {
    if !line.is_empty() {
        // Quick field extraction with simd-json or similar
    }
}

4. Parallel Reads

use rayon::prelude::*;

let ids: Vec<_> = store.doc_ids().into_iter().collect();

// Open a separate store handle inside each task instead of sharing one
let results: Vec<_> = ids.par_iter()
    .map(|id| {
        let s = FastStore::open("./data", "train", 100).unwrap();
        s.get(id).unwrap()
    })
    .collect();

Benchmarking

Run the built-in benchmarks:

cd crates/zippy_data
cargo bench

Benchmark Suites

Suite           Description
ingestion       Write throughput (docs/sec)
random_access   Single document lookup latency
scan            Sequential read throughput
index           Index build and lookup performance

Example Results (M1 MacBook Pro)

Operation           Throughput
Sequential writes   4.6M docs/sec
Random reads        890K docs/sec
Full scan           12M docs/sec

API Reference

FastStore

impl FastStore {
    pub fn open(root: &str, collection: &str, batch_size: usize) -> Result<Self>;
    
    pub fn put(&mut self, id: &str, doc: Value) -> Result<()>;
    pub fn get(&self, id: &str) -> Result<Value>;
    pub fn delete(&mut self, id: &str) -> Result<()>;
    pub fn exists(&self, id: &str) -> bool;
    
    pub fn scan_all(&self) -> Result<Vec<Value>>;
    pub fn read_jsonl_blob(&self) -> Result<Vec<u8>>;
    pub fn doc_ids(&self) -> Vec<&str>;
    
    pub fn len(&self) -> usize;
    pub fn is_empty(&self) -> bool;
    
    pub fn flush(&mut self) -> Result<()>;
    pub fn refresh_mmap(&mut self) -> Result<()>;
    
    pub fn root(&self) -> &str;
    pub fn collection(&self) -> &str;
}

BufferedWriter

impl BufferedWriter {
    pub fn new(root: &str, collection: &str, config: WriteConfig) -> Result<Self>;
    
    pub fn put(&mut self, id: String, doc: Value) -> Result<()>;
    pub fn flush(&mut self) -> Result<()>;
    pub fn count(&self) -> usize;
}

pub struct WriteConfig {
    pub max_pending_ops: usize,
    pub max_pending_bytes: usize,
    pub flush_interval_ms: u64,
}

Next Steps