forked from ebhomengo/niki
				
			add supervisor
This commit is contained in:
		
							parent
							
								
									74c1d223d1
								
							
						
					
					
						commit
						748dee60e2
					
				| 
						 | 
				
			
			@ -0,0 +1,350 @@
 | 
			
		|||
package supervisor
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"git.gocasts.ir/ebhomengo/niki/logger"
 | 
			
		||||
 | 
			
		||||
	"log/slog"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ProcessFunc is the signature of a long-running process managed by the supervisor.
// The process is expected to listen on finishSignal and to notify the supervisor
// through terminateChannel when it terminates.
type ProcessFunc func(finishSignal context.Context, processName string, terminateChannel chan<- string) error
 | 
			
		||||
 | 
			
		||||
// noopProcessFunc is a placeholder process that does nothing and returns nil.
// It has the same signature as ProcessFunc.
var noopProcessFunc = func(_ context.Context, _ string, _ chan<- string) error {
	return nil
}
 | 
			
		||||
 | 
			
		||||
// Supervisor is responsible to manage long-running processes
 | 
			
		||||
// Supervisor is not for concurrent use and should be used as the main goroutine of application
 | 
			
		||||
type Supervisor struct {
 | 
			
		||||
	logger          *slog.Logger
 | 
			
		||||
	lock            *sync.Mutex
 | 
			
		||||
	processes       map[string]Process
 | 
			
		||||
	shutdownSignal  chan os.Signal
 | 
			
		||||
	ctx             context.Context
 | 
			
		||||
	ctxCancel       context.CancelFunc
 | 
			
		||||
	shutdownTimeout time.Duration
 | 
			
		||||
	// terminateChannel should be used to notify supervisor when a process terminates
 | 
			
		||||
	terminateChannel chan string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(shutdownTimeout time.Duration, l *slog.Logger) *Supervisor {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
 | 
			
		||||
	if l == nil {
 | 
			
		||||
		l = logger.L()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if shutdownTimeout == 0 {
 | 
			
		||||
		shutdownTimeout = DefaultGracefulShutdownTimeout
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &Supervisor{
 | 
			
		||||
		lock:            &sync.Mutex{},
 | 
			
		||||
		logger:          l.WithGroup(LogNSSupervisor),
 | 
			
		||||
		processes:       make(map[string]Process),
 | 
			
		||||
		shutdownSignal:  make(chan os.Signal, 1),
 | 
			
		||||
		ctx:             ctx,
 | 
			
		||||
		ctxCancel:       cancel,
 | 
			
		||||
		shutdownTimeout: shutdownTimeout,
 | 
			
		||||
		// TODO : how to set terminateChannel buffer?
 | 
			
		||||
		terminateChannel: make(chan string, 10),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Process struct {
 | 
			
		||||
	name    string
 | 
			
		||||
	handler ProcessFunc
 | 
			
		||||
	options ProcessOption
 | 
			
		||||
	state   ProcessState
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ProcessState holds the runtime counters of a process.
type ProcessState struct {
	// RecoveredNum counts how many times the process has been recovered.
	RecoveredNum int
}
 | 
			
		||||
 | 
			
		||||
// ProcessOption is the retry/recover policy of a registered process.
type ProcessOption struct {
	// Recover enables restarting the process after a panic.
	Recover bool

	// RecoverInterval is how long the supervisor sleeps before restarting a panicked process.
	RecoverInterval time.Duration

	// RecoverCount is the maximum number of restarts after a panic.
	RecoverCount int

	// RetryCount is the maximum number of execution attempts when the process returns an error.
	RetryCount int

	// RetryInterval is how long the supervisor sleeps after a failed attempt.
	RetryInterval time.Duration

	// IsFatal marks a process whose termination requests a shutdown of the supervisor.
	IsFatal bool
}
 | 
			
		||||
 | 
			
		||||
// Default policy values and the supervisor's log group name.
const (
	// ProcessRetryCount is the default number of execution attempts.
	ProcessRetryCount = 3

	// ProcessRetryInterval is the default pause after a failed attempt.
	ProcessRetryInterval = 3 * time.Second

	// ProcessRecoverCount is the default number of restarts after a panic.
	ProcessRecoverCount = 10

	// ProcessRecoverInterval is the default pause before restarting a panicked process.
	ProcessRecoverInterval = 2 * time.Second

	// DefaultGracefulShutdownTimeout is used by New when no shutdown timeout is given.
	DefaultGracefulShutdownTimeout = 5 * time.Second

	// LogNSSupervisor is the log group name used by the supervisor logger.
	LogNSSupervisor = "supervisor"
)
 | 
			
		||||
 | 
			
		||||
var defaultOptions = ProcessOption{
 | 
			
		||||
	Recover:         true,
 | 
			
		||||
	RetryInterval:   ProcessRetryInterval,
 | 
			
		||||
	RecoverInterval: ProcessRecoverInterval,
 | 
			
		||||
	RecoverCount:    ProcessRecoverCount,
 | 
			
		||||
	RetryCount:      ProcessRetryCount,
 | 
			
		||||
	IsFatal:         true,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Register registers a new process to supervisor
 | 
			
		||||
func (s *Supervisor) Register(name string, process ProcessFunc, options *ProcessOption) {
 | 
			
		||||
	// TODO : don't allow any registration after Start is called using a mutex
 | 
			
		||||
 | 
			
		||||
	s.warnIfNameAlreadyInUse(name)
 | 
			
		||||
 | 
			
		||||
	// TODO : validate name
 | 
			
		||||
	p := Process{
 | 
			
		||||
		name:    name,
 | 
			
		||||
		handler: process,
 | 
			
		||||
		options: defaultOptions,
 | 
			
		||||
		state:   ProcessState{RecoveredNum: 0},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if options != nil {
 | 
			
		||||
		p.options = *options
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	s.processes[name] = p
 | 
			
		||||
	s.lock.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Start spawns a new goroutine for each process
 | 
			
		||||
// Spawned goroutine is responsible to handle the panics and restart the process
 | 
			
		||||
func (s *Supervisor) Start() {
 | 
			
		||||
	// TODO : is it viable to use a goroutine pool such as Ants ?
 | 
			
		||||
	for name := range s.processes {
 | 
			
		||||
		go s.executeProcessWithRetryPolicy(name)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) executeProcessWithRetryPolicy(name string) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if r := recover(); r != nil {
 | 
			
		||||
			s.logger.Error("recover from panic", slog.String("process_name", name), slog.Any("panic", r))
 | 
			
		||||
 | 
			
		||||
			if s.isRecoverable(name) {
 | 
			
		||||
				s.incRecover(name)
 | 
			
		||||
				s.waitFoRecover(name)
 | 
			
		||||
				s.logger.Info("restart the process", slog.String("process_name", name))
 | 
			
		||||
 | 
			
		||||
				// spawn new goroutine to avoid heap/stack memory leak when the recover count is big
 | 
			
		||||
				go s.executeProcessWithRetryPolicy(name)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			s.logger.Info("don't try any more to restart the process", slog.String("process_name", name))
 | 
			
		||||
			s.removeProcess(name)
 | 
			
		||||
 | 
			
		||||
			if s.isFatal(name) {
 | 
			
		||||
				s.logger.Error("can't recover important process. exit..", slog.String("process_name", name))
 | 
			
		||||
				s.shutdownSignal <- os.Interrupt
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	for i := 1; i <= s.retryCount(name); i++ {
 | 
			
		||||
		s.logger.Info("execute process", slog.String("process_name", name))
 | 
			
		||||
		f := s.handler(name)
 | 
			
		||||
		err := f(s.ctx, name, s.terminateChannel)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			s.logger.Error("failed to execute process", slog.String("process_name", name),
 | 
			
		||||
				slog.Int("attempt", i), slog.String("error", err.Error()))
 | 
			
		||||
 | 
			
		||||
			s.waitFoRetry(name)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// don't expect handler return if it hasn't any error because it's long-running process
 | 
			
		||||
		// it should return when receives shutdown signal
 | 
			
		||||
		s.logger.Info("process terminates with no error", slog.String("process_name", name))
 | 
			
		||||
 | 
			
		||||
		if s.isFatal(name) {
 | 
			
		||||
			s.logger.Error("can't recover important process. exit..", slog.String("process_name", name))
 | 
			
		||||
			s.shutdownSignal <- os.Interrupt
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s.logger.Info("don't try any more to execute process", slog.String("process_name", name))
 | 
			
		||||
	s.removeProcess(name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WaitOnShutdownSignal wait to receive shutdown signal.
 | 
			
		||||
// WaitOnShutdownSignal should not be called in other goroutines except main goroutine of application
 | 
			
		||||
func (s *Supervisor) WaitOnShutdownSignal() {
 | 
			
		||||
	// TODO : is it necessary to add os.Interrupt to supervisor config?
 | 
			
		||||
	signal.Notify(s.shutdownSignal, os.Interrupt)
 | 
			
		||||
	<-s.shutdownSignal
 | 
			
		||||
 | 
			
		||||
	s.gracefulShutdown()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) gracefulShutdown() {
 | 
			
		||||
	s.logger.Info("shutdown all processes gracefully")
 | 
			
		||||
 | 
			
		||||
	s.logger.Info("notify all processes (goroutines) to finish their jobs", slog.Duration("shutdown_timeout", s.shutdownTimeout))
 | 
			
		||||
	s.ctxCancel()
 | 
			
		||||
 | 
			
		||||
	forceExitCtx, forceExitCancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
 | 
			
		||||
	defer forceExitCancel()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case name := <-s.terminateChannel:
 | 
			
		||||
			s.logger.Info("process terminates gracefully", slog.String("process_name", name))
 | 
			
		||||
			s.removeProcess(name)
 | 
			
		||||
 | 
			
		||||
		case <-forceExitCtx.Done():
 | 
			
		||||
			s.logger.Info("supervisor terminates its job.", slog.Int("number_of_unfinished_processes", len(s.processes)))
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) removeProcess(name string) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	delete(s.processes, name)
 | 
			
		||||
	s.lock.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) isRecoverable(name string) bool {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if v.options.Recover && v.state.RecoveredNum < v.options.RecoverCount {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) isFatal(name string) bool {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return v.options.IsFatal
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) incRecover(name string) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	v.state.RecoveredNum++
 | 
			
		||||
	s.processes[name] = v
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) retryCount(name string) int {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return -1
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return v.options.RetryCount
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) retryInterval(name string) time.Duration {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return -1
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return v.options.RetryInterval
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) waitFoRecover(name string) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t := v.options.RecoverInterval
 | 
			
		||||
 | 
			
		||||
	// free lock before sleep
 | 
			
		||||
	s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	time.Sleep(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) waitFoRetry(name string) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t := v.options.RetryInterval
 | 
			
		||||
 | 
			
		||||
	// free lock before sleep
 | 
			
		||||
	s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	s.logger.Info("wait to retry execute process after sleep interval",
 | 
			
		||||
		slog.String("process_name", name), slog.Duration("interval",
 | 
			
		||||
			t))
 | 
			
		||||
 | 
			
		||||
	time.Sleep(t)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) handler(name string) ProcessFunc {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	v, ok := s.processes[name]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))
 | 
			
		||||
		return noopProcessFunc
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return v.handler
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Supervisor) warnIfNameAlreadyInUse(name string) {
 | 
			
		||||
	s.lock.Lock()
 | 
			
		||||
	defer s.lock.Unlock()
 | 
			
		||||
 | 
			
		||||
	if _, ok := s.processes[name]; ok {
 | 
			
		||||
		s.logger.Warn("process name already in use", slog.String("process_name", name))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue