队列数据结构:如何构建节点任务队列
本教程解释队列数据结构并演示队列系统。队列通常用于处理长时间运行的任务,如电子邮件时事通讯发送。下面,您将构建一个简单的Node任务队列。
在请求时立即执行任务并不总是实际的。
考虑一个电子邮件通讯管理系统。写完之后,管理员必须点击一个红色的“立即发送”按钮。应用程序可以立即发送每封电子邮件,并显示“完成”的回复。这对于十几条消息来说是可行的,但是对于1000个或更多的订阅者需要多长时间呢?浏览器请求将在流程完成之前超时。
另一个例子:用户可以上传任意数量的照片到图库应用程序。系统根据不同的尺寸调整和锐化每张图像。这个过程可以在上传时运行,但它会导致每张图像都有延迟。
在这些情况下,分离任务更为有效。用户收到即时响应,但任务处理在后台进行。其他应用程序或服务器处理任务并在失败时安排重新尝试。用户可以接收警报或检查日志以确定进度。
什么是队列数据结构?
一个队列是保存项集合的数据结构:
- 任何进程都可以发送(或排队)在任何时间发送邮件,例如发送时事通讯X给收件人Y。
- 任何进程都可以接收(或出列)队列前面的项目——例如,在队列中最长的项目。
队列数据结构是先进先出(FIFO)结构。第一个添加到队列中的项将是第一个退出的项。
基本的JavaScript任务队列数据结构
您可以使用JavaScript数组创建任务队列。的push ()
方法方法将一个项添加到数组的末尾转变()
方法从开始处删除并返回一个项:
常量队列=[];队列.推(第一项的);队列.推(“第二条”);控制台.日志(队列.转变());// item 1控制台.日志(队列.转变());// item 2控制台.日志(队列.转变());/ /定义
您的队列数据结构可以在单个数组元素中保存任何数据。您可以推字符串、数字、布尔值、其他数组或对象。
你可以使用ES6类定义任意数量的独立队列:
类队列{构造函数(){这.问=[];}发送(项){这.问.推(项);}收到(){返回这.问.转变();}}//定义两个队列常量第一季度=新队列();常量第二季=新队列();第一季度.发送(第一项的);第二季.发送(“第二条”);控制台.日志(第一季度.收到());// item 1控制台.日志(第一季度.收到());/ /定义控制台.日志(第二季.收到());// item 2
这些简单的队列数据结构可能对不太重要的客户端代码很有用,比如将UI更新排队,以便在单个DOM更新中进行处理。localStorage或IndexedDB如果需要,可以提供一定级别的数据持久性。
排队的平台
对于复杂的服务器应用程序,内存队列不太实用:
- 两个或多个单独的应用程序不能(轻松地)访问同一个队列。
- 当应用程序终止时,队列数据将消失。
专门构建的消息代理软件提供了更健壮的队列。平台各不相同,但都提供如下功能:
- 在具有复制、分片和集群选项的数据库中选择数据持久性
- 一系列访问协议,通常包括HTTP和Web Sockets
- 任意数量的独立队列
- 延迟消息传递,其中消息处理可以在稍后发生
- 类事务支持,在未确认处理时将消息重新排队
- 发布-订阅模式,当队列上出现新项时,应用程序接收一个事件
消息代理软件包括复述,,RabbitMQ,Apache ActiveMQ,Gearman.云消息传递服务包括Amazon SQS,Azure服务总线,谷歌发布/订阅.
这些可能是企业级应用程序的可行选项。但是,如果您的需求更简单,并且已经使用了数据库,那么它们可能会有些多余。
使用MongoDB作为我们节点任务队列的消息代理
可以开发一个复杂的Node任务队列系统,在几百行代码中管理队列数据结构。
的queue-mongodb
这里描述的模块使用MongoDB用于数据存储,但相同的概念可以被任何SQL或NoSQL数据库所采用。该代码可在GitHub而且npm.
节点任务队列项目:开始
确保你有node . js14或以上安装,然后创建一个新的项目文件夹,如queue-test
.添加一个新的package.json
文件:
{“名称”:“queue-test”,“版本”:“1.0.0”,“描述”:“队列测试”,“类型”:“模块”,“脚本”:{“发送”:“节点/ send.js”。,“接收”:“节点/ receive.js”。}}
注意:“类型”:“模块”
配置项目使用ES6模块。的“脚本”
将发送和接收排队的项目。
安装queue-mongodb模块:
npm安装@craigbuckler / queue-mongodb
然后创建一个.env
文件与您的MongoDB数据库连接凭证。例如:
QUEUE_DB_HOST=localhost QUEUE_DB_PORT=27017 QUEUE_DB_USER=root QUEUE_DB_PASS=mysecret QUEUE_DB_NAME=qdb QUEUE_DB_COLL=queue
注意:这会创建一个队列
收集(QUEUE_DB_COLL
)在qdb
数据库(QUEUE_DB_NAME
).您可以使用现有的数据库,但要确保集合不会与其他数据库冲突。
必须授予用户数据库读/写权限根
(QUEUE_DB_USER
)及密码mysecret
(QUEUE_DB_PASS
).如果不需要身份验证,请将这两个值设置为空。
如果MongoDB数据库尚未运行,则启动它。那些有码头工人而且码头工人组成可以创建一个新的docker-compose.yml
文件:
版本:“3”服务:queuedb:环境:-MONGO_INITDB_ROOT_USERNAME = ${QUEUE_DB_USER}-MONGO_INITDB_ROOT_PASSWORD = ${QUEUE_DB_PASS}图像:蒙戈:4.4-仿生container_name:queuedb卷:-queuedata:/数据/ db港口:-" $ {QUEUE_DB_PORT}: $ {QUEUE_DB_PORT}”重新启动:总是卷:queuedata:
然后运行docker-compose起来
下载并启动MongoDB的持久数据卷。
Docker支持Linux、macOS和Windows 10。看到Docker安装说明.
创建一个新的send.js
文件将随机生成的电子邮件添加到名为新闻
:
//队列模块进口{队列}从“@craigbuckler / queue-mongodb”;//初始化名为news的队列常量newsQ=新队列(“新闻”);//随机名称常量的名字=字符串.fromCharCode(65+数学.随机()*26).重复(1+数学.随机()*10);//添加对象到队列常量发送=等待newsQ.发送({的名字:的名字,电子邮件:`$ {的名字.toLowerCase()}@test.com`,日期:新日期(),消息:`嘿,$ {的名字}!`});控制台.日志(“发送”,发送);//获取队列中剩余的项数控制台.日志(“项目排队:”,等待newsQ.数());//关闭连接并退出等待newsQ.关闭();
使用NPM运行发送
你会看到这样的输出:
发送{_id: 607d692563bd6d05bb459931, sent:202104-19t11:27:33.000z数据:{名称:“AAA”电子邮件:“aaa@test.com”日期:202104-19t11:27:33.426z消息:“嗨,AAA!”}}项目排队:1
的.send ()
方法返回一个qItem
对象包含:
- MongoDB文档
_id
- 项目最初排队的日期/时间,以及
- 信息的副本
数据
运行脚本任意次数,向队列中添加更多的项。的项目排队
将在每次运行时递增。
现在创建一个新的receive.js
从相同的Node任务队列中检索消息:
//队列模块进口{队列}从“@craigbuckler / queue-mongodb”;//初始化名为news的队列常量newsQ=新队列(“新闻”);让qItem;做{qItem=等待newsQ.收到();如果(qItem){控制台.日志(“\ nreceive”,qItem);/ /……qItem过程。数据……/ /……要发送电子邮件…}}而(qItem);//队列中剩余的项数控制台.日志(“项目排队:”,等待newsQ.数());等待newsQ.关闭();
运行NPM运行接收
获取和处理排队的项:
收到{_id: 607d692563bd6d05bb459931, sent:202104-19t11:27:33.000z数据:{名称:“AAA”电子邮件:“aaa@test.com”日期:202104-19t11:27:33.426z消息:“嗨,AAA!”}}项目排队:0
本例中没有发送电子邮件,但是可以使用Nodemailer或者其他合适的模块。
处理失败可能是因为邮件服务器坏了一个物品可以这样重新排队:
newsQ.发送(qItem.数据,600);
第二个600
参数是可选的秒数或未来日期。该命令在经过600秒(10分钟)后重新对项目进行排队。
这是一个简单的示例,但是任何应用程序都可以向任意数量的队列发送数据。另一个过程,也许开始于cron
工作,可以在必要时接收和处理物品。
如何queue-mongodb
模块工作
的类型
传递给类构造函数的字符串定义队列名。的.send ()
方法在传递数据到队列时创建一个新的MongoDB文档。MongoDB文档包含:
- 一个MongoDB
_id
(创建日期/时间编码在值中)。 - 队列
类型
. - 处理日期/时间值
proc
.可以设置未来时间,但当前时间是默认时间。 - 项目
数据
.这可以是任何东西:布尔值、数字、字符串、数组、对象等等。
的.receive ()
方法定位具有匹配的最古老的文档类型
和一个proc
过去的日期/时间。文档被格式化,返回给调用代码,并从数据库中删除。
以下部分描述模块更详细。
queue-mongodb
模块:初始化
的dotenv
模块读取.env
环境变量(如果需要的话)。数据库连接对象是使用正式的mongodb
驱动模块:
/ /模块进口dotenv从“dotenv”;进口mongoDB从mongodb的;//环境变量如果(!过程.env.QUEUE_DB_HOST){dotenv.配置();}// MongoDB数据库客户端常量dbName=过程.env.QUEUE_DB_NAME||“qdb”,qCollectionName=过程.env.QUEUE_DB_COLL||“队列”,qAuth=过程.env.QUEUE_DB_USER?`$ {过程.env.QUEUE_DB_USER}:$ {过程.env.QUEUE_DB_PASS||”}@`:”,dbClient=新mongoDB.MongoClient(`mongodb: / /$ {qAuth}$ {过程.env.QUEUE_DB_HOST||“localhost”}:$ {过程.env.QUEUE_DB_PORT||“27017”}/`,{useNewUrlParser:真正的,useUnifiedTopology:真正的});
的qCollection
变量保存对数据库队列集合的引用(由QUEUE_DB_COLL
).函数创建并返回dbConnect ()
函数,该函数还在必要时定义集合模式和索引。所有队列
方法运行const q = await dbConnect();
获取集合引用:
让qCollection;//队列收集//共享连接异步函数dbConnect(){//收集可用如果(qCollection)返回qCollection;//连接数据库等待dbClient.连接();//集合定义?常量db=dbClient.db(dbName),colList=等待db.listCollections({的名字:qCollectionName},{nameOnly:真正的}).toArray();如果(!colList.长度){//定义集合模式让jsonSchema美元={bsonType:“对象”,要求:[“类型”,“过程”,“数据”],属性:{类型:{bsonType:“字符串”,最小长度:1},proc:{bsonType:“日期”}}};等待db.createCollection(qCollectionName,{验证器:{jsonSchema美元}});//定义索引等待db.集合(qCollectionName).方法createindex([{关键:{类型:1}},{关键:{proc:1}}]);}//返回队列集合qCollection=db.集合(qCollectionName);返回qCollection;}
的dbClose ()
函数关闭数据库连接:
//关闭MongoDB数据库连接异步函数dbClose(){如果(qCollection){等待dbClient.关闭();qCollection=零;}}
queue-mongodb
模块:队列
构造函数
的队列
构造函数设置队列类型
或名称:
出口类队列{构造函数(类型=“默认”){这.类型=类型;}
queue-mongodb
模块:Queue.send ()
方法
的.send ()
方法将数据添加到队列中类型
.它是可选的delayUntil
参数,该参数通过指定秒数或a将项在未来时间添加到队列中日期()
.
方法将一个新文档插入到数据库中,并返回qItem
对象({_id
,发送
,数据
})或零
如果不成功:
异步发送(数据=零,delayUntil){试一试{//计算开始日期/时间让proc=新日期();如果(delayUntil运算符日期){proc=delayUntil;}其他的如果(!isNaN(delayUntil)){proc=新日期(+proc+delayUntil*1000);}//添加项目到队列常量问=等待dbConnect(),ins=等待问.insertOne({类型:这.类型,proc,数据});//返回qItem返回ins& &ins.insertedCount& &ins.insertedId?{_id:ins.insertedId,发送:ins.insertedId.getTimestamp(),数据}:零;}抓(犯错){控制台.日志(`队列中。发送犯错或:\n$ {犯错}`);返回零;}}
queue-mongodb
模块:Queue.receive ()
方法
的.receive ()
方法检索并删除数据库中最老的队列项类型
和一个proc
过去的日期/时间。它返回一个qItem
对象({_id
,发送
,数据
})或零
如果没有可用的信息或发生错误:
异步收到(){试一试{//查找并删除队列上的下一项常量现在=新日期(),问=等待dbConnect(),矩形=等待问.findOneAndDelete({类型:这.类型,proc:{$ lt:现在}},{排序:{proc:1}});常量v=矩形& &矩形.价值;//返回qItem返回v?{_id:v._id,发送:v._id.getTimestamp(),数据:v.数据}:零;}抓(犯错){控制台.日志(`队列中。矩形eive error:\n$ {犯错}`);返回零;}}
queue-mongodb
模块:Queue.remove ()
方法
的.remove ()
方法删除由标识的队列项qItem
对象({_id
,发送
,数据
})返回的.send ()
方法。它可用于删除队列项,而不管其在队列中的位置。
该方法返回已删除文档的数量(通常为1)或零
发生错误时:
异步删除(qItem){//没有需要移除的项目如果(!qItem||!qItem._id)返回零;试一试{常量问=等待dbConnect(),▽=等待问.deleteOne({_id:qItem._id});返回▽.deletedCount;}抓(犯错){控制台.日志(`队列中。删除错误:\ n$ {犯错}`);返回零;}}
queue-mongodb
模块:Queue.purge ()
方法
的.purge ()
方法删除相同的所有排队项类型
并返回删除的数量:
异步清洗(){试一试{常量问=等待dbConnect(),▽=等待问.deleteMany({类型:这.类型});返回▽.deletedCount;}抓(犯错){控制台.日志(`队列中。清洗犯错或:\n$ {犯错}`);返回零;}}
queue-mongodb
模块:Queue.count ()
方法
的.count ()
方法返回相同的排队项的数目类型
:
异步数(){试一试{常量问=等待dbConnect();返回等待问.countDocuments({类型:这.类型});}抓(犯错){控制台.日志(`队列中。数犯错或:\n$ {犯错}`);返回零;}}
queue-mongodb
模块:Queue.close ()
方法
的.close ()
方法运行dbClose ()
函数终止数据库连接,这样Node.js事件循环就可以结束:
异步关闭(){试一试{等待dbClose();}抓(犯错){控制台.日志(`队列中。关闭犯错或:\n$ {犯错}`);返回零;}}//结束}
新队列
对于任何web应用程序来说,队列都是一个需要考虑的问题,因为它的计算成本很高,可能会导致瓶颈。它们可以通过将应用程序解耦为更小、更快、更健壮的进程来提高性能和维护。专用的消息代理软件是一种选择,但是像我们今天构建的Node任务队列这样的简单队列系统只需要几十行代码就可以实现。