队列数据结构:如何构建节点任务队列

    克雷格的盾牌
    分享

    本教程解释队列数据结构并演示队列系统。队列通常用于处理长时间运行的任务,如电子邮件时事通讯发送。下面,您将构建一个简单的Node任务队列。

    在请求时立即执行任务并不总是实际的。

    考虑一个电子邮件通讯管理系统。写完之后,管理员必须点击一个红色的“立即发送”按钮。应用程序可以立即发送每封电子邮件,并显示“完成”的回复。这对于十几条消息来说是可行的,但是对于1000个或更多的订阅者需要多长时间呢?浏览器请求将在流程完成之前超时。

    另一个例子:用户可以上传任意数量的照片到图库应用程序。系统根据不同的尺寸调整和锐化每张图像。这个过程可以在上传时运行,但它会导致每张图像都有延迟。

    在这些情况下,分离任务更为有效。用户收到即时响应,但任务处理在后台进行。其他应用程序或服务器处理任务并在失败时安排重新尝试。用户可以接收警报或检查日志以确定进度。

    什么是队列数据结构?

    一个队列是保存项集合的数据结构:

    • 任何进程都可以发送(或排队)在任何时间发送邮件,例如发送时事通讯X给收件人Y。
    • 任何进程都可以接收(或出列)队列前面的项目——例如,在队列中最长的项目。

    队列数据结构是先进先出(FIFO)结构。第一个添加到队列中的项将是第一个退出的项。

    基本的JavaScript任务队列数据结构

    您可以使用JavaScript数组创建任务队列。的push ()方法方法将一个项添加到数组的末尾转变()方法从开始处删除并返回一个项:

    常量队列队列第一项的队列“第二条”控制台日志队列转变// item 1控制台日志队列转变// item 2控制台日志队列转变/ /定义

    您的队列数据结构可以在单个数组元素中保存任何数据。您可以推字符串、数字、布尔值、其他数组或对象。

    你可以使用ES6类定义任意数量的独立队列:

    队列构造函数发送收到返回转变//定义两个队列常量第一季度队列常量第二季队列第一季度发送第一项的第二季发送“第二条”控制台日志第一季度收到// item 1控制台日志第一季度收到/ /定义控制台日志第二季收到// item 2

    这些简单的队列数据结构可能对不太重要的客户端代码很有用,比如将UI更新排队,以便在单个DOM更新中进行处理。localStorageIndexedDB如果需要,可以提供一定级别的数据持久性。

    排队的平台

    对于复杂的服务器应用程序,内存队列不太实用:

    1. 两个或多个单独的应用程序不能(轻松地)访问同一个队列。
    2. 当应用程序终止时,队列数据将消失。

    专门构建的消息代理软件提供了更健壮的队列。平台各不相同,但都提供如下功能:

    • 在具有复制、分片和集群选项的数据库中选择数据持久性
    • 一系列访问协议,通常包括HTTP和Web Sockets
    • 任意数量的独立队列
    • 延迟消息传递,其中消息处理可以在稍后发生
    • 类事务支持,在未确认处理时将消息重新排队
    • 发布-订阅模式,当队列上出现新项时,应用程序接收一个事件

    消息代理软件包括复述,RabbitMQApache ActiveMQ,Gearman.云消息传递服务包括Amazon SQSAzure服务总线,谷歌发布/订阅

    这些可能是企业级应用程序的可行选项。但是,如果您的需求更简单,并且已经使用了数据库,那么它们可能会有些多余。

    使用MongoDB作为我们节点任务队列的消息代理

    可以开发一个复杂的Node任务队列系统,在几百行代码中管理队列数据结构。

    queue-mongodb这里描述的模块使用MongoDB用于数据存储,但相同的概念可以被任何SQL或NoSQL数据库所采用。该代码可在GitHub而且npm

    节点任务队列项目:开始

    确保你有node . js14或以上安装,然后创建一个新的项目文件夹,如queue-test.添加一个新的package.json文件:

    “名称”“queue-test”“版本”“1.0.0”“描述”“队列测试”“类型”“模块”“脚本”“发送”“节点/ send.js”。“接收”“节点/ receive.js”。

    注意:“类型”:“模块”配置项目使用ES6模块。的“脚本”将发送和接收排队的项目。

    安装queue-mongodb模块:

    npm安装@craigbuckler / queue-mongodb

    然后创建一个.env文件与您的MongoDB数据库连接凭证。例如:

    QUEUE_DB_HOST=localhost QUEUE_DB_PORT=27017 QUEUE_DB_USER=root QUEUE_DB_PASS=mysecret QUEUE_DB_NAME=qdb QUEUE_DB_COLL=queue

    注意:这会创建一个队列收集(QUEUE_DB_COLL)在qdb数据库(QUEUE_DB_NAME).您可以使用现有的数据库,但要确保集合不会与其他数据库冲突。

    必须授予用户数据库读/写权限QUEUE_DB_USER)及密码mysecretQUEUE_DB_PASS).如果不需要身份验证,请将这两个值设置为空。

    如果MongoDB数据库尚未运行,则启动它。那些有码头工人而且码头工人组成可以创建一个新的docker-compose.yml文件:

    版本“3”服务queuedb环境-MONGO_INITDB_ROOT_USERNAME = $QUEUE_DB_USER-MONGO_INITDB_ROOT_PASSWORD = $QUEUE_DB_PASS图像蒙戈4.4-仿生container_namequeuedb-queuedata/数据/ db港口-" $ {QUEUE_DB_PORT}: $ {QUEUE_DB_PORT}”重新启动总是queuedata

    然后运行docker-compose起来下载并启动MongoDB的持久数据卷。

    Docker支持Linux、macOS和Windows 10。看到Docker安装说明

    创建一个新的send.js文件将随机生成的电子邮件添加到名为新闻

    //队列模块进口队列“@craigbuckler / queue-mongodb”//初始化名为news的队列常量newsQ队列“新闻”//随机名称常量的名字字符串fromCharCode65+数学随机26重复1+数学随机10//添加对象到队列常量发送等待newsQ发送的名字的名字电子邮件$ {的名字toLowerCase@test.com日期日期消息嘿,$ {的名字控制台日志“发送”发送//获取队列中剩余的项数控制台日志“项目排队:”等待newsQ//关闭连接并退出等待newsQ关闭

    使用NPM运行发送你会看到这样的输出:

    发送_id: 607d692563bd6d05bb459931, sent:202104-19t11:27:33.000z数据:名称:“AAA”电子邮件:“aaa@test.com”日期:202104-19t11:27:33.426z消息:“嗨,AAA!”项目排队:1

    .send ()方法返回一个qItem对象包含:

    1. MongoDB文档_id
    2. 项目最初排队的日期/时间,以及
    3. 信息的副本数据

    运行脚本任意次数,向队列中添加更多的项。的项目排队将在每次运行时递增。

    现在创建一个新的receive.js从相同的Node任务队列中检索消息:

    //队列模块进口队列“@craigbuckler / queue-mongodb”//初始化名为news的队列常量newsQ队列“新闻”qItemqItem等待newsQ收到如果qItem控制台日志“\ nreceive”qItem/ /……qItem过程。数据……/ /……要发送电子邮件…qItem//队列中剩余的项数控制台日志“项目排队:”等待newsQ等待newsQ关闭

    运行NPM运行接收获取和处理排队的项:

    收到_id: 607d692563bd6d05bb459931, sent:202104-19t11:27:33.000z数据:名称:“AAA”电子邮件:“aaa@test.com”日期:202104-19t11:27:33.426z消息:“嗨,AAA!”项目排队:0

    本例中没有发送电子邮件,但是可以使用Nodemailer或者其他合适的模块。

    处理失败可能是因为邮件服务器坏了一个物品可以这样重新排队:

    newsQ发送qItem数据600

    第二个600参数是可选的秒数或未来日期。该命令在经过600秒(10分钟)后重新对项目进行排队。

    这是一个简单的示例,但是任何应用程序都可以向任意数量的队列发送数据。另一个过程,也许开始于cron工作,可以在必要时接收和处理物品。

    如何queue-mongodb模块工作

    类型传递给类构造函数的字符串定义队列名。的.send ()方法在传递数据到队列时创建一个新的MongoDB文档。MongoDB文档包含:

    1. 一个MongoDB_id(创建日期/时间编码在值中)。
    2. 队列类型
    3. 处理日期/时间值proc.可以设置未来时间,但当前时间是默认时间。
    4. 项目数据.这可以是任何东西:布尔值、数字、字符串、数组、对象等等。

    .receive ()方法定位具有匹配的最古老的文档类型和一个proc过去的日期/时间。文档被格式化,返回给调用代码,并从数据库中删除。

    以下部分描述模块更详细。

    queue-mongodb模块:初始化

    dotenv模块读取.env环境变量(如果需要的话)。数据库连接对象是使用正式的mongodb驱动模块

    / /模块进口dotenv“dotenv”进口mongoDBmongodb的//环境变量如果过程envQUEUE_DB_HOSTdotenv配置// MongoDB数据库客户端常量dbName过程envQUEUE_DB_NAME||“qdb”qCollectionName过程envQUEUE_DB_COLL||“队列”qAuth过程envQUEUE_DB_USER?$ {过程envQUEUE_DB_USER$ {过程envQUEUE_DB_PASS||@dbClientmongoDBMongoClientmongodb: / /$ {qAuth$ {过程envQUEUE_DB_HOST||“localhost”$ {过程envQUEUE_DB_PORT||“27017”/useNewUrlParser真正的useUnifiedTopology真正的

    qCollection变量保存对数据库队列集合的引用(由QUEUE_DB_COLL).函数创建并返回dbConnect ()函数,该函数还在必要时定义集合模式和索引。所有队列方法运行const q = await dbConnect();获取集合引用:

    qCollection//队列收集//共享连接异步函数dbConnect//收集可用如果qCollection返回qCollection//连接数据库等待dbClient连接//集合定义?常量dbdbClientdbdbNamecolList等待dblistCollections的名字qCollectionNamenameOnly真正的toArray如果colList长度//定义集合模式jsonSchema美元bsonType“对象”要求“类型”“过程”“数据”属性类型bsonType“字符串”最小长度1procbsonType“日期”等待dbcreateCollectionqCollectionName验证器jsonSchema美元//定义索引等待db集合qCollectionName方法createindex关键类型1关键proc1//返回队列集合qCollectiondb集合qCollectionName返回qCollection

    dbClose ()函数关闭数据库连接:

    //关闭MongoDB数据库连接异步函数dbClose如果qCollection等待dbClient关闭qCollection

    queue-mongodb模块:队列构造函数

    队列构造函数设置队列类型或名称:

    出口队列构造函数类型“默认”类型类型

    queue-mongodb模块:Queue.send ()方法

    .send ()方法将数据添加到队列中类型.它是可选的delayUntil参数,该参数通过指定秒数或a将项在未来时间添加到队列中日期()

    方法将一个新文档插入到数据库中,并返回qItem对象({_id发送数据})或如果不成功:

    异步发送数据delayUntil试一试//计算开始日期/时间proc日期如果delayUntil运算符日期procdelayUntil其他的如果isNaNdelayUntilproc日期+proc+delayUntil1000//添加项目到队列常量等待dbConnectins等待insertOne类型类型proc数据//返回qItem返回ins& &insinsertedCount& &insinsertedId?_idinsinsertedId发送insinsertedIdgetTimestamp数据犯错控制台日志队列中。发送犯错或:\n$ {犯错返回

    queue-mongodb模块:Queue.receive ()方法

    .receive ()方法检索并删除数据库中最老的队列项类型和一个proc过去的日期/时间。它返回一个qItem对象({_id发送数据})或如果没有可用的信息或发生错误:

    异步收到试一试//查找并删除队列上的下一项常量现在日期等待dbConnect矩形等待findOneAndDelete类型类型proc$ lt现在排序proc1常量v矩形& &矩形价值//返回qItem返回v?_idv_id发送v_idgetTimestamp数据v数据犯错控制台日志队列中。矩形eive error:\n$ {犯错返回

    queue-mongodb模块:Queue.remove ()方法

    .remove ()方法删除由标识的队列项qItem对象({_id发送数据})返回的.send ()方法。它可用于删除队列项,而不管其在队列中的位置。

    该方法返回已删除文档的数量(通常为1)或发生错误时:

    异步删除qItem//没有需要移除的项目如果qItem||qItem_id返回试一试常量等待dbConnect等待deleteOne_idqItem_id返回deletedCount犯错控制台日志队列中。删除错误:\ n$ {犯错返回

    queue-mongodb模块:Queue.purge ()方法

    .purge ()方法删除相同的所有排队项类型并返回删除的数量:

    异步清洗试一试常量等待dbConnect等待deleteMany类型类型返回deletedCount犯错控制台日志队列中。清洗犯错或:\n$ {犯错返回

    queue-mongodb模块:Queue.count ()方法

    .count ()方法返回相同的排队项的数目类型

    异步试一试常量等待dbConnect返回等待countDocuments类型类型犯错控制台日志队列中。数犯错或:\n$ {犯错返回

    queue-mongodb模块:Queue.close ()方法

    .close ()方法运行dbClose ()函数终止数据库连接,这样Node.js事件循环就可以结束:

    异步关闭试一试等待dbClose犯错控制台日志队列中。关闭犯错或:\n$ {犯错返回//结束

    新队列

    对于任何web应用程序来说,队列都是一个需要考虑的问题,因为它的计算成本很高,可能会导致瓶颈。它们可以通过将应用程序解耦为更小、更快、更健壮的进程来提高性能和维护。专用的消息代理软件是一种选择,但是像我们今天构建的Node任务队列这样的简单队列系统只需要几十行代码就可以实现。

    Baidu