searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

前端优雅处理流式数据

2023-03-27 09:12:47
1004
0

一、流式API能解决浏览器端的什么问题?

1、对于非前端的同学来说,流是个很常见的概念,它能让我们一段一段地接收与处理数据。相比较于获取整个数据再处理,流不仅不需要占用一大块内存空间来存放整个数据,节省内存占用空间。

2、在每一个流式片段汇总,还能实时地对数据进行处理(比如数据压缩),不需要等待整个数据获取完毕,从而缩短整个操作的耗时;流也同时具有管道的概念,可以写一些中间件来处理业务逻辑。

二、实现前端的流式API

1、很遗憾,以前的JavaScript中是没有流式API的能力的,过去我们使用 XMLHttpRequest 获取一个文件时,我们必须等待浏览器下载完整的文件,等待浏览器处理成我们需要的格式,收到所有的数据后才能进行数据的处理。

2、得益于浏览器的发展,浏览器已经逐步的支持了流式API,而我们前端熟悉的Fetch API也是受益者之一,流式API赋予了网络请求以片段处理数据的能力.我们可以以 TypedArray 片段的形式接收一部分二进制数据,然后直接对数据进行处理

Fetch API 会在发起请求后得到的 Promise 对象中返回一个 Response 对象,而 Response 对象除了提供 headers等参数和方法外,在 Body 上我们才看到我们常用的那些 res.json()、res.text()、res.arrayBuffer() 等方法。在 Body 上还有一个 body 参数,这个 body 参数就是一个 ReadableStream,下面我们来写一个简单的demo,来展示一下如何使用这个API,更加高阶的能力,如:如何分流tee(),锁机制,pipeTo等方法后续有机会继续分享

fetch('/tableData.json', {
        method: 'GET',
        headers: {
            'content-type': 'application/json'
        },
        credentials: 'include',
    }).then(res => {
        totalSize = res.headers.get('content-length'); // 获取字节总长度,可以用用于做传输进度
        console.log(`获取的数据大小${res.headers.get('content-length') / 1000 / 1000}M`);
        // 这里返回的就是一个ReadableStream对象,我们可以调用流的api,进行数据的读取
        return res.body.getReader();
    })
    .then(readProcess)
// 递归读取流中的数据,直到读取完毕
const readProcess = (reader) => {
        // 调用ReadableStream对象的read方法,返回的也是一个Promise对象
        return reader.read().then(({ value, done }) => {
            // 这里的value是一个Uint8Array前端二进制数据,不能够直接拿来使用 
            if (done) {
                console.log('读取完毕');
                return;
            }
            // 使用TextDecoder将二进制数据进行解析,返回文本
            const decoder = new TextDecoder('utf-8');
            const text = decoder.decode(value, { stream: true });
            console.log(text);
            // 递归,去读取剩下的数据
            return readProcess(reader);
        });
    };

执行后,通过图片来看,我们可以拿到正常的处理进度,以及处理的片段,但是我们发现一个问题,因为前端拿到部分二进制数据,进行解析,流式API也不可能帮我们把数据做好切割,它按照默认的块级大小切割,直接丢给我们,我们去解析的时候,就有可能把整一个JSON数据截断,如果我们拿到当前的片段,直接进行JSON.parse很大概率会报错的,这时,我们就需要根据json结构,来实现一个解析器,保留最大可用JSON字符串来解析,把截断的部分,和下一段已解析的流进行合并在进行JSON解析即可。实现方案如下:

    // 根据你的业务数据,进行处理,处理最大完整的Json结构数据,返回剩下截断的数据,跟之后的chunk进行拼接
    function execChunkData(chunkStr) {
        // 处理开头
        if (chunkStr.indexOf('[{')) {
            // 如果连一条记录都不完整
            if (chunkStr.indexOf('}') == -1) {
                return chunkStr;
            }
            const lastIndex = chunkStr.lastIndexOf('},');
            // 如果是最后一天记录,则直接补全头部返回
            if (lastIndex == -1 && chunkStr.indexOf(']')) {
                return chunkStr.slice(0);
            }
            let resolvedString = chunkStr.slice(0, lastIndex);
            resolvedString += "}]";
            let restString = '[' + chunkStr.slice(lastIndex + 2);
            // 这里的temp对象是通过对大可用json字符串解析出来,保证可用
            const temp = JSON.parse(resolvedString);
            // 将截断的数据,返回给后续的块继续使用
            return restString;
        }
    }

tips:这里实现的比较简单粗暴,可满足大部分需求,如果对于正则表达式比较熟悉的同学,完全可以写出更好的解决方案

三、能不能在读取数据的时候做点事情?

跟普通请求全量数据对比,这里我们总结一下流式数据处理的优势
1、 分片处理,我们可以在加载分片数据的时候,做一些事情(比如数据压缩),这个后面内容会说
2、加载全量数据的时候,如果不用worker的时候,会将全量数据拿到之后,在主线程进行统一计算,这时主进程被完全占用,performance monitor可以看到,cpu在100%的峰值,这时浏览器什么也干不了,只能等待计算完成,有了块级处理,将这个long task进行大量的拆分,cpu的计算性能在20%水平,完全不影响其他渲染进程以及动画的展示。

3、前端数据压缩实现
思想:如果数据量非常大的情况下,JSON文件存在大量的key重复,如果我们把key单独抽离,头部的每个key引用当前key下的所有数据,那么将会压缩一部分数据,节省了页面内存,减少浏览器端出现out of memory的风险

    // 根据你的业务数据以及逻辑,对行数据,进行列式存储
    const ArrayBufferPool = {};// 定义数据池,可以用ArrayBuffer也行,这里我就简单处理下,思想是一样的
    function execChunkData(chunkStr) {
        // 处理开头
        if (chunkStr.indexOf('[{')) {
            // 如果连一条记录都不完整
            if (chunkStr.indexOf('}') == -1) {
                return chunkStr;
            }
            const lastIndex = chunkStr.lastIndexOf('},');
            // 如果是最后一天记录,则直接补全头部返回
            if (lastIndex == -1 && chunkStr.indexOf(']')) {
                return chunkStr.slice(0);
            }
            let firstString = chunkStr.slice(0, lastIndex);
            firstString += "}]";
            let restString = '[' + chunkStr.slice(lastIndex + 2);
            const temp = JSON.parse(firstString);
            const keys = Object.keys(temp[0]);// 取出列头
            temp.forEach(item => {
                keys.forEach(key => {
                    // 将数据归类,把横向的数据,进行列式存储
                    if (ArrayBufferPool[key]) {
                        ArrayBufferPool[key].push(item[key]);
                        return;
                    }
                    ArrayBufferPool[key] = [item[key]];
                })
            });
            return restString;
        }
    }

总结:最终我们得到的数据结构如图所示

我将之前的原始数据,和压缩数据下载下来,都去除了宫格,做了对比,压缩率在40%左右

四、展望
1、还有其他可玩的,可实现的功能,如终止一个request,以及断点续传等,有兴趣的同学可以继续深挖fetch API 以及ReadableStream接口协议

0条评论
0 / 1000
张****伟
7文章数
0粉丝数
张****伟
7 文章 | 0 粉丝
原创

前端优雅处理流式数据

2023-03-27 09:12:47
1004
0

一、流式API能解决浏览器端的什么问题?

1、对于非前端的同学来说,流是个很常见的概念,它能让我们一段一段地接收与处理数据。相比较于获取整个数据再处理,流不仅不需要占用一大块内存空间来存放整个数据,节省内存占用空间。

2、在每一个流式片段汇总,还能实时地对数据进行处理(比如数据压缩),不需要等待整个数据获取完毕,从而缩短整个操作的耗时;流也同时具有管道的概念,可以写一些中间件来处理业务逻辑。

二、实现前端的流式API

1、很遗憾,以前的JavaScript中是没有流式API的能力的,过去我们使用 XMLHttpRequest 获取一个文件时,我们必须等待浏览器下载完整的文件,等待浏览器处理成我们需要的格式,收到所有的数据后才能进行数据的处理。

2、得益于浏览器的发展,浏览器已经逐步的支持了流式API,而我们前端熟悉的Fetch API也是受益者之一,流式API赋予了网络请求以片段处理数据的能力.我们可以以 TypedArray 片段的形式接收一部分二进制数据,然后直接对数据进行处理

Fetch API 会在发起请求后得到的 Promise 对象中返回一个 Response 对象,而 Response 对象除了提供 headers等参数和方法外,在 Body 上我们才看到我们常用的那些 res.json()、res.text()、res.arrayBuffer() 等方法。在 Body 上还有一个 body 参数,这个 body 参数就是一个 ReadableStream,下面我们来写一个简单的demo,来展示一下如何使用这个API,更加高阶的能力,如:如何分流tee(),锁机制,pipeTo等方法后续有机会继续分享

fetch('/tableData.json', {
        method: 'GET',
        headers: {
            'content-type': 'application/json'
        },
        credentials: 'include',
    }).then(res => {
        totalSize = res.headers.get('content-length'); // 获取字节总长度,可以用用于做传输进度
        console.log(`获取的数据大小${res.headers.get('content-length') / 1000 / 1000}M`);
        // 这里返回的就是一个ReadableStream对象,我们可以调用流的api,进行数据的读取
        return res.body.getReader();
    })
    .then(readProcess)
// 递归读取流中的数据,直到读取完毕
const readProcess = (reader) => {
        // 调用ReadableStream对象的read方法,返回的也是一个Promise对象
        return reader.read().then(({ value, done }) => {
            // 这里的value是一个Uint8Array前端二进制数据,不能够直接拿来使用 
            if (done) {
                console.log('读取完毕');
                return;
            }
            // 使用TextDecoder将二进制数据进行解析,返回文本
            const decoder = new TextDecoder('utf-8');
            const text = decoder.decode(value, { stream: true });
            console.log(text);
            // 递归,去读取剩下的数据
            return readProcess(reader);
        });
    };

执行后,通过图片来看,我们可以拿到正常的处理进度,以及处理的片段,但是我们发现一个问题,因为前端拿到部分二进制数据,进行解析,流式API也不可能帮我们把数据做好切割,它按照默认的块级大小切割,直接丢给我们,我们去解析的时候,就有可能把整一个JSON数据截断,如果我们拿到当前的片段,直接进行JSON.parse很大概率会报错的,这时,我们就需要根据json结构,来实现一个解析器,保留最大可用JSON字符串来解析,把截断的部分,和下一段已解析的流进行合并在进行JSON解析即可。实现方案如下:

    // 根据你的业务数据,进行处理,处理最大完整的Json结构数据,返回剩下截断的数据,跟之后的chunk进行拼接
    function execChunkData(chunkStr) {
        // 处理开头
        if (chunkStr.indexOf('[{')) {
            // 如果连一条记录都不完整
            if (chunkStr.indexOf('}') == -1) {
                return chunkStr;
            }
            const lastIndex = chunkStr.lastIndexOf('},');
            // 如果是最后一天记录,则直接补全头部返回
            if (lastIndex == -1 && chunkStr.indexOf(']')) {
                return chunkStr.slice(0);
            }
            let resolvedString = chunkStr.slice(0, lastIndex);
            resolvedString += "}]";
            let restString = '[' + chunkStr.slice(lastIndex + 2);
            // 这里的temp对象是通过对大可用json字符串解析出来,保证可用
            const temp = JSON.parse(resolvedString);
            // 将截断的数据,返回给后续的块继续使用
            return restString;
        }
    }

tips:这里实现的比较简单粗暴,可满足大部分需求,如果对于正则表达式比较熟悉的同学,完全可以写出更好的解决方案

三、能不能在读取数据的时候做点事情?

跟普通请求全量数据对比,这里我们总结一下流式数据处理的优势
1、 分片处理,我们可以在加载分片数据的时候,做一些事情(比如数据压缩),这个后面内容会说
2、加载全量数据的时候,如果不用worker的时候,会将全量数据拿到之后,在主线程进行统一计算,这时主进程被完全占用,performance monitor可以看到,cpu在100%的峰值,这时浏览器什么也干不了,只能等待计算完成,有了块级处理,将这个long task进行大量的拆分,cpu的计算性能在20%水平,完全不影响其他渲染进程以及动画的展示。

3、前端数据压缩实现
思想:如果数据量非常大的情况下,JSON文件存在大量的key重复,如果我们把key单独抽离,头部的每个key引用当前key下的所有数据,那么将会压缩一部分数据,节省了页面内存,减少浏览器端出现out of memory的风险

    // 根据你的业务数据以及逻辑,对行数据,进行列式存储
    const ArrayBufferPool = {};// 定义数据池,可以用ArrayBuffer也行,这里我就简单处理下,思想是一样的
    function execChunkData(chunkStr) {
        // 处理开头
        if (chunkStr.indexOf('[{')) {
            // 如果连一条记录都不完整
            if (chunkStr.indexOf('}') == -1) {
                return chunkStr;
            }
            const lastIndex = chunkStr.lastIndexOf('},');
            // 如果是最后一天记录,则直接补全头部返回
            if (lastIndex == -1 && chunkStr.indexOf(']')) {
                return chunkStr.slice(0);
            }
            let firstString = chunkStr.slice(0, lastIndex);
            firstString += "}]";
            let restString = '[' + chunkStr.slice(lastIndex + 2);
            const temp = JSON.parse(firstString);
            const keys = Object.keys(temp[0]);// 取出列头
            temp.forEach(item => {
                keys.forEach(key => {
                    // 将数据归类,把横向的数据,进行列式存储
                    if (ArrayBufferPool[key]) {
                        ArrayBufferPool[key].push(item[key]);
                        return;
                    }
                    ArrayBufferPool[key] = [item[key]];
                })
            });
            return restString;
        }
    }

总结:最终我们得到的数据结构如图所示

我将之前的原始数据,和压缩数据下载下来,都去除了宫格,做了对比,压缩率在40%左右

四、展望
1、还有其他可玩的,可实现的功能,如终止一个request,以及断点续传等,有兴趣的同学可以继续深挖fetch API 以及ReadableStream接口协议

文章来自个人专栏
前端数据处理
7 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0