Skip to content

Commit 85ff158

Browse files
Update pycatfile_py3.py
1 parent 352e339 commit 85ff158

File tree

1 file changed

+261
-0
lines changed

1 file changed

+261
-0
lines changed

pycatfile_py3.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15076,6 +15076,267 @@ def send_from_fileobj(fileobj, host, port, proto="tcp", path_text=None, **kwargs
1507615076
return _udp_quic_send(fileobj, host, port, **kwargs)
1507715077
return _udp_seq_send(fileobj, host, port, **kwargs)
1507815078

15079+
def _udp_raw_send(fileobj, host, port, **kwargs):
15080+
logger = _logger_from_kwargs(kwargs)
15081+
addr = (host or "127.0.0.1", int(port))
15082+
15083+
# ---- normalize bool-ish flags (URL query values may be strings) ----
15084+
handshake = _kw_bool(kwargs.get("handshake", True), True)
15085+
raw_ack = _kw_bool(kwargs.get("raw_ack", False), False)
15086+
raw_meta = _kw_bool(kwargs.get("raw_meta", True), True)
15087+
raw_sha = _kw_bool(kwargs.get("raw_sha", False), False)
15088+
wait = _kw_bool(kwargs.get("wait", True), True) or _kw_bool(kwargs.get("connect_wait", False), False)
15089+
15090+
verbose = _kw_bool(kwargs.get("verbose", False), False)
15091+
15092+
def _log(msg):
15093+
_net_log(verbose, msg, logger=logger)
15094+
15095+
# ---- numeric params ----
15096+
try:
15097+
chunk = int(kwargs.get("chunk", 1200))
15098+
except Exception:
15099+
chunk = 1200
15100+
if chunk < 256:
15101+
chunk = 256
15102+
15103+
try:
15104+
wt = kwargs.get("wait_timeout", None)
15105+
wt = float(wt) if wt is not None else None
15106+
except Exception:
15107+
wt = None
15108+
15109+
try:
15110+
hello_iv = float(kwargs.get("hello_interval", 0.1) or 0.1)
15111+
except Exception:
15112+
hello_iv = 0.1
15113+
if hello_iv <= 0:
15114+
hello_iv = 0.1
15115+
15116+
# ---- compute total remaining length (for META and/or HASH) ----
15117+
total_len = None
15118+
pos = None
15119+
if raw_meta or raw_sha:
15120+
try:
15121+
pos = fileobj.tell()
15122+
fileobj.seek(0, os.SEEK_END)
15123+
end = fileobj.tell()
15124+
fileobj.seek(pos, os.SEEK_SET)
15125+
total_len = int(end - pos)
15126+
if total_len < 0:
15127+
total_len = None
15128+
except Exception:
15129+
total_len = None
15130+
try:
15131+
if pos is not None:
15132+
fileobj.seek(pos, os.SEEK_SET)
15133+
except Exception:
15134+
pass
15135+
15136+
# ---- precompute expected hash (optional) ----
15137+
expected_hex = None
15138+
raw_hash = (kwargs.get("raw_hash", "sha256") or "sha256").lower()
15139+
if raw_sha and total_len is not None:
15140+
try:
15141+
h = hashlib.sha256() if raw_hash != "md5" else hashlib.md5()
15142+
cur = fileobj.tell()
15143+
while True:
15144+
b = fileobj.read(65536)
15145+
if not b:
15146+
break
15147+
h.update(_to_bytes(b))
15148+
expected_hex = h.hexdigest()
15149+
fileobj.seek(cur, os.SEEK_SET)
15150+
except Exception:
15151+
expected_hex = None
15152+
try:
15153+
if pos is not None:
15154+
fileobj.seek(pos, os.SEEK_SET)
15155+
except Exception:
15156+
pass
15157+
15158+
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15159+
15160+
try:
15161+
# ---- handshake / wait-for-receiver ----
15162+
tok = kwargs.get("token")
15163+
tok = _hs_token() if tok is None else _to_bytes(tok)
15164+
15165+
if wait:
15166+
start_t = time.time()
15167+
while True:
15168+
if wt is not None and wt >= 0 and (time.time() - start_t) >= wt:
15169+
_log("UDP raw: wait_timeout reached; no receiver READY")
15170+
try:
15171+
sock.close()
15172+
except Exception:
15173+
pass
15174+
return False
15175+
15176+
# announce
15177+
if handshake:
15178+
try:
15179+
sock.sendto(b"HELLO " + tok + b"\n", addr)
15180+
except Exception:
15181+
pass
15182+
15183+
if raw_meta and total_len is not None:
15184+
try:
15185+
sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15186+
except Exception:
15187+
pass
15188+
15189+
if raw_sha and expected_hex:
15190+
try:
15191+
sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15192+
except Exception:
15193+
pass
15194+
15195+
# wait briefly for READY
15196+
try:
15197+
sock.settimeout(hello_iv)
15198+
except Exception:
15199+
pass
15200+
15201+
try:
15202+
pkt, _a = sock.recvfrom(1024)
15203+
if pkt.startswith(b"READY"):
15204+
# READY or READY <token>
15205+
if b" " in pkt:
15206+
rt = pkt.split(None, 1)[1].strip()
15207+
if rt and rt != tok:
15208+
continue
15209+
_log("UDP raw: received READY from receiver")
15210+
break
15211+
except socket.timeout:
15212+
continue
15213+
except Exception:
15214+
continue
15215+
else:
15216+
# if not waiting, still send META/HASH once up front
15217+
if handshake:
15218+
try:
15219+
sock.sendto(b"HELLO " + tok + b"\n", addr)
15220+
except Exception:
15221+
pass
15222+
if raw_meta and total_len is not None:
15223+
try:
15224+
sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15225+
except Exception:
15226+
pass
15227+
if raw_sha and expected_hex:
15228+
try:
15229+
sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15230+
except Exception:
15231+
pass
15232+
15233+
# ---- send data ----
15234+
if raw_ack:
15235+
# sliding window retransmit
15236+
try:
15237+
ack_to = float(kwargs.get("raw_ack_timeout", 0.5) or 0.5)
15238+
except Exception:
15239+
ack_to = 0.5
15240+
try:
15241+
retries_max = int(kwargs.get("raw_ack_retries", 40) or 40)
15242+
except Exception:
15243+
retries_max = 40
15244+
try:
15245+
win = int(kwargs.get("raw_ack_window", 1) or 1)
15246+
except Exception:
15247+
win = 1
15248+
if win < 1:
15249+
win = 1
15250+
15251+
try:
15252+
sock.settimeout(ack_to)
15253+
except Exception:
15254+
pass
15255+
15256+
def _make_pkt(seq, data):
15257+
return b"PKT " + str(seq).encode("ascii") + b" " + _to_bytes(data)
15258+
15259+
base_seq = 0
15260+
next_seq = 0
15261+
pkts = {}
15262+
eof = False
15263+
timeout_tries = 0
15264+
15265+
while True:
15266+
# fill window
15267+
while (not eof) and next_seq < base_seq + win:
15268+
data = fileobj.read(chunk)
15269+
if not data:
15270+
eof = True
15271+
break
15272+
pkt = _make_pkt(next_seq, data)
15273+
pkts[next_seq] = pkt
15274+
try:
15275+
sock.sendto(pkt, addr)
15276+
except Exception:
15277+
pass
15278+
next_seq += 1
15279+
15280+
if eof and base_seq == next_seq:
15281+
break
15282+
15283+
try:
15284+
apkt, _a = sock.recvfrom(1024)
15285+
if apkt.startswith(b"ACK "):
15286+
try:
15287+
aseq = int(apkt.split()[1])
15288+
except Exception:
15289+
aseq = -1
15290+
new_base = aseq + 1
15291+
if new_base > base_seq:
15292+
for s in list(pkts.keys()):
15293+
if s < new_base:
15294+
pkts.pop(s, None)
15295+
base_seq = new_base
15296+
timeout_tries = 0
15297+
except socket.timeout:
15298+
timeout_tries += 1
15299+
if retries_max >= 0 and timeout_tries >= retries_max:
15300+
_log("UDP raw: too many ACK timeouts, giving up")
15301+
return False
15302+
# retransmit all in-flight
15303+
for s in range(base_seq, next_seq):
15304+
pkt = pkts.get(s)
15305+
if pkt is None:
15306+
continue
15307+
try:
15308+
sock.sendto(pkt, addr)
15309+
except Exception:
15310+
pass
15311+
except Exception:
15312+
# treat as timeout-ish
15313+
timeout_tries += 1
15314+
15315+
else:
15316+
# legacy raw: just send datagrams
15317+
while True:
15318+
data = fileobj.read(chunk)
15319+
if not data:
15320+
break
15321+
try:
15322+
sock.sendto(_to_bytes(data), addr)
15323+
except Exception:
15324+
pass
15325+
15326+
# ---- finish ----
15327+
try:
15328+
sock.sendto(b"DONE", addr)
15329+
except Exception:
15330+
pass
15331+
15332+
return True
15333+
15334+
finally:
15335+
try:
15336+
sock.close()
15337+
except Exception:
15338+
pass
15339+
1507915340
def _udp_raw_recv(fileobj, host, port, **kwargs):
1508015341
logger = _logger_from_kwargs(kwargs)
1508115342
addr = (host or "", int(port))

0 commit comments

Comments
 (0)