refactor: Enable tool service session pooling in cloud sync reporter

This commit is contained in:
abhisek 2024-09-30 12:48:44 +05:30
parent 5c1052c6c6
commit 7a5d637a50
No known key found for this signature in database
GPG Key ID: CB92A4990C02A88F
2 changed files with 122 additions and 42 deletions

View File

@ -34,6 +34,10 @@ type SyncReporterConfig struct {
ControlTowerBaseUrl string
ControlTowerToken string
// Enable multi-project syncing
// In this case, a new project is created per package manifest
EnableMultiProjectSync bool
// Required
ProjectName string
ProjectVersion string
@ -53,14 +57,72 @@ type SyncReporterConfig struct {
ToolVersion string
}
type syncReporter struct {
config *SyncReporterConfig
workQueue chan *models.Package
done chan bool
wg sync.WaitGroup
client *grpc.ClientConn
toolServiceClient controltowerv1grpc.ToolServiceClient
// syncSession pairs a tool session identifier with the ToolServiceClient
// on which calls for that session are issued.
type syncSession struct {
// sessionId is sent as the ToolSessionId when publishing package insights
// and when completing the session.
sessionId string
// toolServiceClient is the gRPC client used for this session's requests.
toolServiceClient controltowerv1grpc.ToolServiceClient
}
// syncSessionPool is a mutex-guarded collection of sync sessions indexed by
// a string key. The key "*" is reserved for the primary session, which
// getSession returns in preference to any keyed entry.
type syncSessionPool struct {
// mu guards syncSessions.
mu sync.RWMutex
// syncSessions holds the registered sessions. The add methods write to it
// directly, so it must be a non-nil map before any session is added.
syncSessions map[string]syncSession
}
// addPrimarySession registers the session under the wildcard key "*".
// getSession consults the wildcard entry before any other key, so once a
// primary session is present every lookup resolves to it regardless of the
// key that was requested. An existing primary session is overwritten.
func (s *syncSessionPool) addPrimarySession(sessionId string, client controltowerv1grpc.ToolServiceClient) {
	primary := syncSession{sessionId: sessionId, toolServiceClient: client}

	s.mu.Lock()
	defer s.mu.Unlock()

	s.syncSessions["*"] = primary
}
// addKeyedSession stores a session under the given key, replacing any
// session previously stored under that key. The write is performed while
// holding the pool's write lock.
func (s *syncSessionPool) addKeyedSession(key, sessionId string, client controltowerv1grpc.ToolServiceClient) {
	var entry syncSession
	entry.sessionId = sessionId
	entry.toolServiceClient = client

	s.mu.Lock()
	defer s.mu.Unlock()

	s.syncSessions[key] = entry
}
// getSession resolves the session to use for key.
//
// The wildcard entry "*" takes precedence: when it exists it is returned for
// every key. Otherwise the entry stored under key is returned. The returned
// pointer refers to a copy of the stored value, not to the map entry itself.
// An error is returned when neither entry exists.
func (s *syncSessionPool) getSession(key string) (*syncSession, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Candidates are listed in precedence order: wildcard first, then key.
	for _, candidate := range [...]string{"*", key} {
		if found, ok := s.syncSessions[candidate]; ok {
			return &found, nil
		}
	}

	return nil, fmt.Errorf("session not found for key: %s", key)
}
// forEach invokes f once for every session in the pool, passing the key the
// session is stored under and a pointer to a copy of the session. Iteration
// follows map order, which is unspecified. The first non-nil error returned
// by f stops the iteration and is returned to the caller; otherwise nil is
// returned.
//
// f runs while the pool's read lock is held, so it must not call
// addPrimarySession or addKeyedSession on the same pool: those take the
// write lock and would block forever.
func (s *syncSessionPool) forEach(f func(key string, session *syncSession) error) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	for key, session := range s.syncSessions {
		// Hand out a per-iteration copy rather than the address of the range
		// variable. Under pre-Go 1.22 loop semantics the range variable is
		// shared by all iterations, so a pointer retained by f would end up
		// aliasing whichever session was visited last.
		current := session
		if err := f(key, &current); err != nil {
			return err
		}
	}

	return nil
}
// syncReporter holds the state of the cloud sync reporter: its
// configuration, the package work queue, shutdown signalling, and the pool
// of tool sessions used when publishing.
type syncReporter struct {
// config is the reporter configuration supplied to NewSyncReporter.
config *SyncReporterConfig
// workQueue carries packages waiting to be synced.
workQueue chan *models.Package
// done is closed by Finish once all outstanding work has been waited for.
done chan bool
// wg tracks in-flight package syncs; Finish waits on it and syncPackage
// marks each one done.
wg sync.WaitGroup
// sessions is the pool from which syncPackage obtains the tool session
// for a package, looked up by the package's manifest path.
sessions *syncSessionPool
}
func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
@ -89,37 +151,47 @@ func NewSyncReporter(config SyncReporterConfig) (Reporter, error) {
return nil, fmt.Errorf("failed to create gRPC client: %w", err)
}
// TODO: Auto-discover config using CI environment variables
// if enabled by the user
syncSessionPool := syncSessionPool{
syncSessions: make(map[string]syncSession),
}
trigger := controltowerv1.ToolTrigger_TOOL_TRIGGER_MANUAL
source := packagev1.ProjectSourceType_PROJECT_SOURCE_TYPE_UNSPECIFIED
logger.Debugf("Report Sync: Creating tool session for project: %s, version: %s",
config.ProjectName, config.ProjectVersion)
if !config.EnableMultiProjectSync {
logger.Debugf("Report Sync: Creating tool session for project: %s, version: %s",
config.ProjectName, config.ProjectVersion)
toolServiceClient := controltowerv1grpc.NewToolServiceClient(client)
toolSessionRes, err := toolServiceClient.CreateToolSession(context.Background(),
&controltowerv1.CreateToolSessionRequest{
ToolName: config.ToolName,
ToolVersion: config.ToolVersion,
ProjectName: config.ProjectName,
ProjectVersion: &config.ProjectVersion,
ProjectSource: &source,
Trigger: &trigger,
})
if err != nil {
return nil, fmt.Errorf("failed to create tool session: %w", err)
toolServiceClient := controltowerv1grpc.NewToolServiceClient(client)
toolSessionRes, err := toolServiceClient.CreateToolSession(context.Background(),
&controltowerv1.CreateToolSessionRequest{
ToolName: config.ToolName,
ToolVersion: config.ToolVersion,
ProjectName: config.ProjectName,
ProjectVersion: &config.ProjectVersion,
ProjectSource: &source,
Trigger: &trigger,
})
if err != nil {
return nil, fmt.Errorf("failed to create tool session: %w", err)
}
logger.Debugf("Report Sync: Tool data upload session ID: %s",
toolSessionRes.GetToolSession().GetToolSessionId())
syncSessionPool.addPrimarySession(toolSessionRes.GetToolSession().GetToolSessionId(),
toolServiceClient)
}
logger.Debugf("Report Sync: Tool data upload session ID: %s",
toolSessionRes.GetToolSession().GetToolSessionId())
done := make(chan bool)
self := &syncReporter{
config: &config,
done: done,
workQueue: make(chan *models.Package, 1000),
client: client,
toolServiceClient: toolServiceClient,
sessionId: toolSessionRes.GetToolSession().GetToolSessionId(),
config: &config,
done: done,
workQueue: make(chan *models.Package, 1000),
sessions: &syncSessionPool,
}
self.startWorkers()
@ -148,18 +220,20 @@ func (s *syncReporter) Finish() error {
s.wg.Wait()
close(s.done)
logger.Debugf("Report Sync: Completing tool session: %s", s.sessionId)
return s.sessions.forEach(func(_ string, session *syncSession) error {
logger.Debugf("Report Sync: Completing tool session: %s", session.sessionId)
_, err := s.toolServiceClient.CompleteToolSession(context.Background(),
&controltowerv1.CompleteToolSessionRequest{
ToolSession: &controltowerv1.ToolSession{
ToolSessionId: s.sessionId,
},
_, err := session.toolServiceClient.CompleteToolSession(context.Background(),
&controltowerv1.CompleteToolSessionRequest{
ToolSession: &controltowerv1.ToolSession{
ToolSessionId: session.sessionId,
},
Status: controltowerv1.CompleteToolSessionRequest_STATUS_SUCCESS,
})
Status: controltowerv1.CompleteToolSessionRequest_STATUS_SUCCESS,
})
return err
return err
})
}
func (s *syncReporter) queuePackage(pkg *models.Package) {
@ -195,10 +269,16 @@ func (s *syncReporter) syncReportWorker() {
func (s *syncReporter) syncPackage(pkg *models.Package) error {
defer s.wg.Done()
session, err := s.sessions.getSession(pkg.Manifest.Path)
if err != nil {
return fmt.Errorf("failed to get session for package: %s/%s/%s: %w",
pkg.Manifest.Ecosystem, pkg.GetName(), pkg.GetVersion(), err)
}
// Build the base package manifest and package
req := controltowerv1.PublishPackageInsightRequest{
ToolSession: &controltowerv1.ToolSession{
ToolSessionId: s.sessionId,
ToolSessionId: session.sessionId,
},
Manifest: &packagev1.PackageManifest{
@ -311,7 +391,7 @@ func (s *syncReporter) syncPackage(pkg *models.Package) error {
// not a single scorecard per package. Rather there is a scorecard per project. Since
// a package may be related to multiple projects, we will have multiple related scorecards.
_, err = s.toolServiceClient.PublishPackageInsight(context.Background(), &req)
_, err = session.toolServiceClient.PublishPackageInsight(context.Background(), &req)
if err != nil {
return fmt.Errorf("failed to publish package insight: %w", err)
}

View File

@ -140,7 +140,7 @@ func newScanCommand() *cobra.Command {
"Enable syncing report data to cloud")
cmd.Flags().StringVarP(&syncReportProject, "report-sync-project", "", "",
"Project name to use in cloud")
cmd.Flags().StringVarP(&syncReportStream, "report-sync-stream", "", "",
cmd.Flags().StringVarP(&syncReportStream, "report-sync-project-version", "", "",
"Project stream name (e.g. branch) to use in cloud")
cmd.Flags().StringArrayVarP(&trustedRegistryUrls, "trusted-registry", "", []string{},
"Trusted registry URLs to use for package manifest verification")