feat: rewrite the nip-4e implementation (#1)

Make NIP-4e a core feature, not an optional feature.

Note:
- The UI is broken and needs to be updated in a separate PR.

Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
2026-01-13 16:00:08 +08:00
parent bb455871e5
commit 75c3783522
50 changed files with 2818 additions and 3458 deletions

View File

@@ -5,24 +5,26 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use account::Account;
use anyhow::{anyhow, Context as AnyhowContext, Error};
use common::{EventUtils, BOOTSTRAP_RELAYS, METADATA_BATCH_LIMIT};
use encryption::Encryption;
use common::EventUtils;
use device::DeviceRegistry;
use flume::Sender;
use fuzzy_matcher::skim::SkimMatcherV2;
use fuzzy_matcher::FuzzyMatcher;
use gpui::{App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task};
pub use message::*;
use gpui::{
App, AppContext, Context, Entity, EventEmitter, Global, Subscription, Task, WeakEntity,
};
use nostr_sdk::prelude::*;
pub use room::*;
use settings::AppSettings;
use smallvec::{smallvec, SmallVec};
use state::{initialized_at, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
use state::{tracker, NostrRegistry, GIFTWRAP_SUBSCRIPTION};
mod message;
mod room;
pub use message::*;
pub use room::*;
pub fn init(cx: &mut App) {
ChatRegistry::set_global(cx.new(ChatRegistry::new), cx);
}
@@ -31,37 +33,51 @@ struct GlobalChatRegistry(Entity<ChatRegistry>);
impl Global for GlobalChatRegistry {}
/// Chat event.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChatEvent {
/// An event to open a room by its ID
OpenRoom(u64),
/// An event to close a room by its ID
CloseRoom(u64),
/// An event to notify UI about a new chat request
Ping,
}
/// Channel signal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum NostrEvent {
/// Message received from relay pool
Message(NewMessage),
/// Unwrapping status
Unwrapping(bool),
/// Eose received from relay pool
Eose,
}
/// Chat Registry
#[derive(Debug)]
pub struct ChatRegistry {
/// Collection of all chat rooms
pub rooms: Vec<Entity<Room>>,
rooms: Vec<Entity<Room>>,
/// Loading status of the registry
pub loading: bool,
loading: bool,
/// Async task for handling notifications
handle_notifications: Task<()>,
/// Tracking the status of unwrapping gift wrap events.
tracking_flag: Arc<AtomicBool>,
/// Event subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
/// Channel's sender for communication between nostr and gpui
sender: Sender<NostrEvent>,
/// Handle notifications asynchronous task
notifications: Option<Task<Result<(), Error>>>,
/// Tasks for asynchronous operations
_tasks: SmallVec<[Task<()>; 4]>,
}
tasks: Vec<Task<()>>,
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChatEvent {
OpenRoom(u64),
CloseRoom(u64),
NewChatRequest(RoomKind),
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum Signal {
Loading(bool),
Message(NewMessage),
Eose,
/// Subscriptions
_subscriptions: SmallVec<[Subscription; 1]>,
}
impl EventEmitter<ChatEvent> for ChatRegistry {}
@@ -79,81 +95,65 @@ impl ChatRegistry {
/// Create a new chat registry instance
fn new(cx: &mut Context<Self>) -> Self {
let encryption = Encryption::global(cx);
let encryption_key = encryption.read(cx).encryption.clone();
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let identity = nostr.read(cx).identity();
let status = Arc::new(AtomicBool::new(true));
let (tx, rx) = flume::bounded::<Signal>(2048);
let device = DeviceRegistry::global(cx);
let device_signer = device.read(cx).device_signer.clone();
let handle_notifications = cx.background_spawn({
let client = nostr.read(cx).client();
let status = Arc::clone(&status);
let tx = tx.clone();
let signer: Option<Arc<dyn NostrSigner>> = None;
// A flag to indicate if the registry is loading
let tracking_flag = Arc::new(AtomicBool::new(true));
async move { Self::handle_notifications(&client, &signer, &tx, &status).await }
});
// Channel for communication between nostr and gpui
let (tx, rx) = flume::bounded::<NostrEvent>(2048);
let mut tasks = vec![];
let mut subscriptions = smallvec![];
let mut tasks = smallvec![];
subscriptions.push(
// Observe the encryption global state
cx.observe(&encryption_key, {
let status = Arc::clone(&status);
let tx = tx.clone();
// Observe the identity
cx.observe(&identity, |this, state, cx| {
if state.read(cx).has_public_key() {
// Handle nostr notifications
this.handle_notifications(cx);
// Track unwrapping progress
this.tracking(cx);
}
}),
);
move |this, state, cx| {
if let Some(signer) = state.read(cx).clone() {
this.handle_notifications = cx.background_spawn({
let client = nostr.read(cx).client();
let status = Arc::clone(&status);
let tx = tx.clone();
let signer = Some(signer);
async move {
Self::handle_notifications(&client, &signer, &tx, &status).await
}
});
cx.notify();
}
subscriptions.push(
// Observe the device signer state
cx.observe(&device_signer, |this, state, cx| {
if state.read(cx).is_some() {
this.handle_notifications(cx);
}
}),
);
tasks.push(
// Handle unwrapping status
cx.background_spawn(
async move { Self::handle_unwrapping(&client, &status, &tx).await },
),
);
tasks.push(
// Handle new messages
// Update GPUI states
cx.spawn(async move |this, cx| {
while let Ok(message) = rx.recv_async().await {
match message {
Signal::Message(message) => {
NostrEvent::Message(message) => {
this.update(cx, |this, cx| {
this.new_message(message, cx);
})
.expect("Entity has been released");
.ok();
}
Signal::Eose => {
NostrEvent::Eose => {
this.update(cx, |this, cx| {
this.get_rooms(cx);
})
.expect("Entity has been released");
.ok();
}
Signal::Loading(status) => {
NostrEvent::Unwrapping(status) => {
this.update(cx, |this, cx| {
this.set_loading(status, cx);
this.get_rooms(cx);
})
.expect("Entity has been released");
.ok();
}
};
}
@@ -163,127 +163,137 @@ impl ChatRegistry {
Self {
rooms: vec![],
loading: true,
handle_notifications,
tracking_flag,
sender: tx.clone(),
notifications: None,
tasks,
_subscriptions: subscriptions,
_tasks: tasks,
}
}
async fn handle_notifications<T>(
client: &Client,
signer: &Option<T>,
tx: &Sender<Signal>,
status: &Arc<AtomicBool>,
) where
T: NostrSigner,
{
let initialized_at = initialized_at();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
/// Handle nostr notifications
fn handle_notifications(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
let mut notifications = client.notifications();
let mut public_keys = HashSet::new();
let mut processed_events = HashSet::new();
let device = DeviceRegistry::global(cx);
let device_signer = device.read(cx).signer(cx);
while let Ok(notification) = notifications.recv().await {
let RelayPoolNotification::Message { message, .. } = notification else {
// Skip non-message notifications
continue;
};
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
match message {
RelayMessage::Event { event, .. } => {
if !processed_events.insert(event.id) {
// Skip if the event has already been processed
continue;
}
self.tasks.push(cx.background_spawn(async move {
let initialized_at = Timestamp::now();
let subscription_id = SubscriptionId::new(GIFTWRAP_SUBSCRIPTION);
if event.kind != Kind::GiftWrap {
// Skip non-gift wrap events
continue;
}
let mut notifications = client.notifications();
let mut processed_events = HashSet::new();
// Extract the rumor from the gift wrap event
match Self::extract_rumor(client, signer, event.as_ref()).await {
Ok(rumor) => {
// Get all public keys
public_keys.extend(rumor.all_pubkeys());
while let Ok(notification) = notifications.recv().await {
let RelayPoolNotification::Message { message, .. } = notification else {
// Skip non-message notifications
continue;
};
let limit_reached = public_keys.len() >= METADATA_BATCH_LIMIT;
let done = !status.load(Ordering::Acquire) && !public_keys.is_empty();
match message {
RelayMessage::Event { event, .. } => {
if !processed_events.insert(event.id) {
// Skip if the event has already been processed
continue;
}
// Get metadata for all public keys if the limit is reached
if limit_reached || done {
let public_keys = std::mem::take(&mut public_keys);
// Get metadata for the public keys
Self::get_metadata(client, public_keys).await.ok();
}
if event.kind != Kind::GiftWrap {
// Skip non-gift wrap events
continue;
}
match &rumor.created_at >= initialized_at {
// Extract the rumor from the gift wrap event
match Self::extract_rumor(&client, &device_signer, event.as_ref()).await {
Ok(rumor) => match rumor.created_at >= initialized_at {
true => {
let new_message = NewMessage::new(event.id, rumor);
let signal = Signal::Message(new_message);
// Check if the event is sent by coop
let sent_by_coop = {
let tracker = tracker().read().await;
tracker.is_sent_by_coop(&event.id)
};
// No need to emit if sent by coop
// the event is already emitted
if !sent_by_coop {
let new_message = NewMessage::new(event.id, rumor);
let signal = NostrEvent::Message(new_message);
if let Err(e) = tx.send_async(signal).await {
log::error!("Failed to send signal: {}", e);
tx.send_async(signal).await.ok();
}
}
false => {
status.store(true, Ordering::Release);
}
},
Err(e) => {
log::warn!("Failed to unwrap: {e}");
}
}
Err(e) => {
log::warn!("Failed to unwrap gift wrap event: {}", e);
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &subscription_id {
tx.send_async(NostrEvent::Eose).await.ok();
}
}
_ => {}
}
RelayMessage::EndOfStoredEvents(id) => {
if id.as_ref() == &subscription_id {
if let Err(e) = tx.send_async(Signal::Eose).await {
log::error!("Failed to send signal: {}", e);
}
}
}
_ => {}
}
}
}));
}
async fn handle_unwrapping(client: &Client, status: &Arc<AtomicBool>, tx: &Sender<Signal>) {
let loop_duration = Duration::from_secs(20);
let mut is_start_processing = false;
let mut total_loops = 0;
/// Tracking the status of unwrapping gift wrap events.
fn tracking(&mut self, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
let client = nostr.read(cx).client();
loop {
if client.has_signer().await {
total_loops += 1;
let status = self.tracking_flag.clone();
let tx = self.sender.clone();
if status.load(Ordering::Acquire) {
is_start_processing = true;
self.notifications = Some(cx.background_spawn(async move {
let loop_duration = Duration::from_secs(12);
// Reset gift wrap processing flag
_ = status.compare_exchange(true, false, Ordering::Release, Ordering::Relaxed);
let mut is_start_processing = false;
let mut total_loops = 0;
// Send loading signal
if let Err(e) = tx.send_async(Signal::Loading(true)).await {
log::error!("Failed to send signal: {}", e);
}
} else {
// Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
// Send loading signal
if let Err(e) = tx.send_async(Signal::Loading(false)).await {
log::error!("Failed to send signal: {}", e);
loop {
if client.has_signer().await {
total_loops += 1;
if status.load(Ordering::Acquire) {
is_start_processing = true;
// Reset gift wrap processing flag
_ = status.compare_exchange(
true,
false,
Ordering::Release,
Ordering::Relaxed,
);
tx.send_async(NostrEvent::Unwrapping(true)).await.ok();
} else {
// Only run further if we are already processing
// Wait until after 2 loops to prevent exiting early while events are still being processed
if is_start_processing && total_loops >= 2 {
tx.send_async(NostrEvent::Unwrapping(false)).await.ok();
// Reset the counter
is_start_processing = false;
total_loops = 0;
}
// Reset the counter
is_start_processing = false;
total_loops = 0;
}
}
smol::Timer::after(loop_duration).await;
}
smol::Timer::after(loop_duration).await;
}
}));
}
/// Get the loading status of the chat registry
pub fn loading(&self) -> bool {
self.loading
}
/// Set the loading status of the chat registry
@@ -292,12 +302,12 @@ impl ChatRegistry {
cx.notify();
}
/// Get a room by its ID.
pub fn room(&self, id: &u64, cx: &App) -> Option<Entity<Room>> {
/// Get a weak reference to a room by its ID.
pub fn room(&self, id: &u64, cx: &App) -> Option<WeakEntity<Room>> {
self.rooms
.iter()
.find(|model| model.read(cx).id == *id)
.cloned()
.find(|this| &this.read(cx).id == id)
.map(|this| this.downgrade())
}
/// Get all ongoing rooms.
@@ -319,11 +329,30 @@ impl ChatRegistry {
}
/// Add a new room to the start of list.
pub fn add_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
self.rooms.insert(0, room);
pub fn add_room<I>(&mut self, room: I, cx: &mut Context<Self>)
where
I: Into<Room>,
{
self.rooms.insert(0, cx.new(|_| room.into()));
cx.notify();
}
/// Emit an open room event.
/// If the room is new, add it to the registry.
pub fn emit_room(&mut self, room: WeakEntity<Room>, cx: &mut Context<Self>) {
if let Some(room) = room.upgrade() {
let id = room.read(cx).id;
// If the room is new, add it to the registry.
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.rooms.insert(0, room);
}
// Emit the open room event.
cx.emit(ChatEvent::OpenRoom(id));
}
}
/// Close a room.
pub fn close_room(&mut self, id: u64, cx: &mut Context<Self>) {
if self.rooms.iter().any(|r| r.read(cx).id == id) {
@@ -367,17 +396,6 @@ impl ChatRegistry {
cx.notify();
}
/// Push a new room to the chat registry
pub fn push_room(&mut self, room: Entity<Room>, cx: &mut Context<Self>) {
let id = room.read(cx).id;
if !self.rooms.iter().any(|r| r.read(cx).id == id) {
self.add_room(room, cx);
}
cx.emit(ChatEvent::OpenRoom(id));
}
/// Extend the registry with new rooms.
fn extend_rooms(&mut self, rooms: HashSet<Room>, cx: &mut Context<Self>) {
let mut room_map: HashMap<u64, usize> = self
@@ -410,7 +428,7 @@ impl ChatRegistry {
pub fn get_rooms(&mut self, cx: &mut Context<Self>) {
let task = self.create_get_rooms_task(cx);
self._tasks.push(
self.tasks.push(
// Run and finished in the background
cx.spawn(async move |this, cx| {
match task.await {
@@ -528,59 +546,61 @@ impl ChatRegistry {
/// If the room doesn't exist, it will be created.
/// Updates room ordering based on the most recent messages.
pub fn new_message(&mut self, message: NewMessage, cx: &mut Context<Self>) {
let nostr = NostrRegistry::global(cx);
// Get the unique id
let id = message.rumor.uniq_id();
// Get the author
let author = message.rumor.pubkey;
let account = Account::global(cx);
if let Some(room) = self.rooms.iter().find(|room| room.read(cx).id == id) {
let is_new_event = message.rumor.created_at > room.read(cx).created_at;
let created_at = message.rumor.created_at;
let event_for_emit = message.rumor.clone();
match self.rooms.iter().find(|room| room.read(cx).id == id) {
Some(room) => {
let new_message = message.rumor.created_at > room.read(cx).created_at;
let created_at = message.rumor.created_at;
// Update room
room.update(cx, |this, cx| {
if is_new_event {
this.set_created_at(created_at, cx);
// Update room
room.update(cx, |this, cx| {
// Update the last timestamp if the new message is newer
if new_message {
this.set_created_at(created_at, cx);
}
// Set this room is ongoing if the new message is from current user
if author == nostr.read(cx).identity().read(cx).public_key() {
this.set_ongoing(cx);
}
// Emit the new message to the room
this.emit_message(message, cx);
});
// Resort all rooms in the registry by their created at (after updated)
if new_message {
self.sort(cx);
}
// Set this room is ongoing if the new message is from current user
if author == account.read(cx).public_key() {
this.set_ongoing(cx);
}
// Emit the new message to the room
this.emit_message(message.gift_wrap, event_for_emit.clone(), cx);
});
// Resort all rooms in the registry by their created at (after updated)
if is_new_event {
self.sort(cx);
}
} else {
// Push the new room to the front of the list
self.add_room(cx.new(|_| Room::from(&message.rumor)), cx);
None => {
// Push the new room to the front of the list
self.add_room(&message.rumor, cx);
// Notify the UI about the new room
cx.emit(ChatEvent::NewChatRequest(RoomKind::default()));
// Notify the UI about the new room
cx.emit(ChatEvent::Ping);
}
}
}
// Unwraps a gift-wrapped event and processes its contents.
async fn extract_rumor<T>(
async fn extract_rumor(
client: &Client,
signer: &Option<T>,
device_signer: &Option<Arc<dyn NostrSigner>>,
gift_wrap: &Event,
) -> Result<UnsignedEvent, Error>
where
T: NostrSigner,
{
) -> Result<UnsignedEvent, Error> {
// Try to get cached rumor first
if let Ok(event) = Self::get_rumor(client, gift_wrap.id).await {
return Ok(event);
}
// Try to unwrap with the available signer
let unwrapped = Self::try_unwrap(client, signer, gift_wrap).await?;
let unwrapped = Self::try_unwrap(client, device_signer, gift_wrap).await?;
let mut rumor_unsigned = unwrapped.rumor;
// Generate event id for the rumor if it doesn't have one
@@ -593,37 +613,26 @@ impl ChatRegistry {
}
// Helper method to try unwrapping with different signers
async fn try_unwrap<T>(
async fn try_unwrap(
client: &Client,
signer: &Option<T>,
device_signer: &Option<Arc<dyn NostrSigner>>,
gift_wrap: &Event,
) -> Result<UnwrappedGift, Error>
where
T: NostrSigner,
{
if let Some(custom_signer) = signer.as_ref() {
if let Ok(seal) = custom_signer
) -> Result<UnwrappedGift, Error> {
if let Some(signer) = device_signer.as_ref() {
let seal = signer
.nip44_decrypt(&gift_wrap.pubkey, &gift_wrap.content)
.await
{
let seal: Event = Event::from_json(seal)?;
seal.verify_with_ctx(&SECP256K1)?;
.await?;
// Decrypt the rumor
// TODO: verify the sender
let rumor = custom_signer
.nip44_decrypt(&seal.pubkey, &seal.content)
.await?;
let seal: Event = Event::from_json(seal)?;
seal.verify_with_ctx(&SECP256K1)?;
// Construct the unsigned event
let rumor = UnsignedEvent::from_json(rumor)?;
let rumor = signer.nip44_decrypt(&seal.pubkey, &seal.content).await?;
let rumor = UnsignedEvent::from_json(rumor)?;
// Return the unwrapped gift
return Ok(UnwrappedGift {
sender: rumor.pubkey,
rumor,
});
}
return Ok(UnwrappedGift {
sender: seal.pubkey,
rumor,
});
}
let signer = client.signer().await?;
@@ -695,33 +704,6 @@ impl ChatRegistry {
}
}
/// Get metadata for a list of public keys
async fn get_metadata<I>(client: &Client, public_keys: I) -> Result<(), Error>
where
I: IntoIterator<Item = PublicKey>,
{
let authors: Vec<PublicKey> = public_keys.into_iter().collect();
let opts = SubscribeAutoCloseOptions::default().exit_policy(ReqExitPolicy::ExitOnEOSE);
let kinds = vec![Kind::Metadata, Kind::ContactList];
// Return if the list is empty
if authors.is_empty() {
return Err(anyhow!("You need at least one public key".to_string(),));
}
let filter = Filter::new()
.limit(authors.len() * kinds.len())
.authors(authors)
.kinds(kinds);
// Subscribe to filters to the bootstrap relays
client
.subscribe_to(BOOTSTRAP_RELAYS, filter, Some(opts))
.await?;
Ok(())
}
/// Get the conversation ID for a given rumor (message).
fn conversation_id(rumor: &UnsignedEvent) -> u64 {
let mut hasher = DefaultHasher::new();