DrishtiSharma committed on
Commit
01b82e7
·
verified ·
1 Parent(s): fda35b1

Update mylab/attempt1/preprocess_data.py

Browse files
Files changed (1) hide show
  1. mylab/attempt1/preprocess_data.py +281 -0
mylab/attempt1/preprocess_data.py CHANGED
@@ -0,0 +1,281 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import requests
3
+ import zipfile
4
+ import xml.etree.ElementTree as ET
5
+ from datetime import datetime, timedelta
6
+ import tempfile
7
+ import pickle
8
+
9
+
10
def download_weekly_patents(year, month, day, logging):
    """
    Download and extract a weekly patent-application bulk file from the USPTO website.

    Parameters:
        year (int): The year of the patent file.
        month (int): The month of the patent file.
        day (int): The day of the patent file.
        logging (bool): Whether to print progress logs.

    Returns:
        bool: True if this week's data is already extracted or was downloaded
        and extracted successfully, False if the HTTP request failed.
    """
    # Check if the "data" folder exists and create one if it doesn't.
    data_folder = os.path.join(os.getcwd(), "data")
    if not os.path.exists(data_folder):
        if logging:
            print("Data folder not found. Creating a new 'data' folder.")
        os.makedirs(data_folder, exist_ok=True)

    directory = os.path.join(
        os.getcwd(), "data", "ipa" + str(year)[2:] + f"{month:02d}" + f"{day:02d}"
    )

    # Skip all network work if this week's archive was already extracted.
    if os.path.exists(directory):
        print(f"File {directory} already exists. Skipping download.")
        return True

    if logging:
        print("Building the URL...")
    base_url = "https://bulkdata.uspto.gov/data/patent/application/redbook/fulltext"
    file_url = (
        base_url
        + "/"
        + str(year)
        + "/ipa"
        + str(year)[2:]
        + f"{month:02d}"
        + f"{day:02d}"
        + ".zip"
    )

    if logging:
        print(f"URL constructed: {file_url}")
        # BUG FIX: this log line used to be printed *after* requests.get()
        # had already been issued; it now precedes the request.
        print("Requesting the file...")

    # BUG FIX: the original never closed the streamed response; the context
    # manager releases the connection even if writing/extraction fails.
    with requests.get(file_url, stream=True) as r:
        if r.status_code != 200:
            print(
                "File could not be downloaded. Please make sure the year, month, and day are correct."
            )
            return False

        if logging:
            print("File retrieved successfully. Starting download...")
        local_path = os.path.join(data_folder, "patents.zip")

        with open(local_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    if logging:
        print("File downloaded successfully. Starting extraction...")
    with zipfile.ZipFile(local_path, "r") as zip_ref:
        zip_ref.extractall(data_folder)

    if logging:
        print("File extracted successfully.")
    # Deleting the ZIP file after extraction
    os.remove(local_path)
    if logging:
        print(f"ZIP file {local_path} deleted after extraction.")

    return True
84
+
85
def filter_rf_patents(patents, keywords=None, fields=None):
    """
    Filter patents for RF-relevant content by case-insensitive keyword match.

    Parameters:
        patents (list): Patent texts (plain strings) and/or structured
            patents (dicts with lower-case field keys).
        keywords (list): Keywords to search for; defaults to a small set of
            RF-related terms.
        fields (list): Field names searched within dict patents (matched
            against lower-cased dict keys); defaults to Title and Abstract.

    Returns:
        list: The subset of ``patents`` containing at least one keyword.
    """
    if keywords is None:
        keywords = ["Radio Frequency", "Antenna", "UAV", "Wireless Charging"]  # Default keywords
    if fields is None:
        fields = ["Title", "Abstract"]  # Default fields

    # Lower-case the keywords once instead of on every comparison.
    lowered_keywords = [kw.lower() for kw in keywords]

    def _mentions_keyword(text):
        # Case-insensitive substring test against every keyword.
        haystack = text.lower()
        return any(kw in haystack for kw in lowered_keywords)

    matches = []
    for entry in patents:
        if isinstance(entry, str):
            # Plain-text patent: scan the whole text.
            if _mentions_keyword(entry):
                matches.append(entry)
        elif isinstance(entry, dict):
            # Structured patent: scan only the requested fields.
            if any(_mentions_keyword(entry.get(field.lower(), "")) for field in fields):
                matches.append(entry)
    return matches
114
+
115
+
116
+
117
def extract_patents(year, month, day, logging):
    """
    Split a weekly USPTO bulk XML file into individual patent text files.

    Reads the downloaded XML file, splits it into individual patent XML
    documents, keeps 'us-patent-application' documents whose IPCR section
    is 'C', filters their descriptions for RF-relevant content, and writes
    each surviving description to a .txt file in a dated directory under
    'data'. The list of written file names is pickled alongside them so a
    repeated run can skip the extraction entirely.

    Parameters:
        year (int): The year of the patent file to process.
        month (int): The month of the patent file to process.
        day (int): The day of the patent file to process.
        logging (bool): Whether to print progress logs.

    Returns:
        list: Names of the saved patent text files.
    """

    directory = os.path.join(
        os.getcwd(), "data", "ipa" + str(year)[2:] + f"{month:02d}" + f"{day:02d}"
    )
    saved_patent_names_path = os.path.join(directory, 'saved_patent_names.pkl')

    if os.path.exists(directory):
        print(f"File {directory} already exists. Skipping extract.")

        # Load saved_patent_names from file (written by a previous run of
        # this function — trusted local data, so pickle is acceptable here).
        with open(saved_patent_names_path, 'rb') as f:
            saved_patent_names = pickle.load(f)

        return saved_patent_names
    else:
        # makedirs (vs. mkdir) also creates "data" if it is missing.
        os.makedirs(directory)

    if logging:
        print("Locating the patent file...")
    file_path = os.path.join(
        os.getcwd(),
        "data",
        "ipa" + str(year)[2:] + f"{month:02d}" + f"{day:02d}" + ".xml",
    )

    if logging:
        print("Reading the patent file...")
    with open(file_path, "r") as f:
        contents = f.read()

    if logging:
        print("Splitting the XML file into individual XMLs...")
    # The bulk file is a concatenation of XML documents; split on the prolog.
    temp = contents.split('<?xml version="1.0" encoding="UTF-8"?>')
    allXmls = [
        '<?xml version="1.0" encoding="UTF-8"?>' + s.replace("\n", "") for s in temp
    ]

    # saving only the XMLs that contain a patent application
    patents = []
    for xml_string in allXmls:
        start_index = xml_string.find("<!DOCTYPE")
        end_index = xml_string.find(">", start_index)

        if start_index != -1 and end_index != -1:
            doctype_declaration = xml_string[start_index : end_index + 1]
            # Extract only the name of the DOCTYPE
            doctype_name = doctype_declaration.split()[1]
            if doctype_name == "us-patent-application":
                patents.append(xml_string)

    if logging:
        print(f"Total patents found: {len(patents)}")
        print("Writing individual patents to separate txt files...")

    saved_patent_names = []
    for patent in patents:
        # BUG FIX: patent_id must exist before the try block — if
        # ET.fromstring raised, the except handler hit a NameError.
        patent_id = None
        try:
            root = ET.fromstring(patent)

            patent_id = root.find(
                ".//publication-reference/document-id/doc-number"
            ).text
            file_id = root.attrib["file"]

            ipcr_classifications = root.findall(".//classification-ipcr")

            # findtext tolerates a missing <section> child (the original
            # ipcr.find(...).text raised AttributeError in that case).
            if any(ipcr.findtext("./section") == "C" for ipcr in ipcr_classifications):
                description_element = root.find(".//description")
                description_text = get_full_text(description_element)

                # Filter RF-relevant content
                filtered_description = filter_rf_patents(description_text)
                if filtered_description:
                    description_string = " ".join(filtered_description)
                    output_file_path = os.path.join(directory, f"{file_id}.txt")
                    with open(output_file_path, "w") as f:
                        f.write(description_string)
                    saved_patent_names.append(f"{file_id}.txt")

            elif logging:
                print(
                    f"Patent {patent_id} does not belong to section 'C'. Skipping this patent."
                )
        except (ET.ParseError, AttributeError, KeyError) as e:
            # AttributeError/KeyError cover documents missing doc-number or
            # the 'file' attribute; these used to abort the whole run.
            print(f"Error while parsing patent: {patent_id}. Skipping this patent.")
            print(f"Error message: {e}")

    # Save saved_patent_names to file
    with open(saved_patent_names_path, 'wb') as f:
        pickle.dump(saved_patent_names, f)

    if logging:
        print("Patent extraction complete.")

    # Deleting the main XML file after extraction
    os.remove(file_path)

    if logging:
        print(f"Main XML file {file_path} deleted after extraction.")
    return saved_patent_names
233
+
234
+
235
def get_full_text(element):
    """
    Collect all non-empty text fragments from an XML subtree, in document order.

    Parameters:
        element (xml.etree.ElementTree.Element): Root of the subtree to walk.

    Returns:
        list: Stripped text fragments from the element, its descendants,
        and their tails, skipping whitespace-only pieces.
    """

    fragments = []
    # Leading text of this element, if any.
    leading = (element.text or "").strip()
    if leading:
        fragments.append(leading)
    # Recurse into children, picking up each child's tail text afterwards.
    for child in element:
        fragments.extend(get_full_text(child))
        tail = (child.tail or "").strip()
        if tail:
            fragments.append(tail)
    return fragments
252
+
253
+
254
def parse_and_save_patents(start_date, end_date, logging=False):
    """
    Download, extract, and save USPTO patents for a range of weekly files.

    Walks the date range one week at a time (USPTO bulk files are weekly),
    downloading each file and extracting individual patent texts from it.

    Parameters:
        start_date (datetime): The start date of the range.
        end_date (datetime): The end date of the range.
        logging (bool): Whether to print progress logs.

    Returns:
        list: Names of all saved patent text files across the range.
    """
    collected_names = []
    one_week = timedelta(days=7)

    cursor = start_date
    while cursor <= end_date:
        if logging:
            print(f"Processing patents for {cursor.strftime('%Y-%m-%d')}...")

        # Only attempt extraction when the weekly file is available locally.
        if download_weekly_patents(cursor.year, cursor.month, cursor.day, logging):
            weekly_names = extract_patents(cursor.year, cursor.month, cursor.day, logging)
            collected_names.extend(weekly_names)

        cursor += one_week  # USPTO weekly files are organized by week

    return collected_names