使用Workerman实现基于UDP的异步SIP服务器

概述

分享主题:使用workerman实现基于UDP的异步SIP服务器,服务器端可主动发送UDP数据给客户端

基于Workerman实现基于UDP的异步SIP服务器是一个涉及网络编程和协议实现的复杂任务。Workerman是一个高性能的PHP socket服务器框架,它支持TCP、UDP、UnixSocket等多种协议,非常适合用于开发需要长连接或高并发的网络应用。下面将详细介绍如何使用Workerman来实现一个基于UDP的异步SIP服务器。

理解SIP协议

SIP(Session Initiation Protocol)是一个信令协议,用于在Internet Protocol(IP)网络中启动、管理和终止实时会话,这些会话可能包括语音、视频、消息传递以及其他多媒体通信。SIP协议是基于请求/响应模型的,类似于HTTP协议。

业务需求

自从使用workerman实现物联网终端接入以来,我工作中的所有网络场景(TCP\\\\UDP\\\\HTTP)等均使用workerman+channel以微服务方式实现,开发速度快,性能超级高。(几十万台设备同时接入都轻轻松松承受住)

之前多次关注过workerman的UDP服务器,但一没有实现我想要的结果,由于近期的业务需求,外加HTTP3 QUIC协议的广泛使用,workerman作为一个广泛使用的高性能PHP网络开发框架,支持持久化的UDP通信是很有必要的。

一直以来想通过workerman编写个基于UDP的SIP服务器和实现GB28181的国标协议,搭配SRSZLMediaKit或者monibuca,满足摄像头、硬盘录像机设备的接入,也可配合FreeSwitch实现基于SIP的语音通话或视频会议系统。

  • workerman 主动发送udp数据:https://www.workerman.net/q/2688
  • UDP服务器主动向客户端发送消息:https://www.workerman.net/q/4284

直到今天终于使用workerman 实现单进程或多进程方式监听某个UDP端口,主动从平台向客户端发送数据并且所有功能均使用workerman的loop功能,能够发挥平台最大化性能。

当进程只有一个时使用 socket 函数实现端口监听,当进程大于1个时使用stream_socket实现端口监听(各有利弊,请酌情使用,大部分场景,推荐将进程数保持与CPU数量一致,自动使用 stream_socket )

0x02 初步测试

当使用stream_socket时,服务器首次收到客户端发送的数据后,能够稳定的向客户端发送约5分钟的数据报文,直到该通信会话被Linux内核丢弃,因此使用UDP进行通信,建议至少60秒进行一次双向心跳交互保活。

当使用socket时,服务器首次收到客户端发送的数据后,能够稳定的向客户端长期发送数据报文(如果网络中的防火墙或NAT路由器没有将会话过期,应该可以一直使用)

0x03 代码

<?php
chdir(dirname($_SERVER[\\\'SCRIPT_FILENAME\\\']));
include_once __DIR__ . \\\'/vendor/autoload.php\\\';

use Workerman\\\\Worker;
use Workerman\\\\Lib\\\\Timer;

$worker = new Worker();
$worker->count = 1;   //开启进程数量
$processName = \\\"sip_server_udp5060\\\";
$worker->transport = \\\"udp\\\";
$worker->name = $processName;
$worker->reusePort = true;   //开启均衡负载模式

$date = date(\\\"Y-m-d\\\");
Worker::$pidFile = \\\"var/{$processName}.pid\\\";
Worker::$logFile = \\\"var/{$processName}_logFile.log\\\";
Worker::$stdoutFile = \\\"var/{$processName}_stdout.log\\\";

$socket     = null;
$workerId   = null;

define(\\\"UNAUTHORIZED_KICKOFF_SECOND\\\" ,  1);
define(\\\"UNAUTHORIZED_ALLOW_TRYTIME\\\"  , 10);
define(\\\"UDP_SOCKET_TYPE_IS_STREAM\\\"   ,  $worker->count > 1 ? 1 : 0 );

$worker->onWorkerStart = function() {
    global $worker , $workerId , $socket , $processName;

    //定期与数据库握手,避免被断掉,该动作每个进程都得执行

    $workerId = $worker->id ;
    $worker->connections = [];
    $worker->connections_ur = [];

    //根据daemon顺序延时,这是确保系统正常运行的关键
    usleep(1000 * 10 * ($worker->id+1) );

    echo date(\\\"Y-m-d H:i:s\\\").\\\" 服务进程{$worker->id}已经启动!\\\\n\\\";

    if( $workerId >= 0 ){

        //计划监听的UDP端口
        if(UDP_SOCKET_TYPE_IS_STREAM == 0){
            //这种模式只能运行一个进程
            $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
            socket_bind($socket , \\\"0.0.0.0\\\" , 5060);
        }else{
            //这种模式可以多个进程共同监听同一端口
            $context = stream_context_create();
            $socket = stream_socket_server(\\\"udp://0.0.0.0:5060\\\" , $error_code , $error_message , STREAM_SERVER_BIND , $context);
        }

        Worker::$globalEvent->add($socket, 1 , \\\"acceptUdpConnection\\\");   //加入全局事件loop

        /*
        //如果需要从服务器主动发数据给客户端,可以通过channel方式调用,具体的业务实现可以参照官方channel使用介绍

        Channel\\\\Client::connect(CHANNEL_SIP_IP , CHANNEL_SIP_PORT);
        $event_name = $processName;    //UDP的数据回复跟TCP不一样,只要知道对方端口即可,理论上任意一个进程均可从服务器端发送数据给客户端
        //到公网环境查询本机公网IP,便于生成日志
        Channel\\\\Client::on($event_name, function($event_data)use($worker , $event_name) {
        });
        */

        //每隔一段时间对长时间未进行通信的临时会话进行清理
        //不宜太大,太大容易遭受DDOS攻击,也不宜太小,太小的话有可能还没完成业务逻辑就被踢掉
        Timer::add( UNAUTHORIZED_KICKOFF_SECOND , function (){
            global $worker , $socket;
            $dateTime = date(\\\"Y-m-d H:i:s\\\");
            $timeNow = time();

            foreach ($worker->connections_ur as $remote_address => $remote_arr){
                if( $remote_arr[\\\'lastMsgTime\\\'] < $timeNow - UNAUTHORIZED_KICKOFF_SECOND ){
                    //非法接入,直接踢掉
                    unset($worker->connections_ur[$remote_address]);
                    //print(\\\"{$dateTime} {$remote_address} 踢掉非法接入\\\\n\\\");
                    //sendto($remote_address ,\\\"illegal connection!\\\\n\\\");
                }

                //平台主动发送数据给终端,
                sendto($remote_address ,\\\"{$dateTime} test send data!\\\\n\\\");
            }
        });

        //每隔一段时间对已经认证过的会话进行检查,对于长时间未通信的需要进行清理
        Timer::add( 60 , function (){
            global $worker , $socket;
            $dateTime = date(\\\"Y-m-d H:i:s\\\");
            $timeNow = time();

            foreach ($worker->connections as $remote_address => $remote_arr){
                if( $remote_arr[\\\'lastMsgTime\\\'] < $timeNow - 300 ){
                    //超时未通信,直接踢掉
                    unset($worker->connections[$remote_address]);
                    //这里编写远端会话离线的内容
                    //print(\\\"{$dateTime} {$remote_address} 会话超时掉线\\\\n\\\");
                    //sendto($remote_address ,\\\"connection timeout!\\\\n\\\");
                }
            }
        });
    }
};

function sendto($remote_address , $send_buffer ){
    global $socket;
    if(UDP_SOCKET_TYPE_IS_STREAM == 0){
        list($host, $port) = explode(\\\":\\\" , $remote_address );
        socket_sendto($socket , $send_buffer , strlen($send_buffer), 0, $host, $port);
    }else{
        stream_socket_sendto( $socket, $send_buffer, 0, $remote_address);
    }
}

function acceptUdpConnection($socket){
    global $worker;
    $dateTime = date(\\\"Y-m-d H:i:s\\\");
    $timeNow = time();

    set_error_handler(function(){});
    if(UDP_SOCKET_TYPE_IS_STREAM == 0){
        socket_recvfrom($socket, $recv_buffer, 65535, 0, $from, $port);
        $remote_address = \\\"{$from}:{$port}\\\";
    }else{
        $recv_buffer = stream_socket_recvfrom($socket, 65535 , 0, $remote_address);
    }

    restore_error_handler();
    if (false === $recv_buffer || empty($remote_address)) {
        return false;
    }

    if(
        isset($worker->connections[$remote_address][\\\'lastMsgTime\\\'])
        && $worker->connections[$remote_address][\\\'lastMsgTime\\\'] >= $timeNow - 300
    ){
        //已注册成功
        $worker->connections[$remote_address][\\\'lastMsgTime\\\'] = $timeNow;

        //处理业务逻辑  针对已经认证的合法连接
        //针对已经认证过的连接,建议将收到的数据通过channel发布到其他服务端进行轮询处理,以最大化提升系统处理性能,此时,本程序仅仅充当gateway功能

    }else{
        //未注册成功   未认证的可以进行几次通信
        $tryCount = $worker->connections_ur[$remote_address][\\\'tryCount\\\']??1;
        if( $tryCount < UNAUTHORIZED_ALLOW_TRYTIME ){
            $worker->connections_ur[$remote_address][\\\'lastMsgTime\\\'] = $timeNow;
            $worker->connections_ur[$remote_address][\\\'tryCount\\\'] = $tryCount + 1;
            //sendTo( $remote_address ,\\\"{$dateTime} {$remote_address}:\\\\nunauthorized,try time {$tryCount} \\\\n\\\");
            //处理业务逻辑   对于未认证的连接,必须在超时前完成认证       完成认证后需要将连接从  connections_ur 里面移出,并且加入到 connections 里面去
        }else{
            //超过一定交互次数但仍未完成认证的会话将被忽视,直到被踢下线
            //sendTo( $remote_address ,\\\"{$dateTime} {$remote_address}:\\\\nunauthorized,max try \\\\n\\\");
        }
    }

    //收到数据,打印日志
    print( \\\"{$dateTime} {$remote_address}:\\\\n{$recv_buffer}\\\\n\\\");
};

Worker::runAll();

0x04 程序输出样例数据

2024-03-27 01:15:52 39.129.72.95:50574:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.4:5060;rport;branch=z9hG4bK650883237
From: <sip:34020000001180870005@3402000000>;tag=698431213
To: <sip:34020000001180870005@3402000000>
Call-ID: 1146222786
CSeq: 1 REGISTER
Contact: <sip:34020000001180870005@192.168.1.4:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0

2024-03-27 01:15:53 39.129.72.95:15363:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.2:5060;rport;branch=z9hG4bK1890963109
From: <sip:34020000001180870007@3402000000>;tag=1955740825
To: <sip:34020000001180870007@3402000000>
Call-ID: 254089720
CSeq: 1 REGISTER
Contact: <sip:34020000001180870007@192.168.1.2:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0

2024-03-27 01:15:54 39.129.72.95:31137:
REGISTER sip:34020000002000000001@3402000000 SIP/2.0
Via: SIP/2.0/UDP 192.168.1.8:5060;rport;branch=z9hG4bK1552460892
From: <sip:34020000001180870002@3402000000>;tag=1271059450
To: <sip:34020000001180870002@3402000000>
Call-ID: 108704431
CSeq: 1 REGISTER
Contact: <sip:34020000001180870002@192.168.1.8:5060>
Max-Forwards: 70
User-Agent: Embedded Net DVR/NVR/DVS
Expires: 86400
Content-Length: 0

01:38:09.293144 IP (tos 0x68, ttl 48, id 44466, offset 0, flags [DF], proto UDP (17), length 436)
    39.129.72.95.64959 > 192.168.27.21.5060: [udp sum ok] SIP, length: 408
        REGISTER sip:34020000002000000001@3402000000 SIP/2.0
        Via: SIP/2.0/UDP 192.168.1.7:5060;rport;branch=z9hG4bK1838031091
        From: <sip:34020000001180870003@3402000000>;tag=793428652
        To: <sip:34020000001180870003@3402000000>
        Call-ID: 1815643450
        CSeq: 1 REGISTER
        Contact: <sip:34020000001180870003@192.168.1.7:5060>
        Max-Forwards: 70
        User-Agent: Embedded Net DVR/NVR/DVS
        Expires: 86400
        Content-Length: 0

01:38:09.540771 IP (tos 0x0, ttl 64, id 30488, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.50574: [udp sum ok] SIP
01:38:09.540806 IP (tos 0x0, ttl 64, id 30489, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.31137: [udp sum ok] SIP
01:38:09.540815 IP (tos 0x0, ttl 64, id 30490, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.15363: [udp sum ok] SIP
01:38:09.540822 IP (tos 0x0, ttl 64, id 45413, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.130.87.71.34950: [udp sum ok] SIP
01:38:09.540830 IP (tos 0x0, ttl 64, id 27774, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.128.225.81.35329: [udp sum ok] SIP
01:38:09.540838 IP (tos 0x0, ttl 64, id 30491, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.26505: [udp sum ok] SIP
01:38:09.540845 IP (tos 0x0, ttl 64, id 2845, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.64959: [udp sum ok] SIP
01:38:09.540860 IP (tos 0x0, ttl 64, id 30493, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.36321: [udp sum ok] SIP
01:38:09.540867 IP (tos 0x0, ttl 64, id 27775, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.128.225.81.35342: [udp sum ok] SIP
01:38:09.540873 IP (tos 0x0, ttl 64, id 30494, offset 0, flags [DF], proto UDP (17), length 64)
    192.168.27.21.5060 > 39.129.72.95.23047: [udp sum ok] SIP
 * */

原创文章,作者:网络技术联盟站,如若转载,请注明出处:https://www.sudun.com/ask/49876.html

(0)
网络技术联盟站's avatar网络技术联盟站
上一篇 2024年5月11日 上午1:40
下一篇 2024年5月11日 上午1:42

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注