Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,108 changes: 650 additions & 458 deletions LICENSE

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Spider reads your follow list and pulls your feed. Point your client at `ws://lo

## Features

* NIPs: 1, 9, 11, 13, 16, 33, 40, 42, 45, 50, 65, 70, 77
* NIPs: 1, 2, 9, 11, 13, 16, 33, 40, 42, 45, 50, 65, 70, 77
* LMDB storage (no external database)
* Spider mode for syncing events from external relays
* Import/export to JSONL
Expand All @@ -62,4 +62,4 @@ See `wisp.toml.example` for all options.

## License

LGPL-2.1-or-later
AGPL-3.0
14 changes: 4 additions & 10 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});

// Import dependencies
const httpz = b.dependency("httpz", .{
.target = target,
.optimize = optimize,
});
// Get websocket from httpz to avoid version conflicts
const httpz_dep = httpz.builder.dependency("websocket", .{
const nostr = b.dependency("nostr", .{
.target = target,
.optimize = optimize,
});
const nostr = b.dependency("nostr", .{
// websocket client for spider outbound connections
const websocket = b.dependency("websocket", .{
.target = target,
.optimize = optimize,
});
Expand All @@ -26,9 +21,8 @@ pub fn build(b: *std.Build) void {
.target = target,
.optimize = optimize,
.imports = &.{
.{ .name = "httpz", .module = httpz.module("httpz") },
.{ .name = "websocket", .module = httpz_dep.module("websocket") },
.{ .name = "nostr", .module = nostr.module("nostr") },
.{ .name = "websocket", .module = websocket.module("websocket") },
},
}),
});
Expand Down
11 changes: 4 additions & 7 deletions build.zig.zon
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
.{
.name = .wisp,
.version = "0.1.2",
.version = "0.1.3",
.fingerprint = 0xc4bdec9fe6401a8d,
.dependencies = .{
.httpz = .{
.url = "https://github.com/karlseguin/http.zig/archive/refs/heads/master.tar.gz",
.hash = "httpz-0.0.0-PNVzrEktBwCzPoiua-S8LAYo2tILqczm3tSpneEzLQ9L",
},
// websocket client for spider outbound connections
.websocket = .{
.url = "https://github.com/karlseguin/websocket.zig/archive/refs/heads/master.tar.gz",
.hash = "websocket-0.1.0-ZPISdRNzAwAGszh62EpRtoQxu8wb1MSMVI6Ow0o2dmyJ",
},
.nostr = .{
.url = "https://github.com/privkeyio/libnostr-z/archive/refs/tags/v0.1.6.tar.gz",
.hash = "nostr-0.1.6-JY6OcMPABABT9tx-BY8jcJpBa-boEF20VbZY-iX3DMMe",
.url = "https://github.com/privkeyio/libnostr-z/archive/refs/tags/v0.1.8.tar.gz",
.hash = "nostr-0.1.8-JY6OcM8yBgBerCkIS-c3a_YdGedXYw_PHRPT1ey73eSw",
},
},
.paths = .{
Expand Down
46 changes: 37 additions & 9 deletions src/connection.zig
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const std = @import("std");
const posix = std.posix;
const nostr = @import("nostr.zig");
const httpz = @import("httpz");
const websocket = httpz.websocket;
const WriteQueue = @import("write_queue.zig").WriteQueue;
const write_queue = @import("write_queue.zig");
const WriteQueue = write_queue.WriteQueue;
const WriteFn = write_queue.WriteFn;

pub const NegSession = struct {
storage: nostr.negentropy.VectorStorage,
Expand All @@ -24,7 +25,9 @@ pub const Connection = struct {
neg_sessions: std.StringHashMap(NegSession),
created_at: i64,
last_activity: i64,
ws_conn: ?*websocket.Conn = null,
direct_write_fn: ?WriteFn = null,
direct_write_ctx: ?*anyopaque = null,
socket_handle: ?posix.socket_t = null,

events_received: u64 = 0,
events_sent: u64 = 0,
Expand Down Expand Up @@ -55,16 +58,18 @@ pub const Connection = struct {
self.events_sent = 0;
self.client_ip = undefined;
self.client_ip_len = 0;
self.ws_conn = null;
self.direct_write_fn = null;
self.direct_write_ctx = null;
self.socket_handle = null;
std.crypto.random.bytes(&self.auth_challenge);
self.authenticated_pubkeys = std.AutoHashMap([32]u8, void).init(self.arena.allocator());
self.challenge_sent = false;
self.write_queue = WriteQueue.init(backing_allocator);
self.deinitialized = false;
}

pub fn startWriteQueue(self: *Connection, ws_conn: *websocket.Conn) void {
self.write_queue.start(ws_conn);
pub fn startWriteQueue(self: *Connection, write_fn: WriteFn, write_ctx: *anyopaque) void {
self.write_queue.start(write_fn, write_ctx);
}

pub fn stopWriteQueue(self: *Connection) void {
Expand Down Expand Up @@ -98,8 +103,31 @@ pub const Connection = struct {
}

pub fn sendDirect(self: *Connection, data: []const u8) void {
if (self.ws_conn) |conn| {
conn.write(data) catch {};
if (self.direct_write_fn) |write_fn| {
if (self.direct_write_ctx) |ctx| {
write_fn(ctx, data);
}
}
}

pub fn setDirectWriter(self: *Connection, write_fn: WriteFn, ctx: *anyopaque) void {
self.direct_write_fn = write_fn;
self.direct_write_ctx = ctx;
}

pub fn clearDirectWriter(self: *Connection) void {
self.direct_write_fn = null;
self.direct_write_ctx = null;
}

pub fn setSocketHandle(self: *Connection, handle: posix.socket_t) void {
self.socket_handle = handle;
}

pub fn shutdown(self: *Connection) void {
if (self.socket_handle) |handle| {
posix.shutdown(handle, .both) catch {};
self.socket_handle = null;
}
}

Expand Down
44 changes: 33 additions & 11 deletions src/handler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub const Handler = struct {
broadcaster: *Broadcaster,
send_fn: *const fn (conn_id: u64, data: []const u8) void,
event_limiter: *rate_limiter.EventRateLimiter,
shutdown: *std.atomic.Value(bool),

pub fn init(
allocator: std.mem.Allocator,
Expand All @@ -157,6 +158,7 @@ pub const Handler = struct {
broadcaster: *Broadcaster,
send_fn: *const fn (conn_id: u64, data: []const u8) void,
event_limiter: *rate_limiter.EventRateLimiter,
shutdown: *std.atomic.Value(bool),
) Handler {
return .{
.allocator = allocator,
Expand All @@ -166,10 +168,12 @@ pub const Handler = struct {
.broadcaster = broadcaster,
.send_fn = send_fn,
.event_limiter = event_limiter,
.shutdown = shutdown,
};
}

pub fn handle(self: *Handler, conn: *Connection, message: []const u8) void {
if (self.shutdown.load(.acquire)) return;
conn.touch();

if (!validateMessageStructure(message)) {
Expand Down Expand Up @@ -370,13 +374,17 @@ pub const Handler = struct {
}

fn handleReq(self: *Handler, conn: *Connection, msg: *nostr.ClientMsg) void {
const sub_id = msg.subscriptionId();
const sub_id_raw = msg.subscriptionId();

if (sub_id.len == 0 or sub_id.len > 64) {
self.sendClosed(conn, sub_id, "error: invalid subscription ID");
if (sub_id_raw.len == 0 or sub_id_raw.len > 64) {
self.sendClosed(conn, sub_id_raw, "error: invalid subscription ID");
return;
}

var sub_id_buf: [64]u8 = undefined;
@memcpy(sub_id_buf[0..sub_id_raw.len], sub_id_raw);
const sub_id = sub_id_buf[0..sub_id_raw.len];

if (self.config.auth_required) {
if (!conn.isAuthenticated()) {
self.sendClosed(conn, sub_id, "auth-required: authentication required to subscribe");
Expand Down Expand Up @@ -419,6 +427,7 @@ pub const Handler = struct {
if (filters.len == 1 and isKindOnlyQuery(&filters[0])) {
const kinds = filters[0].kinds().?;
if (kinds.len == 1) {
if (self.shutdown.load(.acquire)) return;
var iter = self.store.query(filters, limit) catch {
self.sendClosed(conn, sub_id, "error: query failed");
return;
Expand All @@ -432,6 +441,7 @@ pub const Handler = struct {
conn.events_sent += 1;
}
} else {
if (self.shutdown.load(.acquire)) return;
var mk_iter = self.store.queryMultiKind(kinds, limit) catch {
self.sendClosed(conn, sub_id, "error: query failed");
return;
Expand All @@ -446,6 +456,7 @@ pub const Handler = struct {
}
}
} else {
if (self.shutdown.load(.acquire)) return;
var iter = self.store.query(filters, limit) catch {
self.sendClosed(conn, sub_id, "error: query failed");
return;
Expand Down Expand Up @@ -608,6 +619,7 @@ pub const Handler = struct {
return;
};

if (self.shutdown.load(.acquire)) return;
var iter = self.store.query(&[_]nostr.Filter{filter}, self.config.negentropy_max_sync_events) catch {
conn.removeNegSession(sub_id);
self.sendNegErr(conn, sub_id, "error: query failed");
Expand Down Expand Up @@ -665,60 +677,70 @@ pub const Handler = struct {
conn.removeNegSession(msg.subscriptionId());
}

fn reconcileAndSend(_: *Handler, conn: *Connection, sub_id: []const u8, session: *NegSession, query: []const u8) void {
fn reconcileAndSend(self: *Handler, conn: *Connection, sub_id: []const u8, session: *NegSession, query: []const u8) void {
if (self.shutdown.load(.acquire)) return;
var out_buf: [65536]u8 = undefined;
var neg = nostr.negentropy.Negentropy.init(session.storage.storage(), 0);

var result = neg.reconcile(query, &out_buf, conn.allocator()) catch {
if (self.shutdown.load(.acquire)) return;
var err_buf: [512]u8 = undefined;
const err_msg = nostr.RelayMsg.negErr(sub_id, "error: reconciliation failed", &err_buf) catch return;
conn.sendDirect(err_msg);
return;
};
defer result.deinit();

if (self.shutdown.load(.acquire)) return;
var msg_buf: [131072]u8 = undefined;
const neg_msg = nostr.RelayMsg.negMsg(sub_id, result.output, &msg_buf) catch return;
conn.sendDirect(neg_msg);
}

fn sendNegErr(_: *Handler, conn: *Connection, sub_id: []const u8, reason: []const u8) void {
fn sendNegErr(self: *Handler, conn: *Connection, sub_id: []const u8, reason: []const u8) void {
if (self.shutdown.load(.acquire)) return;
var buf: [512]u8 = undefined;
const msg = nostr.RelayMsg.negErr(sub_id, reason, &buf) catch return;
conn.sendDirect(msg);
}

fn sendOk(_: *Handler, conn: *Connection, event_id: *const [32]u8, success: bool, message: []const u8) void {
fn sendOk(self: *Handler, conn: *Connection, event_id: *const [32]u8, success: bool, message: []const u8) void {
if (self.shutdown.load(.acquire)) return;
var buf: [512]u8 = undefined;
const msg = nostr.RelayMsg.ok(event_id, success, message, &buf) catch return;
conn.sendDirect(msg);
}

fn sendEose(_: *Handler, conn: *Connection, sub_id: []const u8) void {
fn sendEose(self: *Handler, conn: *Connection, sub_id: []const u8) void {
if (self.shutdown.load(.acquire)) return;
var buf: [256]u8 = undefined;
const msg = nostr.RelayMsg.eose(sub_id, &buf) catch return;
conn.sendDirect(msg);
}

fn sendClosed(_: *Handler, conn: *Connection, sub_id: []const u8, message: []const u8) void {
fn sendClosed(self: *Handler, conn: *Connection, sub_id: []const u8, message: []const u8) void {
if (self.shutdown.load(.acquire)) return;
var buf: [512]u8 = undefined;
const msg = nostr.RelayMsg.closed(sub_id, message, &buf) catch return;
conn.sendDirect(msg);
}

fn sendCount(_: *Handler, conn: *Connection, sub_id: []const u8, count_val: u64) void {
fn sendCount(self: *Handler, conn: *Connection, sub_id: []const u8, count_val: u64) void {
if (self.shutdown.load(.acquire)) return;
var buf: [256]u8 = undefined;
const msg = nostr.RelayMsg.count(sub_id, count_val, &buf) catch return;
conn.sendDirect(msg);
}

fn sendNotice(_: *Handler, conn: *Connection, message: []const u8) void {
fn sendNotice(self: *Handler, conn: *Connection, message: []const u8) void {
if (self.shutdown.load(.acquire)) return;
var buf: [512]u8 = undefined;
const msg = nostr.RelayMsg.notice(message, &buf) catch return;
conn.sendDirect(msg);
}

fn sendAuthChallenge(_: *Handler, conn: *Connection) void {
fn sendAuthChallenge(self: *Handler, conn: *Connection) void {
if (self.shutdown.load(.acquire)) return;
if (conn.challenge_sent) return;
var buf: [256]u8 = undefined;
const auth_msg = nostr.RelayMsg.auth(&conn.auth_challenge, &buf) catch return;
Expand Down
Loading