liuyu 5 years ago
parent
commit
5676705982
5 changed files with 47 additions and 21 deletions
  1. 18 0
      app.js
  2. 3 3
      app/controller/.message.js
  3. 6 5
      app/controller/home.js
  4. 18 12
      app/service/rabbitmq.js
  5. 2 1
      package.json

+ 18 - 0
app.js

@@ -0,0 +1,18 @@
+'use strict';
+class AppBootHook {
+  constructor(app) {
+    this.app = app;
+  }
+
+  async didReady() {
+    // 应用已经启动完毕
+
+    const ctx = await this.app.createAnonymousContext();
+    await ctx.service.rabbitmq.receiveQueueMsg('schedule');
+  }
+
+  async serverDidReady() {
+    // 应用已经启动完毕
+  }
+}
+module.exports = AppBootHook;

+ 3 - 3
app/controller/.message.js

@@ -1,6 +1,6 @@
 module.exports = {
   create: {
-    requestBody: ['!producerid', 'consumerid', 'type', 'content', 'remark']
+    requestBody: ['!userid', 'name', 'createtime', 'type', 'content', 'status', 'remark']
   },
   destroy: {
     params: ['!id'],
@@ -10,7 +10,7 @@ module.exports = {
     parameters: {
       params: ['!id']
     },
-    requestBody: ['producerid', 'consumerid', 'type', 'content', 'remark']
+    requestBody: ['!userid', 'name', 'createtime', 'type', 'content', 'status', 'remark']
   },
   show: {
     parameters: {
@@ -20,7 +20,7 @@ module.exports = {
   },
   index: {
     parameters: {
-      params: ['producerid', 'consumerid']
+      params: ['userid', 'type']
     },
     service: 'query',
     options: {

+ 6 - 5
app/controller/home.js

@@ -10,7 +10,12 @@ class HomeController extends Controller {
 
   async sendmq() {
     const { ctx } = this;
-    await this.service.rabbitmq.sendQueueMsg(ctx.query.ex, ctx.query.routekey, ctx.query.msg);
+    // const msg = { userid: ctx.body.userid, name: ctx.body.name, createtime: ctx.body.createtime, type: ctx.body.type, content: ctx.body.content, remark: ctx.body.remark };
+    const msg = { userid: '2', name: 'hh', createtime: '2019-10-25 10:10:10', type: '1', content: '您有一个待办事项', remark: '别忘了' };
+    const ex = 'schedule';
+    const routekey = '11';
+    const parm = { durable: true, headers: { userId: '2', type: '1', name: '哈哈' } };
+    await this.service.rabbitmq.sendQueueMsg(ex, routekey, 'hhhsss', parm);
     this.ctx.body = '发送成功';
   }
 
@@ -18,10 +23,6 @@ class HomeController extends Controller {
     const { ctx } = this;
     await this.service.rabbitmq.receiveQueueMsg(ctx.query.ex, ctx.query.routekey, msg => {
       console.log(msg);
-
-      // 插入待办事项到数据库中。
-      this.service.message.create({ userid: msg.userid, name: msg.name, createtime: msg.createtime, type: msg.type, content: msg.content, remark: msg.remark });
-
     });
     ctx.body = '接收成功';
   }

+ 18 - 12
app/service/rabbitmq.js

@@ -1,5 +1,6 @@
 'use strict';
-
+require('../util/constants');
+const sd = require('silly-datetime');
 const Service = require('egg').Service;
 
 class RabbitmqService extends Service {
@@ -11,36 +12,41 @@ class RabbitmqService extends Service {
   }
 
   // 发送消息
-  async sendQueueMsg(ex, routeKey, msg) {
+  async sendQueueMsg(ex, routeKey, msg, parm) {
     const self = this;
     const { mq } = self.ctx;
     if (mq) {
-      await mq.topic(ex, msg, routeKey, { durable: self.durable });
+      await mq.topic(ex, routeKey, msg, parm);
     } else {
       this.ctx.logger.error('!!!!!!没有配置MQ插件!!!!!!');
     }
   }
 
   // 接收消息
-  async receiveQueueMsg(ex, routeKey, receiveCallBack) {
+  async receiveQueueMsg(ex) {
     const self = this;
     const { mq } = self.ctx;
-    console.log(mq);
-    console.log(self.exType);
     if (mq) {
       const ch = await mq.conn.createChannel();
       try {
         await ch.assertExchange(ex, self.exType, { durable: self.durable });
-        const q = await ch.assertQueue('', { exclusive: false });
+        const q = await ch.assertQueue('', { exclusive: true });
         console.log('==q=', q);
         // 队列绑定 exchange
-        await ch.bindQueue(q.queue, ex, routeKey);
+        await ch.bindQueue(q.queue, ex, '*');
         ch.consume(q.queue, msg => {
           console.log('收到消息: ', msg);
-          // 发送确认消息
-          ch.ack(msg);
-          receiveCallBack && receiveCallBack(msg);
-        }, { noAck: false });
+          const result = msg.content.toString();
+          // const fields = msg.fields;
+          // const properties = msg.properties;
+          const headers = msg.properties.headers;
+          // 插入待办事项到数据库中。
+          if (result != null) {
+            // const routingKey = msg.fields.routingKey;
+            const updatetimes = sd.format(new Date(), 'YYYY-MM-DD HH:mm:ss');
+            this.service.message.create({ userid: headers.userId, name: headers.name, createtime: updatetimes, type: headers.type, content: result, remark: '别忘记了' });
+          }
+        }, { noAck: true });
       } catch (e) {
         console.log('==e==', e);
         await ch.close();

+ 2 - 1
package.json

@@ -11,7 +11,8 @@
     "egg-naf-amqp": "0.0.13",
     "egg-scripts": "^2.11.0",
     "jsonwebtoken": "^8.5.1",
-    "naf-framework-mongoose": "^0.6.11"
+    "naf-framework-mongoose": "^0.6.11",
+    "silly-datetime": "^0.1.2"
   },
   "devDependencies": {
     "autod": "^3.1.0",