(*
   Chat server built on the concurrent cell (ccell) library. (c) osiire.
   To compile:
   ocamlfind ocamlc -thread -package ccell -linkpkg chat.ml
*)
open Unix
open Event

(* Utility combinators: start *)
(* Identity function: hands its argument back untouched. *)
let id = fun value -> value
(* Infix application: [fn @@ arg] is [fn arg]; saves one pair of parentheses. *)
let ( @@ ) = fun fn arg -> fn arg
(* Function composition: [(outer $ inner) arg] is [outer (inner arg)]. *)
let ( $ ) outer inner = fun arg -> outer (inner arg)
(* Best-effort call: runs [f arg] for its side effect and discards any
   exception it raises, so the caller always gets [()] back. *)
let step f = fun arg ->
  try f arg with _ -> ()

(* Endless iteration: feeds each result of [f] back in as its next argument.
   Never returns normally; it only stops if [f] raises. *)
let rec forever f x = forever f (f x)

(* Starts [f x] in a new thread and drops the thread handle, so the caller
   cannot join it afterwards. *)
let spawn f x =
  let _thread = Thread.create f x in
  ()
    
(* Starts a new thread that keeps applying [f], threading each result into
   the next call, beginning with [x]. *)
let spawn_loop f x =
  spawn (forever f) x

(* Runs [f x] only for its side effect, ignoring both its result and any
   exception it raises, then returns [x] unchanged. *)
let tee f x =
  (try ignore (f x) with _ -> ());
  x
(* Utility combinators: end *)

(* State shared by the central relay thread and every connection handler.
   [up]   : channel on which handlers send text received from their client;
            the central thread receives from it.
   [down] : broadcast to which the central thread sends every message; each
            handler creates its own port on it. *)
type global_env = { up : string channel; down : string Bcast.t }

(* Listening-socket helpers: create a bound TCP socket and turn each pending
   connection into an [Event] channel message. *)
module TCPListener = struct
  (* Backlog handed to [Unix.listen]. *)
  let max_number_of_pending_request = 10

  (* [make port] returns a TCP socket bound to every local address on [port]
     and already listening.
     SO_REUSEADDR is set so the server can be restarted at once instead of
     failing with EADDRINUSE while old connections sit in TIME_WAIT.
     If configuring the socket fails, the descriptor is closed before the
     original exception is re-raised, so it is not leaked. *)
  let make port =
    let listen_socket =
      socket PF_INET SOCK_STREAM 0
    in
    try
      setsockopt listen_socket SO_REUSEADDR true;
      bind listen_socket @@ ADDR_INET (inet_addr_any, port);
      listen listen_socket max_number_of_pending_request;
      listen_socket
    with e ->
      step close listen_socket;
      raise e

  (* [accept socket] returns a fresh channel on which exactly one accepted
     connection, as returned by [Unix.accept], will be sent by a helper
     thread.
     EINTR and ECONNABORTED are transient (an interrupted call, or a client
     that gave up while still queued), so the helper retries on them. Without
     the retry the helper thread would die and the receiver of the channel
     would wait forever. Any other error still ends the helper thread. *)
  let accept socket =
    let ac_channel =
      new_channel ()
    in
    let rec accept_retrying () =
      try Unix.accept socket
      with Unix_error ((EINTR | ECONNABORTED), _, _) -> accept_retrying ()
    in
    spawn (sync $ send ac_channel $ accept_retrying) ();
    ac_channel

end

(* Signature of a per-connection job that the Daemon functor can run. *)
module type Work = sig
  (* Environment passed to every call of [action]. *)
  type env
  (* Argument from which [init] builds the environment. *)
  type arg
  (* Builds the environment; Daemon.run calls it once, before it starts
     waiting for connections. *)
  val init : arg -> env
  (* Serves one accepted connection, given the environment and the client
     socket. Daemon.run runs it in its own thread and shuts down and closes
     the socket once it returns or raises. *)
  val action : env * file_descr -> unit
end

(* Generic accept loop: every incoming connection is served by [W.action]
   in a thread of its own. *)
module Daemon (W : Work) = struct

  (* [run port init] listens on [port], builds the environment once with
     [W.init init], then accepts connections forever. It does not return
     normally. *)
  let run port init =
    let listener = TCPListener.make port in
    (* Serves a single client, then tears the connection down. Every stage is
       wrapped in [step], so a failure in one never prevents the next. *)
    let serve_client env client () =
      step W.action (env, client);
      step (shutdown client) SHUTDOWN_ALL;
      step close client
    in
    let rec loop (pending, env) =
      (* Runs when a connection arrives on [pending]: start the handler
         thread, then wait for the next connection. The peer address is not
         used. *)
      let on_connection (client, _) =
        spawn (serve_client env client) ();
        loop (TCPListener.accept listener, env)
      in
      select [wrap (receive pending) on_connection]
    in
    loop (TCPListener.accept listener, W.init init)

end

(* Per-connection chat worker: everything read from the client is sent up to
   the central relay, and everything the relay broadcasts is written back to
   the client. *)
module Echo = struct
  type env = global_env
  type arg = env

  (* The environment is supplied ready-made, so there is nothing to build. *)
  let init arg = arg

  (* [action (env, sock)] serves the client on [sock] until the client's
     stream ends or a receive error occurs. It does not close [sock].

     NOTE(review): [String.create] and [String.copy] are kept because this
     file predates the [Bytes] module; confirm the target compiler before
     porting them.
     NOTE(review): if [net_send] raises, this function exits by exception and
     the reader thread is left waiting on [end_ch]; confirm whether that
     needs handling. *)
  let action (env, sock) =
    (* Writes a broadcast message straight to the socket. *)
    let net_send msg =
      ignore @@ Unix.write sock msg 0 (String.length msg)
    in
    let port =
      Bcast.make_port env.down
    in
    let msg_queue, end_ch =
      Mbox.make (), new_channel ()
    in
    let buf = String.create 1 in
    (* Reader thread: pushes the client's input one byte at a time onto
       [msg_queue] and signals [end_ch] when the stream is over. *)
    let rec wait_msg () =
      let c =
        (* A receive error (e.g. connection reset) is treated like end of
           stream. Otherwise the exception would kill this thread before it
           signals [end_ch], and [loop] below would wait forever. *)
        try Unix.recv sock buf 0 1 [] with Unix_error _ -> 0
      in
      if c > 0 then begin
        (* Copy the byte: [buf] is overwritten by the next receive. *)
        wait_msg @@ sync @@ Mbox.push msg_queue (String.copy buf)
      end
      else
        sync @@ send end_ch ()
    in
    spawn wait_msg ();
    (* Multiplexes the three event sources until the reader reports the end. *)
    let rec loop () =
      select [
        (* relay broadcast -> write to this client *)
        wrap (Bcast.receive_port port) (loop $ net_send);
        (* input from this client -> send up to the relay *)
        wrap (Mbox.pop msg_queue) (loop $ sync $ send env.up);
        (* reader finished -> stop serving *)
        wrap (receive end_ch) (fun () -> ())
      ]
    in
    loop ()

end

(* Program entry point: starts the central relay thread, then runs the chat
   daemon on port 8080. If the daemon raises, the exception is printed and
   the process exits with status 1. *)
let _ =
  (* Writing to a socket whose peer has gone away delivers SIGPIPE, and the
     default action for that signal terminates the whole server. With the
     signal ignored the write fails with a [Unix_error] instead, which the
     per-connection handler already contains. Platforms that do not provide
     the signal are skipped. *)
  (try Sys.set_signal Sys.sigpipe Sys.Signal_ignore
   with Invalid_argument _ | Sys_error _ -> ());
  let env =
    { up = new_channel ();
      down = Bcast.make () }
  in
  (* One relay step: take the next message sent up by any connection and
     broadcast it to all of them. [tee] returns [env] so that [spawn_loop]
     can feed it into the next step, and keeps a failed broadcast from
     ending the relay thread. *)
  let center_server env =
    let msg =
      sync (receive env.up)
    in
    tee (fun env -> Bcast.send env.down msg) env
  in
  spawn_loop center_server env;
  let module ChatServer =
    Daemon ( Echo )
  in
  try
    ChatServer.run 8080 env
  with e ->
    print_endline @@ Printexc.to_string e;
    exit 1

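(* Wiki page footer captured with the listing; not part of the program:
   Top | Edit, Freeze, Diff, Backup, Attach, Copy, Rename, Reload |
   New, List, Word search, Recent changes | Help | RSS of recent changes
   Last-modified: 2008-12-21 (Sun) 12:55:36 (5605d) *)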