#This is an example that uses the websockets api to know when a prompt execution is done #Once the prompt execution is done it downloads the images using the /history endpoint import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client) import uuid import json import urllib.request import urllib.parse from PIL import Image import io import random import sys import base64 server_address = "127.0.0.1:8188" client_id = str(uuid.uuid4()) def load_debug_ai_scene_info(): #open ai_scene_info.json with open("D:/Git/ap-canvas-creation-module/04_stable_diffusion/ai_scene_info.json", "r") as f: ai_scene_info = json.load(f) return ai_scene_info def convert_base64_string_to_object(base64_string): bytes = base64.b64decode(base64_string) string = bytes.decode("ascii") return json.loads(string) def set_filename(json_obj, title, new_prefix): for key, value in json_obj.items(): if isinstance(value, dict): if value.get("_meta", {}).get("title") == title: if "inputs" in value and "filename_prefix" in value["inputs"]: value["inputs"]["filename_prefix"] = new_prefix return new_prefix else: result = set_filename(value, title, new_prefix) if result: return result return None def find_node(json_obj, title): for key, value in json_obj.items(): if isinstance(value, dict): if value.get("_meta", {}).get("title") == title: return value else: result = find_node(value, title) if result: return result return None def queue_prompt(prompt): p = {"prompt": prompt, "client_id": client_id} data = json.dumps(p).encode('utf-8') req = urllib.request.Request("http://{}/prompt".format(server_address), data=data) return json.loads(urllib.request.urlopen(req).read()) def get_prompt(ai_scene_info): with open( "D://Git//ap-canvas-creation-module//04_stable_diffusion//workflows//canvas_3d_to_img_standard_V1.json", "r", ) as f: prompt_text_json = f.read() prompt = json.loads(prompt_text_json) #set the text prompt for our positive CLIPTextEncode positive_text = ai_scene_info["ai_scene"]["prompt"]["positive_prompt"] negative_text = ai_scene_info["ai_scene"]["prompt"]["negative_prompt"] image_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Renders//15a314a1-8ba1-4e0e-ad0c-f605b06f89f8//" image_base_path = image_path + "base0001.jpg" image_alpha_products_path = image_path + "alpha_products0001.jpg" # image_depth_path = image_path + "depth0001.png" prompt = json.loads(prompt_text_json) file_name = set_filename(prompt, "Save Image", "{project_id}/basic_api_example".format(project_id=ai_scene_info["project_id"])) ksampler_main = find_node(prompt, "KSampler") ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000) ksampler_main = find_node(prompt, "KSampler") ksampler_main["inputs"]["steps"] = ai_scene_info["ai_scene"]["settings"]["steps"] ksampler_main["inputs"]["cfg"] = ai_scene_info["ai_scene"]["settings"]["cfg"] prompt_positive = find_node(prompt, "positive_CLIPTextEncodeSDXL") prompt_positive["inputs"]["text_g"] = positive_text prompt_positive["inputs"]["text_l"] = positive_text prompt_negative = find_node(prompt, "negative_CLIPTextEncodeSDXL") prompt_negative["inputs"]["text_g"] = negative_text prompt_negative["inputs"]["text_l"] = negative_text image_base = find_node(prompt, "image_base") image_base["inputs"]["image"] = image_base_path image_base = find_node(prompt, "image_product_mask") image_base["inputs"]["image"] = image_alpha_products_path image_base = find_node(prompt, "image_depth") # image_base["inputs"]["image"] = image_depth_path return prompt def get_image(filename, subfolder, folder_type): data = {"filename": filename, "subfolder": subfolder, "type": folder_type} url_values = urllib.parse.urlencode(data) with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response: return response.read() def get_history(prompt_id): with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response: return json.loads(response.read()) def get_images(ws, prompt): prompt_id = queue_prompt(prompt)['prompt_id'] output_images = {} while True: out = ws.recv() if isinstance(out, str): message = json.loads(out) if message['type'] == 'executing': data = message['data'] if data['node'] is None and data['prompt_id'] == prompt_id: break # Execution is done else: continue # previews are binary data history = get_history(prompt_id)[prompt_id] for node_id in history['outputs']: node_output = history['outputs'][node_id] images_output = [] if 'images' in node_output: for image in node_output['images']: image_data = get_image(image['filename'], image['subfolder'], image['type']) images_output.append({ 'filename': image['filename'], 'data': image_data, 'type': image['type'] }) output_images[node_id] = images_output return output_images def main(): argv = sys.argv try: argv = argv[argv.index("--") + 1 :] ai_scene_info = convert_base64_string_to_object(argv[0]) print("loading scene data", ai_scene_info) except Exception as e: print("Error:", e) ai_scene_info = load_debug_ai_scene_info() prompt = get_prompt(ai_scene_info) ws = websocket.WebSocket() ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) images = get_images(ws, prompt) prompt_id = queue_prompt(prompt)['prompt_id'] for node_id in images: for image_info in images[node_id]: if image_info['type'] == 'output': print("Upload image") #Commented out code to display the output images: # for node_id in images: # for image_data in images[node_id]: # image = Image.open(io.BytesIO(image_data)) # image.show() if __name__ == "__main__": main()