|
11 | 11 | Port, |
12 | 12 | Router, |
13 | 13 | RouterInterface, |
| 14 | + SecurityGroup, |
14 | 15 | Subnet, |
15 | 16 | ) |
16 | 17 |
|
@@ -56,6 +57,11 @@ def register_tools(self, mcp: FastMCP): |
56 | 57 | mcp.tool()(self.add_router_interface) |
57 | 58 | mcp.tool()(self.get_router_interfaces) |
58 | 59 | mcp.tool()(self.remove_router_interface) |
| 60 | + mcp.tool()(self.get_security_groups) |
| 61 | + mcp.tool()(self.create_security_group) |
| 62 | + mcp.tool()(self.get_security_group_detail) |
| 63 | + mcp.tool()(self.update_security_group) |
| 64 | + mcp.tool()(self.delete_security_group) |
59 | 65 |
|
60 | 66 | def get_networks( |
61 | 67 | self, |
@@ -1161,6 +1167,135 @@ def _sanitize_server_filters(self, filters: dict) -> dict: |
1161 | 1167 | if not filters: |
1162 | 1168 | return {} |
1163 | 1169 | attrs = dict(filters) |
1164 | | - # Remove client-only or unsupported filters |
1165 | 1170 | attrs.pop("status", None) |
1166 | 1171 | return attrs |
| 1172 | + |
| 1173 | + def get_security_groups( |
| 1174 | + self, |
| 1175 | + project_id: str | None = None, |
| 1176 | + name: str | None = None, |
| 1177 | + ) -> list[SecurityGroup]: |
| 1178 | + """ |
| 1179 | + Get the list of Security Groups with optional filtering. |
| 1180 | +
|
| 1181 | + :param project_id: Filter by project ID |
| 1182 | + :param name: Filter by security group name |
| 1183 | + :return: List of SecurityGroup objects |
| 1184 | + """ |
| 1185 | + conn = get_openstack_conn() |
| 1186 | + filters: dict = {} |
| 1187 | + if project_id: |
| 1188 | + filters["project_id"] = project_id |
| 1189 | + if name: |
| 1190 | + filters["name"] = name |
| 1191 | + security_groups = conn.network.security_groups(**filters) |
| 1192 | + return [ |
| 1193 | + self._convert_to_security_group_model(sg) for sg in security_groups |
| 1194 | + ] |
| 1195 | + |
| 1196 | + def create_security_group( |
| 1197 | + self, |
| 1198 | + name: str, |
| 1199 | + description: str | None = None, |
| 1200 | + project_id: str | None = None, |
| 1201 | + ) -> SecurityGroup: |
| 1202 | + """ |
| 1203 | + Create a new Security Group. |
| 1204 | +
|
| 1205 | + :param name: Security group name |
| 1206 | + :param description: Security group description |
| 1207 | + :param project_id: Project ID to assign ownership |
| 1208 | + :return: Created SecurityGroup object |
| 1209 | + """ |
| 1210 | + conn = get_openstack_conn() |
| 1211 | + args: dict = {"name": name} |
| 1212 | + if description is not None: |
| 1213 | + args["description"] = description |
| 1214 | + if project_id is not None: |
| 1215 | + args["project_id"] = project_id |
| 1216 | + sg = conn.network.create_security_group(**args) |
| 1217 | + return self._convert_to_security_group_model(sg) |
| 1218 | + |
| 1219 | + def get_security_group_detail( |
| 1220 | + self, security_group_id: str |
| 1221 | + ) -> SecurityGroup: |
| 1222 | + """ |
| 1223 | + Get detailed information about a specific Security Group. |
| 1224 | +
|
| 1225 | + :param security_group_id: ID of the security group to retrieve |
| 1226 | + :return: SecurityGroup details |
| 1227 | + """ |
| 1228 | + conn = get_openstack_conn() |
| 1229 | + sg = conn.network.get_security_group(security_group_id) |
| 1230 | + return self._convert_to_security_group_model(sg) |
| 1231 | + |
| 1232 | + def update_security_group( |
| 1233 | + self, |
| 1234 | + security_group_id: str, |
| 1235 | + name: str | None = None, |
| 1236 | + description: str | None = None, |
| 1237 | + ) -> SecurityGroup: |
| 1238 | + """ |
| 1239 | + Update an existing Security Group. |
| 1240 | +
|
| 1241 | + :param security_group_id: ID of the security group to update |
| 1242 | + :param name: New security group name |
| 1243 | + :param description: New security group description |
| 1244 | + :return: Updated SecurityGroup object |
| 1245 | + """ |
| 1246 | + conn = get_openstack_conn() |
| 1247 | + update_args: dict = {} |
| 1248 | + if name is not None: |
| 1249 | + update_args["name"] = name |
| 1250 | + if description is not None: |
| 1251 | + update_args["description"] = description |
| 1252 | + if not update_args: |
| 1253 | + current = conn.network.get_security_group(security_group_id) |
| 1254 | + return self._convert_to_security_group_model(current) |
| 1255 | + sg = conn.network.update_security_group( |
| 1256 | + security_group_id, **update_args |
| 1257 | + ) |
| 1258 | + return self._convert_to_security_group_model(sg) |
| 1259 | + |
| 1260 | + def delete_security_group(self, security_group_id: str) -> None: |
| 1261 | + """ |
| 1262 | + Delete a Security Group. |
| 1263 | +
|
| 1264 | + :param security_group_id: ID of the security group to delete |
| 1265 | + :return: None |
| 1266 | + """ |
| 1267 | + conn = get_openstack_conn() |
| 1268 | + conn.network.delete_security_group( |
| 1269 | + security_group_id, ignore_missing=False |
| 1270 | + ) |
| 1271 | + return None |
| 1272 | + |
| 1273 | + def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: |
| 1274 | + """ |
| 1275 | + Convert an OpenStack Security Group object to a SecurityGroup pydantic model. |
| 1276 | +
|
| 1277 | + :param openstack_sg: OpenStack security group object |
| 1278 | + :return: Pydantic SecurityGroup model |
| 1279 | + """ |
| 1280 | + rule_ids: list[str] | None = None |
| 1281 | + rules = getattr(openstack_sg, "security_group_rules", None) |
| 1282 | + if rules is not None: |
| 1283 | + extracted: list[str] = [] |
| 1284 | + for r in rules: |
| 1285 | + rid = None |
| 1286 | + if isinstance(r, dict): |
| 1287 | + rid = r.get("id") |
| 1288 | + else: |
| 1289 | + rid = getattr(r, "id", None) |
| 1290 | + if rid: |
| 1291 | + extracted.append(str(rid)) |
| 1292 | + rule_ids = extracted |
| 1293 | + |
| 1294 | + return SecurityGroup( |
| 1295 | + id=openstack_sg.id, |
| 1296 | + name=getattr(openstack_sg, "name", None), |
| 1297 | + status=getattr(openstack_sg, "status", None), |
| 1298 | + description=getattr(openstack_sg, "description", None), |
| 1299 | + project_id=getattr(openstack_sg, "project_id", None), |
| 1300 | + security_group_rule_ids=rule_ids, |
| 1301 | + ) |
0 commit comments