File size: 3,029 Bytes
46dd24e
 
 
 
 
 
80c0c76
a5ead65
 
46dd24e
80c0c76
 
46dd24e
a5ead65
 
 
 
 
 
 
 
46dd24e
 
 
 
a8d670e
 
 
80c0c76
a8d670e
46dd24e
 
a5ead65
 
 
 
 
 
46dd24e
a5ead65
46dd24e
a8d670e
46dd24e
a5ead65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46dd24e
 
 
a5ead65
11dd0a0
a5ead65
 
11dd0a0
a5ead65
 
 
 
 
46dd24e
a5ead65
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""A simple script to run a Flow that can be used for development and debugging."""

import os

import hydra

import aiflows
from aiflows.backends.api_info import ApiInfo
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys

from aiflows import logging
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache

from aiflows.utils import serve_utils
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.utils.colink_utils import start_colink_server
from aiflows.workers import run_dispatch_worker_thread

CACHING_PARAMETERS.do_caching = False  # Set to True in order to disable caching
# clear_cache() # Uncomment this line to clear the cache

logging.set_verbosity_debug()

dependencies = [
    {"url": "aiflows/LCToolFlowModule", "revision": os.getcwd()},
]
from aiflows import flow_verse
flow_verse.sync_dependencies(dependencies)

if __name__ == "__main__":
   
    #1. ~~~~~ Set up a colink server ~~~~
    FLOW_MODULES_PATH = "./"
    
    cl = start_colink_server()
    

    #2. ~~~~~Load flow config~~~~~~
    root_dir = "."
    cfg_path = os.path.join(root_dir, "demo.yaml")
    cfg = read_yaml_file(cfg_path)
    
    #3. ~~~~ Serve The Flow ~~~~
    serve_utils.serve_flow(
        cl = cl,
        flow_type="LCToolFlowModule",
        default_config=cfg,
        default_state=None,
        default_dispatch_point="coflows_dispatch"
    )
    
    #4. ~~~~~Start A Worker Thread~~~~~
    run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)

    #5. ~~~~~Mount the flow and get its proxy~~~~~~
    proxy_flow = serve_utils.recursive_mount(
        cl=cl,
        client_id="local",
        flow_type="LCToolFlowModule",
        config_overrides=None,
        initial_state=None,
        dispatch_point_override=None,
    )   
    

    #6. ~~~ Get the data ~~~
    data = {"id": 0, "query": "Obama's first name?"}  # Add your data here 
    # data = {"id": 0, "question": "Who was the NBA champion in 2023?"}  # This can be a list of samples
    
    #option1: use the FlowMessage class
    input_message = FlowMessage(
        data=data,
    )

    #option2: use the proxy_flow
    #input_message = proxy_flow.package_input_message(data = data)
    
    #7. ~~~ Run inference ~~~
    future = proxy_flow.get_reply_future(input_message)
    
    #uncomment this line if you would like to get the full message back
    #reply_message = future.get_message()
    reply_data = future.get_data()
    
    # ~~~ Print the output ~~~
    print("~~~~~~Reply~~~~~~")
    print(reply_data)
    
    
    #8. ~~~~ (Optional) apply output interface on reply ~~~~
    # output_interface = KeyInterface(
    #     keys_to_rename={"api_output": "answer"},
    # )
    # print("Output: ", output_interface(reply_data))
    
    
    #9. ~~~~~Optional: Unserve Flow~~~~~~
    # serve_utils.delete_served_flow(cl, "ReverseNumberAtomicFlow_served")