Asked by: 小点点

json.decoder.JSONDecodeError when migrating to Kubeflow Pipelines v2


Copied from: https://github.com/kubeflow/pipelines/issues/7608

I have a file of generated code that runs against Kubeflow. It ran fine on Kubeflow v1, and I am now moving it to Kubeflow v2. When I do, I get the following error: json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)

Honestly, I don't even know where to look next. It feels like something is failing at the very first character, but I can't see what it is (it happens inside the Kubeflow execution).

Thanks!


  • How did you deploy Kubeflow Pipelines (KFP)? Standard deployment to AWS

    KFP version: 1.8.1

    KFP SDK version: 1.8.12

    Here is the log:

    time="2022-04-26T17:38:09.547Z" level=info msg="capturing logs" argo=true
    WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: 
    https://pip.pypa.io/warnings/venv
    [KFP Executor 2022-04-26 17:38:24,691 INFO]: Looking for component `run_info_fn` in --component_module_path `/tmp/tmp.NJW6PWXpIt/ephemeral_component.py`
    [KFP Executor 2022-04-26 17:38:24,691 INFO]: Loading KFP component "run_info_fn" from /tmp/tmp.NJW6PWXpIt/ephemeral_component.py (directory "/tmp/tmp.NJW6PWXpIt" and module name "ephemeral_component")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
        "__main__", mod_spec)
      File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
        exec(code, run_globals)
      File "/usr/local/lib/python3.7/site-packages/kfp/v2/components/executor_main.py", line 104, in <module>
        executor_main()
      File "/usr/local/lib/python3.7/site-packages/kfp/v2/components/executor_main.py", line 94, in executor_main
        executor_input = json.loads(args.executor_input)
      File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.7/json/decoder.py", line 353, in raw_decode
        obj, end = self.scan_once(s, idx)
    json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)
    time="2022-04-26T17:38:24.803Z" level=error msg="cannot save artifact /tmp/outputs/run_info/data" argo=true error="stat /tmp/outputs/run_info/data: no such file or directory"
    Error: exit status 1
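
    The traceback shows executor_main() failing in json.loads(args.executor_input), i.e. the --executor_input value handed to the v2 executor is not valid JSON. As a point of reference (not part of the original report), json.loads raises exactly this message whenever it is given Python-style, single-quoted text instead of JSON, for example:

    import json

    # Minimal sketch: a single-quoted, Python-repr style dict is not valid JSON,
    # so the parser rejects the first property name (line 1 column 2, char 1).
    json.loads("{'run_id': 'abc'}")
    # json.decoder.JSONDecodeError: Expecting property name enclosed in
    # double quotes: line 1 column 2 (char 1)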
    

    Here is the file to reproduce the issue: root_pipeline_04d99580c84b47c28405a2c8bcae8703.py

    import kfp.v2.components
    from kfp.v2.dsl import InputPath
    from kubernetes.client.models import V1EnvVar
    from kubernetes import client, config
    from typing import NamedTuple
    from base64 import b64encode
    import kfp.v2.dsl as dsl
    import kubernetes
    import json
    import kfp
    
    from run_info import run_info_fn
    from same_step_000_ce6494722c474dd3b8bef482bb976557 import same_step_000_ce6494722c474dd3b8bef482bb976557_fn
    
    
    run_info_comp = kfp.v2.dsl.component(
        func=run_info_fn,
        packages_to_install=[
            "kfp",
            "dill",
        ],
    )
    
    same_step_000_ce6494722c474dd3b8bef482bb976557_comp = kfp.v2.dsl.component(
        func=same_step_000_ce6494722c474dd3b8bef482bb976557_fn,
        base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/codeserver-python:v1.5.0",
        packages_to_install=[
            "dill",
            "requests",
             # TODO: make this a loop
        ],
    )
    
    @kfp.dsl.pipeline(name="root_pipeline_compilation",)
    def root(
        context: str='', metadata_url: str='',
    ):
        # Generate secrets (if not already created)
        secrets_by_env = {}
    
        env_vars = {
        }
    
        run_info = run_info_comp(run_id=kfp.dsl.RUN_ID_PLACEHOLDER)
    
    
        same_step_000_ce6494722c474dd3b8bef482bb976557 = same_step_000_ce6494722c474dd3b8bef482bb976557_comp(
            input_context_path="",
            run_info=run_info.outputs["run_info"],
            metadata_url=metadata_url
        )
    
        same_step_000_ce6494722c474dd3b8bef482bb976557.execution_options.caching_strategy.max_cache_staleness = "P0D"
        for k in env_vars:
            same_step_000_ce6494722c474dd3b8bef482bb976557.add_env_variable(V1EnvVar(name=k, value=env_vars[k]))
    
    

    run_info.py

    """
    The run_info component fetches metadata about the current pipeline execution
    from kubeflow and passes it on to the user code step components.
    """
    from typing import NamedTuple
    
    
    def run_info_fn(
        run_id: str,
    ) -> NamedTuple("RunInfoOutput", [("run_info", str),]):
        from base64 import urlsafe_b64encode
        from collections import namedtuple
        import datetime
        import base64
        import dill
        import kfp
    
        client = kfp.Client(host="http://ml-pipeline:8888")
        run_info = client.get_run(run_id=run_id)
    
        run_info_dict = {
            "run_id": run_info.run.id,
            "name": run_info.run.name,
            "created_at": run_info.run.created_at.isoformat(),
            "pipeline_id": run_info.run.pipeline_spec.pipeline_id,
        }
    
        # Track kubernetes resources associated with the run.
        for r in run_info.run.resource_references:
            run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id
    
        # Base64-encoded as value is visible in kubeflow ui.
        output = urlsafe_b64encode(dill.dumps(run_info_dict))
    
        return namedtuple("RunInfoOutput", ["run_info"])(
            str(output, encoding="ascii")
        )
    

    same_step_000_ce6494722c474dd3b8bef482bb976557.py

    import kfp
    from kfp.v2.dsl import component, Artifact, Input, InputPath, Output, OutputPath, Dataset, Model
    from typing import NamedTuple
    
    
    def same_step_000_ce6494722c474dd3b8bef482bb976557_fn(
        input_context_path: InputPath(str),
        output_context_path: OutputPath(str),
        run_info: str = "gAR9lC4=",
        metadata_url: str = "",
    ):
        from base64 import urlsafe_b64encode, urlsafe_b64decode
        from pathlib import Path
        import datetime
        import requests
        import tempfile
        import dill
        import os
    
        input_context = None
        with Path(input_context_path).open("rb") as reader:
            input_context = reader.read()
    
        # Helper function for posting metadata to mlflow.
        def post_metadata(json):
            if metadata_url == "":
                return
    
            try:
                req = requests.post(metadata_url, json=json)
                req.raise_for_status()
            except requests.exceptions.HTTPError as err:
                print(f"Error posting metadata: {err}")
    
        # Move to writable directory as user might want to do file IO.
        # TODO: won't persist across steps, might need support in SDK?
        os.chdir(tempfile.mkdtemp())
    
        # Load information about the current experiment run:
        run_info = dill.loads(urlsafe_b64decode(run_info))
    
        # Post session context to mlflow.
        if len(input_context) > 0:
            input_context_str = urlsafe_b64encode(input_context)
            post_metadata(
                {
                    "experiment_id": run_info["experiment_id"],
                    "run_id": run_info["run_id"],
                    "step_id": "same_step_000",
                    "metadata_type": "input",
                    "metadata_value": input_context_str,
                    "metadata_time": datetime.datetime.now().isoformat(),
                }
            )
    
        # User code for step, which we run in its own execution frame.
        user_code = f"""
    import dill
    
    # Load session context into global namespace:
    if { len(input_context) } > 0:
        dill.load_session("{ input_context_path }")
    
    {dill.loads(urlsafe_b64decode("gASVGAAAAAAAAACMFHByaW50KCJIZWxsbyB3b3JsZCIplC4="))}
    
    # Remove anything from the global namespace that cannot be serialised.
    # TODO: this will include things like pandas dataframes, needs sdk support?
    _bad_keys = []
    _all_keys = list(globals().keys())
    for k in _all_keys:
        try:
            dill.dumps(globals()[k])
        except TypeError:
            _bad_keys.append(k)
    
    for k in _bad_keys:
        del globals()[k]
    
    # Save new session context to disk for the next component:
    dill.dump_session("{output_context_path}")
    """
    
        # Runs the user code in a new execution frame. Context from the previous
        # component in the run is loaded into the session dynamically, and we run
        # with a single globals() namespace to simulate top-level execution.
        exec(user_code, globals(), globals())
    
        # Post new session context to mlflow:
        with Path(output_context_path).open("rb") as reader:
            context = urlsafe_b64encode(reader.read())
            post_metadata(
                {
                    "experiment_id": run_info["experiment_id"],
                    "run_id": run_info["run_id"],
                    "step_id": "same_step_000",
                    "metadata_type": "output",
                    "metadata_value": context,
                    "metadata_time": datetime.datetime.now().isoformat(),
                }
            )
    

    The Python file that is executed to launch the run:

    from sameproject.ops import helpers
    from pathlib import Path
    import importlib
    import kfp
    
    
    def deploy(compiled_path: Path, root_module_name: str):
        with helpers.add_path(str(compiled_path)):
            kfp_client = kfp.Client()  # only supporting 'kubeflow' namespace
            root_module = importlib.import_module(root_module_name)
    
            return kfp_client.create_run_from_pipeline_func(
                root_module.root,
                arguments={},
            )
    

  • 1 Answer

    Anonymous user

    It turned out this was caused by not compiling with the correct execution mode enabled.

    If you hit this error, your compile call should look like the following:

    Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(pipeline_func=root_module.root, package_path=str(package_yaml_path))
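
    Applied to the deploy() helper from the question, a minimal sketch of the fix under the same assumptions as the snippet above (KFP SDK 1.8.x; package_yaml_path is just a placeholder for wherever the compiled pipeline YAML should be written, and the function name is illustrative):

    import kfp
    from kfp.compiler import Compiler

    def deploy_v2_compatible(root_module, package_yaml_path):
        # Compile the pipeline with the v2-compatible execution mode, per the
        # answer above, instead of the default v1 legacy mode.
        Compiler(mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
            pipeline_func=root_module.root,
            package_path=str(package_yaml_path),
        )

        # Submit the compiled package. Alternatively, KFP SDK 1.8.x also accepts
        # mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE directly on
        # create_run_from_pipeline_func, which skips the explicit compile step.
        client = kfp.Client()
        return client.create_run_from_pipeline_package(
            str(package_yaml_path),
            arguments={},
        )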
    
    
