Data Engineering for AI
Before any model trains, data must be collected, cleaned, transformed, versioned, and served reliably. This guide covers everything an AI engineer needs to know about the data layer — from pipelines and file formats to feature stores, orchestration, streaming, and vector databases.
- What is Data Engineering for AI?
- Data Pipeline Architecture
- File Formats & Serialization
- Storage: Lakes, Lakehouses & Warehouses
- ETL vs ELT for ML Workloads
- Feature Engineering & Feature Stores
- Data Quality & Validation
- Orchestration: Airflow, Prefect & Dagster
- Streaming Data & Real-Time AI
- Vector Databases & Embeddings Storage
What is Data Engineering for AI?
Data engineering is the discipline of designing, building, and operating the infrastructure that collects, stores, transforms, and delivers data reliably. For AI/ML workloads it takes on a specific character: data quality directly determines model quality. "Garbage in, garbage out" is not a cliché here; poor data is one of the most common reasons production AI systems fail.
Traditional data engineering (analytics/BI):
- Serves BI dashboards and reports
- Schema-on-write (structured tables)
- Daily or hourly batch runs are acceptable
- SQL is the primary interface
- Data freshness: hours is fine
Data engineering for AI/ML:
- Serves training pipelines and model inference
- Schema-on-read (semi-structured data lakes)
- May need sub-second real-time features
- Python, Spark, Arrow as primary tools
- Data versioning and reproducibility are critical
| Role | Primary Responsibility | Data Tasks |
|---|---|---|
| Data Engineer | Builds and operates data pipelines | Ingestion, transformation, storage, orchestration |
| ML Engineer | Trains models and deploys them | Feature engineering, data versioning, training pipelines |
| Data Scientist | Analyses data and experiments with models | EDA, feature selection, data labelling |
| MLOps Engineer | Operates ML systems in production | Data drift monitoring, retraining triggers, CI/CD |
At smaller companies, one person often covers all four roles. At scale they split into dedicated teams. Either way, understanding the data engineering layer is non-negotiable for any AI engineer who wants to ship reliable systems. This knowledge sits at Layer 1 of the AI Skills Stack, and everything else builds on it.
Data Pipeline Architecture
A data pipeline is a sequence of stages that moves data from source systems to a destination where it can be consumed by a dashboard, a model, or an API. For AI, the canonical three phases are Ingest → Transform → Serve.
The most widely adopted pattern in modern AI data stacks is the medallion architecture, popularised by Databricks. Raw data lands in Bronze, is cleaned into Silver, and is aggregated into Gold for models and reports.
Batch processing:
- Processes data in chunks on a schedule
- Tools: Spark, dbt, pandas, Polars, DuckDB
- Latency: minutes to hours
- Use cases: daily training jobs, offline reports
- Simpler to debug and reprocess
Stream processing:
- Processes events continuously as they arrive
- Tools: Kafka, Flink, Spark Streaming
- Latency: milliseconds to seconds
- Use cases: fraud detection, real-time recommendations
- Harder to manage state and exactly-once semantics
import pandas as pd
from pathlib import Path
# Bronze: read raw data exactly as received
bronze = pd.read_json("data/bronze/events_2026-06-30.jsonl", lines=True)
# Silver: deduplicate, drop rows missing keys, and enforce types
silver = (
    bronze
    .drop_duplicates(subset=["event_id"])
    .dropna(subset=["user_id", "timestamp"])
    .assign(
        # lambdas receive the already-filtered frame at this point in the chain
        timestamp=lambda d: pd.to_datetime(d["timestamp"]),
        amount=lambda d: d["amount"].astype(float).clip(lower=0),
    )
)
# Write Silver as Parquet (columnar, compressed, fast for ML)
Path("data/silver").mkdir(parents=True, exist_ok=True)
silver.to_parquet("data/silver/events_2026-06-30.parquet", index=False)
print(f"Bronze rows: {len(bronze):,} -> Silver rows: {len(silver):,}")
Polars is a Rust-based DataFrame library that is often 5–20x faster than pandas on large datasets. It uses lazy evaluation (it builds a query plan and optimises it before executing), native multi-threading, and Arrow as its memory format. For Gold-layer transformations on datasets over roughly 10M rows, Polars is an increasingly common choice.
import polars as pl
# Lazy: builds a query plan, doesn't execute yet
result = (
    pl.scan_parquet("data/silver/*.parquet")  # lazy read
    .filter(pl.col("amount") > 0)
    .group_by("user_id")
    .agg([
        pl.col("amount").sum().alias("total_spend"),
        pl.col("event_id").count().alias("num_events"),
        pl.col("timestamp").max().alias("last_seen"),
    ])
    .sort("total_spend", descending=True)
)
# .collect() executes the optimised query plan
df = result.collect()  # uses all CPU cores automatically
File Formats & Serialization
The file format you choose has a larger impact on ML pipeline performance than most engineers expect. For a 100M-row dataset, the difference between CSV and Parquet can be 10–50x in read speed and 5–10x in storage cost.
| Format | Layout | Compression | Best For | Avoid When |
|---|---|---|---|---|
| CSV | Row (text) | None | Small datasets, human inspection, data interchange | Large ML datasets, analytical queries |
| Parquet | Columnar (binary) | Snappy / ZSTD | ML feature tables, analytics on selected columns | Append-heavy workloads, tiny files |
| Apache Arrow (IPC) | Columnar (in-memory) | LZ4 | Zero-copy data sharing across Pandas, Spark, DuckDB | On-disk long-term storage |
| JSON Lines (JSONL) | Row (text) | None / GZIP | LLM training datasets, log events, streaming | Analytical queries, large numerical arrays |
| HDF5 / NPY | Array (binary) | Optional | Numerical tensors, numpy arrays, scientific data | Heterogeneous schemas, non-numerical data |
| TFRecord | Sequential binary | GZIP | TensorFlow training datasets with tf.data | Non-TensorFlow frameworks |
| Avro | Row (binary) | Deflate / Snappy | Kafka message schemas, schema evolution | Column-heavy analytics workloads |
Parquet stores data column-by-column rather than row-by-row. When your training script reads only user_age, purchase_amount, and label from a 200-column table, Parquet reads only 3 column chunks off disk. CSV reads all 200 columns and discards 197. That difference compounds massively at scale.
import os
import time
import pandas as pd
# Same 10M-row, 50-column dataset stored in both formats
cols_needed = ["age", "purchase_amount", "label"]
t = time.perf_counter()
df_csv = pd.read_csv("data.csv", usecols=cols_needed)  # still parses every line of text
print(f"CSV: {time.perf_counter()-t:.1f}s | disk: {os.path.getsize('data.csv')/1e9:.1f} GB")
t = time.perf_counter()
df_pq = pd.read_parquet("data.parquet", columns=cols_needed)  # reads only 3 column chunks
print(f"Parquet: {time.perf_counter()-t:.1f}s | disk: {os.path.getsize('data.parquet')/1e9:.1f} GB")
# Illustrative results (vary with hardware, codec, and data):
# CSV: 18.4s | disk: 4.7 GB
# Parquet: 0.9s | disk: 0.4 GB (~12x smaller, ~20x faster)
Large language model training datasets are commonly stored as JSONL: one JSON object per line. This allows streaming reads (no need to load the entire file), easy sharding across GPU workers, and simple append-only updates. The Hugging Face `datasets` library loads JSONL natively via `load_dataset("json", data_files="train.jsonl")`.
{"instruction": "Explain gradient descent in simple terms", "response": "Gradient descent is an optimisation algorithm..."}
{"instruction": "What is a transformer architecture?", "response": "A transformer uses self-attention to process sequences..."}
{"instruction": "Write a Python function to sort a list", "response": "def sort_list(lst): return sorted(lst)"}
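Because every line is an independent JSON object, a training job can stream records one at a time and split them across GPU workers without loading the whole file. A minimal stdlib sketch, assuming simple round-robin sharding by line number (the `iter_shard` helper and the file path in the usage comment are hypothetical, not part of any library):

```python
import json
from typing import Iterable, Iterator


def iter_shard(lines: Iterable[str], rank: int, world_size: int) -> Iterator[dict]:
    """Lazily parse JSONL lines, keeping only this worker's round-robin shard.

    `lines` can be an open file handle, so the file is streamed line by line
    and never fully loaded into memory.
    """
    for line_no, line in enumerate(lines):
        if line_no % world_size != rank:
            continue  # this line belongs to another worker's shard
        line = line.strip()
        if line:  # tolerate blank lines
            yield json.loads(line)


# Usage on a real file (path is illustrative):
# with open("data/train.jsonl", encoding="utf-8") as f:
#     for record in iter_shard(f, rank=0, world_size=8):
#         ...
```

Round-robin by line index keeps shards balanced without any index file; it assumes every worker reads the same file in the same order.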
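Apache Arrow defines a standardised in-memory columnar format. PyArrow and Polars use Arrow as their memory format, DuckDB can query Arrow data in place, and Spark uses Arrow to exchange data with pandas, so tables can often move between these tools with little or no serialisation or copying. pandas 2.x can optionally use Arrow-backed dtypes; with its default NumPy-backed dtypes, converting to and from Arrow usually copies data. For ML pipelines that chain Python tools (for example DuckDB → Polars → pandas), Arrow is the invisible glue that keeps data movement fast.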
Storage: Lakes, Lakehouses & Warehouses
Where you store data shapes how fast your ML pipeline runs, how much it costs, and whether you can reproduce experiments from six months ago. Three patterns dominate modern AI stacks.
A data lake stores raw files in cheap object storage (Amazon S3, Google Cloud Storage, Azure Blob Storage) in any format: structured, semi-structured, or unstructured. It is extremely cheap (~$0.02/GB/month on S3 Standard) and enforces no schema, which makes it a good home for raw Bronze data and for unstructured AI training content (text, images, audio). The risk is the data swamp: without governance, nobody knows what data exists or whether it is reliable.
A data warehouse (Snowflake, BigQuery, Redshift) enforces schema, supports SQL, and provides ACID transactions. It is fast for analytical queries on aggregated Gold-layer data but usually too expensive to use as primary storage for raw training data. It is best suited to model monitoring dashboards, A/B test results, and business metrics.
The lakehouse combines lake economics (cheap object storage) with warehouse reliability (ACID transactions, schema enforcement, time travel). It is enabled by open table formats that sit on top of Parquet files in object storage:
- Delta Lake
- Apache Iceberg
- Apache Hudi
Time travel: a query such as `SELECT * FROM training_data TIMESTAMP AS OF '2026-01-15'` lets you recreate the exact dataset your model trained on months ago, even if the table has since changed. Without time travel or explicit snapshots, reproducing past experiments becomes very difficult.
Storage best practices for ML workloads:
- Partition by date and entity: `s3://bucket/events/year=2026/month=06/day=30/`. Hive-compatible partitioning lets Spark prune scans to only the relevant partitions, which can cut scan costs by 10–100x on large tables.
- Target file size 128MB–1GB: Too many small files (under 1MB each) kill Spark performance through task-scheduling overhead. Use `coalesce()` or `repartition(n)` before writing.
- Lifecycle policies: Auto-archive Bronze data older than 90 days to Glacier (~$0.004/GB/month). Keep the Gold layer in Standard storage for fast access.
- Separate buckets by tier: Prevent Gold-layer data from being accidentally overwritten by Bronze ingestion jobs. Use separate IAM permissions per bucket.
- Enable versioning on Gold: S3 bucket versioning lets you restore accidentally deleted or overwritten model training datasets.
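The Hive-style layout in the partitioning bullet above is what makes pruning possible: an engine can decide from the path alone whether a directory can contain matching rows. A simplified stdlib sketch of that idea, assuming the `year=/month=/day=` layout shown above (the helper names are hypothetical; Spark and similar engines do this internally):

```python
from datetime import date


def partition_path(bucket: str, table: str, d: date) -> str:
    """Build a Hive-style partition prefix, e.g. s3://bucket/events/year=2026/month=06/day=30/."""
    return f"s3://{bucket}/{table}/year={d.year}/month={d.month:02d}/day={d.day:02d}/"


def parse_partition(path: str) -> date:
    """Recover the partition date from the key=value segments of a path."""
    parts = dict(seg.split("=", 1) for seg in path.strip("/").split("/") if "=" in seg)
    return date(int(parts["year"]), int(parts["month"]), int(parts["day"]))


def prune(paths: list[str], start: date, end: date) -> list[str]:
    """Keep only partitions dated within [start, end]; the others are never opened."""
    return [p for p in paths if start <= parse_partition(p) <= end]
```

Engines apply this pruning automatically when a query filters on partition columns; the sketch only shows why the path layout matters.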
ETL vs ELT for ML Workloads
ETL and ELT both move data from sources to a destination. The difference is where and when transformation happens, and that determines your cost model, flexibility, and ability to reprocess historical data.
ETL (Extract, Transform, Load):
- Transform before loading into the destination
- Used with legacy data warehouses (limited compute)
- Transformation done in custom Python or Spark code
- Harder to reprocess — logic lives outside the destination
- Better for sensitive data (PII masked before loading)
ELT (Extract, Load, Transform):
- Load raw data, then transform inside the destination
- Used with modern cloud warehouses (BigQuery, Snowflake)
- Transformation done in SQL/dbt inside the warehouse
- Easy to re-run transformations on historical raw data
- Often more cost-effective: cloud warehouses scale compute elastically
dbt (data build tool) is the de facto standard for the Transform step in ELT pipelines. It lets you write SQL transformations as version-controlled models with built-in testing, documentation generation, and data lineage graphs, and it is widely used by ML teams running on BigQuery or Snowflake.
-- models/silver/user_features.sql
{{ config(materialized='table', schema='silver') }}
WITH raw AS (
    SELECT * FROM {{ source('bronze', 'user_events') }}
    WHERE event_date >= '2026-01-01'
),
cleaned AS (
    SELECT
        user_id,
        DATE(event_timestamp) AS event_date,
        COUNT(*) AS daily_events,
        SUM(purchase_amount) AS daily_spend,
        AVG(session_duration_seconds) AS avg_session_sec
    FROM raw
    WHERE user_id IS NOT NULL
      AND purchase_amount >= 0
    GROUP BY 1, 2
)
SELECT * FROM cleaned
-- Run: dbt run --select silver.user_features
-- Test: dbt test --select silver.user_features
A data contract is a formal schema agreement between a data producer (upstream team) and consumer (ML pipeline). When a data contract is broken — an upstream team renames a column, changes a data type, or drops a field — it can silently corrupt model inputs and produce wrong predictions for hours before anyone notices. Contracts are enforced as schema checks in CI/CD using tools like Soda Core, Great Expectations, or custom Pydantic validators.
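At its core, a contract check compares each incoming record against the agreed column names and types before the data reaches the ML pipeline. A minimal stdlib sketch, assuming the contract is a plain dict of column name to Python type (the contract contents and function name are hypothetical; this is not the Soda Core, Great Expectations, or Pydantic API):

```python
from typing import Any

# Hypothetical contract agreed with the upstream team: column -> expected Python type
USER_EVENTS_CONTRACT: dict[str, type] = {
    "user_id": str,
    "event_ts": str,
    "amount": float,
}


def contract_violations(record: dict[str, Any], contract: dict[str, type]) -> list[str]:
    """Return human-readable violations; an empty list means the record conforms."""
    problems = []
    for column, expected in contract.items():
        if column not in record:
            problems.append(f"missing column: {column}")
        elif not isinstance(record[column], expected):
            actual = type(record[column]).__name__
            problems.append(f"{column}: expected {expected.__name__}, got {actual}")
    # Extra columns often signal an upstream rename (e.g. user_id -> userId)
    for column in record.keys() - contract.keys():
        problems.append(f"unexpected column: {column}")
    return problems
```

In CI/CD, running this over a sample of the producer's output and failing the build on any non-empty result catches renamed columns and type changes before they reach training.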
- Point-in-time joins: When building training data, join features using only information available at or before the event timestamp. Violating this causes label leakage — the most dangerous form of data contamination in ML.
- Snapshot tables: Store daily snapshots of slowly-changing dimensions (user attributes, product catalogues) so you can reconstruct historical states for training.
- Data versioning: Use Delta Lake or Iceberg time travel to tag each training run with the exact data version used. Essential for audit trails and experiment reproducibility.
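The point-in-time rule in the first bullet boils down to an "as-of" lookup: for each labelled event, take the most recent feature value whose timestamp is at or before the event. A stdlib sketch using `bisect`, assuming features arrive as (entity, timestamp, value) rows with comparable timestamps (the helper names are hypothetical; pandas offers `merge_asof` for the same pattern on DataFrames):

```python
from bisect import bisect_right
from collections import defaultdict


def build_feature_index(feature_rows):
    """Group (entity, ts, value) rows into per-entity lists sorted by timestamp."""
    index = defaultdict(list)
    for entity, ts, value in feature_rows:
        index[entity].append((ts, value))
    for rows in index.values():
        rows.sort()
    return index


def point_in_time_lookup(index, entity, event_ts, default=None):
    """Latest feature value with ts <= event_ts, so no future information leaks in."""
    rows = index.get(entity, [])
    timestamps = [ts for ts, _ in rows]  # fine for a sketch; cache this for large data
    pos = bisect_right(timestamps, event_ts)  # rows[:pos] all have ts <= event_ts
    return rows[pos - 1][1] if pos else default
```

A naive join on entity alone would happily attach a feature computed after the label event, which is exactly the leakage the bullet warns about.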
Feature Engineering & Feature Stores
A feature is a measurable property of data used as input to a model. Raw data rarely goes directly into models — it must be transformed into numerical representations that capture the signal the model needs. Feature engineering is often the highest-leverage activity in applied ML, and bad features are impossible for even the best model to overcome.
| Technique | When to Use | Example |
|---|---|---|
| Normalisation / Scaling | Numerical features with different ranges | StandardScaler on age; MinMax on price [0,1] |
| One-Hot Encoding | Low-cardinality categoricals (<50 values) | country → [is_US, is_UK, is_IN, ...] |
| Target Encoding | High-cardinality categoricals | Replace city with average conversion rate per city |
| Binning / Bucketisation | Continuous to ordinal conversion | age → [18-25, 26-35, 36-50, 51+] |
| Window Aggregations | Time-series and user behaviour | spend_last_7d, clicks_last_30d, avg_order_last_3 |
| Embeddings | Text, images, high-cardinality IDs | product_id → 128-dim dense vector |
| Log Transform | Right-skewed distributions (price, revenue) | log1p(purchase_amount) normalises heavy tail |
| Interaction Features | Capturing combined signal | age * income, clicks / impressions |
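import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
numerical = ["age", "account_age_days", "spend_last_30d", "num_sessions"]
categorical = ["country", "device_type", "plan_tier"]
# Hypothetical Gold-layer feature table that contains the columns above
df = pd.read_parquet("data/gold/user_features.parquet")
X_train, X_test = train_test_split(df[numerical + categorical], test_size=0.2, random_state=42)
preprocessor = ColumnTransformer([
    ("num", StandardScaler(), numerical),
    # sparse_output requires scikit-learn >= 1.2 (older versions use sparse=False)
    ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), categorical),
])
# CRITICAL: fit on training data only, then transform both sets
# This prevents data leakage from test set statistics entering training
X_train_enc = preprocessor.fit_transform(X_train)
X_test_enc = preprocessor.transform(X_test)  # reuses train mean/std and category vocabulary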
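The Target Encoding row in the table above carries the same leakage risk as the scaler: category means must be learned from training rows only. A minimal stdlib sketch with additive smoothing toward the global mean; the function names and the smoothing weight `m` are assumptions for illustration, not values from this guide:

```python
from collections import defaultdict


def fit_target_encoder(categories, targets, m=10.0):
    """Learn smoothed per-category target means from TRAINING data only.

    encoding = (n * category_mean + m * global_mean) / (n + m)
    Rare categories are pulled toward the global mean; frequent ones keep their own mean.
    """
    global_mean = sum(targets) / len(targets)
    sums, counts = defaultdict(float), defaultdict(int)
    for cat, y in zip(categories, targets):
        sums[cat] += y
        counts[cat] += 1
    mapping = {
        cat: (sums[cat] + m * global_mean) / (counts[cat] + m)
        for cat in counts
    }
    return mapping, global_mean


def transform_target_encoder(categories, mapping, global_mean):
    """Apply the learned mapping; unseen categories fall back to the global mean."""
    return [mapping.get(cat, global_mean) for cat in categories]
```

Many teams go further and compute the training-set encodings out-of-fold, so a row's own label never contributes to its own encoded value.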
Training-serving skew occurs when feature computation at training time differs from feature computation at inference time. It is extremely common and nearly impossible to detect without deliberate monitoring. Classic examples:
- Training computes "purchases in last 30 days" from a historical Parquet snapshot. Production computes it from a real-time Redis aggregation with a slightly different time-window boundary (e.g., UTC vs local timezone).
- Training imputes missing ages with the mean of the training set. Inference uses a hardcoded constant that differs.
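- Training uses log1p(amount). The production endpoint applies log(amount + 1): mathematically identical, but log(amount + 1) loses floating-point precision for very small amounts, so the two code paths do not always produce identical values.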
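One deliberate way to catch skew is a parity test: run the same raw inputs through the training-time and serving-time feature code and compare the outputs. A sketch with two hypothetical implementations that reproduce the imputation and log-transform examples above (the function names, the training mean 37.2, and the hardcoded 30.0 are all illustrative):

```python
import math

TRAIN_MEAN_AGE = 37.2  # hypothetical: mean age computed on the training set


def train_features(raw: dict) -> dict:
    """Feature code used by the offline training pipeline."""
    age = raw.get("age")
    return {
        "age": TRAIN_MEAN_AGE if age is None else float(age),
        "log_amount": math.log1p(raw["amount"]),
    }


def serve_features(raw: dict) -> dict:
    """Feature code re-implemented in the serving endpoint (with a subtle bug)."""
    age = raw.get("age")
    return {
        "age": 30.0 if age is None else float(age),  # hardcoded constant differs
        "log_amount": math.log(raw["amount"] + 1),
    }


def parity_report(samples, rel_tol=1e-9):
    """Return (sample_index, feature, train_value, serve_value) for every mismatch."""
    mismatches = []
    for i, raw in enumerate(samples):
        offline, online = train_features(raw), serve_features(raw)
        for name, value in offline.items():
            # abs_tol absorbs harmless floating-point noise near zero
            if not math.isclose(value, online[name], rel_tol=rel_tol, abs_tol=1e-12):
                mismatches.append((i, name, value, online[name]))
    return mismatches
```

Running such a check in CI against a fixed sample of raw records turns skew from a silent production issue into a failing test.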
A feature store is a centralised system that computes, stores, and serves features identically for both training and inference. It eliminates training-serving skew by ensuring the same code path runs in both contexts.
Key concept: the offline store (Parquet, BigQuery) provides point-in-time-correct features for training, preventing label leakage. The online store (Redis, DynamoDB) serves those same features at millisecond latency for real-time inference. Feature definitions are written once, executed in both contexts.
Data Quality & Validation
Data quality issues often cause more production ML failures than model bugs. A model trained on corrupted or drifted data produces wrong predictions silently, often for days, before anyone notices. Automated data validation is your first line of defence and should run at every stage of the pipeline.
Great Expectations is one of the most widely used open-source data validation frameworks. You define "expectations" (assertions about your data) and run them in your pipeline or CI/CD. A failed expectation blocks the pipeline before corrupted data reaches your model.
import great_expectations as gx
# Pre-1.0 GX "fluent" API: read_parquet() returns a Validator; GX 1.x uses a different API
context = gx.get_context()
validator = context.sources.pandas_default.read_parquet("data/silver/users.parquet")
# Define expectations (assertions about data)
validator.expect_column_to_exist("user_id")
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_unique("user_id")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
validator.expect_column_mean_to_be_between("spend_last_30d", min_value=5, max_value=500)
validator.expect_column_values_to_be_in_set("country", ["US", "UK", "IN", "DE", "AU"])
validator.expect_column_proportion_of_unique_values_to_be_between("plan_tier", min_value=0, max_value=0.1)
# Run validation - fail the pipeline if any expectation is not met
result = validator.validate()
if not result.success:
    raise ValueError(f"Data quality FAILED:\n{result.to_json_dict()}")
print(f"All {result.statistics['evaluated_expectations']} checks passed.")
Pandera provides DataFrame schema validation with a Python-native API. It integrates directly into pipeline functions as decorators, checking inputs and outputs automatically.
import pandera as pa
from pandera import check_types
from pandera.typing import DataFrame, Series
class UserFeatureSchema(pa.DataFrameModel):
    user_id: Series[str] = pa.Field(nullable=False, unique=True)
    age: Series[int] = pa.Field(ge=0, le=120)
    spend: Series[float] = pa.Field(ge=0.0)
    country: Series[str] = pa.Field(isin=["US", "UK", "IN", "DE", "AU"])
    class Config:
        coerce = True  # auto-cast types if possible
@check_types
def build_features(raw: DataFrame) -> DataFrame[UserFeatureSchema]:
    # If the output doesn't match UserFeatureSchema, pandera raises a SchemaError at runtime
    return raw.rename(columns={"total_spend": "spend"}).dropna()
Data drift occurs when the statistical distribution of production data shifts from what the model was trained on. This is one of the leading causes of silent model degradation in production. Detect it with:
- Population Stability Index (PSI): Widely used for categorical and binned numerical features. A common rule of thumb treats PSI above 0.2 as a significant distribution shift that warrants investigation and often retraining.
- Kolmogorov-Smirnov (KS) test: Detects distribution shift in continuous features. A low p-value (typically <0.05) flags drift, although with very large samples even tiny, harmless shifts become statistically significant.
- Jensen-Shannon divergence: A symmetric, bounded measure of how different two distributions are. Works for both continuous (binned) and discrete features.
- Evidently AI: Open-source library that generates data and model drift reports with a single call. Integrates with MLflow, Grafana, and Prometheus for production monitoring.
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
# Evidently 0.4.x-style API; later releases reorganised these imports
# training_df and production_df: pandas DataFrames with the same columns (assumed)
report = Report(metrics=[
    DataDriftPreset(),    # checks all features for distribution shift
    DataQualityPreset(),  # checks nulls, ranges, duplicates
])
report.run(
    reference_data=training_df,  # what the model was trained on
    current_data=production_df,  # what's arriving in production now
)
# Check programmatically: the first DataDriftPreset metric is the dataset-level drift summary
result = report.as_dict()
drift_detected = result["metrics"][0]["result"]["dataset_drift"]
drifted_features = result["metrics"][0]["result"]["number_of_drifted_columns"]
if drift_detected:
    print(f"WARNING: {drifted_features} features drifted - trigger retraining")
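PSI is simple enough to compute directly, which is handy when you want the 0.2 rule of thumb from the list above as a pipeline check without a monitoring library. A stdlib sketch, assuming the feature is categorical or has already been bucketed into the same bins for both datasets; the `epsilon` floor for empty bins is an implementation choice, not part of a standard:

```python
import math
from collections import Counter


def psi(reference, current, epsilon=1e-6):
    """Population Stability Index between two samples of categorical/binned values.

    PSI = sum over bins of (cur_pct - ref_pct) * ln(cur_pct / ref_pct)
    """
    ref_counts, cur_counts = Counter(reference), Counter(current)
    bins = set(ref_counts) | set(cur_counts)
    total = 0.0
    for b in bins:
        # Floor empty bins at epsilon so the log stays finite
        ref_pct = max(ref_counts[b] / len(reference), epsilon)
        cur_pct = max(cur_counts[b] / len(current), epsilon)
        total += (cur_pct - ref_pct) * math.log(cur_pct / ref_pct)
    return total
```

With a reference mix of 50% US, 30% UK, 20% IN, moving 30 points of traffic from US to IN gives a PSI of about 0.55, well above the 0.2 threshold.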
Orchestration: Airflow, Prefect & Dagster
An ML pipeline has many interdependent steps: ingest data → validate → transform → build features → train model → evaluate → deploy. These steps have dependencies, can fail, need retries, and must run on a schedule. A data orchestrator manages all of this with dependency awareness, retries, monitoring, and alerting.
| Tool | Paradigm | Best For | Learning Curve |
|---|---|---|---|
| Apache Airflow | DAG-based (Python decorated tasks) | Large teams, complex dependencies, existing Airflow investment | High (separate scheduler, worker, webserver components) |
| Prefect | Python-native flows and tasks | Teams wanting simpler Python API with cloud deployment option | Medium (very Pythonic, fast to get started) |
| Dagster | Software-defined assets | ML pipelines needing strong lineage and data-aware scheduling | Medium (asset paradigm is different but powerful for ML) |
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
def ingest_data(**ctx): ...     # pull from data sources
def validate_data(**ctx): ...   # run Great Expectations checks
def build_features(**ctx): ...  # Silver -> Gold transformation
def train_model(**ctx): ...     # fit model on Gold features
def evaluate_model(**ctx): ...  # compute metrics, push to MLflow
def alert_slack(context):
    # Hypothetical failure callback: Airflow passes the task context dict;
    # send context["task_instance"].task_id to Slack with your webhook client
    ...
with DAG(
    dag_id="ml_training_pipeline",
    schedule="0 2 * * *",  # run daily at 2am (UTC under Airflow's default timezone)
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        "on_failure_callback": alert_slack,
    },
) as dag:
    t1 = PythonOperator(task_id="ingest", python_callable=ingest_data)
    t2 = PythonOperator(task_id="validate", python_callable=validate_data)
    t3 = PythonOperator(task_id="features", python_callable=build_features)
    t4 = PythonOperator(task_id="train", python_callable=train_model)
    t5 = PythonOperator(task_id="evaluate", python_callable=evaluate_model)
    t1 >> t2 >> t3 >> t4 >> t5  # sequential dependency chain
Dagster's asset-based model aligns better with ML workflows than task-based DAGs. You define what you produce (datasets, models, metrics) rather than what tasks run. This gives you automatic data lineage, data-aware scheduling (only re-run when upstream assets change), and built-in observability.
import mlflow.xgboost
import pandas as pd
from dagster import AssetSelection, Definitions, ScheduleDefinition, asset, define_asset_job
# clean_and_validate, train_xgboost, evaluate and get_test_split are hypothetical
# project helpers, not Dagster or MLflow APIs
@asset
def bronze_events():
    """Raw event data ingested from S3."""
    return pd.read_json("s3://bucket/events/today.jsonl", lines=True)  # requires s3fs
@asset
def silver_features(bronze_events):
    """Cleaned and validated feature table derived from bronze events."""
    return clean_and_validate(bronze_events)
@asset
def trained_model(silver_features):
    """XGBoost model trained on silver features. Logged to MLflow."""
    model = train_xgboost(silver_features)
    mlflow.xgboost.log_model(model, "model")
    return model
@asset
def model_metrics(trained_model, silver_features):
    """Evaluation metrics for the latest trained model."""
    return evaluate(trained_model, get_test_split(silver_features))
# Lineage is inferred from parameter names:
# model_metrics <- trained_model <- silver_features <- bronze_events
daily_job = define_asset_job("daily_training", selection=AssetSelection.all())
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule="0 2 * * *")
defs = Definitions(
    assets=[bronze_events, silver_features, trained_model, model_metrics],
    jobs=[daily_job],
    schedules=[daily_schedule],
)
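Stripped of scheduling, UIs, and distributed workers, the core job of all three tools is "run tasks in dependency order and retry failures". A toy stdlib sketch of that idea using `graphlib.TopologicalSorter` (Python 3.9+); the task names mirror the Airflow DAG above, but `run_pipeline` is hypothetical and not how Airflow, Prefect, or Dagster are implemented:

```python
import time
from graphlib import TopologicalSorter


def run_pipeline(tasks, dependencies, retries=2, retry_delay_s=0.0):
    """Run callables in dependency order, retrying each failed task up to `retries` times.

    tasks: name -> zero-argument callable
    dependencies: name -> set of upstream task names
    Returns the execution order; re-raises if a task still fails after its retries.
    """
    order = []
    for name in TopologicalSorter(dependencies).static_order():
        for attempt in range(retries + 1):
            try:
                tasks[name]()
                order.append(name)
                break
            except Exception:
                if attempt == retries:
                    raise  # out of retries: stop the whole pipeline
                time.sleep(retry_delay_s)
    return order
```

Real orchestrators add what this toy omits: cron scheduling, parallel execution of independent branches, persisted run state, backoff, and alerting.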
Streaming Data & Real-Time AI
Real-time AI features (fraud scores updated with every transaction, personalised recommendations updated after every click) require streaming data pipelines. Batch pipelines typically deliver data with hours of delay; streaming pipelines can deliver it within milliseconds to seconds.
Kafka is the de facto standard message broker for real-time ML pipelines. It decouples producers (systems generating events) from consumers (ML services processing them) via a distributed, fault-tolerant log that is ordered within each partition. Key concepts:
- Topic: A named stream of events (similar to a database table, but for streams), e.g. `user-clicks`, `payment-events`, `sensor-readings`.
- Partition: Topics are split into partitions for parallelism and scalability. A topic with 12 partitions can be processed by up to 12 consumer instances in parallel.
- Offset: Each message has a monotonically increasing offset within its partition. Consumers track their position, which enables replay from any retained offset.
- Consumer Group: A group of consumers sharing a topic's partitions. Kafka assigns each partition to exactly one member of the group at a time, enabling parallel processing without two members reading the same partition.
- Retention: Kafka retains messages on disk (7 days by default, configurable up to unlimited). This enables the Kappa architecture: replay all history to rebuild derived datasets.
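Key-based partitioning and consumer-group assignment can be illustrated in a few lines. This is a simplified model under stated assumptions: Kafka's default partitioner hashes keys with murmur2 and its assignors are more sophisticated, so `zlib.crc32` and plain round-robin stand in for them here; both helper names are hypothetical.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Same key -> same partition, which is what preserves per-key ordering.

    Stand-in hash: Kafka's default partitioner uses murmur2, not crc32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Round-robin partitions over a consumer group: each partition gets exactly one owner."""
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment
```

With 12 partitions and 3 consumers, each instance owns four partitions. If a group has more consumers than partitions, the extras sit idle, which is why the partition count caps a group's parallelism.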
import json
from confluent_kafka import Consumer, Producer
# PRODUCER: application sends events to Kafka
producer = Producer({"bootstrap.servers": "kafka:9092"})
producer.produce(
    topic="user-events",
    key="user_123",  # same key -> same partition -> per-user ordering
    value=json.dumps({
        "event": "purchase",
        "amount": 49.99,
        "item_id": "prod_456",
        "ts": "2026-06-30T10:00:00Z",
    }),
)
producer.flush()
# CONSUMER: real-time feature computation service
consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "realtime-feature-pipeline",
    "auto.offset.reset": "earliest",
    # Manual commit after processing gives at-least-once delivery;
    # exactly-once needs Kafka transactions or idempotent downstream writes
    "enable.auto.commit": False,
})
consumer.subscribe(["user-events"])
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            handle_error(msg.error())  # hypothetical error handler
            continue
        event = json.loads(msg.value().decode("utf-8"))
        user_id = msg.key().decode("utf-8")  # the user ID travels as the message key
        # Compute real-time feature and update online feature store (Redis)
        update_spend_aggregate(user_id, event["amount"])  # hypothetical Redis update
        consumer.commit(message=msg, asynchronous=False)  # commit only after successful processing
finally:
    consumer.close()
Lambda architecture:
- Two separate pipelines: batch (accurate) + streaming (fast)
- Merge results at query time in serving layer
- Complex: same transformation logic written twice
- Pre-2018 standard approach
Kappa architecture:
- Single streaming pipeline handles everything
- Reprocessing via Kafka log replay from offset 0
- One codebase, simpler operations
- Kafka's long retention makes historical replay feasible
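Kappa-style reprocessing works because derived state is a deterministic fold over the ordered events. A toy sketch with an in-memory list standing in for one Kafka partition (the event shape and both functions are hypothetical): rebuilding from offset 0 reproduces exactly the state the live consumer built incrementally.

```python
from collections import defaultdict


def apply_event(state, event):
    """Fold one purchase event into per-user running spend."""
    state[event["user_id"]] += event["amount"]


def replay(events, from_offset=0):
    """Rebuild derived state by re-reading the log from `from_offset` (Kappa reprocessing)."""
    state = defaultdict(float)
    for event in events[from_offset:]:
        apply_event(state, event)
    return dict(state)


# In-memory stand-in for one Kafka partition: list index == offset
log = []
live_state = defaultdict(float)
for event in [
    {"user_id": "u1", "amount": 10.0},
    {"user_id": "u2", "amount": 5.0},
    {"user_id": "u1", "amount": 2.5},
]:
    log.append(event)               # producer appends to the log
    apply_event(live_state, event)  # streaming consumer updates state incrementally
```

If the feature logic changes, calling `replay` with the updated `apply_event` rebuilds the derived table from history, which is the Kappa alternative to maintaining a separate batch pipeline.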
The most common real-time AI architecture: Kafka → Flink/consumer → Redis (online feature store) → ML model serving API. The batch pipeline (Airflow/Spark) keeps the offline feature store updated. The streaming pipeline keeps Redis up-to-date with sub-second latency. The model reads from Redis at inference time.
Vector Databases & Embeddings Storage
Vector databases are purpose-built storage and retrieval systems for high-dimensional embedding vectors: the numerical representations produced by LLMs, image encoders, and recommendation models. They are the storage backbone of most RAG (Retrieval-Augmented Generation) systems and modern semantic search applications.
Finding the 5 most similar embeddings to a query vector exactly requires computing cosine similarity or Euclidean distance against every stored vector. For 10M vectors at 1536 dimensions (the default size of OpenAI's text-embedding-3-small), brute-force exact search can take seconds per query on CPU, while production systems often need results in under 10 milliseconds. Traditional B-tree indexes are built for equality and range queries, not approximate nearest-neighbour (ANN) search, so vector databases use specialised ANN indexes instead.
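The brute-force baseline that ANN indexes avoid is easy to write, which also makes it useful as ground truth when measuring an index's recall. A pure-Python sketch of exact cosine top-k (helper names are hypothetical; real systems would use NumPy/BLAS or a GPU): every query touches every stored vector, so cost grows as O(N × d) per query.

```python
import heapq
import math


def normalize(v):
    """Scale a (non-zero) vector to unit length so cosine similarity becomes a dot product."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def exact_top_k(query, vectors, k=5):
    """Exact cosine top-k: scores every stored vector (the cost ANN indexes avoid)."""
    q = normalize(query)
    scored = (
        (sum(a * b for a, b in zip(q, normalize(v))), i)
        for i, v in enumerate(vectors)
    )
    return heapq.nlargest(k, scored)  # [(similarity, index), ...] best first
```

Exact search like this remains a reasonable choice for small collections, and comparing its results with an ANN index's results is how recall is usually measured.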
HNSW (Hierarchical Navigable Small World):
- Graph-based multi-layer index structure
- Fastest query speed (1–5ms for millions of vectors)
- High memory usage: index must fit in RAM
- Best for: production retrieval where speed is critical
- Used by: Qdrant, Weaviate, Milvus, pgvector
IVF-PQ (Inverted File Index with Product Quantisation):
- Divides vector space into clusters; quantises vectors
- Much lower memory: quantised vectors fit on disk
- Slower than HNSW but scales to billions of vectors
- Best for: cost-sensitive billion-scale deployments
- Used by: FAISS, Pinecone (internal compression)
| Database | Type | Best For | Metadata Filtering | Scale |
|---|---|---|---|---|
| Pinecone | Managed SaaS | Fastest time-to-production, zero ops | ✅ Strong | 100M+ vectors |
| Qdrant | OSS / Cloud | High performance + rich payload filtering, self-host | ✅ Best-in-class | 100M+ vectors |
| Weaviate | OSS / Cloud | GraphQL API, multi-tenancy, hybrid search built-in | ✅ Good | 100M+ vectors |
| Chroma | OSS (local) | Prototyping, local RAG development, no infra needed | ✅ Basic | Millions |
| pgvector | PostgreSQL extension | Existing Postgres stack, avoids new infra | ✅ Full SQL | Under 5M vectors |
| Milvus | OSS distributed | Billion-scale, GPU-accelerated search, Kubernetes | ✅ Good | Billions |
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from sentence_transformers import SentenceTransformer
client = QdrantClient(url="http://localhost:6333")
encoder = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dim embeddings
# Create collection (Qdrant builds an HNSW index by default)
client.create_collection("knowledge_base", vectors_config=VectorParams(
    size=384, distance=Distance.COSINE
))
# Index documents (batch upsert for efficiency)
docs = ["LLMs use attention mechanisms", "RAG combines retrieval with generation"]  # ... rest of the corpus
tags = ["llm", "rag"]  # one category per document, aligned with docs
embeddings = encoder.encode(docs, batch_size=64, show_progress_bar=True)
client.upsert("knowledge_base", points=[
    PointStruct(
        id=i,
        vector=emb.tolist(),
        payload={"text": doc, "category": tag, "source": "internal-docs"}
    )
    for i, (doc, emb, tag) in enumerate(zip(docs, embeddings, tags))
])
# Query: semantic search with metadata filter
# (recent qdrant-client releases deprecate search() in favour of query_points())
query_embedding = encoder.encode(["how does retrieval augmented generation work"])[0]
results = client.search(
    collection_name="knowledge_base",
    query_vector=query_embedding.tolist(),
    limit=5,
    query_filter=Filter(must=[
        FieldCondition(key="category", match=MatchValue(value="rag"))
    ])
)
for r in results:
    print(f"Score: {r.score:.3f} | {r.payload['text'][:80]}")
Pure vector search misses exact keyword matches (a product SKU, a person's name, a technical term). Pure keyword search (BM25) misses semantic meaning. Hybrid search combines both, typically using Reciprocal Rank Fusion (RRF) to merge the ranked results. Weaviate and Qdrant support this natively, and in production RAG systems it often outperforms either approach alone.
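RRF needs only each retriever's rank positions, not comparable scores, which is why it is a popular way to merge BM25 and vector results. A stdlib sketch; the function name is hypothetical, and `k=60` is a widely used default that should be treated as a tunable assumption:

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Merge several ranked lists of doc IDs.

    score(doc) = sum over lists of 1 / (k + rank), with rank starting at 1.
    Documents ranked highly by several retrievers rise to the top.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


bm25_hits = ["sku-123", "doc-a", "doc-b"]    # keyword retriever finds the exact SKU
vector_hits = ["doc-a", "doc-c", "sku-123"]  # semantic retriever finds related docs
fused = reciprocal_rank_fusion([bm25_hits, vector_hits])
```

Here `doc-a` wins because both retrievers rank it near the top, and `sku-123` still places second thanks to its first-place keyword match.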
You've completed Layer 1 of the AI Skills Stack
Data engineering is the foundation everything else builds on.