Fabrice-TIERCELIN committed
Commit a34c2d9 · verified · 1 Parent(s): 797fd08

Upload __init__.py

Files changed (1)
  1. hyvideo/text_encoder/__init__.py +357 -357
hyvideo/text_encoder/__init__.py CHANGED
@@ -1,357 +1,357 @@
from dataclasses import dataclass
from typing import Optional, Tuple
from copy import deepcopy

import torch
import torch.nn as nn
from transformers import CLIPTextModel, CLIPTokenizer, AutoTokenizer, AutoModel
from transformers.utils import ModelOutput

from ..constants import TEXT_ENCODER_PATH, TOKENIZER_PATH
from ..constants import PRECISION_TO_TYPE


def use_default(value, default):
    return value if value is not None else default


def load_text_encoder(
    text_encoder_type,
    text_encoder_precision=None,
    text_encoder_path=None,
    logger=None,
    device=None,
):
    if text_encoder_path is None:
        text_encoder_path = TEXT_ENCODER_PATH[text_encoder_type]
    if logger is not None:
        logger.info(
            f"Loading text encoder model ({text_encoder_type}) from: {text_encoder_path}"
        )

    if text_encoder_type == "clipL":
        text_encoder = CLIPTextModel.from_pretrained(text_encoder_path)
        text_encoder.final_layer_norm = text_encoder.text_model.final_layer_norm
    elif text_encoder_type == "llm":
        text_encoder = AutoModel.from_pretrained(
            text_encoder_path, low_cpu_mem_usage=True
        )
        text_encoder.final_layer_norm = text_encoder.norm
    else:
        raise ValueError(f"Unsupported text encoder type: {text_encoder_type}")
    # from_pretrained will ensure that the model is in eval mode.

    if text_encoder_precision is not None:
        text_encoder = text_encoder.to(dtype=PRECISION_TO_TYPE[text_encoder_precision])

    text_encoder.requires_grad_(False)

    if logger is not None:
        logger.info(f"Text encoder to dtype: {text_encoder.dtype}")

    if device is not None:
        text_encoder = text_encoder.to(device)

    return text_encoder, text_encoder_path


def load_tokenizer(
    tokenizer_type, tokenizer_path=None, padding_side="right", logger=None
):
    if tokenizer_path is None:
        tokenizer_path = TOKENIZER_PATH[tokenizer_type]
    if logger is not None:
        logger.info(f"Loading tokenizer ({tokenizer_type}) from: {tokenizer_path}")

    if tokenizer_type == "clipL":
        tokenizer = CLIPTokenizer.from_pretrained(tokenizer_path, max_length=77)
    elif tokenizer_type == "llm":
        tokenizer = AutoTokenizer.from_pretrained(
            tokenizer_path, padding_side=padding_side
        )
    else:
        raise ValueError(f"Unsupported tokenizer type: {tokenizer_type}")

    return tokenizer, tokenizer_path


@dataclass
class TextEncoderModelOutput(ModelOutput):
    """
    Output of the text encoder, holding the selected hidden state together with its attention mask
    and, optionally, the full list of hidden states and the decoded texts.

    Args:
        hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`):
            Sequence of hidden-states at the output of the last layer of the model.
        attention_mask (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Mask to avoid performing attention on padding token indices. Mask values selected in ``[0, 1]``:
            1 for tokens that are not masked, 0 for tokens that are masked.
        hidden_states_list (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed):
            Tuple of `torch.FloatTensor` (one for the output of the embeddings, if the model has an embedding layer, +
            one for the output of each layer) of shape `(batch_size, sequence_length, hidden_size)`.
            Hidden-states of the model at the output of each layer plus the optional initial embedding outputs.
        text_outputs (`list`, *optional*, returned when `return_texts=True` is passed):
            List of decoded texts.
    """

    hidden_state: torch.FloatTensor = None
    attention_mask: Optional[torch.LongTensor] = None
    hidden_states_list: Optional[Tuple[torch.FloatTensor, ...]] = None
    text_outputs: Optional[list] = None


class TextEncoder(nn.Module):
    def __init__(
        self,
        text_encoder_type: str,
        max_length: int,
        text_encoder_precision: Optional[str] = None,
        text_encoder_path: Optional[str] = None,
        tokenizer_type: Optional[str] = None,
        tokenizer_path: Optional[str] = None,
        output_key: Optional[str] = None,
        use_attention_mask: bool = True,
        input_max_length: Optional[int] = None,
        prompt_template: Optional[dict] = None,
        prompt_template_video: Optional[dict] = None,
        hidden_state_skip_layer: Optional[int] = None,
        apply_final_norm: bool = False,
        reproduce: bool = False,
        logger=None,
        device=None,
    ):
        super().__init__()
        self.text_encoder_type = text_encoder_type
        self.max_length = max_length
        self.precision = text_encoder_precision
        self.model_path = text_encoder_path
        self.tokenizer_type = (
            tokenizer_type if tokenizer_type is not None else text_encoder_type
        )
        self.tokenizer_path = (
            tokenizer_path if tokenizer_path is not None else text_encoder_path
        )
        self.use_attention_mask = use_attention_mask
        if prompt_template_video is not None:
            assert (
                use_attention_mask is True
            ), "`use_attention_mask` must be True when a video prompt template is provided."
        self.input_max_length = (
            input_max_length if input_max_length is not None else max_length
        )
        self.prompt_template = prompt_template
        self.prompt_template_video = prompt_template_video
        self.hidden_state_skip_layer = hidden_state_skip_layer
        self.apply_final_norm = apply_final_norm
        self.reproduce = reproduce
        self.logger = logger

        self.use_template = self.prompt_template is not None
        if self.use_template:
            assert (
                isinstance(self.prompt_template, dict)
                and "template" in self.prompt_template
            ), f"`prompt_template` must be a dictionary with a key 'template', got {self.prompt_template}"
            assert "{}" in str(self.prompt_template["template"]), (
                "`prompt_template['template']` must contain a placeholder `{}` for the input text, "
                f"got {self.prompt_template['template']}"
            )

        self.use_video_template = self.prompt_template_video is not None
        if self.use_video_template:
            if self.prompt_template_video is not None:
                assert (
                    isinstance(self.prompt_template_video, dict)
                    and "template" in self.prompt_template_video
                ), f"`prompt_template_video` must be a dictionary with a key 'template', got {self.prompt_template_video}"
            assert "{}" in str(self.prompt_template_video["template"]), (
                "`prompt_template_video['template']` must contain a placeholder `{}` for the input text, "
                f"got {self.prompt_template_video['template']}"
            )

        if "t5" in text_encoder_type:
            self.output_key = output_key or "last_hidden_state"
        elif "clip" in text_encoder_type:
            self.output_key = output_key or "pooler_output"
        elif "llm" in text_encoder_type or "glm" in text_encoder_type:
            self.output_key = output_key or "last_hidden_state"
        else:
            raise ValueError(f"Unsupported text encoder type: {text_encoder_type}")

        self.model, self.model_path = load_text_encoder(
            text_encoder_type=self.text_encoder_type,
            text_encoder_precision=self.precision,
            text_encoder_path=self.model_path,
            logger=self.logger,
            device=device,
        )
        self.dtype = self.model.dtype
        self.device = self.model.device

        self.tokenizer, self.tokenizer_path = load_tokenizer(
            tokenizer_type=self.tokenizer_type,
            tokenizer_path=self.tokenizer_path,
            padding_side="right",
            logger=self.logger,
        )

    def __repr__(self):
        return f"{self.text_encoder_type} ({self.precision} - {self.model_path})"

    @staticmethod
    def apply_text_to_template(text, template, prevent_empty_text=True):
        """
        Apply text to template.

        Args:
            text (str): Input text.
            template (str or list): Template string or list of chat conversation.
            prevent_empty_text (bool): If True, prevent the user text from being empty
                by adding a space. Defaults to True.
        """
        if isinstance(template, str):
            # Will send string to tokenizer. Used for llm
            return template.format(text)
        else:
            raise TypeError(f"Unsupported template type: {type(template)}")

    def text2tokens(self, text, data_type="image"):
        """
        Tokenize the input text.

        Args:
            text (str or list): Input text.
        """
        tokenize_input_type = "str"
        if self.use_template:
            if data_type == "image":
                prompt_template = self.prompt_template["template"]
            elif data_type == "video":
                prompt_template = self.prompt_template_video["template"]
            else:
                raise ValueError(f"Unsupported data type: {data_type}")
            if isinstance(text, (list, tuple)):
                text = [
                    self.apply_text_to_template(one_text, prompt_template)
                    for one_text in text
                ]
                if isinstance(text[0], list):
                    tokenize_input_type = "list"
            elif isinstance(text, str):
                text = self.apply_text_to_template(text, prompt_template)
                if isinstance(text, list):
                    tokenize_input_type = "list"
            else:
                raise TypeError(f"Unsupported text type: {type(text)}")

        kwargs = dict(
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_tensors="pt",
        )
        if tokenize_input_type == "str":
            return self.tokenizer(
                text,
                return_length=False,
                return_overflowing_tokens=False,
                return_attention_mask=True,
                **kwargs,
            )
        elif tokenize_input_type == "list":
            return self.tokenizer.apply_chat_template(
                text,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                **kwargs,
            )
        else:
            raise ValueError(f"Unsupported tokenize_input_type: {tokenize_input_type}")

    def encode(
        self,
        batch_encoding,
        use_attention_mask=None,
        output_hidden_states=False,
        do_sample=None,
        hidden_state_skip_layer=None,
        return_texts=False,
        data_type="image",
        device=None,
    ):
        """
        Args:
            batch_encoding (dict): Batch encoding from tokenizer.
            use_attention_mask (bool): Whether to use attention mask. If None, use self.use_attention_mask.
                Defaults to None.
            output_hidden_states (bool): Whether to output hidden states. If False, return the value of
                self.output_key. If True, return the entire output. If self.hidden_state_skip_layer is set,
                hidden states are requested from the model regardless. Defaults to False.
            do_sample (bool): Whether to sample from the model. Used for decoder-only LLMs. Defaults to None.
                When self.reproduce is False, do_sample defaults to True.
            hidden_state_skip_layer (int): Number of layers to skip from the end when selecting the hidden
                state. 0 means the last layer. If None, self.hidden_state_skip_layer is used. Defaults to None.
            return_texts (bool): Whether to return the decoded texts. Defaults to False.
        """
        device = self.model.device if device is None else device
        use_attention_mask = use_default(use_attention_mask, self.use_attention_mask)
        hidden_state_skip_layer = use_default(
            hidden_state_skip_layer, self.hidden_state_skip_layer
        )
        do_sample = use_default(do_sample, not self.reproduce)
        attention_mask = (
            batch_encoding["attention_mask"].to(device) if use_attention_mask else None
        )
        outputs = self.model(
            input_ids=batch_encoding["input_ids"].to(device),
            attention_mask=attention_mask,
            output_hidden_states=output_hidden_states
            or hidden_state_skip_layer is not None,
        )
        if hidden_state_skip_layer is not None:
            last_hidden_state = outputs.hidden_states[-(hidden_state_skip_layer + 1)]
            # The real last hidden state already has layer norm applied, so only apply it
            # to intermediate layers.
            if hidden_state_skip_layer > 0 and self.apply_final_norm:
                last_hidden_state = self.model.final_layer_norm(last_hidden_state)
        else:
            last_hidden_state = outputs[self.output_key]

        # Remove hidden states of instruction tokens, only keep prompt tokens.
        if self.use_template:
            if data_type == "image":
                crop_start = self.prompt_template.get("crop_start", -1)
            elif data_type == "video":
                crop_start = self.prompt_template_video.get("crop_start", -1)
            else:
                raise ValueError(f"Unsupported data type: {data_type}")
            if crop_start > 0:
                last_hidden_state = last_hidden_state[:, crop_start:]
                attention_mask = (
                    attention_mask[:, crop_start:] if use_attention_mask else None
                )

        if output_hidden_states:
            return TextEncoderModelOutput(
                last_hidden_state, attention_mask, outputs.hidden_states
            )
        return TextEncoderModelOutput(last_hidden_state, attention_mask)

    def forward(
        self,
        text,
        use_attention_mask=None,
        output_hidden_states=False,
        do_sample=False,
        hidden_state_skip_layer=None,
        return_texts=False,
    ):
        batch_encoding = self.text2tokens(text)
        return self.encode(
            batch_encoding,
            use_attention_mask=use_attention_mask,
            output_hidden_states=output_hidden_states,
            do_sample=do_sample,
            hidden_state_skip_layer=hidden_state_skip_layer,
            return_texts=return_texts,
        )
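
For reference, a minimal usage sketch based on the constructor and forward signatures above; it is not part of the uploaded file. The "llm" encoder type and the max_length, precision, and hidden_state_skip_layer values are illustrative assumptions, and with no explicit path the checkpoint and tokenizer are resolved through TEXT_ENCODER_PATH and TOKENIZER_PATH in hyvideo.constants, which must point to valid local downloads.

import torch

from hyvideo.text_encoder import TextEncoder

# Values below are placeholders for illustration, not the pipeline's configured defaults.
text_encoder = TextEncoder(
    text_encoder_type="llm",        # resolved via TEXT_ENCODER_PATH["llm"] when no path is given
    max_length=256,                 # illustrative token budget
    text_encoder_precision="fp16",  # assumed to be a key of PRECISION_TO_TYPE
    hidden_state_skip_layer=2,      # take the hidden state two layers before the last
    device="cuda" if torch.cuda.is_available() else "cpu",
)

# forward() tokenizes the prompt and returns a TextEncoderModelOutput.
out = text_encoder("A cat walks on the grass, realistic style.")
print(out.hidden_state.shape)    # (batch, sequence_length, hidden_size)
print(out.attention_mask.shape)  # (batch, sequence_length)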