mars 求文档

unguejic  于 5个月前  发布在  其他
关注(0)|答案(4)|浏览(66)

无从下手,不知道服务器怎么配合才能运行起来,注意事项也无从知晓。
默认协议,心跳机制,登录过程等基础问题无从下手

尝试去看源代码,运行sample 调试起来,全部靠猜的坚持到现在。调试了几个星期,发现自己连心跳的机制都还不清楚,可怕的是无从下手呀。

ux6nzvsh

ux6nzvsh1#

我读了些长链短链的源码,希望对您有所帮助。。

mars::stn

stn(signaling transfer network)是mars的核心模块,它负责网络请求任务的处理。在该模块启动的时候会创建消息队列,客户端程序员调用API创建网络任务加入消息队列, 当stn线程发现任务队列有任务了就唤醒,然后 NetCore::StartTask (推荐断点。。)根据任务类型(task.channel_select )选择长链接还是短链接。

任务是网络请求的抽象,比如点击朋友圈获取数据这个网络请求是走哪个通道(长链/短链),优先级是多少,如果获取失败重试多少次等等。为了代码清晰,下面默认USE_LONG_LINK宏已配置)

短链接

以shortlink为例子,用户主动发送的消息基本都是短链接。 短链接由短链接管理器创建(ShortLinkTaskManager::StartTask),它会把任务(Task)包装成一个TaskProfile,TaskProfile会记录启动时间等。 然后放入任务列表(lst_cmd),后面 __RunOnStartTask 迭代处理这个任务列表:

void ShortLinkTaskManager::__RunOnStartTask() {
    std::list<TaskProfile>::iterator first = lst_cmd_.begin();
    std::list<TaskProfile>::iterator last = lst_cmd_.end();
    ...
    // 迭代处理任务
    while (first != last) {
        std::list<TaskProfile>::iterator next = first;
        ++next;
        // 如果该任务已经有线程执行了,就处理下一个
        if (first->running_id) {
            ++sent_count;
            first = next;
            continue;
        }
        //重试间隔
        ...
        // 检查是否具有发送任务的权限
        ...
        //雪崩检测
        ...
        // 发送网络请求之前记录一些时间信息等
        ...
        // 创建ShortLInk
        ShortLinkInterface* worker = ShortLinkChannelFactory::Create(MessageQueue::Handler2Queue(asyncreg_.Get()), net_source_, first->task, first->use_proxy);
        // 设置回调,这些回调会在合适的位置被mars主动调用,具体位置参见后面
        worker->OnSend.set(boost::bind(&ShortLinkTaskManager::__OnSend, this, _1), AYNC_HANDLER);
        worker->OnRecv.set(boost::bind(&ShortLinkTaskManager::__OnRecv, this, _1, _2, _3), AYNC_HANDLER);
        worker->OnResponse.set(boost::bind(&ShortLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6, _7), AYNC_HANDLER);
        first->running_id = (intptr_t)worker;
        // 创建ShortLink失败,则处理下一个任务
        if (!first->running_id) {
            first = next;
            continue;
        }
        // 设置网络数据上报函数
        worker->func_network_report.set(fun_notify_network_err_);
        // 发送请求
        worker->SendRequest(bufreq, buffer_extension);
        // 然后处理下一个任务
        ++sent_count;
        first = next;
    }
}

在迭代过程中,对于每个网络任务,短链接工厂(ShortLinkChannelFactory::Create)创建一个短链接,它关联了一个线程,短链接的生命周期就是这个线程的生命周期。短链接线程先服务器握手( __RunConnect )然后发送请求,等待回复( __RunReadWrite )。
有个问题是步骤比较固定,比如用户想发完请求打个日志,或者收到回复写一下数据库。这种情况下用户可以传入回调函数,mars会在合适的位置执行这些回调,一个比较完整的短链接生命周期如下:

线程启动 -> socket连接服务器 -> OnSend回调 -> 数据打包
 socket发送请求 -> socket接受回复 -> OnRecv回调 -> 数据解包 -> OnResponse回调 -> 线程结束

最后,线程做什么我们知道了,可是线程还没有启动。翻翻上面代码, worker->SendRequest 会启动线程。

长链接

长链接主要用于服务器向客户端推送的消息,比如一个不提醒的群聊收到群友的消息就会走长链接。前面套路都是一样的:stn获取消息队列的任务,net_core选择使用长链接处理任务,长链接管理器启动长链接任务。这一步短链接管理器是创建一个短链接,但是长链接管理器没有,它复用一条长链接。原因是mars目前只支持一条长链接和任意数量短链接。

长链接第一步也是服务器握手,第二步和短链接稍有不同;因为短链接是一次性任务,第二步开线程发个请求等回复就完了,长链接要求持续性的发送和接受任务,所以是一个whiletrue循环:

void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) {
    // 空指令闹钟(心跳闹钟)
    Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false);
    // 超时闹钟(心跳超时闹钟)
    Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false);
    ...
    
    while (true) {
        // 如果空指令闹钟响过(OnAlarm调用过)
        if (!alarmnoopinterval.IsWaiting()) {
            if (first_noop_sent && alarmnoopinterval.Status() != Alarm::kOnAlarm) {
                xassert2(false, "noop interval alarm not running");
            }
            if(first_noop_sent && alarmnoopinterval.Status() == Alarm::kOnAlarm) {
              __NotifySmartHeartbeatJudgeDozeStyle();
            }
            xgroup2_define(noop_xlog);
            uint64_t last_noop_interval = alarmnoopinterval.After();
            uint64_t last_noop_actual_interval = (alarmnoopinterval.Status() == Alarm::kOnAlarm) ? alarmnoopinterval.ElapseTime() : 0;
            bool has_late_toomuch = (last_noop_actual_interval >= (15*60*1000));
            
            // 发送心跳,开启超时闹钟,确保心跳在限定时间内被发送&&收到答复
            if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) {
                nooping = true;
                __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval);
            }
            first_noop_sent = true;

            //  重置空指令闹钟
            uint64_t noop_interval = __GetNextHeartbeatInterval();
            alarmnoopinterval.Cancel();
            alarmnoopinterval.Start((int)noop_interval);
        }
 
        // 错误判断
        ...

        // 如果有待写的数据缓冲(lstsenddata_)
        if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) {
            // 用writeev一次写完缓冲
            ssize_t writelen = writev(_sock, vecwrite, (int)lstsenddata_.size());
            ...

            // 重置心跳闹钟,这意味着假如之前有个心跳闹钟,但是现在与服务
            // 器通信了,之前那个心跳闹钟就不算了
            unsigned long long noop_interval = __GetNextHeartbeatInterval();
            alarmnoopinterval.Cancel();
            alarmnoopinterval.Start((int)noop_interval);

            
            GetSignalOnNetworkDataChange()(XLOGGER_TAG, writelen, 0);

            // 写完调用OnSend回调并清理待写缓冲(lstsenddata_)          
            while (it != lstsenddata_.end() && 0 < writelen) {
                if (0 == it->second->Pos() && OnSend) OnSend(it->first.taskid);
                ...
                it = lstsenddata_.erase(it);
            }
        }
        
        lock.unlock();

        // 如果socket可读
        if (sel.Read_FD_ISSET(_sock)) {
            bufrecv.AllocWrite(64 * 1024, false);
            // 那就读
            ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0);
            ...
            
            // 解析读到的数据包
            while (0 < bufrecv.Length()) {
                int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body, extension, tracker_.get());
               
                ...

                // 解析完就调用OnRecv回调
                if (LONGLINK_UNPACK_STREAM_PACKAGE == unpackret) {
                    if (OnRecv)
                        OnRecv(taskid, packlen, packlen);
                } 
                // 如果是心跳包顺便取消心跳超时闹钟,并调用OnResponse回调
                else if (!__NoopResp(cmdid, taskid, stream_resp.stream, stream_resp.extension, alarmnooptimeout, nooping, _profile)) {
                    if (OnResponse)
                        OnResponse(kEctOK, 0, cmdid, taskid, stream_resp.stream, stream_resp.extension, _profile);
                    sent_taskids.erase(taskid);
                }
            }
        }
    }
    
    
End:
    // 错误处理,清理资源,退出等
    ...

}

Longlink做三件事:1)视情况发送心跳2)如果数据缓冲有数据就发送3)如果socket可读就读取。第一个心跳是通过空指令闹钟完成的,它每隔一段时间响一次,然后重置等待下一次响应,另外空指令闹钟响的时候会发送心跳包并开启超时闹钟(alarmnooptimeout),该闹钟用于确保心跳在超时范围内从客户端发送&&得到服务器响应。

同样,长链接完整流程是:

线程启动 ->  socket连接服务器 -> 死循环{ 
   空指令闹钟响过 ->  NoopReq发送心跳,开启超时闹钟 -> 重置空指令闹钟
   socket可写   ->  写 -> OnSend回调
   socket读     ->  读 -> 解包 -> OnRecv回调 -> 解包完毕 -> 如果是心跳包{ 取消超时闹钟 ,OnResponse回调 }
 }
kdfy810k

kdfy810k2#

@kelthuzadx 非常感谢,我昨天看到了心跳机制明白了心跳机制,可以查看 #638

现在问题在长连接的鉴权了,我计划鉴权的逻辑为

  1. 连接成功后
  2. 发送一个鉴权请求(带上token,就知道这条连接属于哪个用户)
  • 获取响应

  • 鉴权成功,进入正常逻辑

  • 鉴权失败,提示用户token 已经失效,请重新登录
    登录使用的是http 完成

现在卡在这个问题上。

5cnsuln7

5cnsuln74#

代码里心跳包不叫心跳,叫noop。。真是奇葩

相关问题