edit in src/main.rs at line 27
+ .arg(clap::Arg::with_name("port").long("port").takes_value(true))
replacement in src/main.rs at line 29
[4.755]→[4.755:888](∅→∅),
[4.85]→[4.85:153](∅→∅) − clap::Arg::with_name("signal")
− .long("signal")
− .help("Path to the postgres signal file")
− .required(true)
− .takes_value(true),
+ clap::Arg::with_name("db")
+ .long("db")
+ .takes_value(true)
+ .required(true),
replacement in src/main.rs at line 34
− .arg(clap::Arg::with_name("port").long("port").takes_value(true))
+ .arg(
+ clap::Arg::with_name("path")
+ .long("path")
+ .takes_value(true)
+ .required(true),
+ )
replacement in src/main.rs at line 49
[4.1545]→[4.1545:1611](∅→∅) − let signal = matches.value_of("signal").unwrap().to_string();
+ let port: u16 = if let Some(port) = matches.value_of("port") {
+ port.parse().unwrap()
+ } else {
+ 5432
+ };
edit in src/main.rs at line 55
+ let path = matches.value_of("path").unwrap().to_string();
+ let db_name = matches.value_of("db").unwrap().to_string();
replacement in src/main.rs at line 58
[4.1682]→[4.1682:1726](∅→∅) − let (lead, is_leader) = channel(false);
+ let (lead, is_leader) = channel(String::new());
edit in src/main.rs at line 65
− let mut is_normal = false;
replacement in src/main.rs at line 67
− if let Ok((db, connection)) =
− tokio_postgres::connect("host=localhost user=postgres", NoTls).await
+ match tokio_postgres::connect("host=localhost user=postgres connect_timeout=1", NoTls)
+ .await
replacement in src/main.rs at line 70
− if is_normal {
− is_normal = false;
+ Ok((db, connection)) => {
+ *db_.lock().await = Some(db);
+ let e = connection.await;
+ debug!("connection ended: {:?}", e);
replacement in src/main.rs at line 75
− *db_.lock().await = Some(db);
− let e = connection.await;
− eprintln!("connection ended: {:?}", e);
− // Ignore "normal" restarts (i.e. promotes).
− let db = db_.lock().await.take();
− if db.is_some() {
− lead_.send(false).unwrap();
− *db_.lock().await = None;
− } else {
− debug!("normal restart, reconnecting");
− is_normal = true;
+ Err(e) => {
+ error!("{:?}", e);
+ lead_.send(String::new()).unwrap();
replacement in src/main.rs at line 86
[4.1843]→[4.1843:1867](∅→∅) edit in src/main.rs at line 88
+ db_name.clone(),
+ path.clone(),
replacement in src/main.rs at line 93
[4.1943]→[3.1075:1366](∅→∅) − let mut race = tokio::spawn(race(host.clone(), signal.clone(), is_leader.clone()));
− // let addr = SocketAddr::from((
− // [0, 0, 0, 0],
− // matches
− // .value_of("port")
− // .and_then(|x| x.parse().ok())
− // .unwrap_or(8008),
− // ));
+ let mut race = tokio::spawn(race(host.clone(), is_leader.clone()));
edit in src/main.rs at line 101
+ let host = Arc::new(host);
replacement in src/main.rs at line 104
[4.2363]→[4.2363:2461](∅→∅) − async move { Ok::<_, Infallible>(service_fn(move |req| handle(req, is_leader.clone()))) }
+ let host = host.clone();
+ async move {
+ let host = host.clone();
+ Ok::<_, Infallible>(service_fn(move |req| {
+ handle(req, host.clone(), is_leader.clone())
+ }))
+ }
replacement in src/main.rs at line 133
[4.3797]→[4.3797:3828](∅→∅) − is_leader: Receiver<bool>,
+ name: Arc<String>,
+ is_leader: Receiver<String>,
replacement in src/main.rs at line 136
[4.3870]→[4.3870:3899](∅→∅) − if *is_leader.borrow() {
+ if is_leader.borrow().as_str() == name.as_str() {
replacement in src/main.rs at line 145
[4.4094]→[4.4094:4147](∅→∅),
[4.4176]→[4.4176:4244](∅→∅) − async fn race(
− host: String,
− signal: String,
− mut is_leader: Receiver<bool>,
− ) -> Result<(), anyhow::Error> {
+ async fn race(host: String, mut leader: Receiver<String>) -> Result<(), anyhow::Error> {
replacement in src/main.rs at line 170
[4.5242]→[4.5242:5292](∅→∅) − let leader = lock.take_leader().unwrap();
+ let leader_key = lock.take_leader().unwrap();
replacement in src/main.rs at line 173
− std::str::from_utf8(leader.name()),
− std::str::from_utf8(leader.key())
+ std::str::from_utf8(leader_key.name()),
+ std::str::from_utf8(leader_key.key())
edit in src/main.rs at line 176
[4.394]→[4.5338:5391](∅→∅),
[4.5338]→[4.5338:5391](∅→∅),
[4.5425]→[4.5425:5486](∅→∅) − std::fs::remove_file(&signal).unwrap_or(());
− debug!("waiting for instance to become non-leader");
replacement in src/main.rs at line 177
[4.5501]→[4.5501:5644](∅→∅) − is_leader.changed().await.unwrap();
− debug!("changed: {:?}", is_leader.borrow());
− if !*is_leader.borrow() {
+ leader.changed().await.unwrap();
+ debug!("leader: {:?}", leader.borrow());
+ if leader.borrow().as_str() != host {
replacement in src/main.rs at line 182
[4.5795]→[4.5795:5884](∅→∅) − .resign(Some(etcd_client::ResignOptions::new().with_leader(leader)))
+ .resign(Some(
+ etcd_client::ResignOptions::new().with_leader(leader_key),
+ ))
replacement in src/main.rs at line 194
[4.6015]→[4.6015:6035](∅→∅) replacement in src/main.rs at line 196
[4.6057]→[4.6057:6086](∅→∅) − lead: Arc<Sender<bool>>,
+ db_name: String,
+ path: String,
+ lead: Arc<Sender<String>>,
edit in src/main.rs at line 203
+ let mut was_leader = {
+ if let Some(pool_) = pool.lock().await.take() {
+ let row = pool_.query_one("SELECT pg_is_in_recovery()", &[]).await?;
+ let is_in_recovery: bool = row.get(0);
+ *pool.lock().await = Some(pool_);
+ Some(!is_in_recovery)
+ } else {
+ None
+ }
+ };
+ debug!("was_leader = {:?}", was_leader);
replacement in src/main.rs at line 218
[3.2518]→[4.6418:6446](∅→∅),
[4.6418]→[4.6418:6446](∅→∅) + let is_leader = leader == host;
+
+ if is_leader {
edit in src/main.rs at line 222
[4.6495]→[4.6495:6589](∅→∅) − std::fs::write(&signal, "").unwrap_or(());
− lead.send(false).unwrap();
edit in src/main.rs at line 224
[4.6700]→[4.6700:6738](∅→∅) − lead.send(true).unwrap();
replacement in src/main.rs at line 225
[4.6748]→[3.2519:2602](∅→∅) − let pool_ = pool.lock().await.take();
− if let Some(pool_) = pool_ {
+ lead.send(leader.to_string()).unwrap();
+
+ if let Some(pool_) = pool.lock().await.take() {
replacement in src/main.rs at line 229
[3.2763]→[3.2763:3173](∅→∅) − if leader == host {
− info!("promoting");
− pool_.execute("SELECT pg_promote()", &[]).await.unwrap_or(0);
− // Don't replace pool_ into *pool, since the server might restart.
− *pool.lock().await = Some(pool_)
− } else {
− *pool.lock().await = Some(pool_)
− }
− } else {
− lead.send(false)?;
+ *pool.lock().await = Some(pool_);
+ }
+
+ if is_leader && was_leader != Some(true) {
+ promote(&path).await
+ } else if !is_leader && was_leader == Some(true) {
+ rewind(&path, port, &db_name, &leader).await
edit in src/main.rs at line 237
+
+ was_leader = Some(leader == host);
edit in src/main.rs at line 241
+ }
+
+ async fn promote(path: &str) {
+ use tokio::process::Command;
+ Command::new("pg_ctl")
+ .args(["promote", "-D", path])
+ .output()
+ .await
+ .expect("failed to execute process");
edit in src/main.rs at line 251
+
+ async fn rewind(path: &str, port: u16, db_name: &str, leader: &str) { // Runs three commands in order: stop the `postgresql-repl` unit, run `pg_rewind` on `path` against `leader`, start the unit again.
+ use tokio::process::Command;
+ Command::new("systemctl") // Step 1: stop the local server unit before touching its data directory.
+ .args(["stop", "postgresql-repl"])
+ .output() // Runs the command to completion; the returned output is not bound to anything.
+ .await
+ .expect("failed to execute process"); // Panics if running the command returns an error. NOTE(review): the exit status is not inspected.
+ Command::new("pg_rewind") // Step 2: rewind the data directory using `leader` as the source server.
+ .args([
+ "-D", // The next argument, `path`, is the data directory to rewind.
+ &path,
+ "-R", // NOTE(review): presumably `--write-recovery-conf`; confirm against the pg_rewind documentation.
+ "--source-server", // The next argument is the connection string of the source server.
+ &format!(
+ "port={} user=postgres dbname={} host={}", // Connects as user `postgres` to `db_name` on host `leader`, using `port`.
+ port, db_name, leader
+ ),
+ ])
+ .output()
+ .await
+ .expect("failed to execute process"); // NOTE(review): a non-zero exit from pg_rewind is not detected, so step 3 runs regardless.
+ Command::new("systemctl") // Step 3: start the local server unit again.
+ .args(["start", "postgresql-repl"])
+ .output()
+ .await
+ .expect("failed to execute process"); // Panics if running the command returns an error; the exit status is not inspected.
+ }
edit in README.md at line 10
+
+ ### Cooperation from the OS
+
+ The same user must run the PostgreSQL server and this tool, and that user must be allowed to stop and start the `postgresql-repl.service` systemd unit. If that user is called `postgres`, the following PolicyKit rule grants that permission:
+
+ ``` js
+ polkit.addRule(function(action, subject) {
+ if (action.id == "org.freedesktop.systemd1.manage-units" &&
+ action.lookup("unit") == "postgresql-repl.service" &&
+ subject.user == "postgres"){
+ return polkit.Result.YES;
+ }
+ });
+ ```
replacement in Cargo.toml at line 14
− tokio = { version = "1.6", features = [ "net", "time", "rt-multi-thread", "macros", "io-util", "fs", "sync", "signal" ] }
+ tokio = { version = "1.6", features = [ "net", "time", "rt-multi-thread", "macros", "io-util", "fs", "sync", "signal", "process" ] }