wip
Some checks failed
Rust / build (ubuntu-latest, stable) (push) Failing after 1m18s

This commit is contained in:
2026-02-16 16:53:06 +07:00
parent 452253bece
commit d25080f5e7
7 changed files with 290 additions and 215 deletions

View File

@@ -7,8 +7,6 @@ use std::time::Duration;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::EventUtils;
use device::DeviceRegistry;
use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{
@@ -45,11 +43,9 @@ pub enum ChatEvent {
/// Channel signal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum NostrEvent {
enum Signal {
/// Message received from relay pool
Message(NewMessage),
/// Unwrapping status
Unwrapping(bool),
/// Eose received from relay pool
Eose,
}
@@ -60,23 +56,14 @@ pub struct ChatRegistry {
/// Collection of all chat rooms
rooms: Vec<Entity<Room>>,
/// Loading status of the registry
loading: bool,
/// Channel's sender for communication between nostr and gpui
sender: Sender<NostrEvent>,
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
/// Handle tracking asynchronous task
tracking: Option<Task<Result<(), Error>>>,
tracking_task: Option<Task<Result<(), Error>>>,
/// Handle notifications asynchronous task
notifications: Option<Task<()>>,
/// Tasks for asynchronous operations
tasks: Vec<Task<()>>,
/// Handle notification asynchronous task
notification_task: Option<Task<()>>,
/// Subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
@@ -97,79 +84,30 @@ impl ChatRegistry {
/// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self {
let device = DeviceRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
let nip17_state = nostr.read(cx).nip17_state();
let nip17 = nostr.read(cx).nip17_state();
// A flag to indicate if the registry is loading
let tracking_flag = Arc::new(AtomicBool::new(false));
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<NostrEvent>(2048);
let mut tasks = vec![];
let mut subscriptions = smallvec![];
subscriptions.push(
// Observe the identity
cx.observe(&nip17_state, |this, state, cx| {
cx.observe(&nip17, |this, state, cx| {
if state.read(cx) == &RelayState::Configured {
// Handle nostr notifications
this.handle_notifications(cx);
// Track unwrapping progress
this.tracking(cx);
}
// Get chat rooms from the database on every identity change
// Get chat rooms from the database on every state changes
this.get_rooms(cx);
}),
);
subscriptions.push(
// Observe the device signer state
cx.observe(&device, |this, state, cx| {
if state.read(cx).state().set() {
this.handle_notifications(cx);
}
}),
);
tasks.push(
// Update GPUI states
cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
NostrEvent::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.ok();
}
NostrEvent::Unwrapping(status) => {
this.update(cx, |this, cx| {
this.set_loading(status, cx);
this.get_rooms(cx);
})
.ok();
}
NostrEvent::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.ok();
}
};
}
}),
);
Self {
rooms: vec![],
loading: false,
sender: tx.clone(),
tracking_flag,
tracking: None,
notifications: None,
tasks,
tracking_flag: Arc::new(AtomicBool::new(false)),
tracking_task: None,
notification_task: None,
_subscriptions: subscriptions,
}
}
@@ -179,16 +117,17 @@ impl ChatRegistry {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
self.notifications = Some(cx.background_spawn(async move {
let initialized_at = Timestamp::now();
let sub_id1 = SubscriptionId::new(DEVICE_GIFTWRAP);
let sub_id2 = SubscriptionId::new(USER_GIFTWRAP);
let device_signer = signer.get_encryption_signer().await;
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<Signal>(1024);
cx.background_spawn(async move {
let device_signer = signer.get_encryption_signer().await;
let mut notifications = client.notifications();
let mut processed_events = HashSet::new();
@@ -223,7 +162,7 @@ impl ChatRegistry {
// the event is already emitted
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
let signal = Signal::Message(new_message);
tx.send_async(signal).await.ok();
}
@@ -239,29 +178,45 @@ impl ChatRegistry {
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &sub_id1 || id.as_ref() == &sub_id2 {
tx.send_async(NostrEvent::Eose).await.ok();
tx.send_async(Signal::Eose).await.ok();
}
}
_ => {}
}
}
})
.detach();
self.notification_task = Some(cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
Signal::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.ok();
}
Signal::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.ok();
}
};
}
}));
}
/// Tracking the status of unwrapping gift wrap events.
fn tracking(&mut self, cx: &mut Context<Self>) {
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
self.tracking = Some(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(12);
self.tracking_task = Some(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(10);
loop {
if status.load(Ordering::Acquire) {
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
tx.send_async(NostrEvent::Unwrapping(true)).await.ok();
} else {
tx.send_async(NostrEvent::Unwrapping(false)).await.ok();
}
smol::Timer::after(loop_duration).await;
}
@@ -270,13 +225,7 @@ impl ChatRegistry {
/// Get the loading status of the chat registry
pub fn loading(&self) -> bool {
self.loading
}
/// Set the loading status of the chat registry
pub fn set_loading(&mut self, loading: bool, cx: &mut Context<Self>) {
self.loading = loading;
cx.notify();
self.tracking_flag.load(Ordering::Acquire)
}
/// Get a weak reference to a room by its ID.
@@ -312,19 +261,19 @@ impl ChatRegistry {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
self.tasks.push(cx.spawn(async move |this, cx| {
if let Some(signer) = client.signer() {
if let Ok(public_key) = signer.get_public_key().await {
cx.spawn(async move |this, cx| {
let signer = client.signer()?;
let public_key = signer.get_public_key().await.ok()?;
let room: Room = room.into().organize(&public_key);
this.update(cx, |this, cx| {
this.rooms
.insert(0, cx.new(|_| room.into().organize(&public_key)));
this.rooms.insert(0, cx.new(|_| room));
cx.emit(ChatEvent::Ping);
cx.notify();
})
.ok();
}
}
}));
.ok()
})
.detach();
}
/// Emit an open room event.
@@ -417,20 +366,16 @@ impl ChatRegistry {
pub fn get_rooms(&mut self, cx: &mut Context<Self>) {
let task = self.get_rooms_from_database(cx);
self.tasks.push(cx.spawn(async move |this, cx| {
match task.await {
Ok(rooms) => {
cx.spawn(async move |this, cx| {
let rooms = task.await.ok()?;
this.update(cx, move |this, cx| {
this.extend_rooms(rooms, cx);
this.sort(cx);
})
.ok();
}
Err(e) => {
log::error!("Failed to load rooms: {e}")
}
};
}));
.ok()
})
.detach();
}
/// Create a task to load rooms from the database

View File

@@ -314,12 +314,21 @@ impl Room {
continue;
};
// Construct a filter for gossip relays
let filter = Filter::new().kind(Kind::RelayList).author(member).limit(1);
// Construct a filter for messaging relays
let inbox = Filter::new()
.kind(Kind::InboxRelays)
.author(member)
.limit(1);
// Construct a filter for announcement
let announcement = Filter::new()
.kind(Kind::Custom(10044))
.author(member)
.limit(1);
// Subscribe to get member's gossip relays
client
.subscribe(filter)
.subscribe(vec![inbox, announcement])
.with_id(subscription_id.clone())
.close_on(
SubscribeAutoCloseOptions::default()
@@ -357,62 +366,22 @@ impl Room {
})
}
/// Construct extra tags for a message
fn extra_tags(&self, sender: PublicKey, members: &[Person], replies: &[EventId]) -> Vec<Tag> {
let mut extra_tags = vec![];
// Add subject tag if present
if let Some(value) = self.subject.as_ref() {
extra_tags.push(Tag::from_standardized_without_cell(TagStandard::Subject(
value.to_string(),
)));
}
// Add all reply tags
for id in replies {
extra_tags.push(Tag::event(*id))
}
// Add all receiver tags
for member in members.iter() {
// Skip current user
if member.public_key() == sender {
continue;
}
extra_tags.push(Tag::from_standardized_without_cell(
TagStandard::PublicKey {
public_key: member.public_key(),
relay_url: member.messaging_relay_hint(),
alias: None,
uppercase: false,
},
));
}
extra_tags
}
pub fn send<S, I>(&self, content: S, replies: I, cx: &App) -> Option<Task<Vec<SendReport>>>
// Construct a rumor event for direct message
pub fn rumor<S, I>(&self, content: S, replies: I, cx: &App) -> Option<UnsignedEvent>
where
S: Into<String>,
I: IntoIterator<Item = EventId>,
{
let persons = PersonRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
let kind = Kind::PrivateDirectMessage;
let content: String = content.into();
let replies: Vec<EventId> = replies.into_iter().collect();
let persons = PersonRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
// Get current user's public key
let sender = nostr.read(cx).signer().public_key()?;
// get room's config
let config = self.config.clone();
// Get all members
let members: Vec<Person> = self
.members
@@ -421,40 +390,107 @@ impl Room {
.map(|member| persons.read(cx).get(member, cx))
.collect();
// Get extra tags
let extra_tags = self.extra_tags(sender, &members, &replies);
// Construct event's tags
let mut tags = vec![];
// Add subject tag if present
if let Some(value) = self.subject.as_ref() {
tags.push(Tag::from_standardized_without_cell(TagStandard::Subject(
value.to_string(),
)));
}
// Add all reply tags
for id in replies.into_iter() {
tags.push(Tag::event(id))
}
// Add all receiver tags
for member in members.into_iter() {
// Skip current user
if member.public_key() == sender {
continue;
}
tags.push(Tag::from_standardized_without_cell(
TagStandard::PublicKey {
public_key: member.public_key(),
relay_url: member.messaging_relay_hint(),
alias: None,
uppercase: false,
},
));
}
// Construct a direct message rumor event
// WARNING: never sign and send this event to relays
let mut event = EventBuilder::new(kind, content).tags(tags).build(sender);
// Ensure that the ID is set
event.ensure_id();
Some(event)
}
/// Send rumor event to all members's messaging relays
pub fn send(&self, rumor: UnsignedEvent, cx: &App) -> Option<Task<Vec<SendReport>>> {
let persons = PersonRegistry::global(cx);
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let signer = nostr.read(cx).signer();
// Get room's config
let config = self.config.clone();
// Get current user's public key
let sender = nostr.read(cx).signer().public_key()?;
// Get all members (excluding sender)
let members: Vec<Person> = self
.members
.iter()
.filter(|public_key| public_key != &&sender)
.map(|member| persons.read(cx).get(member, cx))
.collect();
Some(cx.background_spawn(async move {
let signer_kind = config.signer_kind();
let backup = config.backup();
// Get all available signers
let user_signer = signer.get().await;
let encryption_signer = signer.get_encryption_signer().await;
let mut reports: Vec<SendReport> = vec![];
let mut reports = Vec::new();
for member in members {
let relays = member.messaging_relays();
let announcement = member.announcement();
for member in members.into_iter() {
// Skip if member has no messaging relays
if member.messaging_relays().is_empty() {
let report = SendReport::new(member.public_key()).error("No messaging relays");
reports.push(report);
if relays.is_empty() {
reports.push(SendReport::new(member.public_key()).error("No messaging relays"));
continue;
}
// When the room is forced to use an encryption signer,
// skip if the receiver has not set up an encryption signer.
if signer_kind.encryption() && member.announcement().is_none() {
let report = SendReport::new(member.public_key()).error("Encryption not found");
reports.push(report);
// Ensure relay connections
for url in relays.iter() {
client
.add_relay(url)
.and_connect()
.capabilities(RelayCapabilities::GOSSIP)
.await
.ok();
}
// When forced to use encryption signer, skip if receiver has no announcement
if signer_kind.encryption() && announcement.is_none() {
reports
.push(SendReport::new(member.public_key()).error("Encryption not found"));
continue;
}
let (receiver, signer) = match signer_kind {
// Determine receiver and signer based on signer kind
let (receiver, signer_to_use) = match signer_kind {
SignerKind::Auto => {
if let Some(announcement) = member.announcement() {
if let Some(announcement) = announcement {
if let Some(enc_signer) = encryption_signer.as_ref() {
(announcement.public_key(), enc_signer.clone())
} else {
@@ -466,21 +502,48 @@ impl Room {
}
SignerKind::Encryption => {
let Some(encryption_signer) = encryption_signer.as_ref() else {
let report =
SendReport::new(member.public_key()).error("Encryption not found");
reports.push(report);
reports.push(
SendReport::new(member.public_key()).error("Encryption not found"),
);
continue;
};
let Some(announcement) = member.announcement() else {
let report = SendReport::new(member.public_key())
.error("Announcement not found");
reports.push(report);
let Some(announcement) = announcement else {
reports.push(
SendReport::new(member.public_key())
.error("Announcement not found"),
);
continue;
};
(announcement.public_key(), encryption_signer.clone())
}
SignerKind::User => (member.public_key(), user_signer.clone()),
};
// Create and send gift-wrapped event
match EventBuilder::gift_wrap(&signer_to_use, &receiver, rumor.clone(), vec![])
.await
{
Ok(event) => {
match client
.send_event(&event)
.to(relays)
.ack_policy(AckPolicy::none())
.await
{
Ok(output) => {
reports.push(SendReport::new(member.public_key()).output(output));
}
Err(e) => {
reports.push(
SendReport::new(member.public_key()).error(e.to_string()),
);
}
}
}
Err(e) => {
reports.push(SendReport::new(member.public_key()).error(e.to_string()));
}
}
}
reports

View File

@@ -1,8 +1,8 @@
use std::collections::HashSet;
pub use actions::*;
use anyhow::Error;
use chat::{Message, RenderedMessage, Room, RoomEvent, RoomKind, SendReport};
use anyhow::{Context as AnyhowContext, Error};
use chat::{Message, RenderedMessage, Room, RoomEvent, SendReport};
use common::{nip96_upload, RenderedTimestamp};
use dock::panel::{Panel, PanelEvent};
use gpui::prelude::FluentBuilder;
@@ -60,7 +60,7 @@ pub struct ChatPanel {
/// Mapping message ids to their rendered texts
rendered_texts_by_id: BTreeMap<EventId, RenderedText>,
/// Mapping message ids to their reports
/// Mapping message (rumor event) ids to their reports
reports_by_id: BTreeMap<EventId, Vec<SendReport>>,
/// Input state
@@ -124,6 +124,7 @@ impl ChatPanel {
// Define all functions that will run after the current cycle
cx.defer_in(window, |this, window, cx| {
this.subscribe_room_events(window, cx);
this.connect(window, cx);
this.get_messages(window, cx);
});
@@ -164,6 +165,15 @@ impl ChatPanel {
);
}
/// Get all necessary data for each member
fn connect(&mut self, _window: &mut Window, cx: &mut Context<Self>) {
let Ok(connect) = self.room.read_with(cx, |this, cx| this.early_connect(cx)) else {
return;
};
self.tasks.push(cx.background_spawn(connect));
}
/// Load all messages belonging to this room
fn get_messages(&mut self, _window: &mut Window, cx: &mut Context<Self>) {
let Ok(get_messages) = self.room.read_with(cx, |this, cx| this.get_messages(cx)) else {
@@ -182,7 +192,7 @@ impl ChatPanel {
}));
}
/// Get user input content and merged all attachments
/// Get user input content and merged all attachments if available
fn get_input_value(&self, cx: &Context<Self>) -> String {
// Get input's value
let mut content = self.input.read(cx).value().trim().to_string();
@@ -222,7 +232,52 @@ impl ChatPanel {
/// Send a message to all members of the chat
fn send_message(&mut self, value: &str, window: &mut Window, cx: &mut Context<Self>) {
// TODO
if value.trim().is_empty() {
window.push_notification("Cannot send an empty message", cx);
return;
}
// Get room entity
let room = self.room.clone();
let replies: Vec<EventId> = self.replies_to.read(cx).iter().copied().collect();
let content = value.to_string();
self.tasks.push(cx.spawn_in(window, async move |this, cx| {
let room = room.upgrade().context("Room is not available")?;
this.update_in(cx, |this, window, cx| {
match room.read(cx).rumor(content, replies, cx) {
Some(rumor) => {
this.insert_message(&rumor, true, cx);
this.send_and_wait(rumor, window, cx);
}
None => {
window.push_notification("Failed to create message", cx);
}
}
})?;
Ok(())
}));
}
fn send_and_wait(&mut self, rumor: UnsignedEvent, window: &mut Window, cx: &mut Context<Self>) {
let Some(room) = self.room.upgrade() else {
return;
};
let Some(task) = room.read(cx).send(rumor, cx) else {
window.push_notification("Failed to send message", cx);
return;
};
self.tasks.push(cx.spawn_in(window, async move |this, cx| {
let outputs = task.await;
log::info!("Message sent successfully: {outputs:?}");
Ok(())
}))
}
fn insert_reports(&mut self, id: EventId, reports: Vec<SendReport>, cx: &mut Context<Self>) {

View File

@@ -78,6 +78,9 @@ fn main() {
// Initialize theme registry
theme::init(cx);
// Initialize settings
settings::init(cx);
// Initialize the nostr client
state::init(cx);
@@ -86,9 +89,6 @@ fn main() {
// NIP-4e: https://github.com/nostr-protocol/nips/blob/per-device-keys/4e.md
device::init(window, cx);
// Initialize settings
settings::init(cx);
// Initialize relay auth registry
relay_auth::init(window, cx);

View File

@@ -206,17 +206,6 @@ impl Sidebar {
/// Search
fn search(&mut self, window: &mut Window, cx: &mut Context<Self>) {
// Return if a search is already in progress
if self.finding {
if self.find_task.is_none() {
window.push_notification("There is another search in progress", cx);
return;
} else {
// Cancel the ongoing search request
self.find_task = None;
}
}
// Get query
let query = self.find_input.read(cx).value();
@@ -228,12 +217,14 @@ impl Sidebar {
// Block the input until the search completes
self.set_finding(true, window, cx);
// Create the search task
let nostr = NostrRegistry::global(cx);
let find_users = nostr.read(cx).search(&query, cx);
// Run task in the main thread
self.find_task = Some(cx.spawn_in(window, async move |this, cx| {
let rooms = find_users.await?;
// Update the UI with the search results
this.update_in(cx, |this, window, cx| {
this.set_results(rooms, cx);

View File

@@ -37,7 +37,7 @@ pub const USER_GIFTWRAP: &str = "user-gift-wraps";
pub const WOT_RELAYS: [&str; 1] = ["wss://relay.vertexlab.io"];
/// Default search relays
pub const SEARCH_RELAYS: [&str; 1] = ["wss://antiprimal.net"];
pub const SEARCH_RELAYS: [&str; 2] = ["wss://antiprimal.net", "wss://search.nos.today"];
/// Default bootstrap relays
pub const BOOTSTRAP_RELAYS: [&str; 3] = [

View File

@@ -837,9 +837,30 @@ impl NostrRegistry {
let client = self.client();
let query = query.to_string();
// Get the address task if the query is a valid NIP-05 address
let address_task = if let Ok(addr) = Nip05Address::parse(&query) {
Some(self.get_address(addr, cx))
} else {
None
};
cx.background_spawn(async move {
let mut results: Vec<PublicKey> = Vec::with_capacity(FIND_LIMIT);
// Return early if the query is a valid NIP-05 address
if let Some(task) = address_task {
if let Ok(public_key) = task.await {
results.push(public_key);
return Ok(results);
}
}
// Return early if the query is a valid public key
if let Ok(public_key) = PublicKey::parse(&query) {
results.push(public_key);
return Ok(results);
}
// Construct the filter for the search query
let filter = Filter::new()
.search(query.to_lowercase())