# This is an example that uses the websockets api to know when a prompt execution is done.
# Once the prompt execution is done it downloads the images using the /history endpoint.

import websocket  # NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
from PIL import Image
import io
import random
import sys
import base64
import requests

# Host and port of the image-generation server the prompt is queued on.
server_address = "127.0.0.1:8188"
# Unique id sent with the queued prompt and on the websocket URL.
client_id = str(uuid.uuid4())

# Base URL of the remote task API.
api_path = "https://canvas-api-test.anvil.app/_/api"

# Directory the generated images are read back from (must end with a slash,
# because paths are built by plain string concatenation).
image_path = "D:/Temp/ComfyUI_windows_portable_nvidia/ComfyUI_windows_portable/ComfyUI/output/"


def update_ai_image_task_status(row_id, new_status):
    """POST a new status for an AI image task to the task API.

    Args:
        row_id: Identifier of the task row to update.
        new_status: Status value to store (this file sends 2 before
            generation and 3 after a successful upload; the meaning of the
            codes is defined by the API -- TODO confirm).

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None``.
    """
    url = "{}/tasks/ai-image/update-status".format(api_path)
    payload = {"row_id": row_id, "new_status": new_status}

    response = requests.post(url, json=payload)

    if response.status_code == 200:
        print("Status update was successful")
        return response.json()

    print("Status update failed")
    print("Status code:", response.status_code)
    print("Response:", response.text)
    return None


def get_ai_image_task(row_id):
    """GET the AI image task identified by ``row_id`` from the task API.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None``.
    """
    url = "{}/tasks/ai-image/{}".format(api_path, row_id)
    print("Constructed URL:", url)  # Print the URL for debugging

    response = requests.get(url)

    if response.status_code == 200:
        print("Request was successful")
        return response.json()

    print("Request failed")
    print("Status code:", response.status_code)
    print("Response:", response.text)
    return None


def find_image_and_convert_to_base64(image_path):
    """Read the file at ``image_path`` and return its content as base64 text.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    return base64.b64encode(image_data).decode("utf-8")


def upload_image_to_anvil(row_id, image_base64):
    """POST a base64-encoded image for ``row_id`` to the task API.

    On HTTP 200 the task status is additionally set to 3.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ``None``.
    """
    url = "{}/test-3".format(api_path)
    payload = {"row_id": row_id, "image_base64": image_base64}

    response = requests.post(url, json=payload)

    if response.status_code == 200:
        print("Image uploaded successfully")
        update_ai_image_task_status(row_id=row_id, new_status=3)
        return response.json()

    print("Image upload failed")
    print("Status code:", response.status_code)
    print("Response:", response.text)
    return None


def load_debug_ai_scene_info():
    """Load the scene description from the local debug JSON file."""
    with open("D:/Git/ap-canvas-creation-module/04_stable_diffusion/ai_scene_info.json", "r") as f:
        return json.load(f)


def convert_base64_string_to_object(base64_string):
    """Decode a base64 string that wraps a JSON document.

    The decoded bytes are handed to ``json.loads`` directly, so non-ASCII
    characters in the JSON text (e.g. in a prompt) are accepted instead of
    raising ``UnicodeDecodeError``.

    Raises:
        ValueError: If the input is not valid base64 or not valid JSON
            (``binascii.Error`` and ``json.JSONDecodeError`` are both
            ``ValueError`` subclasses).
    """
    raw = base64.b64decode(base64_string)
    return json.loads(raw)


def _node_title(node):
    """Return ``node["_meta"]["title"]`` or ``None`` when it is not present."""
    meta = node.get("_meta")
    if isinstance(meta, dict):
        return meta.get("title")
    return None


def set_filename(json_obj, title, new_prefix):
    """Set ``inputs.filename_prefix`` on the first suitable node titled ``title``.

    The search is depth-first over nested dicts. A node whose title matches
    but which has no ``filename_prefix`` input is skipped (and not descended
    into); the search then continues with its siblings.

    Returns:
        ``new_prefix`` if a node was updated, otherwise ``None``.
    """
    for value in json_obj.values():
        if not isinstance(value, dict):
            continue
        if _node_title(value) == title:
            inputs = value.get("inputs")
            if isinstance(inputs, dict) and "filename_prefix" in inputs:
                inputs["filename_prefix"] = new_prefix
                return new_prefix
        else:
            result = set_filename(value, title, new_prefix)
            # Compare against None so a falsy prefix (e.g. "") still counts
            # as a successful update.
            if result is not None:
                return result
    return None


def find_node(json_obj, title):
    """Return the first nested dict whose ``_meta.title`` equals ``title``.

    The search is depth-first over nested dicts.

    Returns:
        The matching dict (the live object, not a copy), or ``None``.
    """
    for value in json_obj.values():
        if not isinstance(value, dict):
            continue
        if _node_title(value) == title:
            return value
        result = find_node(value, title)
        if result is not None:
            return result
    return None


def _require_node(prompt, title):
    """Like ``find_node`` but raise ``KeyError`` naming the missing title."""
    node = find_node(prompt, title)
    if node is None:
        raise KeyError("workflow has no node titled {!r}".format(title))
    return node


def queue_prompt(prompt):
    """Submit ``prompt`` to the server's /prompt endpoint.

    Returns:
        The decoded JSON response (``get_images`` reads ``prompt_id`` from it).
    """
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode('utf-8')
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    # Context manager so the HTTP response is always closed.
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())


def get_prompt(ai_scene_info):
    """Load the workflow template and fill it in from ``ai_scene_info``.

    Reads ``project_id``, ``ai_scene.prompt.positive_prompt``,
    ``ai_scene.prompt.negative_prompt``, ``ai_scene.settings.steps`` and
    ``ai_scene.settings.cfg`` from ``ai_scene_info``.

    Returns:
        The workflow dict, ready to be passed to ``queue_prompt``.

    Raises:
        KeyError: If a required key or a required workflow node is missing.
    """
    with open(
        "D://Git//ap-canvas-creation-module//04_stable_diffusion//workflows//canvas_3d_to_img_standard_V1.json",
        "r",
    ) as f:
        prompt = json.load(f)

    positive_text = ai_scene_info["ai_scene"]["prompt"]["positive_prompt"]
    negative_text = ai_scene_info["ai_scene"]["prompt"]["negative_prompt"]

    # NOTE: the render directory is hard-coded to one sample scene.
    render_dir = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Renders//15a314a1-8ba1-4e0e-ad0c-f605b06f89f8//"
    image_base_path = render_dir + "base0001.jpg"
    image_alpha_products_path = render_dir + "alpha_products0001.jpg"
    # The depth image ("depth0001.png") is not wired in; the workflow's own
    # "image_depth" input is left untouched.

    set_filename(
        prompt,
        "Save Image",
        "{project_id}/basic_api_example".format(project_id=ai_scene_info["project_id"]),
    )

    ksampler_main = _require_node(prompt, "KSampler")
    ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000)
    ksampler_main["inputs"]["steps"] = ai_scene_info["ai_scene"]["settings"]["steps"]
    ksampler_main["inputs"]["cfg"] = ai_scene_info["ai_scene"]["settings"]["cfg"]

    prompt_positive = _require_node(prompt, "positive_CLIPTextEncodeSDXL")
    prompt_positive["inputs"]["text_g"] = positive_text
    prompt_positive["inputs"]["text_l"] = positive_text

    prompt_negative = _require_node(prompt, "negative_CLIPTextEncodeSDXL")
    prompt_negative["inputs"]["text_g"] = negative_text
    prompt_negative["inputs"]["text_l"] = negative_text

    image_base = _require_node(prompt, "image_base")
    image_base["inputs"]["image"] = image_base_path

    image_product_mask = _require_node(prompt, "image_product_mask")
    image_product_mask["inputs"]["image"] = image_alpha_products_path

    return prompt


def get_image(filename, subfolder, folder_type):
    """Download one image from the server's /view endpoint as raw bytes."""
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response:
        return response.read()


def get_history(prompt_id):
    """Fetch and decode the server's /history entry for ``prompt_id``."""
    with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response:
        return json.loads(response.read())


def get_images(ws, prompt):
    """Queue ``prompt``, wait for it to finish, and download its images.

    Blocks on ``ws.recv()`` until an ``executing`` message arrives whose
    ``node`` is ``None`` for the queued prompt id.

    Args:
        ws: A connected websocket object providing ``recv()``.
        prompt: The workflow dict to queue.

    Returns:
        A dict mapping node id to a list of
        ``{'filename', 'data', 'type'}`` dicts (``data`` is the raw bytes).
    """
    prompt_id = queue_prompt(prompt)['prompt_id']
    output_images = {}

    while True:
        out = ws.recv()
        if not isinstance(out, str):
            continue  # previews are binary data
        message = json.loads(out)
        if message['type'] == 'executing':
            data = message['data']
            if data['node'] is None and data['prompt_id'] == prompt_id:
                break  # Execution is done

    history = get_history(prompt_id)[prompt_id]
    for node_id, node_output in history['outputs'].items():
        images_output = []
        for image in node_output.get('images', []):
            image_data = get_image(image['filename'], image['subfolder'], image['type'])
            images_output.append({
                'filename': image['filename'],
                'data': image_data,
                'type': image['type']
            })
        output_images[node_id] = images_output

    return output_images


def _fetch_project_id(row_id):
    """Return the task's ``project_id`` from the API, or ``None`` on failure."""
    response = get_ai_image_task(row_id)
    if response is None:
        return None
    return json.loads(response["data"])["project_id"]


def main(*args):
    """Run one generation job described by the base64 argument after ``--``.

    The first command-line argument following ``--`` must be a
    base64-encoded JSON scene description containing ``image_id``. The
    process exits with status 1 if that argument cannot be parsed or if the
    task cannot be fetched from the API.
    """
    argv = sys.argv

    try:
        argv = argv[argv.index("--") + 1:]
        ai_scene_info = convert_base64_string_to_object(argv[0])
        row_id = ai_scene_info["image_id"]
        print("loading scene data", ai_scene_info)
    except (ValueError, IndexError, KeyError, TypeError) as e:
        # Without the scene data there is nothing to run; previously the
        # script carried on and died with a NameError. For local debugging,
        # load_debug_ai_scene_info() can supply a scene instead.
        print("Error:", e)
        sys.exit(1)

    prompt = get_prompt(ai_scene_info)

    ws = websocket.WebSocket()
    try:
        ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
        update_ai_image_task_status(row_id, 2)
        images = get_images(ws, prompt)
    finally:
        # Always release the socket, even if queuing or waiting fails.
        ws.close()

    project_id = None
    for node_images in images.values():
        for image_info in node_images:
            if image_info['type'] != 'output':
                continue

            # Fetch the task once, and only when there is an image to upload.
            if project_id is None:
                project_id = _fetch_project_id(row_id)
                if project_id is None:
                    print("Error:", "could not fetch task", row_id)
                    sys.exit(1)

            complete_image_path = image_path + "{}/{}".format(project_id, image_info['filename'])
            print(complete_image_path)

            image_base64 = find_image_and_convert_to_base64(complete_image_path)
            upload_image_to_anvil(row_id, image_base64)


if __name__ == "__main__":
    main(sys.argv)