|
@@ -0,0 +1,171 @@
|
|
|
+#This is an example that uses the websockets api to know when a prompt execution is done
|
|
|
+#Once the prompt execution is done it downloads the images using the /history endpoint
|
|
|
+
|
|
|
+import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
|
|
|
+import uuid
|
|
|
+import json
|
|
|
+import urllib.request
|
|
|
+import urllib.parse
|
|
|
+from PIL import Image
|
|
|
+import io
|
|
|
+import random
|
|
|
+import sys
|
|
|
+import base64
|
|
|
+
|
|
|
# ComfyUI server endpoint (host:port) used for all HTTP and websocket calls.
server_address = "127.0.0.1:8188"
# Unique id identifying this client to the server, so websocket status
# messages can be matched to the prompts this process queues.
client_id = str(uuid.uuid4())
|
|
|
+
|
|
|
def convert_base64_string_to_object(base64_string):
    """Decode a base64-encoded JSON document and return the parsed object.

    Args:
        base64_string: base64 text whose decoded payload is a JSON document.

    Returns:
        The deserialized Python object (typically a dict).

    Raises:
        binascii.Error: if the input is not valid base64.
        UnicodeDecodeError: if the payload is not valid UTF-8.
        json.JSONDecodeError: if the payload is not valid JSON.
    """
    # Renamed local (was `bytes`, shadowing the builtin) and decode as
    # UTF-8 — a superset of the ASCII the original assumed — so prompts
    # containing non-ASCII characters no longer crash the decoder.
    raw = base64.b64decode(base64_string)
    return json.loads(raw.decode("utf-8"))
|
|
|
+
|
|
|
def set_filename(json_obj, title, new_prefix):
    """Find the node whose ``_meta.title`` equals *title* and set its
    ``inputs.filename_prefix`` to *new_prefix*.

    Searches the nested dict structure depth-first and mutates it in place.

    Args:
        json_obj: a ComfyUI prompt graph (nested dicts).
        title: node title to look for.
        new_prefix: value written to the node's "filename_prefix" input.

    Returns:
        The modified node dict, or None when no matching node was found.
        (The original returned None even on a direct hit, so the recursion
        could never short-circuit and the whole tree was always scanned;
        returning the node fixes that and gives callers a success signal.)
    """
    for value in json_obj.values():
        if not isinstance(value, dict):
            continue
        if value.get("_meta", {}).get("title") == title:
            if "inputs" in value and "filename_prefix" in value["inputs"]:
                value["inputs"]["filename_prefix"] = new_prefix
                return value  # fix: report success and stop searching
        else:
            result = set_filename(value, title, new_prefix)
            if result:
                return result
    return None
|
|
|
+
|
|
|
def find_node(json_obj, title):
    """Depth-first search of a nested dict structure for the first dict
    whose ``_meta``/``title`` entry equals *title*.

    Args:
        json_obj: a ComfyUI prompt graph (nested dicts).
        title: node title to look for.

    Returns:
        The matching node dict, or None when nothing matches. Children of
        a matching node are not searched further.
    """
    for child in json_obj.values():
        if not isinstance(child, dict):
            continue
        if child.get("_meta", {}).get("title") == title:
            return child
        found = find_node(child, title)
        if found:
            return found
    return None
|
|
|
+
|
|
|
+
|
|
|
def queue_prompt(prompt):
    """POST *prompt* to the ComfyUI ``/prompt`` endpoint.

    Uses the module-level ``server_address`` and ``client_id``.

    Args:
        prompt: workflow dict to queue for execution.

    Returns:
        The parsed JSON response (contains e.g. the assigned "prompt_id").
    """
    payload = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    # fix: close the HTTP response deterministically instead of leaking it.
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())
|
|
|
+
|
|
|
def get_prompt(ai_scene_info):
    """Load the ComfyUI workflow template and fill it in from *ai_scene_info*.

    Args:
        ai_scene_info: dict carrying ``ai_scene.settings.positive_prompt``
            and ``ai_scene.settings.negative_prompt`` (schema assumed from
            usage — TODO confirm with the caller).

    Returns:
        The populated workflow dict, ready to pass to queue_prompt().
    """
    # NOTE(review): the workflow path and render directory are hard-coded to
    # a local machine; consider making them parameters or config values.
    with open(
        "D://Git//ap-canvas-creation-module//04_stable_diffusion//workflows//canvas_3d_to_img_standard_V1.json",
        "r",
    ) as f:
        # Parse once (the original parsed the same text twice).
        prompt = json.load(f)

    # Text prompts for the positive/negative CLIPTextEncode nodes.
    positive_text = ai_scene_info["ai_scene"]["settings"]["positive_prompt"]
    negative_text = ai_scene_info["ai_scene"]["settings"]["negative_prompt"]

    image_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Renders//15a314a1-8ba1-4e0e-ad0c-f605b06f89f8//"
    image_base_path = image_path + "base0001.jpg"
    image_alpha_products_path = image_path + "alpha_products0001.jpg"
    # image_depth_path = image_path + "depth0001.png"

    # Route the saved images into our own output folder.
    set_filename(prompt, "Save Image", "custom/basic_api_example")

    # Randomize the seed and pin the step count on the main sampler
    # (the original looked the same node up twice).
    ksampler_main = find_node(prompt, "KSampler")
    ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000)
    ksampler_main["inputs"]["steps"] = 30

    prompt_positive = find_node(prompt, "positive_CLIPTextEncodeSDXL")
    prompt_positive["inputs"]["text_g"] = positive_text
    prompt_positive["inputs"]["text_l"] = positive_text

    prompt_negative = find_node(prompt, "negative_CLIPTextEncodeSDXL")
    prompt_negative["inputs"]["text_g"] = negative_text
    prompt_negative["inputs"]["text_l"] = negative_text

    image_base = find_node(prompt, "image_base")
    image_base["inputs"]["image"] = image_base_path

    image_product_mask = find_node(prompt, "image_product_mask")
    image_product_mask["inputs"]["image"] = image_alpha_products_path

    # Depth input is currently disabled; the lookup is kept for parity with
    # the original, but its result is intentionally unused.
    image_depth = find_node(prompt, "image_depth")
    # image_depth["inputs"]["image"] = image_depth_path

    return prompt
|
|
|
+
|
|
|
def get_image(filename, subfolder, folder_type):
    """Fetch one rendered image from the server's ``/view`` endpoint.

    Args:
        filename: image file name as reported by the history record.
        subfolder: subfolder the server stored the image in.
        folder_type: the history record's "type" field.

    Returns:
        Raw image bytes.
    """
    params = urllib.parse.urlencode(
        {"filename": filename, "subfolder": subfolder, "type": folder_type}
    )
    url = "http://{}/view?{}".format(server_address, params)
    with urllib.request.urlopen(url) as response:
        return response.read()
|
|
|
+
|
|
|
def get_history(prompt_id):
    """Return the server's ``/history`` record for *prompt_id* as a dict."""
    url = "http://{}/history/{}".format(server_address, prompt_id)
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read())
|
|
|
+
|
|
|
def get_images(ws, prompt):
    """Queue *prompt*, block on the websocket until execution finishes,
    then download every output image via the history endpoint.

    Args:
        ws: a connected websocket registered with our client_id.
        prompt: workflow dict to execute.

    Returns:
        Mapping of node id -> list of raw image bytes for that node.
    """
    prompt_id = queue_prompt(prompt)['prompt_id']

    # Drain websocket messages until the server reports that execution of
    # our prompt is complete (an 'executing' message with node == None).
    while True:
        frame = ws.recv()
        if not isinstance(frame, str):
            continue  # previews are binary data
        message = json.loads(frame)
        if message['type'] != 'executing':
            continue
        data = message['data']
        if data['node'] is None and data['prompt_id'] == prompt_id:
            break  # execution is done

    output_images = {}
    history = get_history(prompt_id)[prompt_id]
    for node_id, node_output in history['outputs'].items():
        collected = []
        if 'images' in node_output:
            for image in node_output['images']:
                collected.append(
                    get_image(image['filename'], image['subfolder'], image['type'])
                )
        output_images[node_id] = collected

    return output_images
|
|
|
+
|
|
|
def main():
    """Entry point: parse the base64 scene payload from argv (after the
    ``--`` separator), build the workflow, run it on the server, and
    collect the resulting images."""
    argv = sys.argv

    try:
        # Blender-style convention: our own args come after a "--" separator.
        argv = argv[argv.index("--") + 1:]
        ai_scene_info = convert_base64_string_to_object(argv[0])
        print("loading scene data", ai_scene_info)
    except Exception as e:
        # fix: the original printed the error and fell through with
        # ai_scene_info undefined, crashing below with a NameError.
        # Fail explicitly instead.
        print("Error:", e)
        sys.exit(1)

    prompt = get_prompt(ai_scene_info)

    ws = websocket.WebSocket()
    ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
    try:
        images = get_images(ws, prompt)
    finally:
        ws.close()  # fix: the websocket was never closed

    print("Workflow completed, images are stored in the images variable")

    # Commented out code to display the output images:
    # for node_id in images:
    #     for image_data in images[node_id]:
    #         image = Image.open(io.BytesIO(image_data))
    #         image.show()


if __name__ == "__main__":
    main()
|
|
|
+
|
|
|
+
|