Restart and rewind the server during failover (requires PolicyKit)

pmeunier
Jan 17, 2022, 3:48 PM
ENEREHTW2UEXJGIOSSKZA2FQKFDLWQEIOAD6NIT2EH6BJW6E3AZQC

Dependencies

Change contents

  • edit in src/main.rs at line 27
    [4.741]
    [4.741]
    .arg(clap::Arg::with_name("port").long("port").takes_value(true))
  • replacement in src/main.rs at line 29
    [4.755][4.755:888](),[4.85][4.85:153]()
    clap::Arg::with_name("signal")
    .long("signal")
    .help("Path to the postgres signal file")
    .required(true)
    .takes_value(true),
    [4.755]
    [4.153]
    clap::Arg::with_name("db")
    .long("db")
    .takes_value(true)
    .required(true),
  • replacement in src/main.rs at line 34
    [4.163][4.15:89]()
    .arg(clap::Arg::with_name("port").long("port").takes_value(true))
    [4.163]
    [4.163]
    .arg(
    clap::Arg::with_name("path")
    .long("path")
    .takes_value(true)
    .required(true),
    )
  • replacement in src/main.rs at line 49
    [4.1545][4.1545:1611]()
    let signal = matches.value_of("signal").unwrap().to_string();
    [4.1545]
    [4.1611]
    let port: u16 = if let Some(port) = matches.value_of("port") {
    port.parse().unwrap()
    } else {
    5432
    };
  • edit in src/main.rs at line 55
    [4.1681]
    [4.1681]
    let path = matches.value_of("path").unwrap().to_string();
    let db_name = matches.value_of("db").unwrap().to_string();
  • replacement in src/main.rs at line 58
    [4.1682][4.1682:1726]()
    let (lead, is_leader) = channel(false);
    [4.1682]
    [4.1726]
    let (lead, is_leader) = channel(String::new());
  • edit in src/main.rs at line 65
    [3.128][3.128:163]()
    let mut is_normal = false;
  • replacement in src/main.rs at line 67
    [3.211][3.211:338]()
    if let Ok((db, connection)) =
    tokio_postgres::connect("host=localhost user=postgres", NoTls).await
    [3.211]
    [3.338]
    match tokio_postgres::connect("host=localhost user=postgres connect_timeout=1", NoTls)
    .await
  • replacement in src/main.rs at line 70
    [3.352][3.352:422]()
    if is_normal {
    is_normal = false;
    [3.352]
    [3.422]
    Ok((db, connection)) => {
    *db_.lock().await = Some(db);
    let e = connection.await;
    debug!("connection ended: {:?}", e);
  • replacement in src/main.rs at line 75
    [3.440][3.440:946]()
    *db_.lock().await = Some(db);
    let e = connection.await;
    eprintln!("connection ended: {:?}", e);
    // Ignore "normal" restarts (i.e. promotes).
    let db = db_.lock().await.take();
    if db.is_some() {
    lead_.send(false).unwrap();
    *db_.lock().await = None;
    } else {
    debug!("normal restart, reconnecting");
    is_normal = true;
    [3.440]
    [3.946]
    Err(e) => {
    error!("{:?}", e);
    lead_.send(String::new()).unwrap();
  • replacement in src/main.rs at line 86
    [4.1843][4.1843:1867]()
    signal.clone(),
    [4.1843]
    [4.1867]
    port,
  • edit in src/main.rs at line 88
    [4.1893]
    [4.1893]
    db_name.clone(),
    path.clone(),
  • replacement in src/main.rs at line 93
    [4.1943][3.1075:1366]()
    let mut race = tokio::spawn(race(host.clone(), signal.clone(), is_leader.clone()));
    // let addr = SocketAddr::from((
    // [0, 0, 0, 0],
    // matches
    // .value_of("port")
    // .and_then(|x| x.parse().ok())
    // .unwrap_or(8008),
    // ));
    [4.1943]
    [3.1366]
    let mut race = tokio::spawn(race(host.clone(), is_leader.clone()));
  • edit in src/main.rs at line 101
    [4.2266]
    [4.2266]
    let host = Arc::new(host);
  • replacement in src/main.rs at line 104
    [4.2363][4.2363:2461]()
    async move { Ok::<_, Infallible>(service_fn(move |req| handle(req, is_leader.clone()))) }
    [4.2363]
    [4.2461]
    let host = host.clone();
    async move {
    let host = host.clone();
    Ok::<_, Infallible>(service_fn(move |req| {
    handle(req, host.clone(), is_leader.clone())
    }))
    }
  • replacement in src/main.rs at line 133
    [4.3797][4.3797:3828]()
    is_leader: Receiver<bool>,
    [4.3797]
    [4.3828]
    name: Arc<String>,
    is_leader: Receiver<String>,
  • replacement in src/main.rs at line 136
    [4.3870][4.3870:3899]()
    if *is_leader.borrow() {
    [4.3870]
    [4.3899]
    if is_leader.borrow().as_str() == name.as_str() {
  • replacement in src/main.rs at line 145
    [4.4094][4.4094:4147](),[4.4176][4.4176:4244]()
    async fn race(
    host: String,
    signal: String,
    mut is_leader: Receiver<bool>,
    ) -> Result<(), anyhow::Error> {
    [4.4094]
    [3.1697]
    async fn race(host: String, mut leader: Receiver<String>) -> Result<(), anyhow::Error> {
  • replacement in src/main.rs at line 170
    [4.5242][4.5242:5292]()
    let leader = lock.take_leader().unwrap();
    [4.4591]
    [4.234]
    let leader_key = lock.take_leader().unwrap();
  • replacement in src/main.rs at line 173
    [4.289][4.289:383]()
    std::str::from_utf8(leader.name()),
    std::str::from_utf8(leader.key())
    [4.289]
    [4.383]
    std::str::from_utf8(leader_key.name()),
    std::str::from_utf8(leader_key.key())
  • edit in src/main.rs at line 176
    [4.394][4.5338:5391](),[4.5338][4.5338:5391](),[4.5425][4.5425:5486]()
    std::fs::remove_file(&signal).unwrap_or(());
    debug!("waiting for instance to become non-leader");
  • replacement in src/main.rs at line 177
    [4.5501][4.5501:5644]()
    is_leader.changed().await.unwrap();
    debug!("changed: {:?}", is_leader.borrow());
    if !*is_leader.borrow() {
    [4.5501]
    [4.5644]
    leader.changed().await.unwrap();
    debug!("leader: {:?}", leader.borrow());
    if leader.borrow().as_str() != host {
  • replacement in src/main.rs at line 182
    [4.5795][4.5795:5884]()
    .resign(Some(etcd_client::ResignOptions::new().with_leader(leader)))
    [4.5795]
    [4.5884]
    .resign(Some(
    etcd_client::ResignOptions::new().with_leader(leader_key),
    ))
  • replacement in src/main.rs at line 194
    [4.6015][4.6015:6035]()
    signal: String,
    [4.6015]
    [4.6035]
    port: u16,
  • replacement in src/main.rs at line 196
    [4.6057][4.6057:6086]()
    lead: Arc<Sender<bool>>,
    [4.6057]
    [3.2422]
    db_name: String,
    path: String,
    lead: Arc<Sender<String>>,
  • edit in src/main.rs at line 203
    [4.6288]
    [4.6288]
    let mut was_leader = {
    if let Some(pool_) = pool.lock().await.take() {
    let row = pool_.query_one("SELECT pg_is_in_recovery()", &[]).await?;
    let is_in_recovery: bool = row.get(0);
    *pool.lock().await = Some(pool_);
    Some(!is_in_recovery)
    } else {
    None
    }
    };
    debug!("was_leader = {:?}", was_leader);
  • replacement in src/main.rs at line 218
    [3.2518][4.6418:6446](),[4.6418][4.6418:6446]()
    if leader != host {
    [3.2518]
    [4.6446]
    let is_leader = leader == host;
    if is_leader {
  • edit in src/main.rs at line 222
    [4.6495][4.6495:6589]()
    std::fs::write(&signal, "").unwrap_or(());
    lead.send(false).unwrap();
  • edit in src/main.rs at line 224
    [4.6700][4.6700:6738]()
    lead.send(true).unwrap();
  • replacement in src/main.rs at line 225
    [4.6748][3.2519:2602]()
    let pool_ = pool.lock().await.take();
    if let Some(pool_) = pool_ {
    [4.6748]
    [3.2602]
    lead.send(leader.to_string()).unwrap();
    if let Some(pool_) = pool.lock().await.take() {
  • replacement in src/main.rs at line 229
    [3.2763][3.2763:3173]()
    if leader == host {
    info!("promoting");
    pool_.execute("SELECT pg_promote()", &[]).await.unwrap_or(0);
    // Don't replace pool_ into *pool, since the server might restart.
    *pool.lock().await = Some(pool_)
    } else {
    *pool.lock().await = Some(pool_)
    }
    } else {
    lead.send(false)?;
    [3.2763]
    [3.3173]
    *pool.lock().await = Some(pool_);
    }
    if is_leader && was_leader != Some(true) {
    promote(&path).await
    } else if !is_leader && was_leader == Some(true) {
    rewind(&path, port, &db_name, &leader).await
  • edit in src/main.rs at line 237
    [3.3183]
    [4.6942]
    was_leader = Some(leader == host);
  • edit in src/main.rs at line 241
    [4.6959]
    [4.6959]
    }
    async fn promote(path: &str) {
    use tokio::process::Command;
    Command::new("pg_ctl")
    .args(["promote", "-D", path])
    .output()
    .await
    .expect("failed to execute process");
  • edit in src/main.rs at line 251
    [4.6961]
    async fn rewind(path: &str, port: u16, db_name: &str, leader: &str) {
    use tokio::process::Command;
    Command::new("systemctl")
    .args(["stop", "postgresql-repl"])
    .output()
    .await
    .expect("failed to execute process");
    Command::new("pg_rewind")
    .args([
    "-D",
    &path,
    "-R",
    "--source-server",
    &format!(
    "port={} user=postgres dbname={} host={}",
    port, db_name, leader
    ),
    ])
    .output()
    .await
    .expect("failed to execute process");
    Command::new("systemctl")
    .args(["start", "postgresql-repl"])
    .output()
    .await
    .expect("failed to execute process");
    }
  • edit in README.md at line 10
    [4.773]
    ### Cooperation from the OS
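    During failover this tool stops and restarts the `postgresql-repl` systemd unit so it can run `pg_rewind`, so the user running it must be allowed to manage that unit. The same user must run both the PostgreSQL server and this tool. If that user is called `postgres`, the following PolicyKit rule grants the required permission: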
    ``` js
    // Allow the `postgres` user to manage (start/stop) postgresql-repl.service
    // through systemd without an authentication prompt. The failover tool runs
    // `systemctl stop` / `systemctl start postgresql-repl` as that user when
    // rewinding a former leader.
    polkit.addRule(function(action, subject) {
    // Match only the systemd unit-management action, only for this unit, and
    // only for this user.
    if (action.id == "org.freedesktop.systemd1.manage-units" &&
    action.lookup("unit") == "postgresql-repl.service" &&
    subject.user == "postgres"){
    return polkit.Result.YES;
    }
    // Any other request returns nothing and falls through to the other polkit rules.
    });
    ```
  • replacement in Cargo.toml at line 14
    [2.128][2.128:250]()
    tokio = { version = "1.6", features = [ "net", "time", "rt-multi-thread", "macros", "io-util", "fs", "sync", "signal" ] }
    [2.128]
    [2.250]
    tokio = { version = "1.6", features = [ "net", "time", "rt-multi-thread", "macros", "io-util", "fs", "sync", "signal", "process" ] }