Fixing lock issues

pmeunier
Jan 18, 2022, 4:39 PM
FIIWK33FEEUURH7QGHQPHGQHSVN7XXLDDPLI5CKKWRK2DGIAHLGAC

Dependencies

  • [2] ENEREHTW Restart and rewind the server during failover (requires policykit)
  • [3] DSKXOP5Q README + formatting + argument checking
  • [4] 6Z6PYXTS Testing and debugging
  • [5] NPSWSVZN Init
  • [6] QISO3HXB Debug++

Change contents

  • edit in src/main.rs at line 65
    [3.128]
    [3.163]
    let mut err: Option<String> = None;
  • edit in src/main.rs at line 67
    [3.178][3.178:211]()
    debug!("reconnect");
  • replacement in src/main.rs at line 71
    [2.850][2.850:900]()
    *db_.lock().await = Some(db);
    [2.850]
    [2.900]
    err = None;
    debug!("reconnected");
    {
    *db_.lock().await = Some(db);
    }
    debug!("waiting for connection to end");
  • edit in src/main.rs at line 79
    [2.1003]
    [3.422]
    {
    *db_.lock().await = None;
    }
  • replacement in src/main.rs at line 84
    [2.1032][2.1032:1071]()
    error!("{:?}", e);
    [2.1032]
    [2.1071]
    let ee = format!("{}", e);
    if let Some(ee_) = err.take() {
    if ee_ != ee {
    error!("{}", e);
    err = Some(ee)
    } else {
    err = Some(ee_)
    }
    } else {
    error!("{}", e);
    err = Some(ee)
    }
  • replacement in src/main.rs at line 112
    [3.1943][2.1191:1263]()
    let mut race = tokio::spawn(race(host.clone(), is_leader.clone()));
    [3.1943]
    [3.1366]
    let mut race = tokio::spawn(race(
    host.clone(),
    path.clone(),
    is_leader.clone(),
    db.clone(),
    ));
  • edit in src/main.rs at line 160
    [3.3870]
    [2.1588]
    trace!("handle: {:?} {:?}", is_leader.borrow(), name);
  • replacement in src/main.rs at line 170
    [3.4094][2.1643:1732]()
    async fn race(host: String, mut leader: Receiver<String>) -> Result<(), anyhow::Error> {
    [3.4094]
    [3.1697]
    async fn race(
    host: String,
    path: String,
    mut leader: Receiver<String>,
    pool: Arc<Mutex<Option<tokio_postgres::Client>>>,
    ) -> Result<(), anyhow::Error> {
  • edit in src/main.rs at line 197
    [3.4476]
    [3.4476]
    let has_db = pool.lock().await.is_some();
    debug!("has_db = {:?}", has_db);
    if !has_db {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    continue;
    }
  • edit in src/main.rs at line 204
    [3.4553][3.4553:4591]()
    debug!("lock = {:?}", lock);
  • replacement in src/main.rs at line 205
    [2.1787][3.234:289](),[3.5292][3.234:289](),[3.289][2.1788:1890](),[2.1890][3.383:394](),[3.383][3.383:394]()
    info!(
    "Lock acquired: {:?} {:?}",
    std::str::from_utf8(leader_key.name()),
    std::str::from_utf8(leader_key.key())
    );
    [2.1787]
    [3.5486]
    debug!("Lock acquired {:?}", leader_key);
    promote(&path).await;
  • edit in src/main.rs at line 233
    [3.6236]
    [3.6236]
    let mut was_leader = None;
  • edit in src/main.rs at line 235
    [3.6288][2.2268:2659]()
    let mut was_leader = {
    if let Some(pool_) = pool.lock().await.take() {
    let row = pool_.query_one("SELECT pg_is_in_recovery()", &[]).await?;
    let is_in_recovery: bool = row.get(0);
    *pool.lock().await = Some(pool_);
    Some(!is_in_recovery)
    } else {
    None
    }
    };
    debug!("was_leader = {:?}", was_leader);
  • edit in src/main.rs at line 236
    [3.6335][3.6335:6367]()
    debug!("obs: {:?}", m);
  • replacement in src/main.rs at line 237
    [3.6418][3.2477:2518]()
    debug!("leader = {:?}", leader);
    [3.6418]
    [2.2660]
    debug!("leader = {:?}, host = {:?}", leader, host);
  • edit in src/main.rs at line 239
    [2.2700][2.2700:2724](),[2.2724][3.6446:6495](),[3.6446][3.6446:6495](),[3.6589][3.6589:6643](),[3.6738][3.6738:6748]()
    if is_leader {
    info!("The leader is {:?}", leader);
    } else {
    info!("I'm the leader");
    }
  • replacement in src/main.rs at line 241
    [2.2774][2.2774:2830]()
    if let Some(pool_) = pool.lock().await.take() {
    [2.2774]
    [3.2602]
    let pool_ = { pool.lock().await.take() };
    if let Some(pool_) = pool_ {
  • edit in src/main.rs at line 247
    [2.2888]
    [2.2888]
    debug!("is_leader = {:?}, was_leader = {:?}", is_leader, was_leader);
  • replacement in src/main.rs at line 250
    [2.2972][2.2972:3088]()
    } else if !is_leader && was_leader == Some(true) {
    rewind(&path, port, &db_name, &leader).await
    [2.2972]
    [3.3173]
    } else if !is_leader && was_leader != Some(false) {
    let pool_ = { pool.lock().await.take() };
    let recov: bool = if let Some(pool_) = pool_ {
    let row = pool_.query_one("SELECT pg_is_in_recovery()", &[]).await?;
    *pool.lock().await = Some(pool_);
    row.get(0)
    } else {
    false
    };
    debug!("recov = {:?}", recov);
    if !recov {
    rewind(&path, port, &db_name, leader).await;
    }
  • edit in src/main.rs at line 271
    [2.3168]
    [2.3168]
    debug!("promoting");
  • replacement in src/main.rs at line 273
    [2.3201][2.3201:3228]()
    Command::new("pg_ctl")
    [2.3201]
    [2.3228]
    let out = Command::new("pg_ctl")
  • edit in src/main.rs at line 278
    [2.3346]
    [3.6959]
    use std::io::Write;
    std::io::stdout().write_all(&out.stdout).unwrap();
    std::io::stderr().write_all(&out.stderr).unwrap();
  • edit in src/main.rs at line 284
    [2.3418]
    [2.3418]
    debug!("rewinding");
  • replacement in src/main.rs at line 286
    [2.3451][2.3451:3481]()
    Command::new("systemctl")
    [2.3451]
    [2.3481]
    let out = Command::new("systemctl")
  • replacement in src/main.rs at line 291
    [2.3603][2.3603:3989]()
    Command::new("pg_rewind")
    .args([
    "-D",
    &path,
    "-R",
    "--source-server",
    &format!(
    "port={} user=postgres dbname={} host={}",
    port, db_name, leader
    ),
    ])
    .output()
    .await
    .expect("failed to execute process");
    Command::new("systemctl")
    [2.3603]
    [2.3989]
    use std::io::Write;
    std::io::stdout().write_all(&out.stdout).unwrap();
    std::io::stderr().write_all(&out.stderr).unwrap();
    let mut success = false;
    while !success {
    // Waiting for source to promote.
    let out = Command::new("pg_rewind")
    .args([
    "-D",
    &path,
    "-R",
    "--source-server",
    &format!(
    "port={} user=postgres dbname={} host={}",
    port, db_name, leader
    ),
    ])
    .output()
    .await
    .expect("failed to execute process");
    std::io::stdout().write_all(&out.stdout).unwrap();
    std::io::stderr().write_all(&out.stderr).unwrap();
    success = out.status.success();
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
    let out = Command::new("systemctl")
  • edit in src/main.rs at line 321
    [2.4112]
    [2.4112]
    std::io::stdout().write_all(&out.stdout).unwrap();
    std::io::stderr().write_all(&out.stderr).unwrap();
  • replacement in README.md at line 3
    [3.7157][3.7157:7321]()
    Postrep uses a fixed replication strategy (streaming replication), a fixed leader election tool (Etcd 3), to achieve proper failover when the cluster leader fails.
    [3.7157]
    [3.335]
    Postrep uses a leader election tool (Etcd 3) to achieve proper failover when the cluster leader fails, in the context of PostgreSQL's streaming replication feature.
  • edit in README.md at line 6
    [3.364][3.364:582]()
    PostgreSQL has multiple ways to operate in a "leader-replicae" mode. However, failover isn't automatic, and requires another protocol to decide which of the replicae becomes the leader when the previous leader fails.
  • replacement in README.md at line 7
    [3.583][3.583:773]()
    This crate solves this problem, in the particular configuration where one has an Etcd v3 server running, for example because of requirements from another server running on the same service.
    [3.583]
    [2.4115]
    Failover between the leader and replica modes of PostgreSQL isn't automatic: it requires another protocol to decide which of the replicas becomes the leader when the previous leader fails.