Module ragability.ragability_check

Module to check responses against target facts and assign scores. This creates a result file with the scores which can then be used to calculate summary statistics in various ways.

Functions

def check_check(check: dict, example: dict, config: dict) ‑> bool
Expand source code
def check_check(check: dict, example: dict, config: dict) -> bool:
    """
    This returns True if the check is correct, False if it can be skipped or raises an exception if the error cannot be
    skipped.
    """
    # make sure the func field is present and that the func field is a string
    # now if the func is not LLM, we can use the function directly, otherwise we need to query the LLM
    if "func" not in check:
        logger.warning(f"Warning: Missing 'func' field in check in example {example['qid']}")
        return False
    if "metrics" not in check:
        logger.warning(f"Warning: Missing 'metrics' field in check in example {example['qid']}")
        return False
    if not isinstance(check["func"], str):
        logger.warning(f"Warning: 'func' field in check must be a string in example {example['qid']}")
        return False
    # make sure the function is in the CHECKS dictionary
    if check["func"] not in CHECKS:
        logger.warning(f"Warning: Check function {check['func']} not in CHECKS in example {example['qid']}")
        return False
    func = CHECKS[check["func"]]
    # check if the number of parameters defined with "parms" matches the number of parameters required by the function
    nargs = func["nargs"]
    args = check.get("args", [])
    if nargs != len(args):
        logger.warning(f"Warning: Wrong number of positional arguments in check for function {func['func']} in example {example['qid']}: {len(args)} instead of {nargs}")
        return False
    if not config['all'] and "result" in check:
        logger.debug(f"Skipping check {check['query']} with result")
        return False
    return True

This returns True if the check is correct, False if it can be skipped or raises an exception if the error cannot be skipped.

def get_args()
Expand source code
def get_args():
    """
    Get the command line arguments
    """
    parser = argparse.ArgumentParser(description='Check responses against target facts and assign scores')
    parser.add_argument('--input', '-i', type=str, help='Input file with the responses from ragability_query (or from config), jsonl, json, yaml', required=False)
    parser.add_argument('--output', '-o', type=str, help='Output file with the checking results (default: $DATETIME.out.jsonl), jsonl, json, yaml', required=False)
    parser.add_argument("--config", "-c", type=str, help="Config file with the LLM and other info, json, jsonl, yaml", required=False)
    parser.add_argument('--usellm', '-u', type=str, help='The alias of the configured LLM to use (use first one found)', required=False)
    parser.add_argument("--promptfile", "-pf", type=str, help="File with the prompt to use for the checking queries (or use config), jsonl, json, yaml", required=False)
    parser.add_argument("--all", "-a", action="store_true", help="Run all queries, even if they have a response", required=False)
    parser.add_argument("--logfile", "-f", type=str, help="Log file", required=False)
    parser.add_argument("--dry-run", "-n", action="store_true", help="Dry run, do not actually run the queries", required=False)
    parser.add_argument("--verbose", "-v", action="store_true", help="Be more verbose and inform what is happening", required=False)
    parser.add_argument("--debug", "-d", action="store_true", help="Debug mode", required=False)
    args_tmp = parser.parse_args()
    tmp = {}
    tmp.update(vars(args_tmp))
    args: dict = tmp

    # if a config file is specified, read the config file using our config reading function and update the arguments.
    # The config data may contain:
    # - input: used only if not specified in the command line arguments
    # - output: used only if not specified in the command line arguments
    # - llm: added to the ones specified in the command line arguments
    # - prompt: used to add config info to the llms specified in the command line arguments
    if args["config"]:
        config = read_config_file(args["config"])
        config.update(args)
        args = config
    if not args["input"]:
        print("Error: Missing input file")
        parser.print_help()
        sys.exit(1)
    update_llm_config(args)
    # read the prompt file into memory, add prompts to the "prompts" key in the config, raise an error if the
    # prompt id is already in the config
    if args["promptfile"]:
        prompts = read_prompt_file(args["promptfile"])   # this is a list of dicts with key "pid" containing the id
        if "prompts" not in args:
            args["prompts"] = []
        for prompt in prompts:
            if prompt["pid"] in args["prompts"]:
                raise ValueError(f"Error: Prompt id {prompt['pid']} already in config")
            args["prompts"].append(prompt)
    # create a "prompts_dict" key in the config which is a dict mapping the prompt id to the prompt dict
    if args.get("prompts") is None:
        args["prompts"] = []
    args["prompts_dict"] = {prompt["pid"]: prompt for prompt in args.get("prompts", [])}
    return args

Get the command line arguments

def main()
Expand source code
def main():
    args = get_args()
    if args["logfile"]:
        add_logging_file(args["logfile"])
    if args["debug"]:
        set_logging_level(DEBUG)
        ppargs = pp_config(args)
        logger.debug(f"Effective arguments: {ppargs}")
    run(args)
def run(config: dict)
Expand source code
def run(config: dict):
    # check the configuration: for checkking, we want exactly one LLM to be configured and we want
    # to have a single prompt or no promot configured. If no prompt is configured, a default prompt will be used.
    if len(config["llms"]) < 1:
        raise ValueError(f"Error: at least one LLM must be configured")
    # if usellm is configured, we want to use the LLM with that alias, otherwise we use the first in the list
    llmname = ""
    if config["usellm"]:
        for llm in config["llm"]:
            if llm["alias"] == config["usellm"]:
                thellmname = llm["alias"]
                break
        if not llmname:
            raise ValueError(f"Error: LLM with alias {config['usellm']} not found")
    else:
        llmname = config["llms"][0]["alias"]
    if len(config["prompts"]) == 0:
        theprompt = DEFAULT_PROMPT
        logger.warning(f"Warning: No prompt configured, using default prompt")
    # read the input file into memory, we do not expect it to be too large and we want to check the format
    # of all json lines
    inputs = read_input_file(config["input"])
    logger.info(f"Loaded {len(inputs)} queries from {config['input']}")
    logger.info(f"LLM to use: {llmname}")
    logger.info(f"Prompts found: {len(config['prompts_dict'])}")

    # initialize the LLMS object with the configuration
    llms = LLMS(config)
    llm: LLM = llms[llmname]

    if not config['output']:
        config['output'] = f"{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}.checked.hsjon"
    # write either a jsonl or json file, depending on the file extension
    if not config['output'].endswith(".json") and not config['output'].endswith(".jsonl") and not config[
        'output'].endswith(".hjson"):
        print(f"Error: Output file must end in .json, .jsonl or .hjson, not {config['output']}")
    n_errors = 0
    n_outputs = 0
    with open(config['output'], 'w') as f:
        if config['output'].endswith(".json") or config['output'].endswith(".hjson"):
            f.write("[\n")
        for example in inputs:
            # check if the example has checks at all, give a warning if not
            if not "checks" in example or len(example["checks"]) == 0:
                logger.warning(f"Warning: No checks in example {example['qid']}")
                continue
            # if the example has an error, we cannot check it, so we skip it
            if example.get("error"):
                logger.warning(f"Skipping example {example['qid']} with error: {example['error']}")
                continue
            # now go through each of the checks: if we already have a check result, skip unless the --all option is given
            # if the function is LLM, we need to run the function on the result of querying the LLM, otherwise
            # we directly run the function on the response from the query stage
            for check in example["checks"]:
                run_check(check, llm, example, config, debug=config["debug"])
                if check.get("error"):
                    logger.warning(f"Error in check {check['query']}: {check['error']}")
                    n_errors += 1
            # write the example to the output file
            towrite = example
            n_outputs += 1
            if config['output'].endswith(".json"):
                f.write(json.dumps(towrite, indent=2) + "\n")
            elif config['output'].endswith(".hjson"):
                f.write(hjson.dumps(towrite, indent=2) + "\n")
            else:
                f.write(json.dumps(towrite) + "\n")
        if config['output'].endswith(".json") or config['output'].endswith(".hjson"):
            f.write("]\n")
    logger.info(f"Wrote {n_outputs} examples to {config['output']}, {n_errors} errors")
def run_check(check, llm: llms_wrapper.llms.LLM, example, config, debug=False)
Expand source code
def run_check(check, llm: LLM, example, config, debug=False):
    llmname = llm["alias"]
    cid = check.get("cid", "NOID")
    # check the check
    if not check_check(check, example, config):
        logger.debug(f"Skipping check in example {example['qid']}")
        return

    # if there is a query in the check, invoke the checker LLM and use the response from the checker
    # as the response to check. If there is no query, use the response from the example as the response to check
    response = None   # this will hold the string to check
    if "query" in check and check["query"] is not None:
        query = check["query"]
        check_for = check.get("check_for")
        # get the prompt id from the check, if there is none, use the default prompt, otherwise use the prompt
        # with that id in the config. If a pid is specified which is not present, this is an error
        if "pid" in check:
            if check["pid"] in config["prompts_dict"]:
                theprompt = config["prompts_dict"][check["pid"]].copy()
            else:
                logger.warning(f"Error: Prompt id {check['pid']} not found for example {example['qid']}")
                logger.debug(f"Have prompt ids {config['prompts_dict'].keys()}")
                check["error"] = f"Prompt id {check['pid']} not found"
                check["result"] = None
                return
        else:
            theprompt = DEFAULT_PROMPT.copy()
        for role, text in theprompt.items():
            text = text.replace("${query}", query)
            text = text.replace("${answer}", example["response"])
            if check_for:
                text = text.replace("${check_for}", check_for)
            theprompt[role] = text
        # check if we have a dry run, if yes, just log what we would do, otherwise query the LLM
        messages = llm.make_messages(prompt=theprompt)
        if config['dry_run']:
            logger.info(f"Would query checker-LLM {llmname} with messages: {messages}")
            response = ""
            error = "NOT RUN: DRY-RUN"
            return
        if config['verbose']:
            logger.info(f"Querying checker-LLM {llmname} for example {example['qid']} and check {cid}")
        ret = llm.query(messages=messages, return_cost=True, debug=config['debug'])
        response = ret.get("answer", "")
        check["response"] = response
        error = ret.get("error", "")
        # if we had an error with the checker LLM, log it and return, we cannot check the response
        if error:
            logger.warning(f"Error from checking LLM, cannot check: {error}")
            check["error"] = error
            check["result"] = None
            return
    else:
        response = example["response"]
    func_config = CHECKS[check["func"]]
    func = func_config["func"]
    nargs = func_config["nargs"]
    args = check.get("args", [])
    assert len(args) == nargs, f"Error: Wrong number of positional arguments in check for function {func['func']}: {len(args)} instead of {nargs}"
    kwargs = check.get("kwargs", {})
    try:
        result = func(response, *args, **kwargs)
        error = ""
    except Exception as e:
        logger.error(f"Error in check function {func}: {e}")
        result = None
        error = f"Error in check function {func}: {e}"
    check["result"] = result
    check["error"] = error