4242
4343
4444# Create mqtt client and configure it according to configuration
45- global mq
46-
45+ ID_RANDOM = MQTT_CLIENT_ID + str (random .randint (1 ,999 ))
46+ mq = MQTT (client_id = ID_RANDOM , transport = MQTT_TRANSPORT )
47+ mq .username_pw_set (MQTT_USER , MQTT_PASSWORD )
48+ if MQTT_TLS :
49+ mq .tls_set ()
50+ mq .enable_logger (LOGGER )
4751
4852def to_brefv_raw (point_cloud ):
4953 """Raw in message to brefv envelope"""
@@ -84,8 +88,6 @@ def to_mqtt(brefv_msg: Any):
8488
8589
8690def on_message (client , userdata , message ):
87- LOGGER .info ("New Message..." )
88-
8991 msg = message .payload .decode ("utf-8" )
9092 payload = json .loads (msg )
9193 topic = message .topic
@@ -157,6 +159,10 @@ def rotate_points_azimuth(input_stream):
157159
158160if __name__ == "__main__" :
159161
162+ mq .on_message = on_message
163+ mq .on_disconnect = on_disconnect
164+ mq .on_connect = on_connect
165+
160166 # Build pipeline
161167 LOGGER .info ("Building pipeline..." )
162168
@@ -169,21 +175,17 @@ def rotate_points_azimuth(input_stream):
169175
170176 # MQTT publish stream
171177 pipe_heading = source_heading .latest ()
172- pipe_rotate = source .zip (pipe_heading ).map (rotate_points_azimuth ).map (to_brefv_raw ).sink (to_mqtt )
173-
174- ID_RANDOM = MQTT_CLIENT_ID + str (random .randint (1 ,999 ))
175- mq = MQTT (client_id = ID_RANDOM , transport = MQTT_TRANSPORT )
176- mq .username_pw_set (MQTT_USER , MQTT_PASSWORD )
177- if MQTT_TLS :
178- mq .tls_set ()
179- mq .enable_logger (LOGGER )
178+ # combined = source.latest().zip(pipe_heading)
179+ combined = source .latest ().zip_latest (pipe_heading )
180+ combined .map (rotate_points_azimuth ).map (to_brefv_raw ).sink (to_mqtt )
181+
182+
183+
180184 LOGGER .info ("Connecting to MQTT broker..." )
181185 mq .connect (MQTT_BROKER_HOST , MQTT_BROKER_PORT )
182186
183187 LOGGER .info ("Setting up MQTT listener..." )
184188
185- mq .on_message = on_message
186- mq .on_disconnect = on_disconnect
187- mq .on_connect = on_connect
189+
188190 mq .loop_forever ()
189- # threading.Thread(target=mq.loop_forever, daemon=True).start()
191+ # threading.Thread(target=mq.loop_forever, daemon=True).start()
0 commit comments