Testing and debugging

pmeunier
Jan 7, 2022, 8:53 PM
6Z6PYXTSU3DYKZKXWSZ5JZXAO2MO4KJADTYNDLUYMD4LROXRREZAC

Dependencies

Change contents

  • edit in src/main.rs at line 43
    [3.1147][3.1147:1483]()
    let (db, connection) = tokio_postgres::connect("host=localhost user=postgres", NoTls)
    .await
    .unwrap();
    let db = Arc::new(Mutex::new(db));
    let dbt = tokio::spawn(async move {
    if let Err(e) = connection.await {
    eprintln!("connection error: {}", e);
    }
    });
    tokio::pin!(dbt);
  • edit in src/main.rs at line 49
    [3.1757]
    [3.1757]
    let db = Arc::new(Mutex::new(None));
    let db_ = db.clone();
    let lead_ = lead.clone();
    tokio::spawn(async move {
    let mut is_normal = false;
    loop {
    debug!("reconnect");
    if let Ok((db, connection)) =
    tokio_postgres::connect("host=localhost user=postgres", NoTls).await
    {
    if is_normal {
    is_normal = false;
    }
    *db_.lock().await = Some(db);
    let e = connection.await;
    eprintln!("connection ended: {:?}", e);
    // Ignore "normal" restarts (i.e. promotes).
    let db = db_.lock().await.take();
    if db.is_some() {
    lead_.send(false).unwrap();
    *db_.lock().await = None;
    } else {
    debug!("normal restart, reconnecting");
    is_normal = true;
    }
    }
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    });
  • replacement in src/main.rs at line 87
    [3.1943][3.1943:2140]()
    let mut race = tokio::spawn(race(
    host.clone(),
    signal.clone(),
    lead.clone(),
    is_leader.clone(),
    ));
    let addr = SocketAddr::from((
    [0, 0, 0, 0],
    [3.1943]
    [3.2140]
    let mut race = tokio::spawn(race(host.clone(), signal.clone(), is_leader.clone()));
    // let addr = SocketAddr::from((
    // [0, 0, 0, 0],
    // matches
    // .value_of("port")
    // .and_then(|x| x.parse().ok())
    // .unwrap_or(8008),
    // ));
    let addr6 = SocketAddr::from((
    [0, 0, 0, 0, 0, 0, 0, 0],
  • edit in src/main.rs at line 106
    [3.2469]
    [3.2469]
  • replacement in src/main.rs at line 109
    [3.2504][3.2504:2573]()
    _ = Server::bind(&addr).serve(make_service.clone()) => {
    [3.2504]
    [2.90]
    _ = Server::bind(&addr6).serve(make_service.clone()) => {
  • replacement in src/main.rs at line 113
    [3.2658][3.2658:3527](),[3.3527][2.139:179]()
    _ = &mut dbt => {
    // If this database breaks, release the lock.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    info!("Restarting database connection");
    let (db_, connection) = if let Ok(x) = tokio_postgres::connect("host=localhost user=postgres", NoTls).await {
    x
    } else {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    continue
    };
    lead.send(false).unwrap();
    dbt.set(tokio::spawn(async move {
    if let Err(e) = connection.await {
    eprintln!("connection error: {}", e);
    }
    }));
    *db.lock().await = db_
    }
    _ = &mut race => {
    error!("race stopped");
    [3.2658]
    [3.3567]
    e = &mut race => {
    error!("race stopped: {:?}", e);
  • replacement in src/main.rs at line 117
    [3.3603][3.3603:3647](),[3.3647][2.180:233]()
    _ = &mut election_observer => {
    error!("election observer stopped");
    [3.3603]
    [3.3700]
    e = &mut election_observer => {
    error!("election observer stopped: {:?}", e);
  • edit in src/main.rs at line 141
    [3.4147][3.4147:4176]()
    lead: Arc<Sender<bool>>,
  • edit in src/main.rs at line 143
    [3.4244]
    [3.4244]
    let mut client = Client::connect(["localhost:2379"], None).await.unwrap();
    let lease = client.lease_grant(LEASE_DURATION, None).await?;
    let id = lease.id();
    debug!("id = {:?}", id);
    let (mut keeper, _stream) = client.lease_keep_alive(id).await?;
    tokio::spawn(async move {
    tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
    loop {
    trace!("sending keep alive");
    if let Err(e) = keeper.keep_alive().await {
    error!("{:?}", e);
    break;
    }
    tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
    }
    debug!("done renewing");
    });
  • edit in src/main.rs at line 163
    [3.4255][3.4255:4436]()
    let mut client = Client::connect(["localhost:2379"], None).await.unwrap();
    let lease = client.lease_grant(LEASE_DURATION, None).await?;
    let id = lease.id();
  • edit in src/main.rs at line 167
    [3.4591][3.4591:5242]()
    let (mut keeper, _stream) = client.lease_keep_alive(id).await?;
    let is_leader_ = is_leader.clone();
    let t = tokio::spawn(async move {
    tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
    while *is_leader_.borrow() {
    debug!("sending keep alive");
    if let Err(e) = keeper.keep_alive().await {
    error!("{:?}", e);
    break;
    }
    tokio::time::sleep(std::time::Duration::from_secs(LEASE_DURATION as u64 / 2)).await;
    }
    debug!("done renewing");
    });
  • edit in src/main.rs at line 174
    [3.5391][3.5391:5425]()
    lead.send(true).unwrap();
  • edit in src/main.rs at line 180
    [3.5684][3.5684:5772]()
    // at.store(false, Ordering::SeqCst);
    t.await.unwrap();
  • replacement in src/main.rs at line 194
    [3.6086][3.6086:6132]()
    pool: Arc<Mutex<tokio_postgres::Client>>,
    [3.6086]
    [3.6132]
    pool: Arc<Mutex<Option<tokio_postgres::Client>>>,
  • edit in src/main.rs at line 201
    [3.6418]
    [3.6418]
    debug!("leader = {:?}", leader);
  • edit in src/main.rs at line 208
    [3.6643][3.6643:6700]()
    std::fs::remove_file(&signal).unwrap_or(());
  • replacement in src/main.rs at line 210
    [3.6748][3.6748:6942]()
    let pool = pool.lock().await;
    pool.execute(format!("ALTER SYSTEM SET primary_conninfo='host={} port=5432 user=replication password={}'", leader, password).as_str(), &[]).await?;
    [3.6748]
    [3.6942]
    let pool_ = pool.lock().await.take();
    if let Some(pool_) = pool_ {
    pool_.execute(format!("ALTER SYSTEM SET primary_conninfo='host={} port=5432 user=replication password={}'", leader, password).as_str(), &[]).await?;
    if leader == host {
    info!("promoting");
    pool_.execute("SELECT pg_promote()", &[]).await.unwrap_or(0);
    // Don't replace pool_ into *pool, since the server might restart.
    *pool.lock().await = Some(pool_)
    } else {
    *pool.lock().await = Some(pool_)
    }
    } else {
    lead.send(false)?;
    }