mirror of
https://github.com/safedep/vet.git
synced 2025-12-10 13:43:01 -06:00
feat: progress bar in cloud report syncing (#400)
* feat: progress bar in cloud report syncing Signed-off-by: Kunal Singh <kunalsin9h@gmail.com> * fix: missing nil check guard in syncReportTracker closure * Update pkg/reporter/sync.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kunal Singh <kunalsin9h@gmail.com> * fix: race condition of pre-closure of progress bar before finish * fix: race condition * fix: Delay marking trackers as done till stop event --------- Signed-off-by: Kunal Singh <kunalsin9h@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: abhisek <abhisek.datta@gmail.com>
This commit is contained in:
parent
27548de0c8
commit
e0bb4a7836
53
pkg/reporter/callbacks.go
Normal file
53
pkg/reporter/callbacks.go
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package reporter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/safedep/vet/pkg/analyzer"
|
||||||
|
"github.com/safedep/vet/pkg/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SyncReporterCallbacks are effects trigger during Cloud Sync Report Process
|
||||||
|
// This is primarily used to show progress bar on the terminal
|
||||||
|
type SyncReporterCallbacks struct {
|
||||||
|
OnSyncStart func()
|
||||||
|
OnSyncFinish func()
|
||||||
|
OnPackageSync func(pkg *models.Package)
|
||||||
|
OnPackageSyncDone func(pkg *models.Package)
|
||||||
|
OnEventSync func(event *analyzer.AnalyzerEvent)
|
||||||
|
OnEventSyncDone func(event *analyzer.AnalyzerEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) dispatchOnPackageSync(pkg *models.Package) {
|
||||||
|
if s.callbacks.OnPackageSync != nil {
|
||||||
|
s.callbacks.OnPackageSync(pkg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) dispatchOnPackageSyncDone(pkg *models.Package) {
|
||||||
|
if s.callbacks.OnPackageSyncDone != nil {
|
||||||
|
s.callbacks.OnPackageSyncDone(pkg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) dispatchOnEventSync(event *analyzer.AnalyzerEvent) {
|
||||||
|
if s.callbacks.OnEventSync != nil {
|
||||||
|
s.callbacks.OnEventSync(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) dispatchOnEventSyncDone(event *analyzer.AnalyzerEvent) {
|
||||||
|
if s.callbacks.OnEventSyncDone != nil {
|
||||||
|
s.callbacks.OnEventSyncDone(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) dispatchOnSyncStart() {
|
||||||
|
if s.callbacks.OnSyncStart != nil {
|
||||||
|
s.callbacks.OnSyncStart()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *syncReporter) dispatchOnSyncFinish() {
|
||||||
|
if s.callbacks.OnSyncFinish != nil {
|
||||||
|
s.callbacks.OnSyncFinish()
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -134,9 +134,13 @@ type syncReporter struct {
|
|||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
client *grpc.ClientConn
|
client *grpc.ClientConn
|
||||||
sessions *syncSessionPool
|
sessions *syncSessionPool
|
||||||
|
callbacks SyncReporterCallbacks
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
|
// Verify syncReporter implements the Reporter interface
|
||||||
|
var _ Reporter = (*syncReporter)(nil)
|
||||||
|
|
||||||
|
func NewSyncReporter(config SyncReporterConfig, callbacks SyncReporterCallbacks) (Reporter, error) {
|
||||||
if config.ClientConnection == nil {
|
if config.ClientConnection == nil {
|
||||||
return nil, fmt.Errorf("missing gRPC client connection")
|
return nil, fmt.Errorf("missing gRPC client connection")
|
||||||
}
|
}
|
||||||
@ -186,8 +190,10 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
|
|||||||
workQueue: make(chan *workItem, 1000),
|
workQueue: make(chan *workItem, 1000),
|
||||||
client: config.ClientConnection,
|
client: config.ClientConnection,
|
||||||
sessions: &syncSessionPool,
|
sessions: &syncSessionPool,
|
||||||
|
callbacks: callbacks,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.dispatchOnSyncStart()
|
||||||
self.startWorkers()
|
self.startWorkers()
|
||||||
return self, nil
|
return self, nil
|
||||||
}
|
}
|
||||||
@ -248,6 +254,7 @@ func (s *syncReporter) AddPolicyEvent(event *policy.PolicyEvent) {
|
|||||||
|
|
||||||
func (s *syncReporter) Finish() error {
|
func (s *syncReporter) Finish() error {
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
s.dispatchOnSyncFinish()
|
||||||
close(s.done)
|
close(s.done)
|
||||||
|
|
||||||
return s.sessions.forEach(func(_ string, session *syncSession) error {
|
return s.sessions.forEach(func(_ string, session *syncSession) error {
|
||||||
@ -258,7 +265,6 @@ func (s *syncReporter) Finish() error {
|
|||||||
ToolSession: &controltowerv1.ToolSession{
|
ToolSession: &controltowerv1.ToolSession{
|
||||||
ToolSessionId: session.sessionId,
|
ToolSessionId: session.sessionId,
|
||||||
},
|
},
|
||||||
|
|
||||||
Status: controltowerv1.CompleteToolSessionRequest_STATUS_SUCCESS,
|
Status: controltowerv1.CompleteToolSessionRequest_STATUS_SUCCESS,
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -268,11 +274,13 @@ func (s *syncReporter) Finish() error {
|
|||||||
|
|
||||||
func (s *syncReporter) queueEvent(event *analyzer.AnalyzerEvent) {
|
func (s *syncReporter) queueEvent(event *analyzer.AnalyzerEvent) {
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
|
s.dispatchOnEventSync(event)
|
||||||
s.workQueue <- &workItem{event: event}
|
s.workQueue <- &workItem{event: event}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncReporter) queuePackage(pkg *models.Package) {
|
func (s *syncReporter) queuePackage(pkg *models.Package) {
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
|
s.dispatchOnPackageSync(pkg)
|
||||||
s.workQueue <- &workItem{pkg: pkg}
|
s.workQueue <- &workItem{pkg: pkg}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -385,6 +393,7 @@ func (s *syncReporter) syncEvent(event *analyzer.AnalyzerEvent) error {
|
|||||||
return fmt.Errorf("failed to publish policy violation: %w", err)
|
return fmt.Errorf("failed to publish policy violation: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.dispatchOnEventSyncDone(event)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -522,5 +531,6 @@ func (s *syncReporter) syncPackage(pkg *models.Package) error {
|
|||||||
return fmt.Errorf("failed to publish package insight: %w", err)
|
return fmt.Errorf("failed to publish package insight: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.dispatchOnPackageSyncDone(pkg)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
34
scan.go
34
scan.go
@ -506,6 +506,9 @@ func internalStartScan() error {
|
|||||||
reporters = append(reporters, rp)
|
reporters = append(reporters, rp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UI tracker (progress bar) for cloud report syncing
|
||||||
|
var syncReportTracker any
|
||||||
|
|
||||||
if syncReport {
|
if syncReport {
|
||||||
clientConn, err := auth.SyncClientConnection("vet-sync")
|
clientConn, err := auth.SyncClientConnection("vet-sync")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -519,6 +522,25 @@ func internalStartScan() error {
|
|||||||
ProjectVersion: syncReportStream,
|
ProjectVersion: syncReportStream,
|
||||||
EnableMultiProjectSync: syncEnableMultiProject,
|
EnableMultiProjectSync: syncEnableMultiProject,
|
||||||
ClientConnection: clientConn,
|
ClientConnection: clientConn,
|
||||||
|
}, reporter.SyncReporterCallbacks{
|
||||||
|
OnSyncStart: func() {
|
||||||
|
ui.PrintMsg("🌐 Syncing data to SafeDep Cloud...")
|
||||||
|
},
|
||||||
|
OnPackageSync: func(pkg *models.Package) {
|
||||||
|
ui.IncrementTrackerTotal(syncReportTracker, 1)
|
||||||
|
},
|
||||||
|
OnPackageSyncDone: func(pkg *models.Package) {
|
||||||
|
ui.IncrementProgress(syncReportTracker, 1)
|
||||||
|
},
|
||||||
|
OnEventSync: func(event *analyzer.AnalyzerEvent) {
|
||||||
|
ui.IncrementTrackerTotal(syncReportTracker, 1)
|
||||||
|
},
|
||||||
|
OnEventSyncDone: func(event *analyzer.AnalyzerEvent) {
|
||||||
|
ui.IncrementProgress(syncReportTracker, 1)
|
||||||
|
},
|
||||||
|
OnSyncFinish: func() {
|
||||||
|
ui.PrintSuccess("Syncing report data to cloud is complete")
|
||||||
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -655,6 +677,11 @@ func internalStartScan() error {
|
|||||||
|
|
||||||
packageManifestTracker = ui.TrackProgress("Scanning manifests", 0)
|
packageManifestTracker = ui.TrackProgress("Scanning manifests", 0)
|
||||||
packageTracker = ui.TrackProgress("Scanning packages", 0)
|
packageTracker = ui.TrackProgress("Scanning packages", 0)
|
||||||
|
|
||||||
|
// We use a separate tracker for syncing report data
|
||||||
|
if syncReport {
|
||||||
|
syncReportTracker = ui.TrackProgress("Uploading reports", 0)
|
||||||
|
}
|
||||||
},
|
},
|
||||||
OnAddTransitivePackage: func(pkg *models.Package) {
|
OnAddTransitivePackage: func(pkg *models.Package) {
|
||||||
ui.IncrementTrackerTotal(packageTracker, 1)
|
ui.IncrementTrackerTotal(packageTracker, 1)
|
||||||
@ -666,8 +693,15 @@ func internalStartScan() error {
|
|||||||
ui.IncrementProgress(packageTracker, 1)
|
ui.IncrementProgress(packageTracker, 1)
|
||||||
},
|
},
|
||||||
BeforeFinish: func() {
|
BeforeFinish: func() {
|
||||||
|
// Only mark package and manifest trackers as done
|
||||||
ui.MarkTrackerAsDone(packageManifestTracker)
|
ui.MarkTrackerAsDone(packageManifestTracker)
|
||||||
ui.MarkTrackerAsDone(packageTracker)
|
ui.MarkTrackerAsDone(packageTracker)
|
||||||
|
},
|
||||||
|
OnStop: func(err error) {
|
||||||
|
if syncReport {
|
||||||
|
ui.MarkTrackerAsDone(syncReportTracker)
|
||||||
|
}
|
||||||
|
|
||||||
ui.StopProgressWriter()
|
ui.StopProgressWriter()
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user