Skip to content
273 changes: 268 additions & 5 deletions netsyslog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2005 Graham Ashton <ashtong@users.sourceforge.net>
# Copyright (C) 2010 Daniel Pocock http://danielpocock.com
#
# This module is free software, and you may redistribute it and/or modify
# it under the same terms as Python itself, so long as this copyright message
Expand Down Expand Up @@ -47,10 +48,28 @@


import os
import logging
import socket
import SocketServer
import sys
import time

class Error(Exception):
"""Base class for exceptions in this module."""
pass

class ParseError(Error):
"""Exception raised for errors parsing frames from the wire.

Attributes:
field -- input expression in which the error occurred
msg -- explanation of the error
"""

def __init__(self, field, msg):
print field + ", " + msg
self.field = field
self.msg = msg

class PriPart(object):

Expand All @@ -77,6 +96,19 @@ def __init__(self, facility, severity):
self.facility = facility
self.severity = severity

@classmethod
def fromWire(cls, pri_text):
"""Initialise the object, specifying a numerical priority from the wire.
"""
assert pri_text is not None
try:
pri_n = int(pri_text)
except ValueError:
raise ParseError("priority", "not numeric")
facility = pri_n & 0xf8
severity = pri_n & 0x07
return cls(facility, severity)

def __str__(self):
value = self.facility + self.severity
return "<%s>" % value
Expand Down Expand Up @@ -111,20 +143,61 @@ def __init__(self, timestamp=None, hostname=None):
self.timestamp = timestamp
self.hostname = hostname

@classmethod
def fromWire(cls, header_text):
"""Initialise the object, specifying text from the wire.
"""
assert header_text is not None

# timestamp (15 bytes), space (1 byte), hostname (at least one byte)
if len(header_text) < 17:
raise ParseError("header", "should be at least 17 bytes")
# timestamp is fixed length
if header_text[15] != " ":
raise ParseError("header", "16th byte should be a space")

timestamp = header_text[0:15]
if not cls._timestamp_is_valid(timestamp):
raise ParseError("header/timestamp", "invalid timestamp: '%s'" % timestamp)
hostname = header_text[16:]
return cls(timestamp, hostname)

def __str__(self):
return "%s %s" % (self.timestamp, self.hostname)

def _get_timestamp(self):
return self._timestamp

def _calculate_current_timestamp(self):
def parse_timestamp(self):
"""Parses the syslog timestamp string into a struct_time object.

"""
# syslog RFC3164 timestamps don't include a year value so
# we must substitute it manually
localtime = time.localtime()
day = time.strftime("%d", localtime)
year = localtime.tm_year
full_ts = "%d %s" % (year, self._timestamp)
result = time.strptime(full_ts, "%Y %b %d %H:%M:%S")
# In the first day of a year (tm_mon==1) we may still
# receive some values from the last day of the previous year
if result.tm_mon == 12 and localtime.tm_mon == 1:
year = year - 1
full_ts = "%d %s" % (year, self._timestamp)
result = time.strptime(full_ts, "%Y %b %d %H:%M:%S")
return result

def _format_timestamp_rfc3164(self, _timestamp):
day = time.strftime("%d", _timestamp)
if day[0] == "0":
day = " " + day[1:]
value = time.strftime("%b %%s %H:%M:%S", localtime)
value = time.strftime("%b %%s %H:%M:%S", _timestamp)
return value % day

def _calculate_current_timestamp(self):
localtime = time.localtime()
return self._format_timestamp_rfc3164(localtime)

@classmethod
def _timestamp_is_valid(self, value):
if value is None:
return False
Expand Down Expand Up @@ -197,8 +270,34 @@ def __init__(self, tag=None, content="", pid=None):
self.content = content
self.pid = pid

@classmethod
def fromWire(cls, message_text):
"""Initialise the object, specifying text from the wire."""

assert message_text is not None

# look for the tag[PID] text
_colon = message_text.find(":")
if _colon < 0:
raise ParseError("message", "missing colon to separate tag from message")
tag_text = message_text[0:_colon]
begin_pid = tag_text.find("[")
end_pid = tag_text.find("]")
_pid = None
if begin_pid > -1:
if end_pid < 0:
# not a valid message
raise ParseError("message", "missing ']' in tag/pid section")
_tag = tag_text[0:begin_pid]
_pid = tag_text[begin_pid+1:end_pid]
else:
_tag = tag_text
_pid = None
_content = message_text[_colon+2:]
return cls(_tag, _content, _pid)

def __str__(self):
content = self.content
content = self._prepend_seperator(self.content)
if self.pid is not None:
content = "[%s]" % self.pid + content
return self.tag + content
Expand Down Expand Up @@ -234,7 +333,6 @@ def _prepend_seperator(self, value):
return value

def _set_content(self, value):
value = self._prepend_seperator(value)
self._content = value

content = property(_get_content, _set_content, None,
Expand Down Expand Up @@ -270,6 +368,36 @@ def __init__(self, pri, header, msg):
self.header = header
self.msg = msg

@classmethod
def fromWire(cls, packet_text):
"""Initialise the object, specifying packet text from the wire."""

assert packet_text is not None

if len(packet_text) < 6:
# not long enough
raise ParseError("frame", "too short")

if packet_text[0] != "<":
# not correct syntax
raise ParseError("frame", "should begin with '<'")

gt = packet_text.index(">", 1)
pri_text = packet_text[1:gt]

# skip the next space and the timestamp
sp = gt + 1 + 15
# now skip the hostname
sp = packet_text.index(" ", sp + 1)
header_text = packet_text[gt+1:sp]

msg_text = packet_text[sp+1:]

pri = PriPart.fromWire(pri_text)
header = HeaderPart.fromWire(header_text)
msg = MsgPart.fromWire(msg_text)
return cls(pri, header, msg)

def __str__(self):
message = "%s%s %s" % (self.pri, self.header, self.msg)
return message[:self.MAX_LEN]
Expand Down Expand Up @@ -360,3 +488,138 @@ def send_packet(self, packet):

"""
self._send_packet_to_hosts(packet)

class SyslogTCPHandler(SocketServer.BaseRequestHandler):

BUF_SIZE = 2048
MAX_CACHED = 4096
MAX_FRAME = 2048
TERM_CHAR = "\n"

def setup(self):
"""Setup variables used by this instance."""
self.logger = logging.getLogger(__name__)
self.cached = None
self.frame_size = None
self.logger.info("incoming TCP connection accepted")

def handle(self):
"""Handle the incoming bytes, try to resolve them to frames."""
data = self.request.recv(self.BUF_SIZE)
while len(data) > 0:
if self.cached is None:
self.cached = data
else:
if (len(self.cached) + len(data)) > self.MAX_CACHED:
# too many bytes
self.logger.warning("too much data")
self.request.close()
return
self.cached = self.cached + data

if len(self.cached) > 8:
if self.frame_size is None:
if self.cached[0] == "<":
# non-transparent framing
self.frame_size = -1
else:
# octet counting
sp = self.cached.find(" ")
if sp < 0:
# didn't find frame length terminated by a space
self.logger.warning("suspected octet-framing, but frame length not terminated by a space")
self.request.close()
return
try:
self.frame_size = int(self.cached[0:sp])
except ValueError:
# frame length is not a number
self.logger.warning("frame length is not a number")
self.request.close()
return
if self.frame_size < 1 or self.frame_size > self.MAX_FRAME:
# specified frame size too small/big
self.logger.warning("specified frame size is too big or too small")
self.request.close()
return
# now we parsed the size, trim the frame size string from the
# beginning of the frame
self.cached = self.cached[sp+1:]

try:
if self.frame_size > 0:
if len(self.cached) >= self.frame_size:
self.handle_frame_text(self.frame_size, self.frame_size)
else:
term_idx = self.cached.find(self.TERM_CHAR)
if term_idx >= 0:
# do not consider the TERM_CHAR as part of the frame
self.handle_frame_text(term_idx, term_idx + 1)
except Exception as e:
self.logger.warning("exception occurred parsing/handling a frame: " + str(e))
self.request.close()
return
# loop again
data = self.request.recv(self.BUF_SIZE)

# we get here if the received data size == 0 (connection closed)
self.request.close()

def handle_frame_text(self, frame_len, skip_len):
"""Handle the frame text, convert to L{Packet}."""
# extract the frame itself
frame_text = self.cached[0:frame_len]

# manage the buffer, there may be more data available
if len(self.cached) > skip_len:
self.cached = self.cached[skip_len:]
else:
self.cached = None
self.frame_size = None

# parse the frame
try:
frame = Packet.fromWire(frame_text)
except ParseError:
# these are errors we noticed
raise
except Exception:
# these are errors the parser didn't correctly detect, should
# analyze them and improve the parser
raise ParseError("frame", "unexpected parsing error")

try:
self.handle_message(frame)
except Exception:
# the application (subclass) raised some exception
raise

def handle_message(self, frame):
"""Handle parsed Syslog frames.

Applications should override this method.

This default implementation prints some data from each frame.

"""
pass

class ThreadedSyslogServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
pass

class Collector(object):
"""Accept log messages from Syslog clients.

Accept Syslog messages over the network and pass them to the application.

"""

def __init__(self, port=514, handler=SyslogTCPHandler):
address = ("0.0.0.0", port)
ThreadedSyslogServer.daemon_threads = True
ThreadedSyslogServer.allow_reuse_address = True
self.server = ThreadedSyslogServer(address, handler)

def run(self):
self.server.serve_forever()

33 changes: 33 additions & 0 deletions netsyslog_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2005 Graham Ashton <ashtong@users.sourceforge.net>
# Copyright (C) 2010 Daniel Pocock http://danielpocock.com
#
# This module is free software, and you may redistribute it and/or modify
# it under the same terms as Python itself, so long as this copyright message
Expand Down Expand Up @@ -177,6 +178,38 @@ def test_max_length(self):
packet = netsyslog.Packet(DEFAULT_PRI, DEFAULT_HEADER, message)
self.assertEqual(len(str(packet)), netsyslog.Packet.MAX_LEN)

def test_parse(self):
"""Check that we can parse a message from the wire"""
packet = netsyslog.Packet.fromWire("<142>Mar 16 08:58:41 alpha1 foobar[1234]: testing")
self.assertEqual(packet.pri.severity, syslog.LOG_INFO)
self.assertEqual(packet.pri.facility, syslog.LOG_LOCAL1)
self.assertEqual(packet.header.hostname, "alpha1")
self.assertEqual(packet.msg.tag, "foobar")
self.assertEqual(packet.msg.pid, "1234")
self.assertEqual(packet.msg.content, "testing")
parsed_ts = packet.header.parse_timestamp()
self.assertEqual(parsed_ts.tm_mon, 3)
self.assertEqual(parsed_ts.tm_mday, 16)
self.assertEqual(parsed_ts.tm_hour, 8)

# now try with an unusual date (single digit for day of month):
packet = netsyslog.Packet.fromWire("<142>Mar 6 08:58:41 alpha1 foobar[1234]: testing")
self.assertEqual(packet.header.hostname, "alpha1")

try:
# just too short
packet = netsyslog.Packet.fromWire("<2>")
self.assert_(true)
except netsyslog.ParseError:
pass

try:
# timestamp single digit for day of month:
packet = netsyslog.Packet.fromWire("<142>Mar 6 08:58:41 alpha1 foobar[1234]: testing")
self.assert_(true)
except netsyslog.ParseError:
pass


class LoggerTest(MockHeaderTest, MockMsgTest):

Expand Down
Loading