Skip to content

Commit a9ae56e

Browse files
committed
start load config traffic example
1 parent 937a560 commit a9ae56e

File tree

5 files changed

+167
-0
lines changed

5 files changed

+167
-0
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from cloudshell.workflow.orchestration.setup.default_setup_orchestrator import DefaultSetupWorkflow
2+
from cloudshell.workflow.orchestration.sandbox import Sandbox
3+
from start_traffic import start_traffic_flow
4+
5+
sandbox = Sandbox()
6+
7+
DefaultSetupWorkflow().register(sandbox)
8+
sandbox.workflow.on_configuration_ended(start_traffic_flow, None)
9+
sandbox.execute_setup()
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
SERVER = "localhost"
2+
USER = "admin"
3+
PASSWORD = "admin"
4+
DOMAIN = "Global"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
cloudshell-orch-core
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import os
2+
3+
from cloudshell.workflow.orchestration.sandbox import Sandbox
4+
from cloudshell.api.cloudshell_api import InputNameValue
5+
6+
TRAFFIC_CONTROLLER_MODEL = "BreakingPoint Controller 2G"
7+
TRAFFIC_CONFIG_PATH_INPUT = "Traffic Config Path"
8+
9+
10+
def start_traffic_flow(sandbox, components=None):
11+
"""
12+
Functions passed into orchestration flow MUST have (sandbox, components) signature
13+
:param Sandbox sandbox:
14+
:param components
15+
:return:
16+
"""
17+
api = sandbox.automation_api
18+
res_id = sandbox.id
19+
res_details = api.GetReservationDetails(res_id).ReservationDescription
20+
services = res_details.Services
21+
traffic_controller_search = [x for x in services if x.ServiceName == TRAFFIC_CONTROLLER_MODEL]
22+
if not traffic_controller_search:
23+
raise ValueError(f"Traffic Controller Service '{TRAFFIC_CONTROLLER_MODEL}' not found on canvas")
24+
25+
if len(traffic_controller_search) > 1:
26+
raise ValueError("Multiple traffic controllers found. Adjust script logic")
27+
28+
# validate global input
29+
global_inputs = sandbox.global_inputs
30+
traffic_config_path = global_inputs.get(TRAFFIC_CONFIG_PATH_INPUT)
31+
if not traffic_config_path:
32+
raise ValueError(f"No sandbox input found for: '{TRAFFIC_CONFIG_PATH_INPUT}'")
33+
34+
traffic_controller = traffic_controller_search[0]
35+
36+
api.WriteMessageToReservationOutput(reservationId=res_id, message=f"Loading traffic config '{traffic_config_path}'...")
37+
command_inputs = [InputNameValue("config_file_location", traffic_config_path)]
38+
api.ExecuteCommand(reservationId=res_id,
39+
targetName=traffic_controller.Alias,
40+
targetType="Service",
41+
commandName="load_config",
42+
commandInputs=command_inputs,
43+
printOutput=True)
44+
45+
api.WriteMessageToReservationOutput(reservationId=res_id, message="load config flow finished")
46+
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""
2+
NOTE: - This script is only for updating EXISTING scripts.
3+
- Scripts MUST be uploaded manually first time. (this tool can still be used to do zipping)
4+
"""
5+
from cloudshell.api.cloudshell_api import CloudShellAPISession
6+
import os
7+
import credentials # may need to create this module and set the constants for the api session
8+
9+
# ===== Optional Variables to set =======
10+
11+
# To name zip package something other than the default directory name
12+
CUSTOM_SCRIPT_NAME = ''
13+
14+
15+
# =======================================
16+
17+
18+
def get_api_session():
19+
return CloudShellAPISession(host=credentials.SERVER,
20+
username=credentials.USER,
21+
password=credentials.PASSWORD,
22+
domain=credentials.DOMAIN)
23+
24+
25+
def error_red(err_str):
26+
"""
27+
for printing errors in red in pycharm.
28+
:param err_str:
29+
:return:
30+
"""
31+
CRED = '\033[91m'
32+
CEND = '\033[0m'
33+
return CRED + err_str + CEND
34+
35+
36+
def get_zip_details():
37+
parent_dir_path = os.path.abspath('.')
38+
parent_dir_name = os.path.basename(parent_dir_path)
39+
script_name = CUSTOM_SCRIPT_NAME or parent_dir_name
40+
zip_file_name = script_name + '.zip'
41+
42+
return {"parent_dir_path": parent_dir_path,
43+
"parent_dir_name": parent_dir_name,
44+
"script_name": script_name,
45+
"zip_file_name": zip_file_name}
46+
47+
48+
def is_whitelisted(f, file_path, files_to_exclude):
49+
is_regular_file = os.path.isfile(file_path)
50+
is_not_excluded = f not in files_to_exclude
51+
is_not_pyc = not f.endswith('.pyc')
52+
return is_regular_file and is_not_excluded and is_not_pyc
53+
54+
55+
def make_zipfile(output_filename, source_dir, files_to_exclude, dirs_to_exclude):
56+
import zipfile
57+
with zipfile.ZipFile(output_filename, "w", zipfile.ZIP_DEFLATED) as z:
58+
for root, dirs, files in os.walk(source_dir):
59+
dirs[:] = [d for d in dirs if d not in dirs_to_exclude]
60+
for f in files:
61+
file_path = os.path.join(root, f)
62+
if is_whitelisted(f, file_path, files_to_exclude):
63+
arcname = os.path.join(os.path.relpath(root, source_dir), f)
64+
z.write(file_path, arcname)
65+
66+
67+
def zip_files():
68+
zip_details = get_zip_details()
69+
zip_file_name = zip_details["zip_file_name"]
70+
dirs_to_exclude = [".git"]
71+
files_to_exclude = [zip_file_name, "venv", ".idea", "credentials.py"]
72+
try:
73+
make_zipfile(output_filename=zip_file_name,
74+
source_dir=zip_details["parent_dir_path"],
75+
files_to_exclude=files_to_exclude,
76+
dirs_to_exclude=dirs_to_exclude)
77+
except Exception as e:
78+
print(error_red("[-] error zipping up file: " + str(e)))
79+
exit(1)
80+
else:
81+
if zip_file_name in os.listdir("."):
82+
print("[+] ZIPPED UP: '{zip_name}'".format(zip_name=zip_file_name))
83+
else:
84+
print("[-] ZIP FILE NOT PRESENT")
85+
86+
87+
def update_script_api_wrapper(cs_ses, script_name, zip_address):
88+
try:
89+
cs_ses.UpdateScript(script_name, zip_address)
90+
except Exception as e:
91+
print(error_red("[-] ERROR UPDATING SCRIPT IN PORTAL\n" + str(e)) + "\n"
92+
"PLEASE LOAD SCRIPT MANUALLY THE FIRST TIME")
93+
exit(1)
94+
else:
95+
print("[+] '{script}' updated on CloudShell Successfully".format(script=script_name))
96+
97+
98+
def update_script_on_server():
99+
zip_files()
100+
cs_ses = get_api_session()
101+
zip_details = get_zip_details()
102+
update_script_api_wrapper(cs_ses=cs_ses,
103+
script_name=zip_details["script_name"],
104+
zip_address=zip_details["zip_file_name"])
105+
106+
107+
update_script_on_server()

0 commit comments

Comments
 (0)