diff --git a/netsyslog.py b/netsyslog.py
index 61b4e3e..736732c 100644
--- a/netsyslog.py
+++ b/netsyslog.py
@@ -1,4 +1,5 @@
 # Copyright (C) 2005 Graham Ashton
+# Copyright (C) 2010 Daniel Pocock http://danielpocock.com
 #
 # This module is free software, and you may redistribute it and/or modify
 # it under the same terms as Python itself, so long as this copyright message
@@ -47,10 +48,28 @@
 
 
 import os
+import logging
 import socket
+import SocketServer
 import sys
 import time
 
+class Error(Exception):
+    """Base class for exceptions in this module."""
+    pass
+
+class ParseError(Error):
+    """Exception raised for errors parsing frames from the wire.
+
+    Attributes:
+        field -- input expression in which the error occurred
+        msg -- explanation of the error
+    """
+
+    def __init__(self, field, msg):
+        Error.__init__(self, "%s: %s" % (field, msg))
+        self.field = field
+        self.msg = msg
 
 class PriPart(object):
 
@@ -77,6 +96,19 @@ def __init__(self, facility, severity):
         self.facility = facility
         self.severity = severity
 
+    @classmethod
+    def fromWire(cls, pri_text):
+        """Initialise the object, specifying a numerical priority from the wire.
+        """
+        assert pri_text is not None
+        try:
+            pri_n = int(pri_text)
+        except ValueError:
+            raise ParseError("priority", "not numeric")
+        facility = pri_n & 0xf8
+        severity = pri_n & 0x07
+        return cls(facility, severity)
+
     def __str__(self):
         value = self.facility + self.severity
         return "<%s>" % value
@@ -111,20 +143,61 @@ def __init__(self, timestamp=None, hostname=None):
         self.timestamp = timestamp
         self.hostname = hostname
 
+    @classmethod
+    def fromWire(cls, header_text):
+        """Initialise the object, specifying text from the wire.
+        """
+        assert header_text is not None
+
+        # timestamp (15 bytes), space (1 byte), hostname (at least one byte)
+        if len(header_text) < 17:
+            raise ParseError("header", "should be at least 17 bytes")
+        # timestamp is fixed length
+        if header_text[15] != " ":
+            raise ParseError("header", "16th byte should be a space")
+
+        timestamp = header_text[0:15]
+        if not cls._timestamp_is_valid(timestamp):
+            raise ParseError("header/timestamp", "invalid timestamp: '%s'" % timestamp)
+        hostname = header_text[16:]
+        return cls(timestamp, hostname)
+
     def __str__(self):
         return "%s %s" % (self.timestamp, self.hostname)
 
     def _get_timestamp(self):
         return self._timestamp
 
-    def _calculate_current_timestamp(self):
+    def parse_timestamp(self):
+        """Parses the syslog timestamp string into a struct_time object.
+
+        """
+        # syslog RFC3164 timestamps don't include a year value so
+        # we must substitute it manually
         localtime = time.localtime()
-        day = time.strftime("%d", localtime)
+        year = localtime.tm_year
+        full_ts = "%d %s" % (year, self._timestamp)
+        result = time.strptime(full_ts, "%Y %b %d %H:%M:%S")
+        # In the first day of a year (tm_mon==1) we may still
+        # receive some values from the last day of the previous year
+        if result.tm_mon == 12 and localtime.tm_mon == 1:
+            year = year - 1
+            full_ts = "%d %s" % (year, self._timestamp)
+            result = time.strptime(full_ts, "%Y %b %d %H:%M:%S")
+        return result
+
+    def _format_timestamp_rfc3164(self, _timestamp):
+        day = time.strftime("%d", _timestamp)
         if day[0] == "0":
             day = " " + day[1:]
-        value = time.strftime("%b %%s %H:%M:%S", localtime)
+        value = time.strftime("%b %%s %H:%M:%S", _timestamp)
         return value % day
 
+    def _calculate_current_timestamp(self):
+        localtime = time.localtime()
+        return self._format_timestamp_rfc3164(localtime)
+
+    @classmethod
     def _timestamp_is_valid(self, value):
         if value is None:
             return False
@@ -197,8 +270,34 @@ def __init__(self, tag=None, content="", pid=None):
         self.content = content
         self.pid = pid
 
+    @classmethod
+    def fromWire(cls, message_text):
+        """Initialise the object, specifying text from the wire."""
+
+        assert message_text is not None
+
+        # look for the tag[PID] text
+        _colon = message_text.find(":")
+        if _colon < 0:
+            raise ParseError("message", "missing colon to separate tag from message")
+        tag_text = message_text[0:_colon]
+        begin_pid = tag_text.find("[")
+        end_pid = tag_text.find("]")
+        _pid = None
+        if begin_pid > -1:
+            if end_pid < 0:
+                # not a valid message
+                raise ParseError("message", "missing ']' in tag/pid section")
+            _tag = tag_text[0:begin_pid]
+            _pid = tag_text[begin_pid+1:end_pid]
+        else:
+            _tag = tag_text
+            _pid = None
+        _content = message_text[_colon+2:]
+        return cls(_tag, _content, _pid)
+
     def __str__(self):
-        content = self.content
+        content = self._prepend_seperator(self.content)
         if self.pid is not None:
             content = "[%s]" % self.pid + content
         return self.tag + content
@@ -234,7 +333,6 @@ def _prepend_seperator(self, value):
         return value
 
     def _set_content(self, value):
-        value = self._prepend_seperator(value)
         self._content = value
 
     content = property(_get_content, _set_content, None,
@@ -270,6 +368,36 @@ def __init__(self, pri, header, msg):
         self.header = header
         self.msg = msg
 
+    @classmethod
+    def fromWire(cls, packet_text):
+        """Initialise the object, specifying packet text from the wire."""
+
+        assert packet_text is not None
+
+        if len(packet_text) < 6:
+            # not long enough
+            raise ParseError("frame", "too short")
+
+        if packet_text[0] != "<":
+            # not correct syntax
+            raise ParseError("frame", "should begin with '<'")
+
+        gt = packet_text.index(">", 1)
+        pri_text = packet_text[1:gt]
+
+        # skip the next space and the timestamp
+        sp = gt + 1 + 15
+        # now skip the hostname
+        sp = packet_text.index(" ", sp + 1)
+        header_text = packet_text[gt+1:sp]
+
+        msg_text = packet_text[sp+1:]
+
+        pri = PriPart.fromWire(pri_text)
+        header = HeaderPart.fromWire(header_text)
+        msg = MsgPart.fromWire(msg_text)
+        return cls(pri, header, msg)
+
     def __str__(self):
         message = "%s%s %s" % (self.pri, self.header, self.msg)
         return message[:self.MAX_LEN]
@@ -360,3 +488,142 @@ def send_packet(self, packet):
 
         """
         self._send_packet_to_hosts(packet)
+
+class SyslogTCPHandler(SocketServer.BaseRequestHandler):
+
+    BUF_SIZE = 2048
+    MAX_CACHED = 4096
+    MAX_FRAME = 2048
+    TERM_CHAR = "\n"
+
+    def setup(self):
+        """Setup variables used by this instance."""
+        self.logger = logging.getLogger(__name__)
+        self.cached = None
+        self.frame_size = None
+        self.logger.info("incoming TCP connection accepted")
+
+    def handle(self):
+        """Handle the incoming bytes, try to resolve them to frames."""
+        data = self.request.recv(self.BUF_SIZE)
+        while len(data) > 0:
+            if self.cached is None:
+                self.cached = data
+            else:
+                if (len(self.cached) + len(data)) > self.MAX_CACHED:
+                    # too many bytes
+                    self.logger.warning("too much data")
+                    self.request.close()
+                    return
+                self.cached = self.cached + data
+
+            # drain every complete frame already buffered; a single recv()
+            # may deliver several pipelined frames
+            while self.cached is not None and len(self.cached) > 8:
+                if self.frame_size is None:
+                    if self.cached[0] == "<":
+                        # non-transparent framing
+                        self.frame_size = -1
+                    else:
+                        # octet counting
+                        sp = self.cached.find(" ")
+                        if sp < 0:
+                            # didn't find frame length terminated by a space
+                            self.logger.warning("suspected octet-framing, but frame length not terminated by a space")
+                            self.request.close()
+                            return
+                        try:
+                            self.frame_size = int(self.cached[0:sp])
+                        except ValueError:
+                            # frame length is not a number
+                            self.logger.warning("frame length is not a number")
+                            self.request.close()
+                            return
+                        if self.frame_size < 1 or self.frame_size > self.MAX_FRAME:
+                            # specified frame size too small/big
+                            self.logger.warning("specified frame size is too big or too small")
+                            self.request.close()
+                            return
+                        # now we parsed the size, trim the frame size string from the
+                        # beginning of the frame
+                        self.cached = self.cached[sp+1:]
+
+                try:
+                    if self.frame_size > 0:
+                        if len(self.cached) < self.frame_size:
+                            break
+                        self.handle_frame_text(self.frame_size, self.frame_size)
+                    else:
+                        term_idx = self.cached.find(self.TERM_CHAR)
+                        if term_idx < 0:
+                            break
+                        # do not consider the TERM_CHAR as part of the frame
+                        self.handle_frame_text(term_idx, term_idx + 1)
+                except Exception as e:
+                    self.logger.warning("exception occurred parsing/handling a frame: " + str(e))
+                    self.request.close()
+                    return
+            # loop again
+            data = self.request.recv(self.BUF_SIZE)
+
+        # we get here if the received data size == 0 (connection closed)
+        self.request.close()
+
+    def handle_frame_text(self, frame_len, skip_len):
+        """Handle the frame text, convert to L{Packet}."""
+        # extract the frame itself
+        frame_text = self.cached[0:frame_len]
+
+        # manage the buffer, there may be more data available
+        if len(self.cached) > skip_len:
+            self.cached = self.cached[skip_len:]
+        else:
+            self.cached = None
+        self.frame_size = None
+
+        # parse the frame
+        try:
+            frame = Packet.fromWire(frame_text)
+        except ParseError:
+            # these are errors we noticed
+            raise
+        except Exception:
+            # these are errors the parser didn't correctly detect, should
+            # analyze them and improve the parser
+            raise ParseError("frame", "unexpected parsing error")
+
+        try:
+            self.handle_message(frame)
+        except Exception:
+            # the application (subclass) raised some exception
+            raise
+
+    def handle_message(self, frame):
+        """Handle parsed Syslog frames.
+
+        Applications should override this method.
+
+        This default implementation ignores the frame.
+
+        """
+        pass
+
+class ThreadedSyslogServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
+    pass
+
+class Collector(object):
+    """Accept log messages from Syslog clients.
+
+    Accept Syslog messages over the network and pass them to the application.
+
+    """
+
+    def __init__(self, port=514, handler=SyslogTCPHandler):
+        address = ("0.0.0.0", port)
+        ThreadedSyslogServer.daemon_threads = True
+        ThreadedSyslogServer.allow_reuse_address = True
+        self.server = ThreadedSyslogServer(address, handler)
+
+    def run(self):
+        self.server.serve_forever()
+
diff --git a/netsyslog_test.py b/netsyslog_test.py
index ef070cd..b71afc7 100644
--- a/netsyslog_test.py
+++ b/netsyslog_test.py
@@ -1,4 +1,5 @@
 # Copyright (C) 2005 Graham Ashton
+# Copyright (C) 2010 Daniel Pocock http://danielpocock.com
 #
 # This module is free software, and you may redistribute it and/or modify
 # it under the same terms as Python itself, so long as this copyright message
@@ -177,6 +178,38 @@ def test_max_length(self):
         packet = netsyslog.Packet(DEFAULT_PRI, DEFAULT_HEADER, message)
         self.assertEqual(len(str(packet)), netsyslog.Packet.MAX_LEN)
 
+    def test_parse(self):
+        """Check that we can parse a message from the wire"""
+        packet = netsyslog.Packet.fromWire("<142>Mar 16 08:58:41 alpha1 foobar[1234]: testing")
+        self.assertEqual(packet.pri.severity, syslog.LOG_INFO)
+        self.assertEqual(packet.pri.facility, syslog.LOG_LOCAL1)
+        self.assertEqual(packet.header.hostname, "alpha1")
+        self.assertEqual(packet.msg.tag, "foobar")
+        self.assertEqual(packet.msg.pid, "1234")
+        self.assertEqual(packet.msg.content, "testing")
+        parsed_ts = packet.header.parse_timestamp()
+        self.assertEqual(parsed_ts.tm_mon, 3)
+        self.assertEqual(parsed_ts.tm_mday, 16)
+        self.assertEqual(parsed_ts.tm_hour, 8)
+
+        # now try with an unusual date (single digit for day of month):
+        packet = netsyslog.Packet.fromWire("<142>Mar  6 08:58:41 alpha1 foobar[1234]: testing")
+        self.assertEqual(packet.header.hostname, "alpha1")
+
+        try:
+            # just too short
+            packet = netsyslog.Packet.fromWire("<2>")
+            self.fail("ParseError not raised for a truncated frame")
+        except netsyslog.ParseError:
+            pass
+
+        try:
+            # timestamp single digit for day of month:
+            packet = netsyslog.Packet.fromWire("<142>Mar 6 08:58:41 alpha1 foobar[1234]: testing")
+            self.fail("ParseError not raised for an unpadded day of month")
+        except netsyslog.ParseError:
+            pass
+
 
 class LoggerTest(MockHeaderTest, MockMsgTest):
 
diff --git a/simple_server.py b/simple_server.py
new file mode 100644
index 0000000..1b4fb79
--- /dev/null
+++ b/simple_server.py
@@ -0,0 +1,59 @@
+
+#
+# This is a very trivial demo of how to write a syslog collector
+# that receives messages from the wire and prints them on
+# the screen.
+#
+# To run it, specify the port number as the first command line
+# argument, e.g.
+#
+#    python simple_server.py 10514
+#
+
+
+import logging
+import netsyslog
+import sys
+
+class MyHandler(netsyslog.SyslogTCPHandler):
+
+    def handle_message(self, frame):
+        """Handle parsed Syslog frames.
+
+        Overrides the no-op hook in netsyslog.SyslogTCPHandler.
+
+        This demo implementation prints some data from each frame.
+
+        """
+        print "severity: " + str(frame.pri.severity)
+        print "facility: " + str(frame.pri.facility)
+        print "tag: " + str(frame.msg.tag)
+        print "pid: " + str(frame.msg.pid)
+        print "content: " + str(frame.msg.content)
+        print "host: " + str(frame.header.hostname)
+        print "ts: " + str(frame.header.timestamp)
+        print ""
+
+if __name__ == '__main__':
+
+    logging.basicConfig(level=logging.INFO)
+    c = netsyslog.Collector(int(sys.argv[1]), MyHandler)
+    c.run()
+
+# Copyright (C) 2010, Daniel Pocock http://danielpocock.com
+#
+# This module is free software, and you may redistribute it and/or modify
+# it under the same terms as Python itself, so long as this copyright message
+# and disclaimer are retained in their original form.
+#
+# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
+# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
+# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+# DAMAGE.
+#
+# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
+# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
+# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
+