
Distributed File Systems

Complete guide to storing petabytes across thousands of machines using distributed file systems like HDFS and GFS, covering chunking, replication, master-slave architecture, and implementation patterns powering Google, Facebook, and Hadoop ecosystems.

Beyond the Hard Drive

The problem: You have 100TB of video data. A single disk holds 2TB. How do you store and access it efficiently?

Solution: Distributed File System (DFS) - Makes thousands of disks look like one giant drive.


What is a Distributed File System?

A DFS stores massive files across multiple machines, providing:

  • Scalability: Add more machines = more storage
  • Fault tolerance: Data replicated across machines
  • High throughput: Parallel reads/writes

Examples

  • HDFS (Hadoop Distributed File System) - Open source
  • GFS (Google File System) - Google's internal system
  • Ceph - Modern distributed storage
  • Amazon S3 - Object storage (DFS-like)

Architecture

mermaid
graph TB
    Client[Client] -->|1. Request| NN[NameNode<br/>Master]
    NN -->|2. Metadata| Client
    Client -->|3. Read/Write| DN1[DataNode 1]
    Client -->|3. Read/Write| DN2[DataNode 2]
    Client -->|3. Read/Write| DN3[DataNode 3]
    
    DN1 -->|Heartbeat| NN
    DN2 -->|Heartbeat| NN
    DN3 -->|Heartbeat| NN
    
    DN1 -.->|Replication| DN2
    DN2 -.->|Replication| DN3

Components

1. NameNode (Master)

  • Stores metadata (directory tree, file-to-block mapping)
  • Coordinates operations
  • Single point of failure (mitigated by standby)

2. DataNodes (Workers)

  • Store actual data blocks
  • Report to NameNode via heartbeats
  • Handle read/write requests from clients

How It Works

1. File Chunking

Files are split into large blocks (typically 64MB to 128MB).

Example: video.mp4 (1GB file)

Chunk size: 128MB
Result:
  - Chunk 0: bytes 0-134,217,727
  - Chunk 1: bytes 134,217,728-268,435,455
  - Chunk 2: bytes 268,435,456-402,653,183
  - ...
  - Chunk 7: bytes 939,524,096-1,073,741,823

Total: 8 chunks
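
The byte ranges in this example follow directly from the file size and the chunk size. A minimal sketch of that arithmetic, where the function name `chunk_ranges` is an illustrative choice rather than part of any DFS API:

```python
def chunk_ranges(file_size, chunk_size=128 * 1024 * 1024):
    """Return (index, first_byte, last_byte) per chunk; last_byte is inclusive."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    ranges = []
    for index, start in enumerate(range(0, file_size, chunk_size)):
        # The final chunk may be shorter than chunk_size
        end = min(start + chunk_size, file_size) - 1
        ranges.append((index, start, end))
    return ranges


ranges = chunk_ranges(1024 ** 3)  # the 1GB video.mp4 example
print(len(ranges))   # 8
print(ranges[-1])    # (7, 939524096, 1073741823)
```

Because the end offsets are inclusive, the last byte of a 1GB file is 1,073,741,823, one less than the file size.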

Why large blocks?

  • Reduces metadata overhead
  • Optimizes sequential reads
  • Minimizes seek time
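
The metadata point can be made concrete with a rough calculation. The sketch below assumes about 150 bytes of master memory per block entry; that is a commonly quoted rule of thumb used here only for illustration, not a measured figure, and all names are hypothetical.

```python
PB = 1024 ** 5
KB = 1024
MB = 1024 ** 2

# Assumed rule of thumb for master memory used per block entry;
# not a measured figure for any particular cluster.
BYTES_PER_BLOCK_ENTRY = 150


def block_count(total_bytes, block_size):
    """Number of blocks needed to hold total_bytes (ceiling division)."""
    return -(-total_bytes // block_size)


def metadata_bytes(total_bytes, block_size, per_entry=BYTES_PER_BLOCK_ENTRY):
    """Estimated master memory needed to track every block."""
    return block_count(total_bytes, block_size) * per_entry


for label, size in [("4KB", 4 * KB), ("128MB", 128 * MB)]:
    blocks = block_count(PB, size)
    gib = metadata_bytes(PB, size) / 1024 ** 3
    print(f"{label:>6} blocks: {blocks:,} entries, ~{gib:,.1f} GiB of metadata")
```

Under this assumption, 1PB stored in 128MB blocks needs about 8.4 million entries, which fits in the memory of one machine. The same data in 4KB blocks needs about 275 billion entries, which does not.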

2. Replication

Each chunk is replicated 3x (configurable) across different machines and racks.

Chunk 0 replicas:
  - Primary: DataNode 1 (Rack A)
  - Secondary: DataNode 5 (Rack A)
  - Tertiary: DataNode 12 (Rack B)

Purpose:
  - Survive disk failures
  - Survive rack failures
  - Load balancing for reads
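
A rack-aware placement like the one in this example can be sketched as follows. This is a simplified policy that mirrors the example (two replicas on one rack, one on another). It is not HDFS's actual placement policy, and `place_replicas` and its parameters are hypothetical names.

```python
import random


def place_replicas(nodes_by_rack, local_rack, replication=3, rng=random):
    """Pick distinct nodes for one chunk, spanning two racks when possible."""
    local = list(nodes_by_rack.get(local_rack, []))
    remote = [node
              for rack, nodes in nodes_by_rack.items() if rack != local_rack
              for node in nodes]

    # Reserve one slot for another rack so a rack failure cannot lose the chunk
    remote_slots = 1 if remote and replication > 1 else 0
    local_count = min(len(local), replication - remote_slots)
    chosen = rng.sample(local, local_count)

    # Fill whatever is left from the other racks
    remaining = replication - len(chosen)
    chosen += rng.sample(remote, min(remaining, len(remote)))
    return chosen


cluster = {
    "Rack A": ["DataNode 1", "DataNode 5", "DataNode 7"],
    "Rack B": ["DataNode 12", "DataNode 13"],
}
print(place_replicas(cluster, "Rack A"))
```

The chosen nodes vary from run to run, but with this cluster every result has two nodes on Rack A and one on Rack B.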

3. Metadata Management

The NameNode keeps this metadata in memory:

python
# Simplified metadata structure
metadata = {
    "/user/data/video.mp4": {
        "size": 1073741824,  # 1GB
        "blocks": [
            {
                "block_id": "blk_1234",
                "size": 134217728,
                "replicas": [
                    {"datanode": "dn1.example.com:50010", "rack": "/rack1"},
                    {"datanode": "dn5.example.com:50010", "rack": "/rack1"},
                    {"datanode": "dn12.example.com:50010", "rack": "/rack2"}
                ]
            },
            # ... 7 more blocks
        ],
        "replication": 3,
        "block_size": 134217728
    }
}
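
With fixed-size blocks, finding the block that holds a given byte offset is a single division. A minimal sketch against a trimmed-down version of the structure above; the block IDs after `blk_1234`, the replica lists, and the helper name `locate` are made up for illustration.

```python
BLOCK_SIZE = 134217728  # 128MB

# Hypothetical metadata for a 1GB file: 8 blocks, replica lists shortened
file_meta = {
    "size": 1073741824,
    "block_size": BLOCK_SIZE,
    "blocks": [
        {"block_id": f"blk_{1234 + i}", "replicas": ["dn1.example.com:50010"]}
        for i in range(8)
    ],
}


def locate(file_meta, offset):
    """Map a byte offset in the file to (block entry, offset inside that block)."""
    if not 0 <= offset < file_meta["size"]:
        raise ValueError(f"offset {offset} is outside the file")
    index, offset_in_block = divmod(offset, file_meta["block_size"])
    return file_meta["blocks"][index], offset_in_block


block, inner = locate(file_meta, 200_000_000)
print(block["block_id"], inner)  # blk_1235 65782272
```

This lets a client seek into the middle of a large file and contact only the DataNodes that hold the relevant block.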

Read Flow

1. Client: "Read /data/video.mp4"
   
2. NameNode responds:
   Block 0 → [DN1, DN5, DN12]
   Block 1 → [DN2, DN6, DN13]
   ...
   
3. Client connects directly to DataNodes:
   - Read Block 0 from DN1 (closest)
   - Read Block 1 from DN2
   - ...
   - Reassemble file
   
4. NameNode NOT involved in data transfer (avoids bottleneck)

Implementation:

python
import random
import socket

import requests  # third-party HTTP library


class HDFSClient:
    """Toy client. The REST endpoints here are illustrative only; real HDFS
    clients use Hadoop RPC and a streaming data-transfer protocol."""

    def __init__(self, namenode_url):
        self.namenode = namenode_url
        self.local_host = socket.gethostname()

    def read_file(self, path):
        # 1. Get block locations from NameNode (metadata only)
        response = requests.post(f"{self.namenode}/getBlockLocations",
                                 json={"path": path})
        response.raise_for_status()
        blocks = response.json()["blocks"]

        # 2. Read each block directly from a DataNode, in order
        file_data = bytearray()
        for block in blocks:
            datanode = self.choose_datanode(block["replicas"])
            file_data.extend(self.read_block(datanode, block["block_id"]))

        return bytes(file_data)

    def is_local(self, datanode):
        """True if the replica's address ("host:port") is this machine"""
        return datanode.split(":")[0] == self.local_host

    def choose_datanode(self, replicas):
        """Prefer a local replica; otherwise pick one at random"""
        for replica in replicas:
            if self.is_local(replica["datanode"]):
                return replica["datanode"]
        return random.choice(replicas)["datanode"]

    def read_block(self, datanode, block_id):
        """Read one block from a DataNode"""
        response = requests.get(f"http://{datanode}/readBlock",
                                params={"blockId": block_id})
        response.raise_for_status()
        return response.content

Write Flow

1. Client: "Write file.txt (300MB)"
   
2. Client asks NameNode: "Where should I write?"
   
3. NameNode responds:
   Block 0 → Write to [DN1, DN2, DN3] (pipeline)
   Block 1 → Write to [DN4, DN5, DN6]
   Block 2 → Write to [DN7, DN8, DN9]
   
4. Client writes Block 0:
   Client → DN1 → DN2 → DN3 (replication pipeline)
   
5. After all blocks written:
   Client → NameNode: "Commit writes"
   NameNode updates metadata
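
Step 3 of this flow, where the NameNode hands out one pipeline per block, can be sketched with a simple round-robin allocator. Real placement also weighs rack topology, free space, and load; `allocate_pipelines` is a hypothetical helper, not an HDFS API.

```python
def allocate_pipelines(file_size, datanodes,
                       block_size=128 * 1024 * 1024, replication=3):
    """Return one list of `replication` distinct DataNodes per block."""
    if len(datanodes) < replication:
        raise ValueError("not enough DataNodes for the replication factor")

    num_blocks = -(-file_size // block_size)  # ceiling division
    pipelines = []
    for block_index in range(num_blocks):
        # Start each block where the previous one stopped, wrapping around
        start = (block_index * replication) % len(datanodes)
        pipeline = [datanodes[(start + i) % len(datanodes)]
                    for i in range(replication)]
        pipelines.append(pipeline)
    return pipelines


datanodes = [f"DN{i}" for i in range(1, 10)]
for index, pipeline in enumerate(allocate_pipelines(300 * 1024 * 1024, datanodes)):
    print(f"Block {index} -> {pipeline}")
```

For the 300MB file and nine DataNodes this yields three blocks with the same pipelines as in the flow above: [DN1, DN2, DN3], [DN4, DN5, DN6], and [DN7, DN8, DN9].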

Replication Pipeline:

python
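def write_block_with_replication(client_data, datanodes):
    """
    Write block with pipelined replication (in-memory simulation)

    Flow:
    Client → DN1 → DN2 → DN3
             ↓     ↓     ↓
           disk  disk  disk

    Each entry in `datanodes` is a dict standing in for one node's disk.
    A real pipeline streams the block in small packets over the network.
    Returns True only if every node in the pipeline acknowledged.
    """
    # Client sends to first DataNode; each node forwards downstream
    return forward(client_data, datanodes, position=0)


def forward(data, pipeline, position):
    """Store data on pipeline[position], forward it, and relay the ACK"""
    if position == len(pipeline):
        return True  # past the last DataNode: nothing left to acknowledge

    node = pipeline[position]
    if node.get("failed"):
        return False  # a broken node means no ACK reaches the client

    # DN receives and writes to "disk", then forwards to the next DN
    node["block"] = data

    # ACKs propagate back:
    # DN3 → DN2 → DN1 → Client
    return forward(data, pipeline, position + 1)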

Fault Tolerance

1. DataNode Failure

Scenario: DN2 fails (disk crash)

NameNode detects:
  - No heartbeat from DN2 for ~10 minutes (HDFS default timeout: 10 min 30 s)
  - Marks DN2 as dead
  
Re-replication:
  - Finds under-replicated blocks
  - Block X had replicas on [DN1, DN2, DN5]
  - Now only [DN1, DN5] available
  - Instructs DN1 to replicate to DN8
  - Replication factor restored to 3
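
The detection and re-replication steps above can be sketched as two small pieces: a heartbeat tracker and a planner that emits copy instructions. The class and function names, and the 600-second timeout, are illustrative assumptions.

```python
import time


class HeartbeatMonitor:
    """Tracks the last heartbeat per DataNode and reports silent nodes."""

    def __init__(self, timeout_seconds=600):
        self.timeout = timeout_seconds
        self.last_seen = {}

    def heartbeat(self, datanode, now=None):
        self.last_seen[datanode] = time.monotonic() if now is None else now

    def dead_nodes(self, now=None):
        now = time.monotonic() if now is None else now
        return {dn for dn, seen in self.last_seen.items()
                if now - seen > self.timeout}


def plan_re_replication(block_map, dead, all_nodes, replication=3):
    """Return (block_id, source, target) copies for under-replicated blocks."""
    live_nodes = [n for n in all_nodes if n not in dead]
    plan = []
    for block_id, replicas in sorted(block_map.items()):
        live = [r for r in replicas if r not in dead]
        if not live:
            continue  # no surviving replica to copy from
        candidates = [n for n in live_nodes if n not in live]
        missing = replication - len(live)
        for target in candidates[:missing]:
            plan.append((block_id, live[0], target))
    return plan


monitor = HeartbeatMonitor(timeout_seconds=600)
for dn in ["DN1", "DN2", "DN5", "DN8"]:
    monitor.heartbeat(dn, now=0)
for dn in ["DN1", "DN5", "DN8"]:
    monitor.heartbeat(dn, now=500)   # DN2 stays silent

dead = monitor.dead_nodes(now=700)
print(dead)  # {'DN2'}
print(plan_re_replication({"X": ["DN1", "DN2", "DN5"]}, dead,
                          ["DN1", "DN2", "DN5", "DN8"]))
# [('X', 'DN1', 'DN8')]
```

The plan matches the scenario: DN1 is told to copy Block X to DN8, which restores the replication factor to 3.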

2. NameNode Failure

Problem: NameNode is SPOF (Single Point of Failure)

Solutions:

A. Standby NameNode (HDFS HA)

Active NameNode → Shared Edit Log → Standby NameNode
                   (JournalNodes)
ZooKeeper (via failover controllers) monitors both NameNodes

Failover:
1. Active NameNode crashes
2. ZooKeeper detects failure
3. Old Active is fenced so it cannot write further edits
4. Standby finishes replaying the shared edit log
5. Standby becomes new Active
Downtime: ~30 seconds (depends on configuration)

B. Metadata Backup

python
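import pickle


class NameNode:
    # Attributes and helpers used below (namespace_tree, edit_log,
    # replay_edit_log, ...) are assumed to be defined elsewhere.
    FSIMAGE_PATH = "/hdfs/fsimage"

    def checkpoint_metadata(self):
        """Save metadata to disk periodically"""
        # Simplification: real HDFS does not persist block locations in the
        # fsimage; it rebuilds them from DataNode block reports.
        fsimage = {
            "namespace": self.namespace_tree,
            "block_map": self.block_locations,
            "replication_queues": self.replication_work
        }

        with open(self.FSIMAGE_PATH, "wb") as f:
            pickle.dump(fsimage, f)

        # Also save edit log
        self.edit_log.flush()

    def recover_from_crash(self):
        """Recover from last checkpoint"""
        # Load fsimage and restore the in-memory state from it
        with open(self.FSIMAGE_PATH, "rb") as f:
            state = pickle.load(f)
        self.namespace_tree = state["namespace"]
        self.block_locations = state["block_map"]
        self.replication_work = state["replication_queues"]

        # Replay edit log since checkpoint
        self.replay_edit_log()

        # Wait for DataNodes to re-register
        self.wait_for_datanodes()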
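
The "replay edit log since checkpoint" step is what brings a stale checkpoint up to date. A minimal sketch of the idea, assuming each logged edit carries a monotonically increasing transaction ID; the record layout, the three operations, and the function names are invented for illustration.

```python
import copy


def apply_edit(namespace, edit):
    """Apply one logged mutation to the in-memory namespace."""
    op = edit["op"]
    if op == "create":
        namespace[edit["path"]] = {"blocks": []}
    elif op == "add_block":
        namespace[edit["path"]]["blocks"].append(edit["block_id"])
    elif op == "delete":
        namespace.pop(edit["path"], None)
    else:
        raise ValueError(f"unknown edit op: {op}")


def recover(fsimage, edit_log):
    """Load the checkpoint, then replay newer edits in transaction order."""
    namespace = copy.deepcopy(fsimage["namespace"])
    for edit in sorted(edit_log, key=lambda e: e["txid"]):
        if edit["txid"] > fsimage["last_txid"]:  # already in the checkpoint
            apply_edit(namespace, edit)
    return namespace


fsimage = {"last_txid": 2, "namespace": {"/a.txt": {"blocks": ["blk_1"]}}}
edit_log = [
    {"txid": 1, "op": "create", "path": "/a.txt"},
    {"txid": 2, "op": "add_block", "path": "/a.txt", "block_id": "blk_1"},
    {"txid": 3, "op": "create", "path": "/b.txt"},
    {"txid": 4, "op": "add_block", "path": "/b.txt", "block_id": "blk_2"},
    {"txid": 5, "op": "delete", "path": "/a.txt"},
]
print(recover(fsimage, edit_log))  # {'/b.txt': {'blocks': ['blk_2']}}
```

Edits 1 and 2 are skipped because the checkpoint already reflects them. Only edits 3 to 5 are replayed, so recovery time depends on how much has changed since the last checkpoint.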

HDFS vs GFS

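| Feature     | HDFS             | GFS (Google)       |
|-------------|------------------|--------------------|
| Block Size  | 128MB default    | 64MB default       |
| Replication | 3x default       | 3x default         |
| Master      | NameNode         | Master             |
| Workers     | DataNodes        | Chunkservers       |
| Language    | Java             | C++                |
| Open Source | Yes              | No (design public) |
| Use Case    | Hadoop ecosystem | Google internal    |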

Real-World Use Cases

1. Facebook (HDFS)

Problem: Store a 300+ PB analytics data warehouse
Solution: HDFS cluster

Architecture:
- 10,000+ DataNodes
- Petabyte-scale storage
- 1B+ reads/day
- Optimized for write-once, read-many

Results:
- 99.9% availability
- Cost: ~$0.01/GB/month (commodity hardware)
- Scales horizontally (add more DataNodes)

2. Yahoo (HDFS)

Use case: Web crawl data storage
Scale:
- ~42,000 Hadoop nodes (total across many clusters)
- 600 PB storage
- Among the largest HDFS deployments (historically)

Workload:
- MapReduce jobs on web index
- Log aggregation
- Machine learning training data

3. Netflix (S3 + Custom DFS)

Strategy: Hybrid approach
- S3 for video storage (durability)
- Custom DFS for encoding/processing
- HDFS for analytics

Flow:
1. Upload raw video → S3
2. Encoding jobs read from S3
3. Encoded files → S3 (multiple bitrates)
4. Analytics on HDFS (viewing patterns)

Performance Optimization

1. Data Locality

python
class TaskScheduler:
    def __init__(self, namenode):
        # NameNode handle used for block-location lookups. The worker helpers
        # used below (is_worker_available, assign_task, get_rack_workers,
        # get_any_worker) are assumed to be implemented elsewhere.
        self.namenode = namenode

    def schedule_map_task(self, input_block):
        """Schedule task as close to the data as possible"""

        # Get block locations
        replicas = self.namenode.get_block_locations(input_block)

        # Prefer node with data (avoid network transfer)
        for replica in replicas:
            if self.is_worker_available(replica.datanode):
                return self.assign_task(replica.datanode, input_block)

        # If data node busy, find nearby node (same rack)
        for replica in replicas:
            nearby_workers = self.get_rack_workers(replica.rack)
            for worker in nearby_workers:
                if self.is_worker_available(worker):
                    return self.assign_task(worker, input_block)

        # Last resort: remote read
        return self.assign_task(self.get_any_worker(), input_block)

Impact:

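  • Local read: ~100 MB/s per disk, with no network traffic at all
  • Rack-local read: bounded by the in-rack network (e.g. a 10 Gb/s link shared by the rack's nodes)
  • Cross-rack read: bounded by the inter-rack network (e.g. ~1 Gb/s effective when links are oversubscribed)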

2. Read Caching

python
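from collections import OrderedDict


class LRUCache(OrderedDict):
    """Dict that evicts the least recently used entry beyond `capacity`"""

    def __init__(self, capacity):
        super().__init__()
        self.capacity = capacity

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)  # mark as most recently used
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)
        if len(self) > self.capacity:
            del self[next(iter(self))]  # evict least recently used


class HDFSClient:
    def __init__(self):
        # 1000 full 128MB blocks is ~128GB; size this to available memory
        self.block_cache = LRUCache(capacity=1000)

    def read_block(self, block_id, datanode):
        # Check cache first
        if block_id in self.block_cache:
            return self.block_cache[block_id]

        # Read from DataNode (fetch_from_datanode is assumed defined elsewhere)
        data = self.fetch_from_datanode(block_id, datanode)

        # Cache for future reads
        self.block_cache[block_id] = data
        return data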

Interview Tips 💡

When discussing distributed file systems in interviews:

  1. Problem: "Need to store 100TB - can't fit on one disk..."
  2. Chunking: "Split file into 128MB blocks, distribute across machines..."
  3. Replication: "Each block replicated 3x - survives disk/rack failures..."
  4. Master-worker: "NameNode stores metadata, DataNodes store actual data..."
  5. Read flow: "Client asks NameNode for locations, reads directly from DataNodes..."
  6. Scalability: "Add more DataNodes = more storage/throughput..."
  7. Real examples: "Facebook stores 300PB on HDFS, Yahoo ran ~42k Hadoop nodes across its clusters..."
