(* 
    chat server with concurrent cell. (c) osiire.
    To compile:
    ocamlfind ocamlc -thread -package ccell -linkpkg chat.ml
 *)
 open Unix
 open Event
 
 (* for util *)
 (* Identity: hands its argument back unchanged. *)
 let id = fun value -> value
 (* Low-precedence application operator: [f @@ x] is [f x]. *)
 let ( @@ ) = fun func arg -> func arg
 (* Function composition: [(f $ g) x] applies [g] first, then [f]. *)
 let ( $ ) outer inner = fun arg -> outer (inner arg)
 (* Best-effort call: runs [f x] for its side effect and discards any
    exception it raises, so the caller always gets [()]. *)
 let step f =
   fun arg ->
     try f arg
     with _ -> ()
 
 (* Endless iteration: feeds the result of each [f] call into the next
    one.  Only an exception raised by [f] ends the loop. *)
 let rec forever f state =
   forever f (f state)
 
 (* Starts [f x] in a new thread and drops the thread handle. *)
 let spawn f x =
   let _handle = Thread.create f x in
   ()
     
 (* Starts a new thread that repeats [f] through [forever], seeded with
    [x]; the thread handle is dropped. *)
 let spawn_loop f x =
   let worker = forever f in
   let _handle = Thread.create worker x in
   ()
 
 (* Runs [f x] for its side effect only, ignoring both its result and
    any exception, then returns [x] itself. *)
 let tee f x =
   (try ignore (f x) with _ -> ());
   x
 (* for util *)
 
 (* Shared plumbing between the client sessions and the central relay:
    [up] is the channel on which sessions send client input and from
    which the relay receives it; [down] is the broadcast object the relay
    sends on and on which each session opens its own port. *)
 type global_env = { up : string channel; down : string Bcast.t }
 
 (* Helpers for creating a listening TCP socket and accepting
    connections from it through an [Event] channel. *)
 module TCPListener = struct
   (* Backlog passed to [listen]. *)
   let max_number_of_pending_request = 10

   (* [make port] returns a stream socket bound to [port] on every local
      address and already listening.
      If [bind] or [listen] raises, the descriptor is closed before the
      exception is re-raised, so a failed call does not leak it. *)
   let make port =
     let listen_socket =
       socket PF_INET SOCK_STREAM 0
     in
     try
       bind listen_socket (ADDR_INET (inet_addr_any, port));
       listen listen_socket max_number_of_pending_request;
       listen_socket
     with e ->
       (* Closing is best-effort: the original failure is the one that
          gets reported. *)
       (try close listen_socket with Unix_error _ -> ());
       raise e

   (* [accept socket] returns a fresh channel at once.  A new thread
      performs one [Unix.accept] on [socket] and sends the resulting
      (descriptor, address) pair on that channel. *)
   let accept socket =
     let ac_channel =
       new_channel ()
     in
     (* In this non-recursive definition [accept] still names
        [Unix.accept]. *)
     spawn (fun listener -> sync (send ac_channel (accept listener))) socket;
     ac_channel

 end
 
 (* What [Daemon] needs from a per-connection worker. *)
 module type Work = sig
   (* State shared by all connection handlers. *)
   type env
   (* Value from which [init] builds that state. *)
   type arg
   (* Called once by [Daemon.run] to build the environment. *)
   val init : arg -> env
   (* Called by [Daemon.run] in a new thread for every accepted
      connection.  Exceptions it raises are discarded there, and the
      descriptor is shut down and closed after it returns. *)
   val action : env * file_descr -> unit
 end
 
 (* Generic accept loop: listens on a TCP port and runs [W.action] in
    its own thread for every incoming connection. *)
 module Daemon (W : Work) = struct

   (* [run port init] never returns normally.  It builds the worker
      environment with [W.init init], then keeps accepting connections
      on [port] and hands each one to a new thread. *)
   let run port init =
     let listener = TCPListener.make port in
     let env = W.init init in
     (* Per-connection thread body.  Each step is best-effort, so a
        failure in the worker still lets the socket be shut down and
        closed. *)
     let handle_client client () =
       step W.action (env, client);
       step (shutdown client) SHUTDOWN_ALL;
       step close client
     in
     let rec serve pending =
       select [
         wrap (receive pending) (fun (client, _) ->
           spawn (handle_client client) ();
           (* Arm the next accept before waiting again. *)
           serve (TCPListener.accept listener))
       ]
     in
     serve (TCPListener.accept listener)

 end
 
 (* Chat worker: relays what one client types to the central server and
    writes every broadcast message back to that client. *)
 module Echo = struct
   type env = global_env
   type arg = env

   (* The argument already is the environment. *)
   let init arg = arg

   (* [action (env, sock)] serves one client until its input ends.
      A reader thread receives from [sock] one byte at a time and queues
      each byte as a one-character message; the main loop forwards queued
      messages to [env.up] and writes messages arriving on this
      session's broadcast port to [sock].
      It returns once the reader reports end of input.  An exception
      raised while writing to [sock] propagates to the caller. *)
   let action (env, sock) =
     (* [Unix.write] goes straight to the descriptor, so no buffered
        channel is involved. *)
     let net_send msg =
       ignore (Unix.write sock msg 0 (String.length msg))
     in
     let port =
       Bcast.make_port env.down
     in
     let msg_queue = Mbox.make () in
     let end_ch = new_channel () in
     let buf = String.create 1 in
     (* Receives one byte into [buf] and returns the byte count; 0 means
        the client is gone.  An interrupted call is retried.  Any other
        socket error is reported as 0 so the session ends instead of
        waiting forever on a dead reader thread.
        NOTE(review): the zero-timeout [Unix.select] has its result
        discarded; its purpose is unclear - confirm before removing. *)
     let rec recv_byte () =
       try
         let _ = Unix.select [sock] [] [] 0. in
         Unix.recv sock buf 0 1 []
       with
       | Unix.Unix_error (Unix.EINTR, _, _) -> recv_byte ()
       | Unix.Unix_error _ -> 0
     in
     let rec wait_msg () =
       if recv_byte () > 0 then
         (* [buf] is reused for the next byte, so queue a copy. *)
         wait_msg (sync (Mbox.push msg_queue (String.copy buf)))
       else
         sync (send end_ch ())
     in
     spawn wait_msg ();
     let rec loop () =
       select [
         wrap (Bcast.receive_port port) (loop $ net_send);
         wrap (Mbox.pop msg_queue) (loop $ sync $ send env.up);
         wrap (receive end_ch) (fun () -> ())
       ]
     in
     loop ()

 end
 
 (* Entry point: starts the central relay thread, then serves chat
    clients on port 8080.  Any exception escaping the server is printed
    and the process exits with status 1. *)
 let () =
   (* With the default SIGPIPE disposition, writing to a client that has
      already disconnected terminates the whole process.  Ignoring the
      signal makes that write raise a [Unix_error] instead, which only
      ends the affected session. *)
   if Sys.os_type <> "Win32" then
     Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
   let env =
     { up = new_channel ();
       down = Bcast.make () }
   in
   (* One relay step: take the next message from [env.up] and pass it to
      [Bcast.send] on [env.down].  [tee] returns [env] so [spawn_loop]
      can feed it to the next iteration. *)
   let center_server env =
     let msg =
       sync (receive env.up)
     in
     tee (fun env -> Bcast.send env.down msg) env
   in
   spawn_loop center_server env;
   let module ChatServer =
     Daemon ( Echo )
   in
   try
     ChatServer.run 8080 env
   with e ->
     print_endline @@ Printexc.to_string e;
     exit 1

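(* Wiki page navigation residue, not part of the program: Top, Edit, Diff, Backup, Attach, Copy, Rename, Reload, New, List, Word search, Last updated, Help, RSS of last update *)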