sd_comfy_api_v2.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. #This is an example that uses the websockets api to know when a prompt execution is done
  2. #Once the prompt execution is done it downloads the images using the /history endpoint
  3. import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
  4. import uuid
  5. import json
  6. import urllib.request
  7. import urllib.parse
  8. from PIL import Image
  9. import io
  10. import random
  11. import sys
  12. import base64
  13. import requests
  14. server_address = "127.0.0.1:8188"
  15. client_id = str(uuid.uuid4())
  16. api_path = "https://canvas-api-test.anvil.app/_/api"
  17. image_path = "D:/Temp/ComfyUI_windows_portable_nvidia/ComfyUI_windows_portable/ComfyUI/output/"
  18. def update_ai_image_task_status(row_id, new_status):
  19. # Define the URL for the API endpoint
  20. url = "{}/creation-module/ai-image/update-status".format(api_path)
  21. # Create a JSON payload
  22. payload = {"row_id": row_id, "new_status": new_status}
  23. # Make a POST request to the API endpoint with the JSON payload
  24. response = requests.post(url, json=payload)
  25. # Handle the response
  26. if response.status_code == 200:
  27. print("Status update was successful")
  28. return response.json()
  29. else:
  30. print("Status update failed")
  31. print("Status code:", response.status_code)
  32. print("Response:", response.text)
  33. return None
  34. def get_ai_image_task(row_id):
  35. # Define the URL for the API endpoint
  36. url = "{}/creation-module/ai-image/{}".format(api_path, row_id)
  37. print("Constructed URL:", url) # Print the URL for debugging
  38. # Make a GET request to the API endpoint
  39. response = requests.get(url)
  40. # Handle the response
  41. if response.status_code == 200:
  42. print("Request was successful")
  43. return response.json()
  44. else:
  45. print("Request failed")
  46. print("Status code:", response.status_code)
  47. print("Response:", response.text)
  48. return None
  49. def find_image_and_convert_to_base64(image_path):
  50. with open(image_path, "rb") as image_file:
  51. image_data = image_file.read()
  52. image_base64 = base64.b64encode(image_data).decode("utf-8")
  53. return image_base64
  54. def upload_image_to_anvil(row_id, image_base64):
  55. url = "{}/creation-module/ai-image/upload-preview".format(api_path)
  56. payload = {"row_id": row_id, "image_base64": image_base64}
  57. response = requests.post(url, json=payload)
  58. if response.status_code == 200:
  59. print("Image uploaded successfully")
  60. update_ai_image_task_status(row_id=row_id, new_status=3)
  61. return response.json()
  62. else:
  63. print("Image upload failed")
  64. print("Status code:", response.status_code)
  65. print("Response:", response.text)
  66. return None
  67. def load_debug_ai_scene_info():
  68. #open ai_scene_info.json
  69. with open("D:/Git/ap-canvas-creation-module/04_stable_diffusion/ai_scene_info.json", "r") as f:
  70. ai_scene_info = json.load(f)
  71. return ai_scene_info
  72. def convert_base64_string_to_object(base64_string):
  73. bytes = base64.b64decode(base64_string)
  74. string = bytes.decode("ascii")
  75. return json.loads(string)
  76. def set_filename(json_obj, title, new_prefix):
  77. for key, value in json_obj.items():
  78. if isinstance(value, dict):
  79. if value.get("_meta", {}).get("title") == title:
  80. if "inputs" in value and "filename_prefix" in value["inputs"]:
  81. value["inputs"]["filename_prefix"] = new_prefix
  82. return new_prefix
  83. else:
  84. result = set_filename(value, title, new_prefix)
  85. if result:
  86. return result
  87. return None
  88. def find_node(json_obj, title):
  89. for key, value in json_obj.items():
  90. if isinstance(value, dict):
  91. if value.get("_meta", {}).get("title") == title:
  92. return value
  93. else:
  94. result = find_node(value, title)
  95. if result:
  96. return result
  97. return None
  98. def queue_prompt(prompt):
  99. p = {"prompt": prompt, "client_id": client_id}
  100. data = json.dumps(p).encode('utf-8')
  101. req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
  102. return json.loads(urllib.request.urlopen(req).read())
  103. def get_prompt(ai_scene_info):
  104. with open(
  105. "D://Git//ap-canvas-creation-module//04_stable_diffusion//workflows//canvas_3d_to_img_standard_V1.json",
  106. "r",
  107. ) as f:
  108. prompt_text_json = f.read()
  109. prompt = json.loads(prompt_text_json)
  110. #set the text prompt for our positive CLIPTextEncode
  111. positive_text = ai_scene_info["ai_scene"]["prompt"]["positive_prompt"]
  112. negative_text = ai_scene_info["ai_scene"]["prompt"]["negative_prompt"]
  113. image_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Renders//15a314a1-8ba1-4e0e-ad0c-f605b06f89f8//"
  114. image_base_path = image_path + "base0001.jpg"
  115. image_alpha_products_path = image_path + "alpha_products0001.jpg"
  116. # image_depth_path = image_path + "depth0001.png"
  117. prompt = json.loads(prompt_text_json)
  118. file_name = set_filename(prompt, "Save Image", "{project_id}/basic_api_example".format(project_id=ai_scene_info["project_id"]))
  119. ksampler_main = find_node(prompt, "KSampler")
  120. ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000)
  121. ksampler_main = find_node(prompt, "KSampler")
  122. ksampler_main["inputs"]["steps"] = ai_scene_info["ai_scene"]["settings"]["steps"]
  123. ksampler_main["inputs"]["cfg"] = ai_scene_info["ai_scene"]["settings"]["cfg"]
  124. prompt_positive = find_node(prompt, "positive_CLIPTextEncodeSDXL")
  125. prompt_positive["inputs"]["text_g"] = positive_text
  126. prompt_positive["inputs"]["text_l"] = positive_text
  127. prompt_negative = find_node(prompt, "negative_CLIPTextEncodeSDXL")
  128. prompt_negative["inputs"]["text_g"] = negative_text
  129. prompt_negative["inputs"]["text_l"] = negative_text
  130. image_base = find_node(prompt, "image_base")
  131. image_base["inputs"]["image"] = image_base_path
  132. image_base = find_node(prompt, "image_product_mask")
  133. image_base["inputs"]["image"] = image_alpha_products_path
  134. image_base = find_node(prompt, "image_depth")
  135. # image_base["inputs"]["image"] = image_depth_path
  136. return prompt
  137. def get_image(filename, subfolder, folder_type):
  138. data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
  139. url_values = urllib.parse.urlencode(data)
  140. with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response:
  141. return response.read()
  142. def get_history(prompt_id):
  143. with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response:
  144. return json.loads(response.read())
  145. def get_images(ws, prompt):
  146. prompt_id = queue_prompt(prompt)['prompt_id']
  147. output_images = {}
  148. while True:
  149. out = ws.recv()
  150. if isinstance(out, str):
  151. message = json.loads(out)
  152. if message['type'] == 'executing':
  153. data = message['data']
  154. if data['node'] is None and data['prompt_id'] == prompt_id:
  155. break # Execution is done
  156. else:
  157. continue # previews are binary data
  158. history = get_history(prompt_id)[prompt_id]
  159. for node_id in history['outputs']:
  160. node_output = history['outputs'][node_id]
  161. images_output = []
  162. if 'images' in node_output:
  163. for image in node_output['images']:
  164. image_data = get_image(image['filename'], image['subfolder'], image['type'])
  165. images_output.append({
  166. 'filename': image['filename'],
  167. 'data': image_data,
  168. 'type': image['type']
  169. })
  170. output_images[node_id] = images_output
  171. return output_images
  172. def main(*args):
  173. argv = sys.argv
  174. try:
  175. argv = argv[argv.index("--") + 1 :]
  176. ai_scene_info = convert_base64_string_to_object(argv[0])
  177. row_id = ai_scene_info["image_id"]
  178. print("loading scene data", ai_scene_info)
  179. except Exception as e:
  180. print("Error:", e)
  181. # ai_scene_info = load_debug_ai_scene_info()
  182. prompt = get_prompt(ai_scene_info)
  183. ws = websocket.WebSocket()
  184. ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
  185. update_ai_image_task_status(row_id, 2)
  186. images = get_images(ws, prompt)
  187. for node_id in images:
  188. for image_info in images[node_id]:
  189. if image_info['type'] == 'output':
  190. response = get_ai_image_task(row_id)
  191. data = json.loads(response["data"])
  192. project_id = data["project_id"]
  193. complete_image_path = image_path + "{}/{}".format(project_id,image_info['filename'])
  194. print(complete_image_path)
  195. image_base64 = find_image_and_convert_to_base64(
  196. image_path + "{}/{}".format(project_id,image_info['filename'])
  197. )
  198. upload_image_to_anvil(row_id, image_base64)
  199. if __name__ == "__main__":
  200. main(sys.argv)