Skip to content
10 changes: 6 additions & 4 deletions pgpdump/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ def __init__(self, data):
self.data = data
self.length = len(data)

def packets(self):
'''A generator function returning PGP data packets.'''
def packets(self, skip=False):
'''A generator function returning PGP data packets.
if skip=True, failing packets will log an error instead of raising an exception.'''
offset = 0
while offset < self.length:
total_length, packet = construct_packet(self.data, offset)
total_length, packet = construct_packet(self.data, offset, skip)
offset += total_length
yield packet
if packet is not None:
yield packet

def __repr__(self):
return "<%s: length %d>" % (
Expand Down
57 changes: 42 additions & 15 deletions pgpdump/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import hashlib
from math import ceil, log
import re
import logging

from .utils import (PgpdumpException, get_int2, get_int4, get_mpi,
from .utils import (PgpdumpException, encode_packet,
get_int2, get_int4, get_mpi,
get_key_id, get_hex_data, get_int_bytes, pack_data)


Expand Down Expand Up @@ -46,6 +48,7 @@ class AlgoLookup(object):
19: "ECDSA",
20: "Formerly ElGamal Encrypt or Sign",
21: "Diffie-Hellman",
22: "EdDSA",
}

@classmethod
Expand Down Expand Up @@ -143,6 +146,7 @@ def __init__(self, raw, hashed, data):
30: "Features",
31: "Signature Target",
32: "Embedded Signature",
33: "Issuer Fingerprint",
}

@property
Expand Down Expand Up @@ -172,6 +176,8 @@ def __init__(self, *args, **kwargs):
self.raw_expiration_time = None
self.expiration_time = None
self.key_id = None
self.fingerprint = None
self.fingerprint_version = None
self.hash2 = None
self.subpackets = []
super(SignaturePacket, self).__init__(*args, **kwargs)
Expand Down Expand Up @@ -275,6 +281,9 @@ def parse_subpackets(self, outer_offset, outer_length, hashed=False):
self.raw_expiration_time = get_int4(subpacket.data, 0)
elif subpacket.subtype == 16:
self.key_id = get_key_id(subpacket.data, 0)
elif subpacket.subtype == 33:
self.fingerprint_version = int(subpacket.data[0])
self.fingerprint = get_hex_data(subpacket.data, 1, len(subpacket.data))
offset += sub_len
self.subpackets.append(subpacket)

Expand Down Expand Up @@ -420,12 +429,9 @@ def parse_key_material(self, offset):
self.prime, offset = get_mpi(self.data, offset)
self.group_gen, offset = get_mpi(self.data, offset)
self.key_value, offset = get_mpi(self.data, offset)
elif 100 <= self.raw_pub_algorithm <= 110:
# Private/Experimental algorithms, just move on
pass
else:
raise PgpdumpException("Unsupported public key algorithm %d" %
self.raw_pub_algorithm)
# If we don't know how to handle the algorithm, just move on
pass

return offset

Expand Down Expand Up @@ -578,7 +584,7 @@ def parse(self):
"Unsupported GnuPG S2K extension, encountered mode %d" % mode)
else:
raise PgpdumpException(
"Unsupported public key algorithm %d" % s2k_type_id)
"Unsupported S2K algorithm %d" % s2k_type_id)

if s2k_length != (offset - offset_before_s2k):
raise PgpdumpException(
Expand Down Expand Up @@ -609,12 +615,9 @@ def parse_private_key_material(self, offset):
self.pub_algorithm_type = "elg"
# x
self.exponent_x, offset = get_mpi(self.data, offset)
elif 100 <= self.raw_pub_algorithm <= 110:
# Private/Experimental algorithms, just move on
pass
else:
raise PgpdumpException("Unsupported public key algorithm %d" %
self.raw_pub_algorithm)
# If we don't know how to handle the algorithm, just move on
pass

return offset

Expand Down Expand Up @@ -668,6 +671,9 @@ def parse(self):
sub_offset, sub_len, sub_part = new_tag_length(self.data, offset)
# sub_len includes the subtype single byte, knock that off
sub_len -= 1
if offset + sub_offset >= len(self.data):
raise PgpdumpException("Attribute at position %d wants another %d octets, but only %d octets remain"%(
offset, sub_offset, len(self.data) - offset))
# initial length bytes
offset += sub_offset

Expand All @@ -677,10 +683,16 @@ def parse(self):
# there is only one currently known type- images (1)
if sub_type == 1:
# the only little-endian encoded value in OpenPGP
if len(self.data) <= (offset + 3):
raise PgpdumpException("Needs 4-octet attribute header at position %d of packet size %d"%(offset, len(self.data)))
hdr_size = self.data[offset] + (self.data[offset + 1] << 8)
hdr_version = self.data[offset + 2]
self.raw_image_format = self.data[offset + 3]
if len(self.data) <= (offset + hdr_size):
raise PgpdumpException("Claimed attribute header has %d octets at position %d of packet size %d"%(hdr_size, offset, len(self.data)))
offset += hdr_size
# FIXME: ensure that the reserved octets of the header are all-zeros
# (see https://tools.ietf.org/html/rfc4880#section-5.12.1)

self.image_data = self.data[offset:]
if self.raw_image_format == 1:
Expand Down Expand Up @@ -767,6 +779,8 @@ def new_tag_length(data, start):
look. Returns a derived (offset, length, partial) tuple.
Reference: http://tools.ietf.org/html/rfc4880#section-4.2.2
'''
if len(data) <= start:
raise PgpdumpException("new_tag_length at start %d of packet of length %d"%(start, len(data)))
first = data[start]
offset = length = 0
partial = False
Expand Down Expand Up @@ -817,10 +831,16 @@ def old_tag_length(data, start):
return (offset, length)


def construct_packet(data, header_start):
def construct_packet(data, header_start, skip=False):
'''Returns a (length, packet) tuple constructed from 'data' at index
'header_start'. If there is a next packet, it will be found at
header_start + length.'''
header_start + length.

If skip=True, then a packet with an error will emit a warning (via
the logging module) and return None as the packet; otherwise the
error will be raised directly.

'''

# tag encoded in bits 5-0 (new packet format)
# 0x3f == 111111b
Expand Down Expand Up @@ -867,5 +887,12 @@ def construct_packet(data, header_start):
data, header_start)
else:
break
packet = PacketType(tag, name, new, packet_data)
packet = None
try:
packet = PacketType(tag, name, new, packet_data)
except PgpdumpException as e:
if skip:
logging.warning(str(e) + '\n' + encode_packet(tag, new, packet_data, armored=True))
else:
raise
return (consumed, packet)
58 changes: 58 additions & 0 deletions pgpdump/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import binascii
import sys
import codecs
from base64 import b64encode

PY26 = sys.version_info[0] == 2 and sys.version_info[1] <= 6

Expand Down Expand Up @@ -71,27 +73,83 @@ def crc24(data):

def get_int2(data, offset):
'''Pull two bytes from data at offset and return as an integer.'''
if offset+2 > len(data):
raise PgpdumpException("Wants a 2-byte integer at position %d of packet of size %d"%(
offset, len(data)))
return (data[offset] << 8) + data[offset + 1]


def get_int4(data, offset):
'''Pull four bytes from data at offset and return as an integer.'''
if offset+4 > len(data):
raise PgpdumpException("Wants a 4-byte integer at position %d of packet of size %d"%(
offset, len(data)))
return ((data[offset] << 24) + (data[offset + 1] << 16) +
(data[offset + 2] << 8) + data[offset + 3])


def get_int8(data, offset):
'''Pull eight bytes from data at offset and return as an integer.'''
if offset+8 > len(data):
raise PgpdumpException("Wants an 8-byte integer at position %d of packet of size %d"%(
offset, len(data)))
return (get_int4(data, offset) << 32) + get_int4(data, offset + 4)


def encode_packet(tag, new, data, armored=False):
if tag > 0x0f and not new:
raise PgpdumpException("cannot make new packet with tag %d"%(tag))
hdr = bytearray()
if new:
hdr += bytearray([0xc0|tag])
if len(data) < 192:
hdr += bytearray([len(data)])
elif len(data) < 8384:
i = len(data) - 192
hdr += bytearray([i//256, i%256])
else:
i = len(data)
hdr += bytearray([256, i >> 24, (i>>16)&0xff, (i>>8)&0xff, i&0xff])
else: # old-style packet format
if len(data) < (1<<8):
hdr += bytearray([0x80|(tag <<2), len(data)])
elif len(data) < (1<<16):
hdr += bytearray([0x80|(tag <<2)|1,
(len(data)>>8)&0xff,
len(data)&0xff])
elif len(data) < (1<<32):
hdr += bytearray([0x80|(tag <<2)|2,
(len(data)>>24)&0xff,
(len(data)>>16)&0xff,
(len(data)>>8)&0xff,
len(data)&0xff])
else:
raise NotImplementedError('packet of length %d, but we do not generate indeterminate-sized packets'%(len(data),))
frame = hdr + data
if not armored:
return frame
else:
strdata = codecs.decode(b64encode(frame), 'ascii')
return '''-----BEGIN PGP {block}-----

{body}
={crc}
-----END PGP {block}-----
'''.format(block={2: 'SIGNATURE', 6: 'KEY BLOCK'}.get(tag, 'MESSAGE'),
body='\n'.join([strdata[i:i+64] for i in range(0, len(strdata), 64)]),
crc=codecs.decode(b64encode(crc24(frame).to_bytes(3, 'big')), 'ascii'))



def get_mpi(data, offset):
'''Gets a multi-precision integer as per RFC-4880.
Returns the MPI and the new offset.
See: http://tools.ietf.org/html/rfc4880#section-3.2'''
mpi_len = get_int2(data, offset)
offset += 2
to_process = (mpi_len + 7) // 8
if to_process > (len(data) - offset):
raise PgpdumpException("MPI wants %s octets, but buffer has only %s left"%(to_process, len(data) - offset))
mpi = 0
i = -4
for i in range(0, to_process - 3, 4):
Expand Down