From f241ff13100c12132a067b4b8576205f8a8f058d Mon Sep 17 00:00:00 2001 From: Reid Beels Date: Sat, 10 Mar 2018 17:26:28 -0800 Subject: [PATCH 1/3] Fix typo in field_names list comprehension Fixes #81 --- csvdedupe/csvdedupe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/csvdedupe/csvdedupe.py b/csvdedupe/csvdedupe.py index a40c021..09655c0 100644 --- a/csvdedupe/csvdedupe.py +++ b/csvdedupe/csvdedupe.py @@ -48,7 +48,7 @@ def __init__(self): except KeyError: raise self.parser.error("You must provide field_names") else : - self.field_names = [self.field_def['field'] + self.field_names = [field_def['field'] for field_def in self.field_definition] self.destructive = self.configuration.get('destructive', False) From 54bd39540f3c8ac99f2273725dae20421188ccbe Mon Sep 17 00:00:00 2001 From: Reid Beels Date: Sat, 10 Mar 2018 20:37:26 -0800 Subject: [PATCH 2/3] Remove trailing whitespace --- csvdedupe/csvdedupe.py | 6 +++--- csvdedupe/csvhelpers.py | 18 +++++++++--------- csvdedupe/csvlink.py | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/csvdedupe/csvdedupe.py b/csvdedupe/csvdedupe.py index 09655c0..76ab7da 100644 --- a/csvdedupe/csvdedupe.py +++ b/csvdedupe/csvdedupe.py @@ -25,7 +25,7 @@ def __init__(self): # We need to get control of STDIN again. # This is a UNIX/Mac OSX solution only # http://stackoverflow.com/questions/7141331/pipe-input-to-python-program-and-later-get-input-from-user - # + # # Same question has a Windows solution sys.stdin = open('/dev/tty') # Unix only solution, else: @@ -94,7 +94,7 @@ def main(self): fields = {variable.field for variable in deduper.data_model.primary_fields} unique_d, parents = exact_matches(data_d, fields) - + else: # # Create a new deduper object and pass our data model to it. deduper = dedupe.Dedupe(self.field_definition) @@ -115,7 +115,7 @@ def main(self): # ## Clustering - # Find the threshold that will maximize a weighted average of our precision and recall. + # Find the threshold that will maximize a weighted average of our precision and recall. # When we set the recall weight to 2, we are saying we care twice as much # about recall as we do precision. # diff --git a/csvdedupe/csvhelpers.py b/csvdedupe/csvhelpers.py index 72d64a2..b6be5c7 100644 --- a/csvdedupe/csvhelpers.py +++ b/csvdedupe/csvhelpers.py @@ -23,7 +23,7 @@ def preProcess(column): """ - Do a little bit of data cleaning. Things like casing, extra spaces, + Do a little bit of data cleaning. Things like casing, extra spaces, quotes and new lines are ignored. """ column = re.sub(' +', ' ', column) @@ -36,8 +36,8 @@ def preProcess(column): def readData(input_file, field_names, delimiter=',', prefix=None): """ - Read in our data from a CSV file and create a dictionary of records, - where the key is a unique record ID and each value is a dict + Read in our data from a CSV file and create a dictionary of records, + where the key is a unique record ID and each value is a dict of the row fields. 
**Currently, dedupe depends upon records' unique ids being integers @@ -46,7 +46,7 @@ def readData(input_file, field_names, delimiter=',', prefix=None): """ data = {} - + reader = csv.DictReader(StringIO(input_file),delimiter=delimiter) for i, row in enumerate(reader): clean_row = {k: preProcess(v) for (k, v) in row.items() if k is not None} @@ -62,7 +62,7 @@ def readData(input_file, field_names, delimiter=',', prefix=None): # ## Writing results def writeResults(clustered_dupes, input_file, output_file): - # Write our original data back out to a CSV with a new column called + # Write our original data back out to a CSV with a new column called # 'Cluster ID' which indicates which records refer to each other. logging.info('saving results to: %s' % output_file) @@ -95,7 +95,7 @@ def writeResults(clustered_dupes, input_file, output_file): # ## Writing results def writeUniqueResults(clustered_dupes, input_file, output_file): - # Write our original data back out to a CSV with a new column called + # Write our original data back out to a CSV with a new column called # 'Cluster ID' which indicates which records refer to each other. logging.info('saving unique results to: %s' % output_file) @@ -232,13 +232,13 @@ def _common_args(self) : help='CSV file to store deduplication results') self.parser.add_argument('--skip_training', action='store_true', help='Skip labeling examples by user and read training from training_files only') - self.parser.add_argument('--training_file', type=str, + self.parser.add_argument('--training_file', type=str, help='Path to a new or existing file consisting of labeled training examples') self.parser.add_argument('--settings_file', type=str, help='Path to a new or existing file consisting of learned training settings') - self.parser.add_argument('--sample_size', type=int, + self.parser.add_argument('--sample_size', type=int, help='Number of random sample pairs to train off of') - self.parser.add_argument('--recall_weight', type=int, + self.parser.add_argument('--recall_weight', type=int, help='Threshold that will maximize a weighted average of our precision and recall') self.parser.add_argument('-d', '--delimiter', type=str, help='Delimiting character of the input CSV file', default=',') diff --git a/csvdedupe/csvlink.py b/csvdedupe/csvlink.py index 0c77d34..1c6ec3a 100644 --- a/csvdedupe/csvlink.py +++ b/csvdedupe/csvlink.py @@ -112,14 +112,14 @@ def main(self): % self.settings_file) with open(self.settings_file, 'rb') as f: deduper = dedupe.StaticRecordLink(f) - + fields = {variable.field for variable in deduper.data_model.primary_fields} (nonexact_1, nonexact_2, exact_pairs) = exact_matches(data_1, data_2, fields) - - + + else: # # Create a new deduper object and pass our data model to it. deduper = dedupe.RecordLink(self.field_definition) @@ -142,7 +142,7 @@ def main(self): # ## Clustering - # Find the threshold that will maximize a weighted average of our precision and recall. + # Find the threshold that will maximize a weighted average of our precision and recall. # When we set the recall weight to 2, we are saying we care twice as much # about recall as we do precision. 
         #
@@ -189,7 +189,7 @@ def exact_matches(data_1, data_2, match_fields):
     for key, record in data_1.items():
         record_hash = hash(tuple(record[f] for f in match_fields))
-        redundant[record_hash] = key 
+        redundant[record_hash] = key

     for key_2, record in data_2.items():
         record_hash = hash(tuple(record[f] for f in match_fields))
@@ -202,7 +202,7 @@ def exact_matches(data_1, data_2, match_fields):

     for key_1 in redundant.values():
         nonexact_1[key_1] = data_1[key_1]
-        
+
     return nonexact_1, nonexact_2, exact_pairs

 def launch_new_instance():

From efc26c1b74175e170a01893989ed22b98e11e2d9 Mon Sep 17 00:00:00 2001
From: Reid Beels
Date: Sat, 10 Mar 2018 23:01:42 -0800
Subject: [PATCH 3/3] Add support for LatLong matching
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…with extra LongLat, Latitude, and Longitude convenience types
---
 README.md               |  34 +++++++++++++
 csvdedupe/csvdedupe.py  |  15 ++++--
 csvdedupe/csvhelpers.py | 107 +++++++++++++++++++++++++++++++++++++---
 csvdedupe/csvlink.py    |  23 ++++++---
 4 files changed, 159 insertions(+), 20 deletions(-)

diff --git a/README.md b/README.md
index c3bcf11..203ff0a 100644
--- a/README.md
+++ b/README.md
@@ -188,6 +188,40 @@ Or
                         Delimiting character of the input CSV file (default: ,)
 * `-h`, `--help`        show help message and exit
 
+## Field Definitions & Types
+
+When providing a JSON config file, you can include definitions for each field that tell csvdedupe what _type_ of data the field contains.
+
+A basic definition for a single field looks like this:
+
+    {"field": "name", "type": "String"}
+
+The dedupe docs include a [handy reference of all types and what they mean](https://dedupe.io/developers/library/en/latest/Variable-definition.html).
+
+### Latitude & Longitude
+
+When working with geographic points, it's necessary to provide field definitions that match the format of your data. Otherwise, csvdedupe may treat your lat/long fields as text and match them in unexpected ways.
+
+* For **one field containing both latitude and longitude**, specify `LatLong` or `LongLat` depending on the order. This expects a field with the coordinates separated by some (any) non-numeric characters, so formats like `-122.23,46.42`, `-122.23, 46.42`, and `-122.23 46.42` will all work.
+
+        {
+          "field_names": ["coordinates"],
+          "field_definition" : [{"field" : "coordinates", "type" : "LatLong"}],
+          ...more config
+        }
+
+* For **latitude and longitude in separate fields**, include separate `Latitude` and `Longitude` definitions. Internally, these are squished together into a single `LatLong` field, so you'll see them as `__LatLong` when training.
+
+        {
+          "field_names": ["lat", "lng"],
+          "field_definition" : [
+            {"field" : "lat", "type" : "Latitude"},
+            {"field" : "lng", "type" : "Longitude"}
+          ],
+          ...more config
+        }
+
 ## Training
 
 The _secret sauce_ of csvdedupe is human input. In order to figure out the best rules to deduplicate a set of data, you must give it a set of labeled examples to learn from.
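A note on the coordinate formats in the README section above: all of them rely on one parsing rule, added below in `parseLatLongFields` — the field value is split on any run of characters that can't appear in a number, and each piece is converted to a float. A minimal standalone sketch of that behavior; `parse_lat_long` is a hypothetical helper for illustration only, not part of this patch:

    import re

    def parse_lat_long(value, flip=False):
        # Split on any run of characters that can't appear in a number
        # (anything other than 0-9, '-', and '.'), then convert to floats.
        try:
            coords = tuple(float(piece)
                           for piece in re.split(r'[^-.0-9]+', value))
        except ValueError:
            return None  # the patch likewise maps unparseable values to None
        # A LongLat field is reversed into (latitude, longitude) order.
        return coords[::-1] if flip else coords

    # Every separator format the README lists parses to the same tuple:
    for sample in ('-122.23,46.42', '-122.23, 46.42', '-122.23 46.42'):
        assert parse_lat_long(sample) == (-122.23, 46.42)

    # With flip=True (the LongLat case), the pair comes back as (lat, long):
    assert parse_lat_long('-122.23 46.42', flip=True) == (46.42, -122.23)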
diff --git a/csvdedupe/csvdedupe.py b/csvdedupe/csvdedupe.py
index 76ab7da..fdc5892 100644
--- a/csvdedupe/csvdedupe.py
+++ b/csvdedupe/csvdedupe.py
@@ -66,12 +66,18 @@ def main(self):

         data_d = {}
         # import the specified CSV file
-        data_d = csvhelpers.readData(self.input, self.field_names, delimiter=self.delimiter)
+        data_d = csvhelpers.readData(input_file=self.input,
+                                     field_names=self.field_names,
+                                     field_definition=self.field_definition,
+                                     delimiter=self.delimiter)
+
+        internal_field_definition = csvhelpers.transformLatLongFieldDefinition(
+            self.field_definition)

         logging.info('imported %d rows', len(data_d))

         # sanity check for provided field names in CSV file
-        for field in self.field_definition:
+        for field in internal_field_definition:
             if field['type'] != 'Interaction':
                 if not field['field'] in data_d[0]:
@@ -79,7 +85,7 @@ def main(self):
                                       field['field'] + "' in input")

         logging.info('using fields: %s' % [field['field']
-                                           for field in self.field_definition])
+                                           for field in internal_field_definition])

         # If --skip_training has been selected, and we have a settings cache still
         # persisting from the last run, use it in this next run.
@@ -97,7 +103,7 @@ def main(self):

         else:
             # # Create a new deduper object and pass our data model to it.
-            deduper = dedupe.Dedupe(self.field_definition)
+            deduper = dedupe.Dedupe(internal_field_definition)

             fields = {variable.field for variable in
                       deduper.data_model.primary_fields}
             unique_d, parents = exact_matches(data_d, fields)
@@ -161,6 +167,7 @@ def main(self):
         else :
             write_function(clustered_dupes, self.input, sys.stdout)

+
 def exact_matches(data_d, match_fields):
     unique = {}
     redundant = {}
diff --git a/csvdedupe/csvhelpers.py b/csvdedupe/csvhelpers.py
index b6be5c7..e875030 100644
--- a/csvdedupe/csvhelpers.py
+++ b/csvdedupe/csvhelpers.py
@@ -5,6 +5,7 @@
 import re
 import collections
 import logging
+from copy import deepcopy
 from io import StringIO, open
 import sys
 import platform
@@ -34,7 +35,7 @@ def preProcess(column):

     return column


-def readData(input_file, field_names, delimiter=',', prefix=None):
+def readData(input_file, field_names, field_definition=None, delimiter=',', prefix=None):
     """
     Read in our data from a CSV file and create a dictionary of records,
     where the key is a unique record ID and each value is a dict
@@ -47,18 +48,108 @@ def readData(input_file, field_names, delimiter=',', prefix=None):

     data = {}

-    reader = csv.DictReader(StringIO(input_file),delimiter=delimiter)
+    reader = csv.DictReader(StringIO(input_file), delimiter=delimiter)
     for i, row in enumerate(reader):
-        clean_row = {k: preProcess(v) for (k, v) in row.items() if k is not None}
-        if prefix:
-            row_id = u"%s|%s" % (prefix, i)
-        else:
-            row_id = i
-        data[row_id] = clean_row
+        row_id = u"%s|%s" % (prefix, i) if prefix else i
+        data[row_id] = parseLatLongFields(
+            {k: preProcess(v) for (k, v) in row.items() if k is not None},
+            field_definition)

     return data


+def parseLatLongFields(row, field_definition):
+    """
+    Parse any fields defined as LatLong, LongLat, Latitude, or Longitude
+    in the field definition into the format that dedupe expects.
+
+    The two coordinates in a LatLong or LongLat field can be separated by
+    any non-numeric character(s), for example: `-122,46`, `-122, 46`, `-122 46`, `-122;46`.
+
+    Latitude and Longitude fields are removed from the row and converted to
+    a single LatLong column that we pass to dedupe.
+ """ + + if not field_definition: + return row + + has_latitude_field = False + has_longitude_field = False + + latitude = None + longitude = None + + for field in field_definition: + # Both lat and long in the same field, in either order + if field['type'] == 'LatLong' or field['type'] == 'LongLat': + try: + # Split the field by anything other than the characters + # we'd expect to find in a number (0-9, -, .) + row[field['field']] = tuple([ + float(l) for l in + re.split(r'[^-.0-9]+', row[field['field']])]) + + # Flip the order if LongLat was specified + if field['type'] == 'LongLat': + row[field['field']] = row[field['field']][::-1] + + except ValueError: # Thrown if float() fails + row[field['field']] = None + + elif field['type'] == 'Latitude': + has_latitude_field = True + if field['field'] in row: + try: + latitude = float(row.pop(field['field'])) + except ValueError: + latitude = None + + elif field['type'] == 'Longitude' and field['field'] in row: + has_longitude_field = True + if field['field'] in row: + try: + longitude = float(row.pop(field['field'])) + except ValueError: + longitude = None + + if has_latitude_field and has_longitude_field: + if latitude and longitude: + row['__LatLong'] = (latitude, longitude) + else: + row['__LatLong'] = None + + return row + + +def transformLatLongFieldDefinition(field_definition): + """ + Converts the custom LongLat, Latitude, and Longitude field types + used in the csvdedupe config to the single LatLong field type that + dedupe itself expects. + + Works in concert with csvhelpers.parseLatLongFields() + """ + + converted_defs = deepcopy(field_definition) + lat_long_indicies = [] + + for i, field in enumerate(converted_defs): + if field['type'] == 'LongLat': + field['type'] = 'LatLong' + elif field['type'] == 'Latitude' or field['type'] == 'Longitude': + lat_long_indicies.append(i) + + if len(lat_long_indicies) == 2: + for i in sorted(lat_long_indicies, reverse=True): + del converted_defs[i] + + converted_defs.append({ + 'field': '__LatLong', + 'type': 'LatLong'}) + + return converted_defs + + # ## Writing results def writeResults(clustered_dupes, input_file, output_file): diff --git a/csvdedupe/csvlink.py b/csvdedupe/csvlink.py index 1c6ec3a..308c9f6 100644 --- a/csvdedupe/csvlink.py +++ b/csvdedupe/csvlink.py @@ -70,12 +70,19 @@ def main(self): data_2 = {} # import the specified CSV file - data_1 = csvhelpers.readData(self.input_1, self.field_names_1, - delimiter=self.delimiter, - prefix='input_1') - data_2 = csvhelpers.readData(self.input_2, self.field_names_2, - delimiter=self.delimiter, - prefix='input_2') + data_1 = csvhelpers.readData(input_file=self.input_1, + field_names=self.field_names_1, + field_definition=self.field_definition, + delimiter=self.delimiter, + prefix='input_1') + data_2 = csvhelpers.readData(input_file=self.input_2, + field_names=self.field_names_2, + field_definition=self.field_definition, + delimiter=self.delimiter, + prefix='input_2') + + internal_field_definition = csvhelpers.transformLatLongFieldDefinition( + self.field_definition) # sanity check for provided field names in CSV file for field in self.field_names_1: @@ -100,7 +107,7 @@ def main(self): logging.info('imported %d rows from file 2', len(data_2)) logging.info('using fields: %s' % [field['field'] - for field in self.field_definition]) + for field in internal_field_definition]) # If --skip_training has been selected, and we have a settings cache still # persisting from the last run, use it in this next run. 
@@ -122,7 +129,7 @@ def main(self): else: # # Create a new deduper object and pass our data model to it. - deduper = dedupe.RecordLink(self.field_definition) + deduper = dedupe.RecordLink(internal_field_definition) fields = {variable.field for variable in deduper.data_model.primary_fields} (nonexact_1,
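Taken together, the two new helpers mean a config with separate `Latitude` and `Longitude` fields is rewritten before dedupe ever sees it: each row loses its individual coordinate columns and gains a single `__LatLong` tuple, and the field definition handed to dedupe points at that synthetic column. A quick sketch of that round trip using the functions this patch adds — it assumes the patch is applied and `csvdedupe` is importable; the sample values are illustrative:

    from csvdedupe import csvhelpers

    field_definition = [
        {'field': 'lat', 'type': 'Latitude'},
        {'field': 'lng', 'type': 'Longitude'},
    ]

    # parseLatLongFields() pops the separate columns and folds them into
    # one (latitude, longitude) tuple under the synthetic '__LatLong' key.
    row = {'lat': '46.42', 'lng': '-122.23'}
    parsed = csvhelpers.parseLatLongFields(dict(row), field_definition)
    assert parsed == {'__LatLong': (46.42, -122.23)}

    # transformLatLongFieldDefinition() makes the definition agree with the
    # reshaped rows: the Latitude/Longitude pair collapses into one LatLong.
    internal = csvhelpers.transformLatLongFieldDefinition(field_definition)
    assert internal == [{'field': '__LatLong', 'type': 'LatLong'}]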