トップ
新規
単語検索
ヘルプ
chat_server
をテンプレートにして作成
開始行:
(*
chat server with concurrent cell. (c) osiire.
To compile:
ocamlfind -thread -package ccell -linkpkg chat.ml
*)
open Unix
open Event
(* for util *)
let id x = x
let (@@) f x = f x
let ($) f g x = f (g x)
let step f x = try f x with _ -> ()
let rec forever f x =
let v = f x in forever f v
let spawn f x =
ignore (Thread.create f x)
let spawn_loop f x =
ignore (Thread.create (forever f) x)
let tee f x = try ignore (f x); x with _ -> x
(* for util *)
type global_env = { up : string channel; down : string B...
module TCPListener = struct
let max_number_of_pending_request = 10
let make port =
let listen_socket =
socket PF_INET SOCK_STREAM 0
in
bind listen_socket @@ ADDR_INET (inet_addr_any, port);
listen listen_socket max_number_of_pending_request;
listen_socket
let accept socket =
let ac_channel =
new_channel ()
in
spawn (sync $ send ac_channel $ accept) socket;
ac_channel
end
module type Work = sig
type env
type arg
val init : arg -> env
val action : env * file_descr -> unit
end
module Daemon (W : Work) = struct
let run port init =
let listener =
TCPListener.make port
in
let rec loop (accept, env) =
let fork (socket, _) =
let wraped_action () =
(step W.action) (env, socket);
(step $ shutdown) socket SHUTDOWN_ALL;
(step close) socket
in
spawn wraped_action ();
loop (TCPListener.accept listener, env)
in
select [wrap (receive accept) fork]
in
loop (TCPListener.accept listener, W.init init)
end
module Echo = struct
type env = global_env
type arg = env
let init arg = arg
let action (env, sock) =
let out =
Unix.out_channel_of_descr sock
in
let net_send msg =
let length =
String.length msg
in
ignore @@ Unix.write sock msg 0 length;
flush out
in
let port =
Bcast.make_port env.down
in
let msg_queue, end_ch =
Mbox.make (), new_channel ()
in
let buf = String.create 1 in
let rec wait_msg () =
let c =
let _ = Unix.select [sock] [] [] 0. in
Unix.recv sock buf 0 1 []
in
if c > 0 then begin
wait_msg @@ sync @@ Mbox.push msg_queue (String.copy buf)
end
else
sync @@ send end_ch ()
in
spawn wait_msg ();
let rec loop () =
select [
wrap (Bcast.receive_port port) (loop $ net_send);
wrap (Mbox.pop msg_queue) (loop $ sync $ send env....
wrap (receive end_ch) (fun () -> ())
]
in
loop ()
end
let _ =
let env =
{ up = new_channel ();
down = Bcast.make () }
in
let center_server env =
let msg =
sync (receive env.up)
in
tee (fun env -> Bcast.send env.down msg) env
in
spawn_loop center_server env;
let module ChatServer =
Daemon ( Echo )
in
try
ChatServer.run 8080 env
with e ->
print_endline @@ Printexc.to_string e;
exit 1
終了行:
(*
chat server with concurrent cell. (c) osiire.
To compile:
ocamlfind -thread -package ccell -linkpkg chat.ml
*)
open Unix
open Event
(* for util *)
let id x = x
let (@@) f x = f x
let ($) f g x = f (g x)
let step f x = try f x with _ -> ()
let rec forever f x =
let v = f x in forever f v
let spawn f x =
ignore (Thread.create f x)
let spawn_loop f x =
ignore (Thread.create (forever f) x)
let tee f x = try ignore (f x); x with _ -> x
(* for util *)
type global_env = { up : string channel; down : string B...
module TCPListener = struct
let max_number_of_pending_request = 10
let make port =
let listen_socket =
socket PF_INET SOCK_STREAM 0
in
bind listen_socket @@ ADDR_INET (inet_addr_any, port);
listen listen_socket max_number_of_pending_request;
listen_socket
let accept socket =
let ac_channel =
new_channel ()
in
spawn (sync $ send ac_channel $ accept) socket;
ac_channel
end
module type Work = sig
type env
type arg
val init : arg -> env
val action : env * file_descr -> unit
end
module Daemon (W : Work) = struct
let run port init =
let listener =
TCPListener.make port
in
let rec loop (accept, env) =
let fork (socket, _) =
let wraped_action () =
(step W.action) (env, socket);
(step $ shutdown) socket SHUTDOWN_ALL;
(step close) socket
in
spawn wraped_action ();
loop (TCPListener.accept listener, env)
in
select [wrap (receive accept) fork]
in
loop (TCPListener.accept listener, W.init init)
end
module Echo = struct
type env = global_env
type arg = env
let init arg = arg
let action (env, sock) =
let out =
Unix.out_channel_of_descr sock
in
let net_send msg =
let length =
String.length msg
in
ignore @@ Unix.write sock msg 0 length;
flush out
in
let port =
Bcast.make_port env.down
in
let msg_queue, end_ch =
Mbox.make (), new_channel ()
in
let buf = String.create 1 in
let rec wait_msg () =
let c =
let _ = Unix.select [sock] [] [] 0. in
Unix.recv sock buf 0 1 []
in
if c > 0 then begin
wait_msg @@ sync @@ Mbox.push msg_queue (String.copy buf)
end
else
sync @@ send end_ch ()
in
spawn wait_msg ();
let rec loop () =
select [
wrap (Bcast.receive_port port) (loop $ net_send);
wrap (Mbox.pop msg_queue) (loop $ sync $ send env....
wrap (receive end_ch) (fun () -> ())
]
in
loop ()
end
let _ =
let env =
{ up = new_channel ();
down = Bcast.make () }
in
let center_server env =
let msg =
sync (receive env.up)
in
tee (fun env -> Bcast.send env.down msg) env
in
spawn_loop center_server env;
let module ChatServer =
Daemon ( Echo )
in
try
ChatServer.run 8080 env
with e ->
print_endline @@ Printexc.to_string e;
exit 1
ページ名: