从医院云影像中提取dicom
参考资料:
医疗云影像下载器,从在线报告下载 CT、MRI 等片子的 DICOM 文件
(这个项目的思路,是从在线报告的云胶片中,下载 CT、MRI 等片子的 DICOM 文件,需要适配特别多的平台)
从医院提供的在线 CT 查阅链接中, 提取出本地 DICOM 文件
(思路:用浏览器查看云胶片,然后将查看过程中产生的网络请求记录保存为 HAR 文件, 再将 HAR 中的 DICOM 数据提取出来,避免了适配不同的平台,只要胶片能用浏览器查看就行)
根据第2种思路,做了些优化改进:
import os
import json
import argparse
import base64
import binascii
from io import BytesIO
from datetime import datetime
from collections import defaultdict
import pydicom
def get_patient_name(ds):
    """Return a directory-safe patient label for a DICOM dataset.

    The label ends up as a folder name when the extracted files are written
    to disk, so every character that could split or escape a path is
    neutralised, not only the ``^`` separator used inside DICOM person names.

    Args:
        ds: Object exposing an optional ``PatientName`` attribute.

    Returns:
        The patient name as text with ``^``, path separators, characters that
        Windows forbids in file names, and control characters replaced by
        ``_``, and with surrounding spaces and dots removed. Returns
        ``"UnknownPatient"`` when the attribute is missing or empty, cannot
        be converted to text, or nothing is left after cleaning.
    """
    pn = getattr(ds, "PatientName", None)
    if not pn:
        return "UnknownPatient"

    try:
        text = str(pn)
    except Exception:
        # Only ordinary conversion failures are treated as "unknown";
        # KeyboardInterrupt / SystemExit must still propagate.
        return "UnknownPatient"

    unsafe = set('^/\\:*?"<>|')
    cleaned = "".join(
        "_" if ch in unsafe or ord(ch) < 32 else ch for ch in text
    )
    # Leading/trailing dots and spaces would allow "." / ".." components and
    # are not accepted at the end of a name on Windows.
    cleaned = cleaned.strip(" .")
    return cleaned or "UnknownPatient"
# ========= HAR → DICOM =========
def extract_dicoms_from_har(har_path):
    """Collect the unique DICOM payloads embedded in a HAR capture.

    Every entry under ``log.entries`` is inspected. Only response bodies
    marked with ``encoding == "base64"`` are decoded, and each decoded body
    of at least 256 bytes is handed to ``pydicom.dcmread``. Bodies that
    pydicom rejects are reported on stdout and skipped. Datasets without a
    ``SOPInstanceUID``, or whose ``SOPInstanceUID`` was already seen, are
    dropped.

    Args:
        har_path: Path of the HAR (JSON) file to read.

    Returns:
        A list of ``(dataset, raw_bytes)`` tuples in capture order, where
        ``dataset`` is the header parsed with ``stop_before_pixels=True`` and
        ``raw_bytes`` is the complete decoded response body.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # "utf-8-sig" reads plain UTF-8 too, and additionally drops a leading
    # BOM that json.load would otherwise reject.
    with open(har_path, 'r', encoding='utf-8-sig') as f:
        har = json.load(f)

    dicoms = []
    seen_sop = set()

    # `or {}` / `or []` also cover keys that are present but null, which
    # dict.get(key, default) alone would pass through as None.
    entries = (har.get("log") or {}).get("entries") or []
    for entry in entries:
        content = (entry.get("response") or {}).get("content") or {}
        text = content.get("text")
        if not text or not isinstance(text, str):
            continue
        if content.get("encoding") != "base64":
            continue

        try:
            raw = base64.b64decode(text.encode("utf-8"))
        except binascii.Error:
            continue

        # Short bodies are skipped without being parsed.
        if len(raw) < 256:
            continue

        try:
            # Only the header is needed here; the untouched bytes are kept
            # next to it in the result.
            ds = pydicom.dcmread(BytesIO(raw), stop_before_pixels=True)
        except Exception as e:
            print("invalid dicom:", e)
            continue

        sop_uid = getattr(ds, "SOPInstanceUID", None)
        if not sop_uid or sop_uid in seen_sop:
            continue
        seen_sop.add(sop_uid)
        dicoms.append((ds, raw))

    return dicoms
# ========= group + sort =========
def group_dicoms(dicoms):
    """Arrange datasets into a patient -> study -> series hierarchy.

    Args:
        dicoms: Iterable of ``(dataset, raw_bytes)`` pairs.

    Returns:
        A three-level ``defaultdict``. The first key is the value returned
        by ``get_patient_name``, the second is ``StudyInstanceUID`` (or
        ``"UnknownStudy"``), the third is ``SeriesInstanceUID`` (or
        ``"UnknownSeries"``). Each leaf is the list of pairs belonging to
        that series, in input order.
    """
    def _series_level():
        return defaultdict(list)

    def _study_level():
        return defaultdict(_series_level)

    grouped = defaultdict(_study_level)
    for dataset, payload in dicoms:
        patient_key = get_patient_name(dataset)
        study_key = getattr(dataset, "StudyInstanceUID", "UnknownStudy")
        series_key = getattr(dataset, "SeriesInstanceUID", "UnknownSeries")
        bucket = grouped[patient_key][study_key][series_key]
        bucket.append((dataset, payload))
    return grouped
def sort_by_instance_number(slices):
    """Return the slices ordered by their ``InstanceNumber``.

    A dataset whose ``InstanceNumber`` is missing, ``None`` or not
    convertible to ``int`` is treated as number 0 both for sorting and for
    the duplicate check, so such datasets no longer abort the run. The sort
    is stable, so slices sharing a number keep their input order.

    Args:
        slices: Iterable of ``(dataset, raw_bytes)`` pairs.

    Returns:
        A new list with the same pairs in ascending ``InstanceNumber`` order.
        A warning is printed when two or more slices share a number.
    """
    def instance_number(item):
        value = getattr(item[0], "InstanceNumber", None)
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    ordered = sorted(slices, key=instance_number)

    numbers = [instance_number(item) for item in ordered]
    if len(numbers) != len(set(numbers)):
        # Emitted when at least two slices resolve to the same number.
        print("⚠️ InstanceNumber existed.")
    return ordered
# ========= save =========
def save_tree(tree, base_folder):
    """Write every grouped slice to disk as a numbered ``.dcm`` file.

    Files are written to
    ``<base_folder>/dicom_output/<YYYYmmddHHMMSS>/<patient>/<study>/<series>/``
    and named ``0001.dcm``, ``0002.dcm``, ... following the order returned
    by ``sort_by_instance_number``. One summary line is printed per series
    and the output root is printed at the end.

    Args:
        tree: Mapping of patient -> study -> series -> list of
            ``(dataset, raw_bytes)`` pairs.
        base_folder: Directory under which ``dicom_output`` is created.
    """
    stamp = datetime.now().strftime('%Y%m%d%H%M%S')
    root = os.path.join(base_folder, "dicom_output", stamp)

    # Flatten the three nesting levels into one stream of series.
    all_series = (
        (patient, study, series, slices)
        for patient, studies in tree.items()
        for study, series_dict in studies.items()
        for series, slices in series_dict.items()
    )

    for patient, study, series, slices in all_series:
        sorted_slices = sort_by_instance_number(slices)

        target_dir = os.path.join(root, patient, study, series)
        os.makedirs(target_dir, exist_ok=True)

        for position, (_, payload) in enumerate(sorted_slices, start=1):
            out_path = os.path.join(target_dir, f"{position:04d}.dcm")
            with open(out_path, "wb") as handle:
                handle.write(payload)

        print(f"[OK] {patient} | {series} -> {len(sorted_slices)} slices")

    print(f"\noutput dir: {root}")
def run(har_path, output_folder):
    """Extract, group and save the DICOM files found in one HAR capture.

    Args:
        har_path: Path of the HAR file to process.
        output_folder: Directory passed on to ``save_tree`` as the base
            folder for the output.

    Prints a notice and writes nothing when the capture yields no DICOM data.
    """
    extracted = extract_dicoms_from_har(har_path)
    if extracted:
        print(f"extracted DICOM count: {len(extracted)}")
        save_tree(group_dicoms(extracted), output_folder)
    else:
        print("not found DICOM,exit...")
if __name__ == "__main__":
    # Command-line entry point: two positional arguments, HAR input first,
    # output directory second.
    cli = argparse.ArgumentParser(
        description="extrace DICOM from HAR and save to file",
    )
    positional = (
        ("har_file", "HAR file path"),
        ("output_dir", "output dir"),
    )
    for arg_name, arg_help in positional:
        cli.add_argument(arg_name, help=arg_help)

    parsed = cli.parse_args()
    run(parsed.har_file, parsed.output_dir)
以上功能全部用 Python 实现。