I want gradio to show output from file in real-time

I’m building a program with Gradio that connects to switches via their API and performs various operations. The code works fine, but I would like the output textbox to be ‘fed’ in real time. In other words, I don’t want to see the results only once the whole loop is done; I want each new line written to the destination files to appear in the output text area as soon as it is written.

import gradio as gr
import ipaddress
import requests
from requests.auth import HTTPBasicAuth


###SWITCH###
def switch_ver(ip):
    ip_addr = ip.split()
    for i in ip_addr:
        ip_addr = list(ipaddress.ip_network(i))
        try:
            basic=HTTPBasicAuth('some','password')
            login = requests.post('http://'+i+':80/rest/v7/login-sessions', auth=basic)
            get_ver = requests.get('http://'+i+':80/rest/v7/system/status')
            get_ver = get_ver.json()
            get_ver = get_ver['firmware_version']
            get_ver = get_ver
            with open('switches_good_results.txt', 'a+') as sw:
                results = 'Switch version for {} is: {} \n'.format(i, get_ver)
                sw.write(results)
            #return 'Switch version for {} is: {} \n'.format(i, get_ver)
                
        except requests.exceptions.ConnectTimeout:
            timeout = 'Could not connect to switch: '+i+' REQUEST TIMED OUT\n'
            with open('switches_bad_results.txt', 'a+') as sw:
                sw.write(timeout)

    with open('switches_good_results.txt','r') as switches_good, open('switches_bad_results.txt', 'r') as switches_bad:
       summary = switches_good.read() + switches_bad.read()


    return (summary),['switches_good_results.txt', 'switches_bad_results.txt']

            
        
###IPBlockerK###
def block_ip(ip):
    ip_addr = ip.split()
    for i in ip_addr:
        ip_addr = list(ipaddress.ip_network(i))
        with open('fortigate.txt', 'a+') as f:
            f.seek(0)  # 'a+' opens at the end of the file; rewind before reading
            if i in f.read():
                return i + ' is a duplicate!'
            f.write('ping ' + i + '\n')

    with open('fortigate.txt', 'r') as f:
       return f.read()


with gr.Blocks(title = 'Switcher') as switches_ver:
    gr.Markdown('Welcome to IPBlocker')
    with gr.Tab(label = 'IPBlocker'):
        with gr.Row():
            ips_to_block = gr.Textbox(label = " ", lines = 10, placeholder='Please fill IPs to block')
    with gr.Tab(label = 'Switcher'):
        with gr.Row():   
            with gr.Column():
                switch_box = gr.Textbox(label = 'Switches', lines = 10, placeholder='Please fill switches IPs...')
                show_ver = gr.Button('Show current switches version')
                upgrade_ver = gr.Button('Upgrade selected switches')
            output_textbox = gr.Textbox(label='Results',lines = 10)
        with gr.Column():
            output_file = gr.File(['switches_good_results.txt', 'switches_bad_results.txt'])
            show_ver.click(fn=switch_ver, inputs = switch_box, outputs = [output_textbox, output_file])

switches_ver.launch()

Thank you!


In Streamlit I would add a timer that repeats the action for a set duration once triggered, then use caching keyed on a parameter to check whether the HTML downloaded from a URL with requests had changed since the last run.

In Gradio, try the live feature, which works great for ASR in a loop. I think if you put a function in there and iterate until you click stop, it should work the same way.

Below are two examples that do something similar. Let me know if you solve yours, since an event sensor behind a URL would be useful when we want to poll data externally yet also minimize repeated request calls.


Use st.cache, or memo or singleton, depending on whether you want repeat calls to return the cached value, which could reduce redundant re-entrant requests across users…

near real-time / live feed simulation

for seconds in range(200):
    df["age_new"] = df["age"] * np.random.choice(range(1, 5))
    df["balance_new"] = df["balance"] * np.random.choice(range(1, 5))

    st.markdown("### Detailed Data View")
    st.dataframe(df)
    time.sleep(1)

import gradio as gr
gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(source="microphone", type="numpy"),
        "state"
    ],
    outputs=[
        "text",
        "state"
    ],
    live=True).launch()

Third way, I almost forgot: have the file land in GitHub. Add a GitHub Actions script to auto-push to HF when the file changes (example here: GitHub - AaronCWacker/2-MLOpsNLPASR-g). Then you have an automatic CI/CD flow. You’d need to add your HF token as a secret on your GitHub repo and add the action script, but then each time an update is pushed to your external GitHub repo, the action script sends your change across to HF, which then rebuilds automatically.

Here is my main.yml file that does this for my auto MLOps example, which demonstrates open source updates to HF from an external GitHub repo:

name: Sync to Hugging Face hub
on:
  push:
    branches: [main]

  # to run this workflow manually from the Actions tab
  workflow_dispatch:

jobs:
  sync-to-hub:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0
      - name: Push to hub
        env:
          HF_TOKEN: ${{ secrets.HF_TOKEN }}
        run: git push --force https://awacke1:$HF_TOKEN@huggingface.co/spaces/awacke1/2-MLOpsNLPASR-g main

Hi @LittleWing !

Thank you for posting the question and thanks @awacke1 for jumping in.

@LittleWing If I understand your use case correctly, you can use iterative outputs with the yield keyword as described in the docs here

So it would be something like

def switch_ver(ip):
    # Collect results across the whole run, not per IP
    good_results = []
    bad_results = []
    ip_addr = ip.split()
    for i in ip_addr:
        ip_addr = list(ipaddress.ip_network(i))
        try:
            basic=HTTPBasicAuth('some','password')
            login = requests.post('http://'+i+':80/rest/v7/login-sessions', auth=basic)
            get_ver = requests.get('http://'+i+':80/rest/v7/system/status')
            get_ver = get_ver.json()
            get_ver = get_ver['firmware_version']
            results = 'Switch version for {} is: {} \n'.format(i, get_ver)
            good_results.append(results)
        except requests.exceptions.ConnectTimeout:
            timeout = 'Could not connect to switch: '+i+' REQUEST TIMED OUT\n'
            bad_results.append(timeout)
        # One value per output: the running text for the textbox, no files yet
        yield ''.join(good_results + bad_results), None
    with open('switches_bad_results.txt', 'a+') as sw:
        sw.writelines(bad_results)
    with open('switches_good_results.txt', 'a+') as sw:
        sw.writelines(good_results)
    yield ''.join(good_results + bad_results), ['switches_good_results.txt', 'switches_bad_results.txt']

Thank you all for your time!!
Really appreciate it.
I tried yield but I got some generator error. I’ll check again.
Thank you very much for your effort to help me, I’m very appreciative!
:slight_smile:
@freddyaboulton @awacke1

Hi,

This is what I get when I try to return with yield:
raise ValueError("Need to enable queue to use generators.")

What am I doing wrong?

Thank you

Hi @LittleWing ! You need to enable the queue first:

demo.queue(concurrency_count=5, max_size=20).launch()

You can change the values of concurrency_count and max_size; those are just examples.
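
To show how the generator and the queue fit together, here is a minimal sketch. The names `check_hosts` and `build_demo` are made up for illustration, and the `time.sleep` call stands in for the slow request to each switch. Each yielded value replaces the textbox contents, so the sketch yields the accumulated log rather than only the newest line.

```python
import time


def check_hosts(hosts_text):
    """Yield the accumulated log after each host so the textbox grows as work finishes."""
    log = ""
    for host in hosts_text.split():
        time.sleep(0.1)  # stand-in for the slow request to the switch
        log += "Checked {}\n".format(host)
        yield log  # the yielded value replaces the textbox contents


def build_demo():
    """Wire the generator to a textbox (hypothetical layout, not the original app)."""
    import gradio as gr  # imported here so the generator can be tried without Gradio

    with gr.Blocks() as demo:
        box = gr.Textbox(label="Switches", lines=10)
        out = gr.Textbox(label="Results", lines=10)
        gr.Button("Run").click(fn=check_hosts, inputs=box, outputs=out)
    return demo


# To run it: build_demo().queue().launch()
```

Calling `.queue()` before `.launch()` is what allows the generator to push intermediate values to the page.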


Hi @freddyaboulton
Thank you for the quick reply!
I will update the script
Thanks!

Hi @freddyaboulton

Thanks for the reply.

I changed the script according to your reply, no error messages.

UPDATE
I moved the yield under the except and now I’m getting this error:
with open(src, 'rb') as fsrc:
FileNotFoundError: [Errno 2] No such file or directory: 'o'

But I still cannot see real-time results.

For example, I’m trying to connect to some switches that don’t exist, so I get a timeout (which is OK).
Because the module tries to reconnect to the switches, it takes around 20 seconds for each switch to be written to the file. If I have 3 switches that time out, for example, I get all three results at once after a minute.

Can I make each result appear on the output after its 20 seconds (one per switch)?

updated code:

> def switch_ver(ip):
>     with open('switches_successful_results.txt','w') as switches_successful, open('switches_failed_results.txt', 'w') as switches_failed:
>         ip_addr = ip.split()
>         for i in ip_addr:
>             ip_addr = list(ipaddress.ip_network(i))
>             try:
>                 basic=HTTPBasicAuth('some','password')
>                 login = requests.post('http://'+i+':80/rest/v7/login-sessions', auth=basic)
>                 cookie = login.cookies
>                 get_ver = requests.get('http://'+i+':80/rest/v7/system/status', cookies=cookie)
>                 get_ver = get_ver.json()
>                 get_ver = get_ver['firmware_version']
>                 get_ver = get_ver
>                 with open('switches_successful_results.txt', 'a+') as sw:
>                     results = 'Switch version for {} is: {} \n'.format(i, get_ver)
>                     sw.write(results)
>                                             
>             except requests.exceptions.ConnectTimeout:
>                 timeout = 'Could not connect to switch: '+i+' REQUEST TIMED OUT\n'
>                 with open('switches_failed_results.txt', 'a+') as sw:
>                     sw.write(timeout)
>                 yield timeout
> 
>     with open('switches_successful_results.txt','r') as switches_successful, open('switches_failed_results.txt', 'r') as switches_failed:
>         summary = switches_failed.read() + switches_successful.read()
> 
> 
>     yield (summary),['switches_successful_results.txt', 'switches_failed_results.txt']

Thank you.
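
A possible reading of that `FileNotFoundError` (an assumption, not checked against a specific Gradio version): the click handler has two outputs, but `yield timeout` sends back a single string, so the string is split across the outputs and the File component receives its second character, the `'o'` from `'Could ...'`. The sketch below yields one value per output on every step. `stream_results`, `probe`, and `out_dir` are hypothetical names; `probe` stands in for the two `requests` calls and is expected to raise `TimeoutError`, where the real code would catch `requests.exceptions.ConnectTimeout`.

```python
import os

GOOD = "switches_successful_results.txt"
BAD = "switches_failed_results.txt"


def stream_results(hosts_text, probe, out_dir="."):
    """Yield (textbox_text, file_value) after every host, then the files at the end."""
    good_path = os.path.join(out_dir, GOOD)
    bad_path = os.path.join(out_dir, BAD)
    for path in (good_path, bad_path):
        open(path, "w").close()  # start each run with empty result files

    log = ""
    for host in hosts_text.split():
        try:
            line = "Switch version for {} is: {}\n".format(host, probe(host))
            path = good_path
        except TimeoutError:
            line = "Could not connect to switch: {} REQUEST TIMED OUT\n".format(host)
            path = bad_path
        with open(path, "a") as fh:
            fh.write(line)
        log += line
        yield log, None  # text so far for the textbox, nothing for the File output yet

    yield log, [good_path, bad_path]  # final step: same text plus both result files
```

Because the yield sits after the `try`/`except` rather than only inside the `except`, successful switches show up as they finish too. To hook it up, a small generator function that does `yield from stream_results(ip, your_probe)` can be passed as `fn`. How long each switch takes before its line appears is bounded by the `timeout=` argument passed to `requests.post` and `requests.get`.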


Cool example. You might be able to detect the changes to the file with a file watcher. Streamlit, for example, reloads when one of its files changes; that is done with a file watcher. Good article here: A Simple Python File Watcher | Towards Data Science

I’ve been curious about the same sort of real-time auto update triggered by new data (like a Kafka stream pattern or a file watcher).
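
As a rough idea of what a file watcher could look like here, this is a polling sketch that uses only the standard library (it is not the approach from the linked article). `follow` and `stream_file` are hypothetical names, and the `poll_seconds` and `idle_limit` defaults are arbitrary. A line is only emitted once its trailing newline has been written, so a half-written line is picked up on a later poll.

```python
import os
import time


def follow(path, poll_seconds=0.5, idle_limit=10.0):
    """Yield each complete new line appended to `path`; stop after `idle_limit` quiet seconds."""
    position = 0  # file offset just past the last line already yielded
    idle = 0.0
    while idle < idle_limit:
        new_lines = []
        if os.path.exists(path):
            with open(path, "r") as fh:
                fh.seek(position)
                while True:
                    line = fh.readline()
                    if not line.endswith("\n"):
                        break  # nothing new, or a partially written line: retry later
                    new_lines.append(line)
                    position = fh.tell()
        if new_lines:
            idle = 0.0
            yield from new_lines
        else:
            time.sleep(poll_seconds)
            idle += poll_seconds


def stream_file(path, **kwargs):
    """Generator shaped for a Gradio textbox: yields the whole text seen so far."""
    text = ""
    for line in follow(path, **kwargs):
        text += line
        yield text
```

Something like `stream_file` could be used as the `fn` of a button click with the queue enabled, while another process or thread keeps appending to the file.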

I will try your method and queue later since that looks promising.

Here is the snippet that does the reload. I’ve seen examples that import the text file data or add a watch to it like it’s another code module. I’m curious how that works from Docker or Kubernetes.


with open(os.path.dirname(__file__) + os.sep + 'refresh.py', 'r') as f:    \
    exec(compile(f.read().replace('__BASE__',                              \
        os.path.basename(__file__).replace('.py', '')).replace('__FILE__', \
            __file__), __file__, 'exec'))
def refresh(filepath = __file__, _globals = None, _locals = None):
    print("Reading {}...".format(filepath))
    if _globals is None:
        _globals = globals()
    _globals.update({
        "__file__": filepath,
        "__name__": "__main__",
    })
    with open(filepath, 'rb') as file:
        exec(compile(file.read(), filepath, 'exec'), _globals, _locals)

def refresh___BASE__():
    refresh("__FILE__")

You can also do a self-modifying code block, which is ugly code but useful nonetheless:

def updateCount():
    fin = open(__file__, 'r')
    code = fin.read()
    fin.close()

    second_line = code.split('\n')[1]
    second_line_parts = second_line.split(' ')
    second_line_parts[2] = str(int(second_line_parts[2])+1)

    second_line = ' '.join(second_line_parts)
    lines = code.split('\n')
    lines[1] = second_line
    code = '\n'.join(lines)

    fout = open(__file__, 'w')
    fout.write(code)
    fout.close()

Thank you guys for the support! You are truly helpful to me (and way more advanced!) :slight_smile:
@awacke1 @freddyaboulton

Apologies for jumping in an older topic, but this answer (generators) got me so close to a solution that I hoped to piggy-back on it.

From everything I’ve read so far, I think Gradio generators are the best option to stream tokens from an LLM to the Gradio Chatbot. I can see that the Chatbot can be connected to a queue; I just don’t know how you’d deal with the tokens showing up in the last message of a chat conversation.

Am I on the right track? And do you have any tips on how to use the queue with the Gradio Chatbot?
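
One way this is commonly done, sketched under a few assumptions: the chat history is the list of `[user, bot]` pairs used by `gr.Chatbot` in the Gradio versions discussed in this thread (newer releases also offer a messages format), and `fake_llm_tokens` is a made-up stand-in for a real token stream. The idea is to append the user turn with an empty bot slot, then keep growing that last slot and yield the whole history after every token.

```python
def fake_llm_tokens(prompt):
    """Hypothetical token source; a real LLM stream would replace this."""
    for word in "You said: {}".format(prompt).split(" "):
        yield word + " "


def add_user_message(message, history):
    """Append the user's turn with an empty bot slot and clear the input box."""
    return "", history + [[message, None]]


def stream_bot_reply(history):
    """Grow the last bot message token by token, yielding the full history each time."""
    history[-1][1] = ""
    for token in fake_llm_tokens(history[-1][0]):
        history[-1][1] += token
        yield history


def build_demo():
    """Hypothetical wiring: submit adds the user turn, then the generator streams the reply."""
    import gradio as gr  # imported here so the functions above can be tried without Gradio

    with gr.Blocks() as demo:
        chatbot = gr.Chatbot()
        msg = gr.Textbox()
        msg.submit(add_user_message, [msg, chatbot], [msg, chatbot]).then(
            stream_bot_reply, chatbot, chatbot
        )
    return demo


# To run it: build_demo().queue().launch()
```

Since the generator yields the entire history every time, the Chatbot simply re-renders with a longer last message, which gives the typing effect.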