Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions wsdiscovery/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
from contextlib import contextmanager
from urllib.parse import urlparse
import click

from wsdiscovery.discovery import ThreadedWSDiscovery as WSDiscovery
from wsdiscovery.publishing import ThreadedWSPublishing as WSPublishing
from wsdiscovery.scope import Scope
from wsdiscovery.qname import QName
from wsdiscovery.discovery import DEFAULT_DISCOVERY_TIMEOUT
from wsdiscovery.udp import UNICAST_UDP_REPEAT, MULTICAST_UDP_REPEAT

DEFAULT_LOGLEVEL = "INFO"

@contextmanager
def discovery(capture=None):
wsd = WSDiscovery(capture=capture)
def discovery(capture=None, unicast_num=UNICAST_UDP_REPEAT,
multicast_num=MULTICAST_UDP_REPEAT):
wsd = WSDiscovery(capture=capture, unicast_num=unicast_num,
multicast_num=multicast_num)
wsd.start()
yield wsd
wsd.stop()
Expand Down Expand Up @@ -46,12 +50,17 @@ def setup_logger(name, loglevel):
@click.option('--capture', '-c', nargs=1, type=click.File('w'), help='Capture messages to a file')
@click.option('--timeout', '-t', default=DEFAULT_DISCOVERY_TIMEOUT, show_default=True,
type=int, help='Discovery timeout in seconds')
def discover(scope, address, port, loglevel, capture, timeout):
@click.option('--unicast-num', '-un', type=int, default=UNICAST_UDP_REPEAT,
show_default=True, help='Number of Unicast messages to send')
@click.option('--multicast-num', '-mn', type=int, default=MULTICAST_UDP_REPEAT,
show_default=True, help='Number of Multicast messages to send')
def discover(scope, address, port, loglevel, capture, timeout, unicast_num,
multicast_num):
"Discover services using WS-Discovery"

logger = setup_logger("ws-discovery", loglevel)

with discovery(capture) as wsd:
with discovery(capture, unicast_num, multicast_num) as wsd:
scopes = [Scope(scope)] if scope else []
svcs = wsd.searchServices(scopes=scopes, address=address, port=port,
timeout=timeout)
Expand All @@ -71,7 +80,12 @@ def discover(scope, address, port, loglevel, capture, timeout):
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]),
help='Log level')
@click.option('--capture', '-c', nargs=1, type=click.File('w'), help='Capture messages to a file')
def publish(scope, typename, address, port, loglevel, capture):
@click.option('--unicast-num', '-un', type=int, default=UNICAST_UDP_REPEAT,
show_default=True, help='Number of Unicast messages to send')
@click.option('--multicast-num', '-mn', type=int, default=MULTICAST_UDP_REPEAT,
show_default=True, help='Number of Multicast messages to send')
def publish(scope, typename, address, port, loglevel, capture, unicast_num,
multicast_num):
"Publish services using WS-Discovery"

logger = setup_logger("ws-publishing", loglevel)
Expand All @@ -89,4 +103,3 @@ def publish(scope, typename, address, port, loglevel, capture):

xAddrs = ["%s:%i" % (address, port)] if address else ['127.0.0.1']
svc = wsp.publishService(types, scopes, xAddrs)

28 changes: 17 additions & 11 deletions wsdiscovery/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .uri import URI
from .service import Service
from .envelope import SoapEnvelope

from .udp import UNICAST_UDP_REPEAT, MULTICAST_UDP_REPEAT

APP_MAX_DELAY = 500 # miliseconds

Expand All @@ -33,6 +33,8 @@ def __init__(self, uuid_=None, capture=None, ttl=1, **kwargs):

self._capture = capture
self.ttl = ttl
self._unicast_num = kwargs.get('unicast_num', UNICAST_UDP_REPEAT)
self._multicast_num = kwargs.get('multicast_num', MULTICAST_UDP_REPEAT)

super().__init__(**kwargs)

Expand All @@ -48,35 +50,39 @@ def envReceived(self, env, addr):

def _sendResolveMatch(self, service, relatesTo, addr):
env = constructResolveMatch(service, relatesTo)
self.sendUnicastMessage(env, addr[0], addr[1])
self.sendUnicastMessage(env, addr[0], addr[1], unicast_num=self._unicast_num)

def _sendProbeMatch(self, services, relatesTo, addr):
env = constructProbeMatch(services, relatesTo)
self.sendUnicastMessage(env, addr[0], addr[1], random.randint(0, APP_MAX_DELAY))
self.sendUnicastMessage(env, addr[0], addr[1], random.randint(0, APP_MAX_DELAY),
unicast_num=self._unicast_num)

def _sendProbe(self, types=None, scopes=None, address=None, port=None):
env = constructProbe(types, scopes)
if self._dpActive:
self.sendUnicastMessage(env, self._dpAddr[0], self._dpAddr[1])
self.sendUnicastMessage(env, self._dpAddr[0], self._dpAddr[1],
unicast_num=self._unicast_num)
elif address and port:
self.sendUnicastMessage(env, address, port)
self.sendUnicastMessage(env, address, port,
unicast_num=self._unicast_num)
else:
self.sendMulticastMessage(env)
self.sendMulticastMessage(env, multicast_num=self._multicast_num)

def _sendResolve(self, epr):
env = constructResolve(epr)
if self._dpActive:
self.sendUnicastMessage(env, self._dpAddr[0], self._dpAddr[1])
self.sendUnicastMessage(env, self._dpAddr[0], self._dpAddr[1],
unicast_num=self._unicast_num)
else:
self.sendMulticastMessage(env)
self.sendMulticastMessage(env, multicast_num=self._multicast_num)

def _sendHello(self, service):
env = constructHello(service)
random.seed((int)(time.time() * 1000000))
self.sendMulticastMessage(env,initialDelay=random.randint(0, APP_MAX_DELAY))
self.sendMulticastMessage(env,initialDelay=random.randint(0, APP_MAX_DELAY),
multicast_num=self._multicast_num)

def _sendBye(self, service):
env = constructBye(service)
service.incrementMessageNumber()
self.sendMulticastMessage(env)

self.sendMulticastMessage(env, multicast_num=self._multicast_num)
47 changes: 34 additions & 13 deletions wsdiscovery/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
from .message import createSOAPMessage, parseSOAPMessage
from .udp import UDPMessage
from .util import _getNetworkAddrs, dom2Str

logger = logging.getLogger("threading")
from .udp import UNICAST_UDP_REPEAT, MULTICAST_UDP_REPEAT

BUFFER_SIZE = 0xffff
NETWORK_ADDRESSES_CHECK_TIMEOUT = 5
MULTICAST_PORT = 3702
MULTICAST_IPV4_ADDRESS = "239.255.255.250"
MULTICAST_IPV6_ADDRESS = "FF02::C"

logger = logging.getLogger("threading")


class _StoppableDaemonThread(threading.Thread):
"""Stoppable daemon thread.
Expand Down Expand Up @@ -157,14 +158,18 @@ def removeSourceAddr(self, addr):
sock.close()
del self._multiOutUniInSockets[addr]

def addUnicastMessage(self, env, addr, port, initialDelay=0):
msg = UDPMessage(env, addr, port, UDPMessage.UNICAST, initialDelay)
def addUnicastMessage(self, env, addr, port, initialDelay=0,
unicast_num=UNICAST_UDP_REPEAT):
msg = UDPMessage(env, addr, port, UDPMessage.UNICAST, initialDelay,
unicast_num=unicast_num)

self._queue.append(msg)
self._knownMessageIds.add(env.getMessageId())

def addMulticastMessage(self, env, addr, port, initialDelay=0):
msg = UDPMessage(env, addr, port, UDPMessage.MULTICAST, initialDelay)
def addMulticastMessage(self, env, addr, port, initialDelay=0,
multicast_num=MULTICAST_UDP_REPEAT):
msg = UDPMessage(env, addr, port, UDPMessage.MULTICAST, initialDelay,
multicast_num=multicast_num)

self._queue.append(msg)
self._knownMessageIds.add(env.getMessageId())
Expand Down Expand Up @@ -358,12 +363,16 @@ def _get_multicast_ttl(self):
class ThreadedNetworking:
"handle threaded networking start & stop, address add/remove & message sending"

def __init__(self, **kwargs):
def __init__(self,
unicast_num=UNICAST_UDP_REPEAT,
multicast_num=MULTICAST_UDP_REPEAT, **kwargs):
self._networkingThread_v4 = None
self._networkingThread_v6 = None
self._addrsMonitorThread_v4 = None
self._addrsMonitorThread_v6 = None
self._serverStarted = False
self._unicast_num = unicast_num
self._multicast_num = multicast_num
super().__init__(**kwargs)

def _startThreads(self):
Expand Down Expand Up @@ -423,12 +432,24 @@ def removeSourceAddr(self, addr):
elif version == 6:
self._networkingThread_v6.removeSourceAddr(addr)

def sendUnicastMessage(self, env, host, port, initialDelay=0):
def sendUnicastMessage(self, env, host, port, initialDelay=0,
unicast_num=UNICAST_UDP_REPEAT):
"handle unicast message sending"
self._networkingThread_v4.addUnicastMessage(env, host, port, initialDelay)
self._networkingThread_v6.addUnicastMessage(env, host, port, initialDelay)
self._networkingThread_v4.addUnicastMessage(env, host, port,
initialDelay, unicast_num)
self._networkingThread_v6.addUnicastMessage(env, host, port,
initialDelay, unicast_num)

def sendMulticastMessage(self, env, initialDelay=0):
def sendMulticastMessage(self, env, initialDelay=0,
multicast_num=MULTICAST_UDP_REPEAT):
"handle multicast message sending"
self._networkingThread_v4.addMulticastMessage(env, MULTICAST_IPV4_ADDRESS, MULTICAST_PORT, initialDelay)
self._networkingThread_v6.addMulticastMessage(env, MULTICAST_IPV6_ADDRESS, MULTICAST_PORT, initialDelay)
self._networkingThread_v4.addMulticastMessage(env,
MULTICAST_IPV4_ADDRESS,
MULTICAST_PORT,
initialDelay,
multicast_num)
self._networkingThread_v6.addMulticastMessage(env,
MULTICAST_IPV6_ADDRESS,
MULTICAST_PORT,
initialDelay,
multicast_num)
14 changes: 7 additions & 7 deletions wsdiscovery/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,27 @@
class UDPMessage:
"UDP message management implementation"

MULTICAST = 'multicast'
UNICAST = 'unicast'
MULTICAST = 'multicast'

def __init__(self, env, addr, port, msgType, initialDelay=0):
"""msgType shall be UDPMessage.MULTICAST or UDPMessage.UNICAST"""
def __init__(self, env, addr, port, msgType, initialDelay=0,
unicast_num=UNICAST_UDP_REPEAT,
multicast_num=MULTICAST_UDP_REPEAT):
"""msgType shall be UDPMessage.UNICAST or UDPMessage.MULTICAST"""
self._env = env
self._addr = addr
self._port = port
self._msgType = msgType

if msgType == self.UNICAST:
udpRepeat, udpMinDelay, udpMaxDelay, udpUpperDelay = \
UNICAST_UDP_REPEAT, \
unicast_num, \
UNICAST_UDP_MIN_DELAY, \
UNICAST_UDP_MAX_DELAY, \
UNICAST_UDP_UPPER_DELAY
else:
udpRepeat, udpMinDelay, udpMaxDelay, udpUpperDelay = \
MULTICAST_UDP_REPEAT, \
multicast_num, \
MULTICAST_UDP_MIN_DELAY, \
MULTICAST_UDP_MAX_DELAY, \
MULTICAST_UDP_UPPER_DELAY
Expand Down Expand Up @@ -79,5 +81,3 @@ def refresh(self):
self._t = self._udpUpperDelay
self._nextTime = int(time.time() * 1000) + self._t
self._udpRepeat = self._udpRepeat - 1


Loading