event_queue.mjs (4181B)
1 #!/usr/bin/env node 2 3 // Event Queue는 Redis에서 이벤트를 구독하고, 웹소켓 클라이언트에 4 // 이벤트를 전달하는 작은 프로그램입니다. 5 // 6 // 다음 환경 변수를 사용합니다. 7 // REDIS_URL - Redis 서버의 URL입니다. 8 // REDIS_USERNAME - Redis 서버의 사용자 이름입니다. 9 // REDIS_PASSWORD - Redis 서버의 비밀번호입니다. 10 // REDIS_DB - Redis 서버의 데이터베이스 번호입니다. 11 // WSS_PORT - 웹소켓 서버의 포트 번호입니다. 12 // 13 // 다음 채널을 구독합니다. 14 // event 15 // 이벤트 페이로드의 형식은 다음과 같습니다. 16 // [path]|[event]|[payload] 17 18 import redis from 'redis' 19 import { WebSocketServer } from 'ws' 20 import { nanoid } from 'nanoid' 21 22 /** 23 * @typedef {string} Path 24 * @typedef {string} SocketID 25 */ 26 27 /** 28 * 웹소켓 접속을 관리합니다. 29 */ 30 class SocketManager { 31 constructor () { 32 this.server = new WebSocketServer({ 33 port: parseIntOrDefault(process.env.WSS_PORT, 3001), 34 clientTracking: false, 35 }) 36 37 /** 38 * @type {Map<Path, Set<SocketID>>} 39 */ 40 this.paths = new Map() 41 42 /** 43 * @type {Map<SocketID, Path>} 44 */ 45 this.socketPaths = new Map() 46 47 /** 48 * @type {Map<SocketID, WebSocket>} 49 */ 50 this.sockets = new Map() 51 52 // Setup socket handlers 53 this.server.on('connection', ws => { 54 this.#handleConnection(ws) 55 }) 56 } 57 58 /** 59 * @param ws {WebSocket} 60 */ 61 #handleConnection (ws) { 62 const id = nanoid() 63 this.sockets.set(id, ws) 64 65 console.log(`Connected: ${id}`) 66 67 ws.on('message', (message, isBinary) => { 68 this.#handleMessage(id, ws, message, isBinary) 69 }) 70 ws.on('close', () => { 71 this.#unregister(id) 72 }) 73 74 setTimeout(() => { 75 ws.send('HAND') 76 }, 500) 77 } 78 79 /** 80 * @param id {SocketID} 81 */ 82 #unregister (id) { 83 const path = this.socketPaths.get(id) 84 if (this.paths.has(path)) { 85 this.paths.get(path).delete(id) 86 } 87 this.socketPaths.delete(id) 88 } 89 90 /** 91 * @param message {import('ws').WebSocket.RawData} 92 * @return {[string, string]} 93 */ 94 #parseCommand (message) { 95 const src = message.toString().trim() 96 return [src.slice(0, 4), src.slice(4)] // Command is always 4 characters 97 } 98 99 /** 100 * @param id {SocketID} 101 * @param ws {import('ws').WebSocket} 102 * @param message {import('ws').WebSocket.RawData} 103 * @param isBinary {boolean} 104 */ 105 #handleMessage (id, ws, message, isBinary) { 106 if (!this.sockets.has(id)) { 107 return 108 } 109 110 const [command, payload] = this.#parseCommand(message) 111 switch (command) { 112 case 'PATH': 113 this.#changePath(id, payload) 114 ws.send('OKAY') 115 break 116 117 case 'TEST': 118 this.broadcast(payload, 'TEST') 119 break 120 121 default: { 122 ws.send('????') 123 break 124 } 125 } 126 } 127 128 /** 129 * @param id {SocketID} 130 * @param path {Path} 131 */ 132 #changePath (id, path) { 133 const oldPath = this.socketPaths.get(id) 134 if (oldPath) { 135 this.paths.get(oldPath).delete(id) 136 } 137 138 if (!this.paths.has(path)) { 139 this.paths.set(path, new Set()) 140 } 141 this.paths.get(path).add(id) 142 } 143 144 /** 145 * @param path {Path} 146 * @param message {string} 147 */ 148 broadcast (path, message) { 149 if (!this.paths.has(path)) { 150 return 151 } 152 this.paths.get(path).forEach(id => { 153 const ws = this.sockets.get(id) 154 ws.send(message) 155 }) 156 } 157 } 158 159 function parseIntOrDefault (value, defaultValue) { 160 const parsed = parseInt(value, 10) 161 return Number.isNaN(parsed) ? defaultValue : parsed 162 } 163 164 !(async () => { 165 const redisClient = redis.createClient({ 166 url: process.env.REDIS_URL, 167 username: process.env.REDIS_USERNAME, 168 password: process.env.REDIS_PASSWORD, 169 database: parseIntOrDefault(process.env.REDIS_DB, undefined), 170 }) 171 172 const socketManager = new SocketManager() 173 174 await redisClient.connect() 175 176 await redisClient.subscribe('event', (message) => { 177 const [path, event, payload] = message.split('|') 178 socketManager.broadcast(path, `EMIT${event} ${payload}`) 179 }) 180 181 console.log('Started') 182 })() 183 .catch(err => { 184 console.error(err) 185 process.exit(1) 186 })