通过 Python 脚本批量导出 InvenTree 库位标签 PDF

这个自动库位导出脚本可以导出大量库位,并可选择将它们合并为一个多页 PDF。这比从 InvenTree Web 界面逐个导出要快得多。

我将它与自定义的 62mm Brother 模板以及 InvenTree 兼容 Brother-QL 的 62x27mm 库位标签模板BrotherQLLabelPrintService 配合使用,后者支持将多页 PDF 直接通过 Brother QL 系列驱动打印。

export_inventree_stock_locations.py
#!/usr/bin/env python3
"""从 InvenTree 下载并合并 PDF 库位标签。

本脚本通过 REST API 连接到 InvenTree 实例,检索库位
(可按名称模式和/或父库位过滤),使用 InvenTree 标签模板
并行生成 PDF 标签,并将结果合并为单个 PDF 文件。

配置从同目录下的 ``config.yaml`` 读取:

    inventree:
      server: https://inventree.example.com
      token: your-api-token-here

依赖:
    - Python 3.8+
    - requests
    - PyYAML
    - pypdf

用法示例:

    # 为所有库位生成标签,合并为一个 PDF
    python3 export_location_labels.py

    # 按名称过滤:glob 模式(含 *)或子串(不含 *)
    python3 export_location_labels.py -q "Schublade A*"
    python3 export_location_labels.py -q "Schublade"

    # 按父库位过滤(名称或数字 pk)
    python3 export_location_labels.py -p Apothekerschrank
    python3 export_location_labels.py -p 9

    # 组合两种过滤
    python3 export_location_labels.py -q "Schublade A*" -p Apothekerschrank

    # 按名称排除特定库位(glob 或子串,规则同 -q)
    python3 export_location_labels.py -e "Test*" -e "Illerbeuren"

    # 仅包含特定库位(覆盖排除和 -q)
    python3 export_location_labels.py -i "Schublade A1" -i "Schublade B2"

    # 包含 + 排除组合:包含优先
    python3 export_location_labels.py -i "Schublade*" -e "Schublade C*"

    # 选择特定标签模板(-t 简写)
    python3 export_location_labels.py -t "Lagerort Groß 62mm"

    # 自定义合并 PDF 的输出路径
    python3 export_location_labels.py -o ./my_labels.pdf

    # 将各个 PDF 写入目录而非合并
    python3 export_location_labels.py --individual -o ./my_labels_dir/

    # 自动命名输出:从搜索词派生文件名
    # "Schublade A*" -> Schublade_A.pdf(合并)或 Schublade_A/(单独)
    python3 export_location_labels.py -q "Schublade A*"

    # 调整并行度和超时
    python3 export_location_labels.py --workers 16 --timeout 120

过滤:
    所有名称匹配都会规范化空白:库位名称和
    查询/排除/包含模式中的任何空白字符序列
    (空格、制表符、换行等)在比较前都会折叠为单个空格。

    - **名称过滤**(``-q``):如果查询字符串包含 ``*``,则
      视为 glob 模式(例如 ``"Schublade A*"`` 匹配
      ``Schublade A1``、``Schublade A10`` 等)。如果没有 ``*``,
      则使用大小写不敏感的子串匹配。

    - **父库位过滤**(``-p``):过滤父库位匹配
      给定值的库位。该值可以是父库位名称
      (例如 ``Apothekerschrank``)或其数字主键(例如 ``9``)。

    - **排除**(``-e``):排除匹配给定模式的库位。
      可多次指定。glob/子串规则同 ``-q``。
      只要匹配*任意*排除模式,该库位就会被排除。

    - **包含**(``-i``):仅包含匹配给定模式的库位。
      可多次指定。glob/子串规则同 ``-q``。
      只要匹配*任意*包含模式,该库位就会被包含。
      **包含覆盖 ``-q`` 和 ``-e``**:当指定了
      ``-i`` 时,查询过滤会被忽略,且匹配包含模式的
      已排除库位仍会被包含。

输出:
    默认情况下,所有生成的标签 PDF 会使用 ``pypdf`` 合并为
    单个 PDF 文件。当指定 ``--individual`` 时,每个标签会
    作为单独的 PDF 文件写入目录。

    输出路径按以下方式确定:

    1. 如果指定了 ``-o``,则直接使用(合并模式下为文件路径,
       单独模式下为目录路径)。
    2. 如果未指定 ``-o``,则自动派生名称:
       - 从 ``-q`` 搜索词中去除 glob 字符(``*``、``?``)
         并将空格替换为下划线,例如
         ``"Schublade A*"`` -> ``Schublade_A.pdf``。
       - 如果没有 ``-q`` 但有 ``-i``,则从第一个包含模式
         派生(同样的去除规则)。
       - 如果都没有,则使用 ``StockLocationLabels.pdf``(合并)或
         ``StockLocationLabels/``(单独)。

工作原理:
    1. 从 InvenTree API 获取所有库位(分页)。
    2. 应用可选的名称、父库位、包含和排除过滤。
    3. 获取可用的 ``stocklocation`` 标签模板。
    4. 通过 ``POST /api/label/print/`` 并行为每个匹配的库位
       提交标签打印任务。
    5. 轮询 ``GET /api/data-output/<pk>/`` 直到每个任务完成。
    6. 从输出路径下载生成的 PDF。
    7. 使用 ``pypdf`` 将所有单独的 PDF 合并为一个(或者如果
       指定了 ``--individual`` 则单独写入)。
"""

import argparse
import fnmatch
import io
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

import requests
import yaml
from pypdf import PdfWriter, PdfReader

CONFIG_PATH = Path(__file__).parent / "config.yaml"


def load_config():
    with open(CONFIG_PATH, "r") as f:
        return yaml.safe_load(f)["inventree"]


class InvenTreeAPI:
    def __init__(self, server, token):
        self.server = server.rstrip("/")
        self.token = token
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Token {token}",
        })

    def get(self, path, params=None):
        r = self.session.get(f"{self.server}{path}", params=params)
        r.raise_for_status()
        return r

    def post(self, path, data=None, json=None):
        r = self.session.post(f"{self.server}{path}", data=data, json=json)
        if r.status_code == 400:
            print(f"  错误 400:{r.text}")
        r.raise_for_status()
        return r


def get_all_locations(api):
    """通过分页 API 调用获取所有库位。"""
    locations = []
    offset = 0
    while True:
        r = api.get("/api/stock/location/", params={
            "limit": 100, "offset": offset,
        })
        data = r.json()
        locations.extend(data["results"])
        if not data["next"]:
            break
        offset += 100
    return locations


def get_location_templates(api):
    """获取所有已启用的库位标签模板。"""
    templates = []
    offset = 0
    while True:
        r = api.get("/api/label/template/", params={
            "limit": 100, "offset": offset,
            "model_type": "stocklocation", "enabled": True,
        })
        data = r.json()
        templates.extend(data["results"])
        if not data["next"]:
            break
        offset += 100
    return templates


def print_and_download_label(api, template_pk, item_pks, timeout=60):
    """提交标签打印任务,轮询直到完成,下载 PDF。

    以字节形式返回 PDF 内容。
    """
    r = api.post("/api/label/print/", json={
        "template": template_pk,
        "items": item_pks,
    })
    result = r.json()
    output_pk = result["pk"]

    deadline = time.time() + timeout
    while time.time() < deadline:
        r = api.get(f"/api/data-output/{output_pk}/")
        data = r.json()
        if data.get("complete"):
            output_path = data.get("output")
            if not output_path:
                raise RuntimeError(
                    f"标签输出 {output_pk} 已完成但没有输出路径"
                )
            pdf_url = f"{api.server}{output_path}"
            pr = api.session.get(pdf_url)
            pr.raise_for_status()
            return pr.content
        time.sleep(0.5)

    raise TimeoutError(
        f"标签输出 {output_pk}{timeout}s 内未完成"
    )


def sanitize_filename(name):
    """使字符串可安全用作文件名。"""
    for ch in r'<>:"/\\|?*':
        name = name.replace(ch, "_")
    return name.strip()


def normalize_ws(s):
    """将字符串中所有空白序列折叠为单个空格。"""
    return re.sub(r"\s+", " ", s).strip()


def name_matches(name, pattern):
    """检查库位名称是否匹配某个模式。

    如果模式包含 * 则使用 glob,否则使用大小写不敏感的子串匹配。
    比较前双方都会进行空白规范化。
    """
    name = normalize_ws(name)
    pattern = normalize_ws(pattern)
    if "*" in pattern:
        return fnmatch.fnmatch(name, pattern)
    return pattern.lower() in name.lower()


def filter_locations(locations, query, parent, includes, excludes):
    """按名称模式、父库位、包含/排除列表过滤库位。

    - query:glob 模式(含 *)或大小写不敏感的子串
    - parent:父库位名称或 pk(整数字符串)
    - includes:模式列表;如果非空,仅保留匹配的库位
      (覆盖 query 和 excludes)
    - excludes:模式列表;匹配的库位会被移除
    """
    if includes:
        filtered = [l for l in locations if any(name_matches(l.get("name", ""), p) for p in includes)]
    else:
        filtered = locations

        if query:
            filtered = [l for l in filtered if name_matches(l.get("name", ""), query)]

    if parent:
        parent_pk = None
        if parent.isdigit():
            parent_pk = int(parent)
        else:
            for l in locations:
                if normalize_ws(l.get("name", "")) == normalize_ws(parent):
                    parent_pk = l["pk"]
                    break
            if parent_pk is None:
                print(f"错误:未找到父库位 '{parent}'")
                sys.exit(1)
        filtered = [l for l in filtered if l.get("parent") == parent_pk]

    if excludes and not includes:
        filtered = [l for l in filtered if not any(name_matches(l.get("name", ""), p) for p in excludes)]
    elif excludes and includes:
        filtered = [l for l in filtered if not any(name_matches(l.get("name", ""), p) for p in excludes) or any(name_matches(l.get("name", ""), p) for p in includes)]

    return filtered


def main():
    parser = argparse.ArgumentParser(
        description="从 InvenTree 下载并合并 PDF 库位标签"
    )
    parser.add_argument(
        "-q", "--query", default=None,
        help="按名称过滤库位(含 * 为 glob,否则为子串)",
    )
    parser.add_argument(
        "-p", "--parent", default=None,
        help="按父库位过滤(名称或数字 pk)",
    )
    parser.add_argument(
        "-e", "--exclude", action="append", default=[],
        help="排除匹配此模式的库位(glob 或子串)。可多次指定。",
    )
    parser.add_argument(
        "-i", "--include", action="append", default=[],
        help="仅包含匹配此模式的库位(glob 或子串)。覆盖 -q 和 -e。可多次指定。",
    )
    parser.add_argument(
        "-t", "--template", default=None,
        help="要使用的标签模板名称(默认:第一个可用模板)",
    )
    parser.add_argument(
        "-o", "--output", default=None,
        help="输出路径:PDF 文件(合并模式)或目录(单独模式)。"
             "如果未指定,则从搜索词或包含模式自动派生。",
    )
    parser.add_argument(
        "--individual", action="store_true",
        help="将各个 PDF 写入目录而非合并为一个 PDF",
    )
    parser.add_argument(
        "--workers", type=int, default=8,
        help="并行打印任务数(默认:8)",
    )
    parser.add_argument(
        "--timeout", type=int, default=60,
        help="每个标签任务的超时秒数(默认:60)",
    )
    args = parser.parse_args()

    config = load_config()
    api = InvenTreeAPI(config["server"], config["token"])

    # --- 获取标签模板 ---
    templates = get_location_templates(api)
    if not templates:
        print("错误:未找到 model_type 'stocklocation' 的已启用标签模板")
        sys.exit(1)

    print(f"找到 {len(templates)} 个库位标签模板:")
    for t in templates:
        print(f"  - {t['name']}(pk={t['pk']}{t['width']}x{t['height']}mm)")

    selected = None
    if args.template:
        for t in templates:
            if t["name"] == args.template:
                selected = t
                break
        if not selected:
            print(f"错误:未找到模板 '{args.template}'")
            sys.exit(1)
    else:
        selected = templates[0]
    print(f"\n使用模板:{selected['name']}(pk={selected['pk']})")

    # --- 获取并过滤库位 ---
    locations = get_all_locations(api)
    print(f"共找到 {len(locations)} 个库位")

    locations = filter_locations(locations, args.query, args.parent, args.include, args.exclude)
    print(f"过滤后剩余:{len(locations)} 个库位")

    if not locations:
        print("没有库位匹配过滤条件。")
        return

    for loc in locations:
        print(f"  {loc['name']}(pk={loc['pk']})")

    # --- 并行打印标签 ---
    print(f"\n正在并行生成 {len(locations)} 个标签"
          f"({args.workers} 个 worker)...")

    results = {}  # pk -> (name, pdf_bytes or None)
    errors = {}

    def _print_one(loc):
        name = loc.get("name", f"location_{loc['pk']}")
        pk = loc["pk"]
        try:
            pdf = print_and_download_label(
                api, selected["pk"], [pk], timeout=args.timeout
            )
            return pk, name, pdf, None
        except Exception as e:
            return pk, name, None, str(e)

    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        futures = {pool.submit(_print_one, loc): loc for loc in locations}
        for fut in as_completed(futures):
            pk, name, pdf, err = fut.result()
            if err:
                print(f"  失败:{name}(pk={pk}):{err}")
                errors[pk] = err
            else:
                print(f"  成功:{name}(pk={pk}{len(pdf)} 字节)")
                results[pk] = (name, pdf)

    if not results:
        print("\n错误:没有成功生成任何标签。")
        sys.exit(1)

    # --- 确定输出路径 ---
    def derive_name():
        """从查询或第一个包含模式自动派生输出名称。"""
        source = None
        if args.query:
            source = args.query
        elif args.include:
            source = args.include[0]
        if source:
            # 去除 glob 字符,规范化空白,将空格替换为 _
            cleaned = re.sub(r"[*?]", "", source)
            cleaned = normalize_ws(cleaned).replace(" ", "_")
            cleaned = sanitize_filename(cleaned)
            return cleaned if cleaned else "StockLocationLabels"
        return "StockLocationLabels"

    if args.output:
        output_path = Path(args.output)
    else:
        base_name = derive_name()
        if args.individual:
            output_path = Path(base_name)
        else:
            output_path = Path(f"{base_name}.pdf")

    # --- 写入输出 ---
    if args.individual:
        output_path.mkdir(parents=True, exist_ok=True)
        print(f"\n正在将 {len(results)} 个单独 PDF 写入 {output_path}/...")
        for pk in sorted(results.keys()):
            name, pdf_bytes = results[pk]
            safe_name = sanitize_filename(normalize_ws(name).replace(" ", "_"))
            pdf_path = output_path / f"{safe_name}.pdf"
            pdf_path.write_bytes(pdf_bytes)
            print(f"  {pdf_path.name}{len(pdf_bytes)} 字节)")
        print(f"\n完成:已写入 {len(results)} 个标签,{len(errors)} 个失败")
        print(f"输出目录:{output_path.resolve()}")
    else:
        print(f"\n正在将 {len(results)} 个 PDF 合并到 {output_path}...")
        writer = PdfWriter()
        for pk in sorted(results.keys()):
            name, pdf_bytes = results[pk]
            reader = PdfReader(io.BytesIO(pdf_bytes))
            for page in reader.pages:
                writer.add_page(page)

        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "wb") as f:
            writer.write(f)

        print(f"\n完成:已合并 {len(results)} 个标签,{len(errors)} 个失败")
        print(f"输出:{output_path.resolve()}")


if __name__ == "__main__":
    main()

Check out similar posts by category: InvenTree, Python