Сокет

Сокет — программный интерфейс (абстрактный объект) для обеспечения обмена данными между процессами. То же: socket.

Межпроцессное взаимодействие через сокеты может использовать сетевой протокол, а может и не использовать. В POSIX есть IPC-сокеты, которые как раз не используют сетевой протокол. В дальнейшем, когда говорим о сокетах, будем подразумевать сетевые сокеты.

Сокеты хороши тем, что позволяют двум процессам обмениваться данными через интернет. Использование сокетов — один из двух возможных вариантов распределённого программирования в Эрланге. Более надёжный при использовании интернета по сравнению с нативным кластером нод, который лучше использовать в локальной сети. Использование сокетов для общения двух эрланг-программ не такое удобное и красивое, как нативный кластер, однако и не такое уж сложное. К тому же через сокеты можно передавать специальные (расширенные) бинарники, в которые элегантно можно запаковывать любые термы, включая фунтермы. Это очень полезная и удобная возможность Эрланга.

И конечно, сокеты позволяют программам на Эрланге взаимодействовать с программами, написанными на других языках.

Для взаимодействия через интернет предусмотрены два основных протокола: UDP и TCP. UDP более прост в реализации, однако сообщения (датаграммы), передаваемые с его помощью могут потеряться, продублироваться, прийти в другом порядке. Однако если датаграмма пришла, можно не сомневаться в её достоверности. Этот протокол можно использовать для передачи коротких сигналов или (наоборот) распространения аудио и видео (то есть очень больших данных, в которых потеря некоторой небольшой части может пройти незаметно).

Там же, где речь идёт о надёжной передаче данных произвольной длины, используют TCP. Типичный пример — веб. Когда мы просматриваем содержание какой-либо страницы в интернете (например, читаем рецепт борща), получаем через TCP один или несколько файлов. При этом можем быть уверены, что случайно соль в тексте не будет заменена на сахар или соду.

В стандартной поставке Эрланга есть два модуля для работы с этими протоколами: gen_udp и gen_tcp.

Получение данных через TCP-сокет

Чтобы получить данные от сервера, нам придётся сделать три вещи:

Рассмотрим краткий пример, где мы создадим элементарную функцию для получения каких-либо данных по URL.

-module(proba).
-export([main/0]).

main() ->
    get_url("localhost", 8000, "/index.html").

get_url(Host, Port, What) ->
    {ok,Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),
    ok = gen_tcp:send(Socket, "GET "++What++" HTTP/1.0\r\n\r\n"),
    otvet(Socket, []).

otvet(Socket, SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            otvet(Socket, [Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(lists:reverse(SoFar))
    end.

Мы реализовали функцию get_url/3, которая в качестве аргументов получает имя хоста, номер порта и описание того, что именно нам интересно получить с этого хоста (“/index.html”). С помощью gen_tcp:connect/3 мы получаем терм Socket. С помощью gen_tcp:send/2 мы посылаем запрос. После этого ждём ответ. Пока ждём, запускаем рекурсивную функцию otvet/2, она нам понадобится, чтобы собрать ответ по частям.

Мы находимся в определённом акторе. Наш актор будет получать ответ в виде сообщений, как минимум двух. Сначала актор получит одно или несколько сообщений вида {tcp,Socket,Bin}, то есть кортежа, состоящего из трёх элементов: атома tcp, терма Socket (чтобы мы могли убедиться, что сообщения приходят от нужного сокета), собственно сообщения-бинарника. Пока приходят сообщения, функция otvet/2 рекурсивно вызывает саму себя. Когда же приходит финальное сообщение {tcp_closed,Socket}, весь ответ целиком компонуется и возвращается вызывающей стороне (функции get_url/3).

Интересный вопрос: а что из себя представляет терм Socket? Чтобы проверить, добавим следующую строку:

io:format("~p~n", [Socket]),

И получим:

#Port<0.10>

Это порт. Мы можем проверить это также с помощью бифа is_port/1, что даст нам true. Порт в Эрланге — специальный объект, через который осуществляется связь с внешней программой. Эрланг-порт это специальный актор внутри ERTS. Этот актор и шлёт нашему сообщения.

У gen_tcp:connect/3 третий аргумент — опции. binary означает, что мы желаем получать от ERTS данные в виде бинарника. {packet, 0} — данные должны поступать нам в не изменённом виде.

Следует обратить внимание на эту строку:

otvet(Socket, [Bin|SoFar]);

Полученный фрагмент мы добавляем в голову списка. В результате наш список становится “неправильным”: в нём идут сначала новые фрагменты, потом старые. Чтобы вернуть нормальный порядок, мы потом используем reverse/1. Это правильный подход, потому что так работа с памятью проходит более эффективно. Если бы мы сразу генерировали “правильный список”, это было бы медленнее, потому что с каждым новым разом данные бы переписывались в памяти.

Если соединение по той или иной причине не может быть установлено, это вызовет исключение. Изменим номер порта с корректного 8000 на некорректный 8001…

** exception error: no match of right hand side value {error,econnrefused}
     in function  proba:get_url/3 (proba.erl, line 8)

Создание TCP-сервера

Создание сервера предполагает следующие действия:

Создадим минимальный TCP-сервер, который обрабатывает лишь один запрос, после этого закрывается и больше не работает.

-module(server).
-export([start/0]).

start() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4},
        {reuseaddr,true}, {active,true}]),
    {ok, Sock} = gen_tcp:accept(Listen),
gen_tcp:close(Listen),
receive 
    {tcp, Sock, Bin} ->
        io:format("GOT: ~p~n", [Bin]),
        gen_tcp:send(Sock, <<"pong">>),
        gen_tcp:close(Sock)
end.

gen_tcp:listen/2 создаёт слушающий сокет. Опция {packet,4} означает, что каждое сообщение предваряется 4-байтным заголовком, обозначающим длину сообщения (есть такое соглашение в Erlang/OTP). gen_tcp:accept/1 принимает в качестве аргумента слушающий сокет и замирает, ожидая входящего соединения. Больше, кстати, со слушающим сокетом, делать нечего. Когда входящее соединение появляется, эта функция gen_tcp:accept возвращает сокет данного соединения с клиентской программой. После этого мы тут же закрываем слушающий сокет, потому что больше не собираемся ждать входящих соединений.

Нам, конечно, интересно проверить, что это за термы такие: Listen и Sock. Добавим пару строк, чтобы это выяснить:

io:format("Listen: ~p~n", [Listen]),
io:format("Sock: ~p~n", [Sock]),

И увидим, что это два разных порта:

Listen: #Port<0.13>
Sock: #Port<0.14>

Когда соединение установлено, мы ожидаем запрос с помощью конструкции receive ... end. Когда сообщение получено, мы его читаем, выводим на экран. Далее с помощью gen_tcp:send/2 отсылаем в ответ бинарник <<"pong">>. После этого gen_tcp:close/1 закрывает сокет.

Чтобы проверить работу сервера, мы немного изменим наш пример — модуль proba (см. выше). Поменяем номер порта с 8000 на 2345 и опцию {packet,0} — на {packet,4}. Запустим сервер, а следом клиент. Делаем это в разных оболочках.

Работа клиента будет выглядеть так:

16> proba:main().
<<"pong">>
17>

А сервера так:

12> server:start().
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
ok
13>

Изначально наш модуль proba был разработан как клиент веб-сервера, поэтому в нём и остался этот запрос GET .... Наш минимальный сервер вообще никак не анализирует запрос, и на всякий отвечает бинарником <<"pong">>. Функция start на этом полностью завершает свою работу.

В Эрланге удобно передавать термы через сокеты с помощью специального бифа term_to_binary/1. Этот биф кодирует любой терм в специальный бинарник. Когда на другом конце соединения получается этот бинарник, его декодируют с помощью обратного бифа binary_to_term/1.

Последовательный сервер

Нам, конечно, может захотеться, чтобы сервер обрабатывал не один запрос, а много. Есть два варианта, как это организовать: последовательный сервер, параллельный сервер. В первом случае все запросы могут обрабатываться лишь последовательно. При этом новые запросы, пока обрабатывается предыдущий, ставятся в очередь. Очередь не безгранична, при достижении определённого предела, новые запросы будут отклоняться. Во втором случае несколько или много запросов могут обрабатываться параллельно.

Чтобы сделать последовательный сервер, надо сохранить слушающий сокет, сделать цикл и в начале каждого цикла из слушающего сокета создавать сокет соединения.

Нам надо совсем немного, чтобы изменить наш модуль server и сделать из него последовательный сервер.

%%% Простой последовательный TCP-сервер
-module(server).
-export([start/0]).

start() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4},
        {reuseaddr,true}, {active,true}]),
    loop(Listen).

loop(Listen) ->
    {ok, Sock} = gen_tcp:accept(Listen),
    receive 
        {tcp, Sock, Bin} ->
            io:format("GOT: ~p~n", [Bin]),
            gen_tcp:send(Sock, <<"pong">>),
            gen_tcp:close(Sock)
    end,
    loop(Listen).

Теперь сколько бы раз мы ни запускали proba:main/0, она каждый раз сможет связаться с сервером и получить ответ:

1> proba:main().
<<"pong">>
2> proba:main().
<<"pong">>
3> proba:main().
<<"pong">>
4> proba:main().
<<"pong">>
5> proba:main().
<<"pong">>

Параллельный сервер

Мы не знаем, сколько соединений придётся обрабатывать параллельно. Мы знаем лишь то, что каждый запрос придётся обрабатывать в отдельном акторе. Поэтому надо придумать какой-то трюк, чтобы акторы спаунились по мере надобности.

Вначале нам понадобится, конечно, лишь один актор. Он будет слушать, слушать, пока не дождётся соединения. Когда это случится, надо будет и обрабатывать запрос, и продолжать слушать. Поэтому наш актор, когда получит соединение с помощью gen_tcp:accept, тут же должен породить себе “брата” — точно такого же актора. Заспаунив его, наш первый актор может приступать к обработке запроса. Второй актор будет работать по точно тому же алгоритму. Первый актор, когда закончит обрабатывать запрос, просто завершится, и у нас всегда будет необходимое количество акторов.

%%% Простой параллельный TCP-сервер
-module(par_server).
-export([start/0, actor/1]).

start() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4},
        {reuseaddr,true}, {active,true}]),
    spawn(?MODULE, actor, [Listen]).

actor(Listen) ->
    {ok, Sock} = gen_tcp:accept(Listen),
    spawn(?MODULE, actor, [Listen]),
    receive 
        {tcp, Sock, Bin} ->
            io:format("GOT: ~p~n", [Bin]),
            gen_tcp:send(Sock, <<"pong">>),
            gen_tcp:close(Sock)
    end.

В обоих случаях (последовательный и параллельный сервер) мы сумели уложиться в 17 строчек кода. Работа сервера выглядит так:

1> par_server:start().
<0.87.0>
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
GOT: <<"GET /index.html HTTP/1.0\r\n\r\n">>
2> 

Функция start заканчивается вызовом spawn, поэтому мы и видим пид созданного первого актора — <0.87.0>.

Контролирующий актор

Актор, который порождает сокет с помощью gen_tcp:connect или gen_tcp:accept, называется контролирующим актором. То есть этот актор контролирует все порождённые сокеты. Все сообщения из сокета поступают именно в контролирующий актор, где мы их обрабатываем с помощью receive ... end.

Если контролирующий актор заканчивает свою работу, все его сокеты закрываются.

Если угодно, в качестве контролирующего можно назначить другой актор (не тот, который создал сокет). Для этого есть функция gen_tcp:controlling_process(Socket, NewPid). Если сокетов несколько, надо для каждого применить данную функцию (если, конечно, мы желаем для каждого сменить контролирующего актора).

Активные и пассивные сокеты

В Эрланге сокет может быть открыт в одном из трёх режимов:

Для этого есть соответствующая опция {active,true|false|once} у функций gen_tcp:connect и gen_tcp:listen.

Когда данные поступают через активный сокет, все сообщения немедленно (насколько это возможно) поступают контролирующему актору. Он не может контролировать этот поток поступающей информации и запросто может в нём захлебнуться. Пассивный сокет напротив умеет контролировать этот поток. Вызывая gen_tcp:recv(Socket,N), он сообщает ERTS, что готов к новым данным. С помощью N задаётся ожидаемое количество байтов.

С помощью сокета, открытого в активном режиме, мы создаём неблокирующий сервер. Выше мы создавали именно такие серверы. Здесь есть очевидная проблема: если клиент будет порождать больше запросов, нежели мы можем обработать, это может привести к краху системы или её непредсказуемой работе. Такой режим, конечно, надо применять только в случаях, когда мы уверены, что система не окажется запружена. Допустим, наши несколько датчиков ежеминутно передают свои короткие сообщения.

Чтобы открыть сокет в пассивном режиме и сделать таким образом блокирующий сервер, надо использовать опцию {active,false}. Такой сервер не рухнет, если будет получать слишком много данных.

При активном режиме сокета сообщения приходят в почтовый ящик актора (то есть сокет активно шлёт сообщения), и их мы обрабатываем с помощью receive ... end. В пассивном режиме этого не происходит — сообщения вытаскиваются с помощью функции gen_tcp:recv (то есть сокет пассивно ждёт команды отдать). Поэтому блокирующий сервер выглядит немного иначе:

%%% Простой параллельный блокирующий TCP-сервер
-module(block_server).
-export([start/0, actor/1]).

start() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4},
        {reuseaddr,true}, {active,false}]),
    spawn(?MODULE, actor, [Listen]).

actor(Listen) ->
    {ok, Sock} = gen_tcp:accept(Listen),
    inet:setopts(Sock, [binary, {packet,4}, {reuseaddr,true},
        {active,false}]),
    spawn(?MODULE, actor, [Listen]),
    case gen_tcp:recv(Sock,0) of
        {ok,Bin} -> 
            io:format("GOT: ~p~n", [Bin]),
            gen_tcp:send(Sock, <<"pong">>),
            gen_tcp:close(Sock);
        {error,closed} -> 
            io:format("Error, closed.~n")
    end.

Второй аргумент у gen_tcp:recv — точное количество байтов, которое мы собираемся прочитать. Обычно его выставляют в ноль, и тогда каждый вызов этой функции возвращает одно сообщение (пусть их даже несколько накопилось в очереди). Если сообщений в очереди пока нет, gen_tcp:recv замирает и ждёт сообщения. Если количество байтов выставить больше нуля, мы, скорее всего, получим ошибку einval. Это работает только в raw-режиме, без него будет исключение.

Чтобы лучше понять работу модуля gen_tcp, можно упражняться непосредственно в оболочке. А точнее в двух: одну мы используем для сервера, другую для клиента. Создаём сервер:

1> {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4},
    {reuseaddr,true}, {active,false}]).
{ok,#Port<0.3>}
2> {ok,Sock} = gen_tcp:accept(Listen).

Последняя функция замирает и не даёт больше ничего делать в этой оболочке (ждёт соединения). Переключаемся во вторую оболочку, где создаём клиента и посылаем сообщение:

1> {ok,Sock} = gen_tcp:connect("localhost",2345,[binary,{packet,4}]).
{ok,#Port<0.4>}
2> gen_tcp:send(Sock, <<"blabla">>).
ok

Возвращаемся в первую оболочку и видим что соединение установлено. Принимаем сообщение.

{ok,#Port<0.4>}
3> gen_tcp:recv(Sock,0).
{ok,<<"blabla">>}

Можно поэкспериментировать и убедиться, что gen_tcp:recv(Sock,0) принимает сообщения по одному, даже если их в очереди много, а если второй аргумент поставить больше ноля, получим ошибку.

Единожды активный

Единожды активный сокет создаётся с опцией {active,once}. Смысл его в следующем.

Во многих, если не в большинстве, своих проектов нам захочется вытягивать сообщения из сокета с помощью gen_tcp:recv. Это обезопасит нашу Систему от запруживания. Однако есть неприятный момент — gen_tcp:recv блокирует наш актор, и он не будет работать, пока не получит сообщение. Возможно, нам захочется работать в одном акторе с несколькими сокетами, и тогда с помощью gen_tcp:recv мы сможем прослушивать только один сокет.

Для повышения гибкости придумали единожды активные сокеты. Такие сокеты изначально активны, но после передачи первого сообщения они переходят в пассивный режим. Иными словами, первое сообщение из сокета мы получим через стандартный почтовый ящик актора, а остальные придётся вытягивать с помощью gen_tcp:recv.

Поупражняемся в оболочке с единожды активным сервером.

1> {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4}, {reuseaddr,true}, {active,once}]).
{ok,#Port<0.3>}
2> {ok,Sock} = gen_tcp:accept(Listen).
{ok,#Port<0.4>}
3> flush().
Shell got {tcp,#Port<0.4>,<<"murmur">>}
ok
4> gen_tcp:recv(Sock,0).
{ok,<<"purrpurr">>}
5> gen_tcp:recv(Sock,0).
{ok,<<"gavgav">>}

После первых двух команд мы из другой оболочки подключились к нашему серверу и передали сообщение <<"murmur">>. Сокет сработал активно и передал его на стандартный почтовый ящик актора (в данном случае эта наша оболочка). Функция flush() очищает почтовый ящик оболочки и выводит все сообщения из него. После этого сокет перешёл в пассивный режим, поэтому следующие два сообщения мы вытащили с помощью gen_tcp:recv(Sock,0).

Как узнать состояние сокета

Если надо, узнать информацию о состоянии сокета (открыт или закрыт, некоторую другую), можно с помощью inet_info/1.

1> {ok, Listen} = gen_tcp:listen(2345, [binary, {packet,4}, {reuseaddr,true}, {active,once}]).
{ok,#Port<0.3>}
2> inet:info(Listen).
#{counters =>
      #{send_pend => 0,recv_avg => 0,recv_cnt => 0,recv_dvi => 0,
        recv_max => 0,recv_oct => 0,send_avg => 0,send_cnt => 0,
        send_max => 0,send_oct => 0},
  input => 0,
  links => [<0.85.0>],
  memory => 48,monitors => [],output => 0,owner => <0.85.0>,
  states => [listen,open]}
3> {ok,Sock} = gen_tcp:accept(Listen).
{ok,#Port<0.4>}
4> inet:info(Sock).
#{counters =>
      #{send_pend => 0,recv_avg => 10,recv_cnt => 1,recv_dvi => 0,
        recv_max => 10,recv_oct => 10,send_avg => 0,send_cnt => 0,
        send_max => 0,send_oct => 0},
  input => 0,
  links => [<0.85.0>],
  memory => 48,monitors => [],output => 0,owner => <0.85.0>,
  states => [connected,open]}
5> gen_tcp:close(Sock).
ok
6> inet:info(Sock).
#{states => [closed]}

Возможно, нам станет интересен IP-адрес клиента, который подключился к нашему серверу. Допустим, в связи с тем, что с этого адреса сыпется спам. Для этого можно использовать следующую функцию:

3> inet:peername(Sock).
{ok,{{127,0,0,1},42630}}

В данном случае IP это 127.0.0.1 (наш собственный компьютер). Если какой-то IP попал в “чёрный список”, можно легко организовать проверку на наличие IP в этом списке, после чего просто закроем соединение.

UDP

UDP — протокол, в котором, в отличие от TCP, не надо устанавливать соединение. Соответственно, не надо беспокоиться о том, чтобы аккуратно работать с закрытием этого соединения. Передаваемые датаграммы могут быть повреждены, но если они дошли, можно не сомневаться в их достоверности. Если датаграмма большая, она может быть разбита на фрагменты, потом она автоматически будет собрана, и опять же можно не сомневаться в её достоверности.

За работу с UDP отвечает модуль gen_udp.

Проиллюстрировать работу UDP-сервера и клиента можно всего четырьмя командами. Сначала в одной оболочке запустим сервер (на порту 2345, в режиме приёма бинарников):

1> {ok,Sock} = gen_udp:open(2345, [binary]).
{ok,#Port<0.3>}

После этого во второй оболочке откроем сокет для передачи данных и сразу же воспользуемся им, передав бинарник <<"hello">>.

1> {ok,Sock} = gen_udp:open(0, [binary]).
{ok,#Port<0.3>}
2> gen_udp:send(Sock, "localhost", 2345, <<"hello">>).
ok

Для клиента не важно, на каком номер порта быть открытым, поэтому первый аргумент у gen_udp:open — 0. Система автоматически подберёт номер порта.

Сервер примет сообщение, и из сокета оно попадёт в почтовый ящик нашего актора. В данном случае это оболочка, поэтому воспользуемся функцией flush(), чтобы прочитать сообщение.

2> flush().
Shell got {udp,#Port<0.3>,{127,0,0,1},55552,<<"hello">>}
ok

Пока у клиента сокет не закрыт, можно им пользоваться для передачи данных.

3> gen_udp:send(Sock, "localhost", 2345, <<"hi">>).
ok
4> gen_udp:send(Sock, "localhost", 2345, <<"good morning">>).
ok
5> gen_udp:send(Sock, "localhost", 2345, <<"good afternoon">>).
ok

Сервер будет продолжать читать эти данные:

3> flush().
Shell got {udp,#Port<0.3>,{127,0,0,1},55552,<<"hi">>}
Shell got {udp,#Port<0.3>,{127,0,0,1},55552,<<"good morning">>}
Shell got {udp,#Port<0.3>,{127,0,0,1},55552,<<"good afternoon">>}
ok

Сервер может послать что-нибудь в ответ, ведь он знает IP-адрес и порт, откуда пришла датаграмма.

4> gen_udp:send(Sock, {127,0,0,1}, 55552, <<"Who are you?">>).
ok

Клиент (если он, конечно, работает на Эрланге) также получит сообщение себе в почтовый ящик:

6> flush().
Shell got {udp,#Port<0.3>,{127,0,0,1},2345,<<"Who are you?">>}
ok

UDP можно использовать для вещания (broadcasting) в локальной сети. Тогда сокет создаётся с опцией {broadcast,true}.


Copyright © 2025 Алексей Карманов