File size: 4,666 Bytes
2de3774
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import modules.async_worker as worker

import shared
import glob
from pathlib import Path
import datetime
import re
import json

from PIL import Image

# To add a new pipeline: copy this file, fill in suitable code, and add selection logic to modules/pipelines.py.

def search_for_words(searchfor, prompt):
    """Return True if every comma-separated term in *searchfor* occurs in *prompt*.

    Matching is case-insensitive and substring-based; an empty term (e.g. from
    a trailing comma or an empty search string) always matches.
    """
    haystack = prompt.lower()
    return all(term.strip() in haystack for term in searchfor.lower().split(","))

def search(search_string, maxresults=10, callback=None):
    """Search previously generated images by their prompt metadata.

    Parameters:
        search_string: raw search text, optionally prefixed with "search:" and
            leading options — "YYYY-MM-DD" (search that date's folder),
            "all:" (search every date folder), "skip:N", "+N" (skip more),
            and "max:N" (override maxresults).
        maxresults: maximum number of hits to return (may be overridden by "max:").
        callback: optional progress callback invoked as
            callback(step, 0, 0, maxresults, None) for each hit past the skip
            count.

    Returns:
        A list of matching file paths, with the first `skip` hits removed.
    """
    images = []
    skip = 0

    folder = shared.path_manager.model_paths["temp_outputs_path"]
    current_time = datetime.datetime.now()
    daystr = current_time.strftime("%Y-%m-%d")

    # Strip the leading "search:" command prefix, if present.
    searchfor = re.sub(r"search: *", "", search_string, count=1, flags=re.IGNORECASE)

    # Repeatedly chomp leading options off the search string until none remain.
    chomp = True
    while chomp:
        chomp = False

        # Explicit date (YYYY-MM-DD) overrides today's folder.
        matchstr = r"^[0-9]{4}-[0-9]{2}-[0-9]{2}\s?"
        match = re.match(matchstr, searchfor, re.IGNORECASE)
        if match is not None:
            daystr = match.group().strip()
            searchfor = re.sub(matchstr, "", searchfor)
            chomp = True

        # "all:" searches every date folder.
        matchstr = r"^all:\s?"
        match = re.match(matchstr, searchfor, re.IGNORECASE)
        if match is not None:
            daystr = "*"
            searchfor = re.sub(matchstr, "", searchfor)
            chomp = True

        # "skip:N" sets the number of leading hits to discard.
        matchstr = r"^skip:\s?(?P<skip>[0-9]+)\s?"
        match = re.match(matchstr, searchfor, re.IGNORECASE)
        if match is not None:
            skip = int(match.group("skip"))
            searchfor = re.sub(matchstr, "", searchfor)
            chomp = True

        # "+N" adds to the skip count (for paging forward).
        matchstr = r"^\+(?P<skip>[0-9]+)\s?"
        match = re.match(matchstr, searchfor, re.IGNORECASE)
        if match is not None:
            skip += int(match.group("skip"))
            searchfor = re.sub(matchstr, "", searchfor)
            chomp = True

        # "max:N" overrides the maximum number of results.
        matchstr = r"^max:\s?(?P<max>[0-9]+)\s?"
        match = re.match(matchstr, searchfor, re.IGNORECASE)
        if match is not None:
            maxresults = int(match.group("max"))
            searchfor = re.sub(matchstr, "", searchfor)
            chomp = True

    searchfor = searchfor.strip()

    # Collect folder/daystr/*.(png|gif), deduplicated and sorted.
    globs = ["*.png", "*.gif"]
    pngs = set()
    for g in globs:
        for f in glob.glob(str(Path(folder) / daystr / g)):
            pngs.add(f)
    pngs = sorted(pngs)

    found = 0
    for file in pngs:
        # Use a context manager so the file handle is closed promptly;
        # the previous version leaked one open handle per scanned image.
        with Image.open(file) as im:
            params = im.info.get("parameters")
        metadata = {"prompt": ""}
        if params:
            metadata = json.loads(params)

        # Show the image when metadata is missing (e.g. gifs carry none).
        if searchfor == "" or "Prompt" not in metadata or search_for_words(searchfor, metadata["Prompt"]):
            # Report progress for hits past the skip count.
            # NOTE(review): the hit at index == skip gets no callback; looks
            # intentional (preview starts after the skipped page) — confirm.
            if callback is not None and found > skip:
                callback(found - skip, 0, 0, maxresults, None)  # Returning im here is a bit much...
            images.append(file)
            found += 1
        if found >= (maxresults + skip):
            break

    return images[skip:]


class pipeline:
    """Pseudo-pipeline that searches previously generated images by prompt
    metadata instead of generating new ones.

    Model/LoRA/controlnet hooks are deliberate no-ops so this class can be
    swapped in wherever a generation pipeline is expected.
    """

    pipeline_type = ["search"]

    model_hash = ""

    def parse_gen_data(self, gen_data):
        """Stash the requested image count and force a single generation pass.

        The original image_number becomes the maximum number of search hits
        (read back in process()); the generation loop itself runs only once.
        """
        gen_data["original_image_number"] = gen_data["image_number"]
        gen_data["image_number"] = 1
        gen_data["show_preview"] = False
        return gen_data

    def load_base_model(self, name):
        # Searching needs no model; intentionally a no-op.
        return

    def load_keywords(self, lora):
        """Return the keyword text stored beside a LoRA file, or " " if absent."""
        filename = lora.replace(".safetensors", ".txt")
        try:
            with open(filename, "r") as file:
                return file.read()
        except FileNotFoundError:
            return " "

    def load_loras(self, loras):
        # No-op: LoRAs are irrelevant when only searching metadata.
        return

    def refresh_controlnet(self, name=None):
        # No-op for the search pipeline.
        return

    def clean_prompt_cond_caches(self):
        # No-op: this pipeline keeps no conditioning caches.
        return

    def process(
        self,
        gen_data=None,
        callback=None,
    ):
        """Run a metadata search and return the matching image paths."""
        worker.add_result(
            gen_data["task_id"],
            "preview",
            (-1, "Searching ...", None)  # plain string; was an f-string with no placeholders
        )
        maxresults = gen_data["original_image_number"]
        # Requesting 0 or 1 images means "give me up to 100 matches".
        maxresults = 100 if maxresults <= 1 else maxresults

        images = search(gen_data["positive_prompt"], maxresults=maxresults, callback=callback)

        return images