Re: [erlang-bugs] inets: HTTP client race condition

View: New views
1 Messages — Rating Filter:   Alert me  

Parent Message unknown Re: [erlang-bugs] inets: HTTP client race condition

by Tomas Abrahamsson :: Rate this Message:

Reply to Author | View Threaded | Show Only this Message

> Today I have encountered strange problem in simple scenario with inets.
> when I'm doing concurrent requests to the same server which responds
> slowly.

Hi, I too have got bitten by this now, so I extended your
test program to not rely on any external web server,
to make it a bit easier to trigger and observe the bug.
Attached is a program that triggers the bug fairly often
in a local environment. Compile it and run it like this:
   erl -s http_client_race_condition go -s erlang halt

Attached is also a patch fixing the problem.
The patch is against R13B01/inets-5.1

It seems the problem is that if one calls http:request and
the httpc_manager selects a session where there's already
a pending request, then the connection handler for that
session effectively resets its parser, readying it for the
response to the second request, but sometimes there are
still some more inbound packets for the response to the
first request, and that's when things get confused.

I did some research: The bug first appeared in
R13A/inets-5.0.13, in which a queue for keep-alive requests
seems to have been introduced. In inets-5.0.12 (R12B-5)
there was only one pipeline queue, but in 5.0.13 there's
also another queue for keep-alive. (I don't know the reason
for there being two queues).

Incidentally, when fixing the problem, the code for handling
the keep-alive queue more and more grew to resemble the
code that handles the pipeline queue. Now it looks much like
two copies of the same code. So someone more knowledgeable
in the art of inets and the pipeline/keep-alive queues might
want to refactor the patched code into something more beautiful.

As you already observed, creating separate profiles works
around the bug. With this patch, it is no longer necessary
to create extra profiles if one wants to send http requests
concurrently.


BRs
Tomas

[http_client_race_condition.erl]

-module(http_client_race_condition).

-compile(export_all).

go() ->
    start_inets(),
    ReqSeqNumServer = start_sequence_number_server(),
    RespSeqNumServer = start_sequence_number_server(),
    {ok,Server,Port} = start_slow_server(RespSeqNumServer),
    Clients = run_clients(105, Port, ReqSeqNumServer),
    ok = wait_for_clients(Clients),
    Server ! shutdown,
    RespSeqNumServer ! shutdown,
    ReqSeqNumServer ! shutdown,
    ok.

start_inets() ->
    inets:start(),
    ok.

%% -----------------------------------------------------
%% A sequence number handler
%% The purpose is to be able to pair requests with responses.

start_sequence_number_server() ->
    proc_lib:spawn(fun() -> loop_sequence_number(1) end).

loop_sequence_number(N) ->
    receive
        shutdown ->
            ok;
        {From, get_next} ->
            From ! {next_is, N},
            loop_sequence_number(N + 1)
    end.

get_next_sequence_number(SeqNumServer) ->
    SeqNumServer ! {self(), get_next},
    receive {next_is, N} -> N end.

%% -----------------------------------------------------
%% Client part
%% Sends requests randomly parallel

run_clients(NumClients, ServerPort, SeqNumServer) ->
    set_random_seed(),
    lists:map(
      fun(_) ->
              Req = f("req~3..0w", [get_next_sequence_number(SeqNumServer)]),
              Url = f("http://127.0.0.1:~w/~s", [ServerPort, Req]),
              Pid = proc_lib:spawn(
                      fun() ->
                              {ok, {{_,200,_}, _, Resp}} = http:request(Url),
                              io:format("response: ~p~n", [Resp]),
                              case lists:prefix(Req++"->", Resp) of
                                  true -> exit(normal);
                                  faslse -> exit({bad_resp,Req,Resp})
                              end
                      end),
              MRef = erlang:monitor(process, Pid),
              timer:sleep(10 + random:uniform(1334)),
              {Pid, MRef}

      end,
      lists:seq(1, NumClients)).

wait_for_clients(Clients) ->
    lists:foreach(
      fun({Pid, MRef}) ->
              receive
                  {'DOWN', MRef, process, Pid, normal} ->
                      ok;
                  {'DOWN', MRef, process, Pid, Reason} ->
                      erlang:error(Reason)
              end
      end,
      Clients).

%% -----------------------------------------------------
%% Webserver part:
%% Implements a web server that sends responses one character
%% at a time, with random delays between the characters.

start_slow_server(SeqNumServer) ->
    proc_lib:start(
      erlang, apply, [fun() -> init_slow_server(SeqNumServer) end, []]).

init_slow_server(SeqNumServer) ->
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet,0}, {active,true},
                                     {backlog, 100}]),
    {ok, {_IP, Port}} = inet:sockname(LSock),
    proc_lib:init_ack({ok, self(), Port}),
    loop_slow_server(LSock, SeqNumServer).

loop_slow_server(LSock, SeqNumServer) ->
    Master = self(),
    Acceptor = proc_lib:spawn(
                 fun() -> client_handler(Master, LSock, SeqNumServer) end),
    receive
        {accepted, Acceptor} ->
            loop_slow_server(LSock, SeqNumServer);
        shutdown ->
            gen_tcp:close(LSock),
            exit(Acceptor, kill)
    end.


%% Handle one client connection
client_handler(Master, LSock, SeqNumServer) ->
    {ok, CSock} = gen_tcp:accept(LSock),
    Master ! {accepted, self()},
    set_random_seed(),
    loop_client(1, CSock, SeqNumServer).

loop_client(N, CSock, SeqNumServer) ->
    %% Await request, don't bother parsing it too much,
    %% assuming the entire request arrives in one packet.
    receive
        {tcp, CSock, Req} ->
            ReqNum = parse_req_num(Req),
            RespSeqNum = get_next_sequence_number(SeqNumServer),
            Response = f("~s->resp~3..0w/~2..0w", [ReqNum, RespSeqNum, N]),
            Txt = f("Slow server (~p) got ~p, answering with ~p",
                    [self(), Req, Response]),
            io:format("~s...~n", [Txt]),
            slowly_send_response(CSock, Response),
            case parse_connection_type(Req) of
                keep_alive ->
                    io:format("~s...done~n", [Txt]),
                    loop_client(N+1, CSock, SeqNumServer);
                close ->
                    io:format("~s...done (closing)~n", [Txt]),
                    gen_tcp:close(CSock)
            end
    end.

slowly_send_response(CSock, Answer) ->
    Response = f("HTTP/1.1 200 OK\r\nContent-Length: ~w\r\n\r\n~s",
                 [length(Answer), Answer]),
    lists:foreach(
      fun(Char) ->
              timer:sleep(random:uniform(100)),
              gen_tcp:send(CSock, <<Char>>)
      end,
      Response).

parse_req_num(Request) ->
    Opts = [caseless,{capture,all_but_first,list}],
    {match, [ReqNum]} = re:run(Request, "GET /(.*) HTTP", Opts),
    ReqNum.

parse_connection_type(Request) ->
    Opts = [caseless,{capture,all_but_first,list}],
    {match,[CType]} = re:run(Request, "connection: *(keep-alive|close)", Opts),
    case string:to_lower(CType) of
        "close" -> close;
        "keep-alive" -> keep_alive
    end.


set_random_seed() ->
    {_, _, Micros} = now(),
    A = erlang:phash2([make_ref(), self(), Micros]),
    random:seed(A, A, A).

f(F, A) -> lists:flatten(io_lib:format(F,A)).


[httpc_handler.erl.patch]

--- lib/inets/src/http_client/httpc_handler.erl.orig 2009-08-20 07:13:51.000000000 +0200
+++ lib/inets/src/http_client/httpc_handler.erl 2009-08-20 07:14:15.000000000 +0200
@@ -289,6 +289,7 @@
 handle_call(Request, _, #state{session = Session =
        #tcp_session{type = keep_alive,
     socket = Socket},
+       timers = Timers,
        options = Options,
        profile_name = ProfileName} = State) ->
       
@@ -301,18 +302,39 @@
     NewState =
  activate_request_timeout(State#state{request =
      Request}),
-    NewSession =
- Session#tcp_session{queue_length = 1,
-    client_close = ClientClose},
-    httpc_manager:insert_session(NewSession, ProfileName),
-    Relaxed =
- (Request#request.settings)#http_options.relaxed,
-    {reply, ok,
-     NewState#state{request = Request,
-    session = NewSession,
-    mfa = {httpc_response, parse,
-   [State#state.max_header_size,
-    Relaxed]}}};
+
+    case State#state.request of
+ #request{} -> %% Old request not yet finished
+    %% Make sure to use the new value of timers in state
+    NewTimers = NewState#state.timers,
+                    NewKeepAlive = queue:in(Request, State#state.keep_alive),
+    NewSession =
+ Session#tcp_session{queue_length =
+    %% Queue + current
+    queue:len(NewKeepAlive) + 1,
+    client_close = ClientClose},
+    httpc_manager:insert_session(NewSession, ProfileName),
+                    {reply, ok, State#state{keep_alive = NewKeepAlive,
+    session = NewSession,
+    timers = NewTimers}};
+ undefined ->
+    %% Note: tcp-message reciving has already been
+    %% activated by handle_pipeline/2.
+    cancel_timer(Timers#timers.queue_timer,
+ timeout_queue),
+    NewSession =
+ Session#tcp_session{queue_length = 1,
+    client_close = ClientClose},
+    httpc_manager:insert_session(NewSession, ProfileName),
+    Relaxed =
+ (Request#request.settings)#http_options.relaxed,
+    {reply, ok,
+     NewState#state{request = Request,
+    session = NewSession,
+    mfa = {httpc_response, parse,
+   [State#state.max_header_size,
+    Relaxed]}}}
+    end;
  {error, Reason}    ->
     {reply, {request_failed, Reason}, State}
     end.
@@ -1003,13 +1025,29 @@
     handle_keep_alive_queue(State#state{keep_alive =
  KeepAlive}, Data);
  false ->
-    {reply, ok, NewState} =
- handle_call(NextRequest,
-    dummy, State#state{request = undefined}),
-    http_transport:setopts(
-      socket_type(Session#tcp_session.scheme),
-      Session#tcp_session.socket, [{active, once}]),
-    {noreply, NewState}
+    Relaxed =
+ (NextRequest#request.settings)#http_options.relaxed,
+    NewState =
+ State#state{request = NextRequest,
+    keep_alive = KeepAlive,
+    mfa = {httpc_response, parse,
+   [State#state.max_header_size,
+    Relaxed]},
+    status_line = undefined,
+    headers = undefined,
+    body = undefined},
+    case Data of
+ <<>> ->
+    http_transport:setopts(
+      socket_type(Session#tcp_session.scheme),
+      Session#tcp_session.socket, [{active, once}]),
+    {noreply, NewState};
+ _ ->
+    %% If we already received some bytes of
+    %% the next response
+    handle_info({httpc_handler, dummy, Data},
+ NewState)
+    end
     end
     end.
 



________________________________________________________________
erlang-patches mailing list. See http://www.erlang.org/faq.html
erlang-patches (at) erlang.org