参考资料:
医疗云影像下载器,从在线报告下载 CT、MRI 等片子的 DICOM 文件

(这个项目的思路,是从在线报告的云胶片中,下载 CT、MRI 等片子的 DICOM 文件,需要适配特别多的平台)

将医院给的在线CT查阅链接中提取为本地DICOM文件
(思路:用浏览器查看云胶片,然后将查看过程中的浏览保存为HAR, 再将HAR中的dicom数据提取出来,避免了适配不同的平台,只有胶片能用浏览器查看就行)

根据第2种思路,做了些优化改进:

import os
import json
import argparse
import base64
import binascii
from io import BytesIO
from datetime import datetime
from collections import defaultdict

import pydicom





def get_patient_name(ds):
    pn = getattr(ds, "PatientName", None)
    if not pn:
        return "UnknownPatient"

    try:
        return str(pn).replace("^", "_")
    except:
        return "UnknownPatient"






# ========= HAR → DICOM =========

def extract_dicoms_from_har(har_path):
    with open(har_path, 'r', encoding='utf-8') as f:
        har = json.load(f)

    dicoms = []
    seen_sop = set()

    for entry in har.get("log", {}).get("entries", []):
        content = entry.get("response", {}).get("content", {})
        text = content.get("text")

        if not text:
            continue

        if content.get("encoding") != "base64":
            continue

        try:
            raw = base64.b64decode(text.encode("utf-8"))
        except binascii.Error:
            continue

        if len(raw) < 256:
            continue

        try:
            ds = pydicom.dcmread(BytesIO(raw), stop_before_pixels=True)
        except Exception as e:
            print("invalid dicom:", e)
            continue

        sop_uid = getattr(ds, "SOPInstanceUID", None)
        if not sop_uid or sop_uid in seen_sop:
            continue

        seen_sop.add(sop_uid)

        dicoms.append((ds, raw))

    return dicoms




# ========= group + sort =========

def group_dicoms(dicoms):
    tree = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

    for ds, raw in dicoms:
        patient = get_patient_name(ds)
        study = getattr(ds, "StudyInstanceUID", "UnknownStudy")
        series = getattr(ds, "SeriesInstanceUID", "UnknownSeries")

        tree[patient][study][series].append((ds, raw))

    return tree


def sort_by_instance_number(slices):
    slices = sorted(
        slices,
        key=lambda x: getattr(x[0], "InstanceNumber", 0)
    )


    nums = [getattr(ds, "InstanceNumber") for ds, _ in slices]
    if len(nums) != len(set(nums)):
        print("⚠️ InstanceNumber existed.")

    return slices





# ========= save =========

def save_tree(tree, base_folder):
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    root = os.path.join(base_folder, "dicom_output", timestamp)

    for patient, studies in tree.items():
        for study, series_dict in studies.items():
            for series, slices in series_dict.items():

                sorted_slices = sort_by_instance_number(slices)

                folder = os.path.join(root, patient, study, series)
                os.makedirs(folder, exist_ok=True)

                for i, (ds, raw) in enumerate(sorted_slices):
                    filename = f"{i+1:04d}.dcm"
                    path = os.path.join(folder, filename)

                    with open(path, "wb") as f:
                        f.write(raw)

                print(f"[OK] {patient} | {series} -> {len(sorted_slices)} slices")

    print(f"\noutput dir: {root}")




def run(har_path, output_folder):
    dicoms = extract_dicoms_from_har(har_path)
    
    if not dicoms:
        print("not found DICOM,exit...")
        return    

    print(f"extracted DICOM count: {len(dicoms)}")
    tree = group_dicoms(dicoms)
    save_tree(tree, output_folder)




if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="extrace DICOM from HAR and save to file")
    parser.add_argument("har_file", help="HAR file path")
    parser.add_argument("output_dir", help="output dir")
    args = parser.parse_args()
    
    run(args.har_file, args.output_dir)

都是用python完成的

DICOM 文件通常不仅保存图像像素数据,还包含大量与检查相关的元信息,例如:

患者信息:姓名、性别、年龄、ID
检查信息:检查日期、设备型号、医院名称
图像信息:图像尺寸、像素间距、层厚
扫描参数:模态类型(CT、MR等)、窗宽窗位、采集条件 

在下载存储影像时,比较关心 Media Storage SOP Instance UID

在检查信息、序列信息、实例信息中都有对应的一个唯一标识uid,并且形成层级关系:
UID 含义
第一级:StudyInstanceUID 标识同一患者的一次检查
第二级:SeriesInstanceUID 标识一次检查下的一次序列
第三级:SOPInstanceUID 标识一次序列下的产生的其中一个图像

StudyInstanceUID (检查实例 UID) DICOM Tag(标签):(0020,000D)
它是 Study(检查) 这一层级的全球唯一身份证号
当你去医院做一次完整的影像检查(比如一次完整的胸部 CT),设备就会为这次“检查行为”生成一个唯一的 StudyInstanceUID。
这次检查中产生的所有序列(Series)、所有图像(Instance),它们的 DICOM 文件头里,都会包含这同一个 StudyInstanceUID。

SeriesInstanceUID (序列实例 UID) DICOM Tag(标签):(0020,000E)
代表一次检查中的“一次连续扫描”。比如上述 500 张照片中,前 200 张是平扫,它们共享一个 SeriesInstanceUID;后 300 张是增强扫描,它们共享另一个 SeriesInstanceUID。

或者一次检查中,有多个部位的扫描。不同的部位,扫描不是连续的,其 SeriesInstanceUID 也是不一样的。

SOPInstanceUID (SOP 实例 UID) DICOM Tag(标签):(0008,0018)
它是一张具体的 DICOM 文件(一张切片)的唯一标识

先给出代码

use leptos::prelude::*;

// #[component]宏将一个函数标记为 可重用的组件a function as a reusable component
#[component]
fn App() -> impl IntoView {
    // 创建一个反应式的信号,并得到 一对 getter, setter
    let (count, set_count) = signal(0);

    view! {
        <button
            // 当 click 事件发生时,执行这个闭包
            //  move表示:把闭包中使用到的外部变量“按值捕获”进闭包内部。
            on:click=move |_| *set_count.write() += 1
        >
            "Click me: "
            {count}
        </button>
        <p>
            <strong>"Reactive: "</strong>                
            {move || count.get()}
        </p>
        <p>
            <strong>"Reactive shorthand: "</strong>
            {count}
        </p>
        <p>
            <strong>"Not reactive: "</strong>
            // 注意:下面这种写法不是反应式的,只是简单地一次性地得到count的值
            {count.get()}
        </p>
    }
}

fn main() {
    leptos::mount::mount_to_body(App)
}



count.get() 会克隆 count的值
set_count.set() 会覆盖改写 count的值

请看
https://docs.rs/leptos/latest/leptos/reactive/signal/struct.ReadSignal.html
https://docs.rs/leptos/latest/leptos/reactive/signal/struct.WriteSignal.html

注意:
move |_| *set_count.write() += 1
相当于

move |event: web_sys::MouseEvent| {
    *set_count.write() += 1
}

Leptos 会自动把浏览器的 click 事件对象传进来,但我们没用到 event.
_ 表示 忽略这个参数

更重要的是, set_count.write() 中的 是解引用谁

set_count.write() 返回 WriteSignalGuard
本质类似于 RwLockWriteGuard
它实现了 DerefMut
也就是说 write() 返回一个“可变借用的 guard”

set_count.write() 等价于 (set_count.write())
也就是, 对 write() 返回的 guard 进行解引用
不是对 set_count 解引用。

上面的代码,完全展开,应该是

on:click = move |_: web_sys::MouseEvent| {
    let mut value_ref: &mut i32 = &mut *set_count.write();
    *value_ref += 1;
}


write() 和 update()的 区别与联系

write() 是用来 获取可变引用的, 返回值是 WriteSignalGuard
使用 *guard 解引用后 修改

update(f) 是 原子修改, 返回值是(), 传入闭包 进行修改。

set_count.update(|value| {
    *value += 1;
});

等价于

{
    let mut guard = set_count.write();
    f(&mut *guard);
}

其中, f就是上面的闭包

也就是说, update() 是 write() 的语法封装

与 set() 的关系

还有一个 set_count.set(5);

它等价于 set_count.update(|v| *v = 5);

Element:input 事件

当一个 input select 或 textarea 元素的 value 被修改时,会触发 input 事件。

事件的 target 为当前正在编辑的宿主。

每当元素的 value 改变,input 事件都会被触发。这与 change 事件不同。
change 事件仅当 value 被提交时触发,如按回车键,从一个 options 列表中选择一个值等。

示例:

<input placeholder="输入一些文本" name="name" />
<p id="values"></p>

<script>
const input = document.querySelector("input");
const log = document.getElementById("values");

input.addEventListener("input", updateValue);

function updateValue(e) {
  log.textContent = e.target.value;
}
</script>

对于input, 在Leptos中强调了 value attribute 和 value property 的区别。许多前端框架会混淆这两者。
https://book.leptos.dev/view/05_forms.html

attribute用来设置初始值,而property 则是设置当前值。就是当用户在 input界面上输入过之后,只能用 property来修改

打开浏览器的 about:blank 页面, 用 Ctrl + Shift + I 调出 调试界面,在 控制台里 依次 输入下面四句(输入完一句,看一下效果)

const el = document.createElement("input");
document.body.appendChild(el);

el.setAttribute("value", "test"); el.setAttribute("value", "another
test");

页面最后会出现一个 输入框
在 input框中输入几个字符,或者删除几个字符。

el.setAttribute("value", "one more time?");

再用 setAttribute 就无法改变 input的内容了,只能用prop

el.value = "But this works";