Fixing lock issues
Dependencies
- [2] ENEREHTW Restart and rewind the server during failover (requires policykit)
- [3] DSKXOP5Q README + formatting + argument checking
- [4] 6Z6PYXTS Testing and debugging
- [5] NPSWSVZN Init
- [6] QISO3HXB Debug++
Change contents
- edit in src/main.rs at line 65
let mut err: Option<String> = None; - edit in src/main.rs at line 67
debug!("reconnect"); - replacement in src/main.rs at line 71
*db_.lock().await = Some(db);err = None;debug!("reconnected");{*db_.lock().await = Some(db);}debug!("waiting for connection to end"); - edit in src/main.rs at line 79
{*db_.lock().await = None;} - replacement in src/main.rs at line 84
error!("{:?}", e);let ee = format!("{}", e);if let Some(ee_) = err.take() {if ee_ != ee {error!("{}", e);err = Some(ee)} else {err = Some(ee_)}} else {error!("{}", e);err = Some(ee)} - replacement in src/main.rs at line 112
let mut race = tokio::spawn(race(host.clone(), is_leader.clone()));let mut race = tokio::spawn(race(host.clone(),path.clone(),is_leader.clone(),db.clone(),)); - edit in src/main.rs at line 160
trace!("handle: {:?} {:?}", is_leader.borrow(), name); - replacement in src/main.rs at line 170
async fn race(host: String, mut leader: Receiver<String>) -> Result<(), anyhow::Error> {async fn race(host: String,path: String,mut leader: Receiver<String>,pool: Arc<Mutex<Option<tokio_postgres::Client>>>,) -> Result<(), anyhow::Error> { - edit in src/main.rs at line 197
let has_db = pool.lock().await.is_some();debug!("has_db = {:?}", has_db);if !has_db {tokio::time::sleep(std::time::Duration::from_secs(1)).await;continue;} - edit in src/main.rs at line 204
debug!("lock = {:?}", lock); - replacement in src/main.rs at line 205[2.1787]→[3.234:289](∅→∅),[3.5292]→[3.234:289](∅→∅),[3.289]→[2.1788:1890](∅→∅),[2.1890]→[3.383:394](∅→∅),[3.383]→[3.383:394](∅→∅)
info!("Lock acquired: {:?} {:?}",std::str::from_utf8(leader_key.name()),std::str::from_utf8(leader_key.key()));debug!("Lock acquired {:?}", leader_key);promote(&path).await; - edit in src/main.rs at line 233
let mut was_leader = None; - edit in src/main.rs at line 235
let mut was_leader = {if let Some(pool_) = pool.lock().await.take() {let row = pool_.query_one("SELECT pg_is_in_recovery()", &[]).await?;let is_in_recovery: bool = row.get(0);*pool.lock().await = Some(pool_);Some(!is_in_recovery)} else {None}};debug!("was_leader = {:?}", was_leader); - edit in src/main.rs at line 236
debug!("obs: {:?}", m); - replacement in src/main.rs at line 237
debug!("leader = {:?}", leader);debug!("leader = {:?}, host = {:?}", leader, host); - edit in src/main.rs at line 239[2.2700]→[2.2700:2724](∅→∅),[2.2724]→[3.6446:6495](∅→∅),[3.6446]→[3.6446:6495](∅→∅),[3.6589]→[3.6589:6643](∅→∅),[3.6738]→[3.6738:6748](∅→∅)
if is_leader {info!("The leader is {:?}", leader);} else {info!("I'm the leader");} - replacement in src/main.rs at line 241
if let Some(pool_) = pool.lock().await.take() {let pool_ = { pool.lock().await.take() };if let Some(pool_) = pool_ { - edit in src/main.rs at line 247
debug!("is_leader = {:?}, was_leader = {:?}", is_leader, was_leader); - replacement in src/main.rs at line 250
} else if !is_leader && was_leader == Some(true) {rewind(&path, port, &db_name, &leader).await} else if !is_leader && was_leader != Some(false) {let pool_ = { pool.lock().await.take() };let recov: bool = if let Some(pool_) = pool_ {let row = pool_.query_one("SELECT pg_is_in_recovery()", &[]).await?;*pool.lock().await = Some(pool_);row.get(0)} else {false};debug!("recov = {:?}", recov);if !recov {rewind(&path, port, &db_name, leader).await;} - edit in src/main.rs at line 271
debug!("promoting"); - replacement in src/main.rs at line 273
Command::new("pg_ctl")let out = Command::new("pg_ctl") - edit in src/main.rs at line 278
use std::io::Write;std::io::stdout().write_all(&out.stdout).unwrap();std::io::stderr().write_all(&out.stderr).unwrap(); - edit in src/main.rs at line 284
debug!("rewinding"); - replacement in src/main.rs at line 286
Command::new("systemctl")let out = Command::new("systemctl") - replacement in src/main.rs at line 291
Command::new("pg_rewind").args(["-D",&path,"-R","--source-server",&format!("port={} user=postgres dbname={} host={}",port, db_name, leader),]).output().await.expect("failed to execute process");Command::new("systemctl")use std::io::Write;std::io::stdout().write_all(&out.stdout).unwrap();std::io::stderr().write_all(&out.stderr).unwrap();let mut success = false;while !success {// Waiting for source to promote.let out = Command::new("pg_rewind").args(["-D",&path,"-R","--source-server",&format!("port={} user=postgres dbname={} host={}",port, db_name, leader),]).output().await.expect("failed to execute process");std::io::stdout().write_all(&out.stdout).unwrap();std::io::stderr().write_all(&out.stderr).unwrap();success = out.status.success();tokio::time::sleep(std::time::Duration::from_secs(1)).await;}let out = Command::new("systemctl") - edit in src/main.rs at line 321
std::io::stdout().write_all(&out.stdout).unwrap();std::io::stderr().write_all(&out.stderr).unwrap(); - replacement in README.md at line 3
Postrep uses a fixed replication strategy (streaming replication), a fixed leader election tool (Etcd 3), to achieve proper failover when the cluster leader fails.Postrep uses a leader election tool (Etcd 3), to achieve proper failover when the cluster leader fails, in the context of PostgreSQL's streaming replication feature. - edit in README.md at line 6
PostgreSQL has multiple ways to operate in a "leader-replicae" mode. However, failover isn't automatic, and requires another protocol to decide which of the replicae becomes the leader when the previous leader fails. - replacement in README.md at line 7
This crate solves this problem, in the particular configuration where one has an Etcd v3 server running, for example because of requirements from another server running on the same service.Failover between the leader and replica mode of PostgreSQL isn't automatic, and requires another protocol to decide which of the replicae becomes the leader when the previous leader fails.