Node.js + Redis Sorted Set实现任务队列
需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。
根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。
在设计任务队列的过程中需要考虑到的几个问题
- 并行执行多个任务
- 任务唯一性
- 任务成功或失败后的处理
针对以上问题的解决方案
- 并行执行多个任务利用 Promise.all 来实现
- 任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。
- 执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部
示例代码
// remote_api.js 模拟第三方 API 'use strict'; const app = require('express')(); app.get('/', (req, res) => { setTimeout(() => { let arr = [200, 300]; // 200 代表成功,300 代表失败需要重新请求 res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] }); }, 3000); }); app.listen('9001', () => { console.log('API 服务监听端口:9001'); }); // producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列 'use strict'; const app = require('express')(); const redisClient = require('redis').createClient(); const QUEUE_NAME = 'queue:example'; function addTaskToQueue(taskName, callback) { // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列 redisClient.zscore(QUEUE_NAME, taskName, (error, task) => { if (error) { console.log(error); } else { if (task) { console.log('任务已存在,不新增相同任务'); callback(null, task); } else { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } else { callback(null, result); } }); } } }); } app.get('/', (req, res) => { let taskName = req.query['task-name']; addTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } else { res.status(200).send('正在查询中......'); } }); }); app.listen(9002, () => { console.log('生产者服务监听端口:9002'); }); // consumer.js 定时获取任务并执行 'use strict'; const redisClient = require('redis').createClient(); const request = require('request'); const schedule = require('node-schedule'); const QUEUE_NAME = 'queue:expmple'; const PARALLEL_TASK_NUMBER = 2; // 并行执行任务数量 function getTasksFromQueue(callback) { // 获取多个任务 redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => { if (error) { callback(error); } else { // 将任务分值设置为 0,表示正在处理 if (tasks.length > 0) { let tmp = []; tasks.forEach((task) => { tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => { if (error) { callback(error); } else { callback(null, tasks) } }); } } }); } function addFailedTaskToQueue(taskName, callback) { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { if (error) { callback(error); } else { callback(null, result); } }); } function removeSucceedTaskFromQueue(taskName, callback) { redisClient.zrem(QUEUE_NAME, taskName, (error, result) => { if (error) { callback(error); } else { callback(null, result); } }) } function execTask(taskName) { return new Promise((resolve, reject) => { let requestOptions = { 'url': 'http://127.0.0.1:9001', 'method': 'GET', 'timeout': 5000 }; request(requestOptions, (error, response, body) => { if (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error) => { if (error) { console.log(error); } else { } }); } else { try { body = typeof body !== 'object' ? JSON.parse(body) : body; } catch (error) { resolve('failed'); console.log(error); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } else { } }); return; } if (body.status !== 200) { resolve('failed'); addFailedTaskToQueue(taskName, (error, result) => { if (error) { console.log(error); } else { } }); } else { resolve('succeed'); removeSucceedTaskFromQueue(taskName, (error, result) => { if (error) { console.log(error); } else { } }); } } }); }); } // 定时,每隔 5 秒获取新的任务来执行 let job = schedule.scheduleJob('*/5 * * * * *', () => { console.log('获取新任务'); getTasksFromQueue((error, tasks) => { if (error) { console.log(error); } else { if (tasks.length > 0) { console.log(tasks); Promise.all(tasks.map(execTask)) .then((results) => { console.log(results); }) .catch((error) => { console.log(error); }); } } }); });
相关文章
Node.js + Redis Sorted Set实现任务队列
本文给大家分享的是使用Node.js + Redis Sorted Set实现任务队列的方法示例,非常的实用,有需要的小伙伴可以参考下...2016-10-03- 本篇文章主要介绍了PHP实现Session入库存入redis的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧 ...2017-05-08
Linux CentOS系统下安装node.js与express的方法
这篇文章主要给大家介绍了在Linux CentOS系统下安装node.js与express的方法,文中介绍的非常详细,相信对大家具有一定的参考价值,需要的朋友们下面来一起看看吧。...2017-04-03- 这篇文章主要给大家介绍了利用nginx+lua+redis实现反向代理方法教程,文中介绍的非常详细,对大家具有一定的参考学习价值,需要的朋友们下面来一起看看吧。...2017-07-06
- 下面小编就为大家带来一篇PHP实现电商订单自动确认收货redis队列。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2017-05-21
- 这篇文章主要介绍了php基于redis处理session的方法的相关资料,需要的朋友可以参考下...2016-03-18
Windows下安装Redis及使用Python操作Redis的方法
这篇文章主要介绍了Windows下安装Redis及使用Python操作Redis的方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下...2017-07-06- 这篇文章主要介绍了PHP基于Redis消息队列实现发布微博的方法,结合实例形式分析了php+redis数据库的安装、连接、读取、插入等相关操作技巧,需要的朋友可以参考下...2017-05-08
- 这篇文章主要为大家详细介绍了php redis实现文章发布系统以及用户投票系统,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2017-03-12
nginx+tomcat实现负载均衡,使用redis session共享
这篇文章主要介绍了nginx tomcat负载均衡 使用redis session共享,有兴趣的同学可以了解一下。...2017-01-22- 这篇文章主要介绍了利用PHP如何访问带有密码的Redis,文章开始先介绍了如何设置Redis密码及方法带密码的Redis,方便大家学习和理解,有需要的朋友可以参考借鉴,下面来一起看看吧。...2017-02-18
- 消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。本文主要介绍了php中Redis的应用--消息传递。下面跟着小编一起来看下吧...2017-04-03
windows系统下node.js环境配置与安装教程图文详解(详细版)
本文分步骤给大家详细介绍windows系统下简单nodejs环境配置与安装教程,本文图文并茂给大家介绍的非常详细,感兴趣的朋友可以参考下...2017-01-15- 这篇文章主要为大家详细介绍了PHP redis实现超迷你全文检索的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2017-03-12
- 这篇文章主要介绍了PHP5.5安装PHPRedis扩展及连接测试方法,较为详细的介绍了针对php5.5安装PHPRedis扩展的具体步骤与相关操作技巧,并给出了安装后的连接测试示例,需要的朋友可以参考下...2017-01-25
php+redis实现多台服务器内网存储session并读取示例
这篇文章主要介绍了php+redis实现多台服务器内网存储session并读取示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...2017-01-15- 这篇文章主要介绍了Nginx+SSL+Node.js运行环境配置教程,本文用反向代理的方式代理基于Node.js的Web应用,需要的朋友可以参考下...2016-01-27