2025-06-12 12:13:46 +05:30

239 lines
7.5 KiB
Python

# app.py
import os
import io
import sys
import json
import subprocess
import threading
import platform
import time
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from huggingface_hub import login
from dotenv import load_dotenv
# System prompt
system_message = (
"You are an assistant that reimplements Python code in high performance C++ for an arduino. "
"Respond only with C++ code; use comments sparingly and do not provide any explanation other than occasional comments. "
"The C++ response needs to produce an identical output in the fastest possible time. "
"Keep implementations of random number generators identical so that results match exactly."
)
# Helper functions
def user_prompt_for(python):
return (
"Rewrite this Python code in C++ with the fastest possible implementation that produces identical output in the least time. "
"Respond only with C++ code; do not explain your work other than a few comments. "
"Pay attention to number types to ensure no int overflows. Remember to #include all necessary C++ packages such as iomanip.\n\n"
+ python
)
def messages_for(python):
return [
{"role": "system", "content": system_message},
{"role": "user", "content": user_prompt_for(python)},
]
def write_output(cpp):
code = cpp.replace("```cpp", "").replace("```", "")
with open("optimized.cpp", "w") as f:
f.write(code)
def execute_python(code):
try:
output = io.StringIO()
sys.stdout = output
exec(code)
finally:
sys.stdout = sys.__stdout__
return output.getvalue()
def execute_cpp(code):
write_output(code)
compiler_cmd = ["clang++", "-O3", "-std=c++17", "-march=armv8.3-a", "-o", "optimized", "optimized.cpp"]
try:
subprocess.run(compiler_cmd, check=True, text=True, capture_output=True)
run_result = subprocess.run(["./optimized"], check=True, text=True, capture_output=True)
return run_result.stdout
except subprocess.CalledProcessError as e:
return f"An error occurred:\n{e.stderr}"
def run_cmd(command):
try:
result = subprocess.run(command, check=True, text=True, capture_output=True)
return result.stdout.strip() if result.stdout else "SUCCESS"
except:
return ""
simple_cpp = """
#include <iostream>
int main() {
std::cout << "Hello";
return 0;
}
"""
def c_compiler_cmd(filename_base):
my_platform = platform.system()
try:
with open("simple.cpp", "w") as f:
f.write(simple_cpp)
if my_platform == "Linux":
compile_cmd = ["g++", "simple.cpp", "-o", "simple"]
if run_cmd(compile_cmd) == "Hello":
return [my_platform, "GCC", ["g++", f"{filename_base}.cpp", "-o", filename_base]]
elif my_platform == "Darwin":
compile_cmd = ["clang++", "simple.cpp", "-o", "simple"]
if run_cmd(compile_cmd) == "Hello":
return [my_platform, "Clang++", ["clang++", f"{filename_base}.cpp", "-o", filename_base]]
elif my_platform == "Windows":
return [my_platform, "Unsupported", []]
except:
return [my_platform, "Unavailable", []]
return [my_platform, "Unavailable", []]
# HF Model Config
load_dotenv()
hf_token = os.environ["HF_TOKEN"]
login(hf_token)
code_qwen = "Qwen/Qwen3-0.6B"
tokenizer = AutoTokenizer.from_pretrained(code_qwen)
def stream_code_qwen(python):
messages = messages_for(python)
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
model_inputs = tokenizer([text], return_tensors="pt")
model = AutoModelForCausalLM.from_pretrained(code_qwen, device_map="auto", torch_dtype="auto")
model_inputs = model_inputs.to(model.device)
streamer = TextIteratorStreamer(tokenizer, skip_special_tokens=True)
thread = threading.Thread(
target=model.generate,
kwargs=dict(inputs=model_inputs.input_ids, streamer=streamer, max_new_tokens=1024, do_sample=False)
)
thread.start()
output = ""
for token in streamer:
output += token
yield output
def optimize(python, model):
if model == "CodeQwen":
result = ""
for chunk in stream_code_qwen(python):
result = chunk
return result
return "Only CodeQwen is supported."
# Sample Programs
pi = """
import time
def calculate(iterations, param1, param2):
result = 1.0
for i in range(1, iterations+1):
j = i * param1 - param2
result -= (1/j)
j = i * param1 + param2
result += (1/j)
return result
start_time = time.time()
result = calculate(100_000_000, 4, 1) * 4
end_time = time.time()
print(f"Result: {result:.12f}")
print(f"Execution Time: {(end_time - start_time):.6f} seconds")
"""
python_hard = """
def lcg(seed, a=1664525, c=1013904223, m=2**32):
value = seed
while True:
value = (a * value + c) % m
yield value
def max_subarray_sum(n, seed, min_val, max_val):
lcg_gen = lcg(seed)
random_numbers = [next(lcg_gen) % (max_val - min_val + 1) + min_val for _ in range(n)]
max_sum = float('-inf')
for i in range(n):
current_sum = 0
for j in range(i, n):
current_sum += random_numbers[j]
if current_sum > max_sum:
max_sum = current_sum
return max_sum
def total_max_subarray_sum(n, initial_seed, min_val, max_val):
total_sum = 0
lcg_gen = lcg(initial_seed)
for _ in range(20):
seed = next(lcg_gen)
total_sum += max_subarray_sum(n, seed, min_val, max_val)
return total_sum
import time
start_time = time.time()
result = total_max_subarray_sum(10000, 42, -10, 10)
end_time = time.time()
print("Total Maximum Subarray Sum (20 runs):", result)
print("Execution Time: {:.6f} seconds".format(end_time - start_time))
"""
def select_sample_program(sample_program):
return pi if sample_program == "pi" else python_hard
# Gradio UI
compiler_cmd = c_compiler_cmd("optimized")
css = """
.python {background-color: #306998;}
.cpp {background-color: #050;}
"""
with gr.Blocks(css=css) as ui:
gr.Markdown("## Convert Python Code to High-Performance C++")
with gr.Row():
python = gr.Textbox(label="Python code", value=python_hard, lines=12)
cpp = gr.Textbox(label="Generated C++ code", lines=12)
with gr.Row():
with gr.Column():
sample_program = gr.Radio(["pi", "python_hard"], label="Sample program", value="python_hard")
model = gr.Dropdown(["CodeQwen"], label="Select model", value="CodeQwen")
with gr.Column():
architecture = gr.Radio([compiler_cmd[0]], label="Architecture", interactive=False)
compiler = gr.Radio([compiler_cmd[1]], label="Compiler", interactive=False)
with gr.Row():
convert = gr.Button("Convert to C++")
with gr.Row():
python_run = gr.Button("Run Python")
cpp_run = gr.Button("Run C++", interactive=(compiler_cmd[1] != "Unavailable"))
with gr.Row():
python_out = gr.TextArea(label="Python output", elem_classes=["python"])
cpp_out = gr.TextArea(label="C++ output", elem_classes=["cpp"])
sample_program.change(select_sample_program, inputs=[sample_program], outputs=[python])
convert.click(optimize, inputs=[python, model], outputs=[cpp])
python_run.click(execute_python, inputs=[python], outputs=[python_out])
cpp_run.click(execute_cpp, inputs=[cpp], outputs=[cpp_out])
ui.launch(inbrowser=True)