参考资料:
医疗云影像下载器,从在线报告下载 CT、MRI 等片子的 DICOM 文件

(这个项目的思路,是从在线报告的云胶片中,下载 CT、MRI 等片子的 DICOM 文件,需要适配特别多的平台)

从医院提供的在线 CT 查阅链接中提取出本地 DICOM 文件
(思路:用浏览器查看云胶片,然后将查看过程中的网络请求保存为 HAR 文件,再将 HAR 中的 DICOM 数据提取出来;这样就不必适配不同的平台,只要胶片能用浏览器查看就行)

根据第2种思路,做了些优化改进:

import os
import json
import argparse
import base64
import binascii
from io import BytesIO
from datetime import datetime
from collections import defaultdict

import pydicom





def get_patient_name(ds):
    """Return a directory-safe patient name for a DICOM dataset.

    The value is later used as a folder name, so the DICOM component
    separator ``^`` and characters that are invalid or dangerous in file
    system paths are replaced with ``_``.

    Args:
        ds: Object exposing an optional ``PatientName`` attribute
            (typically a ``pydicom`` dataset).

    Returns:
        The sanitized name, or ``"UnknownPatient"`` when the attribute is
        missing, empty, cannot be converted to text, or sanitizes to nothing.
    """
    pn = getattr(ds, "PatientName", None)
    if not pn:
        return "UnknownPatient"

    try:
        name = str(pn)
    except Exception:
        # Best effort: an undecodable name must not abort the whole export.
        return "UnknownPatient"

    # "^" separates name components in DICOM; the remaining characters are
    # path separators or reserved on common file systems.
    unsafe = '^/\\:*?"<>|'
    cleaned = name.translate(str.maketrans(unsafe, "_" * len(unsafe)))

    # Strip surrounding whitespace and dots so that "." / ".." can never be
    # produced and Windows does not reject a trailing dot or space.
    cleaned = cleaned.strip(" \t\r\n.")
    return cleaned or "UnknownPatient"






# ========= HAR → DICOM =========

def extract_dicoms_from_har(har_path):
    """Extract every unique DICOM object embedded in a HAR capture.

    Only responses whose body is stored base64-encoded are considered. Each
    decoded body is parsed with ``pydicom`` (header only) and kept when it
    carries a ``SOPInstanceUID`` that has not been seen before.

    Args:
        har_path: Path of the HAR (JSON) file to read.

    Returns:
        A list of ``(dataset, raw_bytes)`` tuples in the order the entries
        appear in the HAR; ``raw_bytes`` is the complete decoded response
        body, including pixel data.
    """
    # "utf-8-sig" reads plain UTF-8 and also tolerates a leading BOM, which
    # would otherwise make json.load fail.
    with open(har_path, 'r', encoding='utf-8-sig') as f:
        har = json.load(f)

    dicoms = []
    seen_sop = set()

    # "or {}" / "or []" also covers keys that are present but null.
    entries = (har.get("log") or {}).get("entries") or []

    for entry in entries:
        content = (entry.get("response") or {}).get("content") or {}
        text = content.get("text")

        if not text:
            continue

        # Binary payloads are stored base64-encoded in a HAR; anything else
        # is plain text and cannot be a DICOM file.
        if content.get("encoding") != "base64":
            continue

        try:
            raw = base64.b64decode(text)
        except binascii.Error:
            continue

        # Too small to be a DICOM file; skip before attempting a parse.
        if len(raw) < 256:
            continue

        try:
            # Header only: pixel data is not needed for grouping, and the
            # untouched raw bytes are what gets written to disk.
            ds = pydicom.dcmread(BytesIO(raw), stop_before_pixels=True)
        except Exception as e:
            print("invalid dicom:", e)
            continue

        # The same instance may have been fetched more than once.
        sop_uid = getattr(ds, "SOPInstanceUID", None)
        if not sop_uid or sop_uid in seen_sop:
            continue

        seen_sop.add(sop_uid)

        dicoms.append((ds, raw))

    return dicoms




# ========= group + sort =========

def group_dicoms(dicoms):
    """Bucket DICOM items by patient name, study UID and series UID.

    Args:
        dicoms: Iterable of ``(dataset, raw_bytes)`` pairs.

    Returns:
        A three-level ``defaultdict``:
        ``result[patient][study][series]`` is the list of
        ``(dataset, raw_bytes)`` tuples of that series, in input order.
        Missing UIDs fall back to ``"UnknownStudy"`` / ``"UnknownSeries"``.
    """
    def _new_series_level():
        return defaultdict(list)

    def _new_study_level():
        return defaultdict(_new_series_level)

    grouped = defaultdict(_new_study_level)

    for dataset, payload in dicoms:
        patient_key = get_patient_name(dataset)
        study_key = getattr(dataset, "StudyInstanceUID", "UnknownStudy")
        series_key = getattr(dataset, "SeriesInstanceUID", "UnknownSeries")

        bucket = grouped[patient_key][study_key][series_key]
        bucket.append((dataset, payload))

    return grouped


def sort_by_instance_number(slices):
    """Return the slices of one series ordered by ``InstanceNumber``.

    A dataset whose ``InstanceNumber`` is missing or ``None`` is treated as
    number 0, both for sorting and for the duplicate check, so such a slice
    no longer raises. The sort is stable: slices with equal numbers keep
    their input order.

    Args:
        slices: Iterable of ``(dataset, raw_bytes)`` pairs.

    Returns:
        A new list with the same pairs in ascending ``InstanceNumber`` order.
    """
    def _instance_number(item):
        number = getattr(item[0], "InstanceNumber", None)
        return 0 if number is None else number

    ordered = sorted(slices, key=_instance_number)

    # Duplicate (or several missing) numbers make the order ambiguous; warn
    # but still return a best-effort ordering.
    nums = [_instance_number(item) for item in ordered]
    if len(nums) != len(set(nums)):
        print("⚠️ InstanceNumber existed.")

    return ordered





# ========= save =========

def save_tree(tree, base_folder):
    """Write a grouped DICOM tree to disk.

    Files go to
    ``<base_folder>/dicom_output/<timestamp>/<patient>/<study>/<series>/``
    and are named ``0001.dcm``, ``0002.dcm``, ... in the order returned by
    ``sort_by_instance_number``. One summary line is printed per series and
    the output root is printed at the end.

    Args:
        tree: Mapping ``patient -> study -> series -> [(dataset, raw_bytes)]``.
        base_folder: Directory under which ``dicom_output`` is created.
    """
    stamp = datetime.now().strftime('%Y%m%d%H%M%S')
    root = os.path.join(base_folder, "dicom_output", stamp)

    # Flatten the three nesting levels into one stream of series.
    all_series = (
        (patient, study, series, slices)
        for patient, studies in tree.items()
        for study, series_dict in studies.items()
        for series, slices in series_dict.items()
    )

    for patient, study, series, slices in all_series:
        ordered = sort_by_instance_number(slices)

        target_dir = os.path.join(root, patient, study, series)
        os.makedirs(target_dir, exist_ok=True)

        # Only the raw bytes are written; the parsed dataset is not needed.
        for position, (_, payload) in enumerate(ordered, start=1):
            target = os.path.join(target_dir, f"{position:04d}.dcm")
            with open(target, "wb") as out:
                out.write(payload)

        print(f"[OK] {patient} | {series} -> {len(ordered)} slices")

    print(f"\noutput dir: {root}")




def run(har_path, output_folder):
    """Extract the DICOM files from a HAR capture and save them grouped.

    Args:
        har_path: Path of the HAR file to read.
        output_folder: Directory under which the output tree is written.

    Prints a notice and returns without writing anything when the HAR
    contains no DICOM data.
    """
    found = extract_dicoms_from_har(har_path)

    if found:
        print(f"extracted DICOM count: {len(found)}")
        save_tree(group_dicoms(found), output_folder)
        return

    print("not found DICOM,exit...")




if __name__ == "__main__":
    # Command-line entry point: takes the HAR file and the output directory
    # as two positional arguments and hands them to run().
    parser = argparse.ArgumentParser(description="extract DICOM from HAR and save to file")
    parser.add_argument("har_file", help="HAR file path")
    parser.add_argument("output_dir", help="output dir")
    args = parser.parse_args()

    run(args.har_file, args.output_dir)

以上都是用 Python 完成的。

标签: none

添加新评论