Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added __init__.py
Empty file.
80 changes: 80 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from nostr.client.client import NostrClient
from nostr.event import Event
from nostr.key import PublicKey
import asyncio
import threading


async def dm():
print("This is an example NIP-04 DM flow")
pk = input("Enter your privatekey to post from (enter nothing for a random one): ")

def callback(event: Event, decrypted_content):
"""
Callback to trigger when a DM is received.
"""
print(
f"From {event.public_key[:3]}..{event.public_key[-3:]}: {decrypted_content}"
)

client = NostrClient(privatekey_hex=pk)
await asyncio.sleep(1)

t = threading.Thread(
target=client.get_dm,
args=(
client.public_key,
callback,
),
)
t.start()

to_pubk_hex = (
input("Enter other pubkey to post to (enter nothing to DM yourself): ")
or client.public_key.hex()
)
print(f"Subscribing to DMs to {to_pubk_hex}")
while True:
msg = input("\nEnter message: ")
client.dm(msg, PublicKey(bytes.fromhex(to_pubk_hex)))


async def post():
print("This posts and reads a nostr note")
pk = input("Enter your privatekey to post from (enter nothing for a random one): ")

def callback(event: Event):
"""
Callback to trigger when post appers.
"""
print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}")

sender_client = NostrClient(privatekey_hex=pk)
await asyncio.sleep(1)

to_pubk_hex = (
input("Enter other pubkey (enter nothing to read your own posts): ")
or sender_client.public_key.hex()
)
print(f"Subscribing to posts by {to_pubk_hex}")
to_pubk = PublicKey(bytes.fromhex(to_pubk_hex))

t = threading.Thread(
target=sender_client.get_post,
args=(
to_pubk,
callback,
),
)
t.start()

while True:
msg = input("\nEnter post: ")
sender_client.post(msg)


# write a DM and receive DMs
asyncio.run(dm())

# make a post and subscribe to posts
# asyncio.run(post())
Empty file added nostr/client/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions nostr/client/cbc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@

from Cryptodome import Random
from Cryptodome.Cipher import AES

plain_text = "This is the text to encrypts"

# encrypted = "7mH9jq3K9xNfWqIyu9gNpUz8qBvGwsrDJ+ACExdV1DvGgY8q39dkxVKeXD7LWCDrPnoD/ZFHJMRMis8v9lwHfNgJut8EVTMuJJi8oTgJevOBXl+E+bJPwej9hY3k20rgCQistNRtGHUzdWyOv7S1tg==".encode()
# iv = "GzDzqOVShWu3Pl2313FBpQ==".encode()

key = bytes.fromhex("3aa925cb69eb613e2928f8a18279c78b1dca04541dfd064df2eda66b59880795")

BLOCK_SIZE = 16

class AESCipher(object):
"""This class is compatible with crypto.createCipheriv('aes-256-cbc')

"""
def __init__(self, key=None):
self.key = key

def pad(self, data):
length = BLOCK_SIZE - (len(data) % BLOCK_SIZE)
return data + (chr(length) * length).encode()

def unpad(self, data):
return data[: -(data[-1] if type(data[-1]) == int else ord(data[-1]))]

def encrypt(self, plain_text):
cipher = AES.new(self.key, AES.MODE_CBC)
b = plain_text.encode("UTF-8")
return cipher.iv, cipher.encrypt(self.pad(b))

def decrypt(self, iv, enc_text):
cipher = AES.new(self.key, AES.MODE_CBC, iv=iv)
return self.unpad(cipher.decrypt(enc_text).decode("UTF-8"))

if __name__ == "__main__":
aes = AESCipher(key=key)
iv, enc_text = aes.encrypt(plain_text)
dec_text = aes.decrypt(iv, enc_text)
print(dec_text)
164 changes: 164 additions & 0 deletions nostr/client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from typing import *
import ssl
import time
import json
import os
import base64

from ..event import Event
from ..relay_manager import RelayManager
from ..message_type import ClientMessageType
from ..key import PrivateKey, PublicKey

from ..filter import Filter, Filters
from ..event import Event, EventKind
from ..relay_manager import RelayManager
from ..message_type import ClientMessageType

# from aes import AESCipher
from . import cbc


class NostrClient:
relays = [
"wss://lnbits.link/nostrrelay/client"
# "wss://nostr-pub.wellorder.net",
# "wss://nostr.zebedee.cloud",
# "wss://no.str.cr",
] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr"
relay_manager = RelayManager()
private_key: PrivateKey
public_key: PublicKey

def __init__(self, privatekey_hex: str = "", relays: List[str] = [], connect=True):
self.generate_keys(privatekey_hex)

if len(relays):
self.relays = relays

if connect:
for relay in self.relays:
self.relay_manager.add_relay(relay)
self.relay_manager.open_connections(
{"cert_reqs": ssl.CERT_NONE}
) # NOTE: This disables ssl certificate verification

def close(self):
self.relay_manager.close_connections()

def generate_keys(self, privatekey_hex: str = None):
pk = bytes.fromhex(privatekey_hex) if privatekey_hex else None
self.private_key = PrivateKey(pk)
self.public_key = self.private_key.public_key
# print(
# f"Nostr private key: {self.private_key.hex()} ({self.private_key.bech32()})"
# )
# print(f"Nostr public key: {self.public_key.hex()} ({self.public_key.bech32()})")

def post(self, message: str):
event = Event(self.public_key.hex(), message, kind=EventKind.TEXT_NOTE)
self.private_key.sign_event(event)
message = json.dumps([ClientMessageType.EVENT, event.to_message()])
# print("Publishing message:")
# print(message)
self.relay_manager.publish_message(message)

def get_post(self, sender_publickey: PublicKey, callback_func=None):
filters = Filters(
[Filter(authors=[sender_publickey.hex()], kinds=[EventKind.TEXT_NOTE])]
)
subscription_id = os.urandom(4).hex()
self.relay_manager.add_subscription(subscription_id, filters)

request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
message = json.dumps(request)
# print("Subscribing to events:")
# print(message)
self.relay_manager.publish_message(message)

while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
print(event_msg.event.content)
if callback_func:
callback_func(event_msg.event)
time.sleep(0.1)

def dm(self, message: str, to_pubkey: PublicKey):

shared_secret = self.private_key.compute_shared_secret(to_pubkey.hex())

# print("shared secret: ", shared_secret.hex())
# print("plain text:", message)
aes = cbc.AESCipher(key=shared_secret)
iv, enc_text = aes.encrypt(message)
# print("encrypt iv: ", iv)
content = f"{base64.b64encode(enc_text).decode('utf-8')}?iv={base64.b64encode(iv).decode('utf-8')}"

event = Event(
self.public_key.hex(),
content,
tags=[["p", to_pubkey.hex()]],
kind=EventKind.ENCRYPTED_DIRECT_MESSAGE,
)
self.private_key.sign_event(event)
event_message = json.dumps([ClientMessageType.EVENT, event.to_message()])
# print("DM message:")
# print(event_message)

time.sleep(1)
self.relay_manager.publish_message(event_message)

def get_dm(self, sender_publickey: PublicKey, callback_func=None):
filters = Filters(
[
Filter(
kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE],
pubkey_refs={"#p": [sender_publickey.hex()]},
)
]
)
subscription_id = os.urandom(4).hex()
self.relay_manager.add_subscription(subscription_id, filters)

request = [ClientMessageType.REQUEST, subscription_id]
request.extend(filters.to_json_array())
message = json.dumps(request)
# print("Subscribing to events:")
# print(message)
self.relay_manager.publish_message(message)

while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
if "?iv=" in event_msg.event.content:
try:
shared_secret = self.private_key.compute_shared_secret(
event_msg.event.public_key
)
# print("shared secret: ", shared_secret.hex())
# print("plain text:", message)
aes = cbc.AESCipher(key=shared_secret)
enc_text_b64, iv_b64 = event_msg.event.content.split("?iv=")
iv = base64.decodebytes(iv_b64.encode("utf-8"))
enc_text = base64.decodebytes(enc_text_b64.encode("utf-8"))
# print("decrypt iv: ", iv)
dec_text = aes.decrypt(iv, enc_text)
# print(f"From {event_msg.event.public_key[:5]}...: {dec_text}")
if callback_func:
callback_func(event_msg.event, dec_text)
except:
pass
# else:
# print(f"\nFrom {event_msg.event.public_key[:5]}...: {event_msg.event.content}")
break
time.sleep(0.1)

async def subscribe(self):
while True:
while self.relay_manager.message_pool.has_events():
event_msg = self.relay_manager.message_pool.get_event()
print(event_msg.event.content)
break
time.sleep(0.1)
Loading