You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

623 lines
15KB

  1. package main
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. "github.com/nanobox-io/golang-scribble"
  6. "math"
  7. "net"
  8. "os"
  9. "os/signal"
  10. "syscall"
  11. "fmt"
  12. "gen/ledd"
  13. "github.com/golang/protobuf/proto"
  14. "github.com/lucasb-eyer/go-colorful"
  15. "github.com/op/go-logging"
  16. "gopkg.in/yaml.v2"
  17. "io/ioutil"
  18. )
  19. // CONSTANTS
  20. const VERSION = "0.1"
  21. const LOG_BACKEND = "BH"
  22. const LOG_CLIENTS = "CH"
  23. // STRUCTS
  24. type Config struct {
  25. Name string
  26. Daemon struct {
  27. Frontend struct {
  28. Host string
  29. Port int
  30. }
  31. Backend struct {
  32. Host string
  33. Port int
  34. }
  35. }
  36. Mongodb struct {
  37. Host string
  38. Port int
  39. Database string
  40. }
  41. }
  42. type BackendManager struct {
  43. backends map[string]*Backend
  44. broadcast chan []byte
  45. register chan *Backend
  46. unregister chan *Backend
  47. }
  48. type Backend struct {
  49. name string
  50. platformType string
  51. version string
  52. channel int32
  53. resolution int32
  54. socket net.Conn
  55. data chan []byte
  56. }
  57. type ClientManager struct {
  58. clients map[*Client]bool
  59. broadcast chan []byte
  60. register chan *Client
  61. unregister chan *Client
  62. }
  63. type Client struct {
  64. platform string
  65. socket net.Conn
  66. data chan []byte
  67. }
  68. type LED struct {
  69. Name string
  70. Channel []int32
  71. Correction []float32
  72. Backend string
  73. color chan colorful.Color
  74. }
  75. type LEDManager struct {
  76. leds map[string]*LED
  77. broadcast chan colorful.Color
  78. add chan *LED
  79. remove chan *LED
  80. }
  81. // GLOBAL VARS
  82. var log = logging.MustGetLogger("LedD")
  83. var backManager BackendManager
  84. var clientManager ClientManager
  85. var ledManager LEDManager
  86. var db *scribble.Driver
  87. var config Config
  88. // SOCKET SETUP
  89. func setupSocket(host string, port int, logTag string, backend bool) (func(), error) {
  90. ln, err := net.Listen("tcp4", fmt.Sprintf("%s:%d", host, port))
  91. if err != nil {
  92. return nil, err
  93. }
  94. log.Infof("[%s] Ready to handle connections.", logTag)
  95. return func() {
  96. for {
  97. conn, err := ln.Accept()
  98. if err != nil {
  99. log.Warningf("%s", err)
  100. }
  101. log.Infof("[%s] New connection from %s", logTag, conn.RemoteAddr())
  102. if backend {
  103. backend := &Backend{socket: conn, data: make(chan []byte, 20)}
  104. go backManager.receive(backend)
  105. go backManager.send(backend)
  106. } else {
  107. client := &Client{socket: conn, data: make(chan []byte, 20)}
  108. go clientManager.receive(client)
  109. go clientManager.send(client)
  110. }
  111. }
  112. }, nil
  113. }
  114. // LED MANAGER
  115. func (manager *LEDManager) start() {
  116. for {
  117. select {
  118. case led := <-manager.add:
  119. log.Debugf("[%s] Request to add LED: %s (%s) (%s)", led.Backend, led.Name, led.Channel, led.Correction)
  120. if led.Name == "" || len(led.Channel) == 0 || led.Backend == "" {
  121. log.Warningf("[%s] Can't add LED without required information! (%s)", LOG_CLIENTS, led)
  122. continue
  123. }
  124. if _, ok := manager.leds[led.Name]; ok {
  125. log.Warningf("[%s] Can't add LED: already in LEDM! (%s)", LOG_CLIENTS, led.Name)
  126. continue
  127. }
  128. var dbLED LED
  129. err := db.Read("led", led.Name, &dbLED)
  130. if err == nil {
  131. // log.Warningf("[%s] LED already in DB! (%s)", LOG_CLIENTS, led.Name)
  132. } else if os.IsNotExist(err) {
  133. err = db.Write("led", led.Name, led)
  134. if err != nil {
  135. log.Warning("[%s] Error while adding LED to database: %s", LOG_BACKEND, err)
  136. }
  137. } else {
  138. log.Warning("[%s] Error while checking database for LED: %s", LOG_BACKEND, err)
  139. }
  140. manager.leds[led.Name] = led
  141. go manager.color(led)
  142. case led := <-manager.remove:
  143. if _, ok := manager.leds[led.Name]; ok {
  144. log.Debugf("[%s] Request to remove %s", led.Backend, led.Name)
  145. err := db.Delete("led", led.Name)
  146. check(err)
  147. delete(manager.leds, led.Name)
  148. }
  149. case color := <-manager.broadcast:
  150. for _, led := range manager.leds {
  151. select {
  152. case led.color <- color:
  153. }
  154. }
  155. }
  156. }
  157. }
  158. func (manager *LEDManager) color(led *LED) {
  159. for {
  160. select {
  161. case color := <-led.color:
  162. if backend, ok := backManager.backends[led.Backend]; ok {
  163. if len(led.Channel) != 3 {
  164. log.Warningf("[%s] Currently only RGB LEDs are supported", led.Name)
  165. return
  166. }
  167. cMap := make(map[int32]float64)
  168. fMap := make(map[int32]float32)
  169. if !color.IsValid() {
  170. log.Warningf("[%s] Got invalid HCL->RGB color, clamping!", led.Name)
  171. color = color.Clamped()
  172. }
  173. log.Debugf("[%s] New color: \x1b[38;2;%d;%d;%dm%s\x1b[0m", led.Name, int(math.Round(color.R*255)), int(math.Round(color.G*255)), int(math.Round(color.B*255)), color.Hex())
  174. cMap[led.Channel[0]] = color.R
  175. cMap[led.Channel[1]] = color.G
  176. cMap[led.Channel[2]] = color.B
  177. fMap[led.Channel[0]] = led.Correction[0]
  178. fMap[led.Channel[1]] = led.Correction[1]
  179. fMap[led.Channel[2]] = led.Correction[2]
  180. wrapperMsg := &ledd.BackendWrapperMessage{
  181. Msg: &ledd.BackendWrapperMessage_MSetChannel{
  182. MSetChannel: &ledd.BackendSetChannel{
  183. Values: cMap,
  184. Correction: fMap,
  185. }}}
  186. data, err := proto.Marshal(wrapperMsg)
  187. if err != nil {
  188. log.Warningf("[%s] Failed to encode protobuf msg to %s: %s", led.Name, backend.name, err)
  189. }
  190. backend.data <- prepareProtobuf(data)
  191. } else {
  192. log.Warningf("[LM] Failed to set color for %s: backend %s not found", led.Name, led.Backend)
  193. }
  194. }
  195. }
  196. }
  197. // BACKEND HANDLER
  198. func (manager *BackendManager) start() {
  199. for {
  200. select {
  201. case backend := <-manager.register:
  202. manager.backends[backend.name] = backend
  203. log.Debugf("[%s] %s registered", LOG_BACKEND, backend.niceName())
  204. wrapperMsg := &ledd.BackendWrapperMessage{
  205. Msg: &ledd.BackendWrapperMessage_MLedd{
  206. MLedd: &ledd.LedD{
  207. Name: config.Name,
  208. },
  209. },
  210. }
  211. data, err := proto.Marshal(wrapperMsg)
  212. if err != nil {
  213. log.Warningf("[%s] Failed to encode protobuf: %s", backend.niceName(), err)
  214. }
  215. backend.data <- prepareProtobuf(data)
  216. case backend := <-manager.unregister:
  217. if _, ok := manager.backends[backend.name]; ok {
  218. log.Debugf("[%s] %s removed: connection terminated", LOG_BACKEND, backend.socket.RemoteAddr())
  219. close(backend.data)
  220. delete(manager.backends, backend.name)
  221. }
  222. case message := <-manager.broadcast:
  223. for _, backend := range manager.backends {
  224. select {
  225. case backend.data <- message:
  226. default:
  227. close(backend.data)
  228. delete(manager.backends, backend.name)
  229. }
  230. }
  231. }
  232. }
  233. }
  234. func (manager *BackendManager) stop() {
  235. for _, backend := range manager.backends {
  236. close(backend.data)
  237. }
  238. }
  239. func (manager *BackendManager) send(backend *Backend) {
  240. defer backend.socket.Close()
  241. for {
  242. select {
  243. case message, ok := <-backend.data:
  244. if !ok {
  245. return
  246. }
  247. backend.socket.Write(message)
  248. }
  249. }
  250. }
  251. func (manager *BackendManager) receive(backend *Backend) {
  252. for {
  253. message := make([]byte, 4096)
  254. length, err := backend.socket.Read(message)
  255. if err != nil {
  256. log.Warningf("[%s] Read failed: %s", backend.niceName(), err)
  257. manager.unregister <- backend
  258. backend.socket.Close()
  259. break
  260. }
  261. if length > 0 {
  262. msgLen := binary.BigEndian.Uint32(message[0:4])
  263. // log.Debugf("[%s] Read %d bytes, first protobuf is %d long", Backend.niceName(), length, msgLen)
  264. backendMsg := &ledd.BackendWrapperMessage{}
  265. err = proto.Unmarshal(message[4:msgLen+4], backendMsg)
  266. if err != nil {
  267. log.Warningf("[%s] Couldn't decode protobuf msg!", backend.niceName())
  268. continue
  269. }
  270. switch msg := backendMsg.Msg.(type) {
  271. case *ledd.BackendWrapperMessage_MBackend:
  272. nBackend := msg.MBackend
  273. backend.name = nBackend.Name
  274. backend.channel = nBackend.Channel
  275. backend.resolution = nBackend.Resolution
  276. backend.platformType = nBackend.Type
  277. backend.version = nBackend.Version
  278. log.Infof("[%s] %s is now identified as %s", LOG_BACKEND, backend.socket.RemoteAddr(), backend.niceName())
  279. backManager.register <- backend
  280. }
  281. }
  282. }
  283. }
  284. // CLIENT HANDLER
  285. func (manager *ClientManager) start() {
  286. for {
  287. select {
  288. case client := <-manager.register:
  289. manager.clients[client] = true
  290. log.Debugf("[%s] Client %s (%s) registered", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform)
  291. backends := make([]*ledd.Backend, 0, len(backManager.backends))
  292. leds := make([]*ledd.LED, 0, len(ledManager.leds))
  293. for _, led := range ledManager.leds {
  294. leds = append(leds, &ledd.LED{
  295. Name: led.Name,
  296. })
  297. }
  298. for _, backend := range backManager.backends {
  299. backends = append(backends, &ledd.Backend{
  300. Name: backend.name,
  301. Channel: backend.channel,
  302. Resolution: backend.resolution,
  303. Type: backend.platformType,
  304. Version: backend.version,
  305. })
  306. }
  307. wrapperMsg := &ledd.ClientWrapperMessage{
  308. Leds: leds,
  309. Backends: backends,
  310. Msg: &ledd.ClientWrapperMessage_MLedd{
  311. MLedd: &ledd.LedD{
  312. Name: config.Name,
  313. },
  314. },
  315. }
  316. data, err := proto.Marshal(wrapperMsg)
  317. if err != nil {
  318. log.Warningf("[%s] Failed to encode protobuf msg: %s", client.socket.RemoteAddr(), err)
  319. }
  320. client.data <- prepareProtobuf(data)
  321. case client := <-manager.unregister:
  322. if _, ok := manager.clients[client]; ok {
  323. log.Debugf("[%s] %s (%s) removed", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform)
  324. close(client.data)
  325. delete(manager.clients, client)
  326. }
  327. case message := <-manager.broadcast:
  328. for connection := range manager.clients {
  329. select {
  330. case connection.data <- message:
  331. default:
  332. close(connection.data)
  333. delete(manager.clients, connection)
  334. }
  335. }
  336. }
  337. }
  338. }
  339. func (manager *ClientManager) send(client *Client) {
  340. defer client.socket.Close()
  341. for {
  342. select {
  343. case message, ok := <-client.data:
  344. if !ok {
  345. return
  346. }
  347. client.socket.Write(message)
  348. }
  349. }
  350. }
  351. func (manager *ClientManager) receive(client *Client) {
  352. for {
  353. message := make([]byte, 4096)
  354. length, err := client.socket.Read(message)
  355. if err != nil {
  356. log.Warningf("[%s] Read failed: %s", client.socket.RemoteAddr(), err)
  357. manager.unregister <- client
  358. client.socket.Close()
  359. break
  360. }
  361. if length > 0 {
  362. for i := 0; i < length; {
  363. msgLen := int(binary.BigEndian.Uint32(message[i : i+4]))
  364. // log.Debugf("[%s] Reading protobuf after %d (len=%d)", client.socket.RemoteAddr(), i+4, msgLen)
  365. clientMsg := &ledd.ClientWrapperMessage{}
  366. err = proto.Unmarshal(message[i+4:i+msgLen+4], clientMsg)
  367. i += msgLen + 4
  368. if err != nil {
  369. log.Warningf("[%s] Couldn't decode protobuf msg!", client.socket.RemoteAddr())
  370. continue
  371. }
  372. switch msg := clientMsg.Msg.(type) {
  373. case *ledd.ClientWrapperMessage_MClient:
  374. client.platform = msg.MClient.Type
  375. log.Infof("[%s] %s is now identified as client (%s)", LOG_CLIENTS, client.socket.RemoteAddr(), client.platform)
  376. clientManager.register <- client
  377. case *ledd.ClientWrapperMessage_MGetLed:
  378. allLED := make([]*ledd.LED, 0)
  379. for _, led := range ledManager.leds {
  380. allLED = append(allLED, &ledd.LED{Name: led.Name})
  381. }
  382. data, err := proto.Marshal(&ledd.ClientWrapperMessage{Leds: allLED})
  383. if err != nil {
  384. log.Errorf("[%s] Error encoding protobuf: %s", client.socket.RemoteAddr(), err)
  385. break
  386. }
  387. client.data <- prepareProtobuf(data)
  388. case *ledd.ClientWrapperMessage_MAddLed:
  389. backend, ok := backManager.backends[msg.MAddLed.Backend]
  390. if !ok {
  391. log.Warningf("[%s] Can't add LED for non-existing backend %s", client.socket.RemoteAddr(), msg.MAddLed.Backend)
  392. break
  393. }
  394. if _, ok := ledManager.leds[msg.MAddLed.Name]; ok {
  395. log.Warningf("[%s] Can't add LED with exisiting name %s", client.socket.RemoteAddr(), msg.MAddLed.Name)
  396. break
  397. }
  398. nLED := LED{
  399. Name: msg.MAddLed.Name,
  400. Channel: msg.MAddLed.Channel,
  401. Backend: backend.name,
  402. color: make(chan colorful.Color, 20),
  403. Correction: msg.MAddLed.Correction,
  404. }
  405. ledManager.add <- &nLED
  406. case *ledd.ClientWrapperMessage_MSetLed:
  407. leds := clientMsg.Leds
  408. if len(leds) == 0 {
  409. log.Warningf("[%s] Got setLED with no LEDs attached!", client.socket.RemoteAddr())
  410. break
  411. }
  412. for _, pLED := range leds {
  413. led, ok := ledManager.leds[pLED.Name]
  414. if !ok {
  415. log.Warningf("[%s] Failed to set %s: not found", client.socket.RemoteAddr(), pLED.Name)
  416. break
  417. }
  418. // log.Debugf("[%s] Set %s to %s", client.socket.RemoteAddr(), led.Name, colorful.Hcl(msg.MSetLed.Colour.Hue, msg.MSetLed.Colour.Chroma, msg.MSetLed.Colour.Light))
  419. if pLED.Color == nil {
  420. led.color <- colorful.Color{R: msg.MSetLed.Colour.Red, G: msg.MSetLed.Colour.Green, B: msg.MSetLed.Colour.Blue}
  421. } else {
  422. led.color <- colorful.Color{R: pLED.Color.Red, G: pLED.Color.Green, B: pLED.Color.Blue}
  423. }
  424. }
  425. case *ledd.ClientWrapperMessage_MSetDirect:
  426. backend, ok := backManager.backends[msg.MSetDirect.Backend]
  427. if !ok {
  428. log.Warningf("[%s] Can't set channel for non-existing backend %s", client.socket.RemoteAddr(), msg.MSetDirect.Backend)
  429. break
  430. }
  431. backend.setChannel(msg.MSetDirect.Channel, float64(msg.MSetDirect.Value/backend.resolution))
  432. case *ledd.ClientWrapperMessage_MRemoveLed:
  433. led, ok := ledManager.leds[msg.MRemoveLed.Name]
  434. if !ok {
  435. log.Warningf("[%s] Failed to remove %s: not found", client.socket.RemoteAddr(), msg.MRemoveLed.Name)
  436. break
  437. }
  438. ledManager.remove <- led
  439. }
  440. }
  441. }
  442. }
  443. }
  444. // HELPER
  445. func check(e error) {
  446. if e != nil {
  447. panic(e)
  448. }
  449. }
  450. func (backend *Backend) niceName() string {
  451. if backend.name != "" {
  452. return backend.name
  453. } else {
  454. return backend.socket.RemoteAddr().String()
  455. }
  456. }
  457. func (backend Backend) setChannel(channel int32, val float64) {
  458. cMap := make(map[int32]float64)
  459. cMap[channel] = val
  460. wrapperMsg := &ledd.BackendWrapperMessage{
  461. Msg: &ledd.BackendWrapperMessage_MSetChannel{
  462. MSetChannel: &ledd.BackendSetChannel{
  463. Values: cMap}}}
  464. data, err := proto.Marshal(wrapperMsg)
  465. if err != nil {
  466. log.Warningf("[%s] Failed to encode protobuf msg: %s", backend.niceName(), err)
  467. }
  468. backend.data <- prepareProtobuf(data)
  469. }
  470. func prepareProtobuf(data []byte) []byte {
  471. size := make([]byte, 4)
  472. binary.BigEndian.PutUint32(size, uint32(len(data)))
  473. return append(size, data...)
  474. }
  475. // MAIN
  476. func main() {
  477. killSignals := make(chan os.Signal, 1)
  478. signal.Notify(killSignals, syscall.SIGINT, syscall.SIGTERM)
  479. log.Info("LedD", VERSION)
  480. content, err := ioutil.ReadFile("ledd.yaml")
  481. check(err)
  482. err = yaml.Unmarshal(content, &config)
  483. check(err)
  484. db, err = scribble.New("db", nil)
  485. check(err)
  486. backManager = BackendManager{
  487. backends: make(map[string]*Backend),
  488. broadcast: make(chan []byte, 20),
  489. register: make(chan *Backend, 10),
  490. unregister: make(chan *Backend, 10),
  491. }
  492. go backManager.start()
  493. clientManager = ClientManager{
  494. clients: make(map[*Client]bool),
  495. broadcast: make(chan []byte, 20),
  496. register: make(chan *Client, 10),
  497. unregister: make(chan *Client, 10),
  498. }
  499. go clientManager.start()
  500. ledManager = LEDManager{
  501. leds: make(map[string]*LED),
  502. broadcast: make(chan colorful.Color, 10),
  503. add: make(chan *LED, 10),
  504. remove: make(chan *LED, 10),
  505. }
  506. go ledManager.start()
  507. leds, err := db.ReadAll("led")
  508. if os.IsNotExist(err) {
  509. log.Infof("No DB found.")
  510. } else {
  511. check(err)
  512. }
  513. for _, ledJson := range leds {
  514. led := LED{}
  515. err = json.Unmarshal([]byte(ledJson), &led)
  516. check(err)
  517. led.color = make(chan colorful.Color, 20)
  518. ledManager.add <- &led
  519. }
  520. backendThread, err := setupSocket(config.Daemon.Backend.Host, config.Daemon.Backend.Port, LOG_BACKEND, true)
  521. check(err)
  522. go backendThread()
  523. frontendThread, err := setupSocket(config.Daemon.Frontend.Host, config.Daemon.Frontend.Port, LOG_CLIENTS, false)
  524. check(err)
  525. go frontendThread()
  526. log.Infof("All connection handler ready.")
  527. <-killSignals
  528. backManager.stop()
  529. }