"""*A cross-lingual semantic similarity preserving hash for plain-text content (soft hash).* |
The ISCC Text-Code Semantic is a content-based compact binary code generated from multilingual text. |
!!! Warning |
This is a non-standard Proof of Concept implementation. |
Plain-text extraction from documents in various formats (especially PDF) may |
yield different results depending on the extraction tools being used. |
The [iscc-sdk](https://github.com/iscc/iscc-sdk) uses [Apache Tika](https://tika.apache.org) |
to extract text from documents for Text-Code generation. |
**Algorithm overview** |
- Split text into semantically coherent overlapping chunks. |
- Create vector embeddings of the chunks. |
- Average and binarize the chunk embeddings. |
- Encode as ISCC-UNIT of MainType SEMANTIC and SubType TEXT |
""" |
from loguru import logger as log |
from onnxruntime.capi.onnxruntime_pybind11_state import NoSuchFile |
from semantic_text_splitter import TextSplitter |
from tokenizers import Tokenizer |
from pathlib import Path |
from typing import Any |
import numpy as np |
import onnxruntime as rt |
from numpy.typing import NDArray |
from functools import cache |
import iscc_sct as sct |
# Absolute path of the directory that contains this module; used to locate bundled files.
HERE = Path(__file__).parent.absolute()
# Names exported by a star-import of this module.
__all__ = [
    "code_text_semantic",
    "gen_text_code_semantic",
    "soft_hash_text_semantic",
    "embed_chunks",
]
# Maps a code length in bits to the 4-character bit string that
# gen_text_code_semantic appends as the last field of the 2-byte ISCC header.
BIT_LEN_MAP = {
    32: "0000",
    64: "0001",
    96: "0010",
    128: "0011",
    160: "0100",
    192: "0101",
    224: "0110",
    256: "0111",
}
# Location of the tokenizer definition file that sits next to this module.
TOKENIZER_PATH = HERE / "tokenizer.json"
# Header fields as 4-character bit strings. gen_text_code_semantic concatenates
# MAINTYPE + SUBTYPE + SCT_VERSION + length field and packs them into 2 bytes.
# NOTE(review): per the module docstring these presumably denote MainType SEMANTIC
# and SubType TEXT - confirm against the ISCC specification.
MAINTYPE = "0001"
SUBTYPE = "0000"
SCT_VERSION = "0000"
def code_text_semantic(fp, **options):
    """
    Generate an ISCC Semantic-Code Text for the contents of a plaintext file.

    The file is read completely and decoded as UTF-8. The decoded text and all keyword
    options are then handed to `gen_text_code_semantic`, which does the actual processing.

    :param fp: File path of the plaintext file to process (anything `pathlib.Path` accepts).
    :param options: Custom processing options, forwarded unchanged to `gen_text_code_semantic`.
    :return: Dict with ISCC processing results as returned by `gen_text_code_semantic`.
    """
    text = Path(fp).read_text(encoding="utf-8")
    return gen_text_code_semantic(text, **options)
def gen_text_code_semantic(text, **options):
    """
    Create an ISCC Semantic-Code Text from plaintext.

    The text is split into chunks, every chunk is embedded, and the chunk embeddings are
    mean-pooled into one document embedding. The binarized document embedding, truncated
    to `bits`, is prefixed with a 2-byte header and base32 encoded.

    Defaults are defined by `sct.sct_opts`; the defaults quoted below are the ones
    documented for this module.

    :param str text: Plain text for ISCC processing.
    :param options: Custom processing options for overriding global options
    :key bits (int): Length of generated Semantic Text-Code in bits (default 64).
    :key bits_granular (int): Length in bits of each granular simprint.
    :key characters (bool): Return document character count (default True).
    :key embedding (bool): Return global document embedding (default False).
    :key precision (int): Max fractional digits for embeddings (default 8).
    :key simprints (bool): Return base64 encoded per-chunk simprints.
    :key offsets (bool): Return character offsets for granular features (default False).
    :key sizes (bool): Return the length of every chunk.
    :key contents (bool): Return the text chunks.
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: Dict with ISCC processing results (using Index-Format for granular features)
    :raises ValueError: If `text` is empty or splitting it yields no chunks.
    """
    if not text:
        raise ValueError("Input text cannot be empty.")

    opts = sct.sct_opts.override(options)

    # Insert "iscc" first so it leads the result dict; its value is filled in at the end.
    result = {"iscc": None}

    if opts.characters:
        result["characters"] = len(text)

    splits = split_text(text, **opts.model_dump())
    if not splits:
        # Without this guard the unpacking below fails with an opaque
        # "not enough values to unpack" error. Presumably reachable with
        # whitespace-only text when trimming is enabled.
        raise ValueError("Input text did not yield any chunks to embed.")
    offsets, chunks = [list(item) for item in zip(*splits)]

    with sct.timer("EMBEDDING time"):
        embeddings = embed_chunks(chunks)

    # Document-level embedding; the ISCC digest is derived from it.
    embedding = mean_pooling(embeddings)

    if any((opts.simprints, opts.offsets, opts.sizes, opts.contents, opts.embedding)):
        feature_set = {
            "maintype": "semantic",
            "subtype": "text",
            "version": 0,
        }
        if opts.embedding:
            feature_set["embedding"] = compress(embedding, opts.precision)
        if opts.simprints:
            # One truncated digest per chunk, base64 encoded.
            feature_digests = [binarize(vec)[: opts.bits_granular // 8] for vec in embeddings]
            feature_set["simprints"] = [sct.encode_base64(digest) for digest in feature_digests]
        if opts.offsets:
            feature_set["offsets"] = offsets
        if opts.sizes:
            feature_set["sizes"] = [len(chunk) for chunk in chunks]
        if opts.contents:
            feature_set["contents"] = chunks
        result["features"] = [feature_set]

    # Header: four 4-bit fields (16 bits) packed into 2 big-endian bytes.
    length = BIT_LEN_MAP[opts.bits]
    header = int(MAINTYPE + SUBTYPE + SCT_VERSION + length, 2).to_bytes(2, byteorder="big")
    digest = binarize(embedding)[: opts.bits // 8]
    code = sct.encode_base32(header + digest)
    result["iscc"] = "ISCC:" + code
    return result
def soft_hash_text_semantic(text):
    """
    Create a semantic similarity preserving hash digest for text input.

    The text is split with the default options, every chunk is embedded, the chunk
    embeddings are mean-pooled and the resulting document vector is binarized.

    NOTE(review): this was documented as a 256-bit hash, but the digest is not truncated
    here, so its length follows the embedding dimension - confirm against the model.

    :param text: Text to hash.
    :return: The binarized document embedding as bytes.
    """
    chunk_texts = [chunk for _offset, chunk in split_text(text)]
    document_embedding = mean_pooling(embed_chunks(chunk_texts))
    return binarize(document_embedding)
def split_text(text, **options):
    """
    Split text into chunks for embedding and report where each chunk starts.

    The given options are merged over the global options and the merged set is used to
    obtain the (cached) text splitter.

    :param text: Text to split.
    :param options: Custom processing options for overriding global options
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: A list of offset, chunk tuples [(offset,chunk), ...]
    """
    effective = sct.sct_opts.override(options)
    text_splitter = splitter(**effective.model_dump())
    return text_splitter.chunk_indices(text)
@cache
def tokenizer():
    """
    Load the tokenizer from the file at `TOKENIZER_PATH`.

    The result is memoized with `functools.cache`, so the file is read only on first use.

    :return: An instance of the Tokenizer.
    """
    with sct.timer("TOKENIZER load time"):
        tokenizer_file = TOKENIZER_PATH.as_posix()
        return Tokenizer.from_file(tokenizer_file)
@cache
def splitter(**options):
    """
    Build a text splitter on top of the cached tokenizer.

    Memoized with `functools.cache`: one splitter is kept per distinct set of keyword
    options, which therefore have to be hashable.

    :param options: Custom processing options for overriding global options
    :key max_tokens (int): Max tokens per chunk (default 127).
    :key overlap (int): Max tokens allowed to overlap between chunks (default 48).
    :key trim (bool): Trim whitespace from chunks (default False).
    :return: An instance of TextSplitter.
    """
    cfg = sct.sct_opts.override(options)
    with sct.timer("TEXTSPLITTER load time"):
        hf_tokenizer = tokenizer()
        return TextSplitter.from_huggingface_tokenizer(
            hf_tokenizer,
            capacity=cfg.max_tokens,
            overlap=cfg.overlap,
            trim=cfg.trim,
        )
@cache
def model():
    """
    Create the ONNX inference session used for embedding, memoized with `functools.cache`.

    CUDA is preferred over CPU when onnxruntime reports it as available. The session is
    first opened from `sct.MODEL_PATH`; if onnxruntime reports that file as missing, the
    path returned by `sct.get_model()` is used instead.

    :return: An ONNX inference session.
    """
    available = rt.get_available_providers()
    log.debug(f"Available ONNX providers {', '.join(available)}")

    if "CUDAExecutionProvider" in available:
        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
    else:
        providers = ["CPUExecutionProvider"]
    log.debug(f"Using ONNX providers {', '.join(providers)}")

    session_options = rt.SessionOptions()
    session_options.graph_optimization_level = rt.GraphOptimizationLevel.ORT_ENABLE_ALL

    def _open_session(path):
        # Same session configuration for both the primary and the fallback path.
        return rt.InferenceSession(path, sess_options=session_options, providers=providers)

    try:
        with sct.timer("ONNXMODEL load time"):
            return _open_session(sct.MODEL_PATH)
    except NoSuchFile:
        with sct.timer("ONNXMODEL aquisition/load time"):
            return _open_session(sct.get_model())
def tokenize_chunks(chunks):
    """
    Tokenize text chunks and collect the encodings as int64 arrays.

    :param chunks: Text chunks to tokenize.
    :return: Dict with the arrays `input_ids`, `attention_mask` and `token_type_ids`,
        each holding one row per chunk.
    """
    encoded = tokenizer().encode_batch(chunks)

    def _stack(field):
        # Gather one attribute from every encoding into a single int64 array.
        return np.array([getattr(enc, field) for enc in encoded], dtype=np.int64)

    return {
        "input_ids": _stack("ids"),
        "attention_mask": _stack("attention_mask"),
        "token_type_ids": _stack("type_ids"),
    }
def embed_chunks(chunks, batch_size=100):
    """
    Embed text chunks batch by batch and stack the results.

    Each batch is tokenized, run through the model and reduced to one vector per chunk
    by attention-mask based pooling.

    :param chunks: Text chunks to embed.
    :param batch_size: Number of chunks to process in each batch.
    :return: An array of embeddings with one row per chunk.
    """

    def _embed_batch(batch):
        tokens = tokenize_chunks(batch)
        return attention_pooling(embed_tokens(tokens), tokens["attention_mask"])

    batches = (chunks[start : start + batch_size] for start in range(0, len(chunks), batch_size))
    return np.vstack([_embed_batch(batch) for batch in batches])
def embed_tokens(tokens):
    """
    Run the model on tokenized chunks and return its first output.

    :param tokens: Tokenized text data (the input feed for the inference session).
    :return: The first model output as an array.
    """
    outputs = model().run(None, tokens)
    first_output = outputs[0]
    return np.array(first_output)
def attention_pooling(token_embeddings, attention_mask):
    """
    Reduce token embeddings to one unit-length vector per chunk.

    Token vectors are averaged over the positions selected by the attention mask and the
    average is scaled to L2 norm 1. Both divisors are floored at 1e-9, so an all-zero
    mask yields a zero vector instead of a division by zero.

    :param token_embeddings: Raw token embeddings from the model; the indexing used here
        implies a (chunks, tokens, dims) layout.
    :param attention_mask: Attention masks for the embeddings, laid out as (chunks, tokens).
    :return: A float32 array of pooled and normalized embeddings.
    """
    mask = attention_mask[:, :, None].astype(np.float32)
    masked_sum = (token_embeddings * mask).sum(axis=1)
    token_counts = np.clip(mask.sum(axis=1), 1e-9, None)
    pooled = masked_sum / token_counts
    lengths = np.linalg.norm(pooled, ord=2, axis=1, keepdims=True)
    unit_vectors = pooled / np.clip(lengths, 1e-9, None)
    return unit_vectors.astype(np.float32)
def mean_pooling(embeddings):
    """
    Calculate the document vector from chunk embeddings using mean pooling.

    The chunk embeddings are averaged along the first axis and the average is scaled to
    unit L2 norm. The norm is floored at 1e-9, the same floor `attention_pooling` uses,
    so a zero average yields a zero vector instead of NaN values.

    :param embeddings: Chunk embeddings, one row per chunk.
    :return: A normalized document vector.
    """
    document_vector = embeddings.mean(axis=0)
    norm = float(np.linalg.norm(document_vector))
    # A plain Python float divisor keeps the dtype of document_vector unchanged.
    return document_vector / max(norm, 1e-9)
def binarize(vec):
    """
    Turn an embedding vector into a hash digest, one bit per component.

    A component contributes a 1-bit when it is greater than or equal to zero and a 0-bit
    otherwise. Bits are packed most significant bit first, with the last byte zero-padded.

    :param vec: Vector to be binarized.
    :return: A bytes object representing the binary hash.
    """
    sign_bits = np.array(vec) >= 0
    packed = np.packbits(sign_bits)
    return bytes(packed)
def compress(vec, precision):
    """
    Round vector values to the given precision to reduce storage requirements.

    Values are rounded with numpy first and, after conversion to native Python floats,
    rounded once more with the builtin `round`.

    :param vec: Embedding vector.
    :param precision: Max number of fractional decimal places.
    :return: Vector as native python list of rounded floats.
    """
    values = np.around(vec, decimals=precision).tolist()
    return [round(value, precision) for value in values]