// Fragment: open the Postgres connection and drive it on a background task.
// - Connects with "host=localhost user=postgres" and no TLS; `.unwrap()` panics if the connect fails.
// - The client half is wrapped in `Arc<Mutex<_>>` and rebinds `db`.
// - The connection half is awaited inside a spawned task; an `Err` from it is printed to stderr
//   as "connection error: ...".
// - The task's join handle `dbt` is pinned with `tokio::pin!`.
// NOTE(review): the leading token on the next line looks like an identifier/hash that is not part
// of the Rust statement -- confirm where it comes from before compiling this fragment.
6Z6PYXTSU3DYKZKXWSZ5JZXAO2MO4KJADTYNDLUYMD4LROXRREZAC let (db, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await.unwrap();let db = Arc::new(Mutex::new(db));let dbt = tokio::spawn(async move {if let Err(e) = connection.await {eprintln!("connection error: {}", e);}});tokio::pin!(dbt);
// Fragment: keep a shared, optional Postgres client alive via a background reconnect loop.
// - `db` starts as `Arc<Mutex<None>>`; the spawned task works on clones `db_` and `lead_`.
// - Each iteration logs "reconnect" and tries to connect with "host=localhost user=postgres".
//   A failed connect is ignored and retried after the 100 ms sleep at the end of the loop body.
// - On success the client is stored in the shared slot, then the connection future is awaited
//   until it finishes; its result is printed to stderr as "connection ended: ...".
// - Afterwards the slot is emptied with `take()`:
//     * if a client was still there, `false` is sent through `lead_` (the `.unwrap()` panics if
//       that send fails) and the slot is assigned `None` again (it is already `None` after `take()`);
//     * if the slot was already empty, this is logged as "normal restart, reconnecting".
// NOTE(review): `is_normal` is only ever assigned in this fragment, never used to make a decision --
// confirm whether it is still needed.
// NOTE(review): in this one-line layout the `//` comment runs to the end of the line; presumably
// the original had a line break after "promotes)." -- confirm.
let db = Arc::new(Mutex::new(None));let db_ = db.clone();let lead_ = lead.clone();tokio::spawn(async move {let mut is_normal = false;loop {debug!("reconnect");if let Ok((db, connection)) =tokio_postgres::connect("host=localhost user=postgres", NoTls).await{if is_normal {is_normal = false;}*db_.lock().await = Some(db);let e = connection.await;eprintln!("connection ended: {:?}", e);// Ignore "normal" restarts (i.e. promotes).let db = db_.lock().await.take();if db.is_some() {lead_.send(false).unwrap();*db_.lock().await = None;} else {debug!("normal restart, reconnecting");is_normal = true;}}tokio::time::sleep(std::time::Duration::from_millis(100)).await;}});
let mut race = tokio::spawn(race(host.clone(),signal.clone(),lead.clone(),is_leader.clone(),));let addr = SocketAddr::from(([0, 0, 0, 0],
let mut race = tokio::spawn(race(host.clone(), signal.clone(), is_leader.clone()));// let addr = SocketAddr::from((// [0, 0, 0, 0],// matches// .value_of("port")// .and_then(|x| x.parse().ok())// .unwrap_or(8008),// ));let addr6 = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 0],
_ = &mut dbt => {// If this database breaks, release the lock.tokio::time::sleep(std::time::Duration::from_secs(1)).await;info!("Restarting database connection");let (db_, connection) = if let Ok(x) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await {x} else {tokio::time::sleep(std::time::Duration::from_millis(100)).await;continue};lead.send(false).unwrap();dbt.set(tokio::spawn(async move {if let Err(e) = connection.await {eprintln!("connection error: {}", e);}}));*db.lock().await = db_}_ = &mut race => {error!("race stopped");
e = &mut race => {error!("race stopped: {:?}", e);
// Fragment: obtain a lease and keep it alive from a background task.
// - Connects a client to "localhost:2379" (presumably an etcd endpoint -- confirm); `.unwrap()`
//   panics if the connect fails.
// - Requests a lease with `LEASE_DURATION`; errors from the grant and from opening the keep-alive
//   channel are propagated with `?`.
// - The spawned task first sleeps for half of `LEASE_DURATION` (in seconds, integer division),
//   then loops: log at trace level, send a keep-alive, sleep half the lease duration again.
// - The loop only ends when `keep_alive()` returns an error, which is logged; "done renewing" is
//   logged after the loop.
// - The task's join handle is not kept, so the task is detached.
let mut client = Client::connect(["localhost:2379"], None).await.unwrap();let lease = client.lease_grant(LEASE_DURATION, None).await?;let id = lease.id();debug!("id = {:?}", id);let (mut keeper, _stream) = client.lease_keep_alive(id).await?;tokio::spawn(async move {tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;loop {trace!("sending keep alive");if let Err(e) = keeper.keep_alive().await {error!("{:?}", e);break;}tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;}debug!("done renewing");});
// Fragment: keep the lease `id` alive from a background task for as long as `is_leader` holds true.
// - Opening the keep-alive channel propagates errors with `?`.
// - The spawned task works on a clone, `is_leader_`, and its join handle is kept in `t`.
// - The task first sleeps for half of `LEASE_DURATION` (in seconds, integer division).
// - It then repeats while the value borrowed from `is_leader_` is true: log at debug level,
//   send a keep-alive, sleep half the lease duration again.
// - The loop also ends when `keep_alive()` returns an error, which is logged.
// - "done renewing" is logged once the loop exits for either reason.
let (mut keeper, _stream) = client.lease_keep_alive(id).await?;let is_leader_ = is_leader.clone();let t = tokio::spawn(async move {tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;while *is_leader_.borrow() {debug!("sending keep alive");if let Err(e) = keeper.keep_alive().await {error!("{:?}", e);break;}tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;}debug!("done renewing");});
// Fragment: point this server's `primary_conninfo` at `leader`.
// - Locks the shared client (`pool`) and holds the guard while the statement runs.
// - Executes `ALTER SYSTEM SET primary_conninfo=...` with host `leader`, port 5432, user
//   `replication` and `password`; an execution error is propagated with `?`.
// NOTE(review): `leader` and `password` are spliced into the SQL text with `format!`; a value
// containing a single quote would break the statement -- confirm both come from trusted config.
let pool = pool.lock().await;pool.execute(format!("ALTER SYSTEM SET primary_conninfo='host={} port=5432 user=replication password={}'", leader, password).as_str(), &[]).await?;
// Fragment: update `primary_conninfo` and, if this host is the leader, promote the local server.
// - The client is moved out of the shared slot with `take()`, so the lock guard is not held while
//   the statements run.
// - If the slot was empty, `false` is sent through `lead` (a send error is propagated with `?`).
// - If a client was present:
//     * `ALTER SYSTEM SET primary_conninfo=...` is executed for `leader`; an error is propagated
//       with `?`;
//     * when `leader == host`, "promoting" is logged and `SELECT pg_promote()` is executed, with
//       any error from it discarded via `unwrap_or(0)`;
//     * as the text reads, both branches then store the client back into the shared slot.
// NOTE(review): if the ALTER SYSTEM statement fails, the early return drops the taken client and
// leaves the slot empty -- confirm that is intended.
// NOTE(review): the inline comment says the client should NOT be put back after a promote, yet the
// text right after it assigns `Some(pool_)` back into the slot. In this one-line layout everything
// after `//` is comment text, so the original line structure cannot be recovered from here --
// confirm which behaviour is wanted.
let pool_ = pool.lock().await.take();if let Some(pool_) = pool_ {pool_.execute(format!("ALTER SYSTEM SET primary_conninfo='host={} port=5432 user=replication password={}'", leader, password).as_str(), &[]).await?;if leader == host {info!("promoting");pool_.execute("SELECT pg_promote()", &[]).await.unwrap_or(0);// Don't replace pool_ into *pool, since the server might restart.*pool.lock().await = Some(pool_)} else {*pool.lock().await = Some(pool_)}} else {lead.send(false)?;}