(* chat server with concurrent cell. (c) osiire. To compile: ocamlfind -thread -package ccell -linkpkg chat.ml *) open Unix open Event (* for util *) let id x = x let (@@) f x = f x let ($) f g x = f (g x) let step f x = try f x with _ -> () let rec forever f x = let v = f x in forever f v let spawn f x = ignore (Thread.create f x) let spawn_loop f x = ignore (Thread.create (forever f) x) let tee f x = try ignore (f x); x with _ -> x (* for util *) type global_env = { up : string channel; down : string Bcast.t } module TCPListener = struct let max_number_of_pending_request = 10 let make port = let listen_socket = socket PF_INET SOCK_STREAM 0 in bind listen_socket @@ ADDR_INET (inet_addr_any, port); listen listen_socket max_number_of_pending_request; listen_socket let accept socket = let ac_channel = new_channel () in spawn (sync $ send ac_channel $ accept) socket; ac_channel end module type Work = sig type env type arg val init : arg -> env val action : env * file_descr -> unit end module Daemon (W : Work) = struct let run port init = let listener = TCPListener.make port in let rec loop (accept, env) = let fork (socket, _) = let wraped_action () = (step W.action) (env, socket); (step $ shutdown) socket SHUTDOWN_ALL; (step close) socket in spawn wraped_action (); loop (TCPListener.accept listener, env) in select [wrap (receive accept) fork] in loop (TCPListener.accept listener, W.init init) end module Echo = struct type env = global_env type arg = env let init arg = arg let action (env, sock) = let out = Unix.out_channel_of_descr sock in let net_send msg = let length = String.length msg in ignore @@ Unix.write sock msg 0 length; flush out in let port = Bcast.make_port env.down in let msg_queue, end_ch = Mbox.make (), new_channel () in let buf = String.create 1 in let rec wait_msg () = let c = let _ = Unix.select [sock] [] [] 0. in Unix.recv sock buf 0 1 [] in if c > 0 then begin wait_msg @@ sync @@ Mbox.push msg_queue (String.copy buf) end else sync @@ send end_ch () in spawn wait_msg (); let rec loop () = select [ wrap (Bcast.receive_port port) (loop $ net_send); wrap (Mbox.pop msg_queue) (loop $ sync $ send env.up); wrap (receive end_ch) (fun () -> ()) ] in loop () end let _ = let env = { up = new_channel (); down = Bcast.make () } in let center_server env = let msg = sync (receive env.up) in tee (fun env -> Bcast.send env.down msg) env in spawn_loop center_server env; let module ChatServer = Daemon ( Echo ) in try ChatServer.run 8080 env with e -> print_endline @@ Printexc.to_string e; exit 1