.
Some checks failed
Rust / build (ubuntu-latest, stable) (push) Failing after 1m41s

This commit is contained in:
2026-02-19 06:41:23 +07:00
parent 8026a4f5a5
commit 2c33670ba5
9 changed files with 184 additions and 166 deletions

View File

@@ -10,11 +10,11 @@ use common::EventUtils;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{
App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity,
App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity, Window,
};
use nostr_sdk::prelude::*;
use smallvec::{smallvec, SmallVec};
use state::{NostrRegistry, RelayState, DEVICE_GIFTWRAP, USER_GIFTWRAP};
use state::{NostrRegistry, DEVICE_GIFTWRAP, USER_GIFTWRAP};
mod message;
mod room;
@@ -22,8 +22,8 @@ mod room;
pub use message::*;
pub use room::*;
pub fn init(cx: &mut App) {
ChatRegistry::set_global(cx.new(ChatRegistry::new), cx);
pub fn init(window: &mut Window, cx: &mut App) {
ChatRegistry::set_global(cx.new(|cx| ChatRegistry::new(window, cx)), cx);
}
struct GlobalChatRegistry(Entity<ChatRegistry>);
@@ -59,11 +59,8 @@ pub struct ChatRegistry {
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
/// Handle tracking asynchronous task
tracking_task: Option<Task<Result<(), Error>>>,
/// Handle notification asynchronous task
notification_task: Option<Task<()>>,
/// Async tasks
tasks: SmallVec<[Task<Result<(), Error>>; 2]>,
/// Subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
@@ -83,31 +80,28 @@ impl ChatRegistry {
}
/// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self {
fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
let nostr = NostrRegistry::global(cx);
let nip17 = nostr.read(cx).nip17_state();
let mut subscriptions = smallvec![];
subscriptions.push(
// Observe the identity
cx.observe(&nip17, |this, state, cx| {
if state.read(cx) == &RelayState::Configured {
// Handle nostr notifications
this.handle_notifications(cx);
// Track unwrapping progress
this.tracking(cx);
}
// Get chat rooms from the database on every state changes
// Observe the nip17 state and load chat rooms on every state change
cx.observe(&nip17, |this, _state, cx| {
this.get_rooms(cx);
}),
);
cx.defer_in(window, |this, _window, cx| {
this.handle_notifications(cx);
this.tracking(cx);
});
Self {
rooms: vec![],
tracking_flag: Arc::new(AtomicBool::new(false)),
tracking_task: None,
notification_task: None,
tasks: smallvec![],
_subscriptions: subscriptions,
}
}
@@ -126,7 +120,7 @@ impl ChatRegistry {
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(1024);
cx.background_spawn(async move {
self.tasks.push(cx.background_spawn(async move {
let device_signer = signer.get_encryption_signer().await;
let mut notifications = client.notifications();
let mut processed_events = HashSet::new();
@@ -149,7 +143,7 @@ impl ChatRegistry {
continue;
}
log::info!("Received gift-wrap event: {event:?}");
log::info!("Received gift wrap event: {:?}", event);
// Extract the rumor from the gift wrap event
match Self::extract_rumor(&client, &device_signer, event.as_ref()).await {
@@ -158,7 +152,7 @@ impl ChatRegistry {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
tx.send_async(signal).await.ok();
tx.send_async(signal).await?;
}
false => {
status.store(true, Ordering::Release);
@@ -171,32 +165,33 @@ impl ChatRegistry {
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &sub_id1 || id.as_ref() == &sub_id2 {
tx.send_async(Signal::Eose).await.ok();
tx.send_async(Signal::Eose).await?;
}
}
_ => {}
}
}
})
.detach();
self.notification_task = Some(cx.spawn(async move |this, cx| {
Ok(())
}));
self.tasks.push(cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
Signal::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.ok();
})?;
}
Signal::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.ok();
})?;
}
};
}
Ok(())
}));
}
@@ -204,7 +199,7 @@ impl ChatRegistry {
fn tracking(&mut self, cx: &mut Context<Self>) {
let status = self.tracking_flag.clone();
self.tracking_task = Some(cx.background_spawn(async move {
self.tasks.push(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(10);
loop {

View File

@@ -60,4 +60,4 @@ futures.workspace = true
oneshot.workspace = true
indexset = "0.12.3"
tracing-subscriber = { version = "0.3.18", features = ["fmt"] }
tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] }

View File

@@ -82,7 +82,7 @@ fn main() {
settings::init(cx);
// Initialize the nostr client
state::init(cx);
state::init(window, cx);
// Initialize relay auth registry
relay_auth::init(window, cx);
@@ -96,7 +96,7 @@ fn main() {
device::init(window, cx);
// Initialize app registry
chat::init(cx);
chat::init(window, cx);
// Initialize auto update
auto_update::init(cx);

View File

@@ -67,7 +67,7 @@ impl DeviceRegistry {
RelayState::Idle => {
this.reset(cx);
}
RelayState::Configured => {
RelayState::Configured(_) => {
this.get_announcement(cx);
}
_ => {}

View File

@@ -1,14 +1,14 @@
use std::borrow::Cow;
use std::cell::Cell;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::hash::Hash;
use std::rc::Rc;
use std::sync::Arc;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use gpui::{
App, AppContext, Context, Entity, Global, IntoElement, ParentElement, SharedString, Styled,
Subscription, Task, Window,
Task, Window,
};
use nostr_sdk::prelude::*;
use settings::{AppSettings, AuthMode};
@@ -27,18 +27,12 @@ pub fn init(window: &mut Window, cx: &mut App) {
}
/// Authentication request
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
struct AuthRequest {
url: RelayUrl,
challenge: String,
}
impl Hash for AuthRequest {
fn hash<H: Hasher>(&self, state: &mut H) {
self.challenge.hash(state);
}
}
impl AuthRequest {
pub fn new(challenge: impl Into<String>, url: RelayUrl) -> Self {
Self {
@@ -74,9 +68,6 @@ pub struct RelayAuth {
/// Tasks for asynchronous operations
tasks: SmallVec<[Task<()>; 2]>,
/// Event subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
}
impl RelayAuth {
@@ -92,22 +83,13 @@ impl RelayAuth {
/// Create a new relay auth instance
fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
let nostr = NostrRegistry::global(cx);
let mut subscriptions = smallvec![];
subscriptions.push(
// Observe the nostr state
cx.observe_in(&nostr, window, move |this, state, window, cx| {
if state.read(cx).connected() {
this.handle_notifications(window, cx)
}
}),
);
cx.defer_in(window, |this, window, cx| {
this.handle_notifications(window, cx);
});
Self {
pending_events: HashSet::default(),
tasks: smallvec![],
_subscriptions: subscriptions,
}
}
@@ -119,7 +101,8 @@ impl RelayAuth {
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(256);
cx.background_spawn(async move {
self.tasks.push(cx.background_spawn(async move {
log::info!("Started handling nostr notifications");
let mut notifications = client.notifications();
let mut challenges: HashSet<Cow<'_, str>> = HashSet::default();
@@ -128,8 +111,8 @@ impl RelayAuth {
match message {
RelayMessage::Auth { challenge } => {
if challenges.insert(challenge.clone()) {
let request = AuthRequest::new(challenge, relay_url);
let signal = Signal::Auth(Arc::new(request));
let request = Arc::new(AuthRequest::new(challenge, relay_url));
let signal = Signal::Auth(request);
tx.send_async(signal).await.ok();
}
@@ -149,8 +132,7 @@ impl RelayAuth {
}
}
}
})
.detach();
}));
self.tasks.push(cx.spawn_in(window, async move |this, cx| {
while let Ok(signal) = rx.recv_async().await {

View File

@@ -52,8 +52,8 @@ pub enum AuthMode {
/// Signer kind
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum SignerKind {
#[default]
Auto,
#[default]
User,
Encryption,
}

View File

@@ -37,7 +37,9 @@ pub const USER_GIFTWRAP: &str = "user-gift-wraps";
pub const WOT_RELAYS: [&str; 1] = ["wss://relay.vertexlab.io"];
/// Default search relays
pub const SEARCH_RELAYS: [&str; 2] = ["wss://antiprimal.net", "wss://search.nos.today"];
pub const SEARCH_RELAYS: [&str; 1] = ["wss://antiprimal.net"];
pub const INDEXER_RELAYS: [&str; 1] = ["wss://indexer.coracle.social"];
/// Default bootstrap relays
pub const BOOTSTRAP_RELAYS: [&str; 3] = [

View File

@@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::config_dir;
use gpui::{App, AppContext, Context, Entity, Global, Subscription, Task};
use gpui::{App, AppContext, Context, Entity, Global, Subscription, Task, Window};
use nostr_connect::prelude::*;
use nostr_gossip_memory::prelude::*;
use nostr_lmdb::NostrLmdb;
@@ -19,7 +19,7 @@ pub use constants::*;
pub use nip05::*;
pub use signer::*;
pub fn init(cx: &mut App) {
pub fn init(window: &mut Window, cx: &mut App) {
// rustls uses the `aws_lc_rs` provider by default
// This only errors if the default provider has already
// been installed. We can ignore this `Result`.
@@ -30,7 +30,7 @@ pub fn init(cx: &mut App) {
// Initialize the tokio runtime
gpui_tokio::init(cx);
NostrRegistry::set_global(cx.new(NostrRegistry::new), cx);
NostrRegistry::set_global(cx.new(|cx| NostrRegistry::new(window, cx)), cx);
}
struct GlobalNostrRegistry(Entity<NostrRegistry>);
@@ -82,7 +82,7 @@ impl NostrRegistry {
}
/// Create a new nostr instance
fn new(cx: &mut Context<Self>) -> Self {
fn new(window: &mut Window, cx: &mut Context<Self>) -> Self {
// Construct the nostr lmdb instance
let lmdb = cx.foreground_executor().block_on(async move {
NostrLmdb::open(config_dir().join("nostr"))
@@ -122,7 +122,7 @@ impl NostrRegistry {
subscriptions.push(
// Observe the NIP-65 state
cx.observe(&nip65, |this, state, cx| {
if state.read(cx).configured() {
if state.read(cx).configured().is_some() {
this.get_profile(cx);
this.get_messaging_relays(cx);
}
@@ -131,20 +131,15 @@ impl NostrRegistry {
subscriptions.push(
// Observe the NIP-17 state
cx.observe(&nip17, |this, state, cx| {
if state.read(cx) == &RelayState::Configured {
this.get_messages(cx);
cx.observe(&nip17, |this, nip17, cx| {
if let Some(event) = nip17.read(cx).configured().cloned() {
this.subscribe_to_giftwrap_events(&event, cx);
};
}),
);
cx.defer(|cx| {
let nostr = NostrRegistry::global(cx);
// Connect to the bootstrapping relays
nostr.update(cx, |this, cx| {
this.connect(cx);
});
cx.defer_in(window, |this, _window, cx| {
this.connect(cx);
});
Self {
@@ -164,36 +159,35 @@ impl NostrRegistry {
fn connect(&mut self, cx: &mut Context<Self>) {
let client = self.client();
let task: Task<Result<(), Error>> = cx.background_spawn(async move {
// Add search relay to the relay pool
for url in SEARCH_RELAYS.into_iter() {
client.add_relay(url).and_connect().await?;
}
// Add bootstrap relay to the relay pool
for url in BOOTSTRAP_RELAYS.into_iter() {
client.add_relay(url).and_connect().await?;
}
Ok(())
});
self.tasks.push(cx.spawn(async move |this, cx| {
// Wait for the task to complete
task.await?;
// Update the state
this.update(cx, |this, cx| {
this.set_connected(cx);
})?;
// Small delay
cx.background_executor()
.timer(Duration::from_millis(200))
.await_on_background(async move {
// Add search relay to the relay pool
for url in INDEXER_RELAYS.into_iter() {
client
.add_relay(url)
.capabilities(RelayCapabilities::DISCOVERY)
.await
.ok();
}
// Add search relay to the relay pool
for url in SEARCH_RELAYS.into_iter() {
client.add_relay(url).await.ok();
}
// Add bootstrap relay to the relay pool
for url in BOOTSTRAP_RELAYS.into_iter() {
client.add_relay(url).await.ok();
}
client.connect().await;
})
.await;
// Update the state
this.update(cx, |this, cx| {
this.set_connected(cx);
this.get_signer(cx);
})?;
@@ -284,15 +278,36 @@ impl NostrRegistry {
}
/// Reset all relay states
pub fn reset_relay_states(&mut self, cx: &mut Context<Self>) {
pub fn reset_relays(&mut self, cx: &mut Context<Self>) {
let client = self.client();
self.nip65.update(cx, |this, cx| {
*this = RelayState::default();
cx.notify();
});
self.nip17.update(cx, |this, cx| {
*this = RelayState::default();
cx.notify();
});
self.tasks.push(cx.background_spawn(async move {
let relays = client.relays().await;
for (relay_url, relay) in relays.iter() {
let url = relay_url.as_str();
let default_relay = BOOTSTRAP_RELAYS.contains(&url)
|| SEARCH_RELAYS.contains(&url)
|| INDEXER_RELAYS.contains(&url);
if !default_relay {
relay.unsubscribe_all().await?;
relay.disconnect();
}
}
Ok(())
}));
}
/// Set the signer for the nostr client and verify the public key
@@ -308,6 +323,9 @@ impl NostrRegistry {
// Update signer
signer.switch(new, owned).await;
// Unsubscribe from all subscriptions
client.unsubscribe_all().await?;
// Verify signer
let signer = client.signer().context("Signer not found")?;
let public_key = signer.get_public_key().await?;
@@ -322,7 +340,7 @@ impl NostrRegistry {
// Update states
this.update(cx, |this, cx| {
this.reset_relay_states(cx);
this.reset_relays(cx);
this.get_relay_list(cx);
})?;
@@ -365,8 +383,6 @@ impl NostrRegistry {
while let Some((_url, res)) = stream.next().await {
match res {
Ok(event) => {
log::info!("Received relay list event: {event:?}");
// Construct a filter to continuously receive relay list events
let filter = Filter::new()
.kind(Kind::RelayList)
@@ -382,7 +398,7 @@ impl NostrRegistry {
// Subscribe to the relay list events
client.subscribe(target).await?;
return Ok(RelayState::Configured);
return Ok(RelayState::Configured(Box::new(event)));
}
Err(e) => {
log::error!("Failed to receive relay list event: {e}");
@@ -443,8 +459,6 @@ impl NostrRegistry {
while let Some((_url, res)) = stream.next().await {
match res {
Ok(event) => {
log::info!("Received messaging relays event: {event:?}");
// Construct a filter to continuously receive relay list events
let filter = Filter::new()
.kind(Kind::InboxRelays)
@@ -454,7 +468,7 @@ impl NostrRegistry {
// Subscribe to the relay list events
client.subscribe(filter).await?;
return Ok(RelayState::Configured);
return Ok(RelayState::Configured(Box::new(event)));
}
Err(e) => {
log::error!("Failed to get messaging relays: {e}");
@@ -485,24 +499,33 @@ impl NostrRegistry {
}
/// Continuously get gift wrap events for the current user in their messaging relays
fn get_messages(&mut self, cx: &mut Context<Self>) {
fn subscribe_to_giftwrap_events(&mut self, relay_list: &Event, cx: &mut Context<Self>) {
let client = self.client();
let signer = self.signer();
let messaging_relays = self.messaging_relays(cx);
let relay_urls: Vec<RelayUrl> = nip17::extract_relay_list(relay_list).cloned().collect();
let task: Task<Result<(), Error>> = cx.background_spawn(async move {
let urls = messaging_relays.await;
let public_key = signer.get_public_key().await?;
for url in relay_urls.iter() {
client.add_relay(url).and_connect().await?;
}
let filter = Filter::new().kind(Kind::GiftWrap).pubkey(public_key);
let id = SubscriptionId::new(USER_GIFTWRAP);
// Construct target for subscription
let target: HashMap<&RelayUrl, Filter> =
urls.iter().map(|relay| (relay, filter.clone())).collect();
let target: HashMap<&RelayUrl, Filter> = relay_urls
.iter()
.map(|relay| (relay, filter.clone()))
.collect();
client.subscribe(target).with_id(id).await?;
log::info!("Subscribed to user gift-wrap messages");
let output = client.subscribe(target).with_id(id).await?;
log::info!(
"Successfully subscribed to user gift-wrap messages on: {:?}",
output.success
);
Ok(())
});
@@ -973,7 +996,7 @@ fn default_relay_list() -> Vec<(RelayUrl, Option<RelayMetadata>)> {
Some(RelayMetadata::Write),
),
(
RelayUrl::parse("wss://relay.primal.net/").unwrap(),
RelayUrl::parse("wss://relay.damus.io/").unwrap(),
Some(RelayMetadata::Read),
),
(
@@ -985,18 +1008,18 @@ fn default_relay_list() -> Vec<(RelayUrl, Option<RelayMetadata>)> {
fn default_messaging_relays() -> Vec<RelayUrl> {
vec![
RelayUrl::parse("wss://auth.nostr1.com/").unwrap(),
//RelayUrl::parse("wss://auth.nostr1.com/").unwrap(),
RelayUrl::parse("wss://nip17.com/").unwrap(),
]
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum RelayState {
#[default]
Idle,
Checking,
NotConfigured,
Configured,
Configured(Box<Event>),
}
impl RelayState {
@@ -1012,8 +1035,11 @@ impl RelayState {
matches!(self, RelayState::NotConfigured)
}
pub fn configured(&self) -> bool {
matches!(self, RelayState::Configured)
pub fn configured(&self) -> Option<&Event> {
match self {
RelayState::Configured(event) => Some(event),
_ => None,
}
}
}