  November 2019
Distributed Databases By Sudarshan



Distributed Database Design

• Three key issues: – Fragmentation • Relation may be divided into a number of subrelations, which are then distributed.

– Allocation • Each fragment is stored at site with “optimal” distribution.

– Replication. • Copy of fragment may be maintained at several sites.

Data Allocation • An allocation schema describes the allocation of fragments to sites of the DDBs. • It is a mapping that specifies, for each fragment, the site at which it is stored. • If a fragment is stored at more than one site, it is said to be replicated.

Distributed Catalog Management • Must keep track of how data is distributed across sites. • Must be able to name each replica of each fragment. To preserve local autonomy:

• Site Catalog: Describes all objects (fragments, replicas) at a site + Keeps track of replicas of relations created at this site. – To find a relation, look up its birth-site catalog.

Data Replication • Fully replicated : each fragment at each site • Partially replicated : each fragment at some of the sites • Types of replication – Synchronous Replication – Asynchronous Replication

• Rule of thumb: – If replication is advantageous, –


replication may cause

Synchronous Replication • All copies of a modified relation (fragment) must be updated before the modifying transaction commits. – Data distribution is made transparent to users.

• 2 techniques for synchronous replication – Voting – Read-any Write-all

Asynchronous Replication • Allows modifying transaction to commit before all copies have been changed (and readers nonetheless look at just one copy). • Copies of a modified relation are only periodically updated; different copies may get out of synch in the meantime. – Users must be aware of data distribution. – Current products follow this approach.

• 2 techniques for asynchronous replication – Primary Site Replication – Peer-to-Peer Replication

• Difference lies in how many copies are “updatable” or “master copies”.

Techniques for Synchronous Replication • Voting: Transactions must write a majority of copies to modify an object; must read enough copies to be sure of seeing at least one most recent copy. – E.g., 10 copies; 7 written for update; 4 copies read. – Each copy has version number. – Not attractive usually because reads are common.

• Read-any Write-all: Writes are slower and reads are faster, relative to Voting. – Most common approach to synchronous replication.

• Choice of technique determines which locks to set.
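
The voting rule above can be checked mechanically. The following is a minimal sketch, assuming copies are represented as (version, value) pairs; the function names are illustrative and not part of the slides.

```python
def quorum_is_valid(n_copies, write_quorum, read_quorum):
    """Check the two overlap conditions behind voting-based replication."""
    # Writes must reach a strict majority, so any two write sets overlap.
    writes_overlap = 2 * write_quorum > n_copies
    # Every read set must overlap every write set, so a read always
    # includes at least one most recent copy.
    read_sees_latest = read_quorum + write_quorum > n_copies
    return writes_overlap and read_sees_latest


def read_latest(copies_read):
    """Return the value carrying the highest version number.

    copies_read is a list of (version, value) pairs (assumed layout).
    """
    return max(copies_read, key=lambda copy: copy[0])[1]


# The example from the slide: 10 copies, 7 written, 4 read.
print(quorum_is_valid(10, 7, 4))
```

With 10 copies and 7 written, reading only 3 copies would not be enough, because 3 + 7 does not exceed 10.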

Primary Site Replication • Exactly one copy of a relation is designated the primary or master copy. Replicas at other sites cannot be directly updated. – The primary copy is published. – Other sites subscribe to (fragments of) this relation; these are secondary copies.

• Main issue: How are changes to the primary copy propagated to the secondary copies? – Done in two steps.

• First, capture changes made by committed transactions; • Then apply these changes.

Implementing the Capture Step • Log-Based Capture: The log (kept for recovery) is used to generate a Change Data Table (CDT). – If this is done when the log tail is written to disk, it must somehow remove changes due to subsequently aborted transactions.

• Procedural Capture: A procedure that is automatically invoked (trigger) does the capture; typically, just takes a snapshot. • Log-Based Capture is better (cheaper, faster) but relies on proprietary log details.

Implementing the Apply Step • The Apply process at the secondary site periodically obtains (a snapshot or) changes to the CDT table from the primary site, and updates the copy. – Period can be timer-based or user/application defined.

• Replica can be a view over the modified relation!

– If so, the replication consists of incrementally updating the materialized view as the relation changes.

• Log-Based Capture plus continuous Apply minimizes delay in propagating changes. • Procedural Capture plus application-driven Apply is the most flexible way to process changes.

Peer-to-Peer Replication • More than one of the copies of an object can be a master in this approach. • Changes to a master copy must be propagated to other copies somehow. • If two master copies are changed in a conflicting manner, this must be resolved. (e.g., Site 1: Joe’s age changed to 35; Site 2: to 36) • Best used when conflicts do not arise: – E.g., Each master site owns a disjoint fragment. – E.g., Updating rights owned by one master

Distributed Query Processing
Sailors (sid : int, sname : str, rating : int, age : int)
Reserves (sid : int, bid : int, day : date, rname : string)
SELECT AVG(S.age) FROM Sailors S WHERE S.rating > 3 AND S.rating < 7
• Horizontally Fragmented: Tuples with rating < 5 at Mumbai, >= 5 at Delhi. – Must compute SUM(age), COUNT(age) at both sites. – If WHERE contained just S.rating>6, just one site.

• Vertically Fragmented: sid and rating at Mumbai, sname and age at Delhi, tid at both.

– Must reconstruct relation by join on tid, then evaluate the query.

• Replicated: Sailors copies at both sites.

– Choice of site based on local costs, shipping costs.
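
For the horizontally fragmented case, the reason each site returns SUM(age) and COUNT(age) rather than a local average can be sketched as follows. This is an illustrative sketch only: the fragments are assumed to be lists of (rating, age) pairs, and the function names are hypothetical.

```python
def local_sum_count(fragment, low=3, high=7):
    """Evaluate the WHERE clause at one site; return (SUM(age), COUNT(age))."""
    ages = [age for rating, age in fragment if low < rating < high]
    return sum(ages), len(ages)


def global_avg(partials):
    """Combine per-site (sum, count) pairs into the global AVG."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count if count else None


# Made-up sample data: (rating, age) pairs.
mumbai = [(4, 30), (2, 50)]           # tuples with rating < 5
delhi = [(5, 40), (6, 50), (9, 20)]   # tuples with rating >= 5

partials = [local_sum_count(mumbai), local_sum_count(delhi)]
print(global_avg(partials))
```

Averaging the two local averages would weight the sites equally regardless of how many qualifying tuples each holds, which is why the partial sums and counts are shipped instead.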

Distributed Query Optimization • Cost-based approach; consider all plans, pick cheapest; similar to centralized optimization. – Difference 1: Communication costs must be considered. – Difference 2: Local site autonomy must be respected. – Difference 3: New distributed join methods.

• Query site constructs global plan, with suggested local plans describing processing at each site.


Distributed Query Processing – Example • This query will return 10,000 records (every employee belongs to one department). • Each record will be 40 bytes long (FNAME + LNAME + DNAME = 15 + 15 + 10 = 40). • Thus the result set will be 400,000 bytes. • Assume cost to transfer query text between nodes can be safely ignored.

Distributed Query Processing – Example • Three alternatives: – Copy all EMPLOYEE and DEPARTMENT records to node 3. Perform the join and display the results. Total Cost = 1,000,000 + 3,500 = 1,003,500 bytes – Copy all EMPLOYEE records (1,000,000 bytes) from node 1 to node 2. Perform the join, then ship the results (400,000 bytes) to node 3. Total cost = 1,000,000 + 400,000 = 1,400,000 bytes – Copy all DEPARTMENT records (3,500) from node 2 to node 1. Perform the join. Ship the results from node 1 to node 3 (400,000). Total cost = 3,500 + 400,000 = 403,500 bytes

Distributed Query Processing – Example • For each department, retrieve the department name and the name of the department manager. ∏ FNAME,LNAME,DNAME (DEPARTMENT ⋈ MGRSSN=SSN EMPLOYEE) • The result has 100 tuples, each of 40 bytes (4,000 bytes in total). • Transfer both the relations to site 3 and perform the join. Total size = 1,000,000 + 3,500 = 1,003,500 bytes • Transfer the EMPLOYEE relation to site 2 and perform the join. Send the results to site 3. Total size = 1,000,000 + 4,000 = 1,004,000 bytes • Transfer the DEPARTMENT relation to site 1 and perform the join. Send the results to site 3. Total size = 3,500 + 4,000 = 7,500 bytes
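
The arithmetic for the three shipping strategies is the same for both example queries; only the result size changes. A small sketch, using the byte counts from the slides (the function name and dictionary keys are illustrative):

```python
def strategy_costs(emp_bytes, dept_bytes, result_bytes):
    """Bytes transferred under each of the three strategies."""
    return {
        "ship both relations to site 3": emp_bytes + dept_bytes,
        "ship EMPLOYEE to site 2, then result to site 3": emp_bytes + result_bytes,
        "ship DEPARTMENT to site 1, then result to site 3": dept_bytes + result_bytes,
    }


# Employee/department query: result is 10,000 records of 40 bytes.
q1 = strategy_costs(1_000_000, 3_500, 400_000)
# Department/manager query: result is 100 records of 40 bytes.
q2 = strategy_costs(1_000_000, 3_500, 4_000)

for name, cost in q2.items():
    print(f"{name}: {cost:,} bytes")
print("cheapest:", min(q2, key=q2.get))
```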

Distributed Query Processing – Example • Taking the same example: – Copy just the FNAME, LNAME and DNO columns from Site 1 to Site 3 (cost = 34 bytes times 10,000 records = 340,000 bytes) – Copy just the DNUMBER and DNAME columns from site 2 to site 3 (cost = 14 bytes times 100 records = 1,400 bytes) – Perform the join at site 3 and display the results. Total cost = 341,400

Semi-Join • The semijoin of r1 with r2 is denoted by: r1 ⋉ r2
• The idea is to reduce the number of tuples in a relation before transferring it to another site. • Send the joining column of one relation (say r1) to the site where the other relation (say r2) is located, and perform a join with r2. • Then, the join attribute along with the other required attributes are projected and sent to the original site. • A join operation is performed at this site.
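
The steps above can be sketched in a few lines. This is a minimal single-process illustration, assuming rows are Python dicts and that "shipping" is just passing a value between functions; the helper names and sample rows are hypothetical.

```python
def semijoin_reduce(rows, shipped_keys, key):
    """At the remote site: keep only rows whose join key was shipped over."""
    return [row for row in rows if row[key] in shipped_keys]


def join(left_rows, right_rows, key):
    """At the original site: plain hash join on a single key column."""
    index = {}
    for right in right_rows:
        index.setdefault(right[key], []).append(right)
    return [{**left, **right}
            for left in left_rows
            for right in index.get(left[key], [])]


r1 = [{"sid": 1, "sname": "a"}, {"sid": 2, "sname": "b"}]
r2 = [{"sid": 1, "bid": 10}, {"sid": 3, "bid": 11}, {"sid": 1, "bid": 12}]

shipped_keys = {row["sid"] for row in r1}          # step 1: project and ship
reduced_r2 = semijoin_reduce(r2, shipped_keys, "sid")  # step 2: reduce remotely
result = join(r1, reduced_r2, "sid")               # step 3: ship back and join
print(len(r2), len(reduced_r2), len(result))
```

The final result is the same as joining r1 with all of r2; the saving is that only the reduced rows travel back.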

Semi-Join Example London Site: Sailors (sid: int, sname: str, rating: int, age: int) Paris Site: Reserves (sid: int, bid: int, day: date, rname: string) • At London, project Sailors onto join columns and ship this to Paris. • At Paris, join Sailors projection with Reserves. – Result is called reduction of Reserves wrt Sailors.

• Ship reduction of Reserves to London. • At London, join Sailors with reduction of Reserves. • Especially useful if there is a selection on

Semi-Join Example • Project the join attribute of DEPARTMENT at site 2, and transfer it to site 1. F = ∏ DNUMBER (DEPARTMENT) Size = 4*100 = 400 bytes • Join the transferred file with the EMPLOYEE relation at site 1, and transfer the required attributes from the resulting file to site 2. R = ∏ DNO,FNAME,LNAME (F ⋈ DNUMBER=DNO EMPLOYEE) Size = 34*10,000 = 340,000 bytes • Execute a join of the transferred file R with DEPARTMENT, and present the result to site 3. Size = 400,000 bytes Total Size = 400 + 340,000 + 400,000 = 740,400 bytes

Semi-Join Example • Project the join attribute of DEPARTMENT at site 2, and transfer it to site 1. F = ∏ MGRSSN (DEPARTMENT) Size = 9*100 = 900 bytes • Join the transferred file with the EMPLOYEE relation at site 1, and transfer the required attributes from the resulting file to site 2. R = ∏ MGRSSN,FNAME,LNAME (F ⋈ MGRSSN=SSN EMPLOYEE) Size = 39*100 = 3,900 bytes • Execute a join of the transferred file R with DEPARTMENT, and present the result to site 3. Size = 4,000 bytes Total Size = 900 + 3,900 + 4,000 = 8,800 bytes

Joins - Fetch as Needed • Perform a page-oriented Nested Loop Join in London with Sailors as the outer and, for each Sailors page, fetch all Reserves pages from Paris. • Cache all the pages. • Fetch as Needed:

– Cost: 500 D + 500 * 1000 (D+S) – D is cost to read/write page; S is cost to ship page. – If query was not submitted at London, must add cost of shipping result to query site.

• Can also do Indexed Nested Loop Join at London, fetching matching Reserves tuples to London as needed.

Joins - Ship to One Site • Transfer Reserves to London. – Cost: 1000 S + 4500 D

• Transfer Sailors to Paris. – Cost: 500 S + 4500 D

• If result size is very large, may be better to ship both relations to result site and then join them!
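
To compare the strategies numerically, the cost formulas from the slides can be evaluated directly. The sketch below assumes Sailors has 500 pages and Reserves has 1000 pages (as the formulas imply), takes the 4500 D local join cost as given, and uses made-up values D = 1 and S = 10 purely for illustration.

```python
def fetch_as_needed(outer_pages, inner_pages, d, s):
    """Page-oriented nested loops: read each outer page once, and fetch
    every inner page from the remote site once per outer page."""
    return outer_pages * d + outer_pages * inner_pages * (d + s)


def ship_to_one_site(shipped_pages, local_join_io, d, s):
    """Ship one relation whole, then join locally."""
    return shipped_pages * s + local_join_io * d


D, S = 1, 10  # assumed unit costs, not from the slides

print("fetch as needed:        ", fetch_as_needed(500, 1000, D, S))
print("ship Reserves to London:", ship_to_one_site(1000, 4500, D, S))
print("ship Sailors to Paris:  ", ship_to_one_site(500, 4500, D, S))
```

Under these assumed unit costs, shipping a whole relation is far cheaper than fetching pages repeatedly, and shipping the smaller relation is cheapest.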

Bloomjoins • At London, compute a bit-vector of some size k: – Hash join column values into range 0 to k-1. – If some tuple hashes to i, set bit i to 1 (i from 0 to k-1). – Ship bit-vector to Paris.

• At Paris, hash each tuple of Reserves similarly, and discard tuples that hash to a bit that is 0 in the Sailors bit-vector. – Result is called reduction of Reserves wrt Sailors.

• Ship bit-vector reduced Reserves to London. • At London, join Sailors with reduced Reserves.
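
A Bloomjoin reduction can be sketched as below. This is an illustration only: it assumes integer join keys, uses Python's built-in hash modulo k as the hash function, and the sample sid values are made up.

```python
def build_bit_vector(join_values, k):
    """At London: set bit i for every join value that hashes to i."""
    bits = [0] * k
    for value in join_values:
        bits[hash(value) % k] = 1
    return bits


def reduce_with_bit_vector(rows, key, bits):
    """At Paris: discard rows whose join value hashes to a 0 bit."""
    k = len(bits)
    return [row for row in rows if bits[hash(row[key]) % k] == 1]


sailor_sids = [1, 2, 9]
reserves = [{"sid": s, "bid": 100 + s} for s in (1, 3, 9, 17, 4)]

bits = build_bit_vector(sailor_sids, k=8)
reduced = reduce_with_bit_vector(reserves, "sid", bits)
print([row["sid"] for row in reduced])
```

In this sample, sid 17 survives the reduction even though no sailor has that sid, because 17 and 1 share a bit. Such false positives are removed by the final join at London; no matching tuple is ever discarded.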

Distributed Transactions • Distributed Concurrency Control: – How can locks for objects stored across several sites be managed? – How can deadlocks be detected in a distributed database?

• Distributed Recovery: – Transaction atomicity must be ensured.


Concurrency Control and Recovery • Dealing with multiple copies of the data items • Failure of individual sites • Failure of communication links • Distributed commit • Distributed deadlock

Distributed Locking • How do we manage locks for objects across many sites? – Centralized: One site does all locking. • Vulnerable to single site failure.

– Primary Copy: All locking for an object done at the primary copy site for this object. • Reading requires access to locking site as well as site where the object is stored.

– Fully Distributed: Locking for a copy done at site where the copy is stored. • Locks at all sites while writing an object.

• Obtaining and releasing of locks is determined by the concurrency control protocol.

Deadlock Handling
Consider the following two transactions and history, with item X and transaction T1 at site 1, and item Y and transaction T2 at site 2:
T1: write (X); write (Y)
T2: write (Y); write (X)
History:
– T1: X-lock on X; write (X); wait for X-lock on Y
– T2: X-lock on Y; write (Y); wait for X-lock on X
Result: deadlock which cannot be detected locally at either site

Local and Global Wait-For Graphs



Distributed Deadlock – Solution • Three solutions: – Centralized (send all local graphs to one site); – Hierarchical (organize sites into a hierarchy and send local graphs to parent in the hierarchy); – Timeout (abort transaction if it waits too long).

Centralized Approach • A global wait-for graph is constructed and maintained in a single site: the deadlock-detection coordinator. – Real graph: Real, but unknown, state of the system. – Constructed graph: Approximation generated by the controller during the execution of its algorithm.

• The global wait-for graph can be constructed when: – a new edge is inserted in or removed from one of the local wait-for graphs. – a number of changes have occurred in a local wait-for graph.
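
What the coordinator does with the local graphs can be sketched as a union followed by a cycle check. The representation below (a dict mapping each waiting transaction to the set of transactions it waits for) and the function names are assumptions made for illustration.

```python
def merge_graphs(local_graphs):
    """Union the local wait-for graphs into one global graph."""
    global_graph = {}
    for graph in local_graphs:
        for waiter, holders in graph.items():
            global_graph.setdefault(waiter, set()).update(holders)
    return global_graph


def has_cycle(graph):
    """Depth-first search; an edge back to a node on the current path is a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2
    color = {}

    def visit(node):
        color[node] = GREY
        for nxt in graph.get(node, ()):
            state = color.get(nxt, WHITE)
            if state == GREY or (state == WHITE and visit(nxt)):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in list(graph))


# The two-site deadlock: T2 waits for T1 at site 1, T1 waits for T2 at site 2.
site1 = {"T2": {"T1"}}
site2 = {"T1": {"T2"}}
print(has_cycle(site1), has_cycle(site2), has_cycle(merge_graphs([site1, site2])))
```

Neither local graph contains a cycle on its own; the cycle only appears once the edges are combined.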

Example Wait-For Graph for False Cycles Initial state:

False Cycles (Cont.) • Suppose that starting from the state shown in figure, 1. T2 releases resources at S1 • resulting in a message remove T1 → T2 from the Transaction Manager at site S1 to the coordinator

2. And then T2 requests a resource held by T3 at site S2 • resulting in a message insert T2 → T3 from S2 to the coordinator

• Suppose further that the insert message reaches the coordinator before the remove message (this can happen due to network delays) • The coordinator would then find a false cycle T1 → T2 → T3 → T1

Unnecessary Rollbacks • Unnecessary rollbacks may result when deadlock has indeed occurred and a victim has been picked, and meanwhile one of the transactions was aborted for reasons unrelated to the deadlock. • Unnecessary rollbacks can result from false cycles in the global wait-for graph; however, likelihood of false cycles is low.

Distributed Recovery • Two new issues: – New kinds of failure, e.g., links and remote sites. – If “sub-transactions” of a transaction execute at different sites, all or none must commit. Need a commit protocol to achieve this.

• A log is maintained at each site, as in a centralized DBMS, and commit protocol actions are additionally logged.

Coordinator Selection • Backup coordinators – site which maintains enough information locally to assume the role of coordinator if the actual coordinator fails – executes the same algorithms and maintains the same internal state information as the actual coordinator – allows fast recovery from coordinator failure but involves overhead during normal processing.

• Election algorithms – used to elect a new coordinator in case of failures – Example: Bully Algorithm - applicable to systems where every site can send a message to every other site

Bully Algorithm • If site Si sends a request that is not answered by the coordinator within a time interval T, assume that the coordinator has failed; Si tries to elect itself as the new coordinator. • Si sends an election message to every site with a higher identification number; Si then waits for any of these sites to answer within T. • If no response within T, assume that all sites with number greater than i have failed; Si elects itself the new coordinator. • If an answer is received, Si begins time interval T’, waiting to receive a message that a site with a higher identification number has been elected.

Bully Algorithm • If no message is sent within T’, assume the site with a higher number has failed; Si restarts the algorithm. • After a failed site recovers, it immediately begins execution of the same algorithm. • If there are no active sites with higher numbers, the recovered site forces all processes with lower numbers to let it become the coordinator site, even if there is a currently active coordinator with a lower number.
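
The outcome of an election can be sketched without modelling real messages or timers. The simulation below is a simplification under stated assumptions: "alive" is the set of sites that would answer within T, and a site that answers is modelled as taking over the election itself. The function name is illustrative.

```python
def bully_elect(initiator, alive):
    """Return the site that ends up as coordinator.

    initiator: id of the site that noticed the coordinator is down.
    alive: set of site ids that respond to messages (timeouts not modelled).
    """
    if initiator not in alive:
        raise ValueError("initiator must be an active site")
    current = initiator
    while True:
        # Election message goes to every site with a higher id.
        responders = [site for site in alive if site > current]
        if not responders:
            # Nobody higher answered within T: current elects itself.
            return current
        # A higher site answered and runs the same algorithm in turn.
        current = min(responders)


print(bully_elect(2, {1, 2, 3, 5}))
```

Whichever site starts the election, the active site with the highest identification number wins, which is where the algorithm gets its name.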

Distributed Concurrency Control • Idea is to designate a particular copy of each data item as a distinguished copy. • The locks for this data item are associated with the distinguished copy and all the locking and unlocking requests are sent to the site that contains the copy. • Methods for concurrency control – Primary Site Technique – Primary Site with Backup Site – Primary Copy Technique – Voting

Distributed Concurrency Control • Primary site technique – A single site is designated to be coordinator site for all database items – All locks are kept at this site. – All requests are sent to this site.

• Advantages – Simple extension of centralized approach

• Disadvantages – Performance bottleneck – Failure of primary site

Distributed Concurrency Control • Primary site with backup site – Overcomes the second disadvantage of primary site technique – All locking information maintained at the primary as well as backup site. – In case of failure of primary site, backup site takes the control and becomes a primary site. – It also chooses a site as a backup site and copies the lock information.

Distributed Concurrency Control • Primary copy technique – Attempts to distribute load of lock coordination by having distinguished copies of different data items stored at different sites. – Failure of a site affects transactions that access locks on that particular site. – Other transactions can continue to run. – Can use the method of backup to increase availability and reliability.

Distributed Concurrency Control • Based on voting – To lock a data item: • Send a message to all nodes that maintain a replica of this item. • If a node can safely lock the item, then vote "Yes", otherwise, vote "No". • If a majority of participating nodes vote "Yes" then the lock is granted. • Send the results of the vote back out to all participating sites.
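
The voting steps can be sketched as follows. This is a minimal single-process illustration, assuming each replica site records at most one lock holder per item; the data layout and function name are hypothetical, and message passing is not modelled.

```python
def majority_lock(holders, txn):
    """Try to lock an item for txn by majority vote of its replica sites.

    holders maps each replica site to the transaction currently holding
    the lock there, or None if the copy is unlocked (assumed layout).
    """
    # A site votes "Yes" if its copy is free or already held by txn.
    yes_sites = [site for site, holder in holders.items() if holder in (None, txn)]
    granted = 2 * len(yes_sites) > len(holders)
    if granted:
        # The result is sent back out; "Yes" sites record the lock.
        for site in yes_sites:
            holders[site] = txn
    return granted


holders = {"A": None, "B": None, "C": "T9"}
print(majority_lock(holders, "T1"))  # two of three sites vote "Yes"
print(majority_lock(holders, "T2"))  # no site can vote "Yes" any more
```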

Normal Execution and Commit Protocols • Commit protocols are used to ensure atomicity across sites – a transaction which executes at multiple sites must either be committed at all the sites, or aborted at all the sites. – not acceptable to have a transaction committed at one site and aborted at another

• The two-phase commit (2PC) protocol is widely used • The three-phase commit (3PC) protocol is more complicated and more expensive, but avoids some drawbacks of two-phase commit protocol. This protocol is not used in practice.

Two-Phase Commit (2PC) • Site at which transaction originates is coordinator; other sites at which it executes are subordinates. • When a transaction wants to commit: – Coordinator sends prepare msg to each subordinate. – Subordinate force-writes an abort or prepare log record and then sends a no or yes msg to coordinator. – If coordinator gets unanimous yes votes, force-writes a commit log record and sends commit msg to all subs. Else, force-writes abort log rec, and sends abort msg. – Subordinates force-write abort/commit log rec based on msg they get, then send ack msg to coordinator.

Two-Phase Commit (2PC) • Two rounds of communication: first, voting; then, termination. Both initiated by coordinator. • Any site can decide to abort a transaction. • Every message reflects a decision by the sender; to ensure that this decision survives failures, it is first recorded in the local log. • All commit protocol log records for a transaction contain Transaction_id and Coordinator_id. The coordinator’s
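
The two rounds can be traced with a small failure-free simulation. This is a sketch only: logs are plain Python lists, "force-write" is modelled as an append, each subordinate's vote is supplied as input, and the function name is illustrative.

```python
def two_phase_commit(votes):
    """Run one failure-free round of 2PC.

    votes maps subordinate name -> True (will vote yes) or False (no).
    Returns (decision, coordinator_log, subordinate_logs).
    """
    coordinator_log = []
    sub_logs = {sub: [] for sub in votes}

    # Round 1 (voting): each sub logs prepare/abort, then replies yes/no.
    replies = {}
    for sub, can_commit in votes.items():
        sub_logs[sub].append("prepare" if can_commit else "abort")
        replies[sub] = "yes" if can_commit else "no"

    # Round 2 (termination): coordinator logs its decision before sending it.
    unanimous = all(reply == "yes" for reply in replies.values())
    decision = "commit" if unanimous else "abort"
    coordinator_log.append(decision)

    acks = 0
    for sub in votes:
        if sub_logs[sub][-1] != decision:  # a "no" voter already logged abort
            sub_logs[sub].append(decision)
        acks += 1

    if acks == len(votes):
        coordinator_log.append("end")  # coordinator can now forget T
    return decision, coordinator_log, sub_logs


print(two_phase_commit({"S1": True, "S2": True}))
print(two_phase_commit({"S1": True, "S2": False}))
```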

Handling of Failures - Site Failure When site Si recovers, it examines its log to determine the fate of transactions active at the time of the failure. • Log contains <commit T> record: site executes redo (T) • Log contains <abort T> record: site executes undo (T) • Log contains <ready T> record: site must consult Ci to determine the fate of T. – If T committed, redo (T) – If T aborted, undo (T)

• The log contains no control records concerning T: this implies that Sk failed before responding to the prepare T message from Ci – since the failure of Sk precludes the sending of such a response, Ci must abort T.

Handling of Failures - Coordinator Failure • If coordinator fails while the commit protocol for T is executing then participating sites must decide on T’s fate: ★ If an active site contains a <commit T> record in its log, then T must be committed. ★ If an active site contains an <abort T> record in its log, then T must be aborted. ★ If some active participating site does not contain a <ready T> record in its log, then the failed coordinator Ci cannot have decided to commit T. Can therefore abort T. ★ If none of the above cases holds, then all active sites must have a <ready T> record in their logs, but no additional control records (such as <abort T> or <commit T>). In this case active sites must wait for Ci to recover, to find decision.

• Blocking problem : active sites may have to wait for failed coordinator to recover.

Handling of Failures - Network Partition • If the coordinator and all its participants remain in one partition, the failure has no effect on the commit protocol. • If the coordinator and its participants belong to several partitions: – Sites that are not in the partition containing the coordinator think the coordinator has failed, and execute the protocol to deal with failure of the coordinator. • No harm results, but sites may still have to wait for decision from coordinator.

• The coordinator and the sites that are in the same partition as the coordinator think that the sites in the other partition have failed, and follow the usual commit protocol. • Again, no harm results

Recovery and Concurrency Control • In-doubt transactions have a <ready T>, but neither a <commit T> nor an <abort T> log record. • The recovering site must determine the commit-abort status of such transactions by contacting other sites; this can slow and potentially block recovery. • Recovery algorithms can note lock information in the log. – Instead of <ready T>, write out <ready T, L>, where L = list of locks held by T when the log is written (read locks can be omitted). – For every in-doubt transaction T, all the locks noted in the <ready T, L> log record are reacquired.

• After lock reacquisition, transaction processing can resume; the commit or rollback of in-doubt transactions is performed concurrently with the execution of new transactions.

Restart after a Failure • If we have a commit or abort log record for transaction T, but not an end record, must redo/undo T. – If this site is the coordinator for T, keep sending commit/abort msgs to subs until acks received.

• If we have a prepare log record for transaction T, but not commit/abort, this site is a subordinate for T. – Repeatedly contact the coordinator to find status of T, then write commit/abort log record; redo/undo T; and write end log record.

• If we don’t have even a prepare log record for T, unilaterally abort and undo T. – This site may be coordinator! If so, subs may send msgs.
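
The three restart cases can be written as a small decision function. This is a sketch under stated assumptions: the log for T is reduced to a collection of record names, the returned strings are informal descriptions rather than real recovery actions, and treating an existing end record as "nothing to do" is an assumption not stated on the slide.

```python
def restart_action(records, is_coordinator=False):
    """Decide what a restarting site does for transaction T.

    records: the 2PC record types found in this site's log for T,
    drawn from "prepare", "commit", "abort", "end".
    """
    records = set(records)
    if "end" in records:
        return "nothing to do"  # assumption: T was fully finished
    if "commit" in records or "abort" in records:
        action = "redo T" if "commit" in records else "undo T"
        if is_coordinator:
            action += "; resend decision until all acks are received"
        return action
    if "prepare" in records:
        # This site is a subordinate and is in doubt about T.
        return "contact coordinator for status of T"
    return "abort and undo T"


print(restart_action(["prepare", "commit"]))
print(restart_action(["abort"], is_coordinator=True))
print(restart_action(["prepare"]))
print(restart_action([]))
```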

Observations on 2PC • Ack msgs used to let coordinator know when it can “forget” a transaction; until it receives all acks, it must keep T in the transaction Table. • If coordinator fails after sending prepare msgs but before writing commit/abort log records, when it comes back up it aborts the transaction. • If a sub-transaction does no updates, its commit or abort status is irrelevant.

2PC with Presumed Abort • When coordinator aborts T, it undoes T and removes it from the transaction Table immediately. – Doesn’t wait for acks; “presumes abort” if transaction not in transaction Table. Names of subs not recorded in abort log rec.

• Subordinates do not send acks on abort. • If a sub-transaction does not do updates, it responds to prepare msg with reader instead of yes/no. • Coordinator subsequently ignores readers. • If all sub-transactions are readers, 2nd phase not needed.

Three Phase Commit (3PC) • Assumptions: – No network partitioning – At any point, at least one site must be up. – At most K sites (participants as well as coordinator) can fail

• Phase 1: Obtaining Preliminary Decision: Identical to 2PC Phase 1. – Every site is ready to commit if instructed to do so

Three-Phase Commit (3PC) • Phase 2 of 2PC is split into 2 phases, Phase 2 and Phase 3 of 3PC – In phase 2 coordinator makes a decision as in 2PC (called the pre-commit decision) and records it in multiple (at least K) sites – In phase 3, coordinator sends commit/abort message to all participating sites.

• Under 3PC, knowledge of pre-commit decision can be used to commit despite coordinator failure – Avoids blocking problem as long as < K sites fail

• Drawbacks: – higher overheads – assumptions may not be satisfied in practice

