#!/usr/bin/env python3
"""Character-level text rewriting driven by two CatBoost classifiers.

Training aligns each source sentence with its target sentence as a sequence
of MATCH / DELETE_FROM / INSERT_TO steps.  An "action" classifier is trained
to predict the next step from the surrounding characters, and a "char"
classifier is trained to predict which character to insert.  Execution
replays the predicted steps over new input text.
"""
import argparse
import json
import os
import tempfile
import zipfile

import pandas as pd
from catboost import CatBoostClassifier, Pool

# Edit actions.  These integer values are the labels of the action model.
MATCH = 0
DELETE_FROM = 1
INSERT_TO = 2
START = 3

# Version written to params.json inside a saved model file.
FILE_VERSION = 1


class Transmorgrifier:
    """A trainable, saveable character-level text transformer."""

    def train(self, from_sentences, to_sentences, iterations=4000,
              device='cpu', trailing_context=7, leading_context=7,
              verbose=True):
        """
        Train the Transmorgrifier model.  This does not save it to disk but
        just trains in memory.

        Keyword arguments:
        from_sentences -- An array of strings for the input sentences.
        to_sentences -- An array of strings of the same length as
                        from_sentences which the model is to train to
                        convert to.
        iterations -- An integer specifying the number of training
                      iterations for each model. (default 4000)
        device -- The gpu reference which catboost wants or "cpu".
                  (default cpu)
        trailing_context -- The number of characters after the action point
                            to include for context. (default 7)
        leading_context -- The number of characters before the action point
                           to include for context. (default 7)
        verbose -- Increases the amount of text output during training.
                   (default True)

        Returns self so calls can be chained.
        """
        X, Y = _parse_for_training(
            from_sentences,
            to_sentences,
            num_pre_context_chars=leading_context,
            num_post_context_chars=trailing_context,
        )

        # The action model decides between match / delete / insert.
        self.action_model = _train_catboost(
            X, Y['action'], iterations,
            verbose=verbose, device=device, model_piece='action',
        )

        # The char model is only trained on the rows where the action is an
        # insert, because those are the only rows with a meaningful char.
        insert_indexes = Y['action'] == INSERT_TO
        insert_chars = Y['char'][insert_indexes]
        distinct_chars = insert_chars.nunique()

        if distinct_chars > 1:
            self.char_model = _train_catboost(
                X[insert_indexes], insert_chars, iterations,
                verbose=verbose, device=device, model_piece='char',
            )
            self.constant_output = None
        else:
            # With fewer than two distinct chars there is nothing to train a
            # second model on, so remember a constant answer instead.
            self.char_model = None
            if distinct_chars == 1:
                self.constant_output = insert_chars.unique()[0]
            else:
                # If there is never an insertion handle it as always
                # inserting a space; it will never be used, but it keeps
                # the saving and loading code working.
                self.constant_output = ' '

        self.trailing_context = trailing_context
        self.leading_context = leading_context
        self.iterations = iterations
        return self

    def save(self, model='my_model.tm'):
        """
        Saves the model previously trained with train to a specified model
        file.

        The file is a zip archive holding params.json, action.cb and, when a
        char model was trained, char.cb.

        Keyword arguments:
        model -- The pathname to save the model such as "my_model.tm"
                 (default my_model.tm)

        Returns self so calls can be chained.
        """
        self.name = model

        params = {
            'version': FILE_VERSION,
            'leading_context': self.leading_context,
            'trailing_context': self.trailing_context,
            'iterations': self.iterations,
        }
        if self.constant_output is not None:
            params['constant_output'] = self.constant_output

        with zipfile.ZipFile(model, mode="w",
                             compression=zipfile.ZIP_DEFLATED,
                             compresslevel=9) as my_zip:
            my_zip.writestr('params.json', json.dumps(params))

            # The models are written to a real file first and then copied
            # into the archive (see _mktemp for why).
            temp_filename = _mktemp()
            try:
                self.action_model.save_model(temp_filename)
                my_zip.write(temp_filename, "action.cb")
                if self.char_model is not None:
                    self.char_model.save_model(temp_filename)
                    my_zip.write(temp_filename, "char.cb")
            finally:
                # Remove the scratch file even when saving fails part way.
                os.unlink(temp_filename)

        return self

    def load(self, model='my_model.tm'):
        """
        Loads the model previously saved from the file system.

        Keyword arguments:
        model -- The filename of the model to load. (default my_model.tm)

        Raises ValueError when the file was written with a newer
        FILE_VERSION than this code understands.

        Returns self so calls can be chained.
        """
        self.name = model
        with zipfile.ZipFile(model, mode='r') as archive:
            with archive.open('params.json') as f_in:
                params = json.loads(f_in.read().decode())

            if params['version'] > FILE_VERSION:
                raise ValueError(
                    f"Version {params['version']} greater than {FILE_VERSION}"
                )

            self.leading_context = int(params['leading_context'])
            self.trailing_context = int(params['trailing_context'])
            self.iterations = int(params['iterations'])

            self.action_model = _load_catboost_member(archive, 'action.cb')

            # A stored constant_output means no char model was saved.
            if 'constant_output' not in params:
                self.char_model = _load_catboost_member(archive, 'char.cb')
                self.constant_output = None
            else:
                self.constant_output = params['constant_output']
                self.char_model = None

        return self

    def execute(self, from_sentences, verbose=False):
        """
        Runs the data from from_sentences.  The results are returned using
        yield so you need to wrap this in list() if you want to index it.
        from_sentences can be an array or a generator.

        Keyword arguments:
        from_sentences -- Something iterable which returns strings.
        verbose -- Print progress after every tenth sentence.
                   (default False)
        """
        # Generators have no len(); fall back to a count-only message.
        try:
            total = len(from_sentences)
        except TypeError:
            total = None

        for i, from_sentence in enumerate(from_sentences):
            yield _do_reconstruct(
                action_model=self.action_model,
                char_model=self.char_model,
                constant_output=self.constant_output,
                text=from_sentence,
                num_pre_context_chars=self.leading_context,
                num_post_context_chars=self.trailing_context,
            )
            if verbose and i % 10 == 0:
                if total is None:
                    print(f"{i}")
                else:
                    print(f"{i} of {total}")

    def demo(self, share=False):
        """
        Launches a gradio page that transforms text as it is typed.

        Keyword arguments:
        share -- Passed through to gradio's launch(). (default False)
        """
        # Imported here so gradio is only needed when the demo is used.
        import gradio as gr

        def gradio_function(text):
            return next(self.execute([text]))

        # self.name is only set by save() and load().
        title = getattr(self, 'name', 'Transmorgrifier')

        with gr.Blocks() as demo:
            gr.Markdown(title)
            inp = gr.Textbox(label="Input")
            out = gr.Textbox(label="Output")
            inp.change(gradio_function, inputs=[inp], outputs=[out])
        demo.launch(share=share)


def _mktemp():
    """
    Returns the path of a fresh, empty scratch file.

    I know mktemp exists in the library but it has been deprecated
    suggesting using mkstemp, and catboost can't write to a file handle yet,
    so the handle from mkstemp is closed and only the filename is handed
    out.  The caller is responsible for deleting the file.
    """
    handle, path = tempfile.mkstemp(prefix=".temp_", suffix="~")
    os.close(handle)
    return path


def _load_catboost_member(archive, member_name):
    """
    Loads one CatBoost model stored as member_name inside an open ZipFile.

    The member is copied to a scratch file because the model is loaded by
    filename; the scratch file is always removed afterwards.
    """
    temp_filename = _mktemp()
    try:
        with archive.open(member_name) as f_in:
            with open(temp_filename, "wb") as f_out:
                f_out.write(f_in.read())
        return CatBoostClassifier().load_model(temp_filename)
    finally:
        os.unlink(temp_filename)


def _list_trace(trace):
    """
    Returns the chain of hops ending at trace, ordered from the root hop to
    trace itself.

    Walks the parent links iteratively so that long sentences cannot hit
    the interpreter's recursion limit.
    """
    hops = []
    node = trace
    while node is not None:
        hops.append(node)
        node = node.parent
    hops.reverse()
    return hops


class _edit_trace_hop():
    """One step of an edit trace, linked to the step before it by parent."""

    __slots__ = (
        "parent",         # the previous hop, or None for the root
        "edit_distance",  # accumulated cost of the path ending at this hop
        "char",           # the character this hop matches/inserts/deletes
        "from_row_i",     # row of this hop in the alignment matrix
        "to_column_i",    # column of this hop in the alignment matrix
        "action",         # one of MATCH, DELETE_FROM, INSERT_TO, START
    )

    def __init__(self, parent=None, edit_distance=None, char=None,
                 from_row_i=None, to_column_i=None, action=None):
        self.parent = parent
        self.edit_distance = edit_distance
        self.char = char
        self.from_row_i = from_row_i
        self.to_column_i = to_column_i
        self.action = action

    def __str__(self):
        if self.action == START:
            return ""
        if self.action in (INSERT_TO, DELETE_FROM, MATCH):
            return f" {self.char}"
        return "eh?"

    def __repr__(self):
        return self.__str__()


def _trace_edits(from_sentence, to_sentence, print_debug=False):
    """
    Aligns from_sentence with to_sentence and returns the final hop of the
    chosen edit trace.  Follow the parent links (or call _list_trace) to get
    the whole sequence of MATCH / DELETE_FROM / INSERT_TO steps.

    Keyword arguments:
    from_sentence -- The string being converted from.
    to_sentence -- The string being converted to.
    print_debug -- Print the chosen steps, one per line. (default False)
    """
    # Iterating from will be the rows down the left side.
    # Iterating to will be the columns across the top.
    # Only the previous row is kept while the next one is built.
    last_row = None

    # The indexes run one past the string lengths so that row 0 and column 0
    # are the root cases across the top and down the left of the matrix.
    for from_row_i in range(len(from_sentence) + 1):
        current_row = []
        for to_column_i in range(len(to_sentence) + 1):
            best_option = None

            # root case.
            if from_row_i == 0 and to_column_i == 0:
                best_option = _edit_trace_hop(
                    parent=None,
                    edit_distance=0,
                    char="",
                    from_row_i=from_row_i,
                    to_column_i=to_column_i,
                    action=START,
                )

            # check left: arrive by inserting a char of to_sentence.
            if to_column_i > 0:
                left = current_row[to_column_i - 1]
                if (best_option is None
                        or left.edit_distance + 1 < best_option.edit_distance):
                    best_option = _edit_trace_hop(
                        parent=left,
                        edit_distance=left.edit_distance + 1,
                        char=to_sentence[to_column_i - 1],
                        from_row_i=from_row_i,
                        to_column_i=to_column_i,
                        action=INSERT_TO,
                    )

            # check up: arrive by deleting a char of from_sentence.
            if from_row_i > 0:
                up = last_row[to_column_i]
                if (best_option is None
                        or up.edit_distance + 1 < best_option.edit_distance):
                    best_option = _edit_trace_hop(
                        parent=up,
                        edit_distance=up.edit_distance + 1,
                        char=from_sentence[from_row_i - 1],
                        from_row_i=from_row_i,
                        to_column_i=to_column_i,
                        action=DELETE_FROM,
                    )

            # check match: arrive diagonally.  Both indexes must be past the
            # root row/column; otherwise from_sentence[-1] would be compared
            # and last_row would not exist yet.
            if (to_column_i > 0 and from_row_i > 0
                    and to_sentence[to_column_i - 1]
                    == from_sentence[from_row_i - 1]):
                diagonal = last_row[to_column_i - 1]
                # prefer match so use <= rather than <
                if (best_option is None
                        or diagonal.edit_distance
                        <= best_option.edit_distance):
                    # NOTE(review): a match is charged +1 like the other
                    # moves, so edit_distance is a path cost rather than a
                    # textbook Levenshtein distance.  Left unchanged because
                    # it decides which alignments are learned -- confirm
                    # this is intended.
                    best_option = _edit_trace_hop(
                        parent=diagonal,
                        edit_distance=diagonal.edit_distance + 1,
                        char=from_sentence[from_row_i - 1],
                        from_row_i=from_row_i,
                        to_column_i=to_column_i,
                        action=MATCH,
                    )

            if best_option is None:
                raise RuntimeError(
                    "Shouldn't end up with best_option being None"
                )
            current_row.append(best_option)

        last_row = current_row

    final_hop = last_row[-1]

    if print_debug:
        for node in _list_trace(final_hop):
            if node.action == START:
                print("start")
            elif node.action == MATCH:
                print(f"match {node.char}")
            elif node.action == INSERT_TO:
                print(f"insert {node.char}")
            elif node.action == DELETE_FROM:
                print(f"del {node.char}")

    return final_hop


def _context_windows(working_from, working_to, used_from,
                     num_pre_context_chars, num_post_context_chars):
    """
    Returns (from_context, to_context, used_context): the next
    num_post_context_chars of the unread input, and the last
    num_pre_context_chars of the output so far and of the consumed input,
    each padded with spaces.
    """
    pre_padding = " " * num_pre_context_chars
    post_padding = " " * num_post_context_chars
    from_context = (working_from + post_padding)[:num_post_context_chars]
    to_context = (pre_padding + working_to)[-num_pre_context_chars:]
    used_context = (pre_padding + used_from)[-num_pre_context_chars:]
    return from_context, to_context, used_context


def _parse_single_for_training(from_sentence, to_sentence,
                               num_pre_context_chars,
                               num_post_context_chars):
    """
    Turns one sentence pair into training rows.

    The edit trace between the two sentences is replayed step by step and a
    snapshot of the context is recorded before each step.

    Returns (observations, results): two DataFrames with one row per step.
    observations has the columns f0.., t0.., u0.., continuous_added and
    continuous_dropped; results has the columns action and char.
    """
    trace_list = _list_trace(_trace_edits(from_sentence, to_sentence))

    training_collection = []

    # Execute the trace on from_sentence and see if we get to_sentence.
    working_from = from_sentence
    working_to = ""
    used_from = ""
    continuous_added = 0
    continuous_dropped = 0

    for thing in trace_list:
        # Gather action and context for training.
        if thing.action != START:
            from_context, to_context, used_context = _context_windows(
                working_from, working_to, used_from,
                num_pre_context_chars, num_post_context_chars,
            )
            training_collection.append({
                "from_context": from_context,
                "to_context": to_context,
                "used_context": used_context,
                "action": thing.action,
                "continuous_added": continuous_added,
                "continuous_dropped": continuous_dropped,
                # Only inserts carry a meaningful char.
                "char": thing.char if thing.action == INSERT_TO else ' ',
            })

        # Now execute the action for the next step.
        if thing.action == INSERT_TO:
            working_to += thing.char
            continuous_added += 1
            continuous_dropped = 0
        elif thing.action == DELETE_FROM:
            used_from += working_from[0]
            working_from = working_from[1:]
            continuous_added = 0
            continuous_dropped += 1
        elif thing.action == MATCH:
            used_from += working_from[0]
            working_to += working_from[0]
            working_from = working_from[1:]
            continuous_added = 0
            continuous_dropped = 0

    if to_sentence != working_to:
        print("Replay failure")

    # training_collection is a list of rows; turn it into a dictionary of
    # columns.  The column order is f*, t*, u*, then the two counters.
    context_split_into_dict = {}
    for i in range(num_post_context_chars):
        context_split_into_dict[f"f{i}"] = [
            training['from_context'][i] for training in training_collection
        ]
    for i in range(num_pre_context_chars):
        context_split_into_dict[f"t{i}"] = [
            training['to_context'][i] for training in training_collection
        ]
    for i in range(num_pre_context_chars):
        context_split_into_dict[f"u{i}"] = [
            training['used_context'][i] for training in training_collection
        ]
    context_split_into_dict["continuous_added"] = [
        training["continuous_added"] for training in training_collection
    ]
    context_split_into_dict["continuous_dropped"] = [
        training["continuous_dropped"] for training in training_collection
    ]

    # The output answers.
    result_split_into_dict = {
        'action': [training['action'] for training in training_collection],
        'char': [training['char'] for training in training_collection],
    }

    return (pd.DataFrame(context_split_into_dict),
            pd.DataFrame(result_split_into_dict))


def _parse_for_training(from_sentences, to_sentences,
                        num_pre_context_chars, num_post_context_chars):
    """
    Builds the training rows for every sentence pair and concatenates them.

    Pairs in which either side is a float are skipped.  Progress is printed
    every 100 pairs.

    Returns (observations, results) as produced by
    _parse_single_for_training, concatenated over all pairs.
    """
    out_observations_list = []
    out_results_list = []

    pairs = zip(from_sentences, to_sentences)
    for index, (from_sentence, to_sentence) in enumerate(pairs):
        # bad lines are nan which are floats.
        if (not isinstance(from_sentence, float)
                and not isinstance(to_sentence, float)):
            specific_observation, specific_result = (
                _parse_single_for_training(
                    from_sentence,
                    to_sentence,
                    num_pre_context_chars=num_pre_context_chars,
                    num_post_context_chars=num_post_context_chars,
                )
            )
            out_observations_list.append(specific_observation)
            out_results_list.append(specific_result)

        if index % 100 == 0:
            print(f"parsing {index} of {len(from_sentences)}")

    return pd.concat(out_observations_list), pd.concat(out_results_list)


def _train_catboost(X, y, iterations, device, verbose, model_piece,
                    learning_rate=.07):
    """
    Fits and returns a CatBoostClassifier on X and y.

    Keyword arguments:
    X -- DataFrame of observations.  Columns whose name starts with f, t or
         u are passed as categorical features.
    y -- The labels.
    iterations -- Number of training iterations.
    device -- "cpu", or the device string to hand to catboost for GPU use.
    verbose -- Passed to fit(), and also prints the fitted state and params.
    model_piece -- A label used in the printed messages.
    learning_rate -- (default .07)
    """
    X = X.fillna(' ')
    use_cpu = device.lower() == 'cpu'

    # Watch out if another field is added that it doesn't start with one of
    # these letters.
    cat_features = [
        i for i, column in enumerate(X.keys())
        if column[0] in ('f', 't', 'u')
    ]
    train_pool = Pool(data=X, label=y, cat_features=cat_features)

    # Can't use a validation pool because it randomly has chars not in
    # training.
    validation_pool = None

    model = CatBoostClassifier(
        iterations=iterations,
        learning_rate=learning_rate,
        task_type="CPU" if use_cpu else "GPU",
        devices=None if use_cpu else device,
    )
    model.fit(train_pool, eval_set=validation_pool, verbose=verbose)

    if verbose:
        print('{} is fitted: {}'.format(model_piece, model.is_fitted()))
        print('{} params:\n{}'.format(model_piece, model.get_params()))

    return model


def predict_wrapper(model, model_input):
    """
    Returns the single prediction of model for the one-row model_input.

    Big hack.  Catboost has shown itself to be unstable on producing either
    a single value or an array with a single value in it.  It was traced
    back to the saved model, and from the model to the data it was trained
    on, but not to what in the data causes it, so the result is unwrapped in
    a way that works either way.
    """
    result = model.predict(model_input)[0]
    try:
        result = result[0]
    except (TypeError, IndexError):
        # Already a scalar; nothing to unwrap.
        pass
    return result


def _do_reconstruct(action_model, char_model, constant_output, text,
                    num_pre_context_chars, num_post_context_chars):
    """
    Transforms text by repeatedly asking the models for the next edit.

    Keyword arguments:
    action_model -- Model predicting MATCH, DELETE_FROM or INSERT_TO.
    char_model -- Model predicting the char to insert; only used when
                  constant_output is None.
    constant_output -- The char to insert when there is no char model.
    text -- The input string.  NaN is treated as an empty string.
    num_pre_context_chars -- Leading context size used in training.
    num_post_context_chars -- Trailing context size used in training.

    Returns the transformed string.
    """
    # test for nan.
    if text != text:
        text = ''

    working_from = text
    working_to = ""
    used_from = ""
    continuous_added = 0
    continuous_dropped = 0

    # Stop when the input is used up, when the output has grown to three
    # times the input, or when the output ends in five identical chars.
    while (working_from
           and len(working_to) < 3 * len(text)
           and (len(working_to) < 5
                or working_to[-5:] != (working_to[-1] * 5))):
        from_context, to_context, used_context = _context_windows(
            working_from, working_to, used_from,
            num_pre_context_chars, num_post_context_chars,
        )

        # Construct the context with the same columns, in the same order,
        # as the training data.
        context_as_dictionary = {}
        for i in range(num_post_context_chars):
            context_as_dictionary[f"f{i}"] = [from_context[i]]
        for i in range(num_pre_context_chars):
            context_as_dictionary[f"t{i}"] = [to_context[i]]
        for i in range(num_pre_context_chars):
            context_as_dictionary[f"u{i}"] = [used_context[i]]
        context_as_dictionary["continuous_added"] = [continuous_added]
        context_as_dictionary["continuous_dropped"] = [continuous_dropped]
        context_as_pd = pd.DataFrame(context_as_dictionary)

        action_model_result = predict_wrapper(action_model, context_as_pd)

        # Stop run away.  If we have added more chars than our context,
        # nothing is going to change.
        if (action_model_result == INSERT_TO
                and continuous_added >= num_post_context_chars):
            # This could be MATCH or DELETE_FROM, but it is already a wreck,
            # lets just see what happens with this.
            action_model_result = MATCH

        if action_model_result == INSERT_TO:
            if constant_output is None:
                # For an insert ask the char model what to insert.
                char_model_result = predict_wrapper(char_model, context_as_pd)
            else:
                char_model_result = constant_output
            working_to += char_model_result
            continuous_added += 1
            continuous_dropped = 0
        elif action_model_result == DELETE_FROM:
            used_from += working_from[0]
            working_from = working_from[1:]
            continuous_added = 0
            continuous_dropped += 1
        elif action_model_result == MATCH:
            used_from += working_from[0]
            working_to += working_from[0]
            working_from = working_from[1:]
            continuous_added = 0
            continuous_dropped = 0
        else:
            # START or an unknown action changes no state, so the next
            # prediction would see the same context; stop rather than loop
            # forever.
            break

    return working_to


# edit distance from https://stackoverflow.com/a/32558749/1419054
def _levenshteinDistance(s1, s2):
    """Returns the Levenshtein distance between s1 and s2 (NaN counts as '')."""
    if s1 != s1:
        s1 = ''
    if s2 != s2:
        s2 = ''
    if len(s1) > len(s2):
        s1, s2 = s2, s1

    distances = range(len(s1) + 1)
    for i2, c2 in enumerate(s2):
        distances_ = [i2 + 1]
        for i1, c1 in enumerate(s1):
            if c1 == c2:
                distances_.append(distances[i1])
            else:
                distances_.append(
                    1 + min((distances[i1], distances[i1 + 1], distances_[-1]))
                )
        distances = distances_
    return distances[-1]


def train(in_csv, a_header, b_header, model, iterations, device,
          leading_context, trailing_context, train_percentage, verbose):
    """
    Trains a model on the first train_percentage percent of the rows of
    in_csv and saves it to model.

    a_header and b_header name the source and target columns.
    """
    if verbose:
        print("loading csv")
    full_data = pd.read_csv(in_csv)

    split_index = int(train_percentage / 100 * len(full_data))
    train_data = full_data.iloc[:split_index, :].reset_index(drop=True)

    if verbose:
        print("parsing data for training")

    tm = Transmorgrifier()
    tm.train(
        from_sentences=train_data[a_header],
        to_sentences=train_data[b_header],
        iterations=iterations,
        device=device,
        leading_context=leading_context,
        trailing_context=trailing_context,
        verbose=verbose,
    )
    tm.save(model)


def execute(include_stats, in_csv, out_csv, a_header, b_header, model,
            execute_percentage, verbose):
    """
    Runs a saved model over the last execute_percentage percent of the rows
    of in_csv and writes the result to out_csv.

    With include_stats the output holds the input, the target from
    b_header, the generated text and the edit distances before and after.
    Without it the output holds only the generated text in out_data.
    """
    if verbose:
        print("loading csv")
    full_data = pd.read_csv(in_csv)

    split_index = int((100 - execute_percentage) / 100 * len(full_data))
    execute_data = full_data.iloc[split_index:, :].reset_index(drop=True)

    tm = Transmorgrifier()
    tm.load(model)

    results = list(tm.execute(execute_data[a_header], verbose=verbose))

    if include_stats:
        before_edit_distances = []
        after_edit_distances = []
        percent_improvement = []

        rows = zip(execute_data[a_header], execute_data[b_header], results)
        for source, target, generated in rows:
            before = _levenshteinDistance(source, target)
            after = _levenshteinDistance(generated, target)
            before_edit_distances.append(before)
            after_edit_distances.append(after)
            # max(1, ...) avoids dividing by zero when nothing needed fixing.
            percent_improvement.append(100 * (before - after) / max(1, before))

        pd_results = pd.DataFrame({
            "in_data": execute_data[a_header],
            "out_data": execute_data[b_header],
            "generated_data": results,
            "before_edit_distance": before_edit_distances,
            "after_edit_distance": after_edit_distances,
            "percent_improvement": percent_improvement,
        })
    else:
        # Write what the model produced; b_header is not needed here.
        pd_results = pd.DataFrame({
            "out_data": results,
        })

    pd_results.to_csv(out_csv)


def safe_float(str):
    """Returns float(str), or None when str is None."""
    # The parameter name shadows the builtin but is kept for callers.
    if str is not None:
        return float(str)
    return None  # explicit None return.


def main():
    """Command line entry point: train, execute and/or demo a model."""
    parser = argparse.ArgumentParser(
        prog='transmorgrify.py',
        description='Converts text from one to another according to a model.',
        epilog='(C) Joshua Lansford')
    parser.add_argument('-t', '--train', action='store_true', help='Train a model instead of executing a model')
    parser.add_argument('-e', '--execute', action='store_true', help='Use an existing trained model.')
    parser.add_argument('-g', '--gradio', action='store_true', help='Start a gradio demo with the selected model.')
    parser.add_argument('-s', '--share', action='store_true', help="Share the gradio app with a temporary public URL.")
    parser.add_argument('-i', '--in_csv', help='The csv to read training or input data from', default='in.csv')
    parser.add_argument('-o', '--out_csv', help='The csv to write conversion to', default='out.csv')
    parser.add_argument('-a', '--a_header', help='The column header for training or transforming from', default="source")
    parser.add_argument('-b', '--b_header', help='The column header for training the transformation to', default="target")
    parser.add_argument('-m', '--model', help='The model file to create during training or use during transformation', default='model.tm')
    parser.add_argument('-n', '--iterations', help='The number of iterations to train', default=2000)
    parser.add_argument('-d', '--device', help='Which device, i.e. if using GPU', default='cpu')
    parser.add_argument('-x', '--context', help='The number of leading and trailing chars to use as context', default=7)
    parser.add_argument('-p', '--train_percentage', help="The percentage of data to train on, leaving the rest for testing.")
    parser.add_argument('-v', '--verbose', action='store_true', help='Talks alot?')
    parser.add_argument('-c', '--include_stats', action='store_true', help='Use b_header to compute stats and add to output csv.')

    args = parser.parse_args()

    if not args.train and not args.execute and not args.gradio:
        print("Must include --execute, --train and/or --gradio to do something.")

    if args.train:
        train_percentage = safe_float(args.train_percentage)
        if train_percentage is None:
            # Split the data in half when the same run also executes.
            train_percentage = 50 if args.execute else 100

        train(
            in_csv=args.in_csv,
            a_header=args.a_header,
            b_header=args.b_header,
            model=args.model,
            iterations=int(args.iterations),
            device=args.device,
            leading_context=int(args.context),
            trailing_context=int(args.context),
            train_percentage=train_percentage,
            verbose=args.verbose,
        )

    if args.execute:
        if args.train_percentage is None:
            execute_percentage = 50 if args.train else 100
        else:
            # Execute on whatever was not used for training.
            execute_percentage = 100 - safe_float(args.train_percentage)

        execute(
            include_stats=args.include_stats,
            in_csv=args.in_csv,
            out_csv=args.out_csv,
            a_header=args.a_header,
            b_header=args.b_header,
            model=args.model,
            execute_percentage=execute_percentage,
            verbose=args.verbose,
        )

    if args.gradio:
        tm = Transmorgrifier()
        tm.load(args.model)
        tm.demo(share=args.share)


if __name__ == '__main__':
    main()