Beyond the Hard Drive
The problem: You have 100TB of video data. A single disk holds 2TB. How do you store and access it efficiently?
Solution: Distributed File System (DFS) - Makes thousands of disks look like one giant drive.
What is a Distributed File System?
A DFS stores massive files across multiple machines, providing:
- Scalability: Add more machines = more storage
- Fault tolerance: Data replicated across machines
- High throughput: Parallel reads/writes
Examples
- HDFS (Hadoop Distributed File System) - Open source
- GFS (Google File System) - Google's internal system
- Ceph - Modern distributed storage
- Amazon S3 - Object storage (DFS-like)
Architecture
    graph TB
        Client[Client] -->|1. Request| NN[NameNode<br/>Master]
        NN -->|2. Metadata| Client
        Client -->|3. Read/Write| DN1[DataNode 1]
        Client -->|3. Read/Write| DN2[DataNode 2]
        Client -->|3. Read/Write| DN3[DataNode 3]
        DN1 -->|Heartbeat| NN
        DN2 -->|Heartbeat| NN
        DN3 -->|Heartbeat| NN
        DN1 -.->|Replication| DN2
        DN2 -.->|Replication| DN3
Components
1. NameNode (Master)
- Stores metadata (directory tree, file-to-block mapping)
- Coordinates operations
- Single point of failure (mitigated by standby)
2. DataNodes (Workers)
- Store actual data blocks
- Report to NameNode via heartbeats
- Handle read/write requests from clients
How It Works
1. File Chunking
Files are split into large blocks (typically 64 MB to 128 MB).
Example: video.mp4 (1 GB file), chunk size 128 MB. Result:
- Chunk 0: bytes 0-134,217,727
- Chunk 1: bytes 134,217,728-268,435,455
- Chunk 2: bytes 268,435,456-402,653,183
- ...
- Chunk 7: bytes 939,524,096-1,073,741,823
Total: 8 chunks
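The byte ranges in the example can be derived mechanically. The following is a minimal sketch, not HDFS code; the function name `chunk_ranges` and its defaults are assumptions made for illustration.

```python
def chunk_ranges(file_size, chunk_size=128 * 1024 * 1024):
    """Return (index, first_byte, last_byte) for each chunk; ranges are inclusive."""
    if file_size < 0 or chunk_size <= 0:
        raise ValueError("file_size must be >= 0 and chunk_size must be > 0")
    ranges = []
    for index, start in enumerate(range(0, file_size, chunk_size)):
        # The final chunk may be shorter than chunk_size.
        end = min(start + chunk_size, file_size) - 1
        ranges.append((index, start, end))
    return ranges


if __name__ == "__main__":
    chunks = chunk_ranges(1024 ** 3)  # 1 GiB file, 128 MiB chunks
    print(len(chunks))   # 8
    print(chunks[-1])    # (7, 939524096, 1073741823)
```

Note that the last byte of a 1 GiB file has offset 1,073,741,823, because offsets start at 0.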
Why large blocks?
- Reduces metadata overhead
- Optimizes sequential reads
- Minimizes seek time
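To make the metadata point concrete, here is a rough back-of-the-envelope sketch that counts how many block entries a master would have to track for 100 TiB of data at two block sizes. The `bytes_per_entry=150` value is an assumed rule-of-thumb figure for illustration only, not a measured number.

```python
def block_count(total_bytes, block_size):
    """Number of blocks needed to hold total_bytes (ceiling division)."""
    return -(-total_bytes // block_size)


def metadata_bytes(total_bytes, block_size, bytes_per_entry=150):
    """Rough master memory for block entries; bytes_per_entry is an assumed figure."""
    return block_count(total_bytes, block_size) * bytes_per_entry


if __name__ == "__main__":
    dataset = 100 * 1024 ** 4  # 100 TiB
    for label, size in [("4 KiB", 4 * 1024), ("128 MiB", 128 * 1024 ** 2)]:
        print(label, block_count(dataset, size), metadata_bytes(dataset, size))
```

With 128 MiB blocks the dataset needs 819,200 entries; with 4 KiB blocks it would need about 26.8 billion, which would not fit in one machine's memory under the assumed per-entry cost.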
2. Replication
Each chunk is replicated 3x (configurable) across different machines and racks.
Chunk 0 replicas:
- Primary: DataNode 1 (Rack A)
- Secondary: DataNode 5 (Rack A)
- Tertiary: DataNode 12 (Rack B)
Purpose:
- Survive disk failures
- Survive rack failures
- Load balancing for reads
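One way to express the "different machines and racks" rule is a placement function that always covers at least two racks when the cluster has them. This is a simplified sketch under that assumption, not HDFS's exact default placement policy; `place_replicas` and the cluster layout are hypothetical.

```python
import random


def place_replicas(nodes_by_rack, replication=3, rng=None):
    """Pick `replication` distinct (rack, node) pairs spanning two racks when possible."""
    rng = rng or random.Random()
    racks = [rack for rack, nodes in nodes_by_rack.items() if nodes]
    if sum(len(nodes_by_rack[rack]) for rack in racks) < replication:
        raise ValueError("not enough DataNodes for the requested replication")

    # First replica: any node.
    first_rack = rng.choice(racks)
    chosen = [(first_rack, rng.choice(nodes_by_rack[first_rack]))]

    # Second replica: a different rack, so one rack failure cannot lose the block.
    other_racks = [rack for rack in racks if rack != first_rack]
    if other_racks and replication > 1:
        second_rack = rng.choice(other_racks)
        chosen.append((second_rack, rng.choice(nodes_by_rack[second_rack])))

    # Remaining replicas: any nodes not used yet.
    remaining = [(rack, node) for rack in racks for node in nodes_by_rack[rack]
                 if (rack, node) not in chosen]
    rng.shuffle(remaining)
    chosen.extend(remaining[: replication - len(chosen)])
    return chosen


if __name__ == "__main__":
    cluster = {"/rackA": ["dn1", "dn5"], "/rackB": ["dn12", "dn13"]}
    print(place_replicas(cluster, rng=random.Random(0)))
```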
3. Metadata Management
NameNode stores in memory:
    # Simplified metadata structure
    metadata = {
        "/user/data/video.mp4": {
            "size": 1073741824,  # 1GB
            "blocks": [
                {
                    "block_id": "blk_1234",
                    "size": 134217728,
                    "replicas": [
                        {"datanode": "dn1.example.com:50010", "rack": "/rack1"},
                        {"datanode": "dn5.example.com:50010", "rack": "/rack1"},
                        {"datanode": "dn12.example.com:50010", "rack": "/rack2"}
                    ]
                },
                # ... 7 more blocks
            ],
            "replication": 3,
            "block_size": 134217728
        }
    }
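Given metadata shaped like the structure above, a client can work out which block holds a given byte offset without contacting any DataNode. A minimal sketch, assuming blocks are listed in file order and all but the last are exactly `block_size` bytes; `locate_block` is a hypothetical helper.

```python
def locate_block(file_meta, offset):
    """Map a file byte offset to (block_index, offset_within_block)."""
    if not 0 <= offset < file_meta["size"]:
        raise ValueError("offset is outside the file")
    block_size = file_meta["block_size"]
    return offset // block_size, offset % block_size


if __name__ == "__main__":
    video = {"size": 1073741824, "block_size": 134217728}
    print(locate_block(video, 0))            # (0, 0)
    print(locate_block(video, 134217728))    # (1, 0)
    print(locate_block(video, 1073741823))   # (7, 134217727)
```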
Read Flow
1. Client: "Read /data/video.mp4"
2. NameNode responds:
   - Block 0 → [DN1, DN5, DN12]
   - Block 1 → [DN2, DN6, DN13]
   - ...
3. Client connects directly to DataNodes:
   - Read Block 0 from DN1 (closest)
   - Read Block 1 from DN2
   - ...
   - Reassemble file
4. NameNode is NOT involved in data transfer (avoids a bottleneck)
Implementation:
    import random
    import socket

    import requests  # third-party: pip install requests


    class HDFSClient:
        """Illustrative client. The HTTP endpoints below are made up for this
        sketch; real HDFS clients talk to the NameNode over Hadoop RPC."""

        def __init__(self, namenode_url):
            self.namenode = namenode_url

        def read_file(self, path):
            # 1. Get block locations from NameNode
            response = requests.post(f"{self.namenode}/getBlockLocations",
                                     json={"path": path})
            response.raise_for_status()
            blocks = response.json()["blocks"]
            # 2. Read each block from DataNodes, in file order
            file_data = bytearray()
            for block in blocks:
                # Choose closest replica
                datanode = self.choose_datanode(block["replicas"])
                # Read block
                block_data = self.read_block(datanode, block["block_id"])
                file_data.extend(block_data)
            return bytes(file_data)

        def is_local(self, datanode):
            """True if the DataNode address ("host:port") is this machine."""
            return datanode.split(":")[0] == socket.gethostname()

        def choose_datanode(self, replicas):
            """Choose a local replica if there is one, otherwise a random one"""
            # Prefer local DataNode
            for replica in replicas:
                if self.is_local(replica["datanode"]):
                    return replica["datanode"]
            # Otherwise, choose randomly
            return random.choice(replicas)["datanode"]

        def read_block(self, datanode, block_id):
            """Read block from DataNode"""
            response = requests.get(
                f"http://{datanode}/readBlock",
                params={"blockId": block_id}
            )
            response.raise_for_status()
            return response.content
Write Flow
1. Client: "Write file.txt (300MB)"
2. Client asks NameNode: "Where should I write?"
3. NameNode responds:
   - Block 0 → Write to [DN1, DN2, DN3] (pipeline)
   - Block 1 → Write to [DN4, DN5, DN6]
   - Block 2 → Write to [DN7, DN8, DN9]
4. Client writes Block 0: Client → DN1 → DN2 → DN3 (replication pipeline)
5. After all blocks are written:
   - Client → NameNode: "Commit writes"
   - NameNode updates metadata
Replication Pipeline:
    def write_block_with_replication(client_data, datanodes):
        """
        Write block with pipelined replication.

        Flow:
            Client → DN1 → DN2 → DN3
                      ↓     ↓     ↓
                     disk  disk  disk

        create_pipeline, send_to_datanode, and wait_for_ack are placeholder
        helpers standing in for the real network layer.
        """
        # Set up the DN1 → DN2 → DN3 chain
        pipeline = create_pipeline(datanodes)
        # Client sends to first DataNode only:
        #   DN1 receives, writes to disk, forwards to DN2
        #   DN2 receives, writes to disk, forwards to DN3
        #   DN3 receives, writes to disk, sends ACK
        # ACKs propagate back:
        #   DN3 → DN2 → DN1 → Client
        send_to_datanode(datanodes[0], client_data, pipeline)
        # Wait for ACK from entire pipeline
        return wait_for_ack()
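The pipeline can be simulated in memory to show the forwarding and ACK order. This is a simplified sketch: `SimDataNode` and `write_block` are hypothetical names, the "disk" is a dictionary, and the whole block is forwarded at once, whereas a real system would stream it in small packets.

```python
class SimDataNode:
    """In-memory stand-in for a DataNode."""

    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.blocks = {}  # block_id -> bytes ("disk")

    def receive(self, block_id, data, downstream):
        """Store the block, forward it down the pipeline, and return the ACK list."""
        if not self.healthy:
            raise ConnectionError(f"{self.name} is unreachable")
        self.blocks[block_id] = data
        acks = downstream[0].receive(block_id, data, downstream[1:]) if downstream else []
        # ACKs travel back up the chain, so the last node appears first.
        return acks + [self.name]


def write_block(block_id, data, pipeline):
    """Client side: send to the first node only and wait for the full ACK chain."""
    return pipeline[0].receive(block_id, data, pipeline[1:])


if __name__ == "__main__":
    nodes = [SimDataNode("dn1"), SimDataNode("dn2"), SimDataNode("dn3")]
    print(write_block("blk_0", b"payload", nodes))  # ['dn3', 'dn2', 'dn1']
```

The client uploads the data once; each DataNode pays for one inbound and at most one outbound transfer, which spreads the replication cost across the pipeline.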
Fault Tolerance
1. DataNode Failure
Scenario: DN2 fails (disk crash)
NameNode detects:
- No heartbeat from DN2 for about 10 minutes (with default HDFS settings the timeout is 10 minutes 30 seconds)
- Marks DN2 as dead
Re-replication:
- Finds under-replicated blocks
- Block X had replicas on [DN1, DN2, DN5]
- Now only [DN1, DN5] available
- Instructs DN1 to replicate to DN8
- Replication factor restored to 3
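The detection and re-replication steps can be sketched as a small bookkeeping class. This is an illustrative model, not NameNode code: `HeartbeatMonitor`, its 600-second timeout, and the explicit `now` timestamps are assumptions chosen to keep the example deterministic.

```python
class HeartbeatMonitor:
    def __init__(self, timeout_s=600.0, replication=3):
        self.timeout_s = timeout_s
        self.replication = replication
        self.last_seen = {}  # datanode -> time of last heartbeat
        self.block_map = {}  # block_id -> set of datanodes holding a replica

    def heartbeat(self, datanode, now):
        self.last_seen[datanode] = now

    def dead_nodes(self, now):
        """DataNodes whose last heartbeat is older than the timeout."""
        return {dn for dn, seen in self.last_seen.items() if now - seen > self.timeout_s}

    def plan_rereplication(self, now):
        """Return (block_id, source, target) copy instructions for under-replicated blocks."""
        dead = self.dead_nodes(now)
        live = set(self.last_seen) - dead
        plan = []
        for block_id, holders in sorted(self.block_map.items()):
            alive_holders = sorted(holders - dead)
            if not alive_holders:
                continue  # every replica is gone; nothing to copy from
            missing = self.replication - len(alive_holders)
            candidates = sorted(live - holders)
            for target in candidates[: max(missing, 0)]:
                plan.append((block_id, alive_holders[0], target))
        return plan


if __name__ == "__main__":
    monitor = HeartbeatMonitor()
    for dn in ("dn1", "dn2", "dn5", "dn8"):
        monitor.heartbeat(dn, now=0)
    monitor.block_map["blk_X"] = {"dn1", "dn2", "dn5"}
    for dn in ("dn1", "dn5", "dn8"):  # dn2 stays silent
        monitor.heartbeat(dn, now=700)
    print(monitor.plan_rereplication(now=700))  # [('blk_X', 'dn1', 'dn8')]
```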
2. NameNode Failure
Problem: NameNode is SPOF (Single Point of Failure)
Solutions:
A. Standby NameNode (HDFS HA)
    Active NameNode ←→ Shared Edit Log ←→ Standby NameNode
              (failure detection via ZooKeeper)
Failover:
1. Active NameNode crashes
2. ZooKeeper detects the failure
3. Standby applies any remaining entries from the shared edit log
4. Standby is promoted and becomes the new Active
Downtime: ~30 seconds
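The failover sequence can be modeled with two in-memory objects and a shared list standing in for the edit log. This is a toy sketch under those assumptions: `NameNodeSim`, `failover`, and the `(op, path, value)` log format are hypothetical, and failure detection is reduced to a direct function call.

```python
class NameNodeSim:
    def __init__(self, name, role="standby"):
        self.name = name
        self.role = role
        self.namespace = {}  # path -> value
        self.applied = 0     # number of shared-log entries applied so far

    def catch_up(self, shared_log):
        """Apply every log entry this node has not seen yet."""
        for op, path, value in shared_log[self.applied:]:
            if op == "create":
                self.namespace[path] = value
            elif op == "delete":
                self.namespace.pop(path, None)
        self.applied = len(shared_log)


def failover(active, standby, shared_log):
    """Promote the standby once it has replayed the shared edit log."""
    active.role = "fenced"        # the old active must not accept writes again
    standby.catch_up(shared_log)  # replay anything the standby has not applied
    standby.role = "active"
    return standby


if __name__ == "__main__":
    log = [("create", "/a", 1), ("create", "/b", 2), ("delete", "/a", None)]
    nn1, nn2 = NameNodeSim("nn1", role="active"), NameNodeSim("nn2")
    nn2.catch_up(log[:1])  # the standby was lagging behind
    new_active = failover(nn1, nn2, log)
    print(new_active.name, new_active.namespace)  # nn2 {'/b': 2}
```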
B. Metadata Backup
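    import pickle


    class NameNode:
        def checkpoint_metadata(self):
            """Save metadata to disk periodically"""
            # Simplification: real HDFS does not persist block locations;
            # it rebuilds them from DataNode block reports after a restart.
            fsimage = {
                "namespace": self.namespace_tree,
                "block_map": self.block_locations,
                "replication_queues": self.replication_work
            }
            with open("/hdfs/fsimage", "wb") as f:
                pickle.dump(fsimage, f)
            # Also save edit log
            self.edit_log.flush()

        def recover_from_crash(self):
            """Recover from last checkpoint"""
            # Load fsimage
            with open("/hdfs/fsimage", "rb") as f:
                state = pickle.load(f)
            # Restore in-memory state from the checkpoint
            self.namespace_tree = state["namespace"]
            self.block_locations = state["block_map"]
            self.replication_work = state["replication_queues"]
            # Replay edit log since checkpoint
            self.replay_edit_log()
            # Wait for DataNodes to re-register
            self.wait_for_datanodes()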
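The checkpoint-plus-edit-log idea can be shown end to end with a runnable toy. This is a sketch under stated assumptions: `MiniNameNode`, the JSON file formats, and the file names are invented for illustration, and the code omits `fsync` and atomic renames that a durable implementation would need.

```python
import json
import os
import tempfile


class MiniNameNode:
    def __init__(self, workdir):
        self.fsimage_path = os.path.join(workdir, "fsimage.json")
        self.edits_path = os.path.join(workdir, "edits.log")
        self.namespace = {}  # path -> list of block ids

    def apply(self, op):
        if op["type"] == "create":
            self.namespace[op["path"]] = op["blocks"]
        elif op["type"] == "delete":
            self.namespace.pop(op["path"], None)

    def log_and_apply(self, op):
        """Append to the edit log first, then change in-memory state."""
        with open(self.edits_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(op) + "\n")
        self.apply(op)

    def checkpoint(self):
        """Write the full namespace, then start a fresh edit log."""
        with open(self.fsimage_path, "w", encoding="utf-8") as f:
            json.dump(self.namespace, f)
        open(self.edits_path, "w", encoding="utf-8").close()

    def recover(self):
        """Load the last checkpoint and replay edits made after it."""
        self.namespace = {}
        if os.path.exists(self.fsimage_path):
            with open(self.fsimage_path, encoding="utf-8") as f:
                self.namespace = json.load(f)
        if os.path.exists(self.edits_path):
            with open(self.edits_path, encoding="utf-8") as f:
                for line in f:
                    if line.strip():
                        self.apply(json.loads(line))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        nn = MiniNameNode(workdir)
        nn.log_and_apply({"type": "create", "path": "/a", "blocks": ["blk_1"]})
        nn.checkpoint()
        nn.log_and_apply({"type": "create", "path": "/b", "blocks": ["blk_2"]})
        restarted = MiniNameNode(workdir)  # simulate a crash and restart
        restarted.recover()
        print(restarted.namespace)
```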
HDFS vs GFS
| Feature | HDFS | GFS (Google) |
|---|---|---|
| Block Size | 128MB default | 64MB default |
| Replication | 3x default | 3x default |
| Master | NameNode | Master |
| Workers | DataNodes | Chunkservers |
| Language | Java | C++ |
| Open Source | Yes | No (design public) |
| Use Case | Hadoop ecosystem | Google internal |
Real-World Use Cases
1. Facebook (HDFS)
Problem: Store 300+ PB of photos/videos
Solution: HDFS cluster
Architecture:
- 10,000+ DataNodes
- Petabyte-scale storage
- 1B+ reads/day
- Optimized for write-once, read-many
Results:
- 99.9% availability
- Cost: ~$0.01/GB/month (commodity hardware)
- Scales horizontally (add more DataNodes)
2. Yahoo (HDFS)
Use case: Web crawl data storage
Scale:
- 42,000 nodes
- 600 PB storage
- Largest HDFS cluster (historically)
Workload:
- MapReduce jobs on web index
- Log aggregation
- Machine learning training data
3. Netflix (S3 + Custom DFS)
Strategy: Hybrid approach
- S3 for video storage (durability)
- Custom DFS for encoding/processing
- HDFS for analytics
Flow:
1. Upload raw video → S3
2. Encoding jobs read from S3
3. Encoded files → S3 (multiple bitrates)
4. Analytics on HDFS (viewing patterns)
Performance Optimization
1. Data Locality
    class TaskScheduler:
        # Helper methods (is_worker_available, assign_task, get_rack_workers,
        # get_any_worker) are assumed to be defined elsewhere.
        def __init__(self, namenode):
            self.namenode = namenode

        def schedule_map_task(self, input_block):
            """Schedule task on node with data"""
            # Get block locations
            replicas = self.namenode.get_block_locations(input_block)
            # Prefer node with data (avoid network transfer)
            for replica in replicas:
                if self.is_worker_available(replica.datanode):
                    return self.assign_task(replica.datanode, input_block)
            # If data node busy, find nearby node (same rack)
            for replica in replicas:
                nearby_workers = self.get_rack_workers(replica.rack)
                for worker in nearby_workers:
                    if self.is_worker_available(worker):
                        return self.assign_task(worker, input_block)
            # Last resort: remote read
            return self.assign_task(self.get_any_worker(), input_block)
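Impact (illustrative figures; actual numbers depend on hardware):
- Local read: ~100 MB/s (about 0.8 Gb/s) from local disk, with no network hop
- Rack-local read: up to 10 Gb/s over the in-rack network, shared with other nodes in the rack
- Cross-rack read: up to 1 Gb/s over the inter-rack network, shared across racks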
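To compare these figures on one scale, the sketch below converts them to bytes per second and estimates how long one 128 MiB block takes to read. The `sharers` parameter is an assumption added for illustration: it models several readers splitting one link evenly, which is why a nominally fast shared link can still be slower than a local disk.

```python
def transfer_seconds(num_bytes, bytes_per_second, sharers=1):
    """Time to move num_bytes when `sharers` readers split the bandwidth evenly."""
    if bytes_per_second <= 0 or sharers < 1:
        raise ValueError("bandwidth must be positive and sharers must be >= 1")
    return num_bytes / (bytes_per_second / sharers)


BLOCK = 128 * 1024 ** 2  # one 128 MiB block
LOCAL_DISK = 100e6       # 100 MB/s
RACK_LINK = 10e9 / 8     # 10 Gb/s expressed in bytes per second
CROSS_RACK = 1e9 / 8     # 1 Gb/s expressed in bytes per second

if __name__ == "__main__":
    print(round(transfer_seconds(BLOCK, LOCAL_DISK), 2))              # 1.34
    print(round(transfer_seconds(BLOCK, RACK_LINK), 3))               # 0.107
    print(round(transfer_seconds(BLOCK, CROSS_RACK), 2))              # 1.07
    print(round(transfer_seconds(BLOCK, CROSS_RACK, sharers=20), 1))  # 21.5
```

Under these assumptions, 20 tasks reading across racks at once each wait over 20 seconds per block, while 20 tasks reading from 20 different local disks do not contend with each other at all.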
2. Read Caching
    class HDFSClient:
        def __init__(self):
            # LRUCache and fetch_from_datanode are assumed helpers.
            # At 128 MB per block, 1000 cached blocks could need ~128 GB,
            # so a real client would pick a much smaller capacity.
            self.block_cache = LRUCache(capacity=1000)  # Cache 1000 blocks

        def read_block(self, block_id, datanode):
            # Check cache first
            if block_id in self.block_cache:
                return self.block_cache[block_id]
            # Read from DataNode
            data = self.fetch_from_datanode(block_id, datanode)
            # Cache for future reads
            self.block_cache[block_id] = data
            return data
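The `LRUCache` used above is not defined in the article. One possible implementation, offered as a sketch rather than the author's version, builds on `collections.OrderedDict` and supports the three operations the client relies on: `in`, indexing, and assignment.

```python
from collections import OrderedDict


class LRUCache:
    """Least-recently-used cache with dict-style access."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self._items = OrderedDict()

    def __contains__(self, key):
        return key in self._items

    def __getitem__(self, key):
        # Raises KeyError if the key is missing; otherwise marks it most recently used.
        self._items.move_to_end(key)
        return self._items[key]

    def __setitem__(self, key, value):
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)  # evict the least recently used entry

    def __len__(self):
        return len(self._items)


if __name__ == "__main__":
    cache = LRUCache(capacity=2)
    cache["blk_1"] = b"a"
    cache["blk_2"] = b"b"
    _ = cache["blk_1"]       # touch blk_1, so blk_2 is now the oldest
    cache["blk_3"] = b"c"    # evicts blk_2
    print("blk_2" in cache)  # False
```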
Interview Tips 💡
When discussing distributed file systems in interviews:
- Problem: "Need to store 100TB - can't fit on one disk..."
- Chunking: "Split file into 128MB blocks, distribute across machines..."
- Replication: "Each block replicated 3x - survives disk/rack failures..."
- Master-worker: "NameNode stores metadata, DataNodes store actual data..."
- Read flow: "Client asks NameNode for locations, reads directly from DataNodes..."
- Scalability: "Add more DataNodes = more storage/throughput..."
- Real examples: "Facebook stores 300PB on HDFS, Yahoo had 42k-node cluster..."
Related Concepts
- MapReduce — Distributed processing on HDFS
- Object Storage — S3, GCS alternatives
- Replication — Data durability
- Sharding — Data partitioning
- CAP Theorem — Consistency trade-offs