SentenceTransmorgrifier / transmorgrify.py
Joshua Lansford
Fix Target contains only one unique value error (Issue #1)
2f7253d
raw
history blame
27.7 kB
#!/usr/bin/env python3
import argparse
import json
import os
import zipfile
import pandas as pd
from catboost import CatBoostClassifier, Pool
MATCH = 0
DELETE_FROM = 1
INSERT_TO = 2
START = 3
FILE_VERSION = 1
class Transmorgrifier:
def train( self, from_sentences, to_sentences, iterations = 4000, device = 'cpu', trailing_context = 7, leading_context = 7, verbose=True ):
"""
Train the Transmorgrifier model. This does not save it to disk but just trains in memory.
Keyword arguments:
from_sentences -- An array of strings for the input sentences.
to_sentences -- An array of strings of the same length as from_sentences which the model is to train to convert to.
iterations -- An integer specifying the number of iterations to convert from or to. (default 4000)
device -- The gpu reference which catboost wants or "cpu". (default cpu)
trailing_context -- The number of characters after the action point to include for context. (default 7)
leading_context -- The number of characters before the action point to include for context. (default 7)
verbose -- Increased the amount of text output during training. (default True)
"""
X,Y = _parse_for_training( from_sentences, to_sentences, num_pre_context_chars=leading_context, num_post_context_chars=trailing_context )
#train and save the action_model
self.action_model = _train_catboost( X, Y['action'], iterations, verbose=verbose, device=device, model_piece='action' )
#and the char model
#slice through where only the action is insert.
insert_indexes = Y['action'] == INSERT_TO
#if there is only one char to insert, we can't train the second model and need to handle that as a boundary case.
if Y['char'][insert_indexes].nunique() > 1:
self.char_model = _train_catboost( X[insert_indexes], Y['char'][insert_indexes], iterations, verbose=verbose, device=device, model_piece='char' )
self.constant_output = None
else:
self.char_model = None
if Y['char'][insert_indexes].nunique() == 1:
self.constant_output = Y['char'][insert_indexes].unique()[0]
else:
#If there is never an insertion handle it as always inserting a space,
#because it will never insert, but it handles the boundary case so the saving and loading code works.
self.constant_output = ' '
self.trailing_context = trailing_context
self.leading_context = leading_context
self.iterations = iterations
return self
def save( self, model='my_model.tm' ):
"""
Saves the model previously trained with train to a specified model file.
Keyword arguments:
model -- The pathname to save the model such as "my_model.tm" (default my_model.tm)
"""
self.name = model
with zipfile.ZipFile( model, mode="w", compression=zipfile.ZIP_DEFLATED, compresslevel=9 ) as my_zip:
with my_zip.open( 'params.json', mode='w' ) as out:
params = {
'version': FILE_VERSION,
'leading_context': self.leading_context,
'trailing_context': self.trailing_context,
'iterations': self.iterations,
}
if self.constant_output is not None:
params['constant_output'] = self.constant_output
out.write( json.dumps(params).encode())
temp_filename = _mktemp()
self.action_model.save_model( temp_filename )
my_zip.write( temp_filename, "action.cb" )
if not self.char_model is None:
self.char_model.save_model( temp_filename )
my_zip.write( temp_filename, "char.cb" )
os.unlink( temp_filename )
return self
def load( self, model='my_model.tm' ):
"""
Loads the model previously saved from the file system.
Keyword arguments:
model -- The filename of the model to load. (default my_model.tm)
"""
self.name = model
with zipfile.ZipFile( model, mode='r' ) as zip:
with zip.open( 'params.json' ) as f_in:
params = json.loads( f_in.read().decode() )
if params['version'] > FILE_VERSION: raise Exception( f"Version {params['version']} greater than {FILE_VERSION}" )
self.leading_context = int(params['leading_context'])
self.trailing_context = int(params['trailing_context'])
self.iterations = int(params['iterations'])
temp_filename = _mktemp()
with zip.open( 'action.cb' ) as f_in:
with open( temp_filename, "wb" ) as f_out:
f_out.write( f_in.read() )
self.action_model = CatBoostClassifier().load_model( temp_filename )
if 'constant_output' not in params:
with zip.open( 'char.cb' ) as f_in:
with open( temp_filename, "wb" ) as f_out:
f_out.write( f_in.read() )
self.char_model = CatBoostClassifier().load_model( temp_filename )
self.constant_output = None
else:
self.constant_output = params['constant_output']
self.char_model = None
os.unlink( temp_filename)
return self
def execute( self, from_sentences, verbose=False ):
"""
Runs the data from from_sentences. The results are returned
using yield so you need to wrap this in list() if you want
to index it. from_sentences can be an array or a generator.
Keyword arguments:
from_sentences -- Something iterable which returns strings.
"""
for i,from_sentence in enumerate(from_sentences):
yield _do_reconstruct(
action_model=self.action_model,
char_model=self.char_model,
constant_output=self.constant_output,
text=from_sentence,
num_pre_context_chars=self.leading_context,
num_post_context_chars=self.trailing_context )
if verbose and i % 10 == 0:
print( f"{i} of {len(from_sentences)}" )
def demo( self, share=False ):
import gradio as gr
def gradio_function( text ):
return list(self.execute( [text] ))[0]
with gr.Blocks() as demo:
name = gr.Markdown( self.name )
inp = gr.Textbox( label="Input" )
out = gr.Textbox( label="Output" )
inp.change( gradio_function, inputs=[inp], outputs=[out] )
demo.launch( share=share )
def _list_trace( trace ):
if trace.parent is None:
result = [trace]
else:
result = _list_trace( trace.parent )
result.append( trace )
return result
class _edit_trace_hop():
parent = None
edit_distance = None
char = None
from_row_i = None
to_column_i = None
action = None
def __str__( self ):
if self.action == START:
return "<start>"
elif self.action == INSERT_TO:
return f"<ins> {self.char}"
elif self.action == DELETE_FROM:
return f"<del> {self.char}"
elif self.action == MATCH:
return f"<match> {self.char}"
return "eh?"
def __repr__( self ):
return self.__str__()
def _trace_edits( from_sentence, to_sentence, print_debug=False ):
#iterating from will be the rows down the left side.
#iterating to will be the columns across the top.
#we will keep one row as we work on the next.
last_row = None
current_row = []
#the index handles one before the index in the string
#to handle the root cases across the top and down the left of the
#match matrix.
for from_row_i in range( len(from_sentence)+1 ):
for to_column_i in range( len(to_sentence )+1 ):
best_option = None
#root case.
if from_row_i == 0 and to_column_i == 0:
best_option = _edit_trace_hop()
best_option.parent = None
best_option.edit_distance = 0
best_option.char = ""
best_option.from_row_i = from_row_i
best_option.to_column_i = to_column_i
best_option.action = START
#check left
if to_column_i > 0:
if best_option is None or current_row[to_column_i-1].edit_distance + 1 < best_option.edit_distance:
best_option = _edit_trace_hop()
best_option.parent = current_row[to_column_i-1]
best_option.edit_distance = best_option.parent.edit_distance + 1
best_option.char = to_sentence[to_column_i-1]
best_option.from_row_i = from_row_i
best_option.to_column_i = to_column_i
best_option.action = INSERT_TO
#check up
if from_row_i > 0:
if best_option is None or last_row[to_column_i].edit_distance + 1 < best_option.edit_distance:
best_option = _edit_trace_hop()
best_option.parent = last_row[to_column_i]
best_option.edit_distance = best_option.parent.edit_distance + 1
best_option.char = from_sentence[from_row_i-1]
best_option.from_row_i = from_row_i
best_option.to_column_i = to_column_i
best_option.action = DELETE_FROM
#check match
if to_column_i > 0:
if to_sentence[to_column_i-1] == from_sentence[from_row_i-1]:
if best_option is None or last_row[to_column_i-1].edit_distance <= best_option.edit_distance: #prefer match so use <= than <
best_option = _edit_trace_hop()
best_option.parent = last_row[to_column_i-1]
best_option.edit_distance = best_option.parent.edit_distance + 1
best_option.char = from_sentence[from_row_i-1]
best_option.from_row_i = from_row_i
best_option.to_column_i = to_column_i
best_option.action = MATCH
if best_option is None: raise Exception( "Shouldn't end up with best_option being None" )
current_row.append(best_option)
last_row = current_row
current_row = []
if print_debug:
def print_diffs( current_node ):
if current_node.parent is not None:
print_diffs( current_node.parent )
if current_node.action == START:
print( "start" )
elif current_node.action == MATCH:
print( f"match {current_node.char}" )
elif current_node.action == INSERT_TO:
print( f"insert {current_node.char}" )
elif current_node.action == DELETE_FROM:
print( f"del {current_node.char}" )
print_diffs( last_row[-1] )
return last_row[-1]
def _parse_single_for_training( from_sentence, to_sentence, num_pre_context_chars, num_post_context_chars ):
trace = _trace_edits( from_sentence, to_sentence )
#we will collect a snapshot at each step.
trace_list = _list_trace(trace)
training_collection = []
#execute these things on the from_sentence and see if we get the to_sentence.
working_from = from_sentence
working_to = ""
used_from = ""
continuous_added = 0
continuous_dropped = 0
for thing in trace_list:
#gather action and context for training
if thing.action != START:
from_context = (working_from + (" " * num_post_context_chars))[:num_post_context_chars]
to_context = ((" " * num_pre_context_chars) + working_to )[-num_pre_context_chars:]
used_context = ((" " * num_pre_context_chars) + used_from )[-num_pre_context_chars:]
training_collection.append({
"from_context": from_context,
"to_context": to_context,
"used_context": used_context,
"action": thing.action,
"continuous_added": continuous_added,
"continuous_dropped": continuous_dropped,
"char": thing.char if thing.action == INSERT_TO else ' ',
})
#now execute the action for the next step.
if thing.action == START:
pass
elif thing.action == INSERT_TO:
working_to += thing.char
continuous_added += 1
continuous_dropped = 0
elif thing.action == DELETE_FROM:
used_from += working_from[0]
working_from = working_from[1:]
continuous_added = 0
continuous_dropped += 1
elif thing.action == MATCH:
used_from += working_from[0]
working_to += working_from[0]
working_from = working_from[1:]
continuous_added = 0
continuous_dropped = 0
if to_sentence != working_to:
print( "Replay failure" )
#so now I have training_collection which is a list of dictionaries where each dictionary is an action with a context.
#I need to change it into a dictionary of lists where each dictionary a column and the lists are the rows.
context_split_into_dict = {}
#first collect the from_context:
for i in range( num_post_context_chars ):
this_slice = []
for training in training_collection:
this_slice.append( training['from_context'][i] )
context_split_into_dict[ f"f{i}" ] = this_slice
#now collect to_context:
for i in range( num_pre_context_chars ):
this_slice = []
for training in training_collection:
this_slice.append( training['to_context'][i] )
context_split_into_dict[ f"t{i}" ] = this_slice
#now collect used_context
for i in range( num_pre_context_chars ):
this_slice = []
for training in training_collection:
this_slice.append( training['used_context'][i] )
context_split_into_dict[ f"u{i}" ] = this_slice
#now these two things.
context_split_into_dict["continuous_added"] = []
context_split_into_dict["continuous_dropped"] = []
for training in training_collection:
context_split_into_dict["continuous_added"].append( training["continuous_added"] )
context_split_into_dict["continuous_dropped"].append( training["continuous_dropped"] )
#now also collect the output answers.
result_split_into_dict = {}
action_slice = []
char_slice = []
for training in training_collection:
action_slice.append( training['action'] )
char_slice.append( training['char'] )
result_split_into_dict['action'] = action_slice
result_split_into_dict['char'] = char_slice
#now return it as a data_frame.
return pd.DataFrame( context_split_into_dict ), pd.DataFrame( result_split_into_dict )
def _parse_for_training( from_sentences, to_sentences, num_pre_context_chars, num_post_context_chars ):
out_observations_list = []
out_results_list = []
for index, (from_sentence, to_sentence) in enumerate(zip( from_sentences, to_sentences )):
if type(from_sentence) != float and type(to_sentence) != float: #bad lines are nan which are floats.
specific_observation, specific_result = _parse_single_for_training( from_sentence, to_sentence, num_pre_context_chars=num_pre_context_chars, num_post_context_chars=num_post_context_chars )
out_observations_list.append( specific_observation )
out_results_list.append( specific_result )
if index % 100 == 0:
print( f"parsing {index} of {len(from_sentences)}")
return pd.concat( out_observations_list ), pd.concat( out_results_list )
def _train_catboost( X, y, iterations, device, verbose, model_piece, learning_rate = .07 ):
X = X.fillna( ' ' )
passed = False
while not passed:
train_pool = Pool(
data=X,
label=y,
cat_features=[i for i,x in enumerate(X.keys()) if x[0] in ['f','t','u']] #watchout if another field is added that it doesn't start with one of these.
)
validation_pool = None #Can't use validation pool because it randomly has chars not in training.
model = CatBoostClassifier(
iterations = iterations,
learning_rate = learning_rate,
task_type="GPU" if device.lower() != 'cpu' else "CPU",
devices=device if device.lower() != 'cpu' else None
)
model.fit( train_pool, eval_set=validation_pool, verbose=True )
passed = True
if( verbose ): print( '{} is fitted: {}'.format(model_piece,model.is_fitted()))
if( verbose ): print( '{} params:\n{}'.format(model_piece,model.get_params()))
return model
def _mktemp():
#I know mktemp exists in the library but it has been deprecated suggesting using
#mkstemp but catboost can't write to a file handle yet, so I need an actual
#filename.
number = 0
while os.path.exists( f".temp_{number}~" ):
number += 1
return f".temp_{number}~"
def predict_wrapper( model, model_input ):
#Big hack. Catboost has shown itself to be unstable on producing
#either a single value or an array with a single value in it.
#I traced it back to the saved model, and then the model to what
#data it is trained on. But I could figure out what it was
#in the data which would make the saved model be one way or the other
#so I am going to use the results this way so that it works either way.
result = model.predict( model_input )[0]
try:
result = result[0]
except:
pass
return result
def _do_reconstruct( action_model, char_model, constant_output, text, num_pre_context_chars, num_post_context_chars ):
#test for nan.
if text != text: text = ''
working_from = text
working_to = ""
used_from = ""
continuous_added = 0
continuous_dropped = 0
while working_from and len(working_to) < 3*len(text) and (len(working_to) < 5 or working_to[-5:] != (working_to[-1] * 5)):
from_context = (working_from + (" " * num_post_context_chars))[:num_post_context_chars]
to_context = ((" " * num_pre_context_chars) + working_to )[-num_pre_context_chars:]
used_context = ((" " * num_pre_context_chars) + used_from )[-num_pre_context_chars:]
#construct the context.
context_as_dictionary = {}
#from_context
for i in range( num_post_context_chars ):
context_as_dictionary[ f"f{i}" ] = [from_context[i]]
#to_context
for i in range( num_pre_context_chars ):
context_as_dictionary[ f"t{i}" ] = [to_context[i]]
#used_context
for i in range( num_pre_context_chars ):
context_as_dictionary[ f"u{i}" ] = [used_context[i]]
#these two things.
context_as_dictionary["continuous_added"] = [continuous_added]
context_as_dictionary["continuous_dropped"] = [continuous_dropped]
#make it a pandas.
context_as_pd = pd.DataFrame( context_as_dictionary )
#run the model
action_model_result = predict_wrapper(action_model,context_as_pd )
#stop run away. If we have added more chars then our context, nothing is going to change.
if action_model_result == INSERT_TO and continuous_added >= num_post_context_chars:
#I can set this to MATCH or DELETE_FROM, but it is already a wreck, lets just see what happens with this.
action_model_result = MATCH
if action_model_result == START:
pass
elif action_model_result == INSERT_TO:
if constant_output is None:
#for an insert ask the char model what to insert
char_model_result = predict_wrapper(char_model, context_as_pd )
else:
char_model_result = constant_output
working_to += char_model_result
continuous_added += 1
continuous_dropped = 0
elif action_model_result == DELETE_FROM:
used_from += working_from[0]
working_from = working_from[1:]
continuous_added = 0
continuous_dropped += 1
elif action_model_result == MATCH:
used_from += working_from[0]
working_to += working_from[0]
working_from = working_from[1:]
continuous_added = 0
continuous_dropped = 0
return working_to
#edit distance from https://stackoverflow.com/a/32558749/1419054
def _levenshteinDistance(s1, s2):
if s1 != s1: s1 = ''
if s2 != s2: s2 = ''
if len(s1) > len(s2):
s1, s2 = s2, s1
distances = range(len(s1) + 1)
for i2, c2 in enumerate(s2):
distances_ = [i2+1]
for i1, c1 in enumerate(s1):
if c1 == c2:
distances_.append(distances[i1])
else:
distances_.append(1 + min((distances[i1], distances[i1 + 1], distances_[-1])))
distances = distances_
return distances[-1]
def train( in_csv, a_header, b_header, model, iterations, device, leading_context, trailing_context, train_percentage, verbose ):
if verbose: print( "loading csv" )
full_data = pd.read_csv( in_csv )
split_index = int( train_percentage/100*len(full_data) )
train_data = full_data.iloc[:split_index,:].reset_index(drop=True)
if verbose: print( "parsing data for training" )
tm = Transmorgrifier()
tm.train( from_sentences=train_data[a_header],
to_sentences=train_data[b_header],
iterations = iterations,
device = device,
leading_context = leading_context,
trailing_context = trailing_context,
verbose=verbose,
)
tm.save( model )
def execute( include_stats, in_csv, out_csv, a_header, b_header, model, execute_percentage, verbose ):
if verbose: print( "loading csv" )
full_data = pd.read_csv( in_csv )
split_index = int( (100-execute_percentage)/100*len(full_data) )
execute_data = full_data.iloc[split_index:,:].reset_index(drop=True)
tm = Transmorgrifier()
tm.load( model )
results = list(tm.execute( execute_data[a_header ], verbose=verbose ))
if include_stats:
before_edit_distances = []
after_edit_distances = []
percent_improvement = []
for row in range(len( execute_data )):
before_edit_distances.append(
_levenshteinDistance( execute_data[a_header][row], execute_data[b_header][row] )
)
after_edit_distances.append(
_levenshteinDistance( results[row], execute_data[b_header][row] )
)
percent_improvement.append(
100*(before_edit_distances[row] - after_edit_distances[row])/max(1,before_edit_distances[row])
)
pd_results = pd.DataFrame( {
"in_data": execute_data[a_header],
"out_data": execute_data[b_header],
"generated_data": results,
"before_edit_distance": before_edit_distances,
"after_edit_distance": after_edit_distances,
"percent_improvement": percent_improvement,
})
pd_results.to_csv( out_csv )
else:
pd_results = pd.DataFrame( {
"out_data": execute_data[b_header],
})
pd_results.to_csv( out_csv )
def safe_float( str ):
if str is not None:
return float(str)
return None #explicit None return.
def main():
parser = argparse.ArgumentParser(
prog = 'transmorgrify.py',
description = 'Converts text from one to another according to a model.',
epilog = '(C) Joshua Lansford')
parser.add_argument('-t', '--train', action='store_true', help='Train a model instead of executing a model')
parser.add_argument('-e', '--execute', action='store_true', help='Use an existing trained model.')
parser.add_argument('-g', '--gradio', action='store_true', help='Start a gradio demo with the selected model.' )
parser.add_argument('-s', '--share', action='store_true', help="Share the gradio app with a temporary public URL." )
parser.add_argument('-i', '--in_csv', help='The csv to read training or input data from', default='in.csv' )
parser.add_argument('-o', '--out_csv', help='The csv to write conversion to', default='out.csv' )
parser.add_argument('-a', '--a_header', help='The column header for training or transforming from', default="source" )
parser.add_argument('-b', '--b_header', help='The column header for training the transformation to', default="target" )
parser.add_argument('-m', '--model',help='The model file to create during training or use during transformation', default='model.tm' )
parser.add_argument('-n', '--iterations', help='The number of iterations to train', default=2000 )
parser.add_argument('-d', '--device', help='Which device, i.e. if using GPU', default='cpu' )
parser.add_argument('-x', '--context', help='The number of leading and trailing chars to use as context', default=7 )
parser.add_argument('-p', '--train_percentage', help="The percentage of data to train on, leaving the rest for testing.")
parser.add_argument('-v', '--verbose', action='store_true', help='Talks alot?' )
parser.add_argument('-c', '--include_stats', action='store_true', help='Use b_header to compute stats and add to output csv.')
args = parser.parse_args()
if not args.train and not args.execute and not args.gradio: print( "Must include --execute, --train and/or --gradio to do something." )
if args.train:
train_percentage = safe_float(args.train_percentage)
if train_percentage is None:
if args.execute:
train_percentage = 50
else:
train_percentage = 100
train( in_csv=args.in_csv,
a_header=args.a_header,
b_header=args.b_header,
model=args.model,
iterations=int(args.iterations),
device=args.device,
leading_context=int(args.context),
trailing_context=int(args.context),
train_percentage=train_percentage,
verbose=args.verbose,
)
if args.execute:
if args.train_percentage is None:
if args.train:
execute_percentage = 50
else:
execute_percentage = 100
else:
execute_percentage = 100-safe_float(args.train_percentage)
execute(
include_stats=args.include_stats,
in_csv=args.in_csv,
out_csv=args.out_csv,
a_header=args.a_header,
b_header=args.b_header,
model=args.model,
execute_percentage=execute_percentage,
verbose=args.verbose,
)
if args.gradio:
tm = Transmorgrifier()
tm.load( args.model )
tm.demo( share=args.share )
if __name__ == '__main__':
main()