mirror of
https://github.com/safedep/vet.git
synced 2025-12-10 13:43:01 -06:00
refactor: Cloud report sync to enable syncing violation events
This commit is contained in:
parent
fca2b8e3ab
commit
a9b424dc51
@ -125,9 +125,14 @@ func (s *syncSessionPool) forEach(f func(key string, session *syncSession) error
|
||||
return nil
|
||||
}
|
||||
|
||||
type workItem struct {
|
||||
pkg *models.Package
|
||||
event *analyzer.AnalyzerEvent
|
||||
}
|
||||
|
||||
type syncReporter struct {
|
||||
config *SyncReporterConfig
|
||||
workQueue chan *models.Package
|
||||
workQueue chan *workItem
|
||||
done chan bool
|
||||
wg sync.WaitGroup
|
||||
client *grpc.ClientConn
|
||||
@ -199,7 +204,7 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
|
||||
self := &syncReporter{
|
||||
config: &config,
|
||||
done: done,
|
||||
workQueue: make(chan *models.Package, 1000),
|
||||
workQueue: make(chan *workItem, 1000),
|
||||
client: client,
|
||||
sessions: &syncSessionPool,
|
||||
}
|
||||
@ -255,6 +260,7 @@ func (s *syncReporter) AddManifest(manifest *models.PackageManifest) {
|
||||
}
|
||||
|
||||
func (s *syncReporter) AddAnalyzerEvent(event *analyzer.AnalyzerEvent) {
|
||||
s.queueEvent(event)
|
||||
}
|
||||
|
||||
func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) {
|
||||
@ -280,9 +286,14 @@ func (s *syncReporter) Finish() error {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *syncReporter) queueEvent(event *analyzer.AnalyzerEvent) {
|
||||
s.wg.Add(1)
|
||||
s.workQueue <- &workItem{event: event}
|
||||
}
|
||||
|
||||
func (s *syncReporter) queuePackage(pkg *models.Package) {
|
||||
s.wg.Add(1)
|
||||
s.workQueue <- pkg
|
||||
s.workQueue <- &workItem{pkg: pkg}
|
||||
}
|
||||
|
||||
func (s *syncReporter) startWorkers() {
|
||||
@ -299,10 +310,17 @@ func (s *syncReporter) startWorkers() {
|
||||
func (s *syncReporter) syncReportWorker() {
|
||||
for {
|
||||
select {
|
||||
case pkg := <-s.workQueue:
|
||||
err := s.syncPackage(pkg)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to sync package: %v", err)
|
||||
case item := <-s.workQueue:
|
||||
if item.event != nil {
|
||||
err := s.syncEvent(item.event)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to sync event: %v", err)
|
||||
}
|
||||
} else if item.pkg != nil {
|
||||
err := s.syncPackage(item.pkg)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to sync package: %v", err)
|
||||
}
|
||||
}
|
||||
case <-s.done:
|
||||
return
|
||||
@ -310,6 +328,19 @@ func (s *syncReporter) syncReportWorker() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncReporter) syncEvent(event *analyzer.AnalyzerEvent) error {
|
||||
defer s.wg.Done()
|
||||
|
||||
pkg := event.Package
|
||||
filter := event.Filter
|
||||
|
||||
if pkg == nil || filter == nil || pkg.Manifest == nil {
|
||||
return fmt.Errorf("failed to sync event: invalid event data")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *syncReporter) syncPackage(pkg *models.Package) error {
|
||||
defer s.wg.Done()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user