Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions tinyman/swap_router/base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import time
from base64 import b64decode, b64encode
from algosdk import transaction
from algosdk.logic import get_application_address

from tinyman.utils import TransactionGroup
from tinyman.swap_router.struct import get_struct, get_box_costs


class BaseClient():
def __init__(self, algod, app_id, user_address, user_sk) -> None:
self.algod = algod
self.app_id = app_id
self.application_address = get_application_address(self.app_id)
self.user_address = user_address
self.keys = {}
self.add_key(user_address, user_sk)
self.current_timestamp = None
self.simulate = False

def get_suggested_params(self):
return self.algod.suggested_params()

def get_current_timestamp(self):
return self.current_timestamp or time.time()

def _submit(self, transactions, additional_fees=0):
transactions = self.flatten_transactions(transactions)
fee = transactions[0].fee
n = 0
for txn in transactions:
if txn.fee == fee:
txn.fee = 0
n += 1
transactions[0].fee = (n + additional_fees) * fee
txn_group = TransactionGroup(transactions)
for address, key in self.keys.items():
if isinstance(key, transaction.LogicSigAccount):
txn_group.sign_with_logicsig(key, address=address)
else:
txn_group.sign_with_private_key(address, key)
if self.simulate:
txn_info = self.algod.simulate_raw_transactions(txn_group.signed_transactions)
else:
txn_info = txn_group.submit(self.algod, wait=True)
return txn_info

def flatten_transactions(self, txns):
result = []
if isinstance(txns, transaction.Transaction):
result = [txns]
elif isinstance(txns, list):
for txn in txns:
result += self.flatten_transactions(txn)
return result

def calculate_min_balance(self, accounts=0, assets=0, boxes=None):
cost = 0
cost += accounts * 100_000
cost += assets * 100_000
cost += get_box_costs(boxes or {})
return cost

def add_key(self, address, key):
self.keys[address] = key

def get_globals(self, app_id=None):
app_id = app_id or self.app_id
gs = self.algod.application_info(app_id)["params"]["global-state"]
global_state = {s["key"]: s["value"] for s in gs}
state = {}
for key in global_state:
k = b64decode(key)
value = global_state[key]
if value["type"] == 2:
state[k] = value["uint"]
else:
state[k] = b64decode(value["bytes"])
state = dict(sorted(state.items(), key=lambda x: x[0]))
return state

def get_global(self, key, default=None, app_id=None):
app_id = app_id or self.app_id
global_state = {s["key"]: s["value"] for s in self.algod.application_info(app_id)["params"]["global-state"]}
key = b64encode(key).decode()
if key in global_state:
value = global_state[key]
if value["type"] == 2:
return value["uint"]
else:
return b64decode(value["bytes"])
else:
return default

def get_box(self, box_name, struct_name, app_id=None):
app_id = app_id or self.app_id
box_value = b64decode(self.algod.application_box_by_name(app_id, box_name)["value"])
struct_class = get_struct(struct_name)
struct = struct_class(box_value)
return struct

def box_exists(self, box_name, app_id=None):
app_id = app_id or self.app_id
try:
self.algod.application_box_by_name(app_id, box_name)
return True
except Exception:
return False

def is_opted_in(self, address, asset_id):
if asset_id == 0:
return True

try:
self.algod.account_asset_info(address, asset_id)
return True
except Exception:
return False

def get_optin_if_needed_txn(self, sender, asset_id):
if not self.is_opted_in(sender, asset_id):
txn = transaction.AssetOptInTxn(
sender=sender,
sp=self.get_suggested_params(),
index=asset_id,
)
return txn
6 changes: 6 additions & 0 deletions tinyman/swap_router/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
TESTNET_SWAP_ROUTER_APP_ID_V1 = 184778019
MAINNET_SWAP_ROUTER_APP_ID_V1 = 1083651166

TESTNET_SWAP_ROUTER_V2_APP_ID = 730573191
MAINNET_SWAP_ROUTER_V2_APP_ID = 2614712672

TESTNET_SWAP_ROUTER_V3_APP_ID = 753151769
MAINNET_SWAP_ROUTER_V3_APP_ID = 3422861405

FIXED_INPUT_SWAP_TYPE = "fixed-input"
FIXED_OUTPUT_SWAP_TYPE = "fixed-output"

Expand Down
162 changes: 162 additions & 0 deletions tinyman/swap_router/struct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import json
import re
from typing import Any, Dict


MINIMUM_BALANCE_REQUIREMENT_PER_BOX = 2_500
MINIMUM_BALANCE_REQUIREMENT_PER_BOX_BYTE = 400


class StructRegistry:
def __init__(self):
self.struct_definitions: Dict[str, Dict] = {}

def load_from_file(self, filepath: str) -> None:
"""Load struct definitions from a JSON file."""
with open(filepath, 'r') as f:
data = json.load(f)
self.load_from_dict(data.get('structs', {}))

def load_from_dict(self, struct_dict: Dict) -> None:
"""Load struct definitions from a dictionary."""
self.struct_definitions.update(struct_dict)

def get_type(self, name: str) -> Any:
"""Get the appropriate type handler for a given type name."""
if name == "int":
return TealishInt()
elif name.startswith("uint"):
return TealishInt()
elif name.startswith("bytes"):
return TealishBytes()
elif name in self.struct_definitions:
return Struct(name=name, manager=self, **self.struct_definitions[name])
elif "[" in name:
name, length = re.match(r"([A-Za-z_0-9]+)\[(\d+)\]", name).groups()
return ArrayData(
Struct(name=name, manager=self, **self.struct_definitions[name]),
int(length)
)
else:
raise KeyError(f"Unknown type: {name}")

def get_struct(self, name: str) -> 'Struct':
"""Get a Struct instance by name."""
if name not in self.struct_definitions:
raise KeyError(f"Struct '{name}' not found")
return Struct(name=name, **self.struct_definitions[name])


STRUCT_REGISTRY = StructRegistry()


def register_struct_file(filepath: str) -> None:
STRUCT_REGISTRY.load_from_file(filepath)


def get_struct(name: str) -> 'Struct':
return STRUCT_REGISTRY.get_struct(name)


class Struct():
def __init__(self, name, size, fields):
self._name = name
self._size = size
self._fields = fields
self._data = None

def __call__(self, data=None) -> Any:
if data is None:
data = bytearray(self._size)
struct = Struct(self._name, self._size, self._fields)
struct._data = memoryview(data)
return struct

def __getattribute__(self, name: str) -> Any:
if name.startswith("_"):
return super().__getattribute__(name)
field = self._fields[name]
start = field["offset"]
end = field["offset"] + field["size"]
value = self._data[start:end]
type = STRUCT_REGISTRY.get_type(field["type"])
return type(value)

def __setattr__(self, name: str, value: Any) -> None:
if name.startswith("_"):
return super().__setattr__(name, value)
field = self._fields[name]
start = field["offset"]
end = field["offset"] + field["size"]
if field["type"] in ("int",):
value = value.to_bytes(field["size"], "big")
if isinstance(value, (Struct, ArrayData)):
value = value._data
self._data[start:end] = value

def __setitem__(self, index, value):
if isinstance(value, (Struct, ArrayData)):
value = value._data
self._data[:] = value

def __str__(self) -> str:
return repr(bytes(self._data))

def __repr__(self) -> str:
fields = {f: getattr(self, f) for f in self._fields}
return f"{self._name}({fields})"

def __len__(self):
return len(self._data)

def __conform__(self, protocol):
return bytes(self._data)

def __bytes__(self):
return bytes(self._data.tobytes())


class ArrayData():
def __init__(self, struct, length):
self._struct = struct
self._length = length

def __call__(self, data=None) -> Any:
if data is None:
data = bytearray(self._struct._size * self.length)
self._data = memoryview(data)
return self

def __getitem__(self, index):
offset = self._struct._size * index
end = offset + self._struct._size
value = self._data[offset:end]
return self._struct(value)

def __setitem__(self, index, value):
offset = self._struct._size * index
end = offset + self._struct._size
if isinstance(value, Struct):
value = value._data
self._data[offset:end] = value

def __repr__(self) -> str:
return ", ".join(repr(self[i]) for i in range(self._length))


class TealishInt():
def __call__(self, value) -> Any:
return int.from_bytes(value, "big")


class TealishBytes():
def __call__(self, value) -> Any:
return value


def get_box_costs(boxes):
cost = MINIMUM_BALANCE_REQUIREMENT_PER_BOX
for name, struct in boxes.items():
cost += len(name) * MINIMUM_BALANCE_REQUIREMENT_PER_BOX_BYTE
cost += struct._size * MINIMUM_BALANCE_REQUIREMENT_PER_BOX_BYTE
return cost
71 changes: 71 additions & 0 deletions tinyman/swap_router/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from base64 import b64decode
from typing import Union, Optional
from algosdk.encoding import decode_address
from algosdk.constants import ZERO_ADDRESS
from algosdk.logic import get_application_address


def parse_swap_router_event_log(log: Union[bytes, str]) -> Optional[dict]:
Expand All @@ -19,3 +22,71 @@ def parse_swap_router_event_log(log: Union[bytes, str]) -> Optional[dict]:
)

return None


# TODO: Move these later.
def int_to_bytes(num, length=8):
return num.to_bytes(length, "big")


def int_array(elements, size, default=0):
array = [default] * size
for i in range(len(elements)):
array[i] = elements[i]
bytes = b"".join(map(int_to_bytes, array))
return bytes


def bytes_array(elements, size, default=b""):
array = [default] * size
for i in range(len(elements)):
array[i] = elements[i]
bytes = b"".join(array)
return bytes


def encode_router_args(route, pools):
route_arg = int_array(route, size=8, default=0)
pools_arg = bytes_array([decode_address(a) for a in pools], size=8, default=decode_address(ZERO_ADDRESS))
return route_arg, pools_arg


def group_references(route, pools, amm_app_id, talgo_asset_id, talgo_app_id, talgo_app_accounts):
talgo_app_address = get_application_address(talgo_app_id)
is_talgo_app_used = False
swaps = len(pools)
pairs = []
for i in range(swaps):
pool = pools[i]

if pool == talgo_app_address:
is_talgo_app_used = True
continue

assets = route[i], route[i + 1]
pairs.append((pool, assets))

# Group account and asset references for every 2 step in route.
grouped_references = []
for i in range(0, 8, 2):
refs = {"accounts": [], "assets": [], "apps": [amm_app_id]}

for pool, assets in pairs[i: i + 2]:
refs["accounts"].append(pool)
refs["assets"] += assets

refs["assets"] = list(set(refs["assets"]))

if not refs["accounts"]:
break

grouped_references.append(refs)

if is_talgo_app_used:
grouped_references.append({
"apps": [talgo_app_id],
"accounts": talgo_app_accounts[1:5],
"assets": [talgo_asset_id],
})

return grouped_references
Empty file.
Loading
Loading