mirror of
https://github.com/safedep/vet.git
synced 2025-12-10 00:22:08 -06:00
refactor: Enable tool service session pooling in cloud sync reporter
This commit is contained in:
parent
5c1052c6c6
commit
7a5d637a50
@ -34,6 +34,10 @@ type SyncReporterConfig struct {
|
||||
ControlTowerBaseUrl string
|
||||
ControlTowerToken string
|
||||
|
||||
// Enable multi-project syncing
|
||||
// In this case, a new project is created per package manifest
|
||||
EnableMultiProjectSync bool
|
||||
|
||||
// Required
|
||||
ProjectName string
|
||||
ProjectVersion string
|
||||
@ -53,14 +57,72 @@ type SyncReporterConfig struct {
|
||||
ToolVersion string
|
||||
}
|
||||
|
||||
type syncReporter struct {
|
||||
config *SyncReporterConfig
|
||||
workQueue chan *models.Package
|
||||
done chan bool
|
||||
wg sync.WaitGroup
|
||||
client *grpc.ClientConn
|
||||
toolServiceClient controltowerv1grpc.ToolServiceClient
|
||||
type syncSession struct {
|
||||
sessionId string
|
||||
toolServiceClient controltowerv1grpc.ToolServiceClient
|
||||
}
|
||||
|
||||
type syncSessionPool struct {
|
||||
mu sync.RWMutex
|
||||
syncSessions map[string]syncSession
|
||||
}
|
||||
|
||||
// Only use this session
|
||||
func (s *syncSessionPool) addPrimarySession(sessionId string, client controltowerv1grpc.ToolServiceClient) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.syncSessions["*"] = syncSession{
|
||||
sessionId: sessionId,
|
||||
toolServiceClient: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncSessionPool) addKeyedSession(key, sessionId string, client controltowerv1grpc.ToolServiceClient) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.syncSessions[key] = syncSession{
|
||||
sessionId: sessionId,
|
||||
toolServiceClient: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncSessionPool) getSession(key string) (*syncSession, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
if s, ok := s.syncSessions["*"]; ok {
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
if s, ok := s.syncSessions[key]; ok {
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("session not found for key: %s", key)
|
||||
}
|
||||
|
||||
func (s *syncSessionPool) forEach(f func(key string, session *syncSession) error) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for key, session := range s.syncSessions {
|
||||
err := f(key, &session)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type syncReporter struct {
|
||||
config *SyncReporterConfig
|
||||
workQueue chan *models.Package
|
||||
done chan bool
|
||||
wg sync.WaitGroup
|
||||
sessions *syncSessionPool
|
||||
}
|
||||
|
||||
func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
|
||||
@ -89,37 +151,47 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
|
||||
return nil, fmt.Errorf("failed to create gRPC client: %w", err)
|
||||
}
|
||||
|
||||
// TODO: Auto-discover config using CI environment variables
|
||||
// if enabled by the user
|
||||
|
||||
syncSessionPool := syncSessionPool{
|
||||
syncSessions: make(map[string]syncSession),
|
||||
}
|
||||
|
||||
trigger := controltowerv1.ToolTrigger_TOOL_TRIGGER_MANUAL
|
||||
source := packagev1.ProjectSourceType_PROJECT_SOURCE_TYPE_UNSPECIFIED
|
||||
|
||||
logger.Debugf("Report Sync: Creating tool session for project: %s, version: %s",
|
||||
config.ProjectName, config.ProjectVersion)
|
||||
if !config.EnableMultiProjectSync {
|
||||
logger.Debugf("Report Sync: Creating tool session for project: %s, version: %s",
|
||||
config.ProjectName, config.ProjectVersion)
|
||||
|
||||
toolServiceClient := controltowerv1grpc.NewToolServiceClient(client)
|
||||
toolSessionRes, err := toolServiceClient.CreateToolSession(context.Background(),
|
||||
&controltowerv1.CreateToolSessionRequest{
|
||||
ToolName: config.ToolName,
|
||||
ToolVersion: config.ToolVersion,
|
||||
ProjectName: config.ProjectName,
|
||||
ProjectVersion: &config.ProjectVersion,
|
||||
ProjectSource: &source,
|
||||
Trigger: &trigger,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create tool session: %w", err)
|
||||
toolServiceClient := controltowerv1grpc.NewToolServiceClient(client)
|
||||
toolSessionRes, err := toolServiceClient.CreateToolSession(context.Background(),
|
||||
&controltowerv1.CreateToolSessionRequest{
|
||||
ToolName: config.ToolName,
|
||||
ToolVersion: config.ToolVersion,
|
||||
ProjectName: config.ProjectName,
|
||||
ProjectVersion: &config.ProjectVersion,
|
||||
ProjectSource: &source,
|
||||
Trigger: &trigger,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create tool session: %w", err)
|
||||
}
|
||||
|
||||
logger.Debugf("Report Sync: Tool data upload session ID: %s",
|
||||
toolSessionRes.GetToolSession().GetToolSessionId())
|
||||
|
||||
syncSessionPool.addPrimarySession(toolSessionRes.GetToolSession().GetToolSessionId(),
|
||||
toolServiceClient)
|
||||
}
|
||||
|
||||
logger.Debugf("Report Sync: Tool data upload session ID: %s",
|
||||
toolSessionRes.GetToolSession().GetToolSessionId())
|
||||
|
||||
done := make(chan bool)
|
||||
self := &syncReporter{
|
||||
config: &config,
|
||||
done: done,
|
||||
workQueue: make(chan *models.Package, 1000),
|
||||
client: client,
|
||||
toolServiceClient: toolServiceClient,
|
||||
sessionId: toolSessionRes.GetToolSession().GetToolSessionId(),
|
||||
config: &config,
|
||||
done: done,
|
||||
workQueue: make(chan *models.Package, 1000),
|
||||
sessions: &syncSessionPool,
|
||||
}
|
||||
|
||||
self.startWorkers()
|
||||
@ -148,18 +220,20 @@ func (s *syncReporter) Finish() error {
|
||||
s.wg.Wait()
|
||||
close(s.done)
|
||||
|
||||
logger.Debugf("Report Sync: Completing tool session: %s", s.sessionId)
|
||||
return s.sessions.forEach(func(_ string, session *syncSession) error {
|
||||
logger.Debugf("Report Sync: Completing tool session: %s", session.sessionId)
|
||||
|
||||
_, err := s.toolServiceClient.CompleteToolSession(context.Background(),
|
||||
&controltowerv1.CompleteToolSessionRequest{
|
||||
ToolSession: &controltowerv1.ToolSession{
|
||||
ToolSessionId: s.sessionId,
|
||||
},
|
||||
_, err := session.toolServiceClient.CompleteToolSession(context.Background(),
|
||||
&controltowerv1.CompleteToolSessionRequest{
|
||||
ToolSession: &controltowerv1.ToolSession{
|
||||
ToolSessionId: session.sessionId,
|
||||
},
|
||||
|
||||
Status: controltowerv1.CompleteToolSessionRequest_STATUS_SUCCESS,
|
||||
})
|
||||
Status: controltowerv1.CompleteToolSessionRequest_STATUS_SUCCESS,
|
||||
})
|
||||
|
||||
return err
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (s *syncReporter) queuePackage(pkg *models.Package) {
|
||||
@ -195,10 +269,16 @@ func (s *syncReporter) syncReportWorker() {
|
||||
func (s *syncReporter) syncPackage(pkg *models.Package) error {
|
||||
defer s.wg.Done()
|
||||
|
||||
session, err := s.sessions.getSession(pkg.Manifest.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get session for package: %s/%s/%s: %w",
|
||||
pkg.Manifest.Ecosystem, pkg.GetName(), pkg.GetVersion(), err)
|
||||
}
|
||||
|
||||
// Build the base package manifest and package
|
||||
req := controltowerv1.PublishPackageInsightRequest{
|
||||
ToolSession: &controltowerv1.ToolSession{
|
||||
ToolSessionId: s.sessionId,
|
||||
ToolSessionId: session.sessionId,
|
||||
},
|
||||
|
||||
Manifest: &packagev1.PackageManifest{
|
||||
@ -311,7 +391,7 @@ func (s *syncReporter) syncPackage(pkg *models.Package) error {
|
||||
// not a single scorecard per package. Rather there is a scorecard per project. Since
|
||||
// a package may be related to multiple projects, we will have multiple related scorecards.
|
||||
|
||||
_, err = s.toolServiceClient.PublishPackageInsight(context.Background(), &req)
|
||||
_, err = session.toolServiceClient.PublishPackageInsight(context.Background(), &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to publish package insight: %w", err)
|
||||
}
|
||||
|
||||
2
scan.go
2
scan.go
@ -140,7 +140,7 @@ func newScanCommand() *cobra.Command {
|
||||
"Enable syncing report data to cloud")
|
||||
cmd.Flags().StringVarP(&syncReportProject, "report-sync-project", "", "",
|
||||
"Project name to use in cloud")
|
||||
cmd.Flags().StringVarP(&syncReportStream, "report-sync-stream", "", "",
|
||||
cmd.Flags().StringVarP(&syncReportStream, "report-sync-project-version", "", "",
|
||||
"Project stream name (e.g. branch) to use in cloud")
|
||||
cmd.Flags().StringArrayVarP(&trustedRegistryUrls, "trusted-registry", "", []string{},
|
||||
"Trusted registry URLs to use for package manifest verification")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user