主要是利用rabbitmq的东西,贴出服务端代码
listen代码
gen_tcp:listen(Port, SocketOpts) SocketOpts = [ binary, {packet, 0}, %%{packet, 0}表示erlang系统会吧TCP数据原封不动地直接传送给应用程序 {reuseaddr, true}, %%允许本地重复使用端口号 {nodelay, true}, %%意味着很少的数据也会被马上被发送出去 {delay_send, true}, %%如果打开了delay_send,每个port会维护一个发送队列,数据不是立即发送,而是存到发送队列里,等socket可写的时候再发送,相当于是ERTS自己实现的组包机制 {active, false}, %%注意如果socket客户端断开后,其port不会关闭,而{active,true}与{active,once}则会关闭port {backlog, 1024}, %%缓冲区的长度 {exit_on_close, false}, %%设置为flase,那么在socket被close之后还能将缓冲区中的数据发送出去 {send_timeout, 15000} %%设置一个时间去等待操作系统发送数据,如果底层在这个时间段后还没发出数据,那么就会返回{error,timeout} ]
这个handle_info是在acceptor进程处理的
handle_info({inet_async, LSock, Ref, {ok, Sock}}, State = #state{listen_socket=LSock, ref=Ref}) -> %% patch up the socket so it looks like one we got from %% gen_tcp:accept/1 {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), try %% report {ok, {Address, Port}} = inet:sockname(LSock), {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), ?DEBUG("accepted TCP connection on ~s:~p from ~s:~p~n", [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), spawn_socket_controller(Sock) catch Error:Reason -> gen_tcp:close(Sock), ?ERROR_MSG("ERRunable to accept TCP connection: ~p ~p~n", [Error, Reason]) end, accept(State); spawn_socket_controller(ClientSock) -> case gen_tcp:recv(ClientSock, 23, 30000) of {ok, Bin} -> %%需要测试,过滤掉没用的数据 case supervisor:start_child(sdeeg_tcp_client_sup, [ClientSock, Line]) of {ok, CPid} -> inet:setopts(ClientSock, [{packet, 4}, binary, {active, false}, {nodelay, true}, {delay_send, true}]), gen_tcp:controlling_process(ClientSock, CPid), %%移交socket到网关进程中去 CPid ! go; %%路由到网关进程,以后的数据将会从socket发送到sdeeg_tcp_client进程 {error, Error} -> ?CRITICAL_MSG("cannt accept client:~w", [Error]), catch erlang:port_close(ClientSock) end; Other -> ?ERROR_MSG("ERRrecv packet error:~w", [Other]), catch erlang:port_close(ClientSock) end.
上面会利用gen_tcp:controlling_process把以后客户端发来的数据发送到网关进程中去处理。
下面看看网关进程的部分代码
handle_info(go, #state{socket=Socket} = State) -> prim_inet:async_recv(Socket, 0, -1), {noreply, State}; handle_info({inet_async, Socket, _Ref, {ok, Data}}, State) -> ?ERROR_MSG("ERR~p", [Data]), do_handle_data(Socket,Data,State), {noreply, State}; handle_info({inet_async, _Socket, _Ref, {error, closed}}, State) -> ?ERROR_MSG("ERR~p", [closed]), {stop, normal, State}; handle_info({inet_async, _Socket, _Ref, {error, Reason}}, State) -> ?ERROR_MSG("ERR~ts:~w", ["Socket出错", Reason]), {stop, normal, State}; do_handle_data(Socket, Data, _State) -> ?ERROR_MSG("ERR~p", [Data]), prim_inet:async_recv(Socket, 0, -1).
下面是我的测试代码:
-module(test_gateway). -export([connect/0]). connect() -> {ok,Socket1} = gen_tcp:connect("210.38.235.164", 443, [binary, {packet, 0}]), ok = gen_tcp:send(Socket1,<<1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1>>), ok = gen_tcp:send(Socket1,<<23:32,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1>>), gen_tcp:close(Socket1).
上面的测试代码只能收到第一条信息,第二条信息一直都收取不到,刚开始一直以为是ClientSocket移交给网关进程有问题,但是在执行gen_tcp:close(Socket1),网关进程却能够收到{error, closed}这条消息,明显不是这个问题。后来发现是问题是出现在这里
在刚开始gen_tcp:listen除的参数packet设置为0的,但后面把socket移交给网关进程的时候把packet设置为4.所以导致上面的客户端程序的第二条消息一直都收取不到。下面正确代码
-module(test_gateway). -export([connect/0]). connect() -> {ok,Socket1} = gen_tcp:connect("210.38.235.164", 443, [binary, {packet, 0}]), ok = gen_tcp:send(Socket1,<<1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1>>), inet:setopts(Socket1, [{packet, 4}]), ok = gen_tcp:send(Socket1,<<23:32,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1>>), gen_tcp:close(Socket1).
OK,搞定。
相关推荐
Erlang TCP服务器是用Erlang编程语言实现的一种网络通信服务,它允许程序通过TCP协议接收和发送数据。在Erlang中,构建TCP服务器通常涉及到以下关键知识点: 1. **Erlang OTP(Open Telecom Platform)**:Erlang ...
- **故障隔离**: Erlang VM设计时考虑到了系统的容错性,通过监控和链接机制实现了故障隔离。 - **垃圾回收**: VM具有自动垃圾回收机制,但开发者可以通过调整来优化垃圾回收行为。 - **内部实现**: - **内存管理...
Erlang提供了丰富的内置函数,这些函数覆盖了从简单的数值计算到复杂的数据结构操作的各种功能。掌握这些内置函数对于高效地编写Erlang程序至关重要。 **1.5 并发** 并发是Erlang最引人注目的特性之一。Erlang的...
Erlang的并发模型和内存管理机制可以帮助处理高并发场景,但还需要考虑消息缓存、负载均衡等问题。 9. **界面设计**:尽管这里只提到了源码,但通常一个完整的聊天室项目还会包含前端界面,可能用到HTML、CSS和...
Erlang内置了对并发的支持,通过进程实现,这些进程之间通过异步消息传递进行通信。在Erlang中,编译单元是模块,每个模块都有一个唯一的名称,并包含多个函数,其中只有被明确标记为可导出的函数才能被其他模块访问...
因此,编写健壮的Erlang代码需要考虑到这些可能的异常情况,并采取适当的错误处理策略。 在深入理解Erlang的timer模块的同时,还可以查阅《Erlang程序设计》这本书,它提供了更多关于Erlang语言特性和实践的详细...
在实际应用中,开发者可以利用RabbitMQ实现微服务间的异步通信,通过创建生产者和消费者来发送和接收消息。Erlang的强大力量在于其对分布式系统和并发处理的支持,这使得RabbitMQ成为许多大型分布式系统中不可或缺的...
同时,学习如何通过AMQP协议或各种编程语言的客户端库(如Java的RabbitMQ Java Client,Python的pika等)来发送和接收消息是必不可少的。 在实际应用中,RabbitMQ可用于多种场景,如任务调度、日志收集、事件驱动...
2. **故障恢复与容错**:Erlang支持热代码替换,可以在不中断服务的情况下更新程序。此外,它的分布式特性使得节点间的故障能够被优雅地处理,确保系统的高可用性。 3. **模式匹配**:Erlang的函数调用支持模式匹配...
- **并发性**:Erlang进程模型使得在单个节点上并行运行大量轻量级进程成为可能,这些进程通过消息传递进行通信,减少了竞争条件和同步问题。 - **热代码替换**:Erlang支持在运行时更新和升级代码,无需重启服务,...
它们不关心消息何时被消费,只需确保消息正确地发送到队列。 3. **消费者(Consumer)**:消费者是从RabbitMQ队列中接收并处理消息的应用。它们可以设置为阻塞模式,等待消息到来,或者使用回调函数在有新消息时...
3. **容错性**:Erlang采用故障隔离和恢复机制,当某个进程出错时,不影响其他进程的正常运行。 4. **分布式计算**:Erlang天生支持分布式节点间的通信,这为构建分布式系统提供了便利。 **RabbitMQ与Erlang的结合*...
1. 消息持久化:RabbitMQ允许将消息存储到磁盘上,即使服务器重启,也能保证消息不丢失。 2. 多协议支持:除了AMQP(Advanced Message Queuing Protocol),RabbitMQ还支持其他如STOMP、MQTT等协议,适用于不同场景...
3. **环境变量配置**: 安装完成后,确保Erlang的bin目录已添加到系统的PATH环境变量中。这将使你在命令行中可以全局运行Erlang命令,如`erl`。 接下来,我们来关注RabbitMQ的安装和配置: 1. **下载RabbitMQ**: ...
这种模型非常适合构建聊天系统,因为每个客户端连接都可以视为一个独立的进程,服务器只需要处理进程间的通信,而不需要担心共享状态的问题,从而简化了并发编程的复杂性。 **Erlang的健壮性和容错性** 由于Erlang...
- 软实时性:Erlang设计时考虑了实时性需求,但在保证服务不中断的前提下允许一定程度的延迟。 2. **RabbitMQ核心概念**: - **Exchange**:消息进入RabbitMQ的第一个目的地,根据路由规则将消息分发到不同的队列...
Erlang中的进程具有严格的隔离性,每个进程都有自己的内存空间,并且不共享任何数据。这种设计简化了多线程编程中常见的复杂性问题,如死锁和竞态条件。 ##### 消息传递 进程间的通信完全依赖于消息传递。消息传递...
在实际应用中,Erlang和RabbitMQ通常一起使用,Erlang的并发模型和容错机制为RabbitMQ提供了坚实的后盾,而RabbitMQ作为消息中间件,解决了不同系统之间的异步通信问题,降低了系统间的耦合度。这种组合在微服务架构...
RabbitMQ是一款开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中的异步任务处理、解耦组件以及负载均衡。Erlang是一种为构建高并发、分布式和容错性强的系统而...