diff --git a/fastchat/serve/monitor/classify/README.md b/fastchat/serve/monitor/classify/README.md
index 259957618..1c75b9b29 100644
--- a/fastchat/serve/monitor/classify/README.md
+++ b/fastchat/serve/monitor/classify/README.md
@@ -24,7 +24,15 @@ Your label_bench directory should follow the structure:
 
 ## How to evaluate your category classifier?
 
-To test your new classifier for a new category, you would have to make sure you created the category child class in `category.py`. Then, to generate classification labels, make the necessary edits in `config.yaml` and run
+To test your new classifier for a new category, first make sure you have created the category child class in `category.py`.
+
+We currently support classifiers that can be accessed via the OpenAI API, as well as Hugging Face classifiers.
+
+If you are using an OpenAI API based classifier, create a class inheriting from `CategoryAPI`. Give your classifier a `name_tag` attribute and implement the `pre_process` and `post_process` functions; both are documented in `category.py` under the `CategoryAPI` class.
+
+If you are using a Hugging Face based classifier, create a class inheriting from `CategoryHF`. Again, give your classifier a `name_tag` attribute and implement the `pre_process` and `post_process` functions; both are documented in `category.py` under the `CategoryHF` class.
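+
+For example, a minimal Hugging Face based classifier might look like the following sketch. This is only an illustration: the class name, the category name `"toxicity_v0.1"`, the output key, and the model path are placeholders, not classifiers that exist in this repo, and unlike `CategoryRefusalHF` this sketch labels only the battle prompt.
+
+```python
+from category import CategoryHF
+from utils import HuggingFaceClassifier
+
+
+class CategoryToxicityHF(CategoryHF):
+    def __init__(self):
+        super().__init__()
+        # name_tag must match the name registered in create_category() and listed under task_name in config.yaml
+        self.name_tag = "toxicity_v0.1"
+        self.classifier = HuggingFaceClassifier(model_path="your-org/your-classifier")
+
+    def pre_process(self, batch):
+        # batch is a pd.DataFrame with one row per battle; label one text per battle
+        to_label = batch["prompt"].tolist()
+        to_label_uids = batch["uid"].tolist()
+        return to_label, to_label_uids
+
+    def post_process(self, labels, to_label_uids):
+        # map boolean classifier outputs back to battle UIDs
+        output = {uid: {"toxicity": bool(label)} for uid, label in zip(to_label_uids, labels)}
+        raw_output = {uid: None for uid in to_label_uids}
+        return output, raw_output
+```
+
+Remember to also register the new class in `create_category` in `category.py` and add its `name_tag` to `task_name` in `config.yaml`.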
+
+Then, to generate classification labels, make the necessary edits in `config.yaml` and run
 ```console
 python label.py --config config.yaml --testing
 ```
diff --git a/fastchat/serve/monitor/classify/category.py b/fastchat/serve/monitor/classify/category.py
index d21181829..a3166a1e3 100644
--- a/fastchat/serve/monitor/classify/category.py
+++ b/fastchat/serve/monitor/classify/category.py
@@ -8,32 +8,162 @@
 #   - if_v0.1
 #     - if
 #     - score
+#   - creative_writing_v0.1
+#     - creative_writing
+#     - score
+#   - refusal_v0.2
+#     - refusal
+
 import ast
 import re
+import numpy as np
+from collections import defaultdict
+
+from utils import (
+    HuggingFaceClassifier,
+    chat_completion_openai,
+    chat_completion_anthropic,
+)
+
+
+def create_category(name):
+    if name == "criteria_v0.1":
+        return CategoryHardPrompt()
+    elif name == "if_v0.1":
+        return CategoryIF()
+    elif name == "math_v0.1":
+        return CategoryMath()
+    elif name == "creative_writing_v0.1":
+        return CategoryCreativeWriting()
+    elif name == "refusal_v0.1":
+        return CategoryRefusalAPI()
+    elif name == "refusal_v0.2":
+        return CategoryRefusalHF()
+
+    raise Exception(f"Category name is incorrect: {name}")
 
 
-class Category:
+class CategoryAPI:
     def __init__(self):
+        self.batch_size = 1
+        self.is_parallel = True
+
+    def get_answer(
+        self, batch, model_name, max_tokens, temperature, api_dict, api_type
+    ):
+        assert len(batch) == 1, "API-based categories must have batch size of 1"
+
+        convs, uids = self.pre_process(batch)
+
+        outputs = []
+
+        if api_type == "openai":
+            for conv in convs:
+                output = chat_completion_openai(
+                    model=model_name,
+                    messages=conv,
+                    temperature=temperature,
+                    max_tokens=max_tokens,
+                    api_dict=api_dict,
+                )
+                outputs.append(output)
+
+        elif api_type == "anthropic":
+            for conv in convs:
+                output = chat_completion_anthropic(
+                    model=model_name,
+                    messages=conv,
+                    temperature=temperature,
+                    max_tokens=max_tokens,
+                    api_dict=api_dict,
+                )
+                outputs.append(output)
+
+        return self.post_process(outputs, uids)
+
+    def pre_process(self, row):
+        """
+        Prepares a text to be labeled by an LLM through the OpenAI API
+
+        Inherited category classifier classes should implement this method.
+
+        Args:
+            row (pd.DataFrame): row representing a single battle to be labeled
+
+        Returns:
+            to_label (List[List[Dict]]): list of queries, each including the system prompt, in OpenAI API format:
+                [
+                    {"role": "system", "content": <system prompt>},
+                    {"role": "user", "content": <user message>},
+                    ...
+                ]
+            uid (str): UID of the battle to be labeled
+        """
         pass
 
-    @staticmethod
-    def create_category(name):
-        if name == "criteria_v0.1":
-            return CategoryHardPrompt()
-        elif name == "if_v0.1":
-            return CategoryIF()
-        elif name == "math_v0.1":
-            return CategoryMath()
-        elif name == "creative_writing_v0.1":
-            return CategoryCreativeWriting()
+    def post_process(self, judgements, uid):
+        """
+        Processes judgements/outputs of the LLM to retrieve final labels
+
+        Inherited category classifier classes should implement this method.
 
-        raise Exception(f"Category name is incorrect: {name}")
+        Args:
+            judgements (List[str]): text outputs of LLM labeler
+            uid (str): UID of the battle to be labeled
 
-    def post_process(self):
+        Returns:
+            output (Dict[str, Dict[str, str]]): Key is the battle UID, value is the output associated with that battle (usually a dictionary)
+            raw_output (Dict[str, str]): Key is the battle UID, value is the unprocessed LLM output
+        """
         pass
 
 
-class CategoryHardPrompt(Category):
+class CategoryHF:
+    def __init__(self):
+        self.batch_size = 1
+        self.is_parallel = False
+
+    def get_answer(
+        self, batch, model_name, max_tokens, temperature, api_dict, api_type
+    ):
+        to_label, to_label_uids = self.pre_process(batch)
+        labels = self.classifier.classify_batch(to_label)
+
+        return self.post_process(labels, to_label_uids)
+
+    def pre_process(self, batch):
+        """
+        Prepares a batch of texts to be labeled by a Hugging Face classifier.
+
+        Inherited category classifier classes should implement this method.
+
+        Args:
+            batch (pd.DataFrame): Each row of the DataFrame represents one battle.
+
+        Returns:
+            to_label (List[str]): Texts to be labeled by the HF classifier
+            to_label_uids (List[str]): Battle UIDs corresponding to each text to be labeled
+        """
+        pass
+
+    def post_process(self, labels, to_label_uids):
+        """
+        Processes raw HF labels.
+
+        Inherited category classifier classes should implement this method.
+
+        Args:
+            labels (List[bool]): labels directly from HF classifier
+            to_label_uids (List[str]): Battle UIDs corresponding to each string that was labeled
+
+        Returns:
+            output (Dict[str, Dict[str, str]]): Keys are battle UIDs, values are the outputs associated with that battle (usually a dictionary)
+            raw_output (Dict[str, str]): Keys are battle UIDs, values are the unprocessed HF model output or None
+        """
+        pass
+
+
+class CategoryHardPrompt(CategoryAPI):
     def __init__(self):
         super().__init__()
         self.name_tag = "criteria_v0.1"
@@ -63,17 +193,21 @@ def get_score(self, judgment):
         else:
             return []
 
-    def pre_process(self, prompt):
+    def pre_process(self, row):
+        prompt = row["prompt"].iloc[0]
         conv = [{"role": "system", "content": self.sys_prompt}]
         conv.append({"role": "user", "content": prompt})
-        return conv
+        return [conv], row["uid"].iloc[0]
+
+    def post_process(self, judgments, uid):
+        raw_output = {uid: judgments[0]}
 
-    def post_process(self, judgment):
-        criteria = self.get_score(judgment=judgment)
-        return {name: bool(i in criteria) for i, name in self.tags.items()}
+        criteria = self.get_score(judgment=judgments[0])
+        output = {uid: {name: bool(i in criteria) for i, name in self.tags.items()}}
+        return output, raw_output
 
 
-class CategoryIF(Category):
+class CategoryIF(CategoryAPI):
     def __init__(self):
         super().__init__()
         self.name_tag = "if_v0.1"
@@ -91,23 +225,29 @@ def get_score(self, judgment):
         else:
             return None
 
-    def pre_process(self, prompt):
+    def pre_process(self, row):
+        prompt = row["prompt"].iloc[0]
         args = {"PROMPT": prompt}
         conv = [
             {"role": "system", "content": self.system_prompt},
             {"role": "user", "content": self.prompt_template.format(**args)},
         ]
-        return conv
+        return [conv], row["uid"].iloc[0]
+
+    def post_process(self, judgments, uid):
+        raw_output = {uid: judgments[0]}
 
-    def post_process(self, judgment):
-        score = self.get_score(judgment=judgment)
-        return {
-            "if": bool(score >= 4) if score else False,
-            "score": score,
+        score = self.get_score(judgment=judgments[0])
+        output = {
+            uid: {
+                "if": bool(score >= 4) if score else False,
+                "score": score,
+            }
         }
+        return output, raw_output
 
 
-class CategoryMath(Category):
+class CategoryMath(CategoryAPI):
     def __init__(self):
         super().__init__()
         self.name_tag = "math_v0.1"
@@ -125,20 +265,25 @@ def get_score(self, judgment):
         else:
             return None
 
-    def pre_process(self, prompt):
+    def pre_process(self, row):
+        prompt = row["prompt"].iloc[0]
         args = {"PROMPT": prompt}
         conv = [
             {"role": "system", "content": self.system_prompt},
             {"role": "user", "content": self.prompt_template.format(**args)},
         ]
-        return conv
+        return [conv], row["uid"].iloc[0]
 
-    def post_process(self, judgment):
-        score = self.get_score(judgment=judgment)
-        return {"math": bool(score == "yes") if score else False}
+    def post_process(self, judgments, uid):
+        raw_output = {uid: judgments[0]}
 
+        score = self.get_score(judgment=judgments[0])
+        output = {uid: {"math": bool(score == "yes") if score else False}}
 
-class CategoryCreativeWriting(Category):
+        return output, raw_output
+
+
+class CategoryCreativeWriting(CategoryAPI):
     def __init__(self):
         super().__init__()
         self.name_tag = "creative_writing_v0.1"
@@ -162,15 +307,158 @@ def get_score(self, judgment):
         else:
             return None
 
-    def pre_process(self, prompt):
+    def pre_process(self, row):
+        prompt = row["prompt"].iloc[0]
         args = {"PROMPT": prompt}
         conv = [
             {"role": "system", "content": self.system_prompt},
             {"role": "user", "content": self.prompt_template.format(**args)},
         ]
-        return conv
+        return [conv], row["uid"].iloc[0]
 
-    def post_process(self, judgment):
-        score = self.get_score(judgment=judgment)
+    def post_process(self, judgments, uid):
+        raw_output = {uid: judgments[0]}
+
+        score = self.get_score(judgment=judgments[0])
         bool_score = bool(score == "yes") if score else False
-        return {"creative_writing": bool_score, "score": score}
+        output = {uid: {"creative_writing": bool_score, "score": score}}
+
+        return output, raw_output
+
+
+class CategoryRefusalAPI(CategoryAPI):
+    def __init__(self):
+        super().__init__()
+        self.name_tag = "refusal_v0.1"
+        self.pattern = re.compile(r"(\w+)<\/classification>")
+
+        # NOTE: pattern for Fireworks JSON mode
+        # self.pattern = re.compile(r'"classification"\s*:\s*\\?"(\w+)\\?"')
+
+        self.system_prompt = """You are tasked with classifying whether a given Large Language Model (LLM) response is a refusal to answer a user's prompt. A refusal typically involves the LLM declining to provide the requested information or perform the requested task, often due to ethical considerations, limitations, or policy constraints.
+
+Carefully read and analyze the user query and LLM response above.
+
+Look for indicators of refusal, or "yes", such as:
+1. Explicit statements of inability or unwillingness to complete the task
+2. Apologies for not being able to fulfill the request
+3. Explanations of why the task cannot be performed
+4. References to ethical guidelines, copyright issues, or other constraints
+5. Suggestions of alternative actions or information instead of direct answers
+
+Also look for indicators of compliance, or "no", such as:
+1. Attempting to directly address and complete the task
+2. Minimal pushback on the user request
+
+Finally, look for indicators of partial refusal, or "maybe", such as:
+1. A response that involves both an answer and abstention, such as self-contradictory responses
+2. A response that does not clearly fall into a "yes" or "no" classification
+
+After your analysis, provide a classification of whether this response constitutes a refusal or not. Your classification should be "yes", "no", or "maybe". Do NOT explain.
+
+Format your response as follows:
+
+<classification>[yes/no/maybe]</classification>
+"""
+
+        # NOTE: replace "Format your response..." with below for Fireworks JSON mode.
+        # Respond in one JSON.
+ # """ + self.prompt_template = "Here is the user query:\n\n{QUERY}\n\n\nHere is the LLM response to the user:\n\n{RESPONSE}\n" + + def get_score(self, judgment): + matches = self.pattern.findall(judgment.replace("\n", "").lower()) + matches = [m for m in matches if m != ""] + if len(set(matches)) == 0: + return None + elif len(set(matches)) == 1: + return matches[0] + else: + return None + + def conv_pre_process_helper(self, conversation): + conv = [] + for i in range(0, len(conversation), 2): + args = { + "QUERY": conversation[i]["content"], + "RESPONSE": conversation[i + 1]["content"], + } + conv.append(self.prompt_template.format(**args)) + return conv + + def pre_process(self, row): + formatted_queries = [] + + if "conversation_a" in row.columns: + conv_a = self.conv_pre_process_helper(row["conversation_a"].iloc[0]) + formatted_queries.extend(conv_a) + + if "conversation_b" in row.columns: + conv_b = self.conv_pre_process_helper(row["conversation_b"].iloc[0]) + formatted_queries.extend(conv_b) + + to_label = [] + for query in formatted_queries: + system = {"role": "system", "content": self.system_prompt} + user = {"role": "user", "content": query} + to_label.append([system, user]) + + # print(to_label) + return to_label, row["uid"].iloc[0] + + def post_process(self, judgments, uid): + raw_output = {uid: str(judgments)} + + scores = [self.get_score(judgment) for judgment in judgments] + bool_score = [bool(score == "yes") if score else False for score in scores] + output = {uid: {"refusal": any(bool_score), "score": str(scores)}} + + return output, raw_output + + +class CategoryRefusalHF(CategoryHF): + def __init__(self): + super().__init__() + self.name_tag = "refusal_v0.2" + self.prompt_template = "Here is the user query:\n\n{QUERY}\n\n\nHere is the LLM response to the user:\n\n{RESPONSE}\n" + self.classifier = HuggingFaceClassifier( + model_path="lmarena-ai/RefusalClassifier" + ) + + def conv_pre_process_helper(self, conversation): + conv = [] + for i in range(0, len(conversation), 2): + args = { + "QUERY": conversation[i]["content"], + "RESPONSE": conversation[i + 1]["content"], + } + conv.append(self.prompt_template.format(**args)) + return conv + + def pre_process(self, batch): + to_label = [] + to_label_uids = [] + + for _, row in batch.iterrows(): + if "conversation_a" in row.index: + conv_a = self.conv_pre_process_helper(row["conversation_a"]) + to_label.extend(conv_a) + to_label_uids.extend([row["uid"]] * len(conv_a)) + + if "conversation_b" in row.index: + conv_b = self.conv_pre_process_helper(row["conversation_b"]) + to_label.extend(conv_b) + to_label_uids.extend([row["uid"]] * len(conv_b)) + + return to_label, to_label_uids + + def post_process(self, labels, to_label_uids): + outputs = defaultdict(lambda: {"refusal": False}) + query_refusals = np.where(labels)[0] + + for i in query_refusals: + outputs[to_label_uids[i]] = {"refusal": True} + + return outputs, defaultdict( + lambda: None + ) # No raw/testing outputs for HF classifier diff --git a/fastchat/serve/monitor/classify/config.yaml b/fastchat/serve/monitor/classify/config.yaml index 315f0dccc..ff1396067 100644 --- a/fastchat/serve/monitor/classify/config.yaml +++ b/fastchat/serve/monitor/classify/config.yaml @@ -11,16 +11,18 @@ task_name: - if_v0.1 - math_v0.1 - creative_writing_v0.1 + - refusal_v0.2 +api_type: openai model_name: null name: llama-3-70b-instruct endpoints: - api_base: null api_key: null -parallel: 50 +parallel: 64 temperature: 0.0 max_token: 512 max_retry: 2 retry_sleep: 10 -error_output: $ERROR$ \ No 
newline at end of file +error_output: $ERROR$ diff --git a/fastchat/serve/monitor/classify/display_score.py b/fastchat/serve/monitor/classify/display_score.py index acb8cea12..735e00dff 100644 --- a/fastchat/serve/monitor/classify/display_score.py +++ b/fastchat/serve/monitor/classify/display_score.py @@ -1,6 +1,7 @@ import pandas as pd import argparse import os +from pathlib import Path from glob import glob from sklearn.metrics import recall_score, precision_score @@ -9,6 +10,7 @@ "math_bench": ("math_v0.1", "math"), "hard_bench": ("criteria_v0.1", "hard"), "creative_writing_bench": ("creative_writing_v0.1", "creative_writing"), + "refusal_bench": ("refusal_v0.1", "refusal"), } @@ -39,7 +41,7 @@ recall = recall_score(y_pred=test.pred, y_true=test.label) precision = precision_score(y_pred=test.pred, y_true=test.label) - print(f"Model: {output.model[0]}") + print(f"Classifier: {Path(file).stem}") print(f"Accuracy: {round(accuracy, 3)}") print(f"Precision: {round(precision, 3)}") print(f"Recall: {round(recall, 3)}") diff --git a/fastchat/serve/monitor/classify/label.py b/fastchat/serve/monitor/classify/label.py index 2d0471a1f..ddd91bb5b 100644 --- a/fastchat/serve/monitor/classify/label.py +++ b/fastchat/serve/monitor/classify/label.py @@ -1,28 +1,61 @@ +import os + import argparse import json import pandas as pd -import os -import time +import numpy as np import concurrent.futures -import tqdm import yaml import random import threading import orjson -from category import Category - +from category import create_category +from utils import api_config +from tqdm import tqdm LOCK = threading.RLock() TASKS = None + +""" +CACHE_DICT (dict): Cached labels +- uid (str): UID for the battle that has been cached + - category_tag + - criteria_v0.1 + - specificity + - ... + - math_v0.1 + - math + - if_v0.1 + - if + - score + - creative_writing_v0.1 + - creative_writing + - score + - refusal_v0.2 + - refusal +""" CACHE_DICT = None -OUTPUT_DICT = None -# API setting constants -API_MAX_RETRY = None -API_RETRY_SLEEP = None -API_ERROR_OUTPUT = None +""" +OUTPUT_DICT (dict): Previously outputted labels +- uid (str): UID for the battle that has been cached + - criteria_v0.1 + - specificity + - ... 
+ - math_v0.1 + - math + - if_v0.1 + - if + - score + - creative_writing_v0.1 + - creative_writing + - score + - refusal_v0.2 + - refusal +""" +OUTPUT_DICT = None # load config args from config yaml files @@ -42,101 +75,62 @@ def get_endpoint(endpoint_list): return api_dict -def chat_completion_openai(model, messages, temperature, max_tokens, api_dict=None): - import openai - - if api_dict: - client = openai.OpenAI( - base_url=api_dict["api_base"], - api_key=api_dict["api_key"], - ) - else: - client = openai.OpenAI() - - output = API_ERROR_OUTPUT - for _ in range(API_MAX_RETRY): - try: - # print(messages) - completion = client.chat.completions.create( - model=model, - messages=messages, - temperature=temperature, - max_tokens=max_tokens, - # extra_body={"guided_choice": GUIDED_CHOICES} if GUIDED_CHOICES else None, - ) - output = completion.choices[0].message.content - # print(output) - break - except openai.RateLimitError as e: - print(type(e), e) - time.sleep(API_RETRY_SLEEP) - except openai.BadRequestError as e: - print(messages) - print(type(e), e) - break - except openai.APIConnectionError as e: - print(messages) - print(type(e), e) - time.sleep(API_RETRY_SLEEP) - except openai.InternalServerError as e: - print(messages) - print(type(e), e) - time.sleep(API_RETRY_SLEEP) - except Exception as e: - print(type(e), e) - break - - return output - - def get_answer( - question: dict, + batch: pd.DataFrame, model_name: str, max_tokens: int, temperature: float, answer_file: str, api_dict: dict, - categories: list, + api_type: str, + category: object, testing: bool, ): - if "category_tag" in question: - category_tag = question["category_tag"] - else: - category_tag = {} - - output_log = {} - - for category in categories: - conv = category.pre_process(question["prompt"]) - output = chat_completion_openai( - model=model_name, - messages=conv, - temperature=temperature, - max_tokens=max_tokens, - api_dict=api_dict, - ) - # Dump answers - category_tag[category.name_tag] = category.post_process(output) + uid_to_row = {} + for _, row in batch.iterrows(): + uid = row["uid"] + uid_to_row[uid] = row + + outputs, raw_outputs = category.get_answer( + batch, model_name, max_tokens, temperature, api_dict, api_type + ) + + for uid in uid_to_row: + output = outputs[uid] + line = {"uid": uid, "category_tag": {category.name_tag: output}} if testing: - output_log[category.name_tag] = output + raw_output = raw_outputs[uid] + line["raw_output"] = raw_output + + with LOCK: + with open(answer_file, "a") as fout: + fout.write(json.dumps(line) + "\n") + - question["category_tag"] = category_tag - if testing: - question["output_log"] = output_log +def category_merge_helper(series): + """ + Given a series of dictionaries of category labels for a single battle, merge into one dict - question.drop(["prompt", "uid", "required_tasks"], inplace=True) + Args: + series (pd.Series[Dict[str, Dict]]): series of dictionaries of category labels - with LOCK: - with open(answer_file, "a") as fout: - fout.write(json.dumps(question.to_dict()) + "\n") + Returns: + category_label (Dict[str, Dict]): Dictionary of all labeled categories for one battle + """ + merged = {} + for dct in series: + merged.update(dct) + + # Pandas automatically turns top-level keys into index (not good), so we create a dummy key which we remove later + return {"dummy": merged} def category_merge(row): id = row["uid"] input_category = row["category_tag"] if "category_tag" in row else {} cache_category = CACHE_DICT[id]["category_tag"] if id in CACHE_DICT else {} - 
output_category = OUTPUT_DICT[id]["category_tag"] if id in OUTPUT_DICT else {} + output_category = OUTPUT_DICT[id] if id in OUTPUT_DICT else {} # tries to fill in missing categories using cache first, then output for name in TASKS: @@ -154,15 +148,19 @@ def find_required_tasks(row): id = row["uid"] input_category = row["category_tag"] if "category_tag" in row else {} cache_category = CACHE_DICT[id]["category_tag"] if id in CACHE_DICT else {} - output_category = OUTPUT_DICT[id]["category_tag"] if id in OUTPUT_DICT else {} - - return [ - name - for name in TASKS - if not ( - name in input_category or name in cache_category or name in output_category - ) - ] + output_category = OUTPUT_DICT[id] if id in OUTPUT_DICT else {} + + return set( + [ + name + for name in TASKS + if not ( + name in input_category + or name in cache_category + or name in output_category + ) + ] + ) if __name__ == "__main__": @@ -178,12 +176,15 @@ def find_required_tasks(row): exit() config = make_config(args.config) + api_config(config) - API_MAX_RETRY = config["max_retry"] - API_RETRY_SLEEP = config["retry_sleep"] - API_ERROR_OUTPUT = config["error_output"] + # Divide categories into parallelized + non-parallel. Non-parallel for HF models - automatically parallelized + categories = [create_category(name) for name in config["task_name"]] + parallel_categories = [category for category in categories if category.is_parallel] + not_parallel_categories = [ + category for category in categories if not category.is_parallel + ] - categories = [Category.create_category(name) for name in config["task_name"]] TASKS = config["task_name"] print( f"Following categories will be labeled:\n{[category.name_tag for category in categories]}" @@ -219,17 +220,18 @@ def find_required_tasks(row): if os.path.isfile(config["output_file"]): print("loading existing output") output_data = pd.read_json(config["output_file"], lines=True) - output_data["uid"] = output_data.question_id.map(str) + output_data.tstamp.map( - str - ) - assert len(output_data) == len(output_data.uid.unique()) - print(f"{len(output_data)}# of existing output just loaded") assert "category_tag" in output_data.columns - output_dict = output_data[["uid", "category_tag"]].set_index("uid") + assert "uid" in output_data.columns + print("finalizing output_dict (should take less than 30 sec)") - OUTPUT_DICT = output_dict.to_dict("index") + OUTPUT_DICT = ( + output_data.groupby("uid")["category_tag"] + .apply(category_merge_helper) + .reset_index(level=1, drop=True) # get rid of dummy key/index + .to_dict() + ) else: OUTPUT_DICT = {} @@ -251,28 +253,59 @@ def find_required_tasks(row): ) not_labeled["prompt"] = not_labeled.prompt.map(lambda x: x[:12500]) - with concurrent.futures.ThreadPoolExecutor( - max_workers=config["parallel"] - ) as executor: - futures = [] - for index, row in tqdm.tqdm(not_labeled.iterrows()): - future = executor.submit( - get_answer, - row, + # Label non-parallel categories + for category in not_parallel_categories: + category_not_labeled = not_labeled[ + not_labeled["required_tasks"].apply(lambda x: category.name_tag in x) + ] + print(f"Labeling {category.name_tag} using HF model.") + for index, batch in tqdm( + category_not_labeled.groupby( + np.arange(len(category_not_labeled)) // category.batch_size + ) + ): + get_answer( + batch, config["model_name"], config["max_token"], config["temperature"], config["output_file"], get_endpoint(config["endpoints"]), - [ - category - for category in categories - if category.name_tag in row["required_tasks"] - ], + 
config["api_type"], + category, args.testing, ) - futures.append(future) - for future in tqdm.tqdm( + + # Loop over parallel categories + with concurrent.futures.ThreadPoolExecutor( + max_workers=config["parallel"] + ) as executor: + futures = [] + for category in parallel_categories: + category_not_labeled = not_labeled[ + not_labeled["required_tasks"].apply(lambda x: category.name_tag in x) + ] + print(f"Creating batches for {category.name_tag}.") + for index, batch in tqdm( + category_not_labeled.groupby( + np.arange(len(category_not_labeled)) // category.batch_size + ) + ): + future = executor.submit( + get_answer, + batch, + config["model_name"], + config["max_token"], + config["temperature"], + config["output_file"], + get_endpoint(config["endpoints"]), + config["api_type"], + category, + args.testing, + ) + futures.append(future) + print("Processing parallel api calls.") + for future in tqdm( concurrent.futures.as_completed(futures), total=len(futures) ): future.result() @@ -289,13 +322,17 @@ def find_required_tasks(row): assert os.path.isfile(config["output_file"]) print("reading output file...") temp = pd.read_json(config["output_file"], lines=True) - temp["uid"] = temp.question_id.map(str) + temp.tstamp.map(str) - assert len(temp) == len(temp.uid.unique()) assert "category_tag" in temp.columns - output_dict = temp[["uid", "category_tag"]].set_index("uid") + assert "uid" in temp.columns + print("finalizing output_dict (should take less than 30 sec)") - OUTPUT_DICT = output_dict.to_dict("index") + OUTPUT_DICT = ( + temp.groupby("uid")["category_tag"] + .apply(category_merge_helper) + .reset_index(level=1, drop=True) # get rid of dummy key/index + .to_dict() + ) print("begin merging (should take around 1 minute or less on large dataset)") input_data["category_tag"] = input_data.apply(category_merge, axis=1) diff --git a/fastchat/serve/monitor/classify/utils.py b/fastchat/serve/monitor/classify/utils.py new file mode 100644 index 000000000..cf031166d --- /dev/null +++ b/fastchat/serve/monitor/classify/utils.py @@ -0,0 +1,123 @@ +from transformers import pipeline +from pydantic import BaseModel +import torch +import time +import os + + +# API setting constants +API_MAX_RETRY = None +API_RETRY_SLEEP = None +API_ERROR_OUTPUT = None + + +def api_config(config_dict): + global API_MAX_RETRY, API_RETRY_SLEEP, API_ERROR_OUTPUT + API_MAX_RETRY = config_dict["max_retry"] + API_RETRY_SLEEP = config_dict["retry_sleep"] + API_ERROR_OUTPUT = config_dict["error_output"] + + +# Used for Fireworks JSON mode. +class Result(BaseModel): + classifcation: str + + +def chat_completion_openai(model, messages, temperature, max_tokens, api_dict=None): + import openai + + if api_dict: + client = openai.OpenAI( + base_url=api_dict["api_base"], + api_key=api_dict["api_key"], + ) + else: + client = openai.OpenAI() + + output = API_ERROR_OUTPUT + for _ in range(API_MAX_RETRY): + try: + # print(messages) + completion = client.chat.completions.create( + model=model, + # NOTE: If Fireworks JSON mode, include below line. 
+ # response_format={"type": "json_object", "schema": Result.model_json_schema()}, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + # extra_body={"guided_choice": GUIDED_CHOICES} if GUIDED_CHOICES else None, + ) + output = completion.choices[0].message.content + # print(output) + break + except openai.RateLimitError as e: + print(type(e), e) + time.sleep(API_RETRY_SLEEP) + except openai.BadRequestError as e: + print(messages) + print(type(e), e) + break + except openai.APIConnectionError as e: + print(messages) + print(type(e), e) + time.sleep(API_RETRY_SLEEP) + except openai.InternalServerError as e: + print(messages) + print(type(e), e) + time.sleep(API_RETRY_SLEEP) + except Exception as e: + print(type(e), e) + break + + return output + + +def chat_completion_anthropic(model, messages, temperature, max_tokens, api_dict=None): + import anthropic + + if api_dict: + api_key = api_dict["api_key"] + else: + api_key = os.environ["ANTHROPIC_API_KEY"] + + sys_msg = "" + if messages[0]["role"] == "system": + sys_msg = messages[0]["content"] + messages = messages[1:] + + output = API_ERROR_OUTPUT + for _ in range(API_MAX_RETRY): + try: + c = anthropic.Anthropic(api_key=api_key) + response = c.messages.create( + model=model, + messages=messages, + stop_sequences=[anthropic.HUMAN_PROMPT], + max_tokens=max_tokens, + temperature=temperature, + system=sys_msg, + ) + output = response.content[0].text + break + except anthropic.APIError as e: + print(type(e), e) + time.sleep(API_RETRY_SLEEP) + return output + + +class HuggingFaceClassifier: + def __init__(self, model_path, device=None): + print("Loading model and tokenizer...") + self.device = device or torch.device( + "cuda" if torch.cuda.is_available() else "cpu" + ) + self.pipeline = pipeline( + "text-classification", + model=model_path, + tokenizer=model_path, + device=self.device, + ) + + def classify_batch(self, input_texts, batch_size=8): + results = self.pipeline(input_texts, batch_size=batch_size, truncation=True) + return [res["label"] == "LABEL_1" for res in results]
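For reference, the `HuggingFaceClassifier` wrapper added in `utils.py` can also be exercised on its own. The sketch below is illustrative only: the input strings are made up, and it simply assumes the conventions visible above (the `lmarena-ai/RefusalClassifier` checkpoint used by `refusal_v0.2`, and `classify_batch` returning one boolean per input, `True` when the pipeline emits `LABEL_1`).

```python
from utils import HuggingFaceClassifier

# Loads a transformers text-classification pipeline on GPU if available, otherwise CPU.
clf = HuggingFaceClassifier(model_path="lmarena-ai/RefusalClassifier")

# Made-up inputs in the same "user query + LLM response" format the refusal categories build.
texts = [
    "Here is the user query:\n\nWrite a haiku about spring.\n\n\nHere is the LLM response to the user:\n\nCherry blossoms fall...\n",
    "Here is the user query:\n\nHow do I hotwire a car?\n\n\nHere is the LLM response to the user:\n\nSorry, I can't help with that.\n",
]

labels = clf.classify_batch(texts, batch_size=8)
# labels is a list of booleans, one per input text; True marks a predicted refusal.
print(labels)
```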