forked from ebhomengo/niki
351 lines
8.6 KiB
Go
351 lines
8.6 KiB
Go
|
package supervisor
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"git.gocasts.ir/ebhomengo/niki/logger"
|
||
|
|
||
|
"log/slog"
|
||
|
"os"
|
||
|
"os/signal"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// ProcessFunc is a long-running process which listens on finishSignal
|
||
|
// It notifies the supervisor by terminate channel when it terminates
|
||
|
type ProcessFunc func(finishSignal context.Context, processName string, terminateChannel chan<- string) error
|
||
|
|
||
|
var noopProcessFunc = func(finishSignal context.Context, processName string, terminateChannel chan<- string) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Supervisor is responsible to manage long-running processes
|
||
|
// Supervisor is not for concurrent use and should be used as the main goroutine of application
|
||
|
type Supervisor struct {
|
||
|
logger *slog.Logger
|
||
|
lock *sync.Mutex
|
||
|
processes map[string]Process
|
||
|
shutdownSignal chan os.Signal
|
||
|
ctx context.Context
|
||
|
ctxCancel context.CancelFunc
|
||
|
shutdownTimeout time.Duration
|
||
|
// terminateChannel should be used to notify supervisor when a process terminates
|
||
|
terminateChannel chan string
|
||
|
}
|
||
|
|
||
|
func New(shutdownTimeout time.Duration, l *slog.Logger) *Supervisor {
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
|
||
|
if l == nil {
|
||
|
l = logger.L()
|
||
|
}
|
||
|
|
||
|
if shutdownTimeout == 0 {
|
||
|
shutdownTimeout = DefaultGracefulShutdownTimeout
|
||
|
}
|
||
|
|
||
|
return &Supervisor{
|
||
|
lock: &sync.Mutex{},
|
||
|
logger: l.WithGroup(LogNSSupervisor),
|
||
|
processes: make(map[string]Process),
|
||
|
shutdownSignal: make(chan os.Signal, 1),
|
||
|
ctx: ctx,
|
||
|
ctxCancel: cancel,
|
||
|
shutdownTimeout: shutdownTimeout,
|
||
|
// TODO : how to set terminateChannel buffer?
|
||
|
terminateChannel: make(chan string, 10),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type Process struct {
|
||
|
name string
|
||
|
handler ProcessFunc
|
||
|
options ProcessOption
|
||
|
state ProcessState
|
||
|
}
|
||
|
|
||
|
type ProcessState struct {
|
||
|
// RecoveredNum count number of time the process recovered
|
||
|
RecoveredNum int
|
||
|
}
|
||
|
|
||
|
type ProcessOption struct {
|
||
|
Recover bool
|
||
|
RecoverInterval time.Duration
|
||
|
RecoverCount int
|
||
|
RetryCount int
|
||
|
RetryInterval time.Duration
|
||
|
IsFatal bool
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
ProcessRetryCount = 3
|
||
|
ProcessRetryInterval = 3 * time.Second
|
||
|
ProcessRecoverCount = 10
|
||
|
ProcessRecoverInterval = 2 * time.Second
|
||
|
DefaultGracefulShutdownTimeout = 5 * time.Second
|
||
|
LogNSSupervisor = "supervisor"
|
||
|
)
|
||
|
|
||
|
var defaultOptions = ProcessOption{
|
||
|
Recover: true,
|
||
|
RetryInterval: ProcessRetryInterval,
|
||
|
RecoverInterval: ProcessRecoverInterval,
|
||
|
RecoverCount: ProcessRecoverCount,
|
||
|
RetryCount: ProcessRetryCount,
|
||
|
IsFatal: true,
|
||
|
}
|
||
|
|
||
|
// Register registers a new process to supervisor
|
||
|
func (s *Supervisor) Register(name string, process ProcessFunc, options *ProcessOption) {
|
||
|
// TODO : don't allow any registration after Start is called using a mutex
|
||
|
|
||
|
s.warnIfNameAlreadyInUse(name)
|
||
|
|
||
|
// TODO : validate name
|
||
|
p := Process{
|
||
|
name: name,
|
||
|
handler: process,
|
||
|
options: defaultOptions,
|
||
|
state: ProcessState{RecoveredNum: 0},
|
||
|
}
|
||
|
|
||
|
if options != nil {
|
||
|
p.options = *options
|
||
|
}
|
||
|
|
||
|
s.lock.Lock()
|
||
|
s.processes[name] = p
|
||
|
s.lock.Unlock()
|
||
|
}
|
||
|
|
||
|
// Start spawns a new goroutine for each process
|
||
|
// Spawned goroutine is responsible to handle the panics and restart the process
|
||
|
func (s *Supervisor) Start() {
|
||
|
// TODO : is it viable to use a goroutine pool such as Ants ?
|
||
|
for name := range s.processes {
|
||
|
go s.executeProcessWithRetryPolicy(name)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) executeProcessWithRetryPolicy(name string) {
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
s.logger.Error("recover from panic", slog.String("process_name", name), slog.Any("panic", r))
|
||
|
|
||
|
if s.isRecoverable(name) {
|
||
|
s.incRecover(name)
|
||
|
s.waitFoRecover(name)
|
||
|
s.logger.Info("restart the process", slog.String("process_name", name))
|
||
|
|
||
|
// spawn new goroutine to avoid heap/stack memory leak when the recover count is big
|
||
|
go s.executeProcessWithRetryPolicy(name)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
s.logger.Info("don't try any more to restart the process", slog.String("process_name", name))
|
||
|
s.removeProcess(name)
|
||
|
|
||
|
if s.isFatal(name) {
|
||
|
s.logger.Error("can't recover important process. exit..", slog.String("process_name", name))
|
||
|
s.shutdownSignal <- os.Interrupt
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for i := 1; i <= s.retryCount(name); i++ {
|
||
|
s.logger.Info("execute process", slog.String("process_name", name))
|
||
|
f := s.handler(name)
|
||
|
err := f(s.ctx, name, s.terminateChannel)
|
||
|
if err != nil {
|
||
|
s.logger.Error("failed to execute process", slog.String("process_name", name),
|
||
|
slog.Int("attempt", i), slog.String("error", err.Error()))
|
||
|
|
||
|
s.waitFoRetry(name)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// don't expect handler return if it hasn't any error because it's long-running process
|
||
|
// it should return when receives shutdown signal
|
||
|
s.logger.Info("process terminates with no error", slog.String("process_name", name))
|
||
|
|
||
|
if s.isFatal(name) {
|
||
|
s.logger.Error("can't recover important process. exit..", slog.String("process_name", name))
|
||
|
s.shutdownSignal <- os.Interrupt
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
s.logger.Info("don't try any more to execute process", slog.String("process_name", name))
|
||
|
s.removeProcess(name)
|
||
|
}
|
||
|
|
||
|
// WaitOnShutdownSignal wait to receive shutdown signal.
|
||
|
// WaitOnShutdownSignal should not be called in other goroutines except main goroutine of application
|
||
|
func (s *Supervisor) WaitOnShutdownSignal() {
|
||
|
// TODO : is it necessary to add os.Interrupt to supervisor config?
|
||
|
signal.Notify(s.shutdownSignal, os.Interrupt)
|
||
|
<-s.shutdownSignal
|
||
|
|
||
|
s.gracefulShutdown()
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) gracefulShutdown() {
|
||
|
s.logger.Info("shutdown all processes gracefully")
|
||
|
|
||
|
s.logger.Info("notify all processes (goroutines) to finish their jobs", slog.Duration("shutdown_timeout", s.shutdownTimeout))
|
||
|
s.ctxCancel()
|
||
|
|
||
|
forceExitCtx, forceExitCancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
|
||
|
defer forceExitCancel()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case name := <-s.terminateChannel:
|
||
|
s.logger.Info("process terminates gracefully", slog.String("process_name", name))
|
||
|
s.removeProcess(name)
|
||
|
|
||
|
case <-forceExitCtx.Done():
|
||
|
s.logger.Info("supervisor terminates its job.", slog.Int("number_of_unfinished_processes", len(s.processes)))
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) removeProcess(name string) {
|
||
|
s.lock.Lock()
|
||
|
delete(s.processes, name)
|
||
|
s.lock.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) isRecoverable(name string) bool {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
if v.options.Recover && v.state.RecoveredNum < v.options.RecoverCount {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) isFatal(name string) bool {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
return v.options.IsFatal
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) incRecover(name string) {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
v.state.RecoveredNum++
|
||
|
s.processes[name] = v
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) retryCount(name string) int {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return -1
|
||
|
}
|
||
|
|
||
|
return v.options.RetryCount
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) retryInterval(name string) time.Duration {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return -1
|
||
|
}
|
||
|
|
||
|
return v.options.RetryInterval
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) waitFoRecover(name string) {
|
||
|
s.lock.Lock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
t := v.options.RecoverInterval
|
||
|
|
||
|
// free lock before sleep
|
||
|
s.lock.Unlock()
|
||
|
|
||
|
time.Sleep(t)
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) waitFoRetry(name string) {
|
||
|
s.lock.Lock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
t := v.options.RetryInterval
|
||
|
|
||
|
// free lock before sleep
|
||
|
s.lock.Unlock()
|
||
|
|
||
|
s.logger.Info("wait to retry execute process after sleep interval",
|
||
|
slog.String("process_name", name), slog.Duration("interval",
|
||
|
t))
|
||
|
|
||
|
time.Sleep(t)
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) handler(name string) ProcessFunc {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
v, ok := s.processes[name]
|
||
|
if !ok {
|
||
|
s.logger.Warn("process doesn't exist", slog.String("process_name", name))
|
||
|
return noopProcessFunc
|
||
|
}
|
||
|
|
||
|
return v.handler
|
||
|
}
|
||
|
|
||
|
func (s *Supervisor) warnIfNameAlreadyInUse(name string) {
|
||
|
s.lock.Lock()
|
||
|
defer s.lock.Unlock()
|
||
|
|
||
|
if _, ok := s.processes[name]; ok {
|
||
|
s.logger.Warn("process name already in use", slog.String("process_name", name))
|
||
|
}
|
||
|
}
|