mirror of
https://github.com/safedep/vet.git
synced 2025-12-14 00:52:46 -06:00
refactor: Cloud report sync to enable syncing violation events
This commit is contained in:
parent
fca2b8e3ab
commit
a9b424dc51
@ -125,9 +125,14 @@ func (s *syncSessionPool) forEach(f func(key string, session *syncSession) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type workItem struct {
|
||||||
|
pkg *models.Package
|
||||||
|
event *analyzer.AnalyzerEvent
|
||||||
|
}
|
||||||
|
|
||||||
type syncReporter struct {
|
type syncReporter struct {
|
||||||
config *SyncReporterConfig
|
config *SyncReporterConfig
|
||||||
workQueue chan *models.Package
|
workQueue chan *workItem
|
||||||
done chan bool
|
done chan bool
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
client *grpc.ClientConn
|
client *grpc.ClientConn
|
||||||
@ -199,7 +204,7 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
|
|||||||
self := &syncReporter{
|
self := &syncReporter{
|
||||||
config: &config,
|
config: &config,
|
||||||
done: done,
|
done: done,
|
||||||
workQueue: make(chan *models.Package, 1000),
|
workQueue: make(chan *workItem, 1000),
|
||||||
client: client,
|
client: client,
|
||||||
sessions: &syncSessionPool,
|
sessions: &syncSessionPool,
|
||||||
}
|
}
|
||||||
@ -255,6 +260,7 @@ func (s *syncReporter) AddManifest(manifest *models.PackageManifest) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncReporter) AddAnalyzerEvent(event *analyzer.AnalyzerEvent) {
|
func (s *syncReporter) AddAnalyzerEvent(event *analyzer.AnalyzerEvent) {
|
||||||
|
s.queueEvent(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) {
|
func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) {
|
||||||
@ -280,9 +286,14 @@ func (s *syncReporter) Finish() error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) queueEvent(event *analyzer.AnalyzerEvent) {
|
||||||
|
s.wg.Add(1)
|
||||||
|
s.workQueue <- &workItem{event: event}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *syncReporter) queuePackage(pkg *models.Package) {
|
func (s *syncReporter) queuePackage(pkg *models.Package) {
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
s.workQueue <- pkg
|
s.workQueue <- &workItem{pkg: pkg}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncReporter) startWorkers() {
|
func (s *syncReporter) startWorkers() {
|
||||||
@ -299,17 +310,37 @@ func (s *syncReporter) startWorkers() {
|
|||||||
func (s *syncReporter) syncReportWorker() {
|
func (s *syncReporter) syncReportWorker() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case pkg := <-s.workQueue:
|
case item := <-s.workQueue:
|
||||||
err := s.syncPackage(pkg)
|
if item.event != nil {
|
||||||
|
err := s.syncEvent(item.event)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("failed to sync event: %v", err)
|
||||||
|
}
|
||||||
|
} else if item.pkg != nil {
|
||||||
|
err := s.syncPackage(item.pkg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Errorf("failed to sync package: %v", err)
|
logger.Errorf("failed to sync package: %v", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) syncEvent(event *analyzer.AnalyzerEvent) error {
|
||||||
|
defer s.wg.Done()
|
||||||
|
|
||||||
|
pkg := event.Package
|
||||||
|
filter := event.Filter
|
||||||
|
|
||||||
|
if pkg == nil || filter == nil || pkg.Manifest == nil {
|
||||||
|
return fmt.Errorf("failed to sync event: invalid event data")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *syncReporter) syncPackage(pkg *models.Package) error {
|
func (s *syncReporter) syncPackage(pkg *models.Package) error {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user