'use strict';

/* ------------------------------------------------------------------ *
 * Babel runtime helpers (ES5 emulation of `class` / `extends`).
 * ------------------------------------------------------------------ */

// Defines prototype and static members from descriptor lists.
var _createClass = function () {
  function defineProperties(target, props) {
    for (var i = 0; i < props.length; i++) {
      var descriptor = props[i];
      descriptor.enumerable = descriptor.enumerable || false;
      descriptor.configurable = true;
      if ("value" in descriptor) descriptor.writable = true;
      Object.defineProperty(target, descriptor.key, descriptor);
    }
  }
  return function (Constructor, protoProps, staticProps) {
    if (protoProps) defineProperties(Constructor.prototype, protoProps);
    if (staticProps) defineProperties(Constructor, staticProps);
    return Constructor;
  };
}();

// Throws when a class constructor is invoked without `new`.
function _classCallCheck(instance, Constructor) {
  if (!(instance instanceof Constructor)) {
    throw new TypeError("Cannot call a class as a function");
  }
}

// Returns the value produced by the super constructor when it is an
// object or function, otherwise `self`.
function _possibleConstructorReturn(self, call) {
  if (!self) {
    throw new ReferenceError("this hasn't been initialised - super() hasn't been called");
  }
  return call && (typeof call === "object" || typeof call === "function") ? call : self;
}

// Wires up the prototype chain (instance side and static side).
function _inherits(subClass, superClass) {
  if (typeof superClass !== "function" && superClass !== null) {
    throw new TypeError("Super expression must either be null or a function, not " + typeof superClass);
  }
  subClass.prototype = Object.create(superClass && superClass.prototype, {
    constructor: { value: subClass, enumerable: false, writable: true, configurable: true }
  });
  if (superClass) Object.setPrototypeOf ? Object.setPrototypeOf(subClass, superClass) : subClass.__proto__ = superClass;
}

/* ------------------------------------------------------------------ *
 * Dependencies
 * ------------------------------------------------------------------ */

var mqtt = require('mqtt');
var EventEmitter = require('events');
var util = require('util');

var _require = require('./utils'),
    createGuid = _require.createGuid,
    createDebug = _require.createDebug,
    getIP = _require.getIP,
    isJsonString = _require.isJsonString,
    mqttMatch = _require.mqttMatch,
    mqttNotMatch = _require.mqttNotMatch;

var debug = createDebug('device');
var guid = createGuid();
var packagejson = require('../package.json');
var Model = require('./model');

// Shared no-op; also used as the "no callback registered" sentinel.
var nilFn = function nilFn() {};

// Joins a message id with the reply topic it expects (see _genAlinkMessageId).
var EXPECT_TOPIC_SEPARATOR = '|exp-topic|';

// Creates a prototype-less dictionary so arbitrary string keys
// (e.g. a service called "constructor" or "push") never resolve to
// inherited members.
function createMap() {
  return Object.create(null);
}

/**
 * Base class shared by the SDK's thing types. Wraps an MQTT client and
 * exposes helpers for properties, events, services, tags, remote config
 * and the device shadow. Extends EventEmitter: MQTT client events are
 * re-emitted on the instance (see _subscribeClientEvent).
 */
var Thing = function (_EventEmitter) {
  _inherits(Thing, _EventEmitter);

  /**
   * @param {Object} [config={}] Passed unchanged to the Model constructor.
   *
   * A second and third positional argument (mqtt client, type) are accepted
   * for signature compatibility but are not read by this constructor.
   */
  function Thing() {
    var config = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {};

    _classCallCheck(this, Thing);

    var _this = _possibleConstructorReturn(this, (Thing.__proto__ || Object.getPrototypeOf(Thing)).call(this));

    // Device type: device, subdevice, gateway (taken from the constructor name).
    _this._type = _this.constructor.name || "UNKNOW";
    // Init model.
    _this.model = new Model(config);
    // NOTE(review): `serveCB` is never read in this file (the service registry
    // is `_serviceCB`); kept in case external code inspects it.
    _this.serveCB = [];
    _this._onShadowCB = nilFn;
    _this._onConfigCB = nilFn;
    // Callback invoked when the cloud sets properties.
    _this._onPropsCB = nilFn;
    // Compatibility hook for older versions.
    _this._compatibleoverdue();
    return _this;
  }

  _createClass(Thing, [{
    key: 'publish',

    /** Publishes a message; all arguments are forwarded to the MQTT client's publish(). */
    value: function publish() {
      var client = this._mqttClient;
      client.publish.apply(client, arguments);
    }
  }, {
    key: 'subscribe',

    /** Subscribes to a topic; all arguments are forwarded to the MQTT client's subscribe(). */
    value: function subscribe() {
      var client = this._mqttClient;
      client.subscribe.apply(client, arguments);
    }
  }, {
    key: 'unsubscribe',

    /** Unsubscribes from a topic; all arguments are forwarded to the MQTT client's unsubscribe(). */
    value: function unsubscribe() {
      var client = this._mqttClient;
      client.unsubscribe.apply(client, arguments);
    }
  }, {
    key: 'end',

    /** Forwards all arguments to the MQTT client's end(). */
    value: function end() {
      var client = this._mqttClient;
      client.end.apply(client, arguments);
    }
  }, {
    key: 'postProps',

    /**
     * Reports device properties (advanced edition).
     * Docs: https://help.aliyun.com/document_detail/89301.html?spm=a2c4g.11186623.6.660.3ad223dcF1jjSU
     *
     * @param {Object} [props={}] e.g. {"key": "value"}
     * @param {Function} [cb] Called with the parsed reply whose id matches this request.
     */
    value: function postProps() {
      var props = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : {};
      var cb = arguments[1];

      this._publishAlinkMessage({
        method: this.model.POST_PROPERY_METHOD,
        pubTopic: this.model.POST_PROPS_TOPIC,
        expectReplyTopic: this.model.POST_PROPS_REPLY_TOPIC,
        params: props
      }, cb);
    }
  }, {
    key: 'onProps',

    /**
     * Registers the callback fired when the cloud sets properties.
     * @param {Function} cb Receives the parsed message.
     */
    value: function onProps(cb) {
      this._onPropsCB = cb;
    }
  }, {
    key: 'postEvent',

    /**
     * Reports a device event (advanced edition).
     * Docs: https://help.aliyun.com/document_detail/89301.html?spm=a2c4g.11186623.6.660.3ad223dcF1jjSU
     *
     * @param {string} eventName
     * @param {Object} [params={}] e.g. {"key": "value"}
     * @param {Function} [cb] Called with the parsed reply; defaults to a no-op.
     */
    value: function postEvent(eventName) {
      var params = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
      var cb = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : nilFn;

      this._publishAlinkMessage({
        method: this.model.getPostEventMethod(eventName),
        pubTopic: this.model.getPostEvenTopic(eventName),
        expectReplyTopic: this.model.getPostEvenReplyTopic(eventName),
        params: params
      }, cb);
    }
  }, {
    key: 'onService',

    /**
     * Listens for a service invocation from the cloud (advanced edition).
     * Docs: https://help.aliyun.com/document_detail/89301.html?spm=a2c4g.11186623.6.660.3ad223dcF1jjSU
     *
     * @param {string} serviceName
     * @param {Function} cb Called as cb(res, reply); `reply` is the function
     *   returned by _wrapReplyServiceFn for this service.
     */
    value: function onService(serviceName, cb) {
      // One subscribe thunk per service name. A single shared thunk would be
      // bound to the first service registered, leaving every later service's
      // topic unsubscribed.
      if (this._serviceSubscribeFns == undefined) {
        this._serviceSubscribeFns = createMap();
      }
      if (this._serviceSubscribeFns[serviceName] == undefined) {
        this._serviceSubscribeFns[serviceName] = this._wrapServiceSubscribe(serviceName, cb);
      }
      // Legacy property: still points at the first thunk ever created.
      if (this._addServiceListenerFn == undefined) {
        this._addServiceListenerFn = this._serviceSubscribeFns[serviceName];
      }

      var reply = this._wrapReplyServiceFn(serviceName);
      var newCb = function newCb(res) {
        cb(res, reply);
      };

      this._serviceSubscribeFns[serviceName]();
      this._pushReceiveServiceCallback(serviceName, newCb);
    }
  }, {
    key: '_wrapReplyServiceFn',

    /**
     * Builds the reply function handed to onService callbacks.
     *
     * The returned function takes (params, type = 'async'):
     *   params - response body, e.g. { id: xxx, "code": 200, "data": { output: xxx } };
     *            an `id` is generated and written onto it when absent.
     *   type   - 'async' replies on the service response topic; any other value
     *            replies on the RRPC response topic using the RRPC id last
     *            recorded for this service.
     *
     * @param {string} serviceName
     * @returns {Function}
     */
    value: function _wrapReplyServiceFn(serviceName) {
      var self = this;

      return function fn(params) {
        var type = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : 'async';

        if (params.id == undefined) {
          params.id = guid();
        }

        var topic;
        // Asynchronous vs. synchronous (RRPC) invocation.
        if (type == 'async') {
          topic = self.model.getServiceRespTopic(serviceName);
        } else {
          var rrpcid = self._getServiceRRPCID(serviceName);
          topic = self.model.getRRPCRespTopic(rrpcid);
        }
        self._publishMessage(topic, params);
      };
    }
  }, {
    key: 'postTags',

    /**
     * Reports device tags. Reporting does not clear previously reported tags.
     * Docs: https://help.aliyun.com/document_detail/89304.html?spm=a2c4g.11174283.6.662.7e9d16685aIS1k
     *
     * @param {Array} [tags=[]] e.g. [{"attrKey": "key", "attrValue": "value"}, ...]
     * @param {Function} [cb] Defaults to a no-op.
     */
    value: function postTags() {
      var tags = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];
      var cb = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : nilFn;

      this._publishAlinkMessage({
        method: this.model.POST_TAGS_METHOD,
        pubTopic: this.model.TAG_TOPIC,
        params: tags
      }, cb);
    }
  }, {
    key: 'deleteTags',

    /**
     * Deletes device tags.
     *
     * @param {Array} [tags=[]] Tag keys; each becomes { attrKey: key }.
     * @param {Function} [cb]
     */
    value: function deleteTags() {
      var tags = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : [];
      var cb = arguments[1];

      var params = tags.map(function (tag) {
        return { attrKey: tag };
      });

      this._publishAlinkMessage({
        method: this.model.DELETE_TAGS_METHOD,
        pubTopic: this.model.TAG_DELETE_TOPIC,
        params: params
      }, cb);
    }
  }, {
    key: 'getConfig',

    /**
     * Requests the remote configuration.
     * Docs: https://help.aliyun.com/document_detail/89308.html?spm=a2c4g.11186623.6.666.53e168d0joDPCn
     *
     * Example reply:
     * {
     *   code: 200,
     *   data: {
     *     configId: '3cfda5091d5b4f53b51621cf4bbf86ec',
     *     configSize: 16,
     *     getType: 'file',
     *     sign: 'xxx',
     *     signMethod: 'Sha256',
     *     url: 'https://otx-devicecenter-thing-config-cn-shanghai-online.oss-cn-shanghai.aliyuncs.com/xxxxxx'
     *   },
     *   id: '3',
     *   method: 'thing.config.get',
     *   version: '1.0'
     * }
     *
     * @param {Function} [cb] Defaults to a no-op.
     */
    value: function getConfig() {
      var cb = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : nilFn;

      // Only product scope and file type are supported at present.
      var params = {
        "configScope": "product",
        "getType": "file"
      };

      this._publishAlinkMessage({
        method: this.model.GET_CONFIG_METHOD,
        pubTopic: this.model.CONFIG_TOPIC,
        params: params
      }, cb);
    }
  }, {
    key: 'onConfig',

    /**
     * Registers the callback fired when a remote configuration is pushed to the device.
     * @param {Function} cb Receives the parsed message.
     */
    value: function onConfig(cb) {
      this._onConfigCB = cb;
    }
  }, {
    key: 'getShadow',

    /** Requests the device shadow data. */
    value: function getShadow() {
      this._publishMessage(this.model.SHADOW_GET_TOPIC, { "method": "get" });
    }
  }, {
    key: 'postShadow',

    /**
     * Reports actual values to the device shadow.
     * @param {*} reported Placed under state.reported; Date.now() is used as version.
     */
    value: function postShadow(reported) {
      var msg = {
        "method": "update",
        "state": { "reported": reported },
        "version": Date.now()
      };
      this._publishMessage(this.model.SHADOW_POST_TOPIC, msg);
    }
  }, {
    key: 'deleteShadow',

    /**
     * Deletes device shadow attributes.
     *
     * @param {string|Array<string>} [keys] A single key, a list of keys, or
     *   nothing (undefined/null) to send "null" for the whole reported state.
     */
    value: function deleteShadow(keys) {
      var reported = {};

      if (keys == undefined) {
        // Covers both undefined and null; null previously fell into the
        // object branch and threw on `.forEach`.
        reported = "null";
      } else if (typeof keys == 'string') {
        reported[keys] = "null";
      } else if (typeof keys == "object") {
        keys.forEach(function (key) {
          reported[key] = "null";
        });
      }

      var msg = {
        "method": "delete",
        "state": { reported: reported },
        "version": Date.now()
      };
      this._publishMessage(this.model.SHADOW_POST_TOPIC, msg);
    }
  }, {
    key: 'onShadow',

    /**
     * Registers the device shadow listener.
     * @param {Function} cb Receives the parsed message.
     */
    value: function onShadow(cb) {
      this._onShadowCB = cb;
    }
  }, {
    key: '_publishMessage',

    /**
     * Publishes a plain (non-Alink-wrapped) JSON message.
     *
     * @param {string} pubTopic
     * @param {Object} [msg={}] Serialized with JSON.stringify.
     * @param {number} [qos=0]
     */
    value: function _publishMessage(pubTopic) {
      var msg = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
      var qos = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : 0;

      var payload = JSON.stringify(msg);
      this.publish(pubTopic, payload, { qos: qos }, function (err) {
        if (err) {
          // Surface failures in the debug log instead of dropping them.
          debug('publish error', pubTopic, msg.id, err);
        }
      });
    }
  }, {
    key: '_publishAlinkMessage',

    /**
     * Publishes an Alink-protocol message and stores `cb` under the message id
     * so _mqttCallbackHandler can dispatch the reply.
     *
     * @param {Object} _ref
     * @param {string} [_ref.method=""]
     * @param {string} _ref.pubTopic
     * @param {*} _ref.params
     * @param {string} [_ref.expectReplyTopic] Embedded in the id; see _genAlinkMessageId.
     * @param {string} [_ref.msgId] Caller-supplied id; generated when omitted.
     * @param {Function} [cb]
     */
    value: function _publishAlinkMessage(_ref, cb) {
      var method = _ref.method === undefined ? "" : _ref.method;
      var pubTopic = _ref.pubTopic;
      var params = _ref.params;
      var expectReplyTopic = _ref.expectReplyTopic;
      var msgId = _ref.msgId;

      var id = this._genAlinkMessageId(msgId, expectReplyTopic);
      // Park the callback until the reply arrives.
      this._pushCallback(id, cb);

      var msg = this.model.genAlinkContent(method, params, id);
      var payload = JSON.stringify(msg);

      this.publish(pubTopic, payload, function (err, res) {
        debug('pub callback', pubTopic, msg.id, err, res);
        if (err) {
          debug('publish error', pubTopic, msg.id, err, res);
        }
      });
    }
  }, {
    key: '_genAlinkMessageId',

    /**
     * Builds the id for an Alink message.
     *
     * @param {string} [originID] Used as the base id when truthy; otherwise a
     *   fresh guid is generated.
     * @param {string} [expect] Expected reply topic. When given it is appended
     *   after '|exp-topic|' so a reply is only matched on that topic (works
     *   around some topics being answered more than once).
     * @returns {*} The id.
     */
    value: function _genAlinkMessageId(originID, expect) {
      var msgId = originID ? originID : guid();
      if (expect) {
        msgId = msgId + EXPECT_TOPIC_SEPARATOR + expect;
      }
      return msgId;
    }
  }, {
    key: '_subscribeClientEvent',

    /**
     * Attaches listeners for the MQTT client events and re-emits them on this instance.
     *
     * - 'connect' additionally subscribes the preset topics.
     * - 'message' is re-emitted with its arguments spread, then passed to
     *   _mqttCallbackHandler.
     * - All other events are re-emitted with their arguments as ONE array
     *   argument (kept for backward compatibility with existing listeners).
     *
     * @param {Object} [client=this._mqttClient] Client to listen on.
     */
    value: function _subscribeClientEvent() {
      var self = this;
      var client = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : this._mqttClient;

      ['connect', 'error', 'close', 'reconnect', 'offline', 'message'].forEach(function (evtName) {
        client.on(evtName, function () {
          var args = Array.prototype.slice.call(arguments);

          debug('mqtt client ' + evtName);

          if (evtName === 'connect') {
            debug('mqtt connected');
            self._subscribePresetTopic();
          }

          // Events reach the SDK user in two ways: via emitted events and via callbacks.
          if (evtName === 'message') {
            // 1: notify event subscribers.
            self.emit.apply(self, [evtName].concat(args));
            // 2: dispatch registered callbacks.
            self._mqttCallbackHandler.apply(self, args);
            return;
          }

          // Remaining events: 'connect', 'error', 'close', 'reconnect', 'offline'.
          self.emit(evtName, args);
        });
      });
    }
  }, {
    key: '_createClient',

    /** Connects to the broker described by the model and stores the client on `_mqttClient`. */
    value: function _createClient() {
      this._mqttClient = mqtt.connect(this.model.brokerUrl, this.model.genConnectPrarms());
    }
  }, {
    key: '_subscribePresetTopic',

    /**
     * Subscribes (QoS 1) the fixed set of reply/notification topics.
     * Topics are read from `thing.model`; the subscribe call itself is always
     * made through this instance.
     *
     * @param {Object} [thing=this] Object whose model supplies the topics.
     */
    value: function _subscribePresetTopic() {
      var self = this;
      var thing = arguments.length > 0 && arguments[0] !== undefined ? arguments[0] : this;
      var model = thing.model;

      var presetTopics = [
        // devices
        model.POST_PROPS_REPLY_TOPIC,
        model.ONSET_PROPS_TOPIC,
        model.getWildcardEvenTopic(),
        model.TAG_REPLY_TOPIC,
        model.TAG_DELETE_REPLY_TOPIC,
        model.CONFIG_REPLY_TOPIC,
        model.SHADOW_SUBSCRIBE_TOPIC,
        model.CONFIG_SUBSCRIBE_TOPIC,
        model.CONFIG_SUBSCRIBE_RESP_TOPIC,
        // gateway
        model.ADD_TOPO_REPLY_TOPIC,
        model.DELETE_TOPO_REPLY_TOPIC,
        model.GET_TOPO_REPLY_TOPIC,
        model.LOGIN_REPLY_TOPIC,
        model.LOGOUT_REPLY_TOPIC,
        model.SUBDEVICE_REGISTER_REPLY_TOPIC,
        model.RRPC_REQ_TOPIC
      ];

      presetTopics.forEach(function (replyTopic) {
        self.subscribe(replyTopic, { "qos": 1 }, function (error) {
          if (error) {
            debug('sub error:', error.toString());
          }
        });
      });
    }
  }, {
    key: '_mqttCallbackHandler',

    /**
     * Dispatches an incoming MQTT message to the matching registered callback.
     * Checked in order: property-set, service invocation (async or RRPC),
     * shadow, remote config, then the per-message-id callback.
     *
     * Non-JSON payloads are ignored (they may come from basic-edition products
     * or user-defined topics). Errors raised while dispatching — including
     * ones thrown by user callbacks — are caught and written to the debug log.
     *
     * @param {string} topic
     * @param {*} message Payload; converted with toString() before parsing.
     */
    value: function _mqttCallbackHandler(topic, message) {
      var text = message.toString();

      // Unstructured payload: nothing to dispatch.
      if (isJsonString(text) == false) {
        return;
      }

      try {
        var res = JSON.parse(text);

        // Property set from the cloud: /sys/.../thing/service/property/set
        if (mqttMatch(this.model.ONSET_PROPS_TOPIC, topic)) {
          this._onPropsCB(res);
          return;
        }

        // Thing-model service invocation, asynchronous or synchronous (RRPC).
        var isServiceTopic = mqttMatch(this.model.getWildcardServiceTopic(), topic) || mqttMatch(this.model.RRPC_REQ_TOPIC, topic);
        if (isServiceTopic && this._onReceiveService != undefined) {
          this._onReceiveService(topic, res);
          return;
        }

        // Device-shadow replies and shadow notifications pushed by the cloud or
        // an application. The method name was defined long ago by cmp, so its
        // format differs from the others.
        if (mqttMatch(this.model.SHADOW_SUBSCRIBE_TOPIC, topic) && this._onShadowCB != nilFn) {
          this._onShadowCB(res);
          return;
        }

        // Remote configuration push (anything on the config wildcard except the get-reply topic).
        if (mqttMatch(this.model.getWildcardConfigTopic(), topic) && mqttNotMatch(this.model.CONFIG_REPLY_TOPIC, topic) && this._onConfigCB != undefined) {
          this._onConfigCB(res);
          return;
        }

        // Generic reply: look the callback up by message id.
        var callback = this._findCallback(res.id, topic);
        if (callback) {
          callback(res);
        }
      } catch (e) {
        debug('_mqttCallbackHandler error', topic, e);
      }
    }
  }, {
    key: '_findCallback',

    /**
     * Looks up — and removes — the callback stored for a message id.
     *
     * @param {*} cbID Message id from the reply; may embed an expected topic
     *   after '|exp-topic|'.
     * @param {string} topic Topic the reply arrived on.
     * @returns {Function|undefined} undefined when the id is missing, nothing
     *   is stored for it, or the embedded topic differs from `topic` (in which
     *   case the stored callback is left in place).
     */
    value: function _findCallback(cbID, topic) {
      if (cbID == undefined) {
        return undefined;
      }

      var key = String(cbID);
      var expectedTopic = key.split(EXPECT_TOPIC_SEPARATOR)[1];
      if (expectedTopic && expectedTopic != topic) {
        return undefined;
      }

      // Fetch the callback, then drop it so it fires at most once.
      var cb = this._getCallbackById(key);
      delete this._cb[key];
      return cb;
    }
  }, {
    key: '_wrapServiceSubscribe',

    /**
     * Returns a function that subscribes to the topic of `serviceName` the
     * first time it is called and does nothing on later calls.
     *
     * @param {string} serviceName
     * @param {Function} cb Unused here; kept for signature compatibility.
     * @returns {Function}
     */
    value: function _wrapServiceSubscribe(serviceName, cb) {
      var self = this;
      var subscribed = false;

      return function fn() {
        if (subscribed) {
          return;
        }
        self.subscribe(self.model.getServiceTopic(serviceName), function (error) {
          if (error) {
            debug('sub error:', error);
          }
        });
        subscribed = true;
      };
    }
  }, {
    key: '_onReceiveService',

    /**
     * Handles a service invocation sent by the cloud. The service name is the
     * last dot-separated segment of `res.method`. For RRPC requests the last
     * path segment of the topic is recorded as the RRPC id for that service
     * before the callback runs, so a synchronous reply can be routed.
     *
     * @param {string} topic
     * @param {Object} res Parsed message.
     */
    value: function _onReceiveService(topic, res) {
      var method = res.method;
      if (typeof method != 'string') {
        // Without a method there is no service name to dispatch on.
        debug('service message without method', topic);
        return;
      }

      var serviceName = method.split('.').pop();
      var registered = this._serviceCB ? this._serviceCB[serviceName] : undefined;
      var cb = registered || nilFn;

      // A synchronous call made through RRPC: remember its id.
      if (mqttMatch(this.model.RRPC_REQ_TOPIC, topic)) {
        var rrpcid = topic.split('/').pop();
        this._pushReceiveServiceRRPCID(serviceName, rrpcid);
      }
      cb(res);
    }
  }, {
    key: '_pushCallback',

    /** Stores `fn` under `msgid`, creating the registry on first use. */
    value: function _pushCallback(msgid, fn) {
      if (this._cb == undefined) {
        this._cb = createMap();
      }
      this._cb[msgid] = fn;
    }
  }, {
    key: '_getCallbackById',

    /** Returns the callback stored under `msgid` (undefined if none), creating the registry on first use. */
    value: function _getCallbackById(msgid) {
      if (this._cb == undefined) {
        this._cb = createMap();
      }
      return this._cb[msgid];
    }
  }, {
    key: '_pushReceiveServiceCallback',

    /** Stores the service callback under `serviceName`, creating the registry on first use. */
    value: function _pushReceiveServiceCallback(serviceName, fn) {
      if (this._serviceCB == undefined) {
        this._serviceCB = createMap();
      }
      this._serviceCB[serviceName] = fn;
    }
  }, {
    key: '_pushReceiveServiceRRPCID',

    /** Records the latest RRPC id received for `serviceName`. */
    value: function _pushReceiveServiceRRPCID(serviceName, rrpdid) {
      if (this._serviceRRPCID == undefined) {
        this._serviceRRPCID = createMap();
      }
      this._serviceRRPCID[serviceName] = rrpdid;
    }
  }, {
    key: '_getServiceRRPCID',

    /** Returns the RRPC id recorded for `serviceName`, or undefined. */
    value: function _getServiceRRPCID(serviceName) {
      if (this._serviceRRPCID && this._serviceRRPCID[serviceName]) {
        return this._serviceRRPCID[serviceName];
      }
      return undefined;
    }
  }, {
    key: '_compatibleoverdue',

    /** Compatibility hook called at the end of the constructor; a no-op in this class. */
    value: function _compatibleoverdue() {}
  }, {
    key: 'connected',

    /** The MQTT client's `connected` flag; false while no client exists yet. */
    get: function get() {
      return this._mqttClient ? this._mqttClient.connected : false;
    }
  }, {
    key: 'mqttClient',

    /** The underlying MQTT client (undefined until one has been assigned). */
    get: function get() {
      return this._mqttClient;
    }
  }]);

  return Thing;
}(EventEmitter);

module.exports = Thing;