gojay

high performance JSON encoder/decoder with stream API for Golang
git clone git://git.lair.cx/gojay
Log | Files | Refs | README | LICENSE

commit c758e56937cc3273b482d304a0e07880924ab3cd
parent dd3bc129dd64a8ed3b9175610fc9f756bcd8b065
Author: francoispqt <francois@parquet.ninja>
Date:   Thu,  3 May 2018 00:13:53 +0800

add example directory and web socket stream implementation

Diffstat:
Aexamples/websocket/client/client.go | 26++++++++++++++++++++++++++
Aexamples/websocket/comm/comm.go | 107+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aexamples/websocket/main.go | 67+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aexamples/websocket/server/server.go | 82+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 282 insertions(+), 0 deletions(-)

diff --git a/examples/websocket/client/client.go b/examples/websocket/client/client.go @@ -0,0 +1,26 @@ +package client + +import ( + "github.com/francoispqt/gojay/examples/websocket/comm" + "golang.org/x/net/websocket" +) + +type client struct { + comm.SenderReceiver + id int +} + +func NewClient(id int) *client { + c := new(client) + c.id = id + return c +} + +func (c *client) Dial(url, origin string) error { + conn, err := websocket.Dial(url, "", origin) + if err != nil { + return err + } + c.Conn = conn + return nil +} diff --git a/examples/websocket/comm/comm.go b/examples/websocket/comm/comm.go @@ -0,0 +1,107 @@ +package comm + +import ( + "errors" + "log" + + "github.com/francoispqt/gojay" + "golang.org/x/net/websocket" +) + +// A basic message for our WebSocket app + +type Message struct { + Message string + UserName string +} + +func (m *Message) UnmarshalObject(dec *gojay.Decoder, k string) error { + switch k { + case "message": + return dec.AddString(&m.Message) + case "userName": + return dec.AddString(&m.UserName) + } + return nil +} +func (m *Message) NKeys() int { + return 2 +} + +func (m *Message) MarshalObject(enc *gojay.Encoder) { + enc.AddStringKey("message", m.Message) + enc.AddStringKey("userName", m.UserName) +} +func (u *Message) IsNil() bool { + return u == nil +} + +// Here are defined our communication types +type Sender chan gojay.MarshalerObject + +func (s Sender) MarshalStream(enc *gojay.StreamEncoder) { + select { + case <-enc.Done(): + return + case m := <-s: + enc.AddObject(m) + } +} + +type Receiver chan *Message + +func (s Receiver) UnmarshalStream(dec *gojay.StreamDecoder) error { + m := &Message{} + if err := dec.AddObject(m); err != nil { + return err + } + s <- m + return nil +} + +type SenderReceiver struct { + Send Sender + Receive Receiver + Dec *gojay.StreamDecoder + Enc *gojay.StreamEncoder + Conn *websocket.Conn +} + +func (sc *SenderReceiver) SetReceiver() { + sc.Receive = Receiver(make(chan *Message)) + sc.Dec = gojay.Stream.BorrowDecoder(sc.Conn) + go sc.Dec.DecodeStream(sc.Receive) +} + +func (sc *SenderReceiver) SetSender(nCons int) { + sc.Send = Sender(make(chan gojay.MarshalerObject)) + sc.Enc = gojay.Stream.BorrowEncoder(sc.Conn).NConsumer(nCons).LineDelimited() + go sc.Enc.EncodeStream(sc.Send) +} + +func (sc *SenderReceiver) SendMessage(m gojay.MarshalerObject) error { + select { + case <-sc.Enc.Done(): + return errors.New("sender closed") + case sc.Send <- m: + log.Print("message sent by client: ", m) + return nil + } +} + +func (c *SenderReceiver) OnMessage(f func(*Message)) error { + for { + select { + case <-c.Dec.Done(): + return errors.New("receiver closed") + case m := <-c.Receive: + f(m) + } + } +} + +func (sc *SenderReceiver) Init(sender int) *SenderReceiver { + sc.SetSender(sender) + sc.SetReceiver() + return sc +} diff --git a/examples/websocket/main.go b/examples/websocket/main.go @@ -0,0 +1,67 @@ +// package main simulates a conversation between +// a given set of websocket clients and a server. +// +// It uses gojay's streaming feature to abstract JSON communication +// and only having to handle go values. +package main + +import ( + "log" + "strconv" + + "github.com/francoispqt/gojay/examples/websocket/client" + "github.com/francoispqt/gojay/examples/websocket/comm" + "github.com/francoispqt/gojay/examples/websocket/server" +) + +func createServer(done chan error) { + // create our server, with a done signal + s := server.NewServer() + // set our connection handler + s.OnConnection(func(c *server.Client) { + // send welcome message to initiate the conversation + c.SendMessage(&comm.Message{ + UserName: "server", + Message: "Welcome !", + }) + // start handling messages + c.OnMessage(func(m *comm.Message) { + log.Print("message received from client: ", m) + s.BroadCastRandom(c, m) + }) + }) + go s.Listen(":8070", done) +} + +func createClient(url, origin string, i int) { + // create our client + c := client.NewClient(i) + // Dial connection to the WS server + err := c.Dial(url, origin) + if err != nil { + panic(err) + } + str := strconv.Itoa(i) + // Init client's sender and receiver + // Set the OnMessage handler + c.Init(10).OnMessage(func(m *comm.Message) { + log.Print("client "+str+" received from "+m.UserName+" message: ", m) + c.SendMessage(&comm.Message{ + UserName: str, + Message: "Responding to: " + m.UserName + " | old message: " + m.Message, + }) + }) +} + +// Our main function +func main() { + done := make(chan error) + createServer(done) + // add our clients connection + for i := 0; i < 100; i++ { + i := i + go createClient("ws://localhost:8070/ws", "http://localhost/", i) + } + // handle server's termination + log.Fatal(<-done) +} diff --git a/examples/websocket/server/server.go b/examples/websocket/server/server.go @@ -0,0 +1,82 @@ +package server + +import ( + "log" + "math/rand" + "net/http" + "sync" + "time" + + "github.com/francoispqt/gojay/examples/websocket/comm" + "golang.org/x/net/websocket" +) + +type server struct { + clients []*Client + mux *sync.RWMutex + handle func(c *Client) +} + +type Client struct { + comm.SenderReceiver + server *server +} + +func NewClient(s *server, conn *websocket.Conn) *Client { + sC := new(Client) + sC.Conn = conn + sC.server = s + return sC +} + +func NewServer() *server { + s := new(server) + s.mux = new(sync.RWMutex) + s.clients = make([]*Client, 0, 100) + return s +} + +func (c *Client) Close() { + c.Conn.Close() +} + +func (s *server) Handle(conn *websocket.Conn) { + defer func() { + err := conn.Close() + if err != nil { + log.Fatal(err) + } + }() + c := NewClient(s, conn) + // add our server client to the list of clients + s.mux.Lock() + s.clients = append(s.clients, c) + s.mux.Unlock() + // init Client's sender and receiver + c.Init(10) + s.handle(c) + // block until reader is done + <-c.Dec.Done() +} + +func (s *server) Listen(port string, done chan error) { + http.Handle("/ws", websocket.Handler(s.Handle)) + done <- http.ListenAndServe(port, nil) +} + +func (s *server) OnConnection(h func(c *Client)) { + s.handle = h +} + +func random(min, max int) int { + rand.Seed(time.Now().Unix()) + return rand.Intn(max-min) + min +} + +func (s *server) BroadCastRandom(sC *Client, m *comm.Message) { + m.Message = "Random message" + s.mux.RLock() + r := random(0, len(s.clients)) + s.clients[r].SendMessage(m) + s.mux.RUnlock() +}