@@ -15294,6 +15294,267 @@ def _try_flush_buffer():
1529415294 except Exception:
1529515295 pass
1529615296
15297+ def _udp_raw_send(fileobj, host, port, **kwargs):
15298+ logger = _logger_from_kwargs(kwargs)
15299+ addr = (host or "127.0.0.1", int(port))
15300+
15301+ # ---- normalize bool-ish flags (URL query values may be strings) ----
15302+ handshake = _kw_bool(kwargs.get("handshake", True), True)
15303+ raw_ack = _kw_bool(kwargs.get("raw_ack", False), False)
15304+ raw_meta = _kw_bool(kwargs.get("raw_meta", True), True)
15305+ raw_sha = _kw_bool(kwargs.get("raw_sha", False), False)
15306+ wait = _kw_bool(kwargs.get("wait", True), True) or _kw_bool(kwargs.get("connect_wait", False), False)
15307+
15308+ verbose = _kw_bool(kwargs.get("verbose", False), False)
15309+
15310+ def _log(msg):
15311+ _net_log(verbose, msg, logger=logger)
15312+
15313+ # ---- numeric params ----
15314+ try:
15315+ chunk = int(kwargs.get("chunk", 1200))
15316+ except Exception:
15317+ chunk = 1200
15318+ if chunk < 256:
15319+ chunk = 256
15320+
15321+ try:
15322+ wt = kwargs.get("wait_timeout", None)
15323+ wt = float(wt) if wt is not None else None
15324+ except Exception:
15325+ wt = None
15326+
15327+ try:
15328+ hello_iv = float(kwargs.get("hello_interval", 0.1) or 0.1)
15329+ except Exception:
15330+ hello_iv = 0.1
15331+ if hello_iv <= 0:
15332+ hello_iv = 0.1
15333+
15334+ # ---- compute total remaining length (for META and/or HASH) ----
15335+ total_len = None
15336+ pos = None
15337+ if raw_meta or raw_sha:
15338+ try:
15339+ pos = fileobj.tell()
15340+ fileobj.seek(0, os.SEEK_END)
15341+ end = fileobj.tell()
15342+ fileobj.seek(pos, os.SEEK_SET)
15343+ total_len = int(end - pos)
15344+ if total_len < 0:
15345+ total_len = None
15346+ except Exception:
15347+ total_len = None
15348+ try:
15349+ if pos is not None:
15350+ fileobj.seek(pos, os.SEEK_SET)
15351+ except Exception:
15352+ pass
15353+
15354+ # ---- precompute expected hash (optional) ----
15355+ expected_hex = None
15356+ raw_hash = (kwargs.get("raw_hash", "sha256") or "sha256").lower()
15357+ if raw_sha and total_len is not None:
15358+ try:
15359+ h = hashlib.sha256() if raw_hash != "md5" else hashlib.md5()
15360+ cur = fileobj.tell()
15361+ while True:
15362+ b = fileobj.read(65536)
15363+ if not b:
15364+ break
15365+ h.update(_to_bytes(b))
15366+ expected_hex = h.hexdigest()
15367+ fileobj.seek(cur, os.SEEK_SET)
15368+ except Exception:
15369+ expected_hex = None
15370+ try:
15371+ if pos is not None:
15372+ fileobj.seek(pos, os.SEEK_SET)
15373+ except Exception:
15374+ pass
15375+
15376+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
15377+
15378+ try:
15379+ # ---- handshake / wait-for-receiver ----
15380+ tok = kwargs.get("token")
15381+ tok = _hs_token() if tok is None else _to_bytes(tok)
15382+
15383+ if wait:
15384+ start_t = time.time()
15385+ while True:
15386+ if wt is not None and wt >= 0 and (time.time() - start_t) >= wt:
15387+ _log("UDP raw: wait_timeout reached; no receiver READY")
15388+ try:
15389+ sock.close()
15390+ except Exception:
15391+ pass
15392+ return False
15393+
15394+ # announce
15395+ if handshake:
15396+ try:
15397+ sock.sendto(b"HELLO " + tok + b"\n", addr)
15398+ except Exception:
15399+ pass
15400+
15401+ if raw_meta and total_len is not None:
15402+ try:
15403+ sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15404+ except Exception:
15405+ pass
15406+
15407+ if raw_sha and expected_hex:
15408+ try:
15409+ sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15410+ except Exception:
15411+ pass
15412+
15413+ # wait briefly for READY
15414+ try:
15415+ sock.settimeout(hello_iv)
15416+ except Exception:
15417+ pass
15418+
15419+ try:
15420+ pkt, _a = sock.recvfrom(1024)
15421+ if pkt.startswith(b"READY"):
15422+ # READY or READY <token>
15423+ if b" " in pkt:
15424+ rt = pkt.split(None, 1)[1].strip()
15425+ if rt and rt != tok:
15426+ continue
15427+ _log("UDP raw: received READY from receiver")
15428+ break
15429+ except socket.timeout:
15430+ continue
15431+ except Exception:
15432+ continue
15433+ else:
15434+ # if not waiting, still send META/HASH once up front
15435+ if handshake:
15436+ try:
15437+ sock.sendto(b"HELLO " + tok + b"\n", addr)
15438+ except Exception:
15439+ pass
15440+ if raw_meta and total_len is not None:
15441+ try:
15442+ sock.sendto(b"META " + str(total_len).encode("ascii") + b"\n", addr)
15443+ except Exception:
15444+ pass
15445+ if raw_sha and expected_hex:
15446+ try:
15447+ sock.sendto(b"HASH " + raw_hash.encode("ascii") + b" " + expected_hex.encode("ascii") + b"\n", addr)
15448+ except Exception:
15449+ pass
15450+
15451+ # ---- send data ----
15452+ if raw_ack:
15453+ # sliding window retransmit
15454+ try:
15455+ ack_to = float(kwargs.get("raw_ack_timeout", 0.5) or 0.5)
15456+ except Exception:
15457+ ack_to = 0.5
15458+ try:
15459+ retries_max = int(kwargs.get("raw_ack_retries", 40) or 40)
15460+ except Exception:
15461+ retries_max = 40
15462+ try:
15463+ win = int(kwargs.get("raw_ack_window", 1) or 1)
15464+ except Exception:
15465+ win = 1
15466+ if win < 1:
15467+ win = 1
15468+
15469+ try:
15470+ sock.settimeout(ack_to)
15471+ except Exception:
15472+ pass
15473+
15474+ def _make_pkt(seq, data):
15475+ return b"PKT " + str(seq).encode("ascii") + b" " + _to_bytes(data)
15476+
15477+ base_seq = 0
15478+ next_seq = 0
15479+ pkts = {}
15480+ eof = False
15481+ timeout_tries = 0
15482+
15483+ while True:
15484+ # fill window
15485+ while (not eof) and next_seq < base_seq + win:
15486+ data = fileobj.read(chunk)
15487+ if not data:
15488+ eof = True
15489+ break
15490+ pkt = _make_pkt(next_seq, data)
15491+ pkts[next_seq] = pkt
15492+ try:
15493+ sock.sendto(pkt, addr)
15494+ except Exception:
15495+ pass
15496+ next_seq += 1
15497+
15498+ if eof and base_seq == next_seq:
15499+ break
15500+
15501+ try:
15502+ apkt, _a = sock.recvfrom(1024)
15503+ if apkt.startswith(b"ACK "):
15504+ try:
15505+ aseq = int(apkt.split()[1])
15506+ except Exception:
15507+ aseq = -1
15508+ new_base = aseq + 1
15509+ if new_base > base_seq:
15510+ for s in list(pkts.keys()):
15511+ if s < new_base:
15512+ pkts.pop(s, None)
15513+ base_seq = new_base
15514+ timeout_tries = 0
15515+ except socket.timeout:
15516+ timeout_tries += 1
15517+ if retries_max >= 0 and timeout_tries >= retries_max:
15518+ _log("UDP raw: too many ACK timeouts, giving up")
15519+ return False
15520+ # retransmit all in-flight
15521+ for s in range(base_seq, next_seq):
15522+ pkt = pkts.get(s)
15523+ if pkt is None:
15524+ continue
15525+ try:
15526+ sock.sendto(pkt, addr)
15527+ except Exception:
15528+ pass
15529+ except Exception:
15530+ # treat as timeout-ish
15531+ timeout_tries += 1
15532+
15533+ else:
15534+ # legacy raw: just send datagrams
15535+ while True:
15536+ data = fileobj.read(chunk)
15537+ if not data:
15538+ break
15539+ try:
15540+ sock.sendto(_to_bytes(data), addr)
15541+ except Exception:
15542+ pass
15543+
15544+ # ---- finish ----
15545+ try:
15546+ sock.sendto(b"DONE", addr)
15547+ except Exception:
15548+ pass
15549+
15550+ return True
15551+
15552+ finally:
15553+ try:
15554+ sock.close()
15555+ except Exception:
15556+ pass
15557+
1529715558def _udp_seq_send(fileobj, host, port, resume=False, path_text=None, **kwargs):
1529815559 addr = (host, int(port))
1529915560 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
0 commit comments