@@ -703,120 +703,192 @@ namespace zmq
703703 class monitor_t
704704 {
705705 public:
706- monitor_t () : socketPtr(NULL ) {}
707- virtual ~monitor_t () {}
706+ monitor_t () : socketPtr(NULL ), monitor_socket{NULL } {}
707+
708+ virtual ~monitor_t ()
709+ {
710+ if (socketPtr)
711+ zmq_socket_monitor (socketPtr, NULL , 0 );
712+
713+ if (monitor_socket)
714+ zmq_close (monitor_socket);
715+
716+ }
717+
718+
719+ #ifdef ZMQ_HAS_RVALUE_REFS
720+ monitor_t (monitor_t && rhs) ZMQ_NOTHROW :
721+ socketPtr(rhs.socketPtr),
722+ monitor_socket(rhs.monitor_socket)
723+ {
724+ rhs.socketPtr = NULL ;
725+ rhs.monitor_socket = NULL ;
726+ }
727+
728+ socket_t & operator =(socket_t && rhs) ZMQ_DELETED_FUNCTION ;
729+ #endif
730+
731+
708732
709733 void monitor (socket_t &socket, std::string const & addr, int events = ZMQ_EVENT_ALL)
710734 {
711735 monitor (socket, addr.c_str (), events);
712736 }
713737
714738 void monitor (socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
739+ {
740+ init (socket, addr_, events) ;
741+ while (true )
742+ {
743+ check_event (-1 ) ;
744+ }
745+ }
746+
747+ void init (socket_t &socket, std::string const & addr, int events = ZMQ_EVENT_ALL)
748+ {
749+ init (socket, addr.c_str (), events);
750+ }
751+
752+ void init (socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
715753 {
716754 int rc = zmq_socket_monitor (socket.ptr , addr_, events);
717755 if (rc != 0 )
718756 throw error_t ();
719757
720758 socketPtr = socket.ptr ;
721- void *s = zmq_socket (socket.ctxptr , ZMQ_PAIR);
722- assert (s );
759+ monitor_socket = zmq_socket (socket.ctxptr , ZMQ_PAIR);
760+ assert (monitor_socket );
723761
724- rc = zmq_connect (s , addr_);
762+ rc = zmq_connect (monitor_socket , addr_);
725763 assert (rc == 0 );
726764
727765 on_monitor_started ();
766+ }
767+
768+ bool check_event (int timeout = 0 )
769+ {
770+ assert (monitor_socket);
771+
772+ zmq_msg_t eventMsg;
773+ zmq_msg_init (&eventMsg);
774+
775+ zmq::pollitem_t items [] = {
776+ { monitor_socket, 0 , ZMQ_POLLIN, 0 },
777+ };
778+
779+ zmq::poll (&items [0 ], 1 , timeout);
728780
729- while (true ) {
730- zmq_msg_t eventMsg;
731- zmq_msg_init (&eventMsg);
732- rc = zmq_msg_recv (&eventMsg, s, 0 );
781+ if (items [0 ].revents & ZMQ_POLLIN)
782+ {
783+ int rc = zmq_msg_recv (&eventMsg, monitor_socket, 0 );
733784 if (rc == -1 && zmq_errno () == ETERM)
734- break ;
785+ return false ;
735786 assert (rc != -1 );
787+
788+ }
789+ else
790+ {
791+ zmq_msg_close (&eventMsg);
792+ return false ;
793+ }
794+
736795#if ZMQ_VERSION_MAJOR >= 4
737- const char * data = static_cast <const char *>(zmq_msg_data (&eventMsg));
738- zmq_event_t msgEvent;
739- memcpy (&msgEvent.event , data, sizeof (uint16_t )); data += sizeof (uint16_t );
740- memcpy (&msgEvent.value , data, sizeof (int32_t ));
741- zmq_event_t * event = &msgEvent;
796+ const char * data = static_cast <const char *>(zmq_msg_data (&eventMsg));
797+ zmq_event_t msgEvent;
798+ memcpy (&msgEvent.event , data, sizeof (uint16_t )); data += sizeof (uint16_t );
799+ memcpy (&msgEvent.value , data, sizeof (int32_t ));
800+ zmq_event_t * event = &msgEvent;
742801#else
743- zmq_event_t * event = static_cast <zmq_event_t *>(zmq_msg_data (&eventMsg));
802+ zmq_event_t * event = static_cast <zmq_event_t *>(zmq_msg_data (&eventMsg));
744803#endif
745804
746805#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
747- zmq_msg_t addrMsg;
748- zmq_msg_init (&addrMsg);
749- rc = zmq_msg_recv (&addrMsg, s, 0 );
750- if (rc == -1 && zmq_errno () == ETERM)
751- break ;
752- assert (rc != -1 );
753- const char * str = static_cast <const char *>(zmq_msg_data (&addrMsg));
754- std::string address (str, str + zmq_msg_size (&addrMsg));
755- zmq_msg_close (&addrMsg);
806+ zmq_msg_t addrMsg;
807+ zmq_msg_init (&addrMsg);
808+ int rc = zmq_msg_recv (&addrMsg, monitor_socket, 0 );
809+ if (rc == -1 && zmq_errno () == ETERM)
810+ {
811+ zmq_msg_close (&eventMsg);
812+ return false ;
813+ }
814+
815+ assert (rc != -1 );
816+ const char * str = static_cast <const char *>(zmq_msg_data (&addrMsg));
817+ std::string address (str, str + zmq_msg_size (&addrMsg));
818+ zmq_msg_close (&addrMsg);
756819#else
757- // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
758- std::string address = event->data .connected .addr ;
820+ // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
821+ std::string address = event->data .connected .addr ;
759822#endif
760823
761824#ifdef ZMQ_EVENT_MONITOR_STOPPED
762- if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
763- break ;
825+ if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
826+ {
827+ zmq_msg_close (&eventMsg);
828+ return true ;
829+ }
830+
764831#endif
765832
766- switch (event->event ) {
767- case ZMQ_EVENT_CONNECTED:
768- on_event_connected (*event, address.c_str ());
769- break ;
770- case ZMQ_EVENT_CONNECT_DELAYED:
771- on_event_connect_delayed (*event, address.c_str ());
772- break ;
773- case ZMQ_EVENT_CONNECT_RETRIED:
774- on_event_connect_retried (*event, address.c_str ());
775- break ;
776- case ZMQ_EVENT_LISTENING:
777- on_event_listening (*event, address.c_str ());
778- break ;
779- case ZMQ_EVENT_BIND_FAILED:
780- on_event_bind_failed (*event, address.c_str ());
781- break ;
782- case ZMQ_EVENT_ACCEPTED:
783- on_event_accepted (*event, address.c_str ());
784- break ;
785- case ZMQ_EVENT_ACCEPT_FAILED:
786- on_event_accept_failed (*event, address.c_str ());
787- break ;
788- case ZMQ_EVENT_CLOSED:
789- on_event_closed (*event, address.c_str ());
790- break ;
791- case ZMQ_EVENT_CLOSE_FAILED:
792- on_event_close_failed (*event, address.c_str ());
793- break ;
794- case ZMQ_EVENT_DISCONNECTED:
795- on_event_disconnected (*event, address.c_str ());
796- break ;
833+ switch (event->event ) {
834+ case ZMQ_EVENT_CONNECTED:
835+ on_event_connected (*event, address.c_str ());
836+ break ;
837+ case ZMQ_EVENT_CONNECT_DELAYED:
838+ on_event_connect_delayed (*event, address.c_str ());
839+ break ;
840+ case ZMQ_EVENT_CONNECT_RETRIED:
841+ on_event_connect_retried (*event, address.c_str ());
842+ break ;
843+ case ZMQ_EVENT_LISTENING:
844+ on_event_listening (*event, address.c_str ());
845+ break ;
846+ case ZMQ_EVENT_BIND_FAILED:
847+ on_event_bind_failed (*event, address.c_str ());
848+ break ;
849+ case ZMQ_EVENT_ACCEPTED:
850+ on_event_accepted (*event, address.c_str ());
851+ break ;
852+ case ZMQ_EVENT_ACCEPT_FAILED:
853+ on_event_accept_failed (*event, address.c_str ());
854+ break ;
855+ case ZMQ_EVENT_CLOSED:
856+ on_event_closed (*event, address.c_str ());
857+ break ;
858+ case ZMQ_EVENT_CLOSE_FAILED:
859+ on_event_close_failed (*event, address.c_str ());
860+ break ;
861+ case ZMQ_EVENT_DISCONNECTED:
862+ on_event_disconnected (*event, address.c_str ());
863+ break ;
797864#ifdef ZMQ_BUILD_DRAFT_API
798- case ZMQ_EVENT_HANDSHAKE_FAILED:
799- on_event_handshake_failed (*event, address.c_str ());
800- break ;
801- case ZMQ_EVENT_HANDSHAKE_SUCCEED:
802- on_event_handshake_succeed (*event, address.c_str ());
803- break ;
865+ case ZMQ_EVENT_HANDSHAKE_FAILED:
866+ on_event_handshake_failed (*event, address.c_str ());
867+ break ;
868+ case ZMQ_EVENT_HANDSHAKE_SUCCEED:
869+ on_event_handshake_succeed (*event, address.c_str ());
870+ break ;
804871#endif
805- default :
806- on_event_unknown (*event, address.c_str ());
807- break ;
808- }
809- zmq_msg_close (&eventMsg);
872+ default :
873+ on_event_unknown (*event, address.c_str ());
874+ break ;
810875 }
811- zmq_close (s);
812- socketPtr = NULL ;
876+ zmq_msg_close (&eventMsg);
877+
878+ return true ;
813879 }
814880
815881#ifdef ZMQ_EVENT_MONITOR_STOPPED
816882 void abort ()
817883 {
818884 if (socketPtr)
819885 zmq_socket_monitor (socketPtr, NULL , 0 );
886+
887+ if (monitor_socket)
888+ zmq_close (monitor_socket);
889+
890+ socketPtr = NULL ;
891+ monitor_socket = NULL ;
820892 }
821893#endif
822894 virtual void on_monitor_started () {}
@@ -834,7 +906,12 @@ namespace zmq
834906 virtual void on_event_handshake_succeed (const zmq_event_t &event_, const char * addr_) { (void ) event_; (void ) addr_; }
835907 virtual void on_event_unknown (const zmq_event_t &event_, const char * addr_) { (void )event_; (void )addr_; }
836908 private:
909+
910+ monitor_t (const monitor_t &) ZMQ_DELETED_FUNCTION;
911+ void operator = (const monitor_t &) ZMQ_DELETED_FUNCTION;
912+
837913 void * socketPtr;
914+ void *monitor_socket ;
838915 };
839916
840917#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
0 commit comments