Skip to content

Commit 0418c94

Browse files
committed
feat(network): Add additional function in network tools (#30)
1 parent cc77a14 commit 0418c94

File tree

1 file changed

+269
-0
lines changed

1 file changed

+269
-0
lines changed

src/openstack_mcp_server/tools/network_tools.py

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,28 @@ def register_tools(self, mcp: FastMCP):
3434
mcp.tool()(self.get_port_detail)
3535
mcp.tool()(self.update_port)
3636
mcp.tool()(self.delete_port)
37+
mcp.tool()(self.add_port_fixed_ip)
38+
mcp.tool()(self.remove_port_fixed_ip)
39+
mcp.tool()(self.get_port_allowed_address_pairs)
40+
mcp.tool()(self.add_port_allowed_address_pair)
41+
mcp.tool()(self.remove_port_allowed_address_pair)
42+
mcp.tool()(self.set_port_binding)
43+
mcp.tool()(self.set_port_admin_state)
44+
mcp.tool()(self.toggle_port_admin_state)
3745
mcp.tool()(self.get_floating_ips)
3846
mcp.tool()(self.create_floating_ip)
3947
mcp.tool()(self.allocate_floating_ip_pool_to_project)
4048
mcp.tool()(self.attach_floating_ip_to_port)
4149
mcp.tool()(self.detach_floating_ip_from_port)
4250
mcp.tool()(self.delete_floating_ip)
51+
mcp.tool()(self.update_floating_ip_description)
52+
mcp.tool()(self.reassign_floating_ip_to_port)
53+
mcp.tool()(self.create_floating_ips_bulk)
54+
mcp.tool()(self.assign_first_available_floating_ip)
55+
mcp.tool()(self.set_subnet_gateway)
56+
mcp.tool()(self.clear_subnet_gateway)
57+
mcp.tool()(self.set_subnet_dhcp_enabled)
58+
mcp.tool()(self.toggle_subnet_dhcp)
4359

4460
def get_networks(
4561
self,
@@ -233,6 +249,9 @@ def get_subnets(
233249
self,
234250
network_id: str | None = None,
235251
ip_version: int | None = None,
252+
project_id: str | None = None,
253+
has_gateway: bool | None = None,
254+
is_dhcp_enabled: bool | None = None,
236255
) -> list[Subnet]:
237256
"""
238257
Get the list of Neutron subnets with optional filtering.
@@ -243,7 +262,20 @@ def get_subnets(
243262
filters["network_id"] = network_id
244263
if ip_version is not None:
245264
filters["ip_version"] = ip_version
265+
if project_id:
266+
filters["project_id"] = project_id
267+
if is_dhcp_enabled is not None:
268+
filters["enable_dhcp"] = is_dhcp_enabled
246269
subnets = conn.list_subnets(filters=filters)
270+
if has_gateway is not None:
271+
if has_gateway:
272+
subnets = [
273+
s for s in subnets if getattr(s, "gateway_ip", None)
274+
]
275+
else:
276+
subnets = [
277+
s for s in subnets if not getattr(s, "gateway_ip", None)
278+
]
247279
return [self._convert_to_subnet_model(subnet) for subnet in subnets]
248280

249281
def create_subnet(
@@ -340,6 +372,32 @@ def delete_subnet(self, subnet_id: str) -> None:
340372
conn.network.delete_subnet(subnet_id, ignore_missing=False)
341373
return None
342374

375+
def set_subnet_gateway(self, subnet_id: str, gateway_ip: str) -> Subnet:
376+
conn = get_openstack_conn()
377+
subnet = conn.network.update_subnet(subnet_id, gateway_ip=gateway_ip)
378+
return self._convert_to_subnet_model(subnet)
379+
380+
def clear_subnet_gateway(self, subnet_id: str) -> Subnet:
381+
conn = get_openstack_conn()
382+
subnet = conn.network.update_subnet(subnet_id, gateway_ip=None)
383+
return self._convert_to_subnet_model(subnet)
384+
385+
def set_subnet_dhcp_enabled(self, subnet_id: str, enabled: bool) -> Subnet:
386+
conn = get_openstack_conn()
387+
subnet = conn.network.update_subnet(subnet_id, enable_dhcp=enabled)
388+
return self._convert_to_subnet_model(subnet)
389+
390+
def toggle_subnet_dhcp(self, subnet_id: str) -> Subnet:
391+
conn = get_openstack_conn()
392+
current = conn.network.get_subnet(subnet_id)
393+
if not current:
394+
raise Exception(f"Subnet with ID {subnet_id} not found")
395+
subnet = conn.network.update_subnet(
396+
subnet_id,
397+
enable_dhcp=not bool(current.enable_dhcp),
398+
)
399+
return self._convert_to_subnet_model(subnet)
400+
343401
def _convert_to_subnet_model(self, openstack_subnet) -> Subnet:
344402
"""
345403
Convert an OpenStack subnet object to a Subnet pydantic model.
@@ -380,6 +438,149 @@ def get_ports(
380438
ports = conn.list_ports(filters=filters)
381439
return [self._convert_to_port_model(port) for port in ports]
382440

441+
def add_port_fixed_ip(
442+
self,
443+
port_id: str,
444+
subnet_id: str | None = None,
445+
ip_address: str | None = None,
446+
) -> Port:
447+
conn = get_openstack_conn()
448+
port = conn.network.get_port(port_id)
449+
if not port:
450+
raise Exception(f"Port with ID {port_id} not found")
451+
fixed_ips = list(port.fixed_ips or [])
452+
entry: dict = {}
453+
if subnet_id is not None:
454+
entry["subnet_id"] = subnet_id
455+
if ip_address is not None:
456+
entry["ip_address"] = ip_address
457+
fixed_ips.append(entry)
458+
updated = conn.network.update_port(port_id, fixed_ips=fixed_ips)
459+
return self._convert_to_port_model(updated)
460+
461+
def remove_port_fixed_ip(
462+
self,
463+
port_id: str,
464+
ip_address: str | None = None,
465+
subnet_id: str | None = None,
466+
) -> Port:
467+
conn = get_openstack_conn()
468+
port = conn.network.get_port(port_id)
469+
if not port:
470+
raise Exception(f"Port with ID {port_id} not found")
471+
current = list(port.fixed_ips or [])
472+
if not current:
473+
return self._convert_to_port_model(port)
474+
475+
def predicate(item: dict) -> bool:
476+
if ip_address is not None and item.get("ip_address") == ip_address:
477+
return False
478+
if subnet_id is not None and item.get("subnet_id") == subnet_id:
479+
return False
480+
return True
481+
482+
new_fixed = [fi for fi in current if predicate(fi)]
483+
updated = conn.network.update_port(port_id, fixed_ips=new_fixed)
484+
return self._convert_to_port_model(updated)
485+
486+
def get_port_allowed_address_pairs(self, port_id: str) -> list[dict]:
487+
conn = get_openstack_conn()
488+
port = conn.network.get_port(port_id)
489+
if not port:
490+
raise Exception(f"Port with ID {port_id} not found")
491+
return list(getattr(port, "allowed_address_pairs", []) or [])
492+
493+
def add_port_allowed_address_pair(
494+
self,
495+
port_id: str,
496+
ip_address: str,
497+
mac_address: str | None = None,
498+
) -> Port:
499+
conn = get_openstack_conn()
500+
port = conn.network.get_port(port_id)
501+
if not port:
502+
raise Exception(f"Port with ID {port_id} not found")
503+
pairs = list(getattr(port, "allowed_address_pairs", []) or [])
504+
entry = {"ip_address": ip_address}
505+
if mac_address is not None:
506+
entry["mac_address"] = mac_address
507+
pairs.append(entry)
508+
updated = conn.network.update_port(
509+
port_id,
510+
allowed_address_pairs=pairs,
511+
)
512+
return self._convert_to_port_model(updated)
513+
514+
def remove_port_allowed_address_pair(
515+
self,
516+
port_id: str,
517+
ip_address: str,
518+
mac_address: str | None = None,
519+
) -> Port:
520+
conn = get_openstack_conn()
521+
port = conn.network.get_port(port_id)
522+
if not port:
523+
raise Exception(f"Port with ID {port_id} not found")
524+
pairs = list(getattr(port, "allowed_address_pairs", []) or [])
525+
526+
def keep(p: dict) -> bool:
527+
if mac_address is None:
528+
return p.get("ip_address") != ip_address
529+
return not (
530+
p.get("ip_address") == ip_address
531+
and p.get("mac_address") == mac_address
532+
)
533+
534+
new_pairs = [p for p in pairs if keep(p)]
535+
updated = conn.network.update_port(
536+
port_id,
537+
allowed_address_pairs=new_pairs,
538+
)
539+
return self._convert_to_port_model(updated)
540+
541+
def set_port_binding(
542+
self,
543+
port_id: str,
544+
host_id: str | None = None,
545+
vnic_type: str | None = None,
546+
profile: dict | None = None,
547+
) -> Port:
548+
conn = get_openstack_conn()
549+
update_args: dict = {}
550+
if host_id is not None:
551+
update_args["binding_host_id"] = host_id
552+
if vnic_type is not None:
553+
update_args["binding_vnic_type"] = vnic_type
554+
if profile is not None:
555+
update_args["binding_profile"] = profile
556+
if not update_args:
557+
raise Exception("No update parameters provided")
558+
updated = conn.network.update_port(port_id, **update_args)
559+
return self._convert_to_port_model(updated)
560+
561+
def set_port_admin_state(
562+
self,
563+
port_id: str,
564+
is_admin_state_up: bool,
565+
) -> Port:
566+
conn = get_openstack_conn()
567+
updated = conn.network.update_port(
568+
port_id,
569+
admin_state_up=is_admin_state_up,
570+
)
571+
return self._convert_to_port_model(updated)
572+
573+
def toggle_port_admin_state(self, port_id: str) -> Port:
574+
conn = get_openstack_conn()
575+
current = conn.network.get_port(port_id)
576+
if not current:
577+
raise Exception(f"Port with ID {port_id} not found")
578+
updated = conn.network.update_port(
579+
port_id,
580+
admin_state_up=not bool(current.admin_state_up),
581+
)
582+
return self._convert_to_port_model(updated)
583+
383584
def create_port(
384585
self,
385586
network_id: str,
@@ -486,6 +687,9 @@ def get_floating_ips(
486687
self,
487688
status_filter: str | None = None,
488689
project_id: str | None = None,
690+
port_id: str | None = None,
691+
floating_network_id: str | None = None,
692+
unassigned_only: bool | None = None,
489693
) -> list[FloatingIP]:
490694
"""
491695
Get the list of Neutron floating IPs with optional filtering.
@@ -496,7 +700,13 @@ def get_floating_ips(
496700
filters["status"] = status_filter.upper()
497701
if project_id:
498702
filters["project_id"] = project_id
703+
if port_id:
704+
filters["port_id"] = port_id
705+
if floating_network_id:
706+
filters["floating_network_id"] = floating_network_id
499707
ips = list(conn.network.ips(**filters))
708+
if unassigned_only:
709+
ips = [i for i in ips if not getattr(i, "port_id", None)]
500710
return [self._convert_to_floating_ip_model(ip) for ip in ips]
501711

502712
def create_floating_ip(
@@ -575,6 +785,65 @@ def delete_floating_ip(self, floating_ip_id: str) -> None:
575785
conn.network.delete_ip(floating_ip_id, ignore_missing=False)
576786
return None
577787

788+
def update_floating_ip_description(
789+
self,
790+
floating_ip_id: str,
791+
description: str | None,
792+
) -> FloatingIP:
793+
conn = get_openstack_conn()
794+
ip = conn.network.update_ip(floating_ip_id, description=description)
795+
return self._convert_to_floating_ip_model(ip)
796+
797+
def reassign_floating_ip_to_port(
798+
self,
799+
floating_ip_id: str,
800+
port_id: str,
801+
fixed_ip_address: str | None = None,
802+
) -> FloatingIP:
803+
conn = get_openstack_conn()
804+
update_args: dict = {"port_id": port_id}
805+
if fixed_ip_address is not None:
806+
update_args["fixed_ip_address"] = fixed_ip_address
807+
ip = conn.network.update_ip(floating_ip_id, **update_args)
808+
return self._convert_to_floating_ip_model(ip)
809+
810+
def create_floating_ips_bulk(
811+
self,
812+
floating_network_id: str,
813+
count: int,
814+
) -> list[FloatingIP]:
815+
conn = get_openstack_conn()
816+
created = []
817+
for _ in range(max(0, count)):
818+
ip = conn.network.create_ip(
819+
floating_network_id=floating_network_id,
820+
)
821+
created.append(self._convert_to_floating_ip_model(ip))
822+
return created
823+
824+
def assign_first_available_floating_ip(
825+
self,
826+
floating_network_id: str,
827+
port_id: str,
828+
) -> FloatingIP:
829+
conn = get_openstack_conn()
830+
existing = list(
831+
conn.network.ips(floating_network_id=floating_network_id),
832+
)
833+
available = next(
834+
(i for i in existing if not getattr(i, "port_id", None)),
835+
None,
836+
)
837+
if available is None:
838+
created = conn.network.create_ip(
839+
floating_network_id=floating_network_id,
840+
)
841+
target_id = created.id
842+
else:
843+
target_id = available.id
844+
ip = conn.network.update_ip(target_id, port_id=port_id)
845+
return self._convert_to_floating_ip_model(ip)
846+
578847
def _convert_to_floating_ip_model(self, openstack_ip) -> FloatingIP:
579848
"""
580849
Convert an OpenStack floating IP object to a FloatingIP pydantic model.

0 commit comments

Comments
 (0)