消息服务使用介绍


一、JOS消息服务简介

主动通知API是一种实时向客户端推送数据变化信息,降低ISV获取京东数据成本的长连接消息通知API。比如:对于一个卖家,他的交易可能任何时刻都会产生,此时卖家就需要及时知道每一笔新创建的订单,紧接着做发货等处理。

传统的做法:只能不停的以很短的间隔轮循API获取最新的交易。

主动通知做法:在创建交易后会客户端会非常及时的得到有新的交易创建了,然后通过API去获取这笔交易的详情,而不再需要以很短的间隔轮询获取最新的订单。

1.1主动通知和REST API区别

连接到主动通知API需要发起一个持久打开的HTTP连接请求,这种方式的请求和普通的REST API的HTTP请求对于应用程序处理会有很大的不同。

1.1.1  REST API的方式

1)应用程序接受用户的请求。

2)应用程序发起一个或者多个REST API的请求,得到API的响应结果,连接断开。

3)应用程序对结果做一些处理,把结果展示给用户。

1.1.2  主动通知方式:

1)  App获取长连接处理进程处理的结果,并展示给用户

2)  App想JD放起长连接请求

3)  JD把变化的数据通过长连接返回给app

4)  App接收到长连接返回的数据,处理数据,并存储处理结果

1.2 主动通知消息服务的价值

1)  实时:每次数据的变化基本都在毫秒内返回给应用程序。

2)  提升用户体验:开发者可以基于数据的实时性到达,满足用户对数据苛刻的实时展示需求,而且可以提供一些更加人性化的实时服务,提高用户体验满意度。

3)  多种业务数据类型:目前有商品、退款、交易、酒店、机票等消息数据,随着开发者的需求增多,会开放出来更多的业务数据类型。方便开发者满足不同的用户需求。

4)  数据订阅方式多样化:可以指定需要那些用户的数据,也可以指定需要某个属性满足某种条件的数据(如:只需要某些商品id变化的数据)。

5)  成本降低:开发者不在需要不停的轮循API获取最新的数据,只需要等待数据到达。尤其在随着用户量增加,轮循的成本会更大,而且会形成

6)  恶性循环,轮循过程中会出现API调用错误,错误越多轮循更加频繁。最终对TOP对开发者造成很大的压力和开销。

7)  稳定:每天消息的推送成功率在99.995%以上。

8)  易用:只需要发起一个HTTP请求,就可以不停的接收数据,同时官方提供了sdk,开发者只需要简单实现接口,处理业务数据。


消息服务的使用

流程图:


一、 APP(应用)订阅消息

访问 http://dev.jd.com ,登录后,选择应用管理进入主动通知管理页面,如下图所示:



开发者根据自己的需求,选择需要的消息类型以及消息状态.

 

二、授权指定用户

如果订阅的是私有消息,则在完成第一步后还不能接收到任何用户的消息数据,因为数据属于用户,必须获得用户(卖家)的授权允许才能接收用户的数据。所以首先要获取卖家授权即取得token ,再调用jingdong.increment.customer.permit授权获取指定用户的消息。

APP运行的过程中,如果APP不再需要接收某个用户的消息,可以调用jingdong.increment.customer.stop取消 。也可以调用jingdong.increment.customers.get 查看当前授权接收了那些用户的消息数据。

 

下载SDK 调用方式等详情请参考京东开放服务平台API文档。

 

 

 

主动通知-发起请求

一、连接到服务端

建立一个到主动通知 API 请求,相当于发起一个存活期很长的HTTP请求(目前的存活期是24小时),应用程序不停的从HTTP响应中增量的获取结果,简单的可以认为应用程序在用HTTP的方式下载一个无限大的文件。

正式环境请求地址:http://comet.jd.com/stream

客户端发起一个HTTP POST请求,JOS comet服务端将保持住这个请求24小时。如果在连接过程中出现请求参数错误、服务端处理错误、网络出现异常情况,服务端会断开连接。错误信息可以从HTTP相应头中的errmsg键值获得。

有些HTTP的客户端包只有在服务端关闭掉连接后才返回整个响应结果。这样的客户端不能用于发起HTTP长连接请求。需要使用能增量的给应用程序返回响应结果的HTTP客户端包,比如:jdk自带的HttpURLConnection,或者Apache的HttpClient 。另也可以使用我们的sdk(java sdk )

二、请求参数

名称

是否必需

描述

app_key

Y

comet分配给应用的key

app_secret

Y

应用的secret

rid

N

请求的连接标识,多连接时使用,目前只需要传null

三、服务端响应

1)http code为200,说明服务端接受了请求,应用程序可以正常读取消息。

2)http code为400,说明请求参数错误,服务端没有接受请求,应用程序从http header中读取key=errmsg的信息,查看具体的请求失败原因。

3)http code为403,说明服务端在发布,应用程序可以休眠一段时间继续发起请求

code

服务端行为

app端行为

消息体举例

 

含义

 

 

-

200

--------------接受连接

------------准备读取数据

{"packet":{"code":

200,"msg":"connectedjos"}}

 

连接成功(服务端接受连接并且连接保持24小时)

 

 

-

201

----------发送心跳包

------------重新发起连接

{"packet":{"code":201,"msg":10}}

 

服务端发送心跳包,防止连接被断开。msg中的次数表示从连接被接收到当前发送这个心跳包之间的这段时间,共发送了多少个业务消息包,app需要设置读取超时时间为1分钟左右,如果超过1分钟都没读到数据,说明网络可能有问题了,app需要重新发起连接

 

 

-

202

----------推送消息

------------获取消息内容并处理

{"packet":{"code":202,

"msg":业务数据}}

 

业务消息包。app只需取出业务消息包内容,并作业务逻辑处理,建议:接收数据的线程和处理数据的线程最好分开,不要影响接收度。

 

 

-

203

----------记录app消息丢弃情况

---

------------调api获取丢失消息

{"packet":{"code":203,"msg":{"begin":1313743932379, "end":1313745387904}}}

 

app有消息丢弃。服务端发现app的连接断开后会记录下来app的消息丢弃情况。

 

 

-

101

----------断开客户端连接

------------客户端主动重连

{"packet":{"code":101"}}

 

 

连接到达最大时间

 

 

-

102

----------服务端断开连接

------------等待重连

{"packet":{"code":102,"msg":60}}

 

服务端在升级。 msg表示服务端升级大概需要的时间,单位:秒。app在这段时间之后重新连接服务端,并且使用增量api把这段时间内丢失的消息获取到。

 

 

-

103

----------服务端断开连接。

------------等待重连或马上重连

{"packet":{"code":103, "msg":5}}

 

由于某些原因服务端出现了一些问题,需要断开客户端。msg表示建议app在多少s之后发起新的请求连接,app可以选择马上发起新的连接请求,也可以在一段时间后发起连接请求

 

 

-

104

----------断开旧的连接

------------结束连接

{"packet":{"code":104}}

 

由于客户端发起了重复的连接请求,服务端会把前一个连接主动断开。app在新的连接上接收消息,并且把前一个连接上剩余的消息接收完。(可能 104包不是之前连接上最后的包,所以最好在写代码的时候读到null后才结束)

 

 

-

105

----------断开连接

------------检查网络或程序

{"packet":{"code":105}}

 

由于app的消息量太大,但是app的与推送系统之间的网络不太好,或者app接收消息太慢。导致服务端有大量的消息积压,这种情况下服务端会断开连接。app端需要检查一下网络环境,或者接收消息和处理消息的线程有没有分开

 

 

四、消息处理

对于消息处理,为了防止由于客户端处理消息慢或者在618、双11等这种大促活动中导致的消息量激增,客户端处理不过来而导致服务端消息堆积后断开客户端的情况产生,客户端需要测试好大数据量的处理。

客户端可以在其中一个线程中接收消息,当接收到消息后可以把消息写到一个队列、文件或者数据库里,另外一个线程或者线程池可以从队列或者数据库中查询这些消息,然后解析处理消息。尽量做到不阻塞消息读取线程。

1、消息解析:

主动通知返回的结果是json格式的字符串(如下)

{"packet":{"code":200,"msg":"connectedjos"}}

2、消息顺序

主动通知消息数据在返回给客户端的时候不保证顺序性,一般先产生的消息会先到到客户端,但是也有可能在某种情况下消息是乱序到达的,客户端需要做好乱序到达的消息的处理。

3、重复的消息

一般情况消息是不会重复的,一条消息会下发一次,但是有可能在某种情况下消息会重复发送(这种情况不会很多),客户端需要做好对于重复消息的处理。

五、连接断开

由于以下几种原因HTTP连接将发生断开情况。

1、客户端使用相同的参数请求服务端,服务端会断开旧的连接,所以客户端一定要注意这种情况,防止相同的参数在多个地方请求服务端,导致连接不停的被踢掉影响正常的服务。

2、客户端同步处理消息并且消耗时间很长,或者停止读取消息,将导致服务端消息堆积,当堆积到一定量服务端会断开连接。

3、服务端升级或者重启,连接会断开。

4、Comet网络结构变更,或者服务端的负载不均衡,连接会断开,这种情况极少发生。

六、断开重连

一般HTTP客户端包在http请求时可以设置两个参数,一个是连接超时时间,另一个是读取数据超时时间,建议一定要设置读取数据超时时间,对于请求推送系统的消息通知api可以设置为60秒。

 主动通知服务端,为了防止连接被网络中的设备断开,如果连接很长时间都没有消息产生,服务端会间隔60秒发一个心跳数据。客户端可以设置读取数据超时时间为60秒,如果在60秒都没有读到数据,客户端可以断开此连接,重新发起连接请求。

在连接被断开或者发生异常情况的时候可以立刻发起重连,如果重新连接依然失败,建议根据连接过程中出现的错误情况做不同的重新连接处理。

1、如果是网络异常,一般这种错误是临时性的,客户端可以稍微休眠几秒在发起重连。

2、如果是请求过程中发生了业务错误,参考错误码中的描述做处理。

3、如果连接是被客户端踢掉,需要检查那里还用了相同的请求参数请求主动通知api导致客户端被踢掉。

4、如果连接是由于服务端消息堆积被断开,建议异步处理消息

       


主动通知-产生消息

一、产生消息

目前消息种类只有订单类型的消息,以下简单说明不同状态的消息代表的含义,以及如何产生这种状态的消息。

二、订单相关

1.订单添加orderAdd:产生了新订单。

当买家成功下单时,会产生此消息

2.订单删除orderDelete

当买家的订单被取消删除时,会产生此消息

3.订单锁定orderLock

当买家的订单被锁定时,会产生此消息

三、返回消息示例

1、订单添加消息格式:
{  "notify_orderAdd":

   {
pin:"商家帐号",

venderid:商家id,

time:"消息发送时间",

source:’消息源

type:"消息子类型",

content:"消息内容",

extend1:”扩展字段1”,

extend2:”扩展字段2”

}

}

2、订单删除消息格式

{  "notify_orderDelete":

   {
pin:"商家帐号",

venderid:商家id,

time:"消息发送时间",

source:’消息源

type:"消息子类型",

content:"消息内容",

extend1:”扩展字段1”,

extend2:”扩展字段2”

}

}

3、订单锁定消息格式

{  "notify_orderLock":

   {
pin:"商家帐号",

venderid:商家id,

time:"消息发送时间",

source:’消息源

type:"消息子类型",

content:"消息内容",

extend1:”扩展字段1”,

extend2:”扩展字段2”

}

}

 

 

字段名

说明

数据类型

备注

type

消息类型

字符串

1.新订单

2.删除订单

3.锁定订单

10.公告1

11.公告2

source

消息源

字符串

0.公告,1订单

pin

商家帐号

字符串

当消息类型为公告1时候,pin为商家类型,商家类型见5.3.1.2

当消息类型为公告2时候,pin为商家pin

venderid

商家id

字符串

当消息类型为公告1时候,值为空字符串

当消息类型为公告2时候,值为商家venderid

time

消息发送时间

日期

格式:yyyy-MM-dd HH:mm:ss SSS

content

消息内容

字符串

 

extend1

扩展字段1

字符串

 

extend2

扩展字段2

字符串

 

 


 

消息推送SDK使用说明


1、 建议

1)消息一定要异步处理,对于不是异步处理,导致的服务端下发时间过长的appkey,服务端将作出一定的处罚机制。即使用者在CometMessageListener.onReceiveMsg里面的处理逻辑一定要快(最好在几ms级别)。

2)对于相同参数发起的请求,服务端会使用新的连接踢掉旧的连接,由于使用者误用导致的互相踢,sdk可能会产生一些异常行为,请使用者注意。

 

2、连接方式

目前推送系统只有单连接,多连接作为以后的扩展。
如果需要关掉连接,则只需要调用CometStream.stop()方法。
对于连接过程中的各种行为和各种消息的处理方式,可以看ConnectionLifeCycleListenerCometMessageListener这两个类的详细注释。

 连接方式1:客户端只需要建立单个连接,不指定连接标识。
Configuration conf = new Configuration(appkey,secret,null);

conf.setConnectUrl(url);
CometStream stream = new JosCometStreamFactory(conf).getInstance();
stream.setConnectionListener(ConnectionLifeCycleListener接口的实现类);
stream.setMessageListener(CometMessageListener接口的实现类);
stream.start();


/* 可选为连接指定连接监听器和消息监听器,如果为连接指定了相对应的监听器,则对此连接会使用指定的监听器,否则会使用为 CometStream 指定的全局监听器,建议为连接指定自己的监听器,方便将来查找问题,在指定监听器时最好能做到很容器的区分出来不同连接的监听器*/

 

3http 错误码说明

 

code

服务端行为

app端行为

消息体举例

 

含义

 

 

-

200

--------------接受连接

------------准备读取数据

{"packet":{"code":

200,"msg":"connectedjos"}}

 

连接成功(服务端接受连接并且连接保持24小时)

 

 

-

201

----------发送心跳包

------------重新发起连接

{"packet":{"code":201,"msg":10}}

 

服务端发送心跳包,防止连接被断开。msg中的次数表示从连接被接收到当前发送这个心跳包之间的这段时间,共发送了多少个业务消息包,app需要设置读取超时时间为1分钟左右,如果超过1分钟都没读到数据,说明网络可能有问题了,app需要重新发起连接

 

 

-

202

----------推送消息

------------获取消息内容并处理

{"packet":{"code":202,

"msg":业务数据}}

 

业务消息包。app只需取出业务消息包内容,并作业务逻辑处理,建议:接收数据的线程和处理数据的线程最好分开,不要影响接收度。

 

 

-

203

----------记录app消息丢弃情况

---

------------调api获取丢失消息

{"packet":{"code":203,"msg":{"begin":1313743932379, "end":1313745387904}}}

 

app有消息丢弃。服务端发现app的连接断开后会记录下来app的消息丢弃情况。

 

 

-

101

----------断开客户端连接

------------客户端主动重连

{"packet":{"code":101"}}

 

 

连接到达最大时间

 

 

-

102

----------服务端断开连接

------------等待重连

{"packet":{"code":102,"msg":60}}

 

服务端在升级。 msg表示服务端升级大概需要的时间,单位:秒。app在这段时间之后重新连接服务端,并且使用增量api把这段时间内丢失的消息获取到。

 

 

-

103

----------服务端断开连接。

------------等待重连或马上重连

{"packet":{"code":103, "msg":5}}

 

由于某些原因服务端出现了一些问题,需要断开客户端。msg表示建议app在多少s之后发起新的请求连接,app可以选择马上发起新的连接请求,也可以在一段时间后发起连接请求

 

 

-

104

----------断开旧的连接

------------结束连接

{"packet":{"code":104}}

 

由于客户端发起了重复的连接请求,服务端会把前一个连接主动断开。app在新的连接上接收消息,并且把前一个连接上剩余的消息接收完。(可能 104包不是之前连接上最后的包,所以最好在写代码的时候读到null后才结束)

 

 

-

105

----------断开连接

------------检查网络或程序

{"packet":{"code":105}}

 

由于app的消息量太大,但是app的与推送系统之间的网络不太好,或者app接收消息太慢。导致服务端有大量的消息积压,这种情况下服务端会断开连接。app端需要检查一下网络环境,或者接收消息和处理消息的线程有没有分开

 

 

4.监听器方法介绍

ConnectionLifeCycleListener

(1).onBeforeConnect  

    在每次发起连接请求之前,允许应用程序做一些事情

(2)onException(Throwablee)

    系统中出现的任何与连接相关的异常时调用此方法,这里包括连接过程中发生的异常,和系统处理过程中发生的异常

         连接异常,签名错误,丢失参数等异常时,抛出这个异常后,表名没有正确连接到服务端

         系统处理过程中可能会抛出一些异常

         isv可以关注此异常信息,用于诊断系统运行状况

(3)onMaxReadTimeoutException

当系统在30分钟内超过10timeout,则调用这个方法,sdk会继续重连接,但是强烈建议isv监控此方法,当频繁出现readtimeout的时候,说明网络环境可能不是很稳定,需要人工介入检查一下是不是网络有问题。

 

CometMessageListener

4、onConnectMsg(Stringmessage)

服务端:当客户端的连接被接收后,服务端返回连接成功的消息

5、onHeartBeat

服务端:服务端会在每隔一段时间发送一个心跳包(如果一直有业务消息,则不会发送此心跳包)

客户端:不需要做任何处理,需要注意如果在一段时间内没有心跳包收到的话说明底层链路有问题了

6、onReceiveMsg(Stringmessage)

业务消息

7、onDiscardMsg(Stringmessage)

服务端告知客户端丢弃消息的时间段

当收到这个消息后,有两种解决办法。

         调用api查询哪些用户的消息丢弃了,接下来通过增量api补完整这些丢弃的消息。

         调用api查询一段时间内appkey所有开通消息服务的用户的所有消息。

8、onServerUpgrade(Stringmessage)

服务端:服务端在升级

客户端:sdk会在发布期间休眠一段时间,自动重连。

由于服务端在发布的时候消息会丢弃,所以客户端在收到这个消息后,在连接正常之后补充消息,

补充的方式和onDidcardMsg的方式一样

9、onServerRehash

    服务端:服务端负责不均衡,断开所有客户端连接

         客户端:sdk会马上重连

         建议:由于服务端在这个时候可能有消息会丢弃,所以客户端在收到这个消息后,在连接正常之后补充消息,

         补充的方式和onDidcardMsg的方式一样

10、OnServerKickOff

    服务端:消息量太大,isv接收太慢,服务端主动断开客户端

         客户端:sdk不会重连,会停掉系统。

11、OnClientKickOff

    服务端:由于客户端使用相同的参数发起了另外一个请求,服务端把前一个连接断开

         客户端:1,可能是sdk自动发起重连,打印一条信息。这种情况不需要做任何处理。

           2,可能是相同的appkey在其他地方发起了连接请求,需要检查一下相同的appkey是否有在其他地方被使用。

12、onOtherMsg(Stringmessage)

sdk的错误或者服务端增加了消息类型,但是sdk没有来得及升级,建议升级一下sdk

13、onException(Exceptione)

    处理消息过程中可能由于某些原因导致抛出的异常

 


常见问题

1. 消息的断开和心跳测试

客户端要直接断开消息:stream.stop(); 连接成功,则会接收到心跳消息;

2. 连接断开,消息怎么办

对于断开连接(如应用挂了)情况,服务端会堆积消息,等应用重新连接进来后,服务器会把当前时间的消息顺序推送给客户端。断开的这段时间内堆积的消息可通过接口获取;

3. 多连接接收消息

多连接收消息是指多个客户端与推送系统消息服务器建立多个连接来收消息。消息在下发时随机选择从多个客户端中选择一个连接下发消息。多链接有的随机下发消息的功能。