1414limitations under the License.
1515"""
1616
17- import proton
18- import proton .handlers
1917import threading
2018import logging
2119import json
2523import time
2624from typing import List , Any , Tuple , Callable , Dict
2725from charon .config import get_config
28- from charon .constants import DEFAULT_SIGN_RESULT_LOC
2926from charon .constants import DEFAULT_RADAS_SIGN_TIMEOUT_RETRY_COUNT
3027from charon .constants import DEFAULT_RADAS_SIGN_TIMEOUT_RETRY_INTERVAL
3128from charon .pkgs .oras_client import OrasClient
29+ from proton import Event
30+ from proton .handlers import MessagingHandler
3231
3332logger = logging .getLogger (__name__ )
3433
3534
36- class SignHandler :
37- """
38- Handle the sign result status management
39- """
40-
41- _is_processing : bool = True
42- _downloaded_files : List [str ] = []
43-
44- @classmethod
45- def is_processing (cls ) -> bool :
46- return cls ._is_processing
47-
48- @classmethod
49- def get_downloaded_files (cls ) -> List [str ]:
50- return cls ._downloaded_files .copy ()
51-
52- @classmethod
53- def set_processing (cls , value : bool ) -> None :
54- cls ._is_processing = value
55-
56- @classmethod
57- def set_downloaded_files (cls , files : List [str ]) -> None :
58- cls ._downloaded_files = files
59-
60-
61- class UmbListener (proton .handlers .MessagingHandler ):
35+ class UmbListener (MessagingHandler ):
6236 """
6337 UmbListener class (AMQP version), register this when setup UmbClient
38+ Attributes:
39+ sign_result_loc (str): Local save path (e.g. “/tmp/sign”) for oras pull result,
40+ this value transfers from the cmd flag, should register UmbListener when the client starts
6441 """
6542
66- def __init__ (self ) -> None :
43+ def __init__ (self , sign_result_loc : str ) -> None :
6744 super ().__init__ ()
45+ self .sign_result_loc = sign_result_loc
6846
69- def on_start (self , event : proton . Event ) -> None :
47+ def on_start (self , event : Event ) -> None :
7048 """
7149 On start callback
7250 """
7351 conf = get_config ()
74- if not conf :
52+ rconf = conf .get_radas_config () if conf else None
53+ if not rconf :
7554 sys .exit (1 )
76- event .container .create_receiver (conf .get_amqp_queue ())
55+ conn = event .container .connect (rconf .umb_target ())
56+ event .container .create_receiver (conn , rconf .result_queue ())
57+ logger .info ("Listening on %s, queue: %s" , rconf .umb_target (), rconf .result_queue ())
7758
78- def on_message (self , event : proton . Event ) -> None :
59+ def on_message (self , event : Event ) -> None :
7960 """
8061 On message callback
8162 """
8263 # handle response from radas in a thread
8364 thread = threading .Thread (target = self ._process_message , args = [event .message .body ])
8465 thread .start ()
8566
86- def on_error (self , event : proton . Event ) -> None :
67+ def on_connection_error (self , event : Event ) -> None :
8768 """
88- On error callback
69+ On connection error callback
8970 """
9071 logger .error ("Received an error event:\n %s" , event )
9172
92- def on_disconnected (self , event : proton . Event ) -> None :
73+ def on_disconnected (self , event : Event ) -> None :
9374 """
9475 On disconnected callback
9576 """
9677 logger .error ("Disconnected from AMQP broker." )
9778
98- def _process_message (msg : Any ) -> None :
79+ def _process_message (self , msg : Any ) -> None :
9980 """
10081 Process a message received from UMB
10182 Args:
10283 msg: The message body received
10384 """
104- try :
105- msg_dict = json .loads (msg )
106- result_reference_url = msg_dict .get ("result_reference" )
85+ msg_dict = json .loads (msg )
86+ result_reference_url = msg_dict .get ("result_reference" )
10787
108- if not result_reference_url :
109- logger .warning ("Not found result_reference in message,ignore." )
110- return
111-
112- conf = get_config ()
113- if not conf :
114- sign_result_loc = DEFAULT_SIGN_RESULT_LOC
115- sign_result_loc = os .getenv ("SIGN_RESULT_LOC" ) or conf .get_sign_result_loc ()
116- logger .info ("Using SIGN RESULT LOC: %s" , sign_result_loc )
88+ if not result_reference_url :
89+ logger .warning ("Not found result_reference in message,ignore." )
90+ return
11791
118- sign_result_parent_dir = os .path .dirname (sign_result_loc )
119- os .makedirs (sign_result_parent_dir , exist_ok = True )
92+ logger .info ("Using SIGN RESULT LOC: %s" , self .sign_result_loc )
93+ sign_result_parent_dir = os .path .dirname (self .sign_result_loc )
94+ os .makedirs (sign_result_parent_dir , exist_ok = True )
12095
121- oras_client = OrasClient ()
122- files = oras_client .pull (
123- result_reference_url = result_reference_url , sign_result_loc = sign_result_loc
124- )
125- SignHandler .set_downloaded_files (files )
126- finally :
127- SignHandler .set_processing (False )
96+ oras_client = OrasClient ()
97+ files = oras_client .pull (
98+ result_reference_url = result_reference_url , sign_result_loc = self .sign_result_loc
99+ )
100+ logger .info ("Number of files pulled: %d, path: %s" , len (files ), files [0 ])
128101
129102
130- def generate_radas_sign (top_level : str ) -> Tuple [List [str ], List [str ]]:
103+ def generate_radas_sign (top_level : str , sign_result_loc : str ) -> Tuple [List [str ], List [str ]]:
131104 """
132105 Generate .asc files based on RADAS sign result json file
133106 """
@@ -138,21 +111,34 @@ def generate_radas_sign(top_level: str) -> Tuple[List[str], List[str]]:
138111 )
139112 timeout_retry_interval = (
140113 rconf .radas_sign_timeout_retry_interval ()
141- if conf
114+ if rconf
142115 else DEFAULT_RADAS_SIGN_TIMEOUT_RETRY_INTERVAL
143116 )
144117 wait_count = 0
145- while SignHandler .is_processing ():
118+
119+ # Wait until files appear in the sign_result_loc directory
120+ while True :
121+ files = [
122+ os .path .join (sign_result_loc , f )
123+ for f in os .listdir (sign_result_loc )
124+ if os .path .isfile (os .path .join (sign_result_loc , f ))
125+ ]
126+ if files : # If files exist, break the loop
127+ break
128+
146129 wait_count += 1
147130 if wait_count > timeout_retry_count :
148131 logger .warning ("Timeout when waiting for sign response." )
149132 break
150133 time .sleep (timeout_retry_interval )
151134
152- files = SignHandler .get_downloaded_files ()
153135 if not files :
154136 return [], []
155137
138+ if len (files ) > 1 :
139+ logger .error ("Multiple files found in %s. Expected only one file." , sign_result_loc )
140+ return [], []
141+
156142 # should only have the single sign result json file from the radas registry
157143 json_file_path = files [0 ]
158144 try :
0 commit comments