whois/ntfy/watcher.go

97 lines
2.0 KiB
Go
Raw Normal View History

2025-01-05 16:39:13 -08:00
package ntfy
import (
"bufio"
"encoding/json"
"log"
"net/http"
"net/url"
"path"
"time"
)
// Message is one decoded line of the JSON stream read by NtfyWatcher.Watch.
// Only the keys named in the struct tags are decoded.
type Message struct {
	// Id is decoded from the "id" key.
	Id string `json:"id"`

	// Time is decoded from the "time" key.
	// NOTE(review): presumably a Unix timestamp in seconds — confirm against
	// the ntfy API documentation.
	Time int `json:"time"`

	// Message is decoded from the "message" key.
	Message string `json:"message"`

	// Event is decoded from the "event" key. Watch forwards entries whose
	// Event is "message" and treats "keepalive" as a heartbeat.
	Event string `json:"event"`
}
// NtfyWatcher holds the settings used by Watch to subscribe to topics.
type NtfyWatcher struct {
	// Endpoint is the base URL that each topic path is joined onto.
	Endpoint string

	// Topics lists the topic names to subscribe to; Watch starts one
	// goroutine per entry.
	Topics []string
}
// Watch subscribes to every topic in w.Topics and returns the channel on which
// received notifications are delivered.
//
// One goroutine is started per topic. Each goroutine issues a GET request to
// the URL built from w.Endpoint, the topic and the "json" path element, reads
// the response body line by line, decodes each line as a JSON Message and
// forwards those whose Event is "message". Lines that fail to decode,
// "keepalive" events and all other events are logged and skipped.
//
// The returned channel is unbuffered and shared by all topics, so a goroutine
// blocks until its message has been received. The channel is never closed.
//
// A failed attempt (invalid URL, connection error, non-200 response or read
// error) is followed by a 5 second pause and consumes one of 5 retries;
// delivering a message restores the full budget. A stream that ends without a
// read error is reconnected immediately and costs no retry. When a topic has
// no retries left, log.Fatal terminates the whole process.
func (w *NtfyWatcher) Watch() chan Message {
	const (
		retryCount = 5
		retryTime  = 5 * time.Second
	)

	notifications := make(chan Message)

	// subscribe performs a single connection attempt for topic and consumes
	// the stream until it ends. delivered reports whether at least one
	// notification was forwarded; ok is false when the attempt failed and the
	// caller should back off before trying again. Running each attempt in its
	// own function call makes the deferred Close fire when the attempt ends,
	// so response bodies do not pile up across reconnects.
	subscribe := func(topic string) (delivered, ok bool) {
		endpoint, err := url.JoinPath(w.Endpoint, path.Join(topic, "json"))
		if err != nil {
			log.Println("error building endpoint url:", err)
			return false, false
		}

		resp, err := http.Get(endpoint)
		if err != nil {
			log.Println("error connecting to endpoint:", err)
			return false, false
		}
		defer resp.Body.Close()

		// An error response does not carry the event stream; parsing it would
		// only produce bogus events followed by an immediate reconnect.
		if resp.StatusCode != http.StatusOK {
			log.Println("unexpected response status:", resp.Status, "endpoint:", endpoint)
			return false, false
		}

		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			var msg Message
			if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
				log.Println("could not unmarshal message:", err)
				continue
			}
			switch msg.Event {
			case "message":
				log.Println("received notification:", msg)
				notifications <- msg
				delivered = true
			case "keepalive":
				log.Println("received keepalive message")
			default:
				log.Println("received unknown event:", msg.Event)
			}
		}
		if err := scanner.Err(); err != nil {
			log.Println("error reading response body:", err)
			return delivered, false
		}
		return delivered, true
	}

	for _, topic := range w.Topics {
		log.Println("subscribing to topic:", topic)
		// topic is passed as an argument so every goroutine owns its copy even
		// when built with a Go version older than 1.22, where the loop
		// variable is shared between iterations.
		go func(topic string) {
			retries := retryCount
			for {
				if retries == 0 {
					log.Fatal("too many retries, exiting")
				}
				delivered, ok := subscribe(topic)
				if delivered {
					retries = retryCount // a working stream restores the budget
				}
				if ok {
					continue
				}
				log.Println("waiting 5 seconds before reconnecting. retries left:", retries, "topic:", topic, "endpoint:", w.Endpoint)
				time.Sleep(retryTime)
				retries--
			}
		}(topic)
	}
	return notifications
}
// MakeNtfyWatcher returns a new NtfyWatcher configured with the given endpoint
// and topics. The topics slice is stored as passed, not copied.
func MakeNtfyWatcher(endpoint string, topics []string) *NtfyWatcher {
	watcher := new(NtfyWatcher)
	watcher.Endpoint = endpoint
	watcher.Topics = topics
	return watcher
}