dh_demo

DreamHanks demo project
git clone git://git.lair.cx/dh_demo
Log | Files | Refs | README

event_queue.mjs (4181B)


      1 #!/usr/bin/env node
      2 
      3 // Event Queue는 Redis에서 이벤트를 구독하고, 웹소켓 클라이언트에
      4 // 이벤트를 전달하는 작은 프로그램입니다.
      5 //
      6 // 다음 환경 변수를 사용합니다.
      7 //  REDIS_URL      - Redis 서버의 URL입니다.
      8 //  REDIS_USERNAME - Redis 서버의 사용자 이름입니다.
      9 //  REDIS_PASSWORD - Redis 서버의 비밀번호입니다.
     10 //  REDIS_DB       - Redis 서버의 데이터베이스 번호입니다.
     11 //  WSS_PORT       - 웹소켓 서버의 포트 번호입니다.
     12 //
     13 // 다음 채널을 구독합니다.
     14 //  event
     15 // 이벤트 페이로드의 형식은 다음과 같습니다.
     16 //  [path]|[event]|[payload]
     17 
     18 import redis from 'redis'
     19 import { WebSocketServer } from 'ws'
     20 import { nanoid } from 'nanoid'
     21 
     22 /**
     23  * @typedef {string} Path
     24  * @typedef {string} SocketID
     25  */
     26 
     27 /**
     28  * 웹소켓 접속을 관리합니다.
     29  */
     30 class SocketManager {
     31   constructor () {
     32     this.server = new WebSocketServer({
     33       port: parseIntOrDefault(process.env.WSS_PORT, 3001),
     34       clientTracking: false,
     35     })
     36 
     37     /**
     38      * @type {Map<Path, Set<SocketID>>}
     39      */
     40     this.paths = new Map()
     41 
     42     /**
     43      * @type {Map<SocketID, Path>}
     44      */
     45     this.socketPaths = new Map()
     46 
     47     /**
     48      * @type {Map<SocketID, WebSocket>}
     49      */
     50     this.sockets = new Map()
     51 
     52     // Setup socket handlers
     53     this.server.on('connection', ws => {
     54       this.#handleConnection(ws)
     55     })
     56   }
     57 
     58   /**
     59    * @param ws {WebSocket}
     60    */
     61   #handleConnection (ws) {
     62     const id = nanoid()
     63     this.sockets.set(id, ws)
     64 
     65     console.log(`Connected: ${id}`)
     66 
     67     ws.on('message', (message, isBinary) => {
     68       this.#handleMessage(id, ws, message, isBinary)
     69     })
     70     ws.on('close', () => {
     71       this.#unregister(id)
     72     })
     73 
     74     setTimeout(() => {
     75       ws.send('HAND')
     76     }, 500)
     77   }
     78 
     79   /**
     80    * @param id {SocketID}
     81    */
     82   #unregister (id) {
     83     const path = this.socketPaths.get(id)
     84     if (this.paths.has(path)) {
     85       this.paths.get(path).delete(id)
     86     }
     87     this.socketPaths.delete(id)
     88   }
     89 
     90   /**
     91    * @param message {import('ws').WebSocket.RawData}
     92    * @return {[string, string]}
     93    */
     94   #parseCommand (message) {
     95     const src = message.toString().trim()
     96     return [src.slice(0, 4), src.slice(4)] // Command is always 4 characters
     97   }
     98 
     99   /**
    100    * @param id {SocketID}
    101    * @param ws {import('ws').WebSocket}
    102    * @param message {import('ws').WebSocket.RawData}
    103    * @param isBinary {boolean}
    104    */
    105   #handleMessage (id, ws, message, isBinary) {
    106     if (!this.sockets.has(id)) {
    107       return
    108     }
    109 
    110     const [command, payload] = this.#parseCommand(message)
    111     switch (command) {
    112       case 'PATH':
    113         this.#changePath(id, payload)
    114         ws.send('OKAY')
    115         break
    116 
    117       case 'TEST':
    118         this.broadcast(payload, 'TEST')
    119         break
    120 
    121       default: {
    122         ws.send('????')
    123         break
    124       }
    125     }
    126   }
    127 
    128   /**
    129    * @param id {SocketID}
    130    * @param path {Path}
    131    */
    132   #changePath (id, path) {
    133     const oldPath = this.socketPaths.get(id)
    134     if (oldPath) {
    135       this.paths.get(oldPath).delete(id)
    136     }
    137 
    138     if (!this.paths.has(path)) {
    139       this.paths.set(path, new Set())
    140     }
    141     this.paths.get(path).add(id)
    142   }
    143 
    144   /**
    145    * @param path {Path}
    146    * @param message {string}
    147    */
    148   broadcast (path, message) {
    149     if (!this.paths.has(path)) {
    150       return
    151     }
    152     this.paths.get(path).forEach(id => {
    153       const ws = this.sockets.get(id)
    154       ws.send(message)
    155     })
    156   }
    157 }
    158 
    159 function parseIntOrDefault (value, defaultValue) {
    160   const parsed = parseInt(value, 10)
    161   return Number.isNaN(parsed) ? defaultValue : parsed
    162 }
    163 
    164 !(async () => {
    165   const redisClient = redis.createClient({
    166     url: process.env.REDIS_URL,
    167     username: process.env.REDIS_USERNAME,
    168     password: process.env.REDIS_PASSWORD,
    169     database: parseIntOrDefault(process.env.REDIS_DB, undefined),
    170   })
    171 
    172   const socketManager = new SocketManager()
    173 
    174   await redisClient.connect()
    175 
    176   await redisClient.subscribe('event', (message) => {
    177     const [path, event, payload] = message.split('|')
    178     socketManager.broadcast(path, `EMIT${event} ${payload}`)
    179   })
    180 
    181   console.log('Started')
    182 })()
    183   .catch(err => {
    184     console.error(err)
    185     process.exit(1)
    186   })