refactor: Cloud report sync to enable syncing violation events

This commit is contained in:
abhisek 2024-10-01 09:09:14 +05:30
parent fca2b8e3ab
commit a9b424dc51
No known key found for this signature in database
GPG Key ID: CB92A4990C02A88F

View File

@ -125,9 +125,14 @@ func (s *syncSessionPool) forEach(f func(key string, session *syncSession) error
return nil return nil
} }
type workItem struct {
pkg *models.Package
event *analyzer.AnalyzerEvent
}
type syncReporter struct { type syncReporter struct {
config *SyncReporterConfig config *SyncReporterConfig
workQueue chan *models.Package workQueue chan *workItem
done chan bool done chan bool
wg sync.WaitGroup wg sync.WaitGroup
client *grpc.ClientConn client *grpc.ClientConn
@ -199,7 +204,7 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
self := &syncReporter{ self := &syncReporter{
config: &config, config: &config,
done: done, done: done,
workQueue: make(chan *models.Package, 1000), workQueue: make(chan *workItem, 1000),
client: client, client: client,
sessions: &syncSessionPool, sessions: &syncSessionPool,
} }
@ -255,6 +260,7 @@ func (s *syncReporter) AddManifest(manifest *models.PackageManifest) {
} }
func (s *syncReporter) AddAnalyzerEvent(event *analyzer.AnalyzerEvent) { func (s *syncReporter) AddAnalyzerEvent(event *analyzer.AnalyzerEvent) {
s.queueEvent(event)
} }
func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) { func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) {
@ -280,9 +286,14 @@ func (s *syncReporter) Finish() error {
}) })
} }
func (s *syncReporter) queueEvent(event *analyzer.AnalyzerEvent) {
s.wg.Add(1)
s.workQueue <- &workItem{event: event}
}
func (s *syncReporter) queuePackage(pkg *models.Package) { func (s *syncReporter) queuePackage(pkg *models.Package) {
s.wg.Add(1) s.wg.Add(1)
s.workQueue <- pkg s.workQueue <- &workItem{pkg: pkg}
} }
func (s *syncReporter) startWorkers() { func (s *syncReporter) startWorkers() {
@ -299,17 +310,37 @@ func (s *syncReporter) startWorkers() {
func (s *syncReporter) syncReportWorker() { func (s *syncReporter) syncReportWorker() {
for { for {
select { select {
case pkg := <-s.workQueue: case item := <-s.workQueue:
err := s.syncPackage(pkg) if item.event != nil {
err := s.syncEvent(item.event)
if err != nil {
logger.Errorf("failed to sync event: %v", err)
}
} else if item.pkg != nil {
err := s.syncPackage(item.pkg)
if err != nil { if err != nil {
logger.Errorf("failed to sync package: %v", err) logger.Errorf("failed to sync package: %v", err)
} }
}
case <-s.done: case <-s.done:
return return
} }
} }
} }
func (s *syncReporter) syncEvent(event *analyzer.AnalyzerEvent) error {
defer s.wg.Done()
pkg := event.Package
filter := event.Filter
if pkg == nil || filter == nil || pkg.Manifest == nil {
return fmt.Errorf("failed to sync event: invalid event data")
}
return nil
}
func (s *syncReporter) syncPackage(pkg *models.Package) error { func (s *syncReporter) syncPackage(pkg *models.Package) error {
defer s.wg.Done() defer s.wg.Done()