refactor: Cloud report sync to enable syncing violation events

This commit is contained in:
abhisek 2024-10-01 09:09:14 +05:30
parent fca2b8e3ab
commit a9b424dc51
No known key found for this signature in database
GPG Key ID: CB92A4990C02A88F

View File

@ -125,9 +125,14 @@ func (s *syncSessionPool) forEach(f func(key string, session *syncSession) error
return nil
}
type workItem struct {
pkg *models.Package
event *analyzer.AnalyzerEvent
}
type syncReporter struct {
config *SyncReporterConfig
workQueue chan *models.Package
workQueue chan *workItem
done chan bool
wg sync.WaitGroup
client *grpc.ClientConn
@ -199,7 +204,7 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
self := &syncReporter{
config: &config,
done: done,
workQueue: make(chan *models.Package, 1000),
workQueue: make(chan *workItem, 1000),
client: client,
sessions: &syncSessionPool,
}
@ -255,6 +260,7 @@ func (s *syncReporter) AddManifest(manifest *models.PackageManifest) {
}
func (s *syncReporter) AddAnalyzerEvent(event *analyzer.AnalyzerEvent) {
s.queueEvent(event)
}
func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) {
@ -280,9 +286,14 @@ func (s *syncReporter) Finish() error {
})
}
func (s *syncReporter) queueEvent(event *analyzer.AnalyzerEvent) {
s.wg.Add(1)
s.workQueue <- &workItem{event: event}
}
func (s *syncReporter) queuePackage(pkg *models.Package) {
s.wg.Add(1)
s.workQueue <- pkg
s.workQueue <- &workItem{pkg: pkg}
}
func (s *syncReporter) startWorkers() {
@ -299,10 +310,17 @@ func (s *syncReporter) startWorkers() {
func (s *syncReporter) syncReportWorker() {
for {
select {
case pkg := <-s.workQueue:
err := s.syncPackage(pkg)
if err != nil {
logger.Errorf("failed to sync package: %v", err)
case item := <-s.workQueue:
if item.event != nil {
err := s.syncEvent(item.event)
if err != nil {
logger.Errorf("failed to sync event: %v", err)
}
} else if item.pkg != nil {
err := s.syncPackage(item.pkg)
if err != nil {
logger.Errorf("failed to sync package: %v", err)
}
}
case <-s.done:
return
@ -310,6 +328,19 @@ func (s *syncReporter) syncReportWorker() {
}
}
func (s *syncReporter) syncEvent(event *analyzer.AnalyzerEvent) error {
defer s.wg.Done()
pkg := event.Package
filter := event.Filter
if pkg == nil || filter == nil || pkg.Manifest == nil {
return fmt.Errorf("failed to sync event: invalid event data")
}
return nil
}
func (s *syncReporter) syncPackage(pkg *models.Package) error {
defer s.wg.Done()