File

src/app/streams/stream-create/create-dialog/create-dialog.component.ts

Description

Component to display dialog to allow user to name and deploy (if selected) a stream.

Extends

Modal

Implements

OnInit OnDestroy

Example

Metadata

encapsulation ViewEncapsulation.None
selector app-stream-create-dialog-content
styleUrls styles.scss
templateUrl create-dialog.component.html

Index

Properties
Methods

Constructor

constructor(bsModalRef: BsModalRef, notificationService: NotificationService, parserService: ParserService, streamService: StreamsService, loggerService: LoggerService, blockerService: BlockerService, router: Router, aboutService: SharedAboutService)

Constructor

Parameters :
Name Type Optional Description
bsModalRef BsModalRef
notificationService NotificationService
parserService ParserService
streamService StreamsService
loggerService LoggerService
blockerService BlockerService
router Router
aboutService SharedAboutService

Methods

canSubmit
canSubmit()

Can Submit

Returns : boolean
changeStreamName
changeStreamName(index: number, newName: string)

Change Stream name

Parameters :
Name Type Optional Description
index number
newName string
Returns : void
createStreams
createStreams(index: number)

Function creating streams based on the info in scopes flo.streamdefs contents.

After calling the REST API to create a stream, it doesn't mean it is fully defined yet. So this createStreams() function can be passed a stream name that it should wait on before continuing. This ensures that if a later stream depends on an earlier stream, everything works.

Parameters :
Name Type Optional Description
index number
Returns : void
getControl
getControl(id: string)

Get Control

Parameters :
Name Type Optional Description
id string
Returns : AbstractControl
hasDuplicateName
hasDuplicateName(def: any)

Has dupplicate name

Parameters :
Name Type Optional Description
def any
Returns : boolean
invalidStreamRow
invalidStreamRow(def: any)

Invalid a stream row

Parameters :
Name Type Optional Description
def any
Returns : boolean
isStreamCreationInProgress
isStreamCreationInProgress()

Is Stream create in progress

Returns : boolean
ngOnDestroy
ngOnDestroy()

Will cleanup any {@link Subscription}s to prevent memory leaks.

Returns : void
ngOnInit
ngOnInit()

Initialize

Returns : void
open
open(args: )

Implement open: not used

Parameters :
Name Type Optional Description
args
Returns : Observable<any>
setDsl
setDsl(dsl: string)

Set DSL

Parameters :
Name Type Optional Description
dsl string
Returns : void
streamDefsToCreate
streamDefsToCreate()

Stream Definitions to Create

Returns : Array<any>
submit
submit()

Submit Streams

Returns : void
uniqueStreamNames
uniqueStreamNames()

Is unique Stream names

waitForStreamDef
waitForStreamDef(streamDefNameToWaitFor: string, attemptCount: number)

Wait for Stream definition

Parameters :
Name Type Optional Description
streamDefNameToWaitFor string
attemptCount number
Returns : Promise<void>

Properties

confirm
confirm: EventEmitter<boolean>
Type : EventEmitter<boolean>

Emit after undeploy success

dependencies
dependencies: Map<number | Array<number>>
Type : Map<number | Array<number>>

Dependencies Map

errors
errors: Array<string>
Type : Array<string>

Errors

form
form: FormGroup
Type : FormGroup

Form

Private ngUnsubscribe$
ngUnsubscribe$: Subject<any>
Type : Subject<any>

UnSubscribe

progressData
progressData: ProgressData
Type : ProgressData

Progress data

streamDefs
streamDefs: Array<any>
Type : Array<any>

Stream definitions

import { Component, ViewEncapsulation, OnInit, OnDestroy, EventEmitter } from '@angular/core';
import { BsModalRef } from 'ngx-bootstrap';
import { FormGroup, FormControl, AbstractControl, Validators } from '@angular/forms';
import { ParserService } from '../../../shared/services/parser.service';
import { convertParseResponseToJsonGraph } from '../../components/flo/text-to-graph';
import { Utils } from '../../components/flo/support/utils';
import { StreamsService } from '../../streams.service';
import { Properties } from 'spring-flo';
import { Router } from '@angular/router';
import { SharedAboutService } from '../../../shared/services/shared-about.service';
import { finalize, takeUntil } from 'rxjs/operators';
import { Modal } from '../../../shared/components/modal/modal-abstract';
import { NotificationService } from '../../../shared/services/notification.service';
import { LoggerService } from '../../../shared/services/logger.service';
import { Observable, Subscription, Subject } from 'rxjs';
import { BlockerService } from '../../../shared/components/blocker/blocker.service';

/**
 * Stores progress percentage.
 *
 * @author Alex Boyko
 * @author Andy Clement
 * @author Gunnar Hillert
 */
class ProgressData {
  constructor(public count, public total) {
  }

  get percent(): number {
    return Math.round(this.count / this.total * 100);
  }
}

/**
 * Progress bar tick
 */
const PROGRESS_BAR_WAIT_TIME = 500; // to account for animation delay

/**
 * Component to display dialog to allow user to name and deploy (if selected) a stream.
 *
 * @author Alex Boyko
 * @author Andy Clement
 * @author Damien Vitrac
 */
@Component({
  selector: 'app-stream-create-dialog-content',
  templateUrl: 'create-dialog.component.html',
  styleUrls: ['styles.scss'],
  encapsulation: ViewEncapsulation.None
})
export class StreamCreateDialogComponent extends Modal implements OnInit, OnDestroy {

  /**
   * UnSubscribe
   */
  private ngUnsubscribe$: Subject<any> = new Subject();

  /**
   * Form
   */
  form: FormGroup;

  /**
   * Stream definitions
   */
  streamDefs: Array<any> = [];

  /**
   * Errors
   */
  errors: Array<string>;

  /**
   * Dependencies Map
   */
  dependencies: Map<number, Array<number>>;

  /**
   * Progress data
   */
  progressData: ProgressData;

  /**
   * Emit after undeploy success
   */
  confirm: EventEmitter<boolean> = new EventEmitter();

  /**
   * Constructor
   *
   * @param {BsModalRef} bsModalRef
   * @param {NotificationService} notificationService
   * @param {ParserService} parserService
   * @param {StreamsService} streamService
   * @param {LoggerService} loggerService
   * @param {BlockerService} blockerService
   * @param {Router} router
   * @param {SharedAboutService} aboutService
   */
  constructor(private bsModalRef: BsModalRef,
              private notificationService: NotificationService,
              private parserService: ParserService,
              private streamService: StreamsService,
              private loggerService: LoggerService,
              private blockerService: BlockerService,
              private router: Router,
              private aboutService: SharedAboutService) {
    super(bsModalRef);
  }

  /**
   * Initialize
   */
  ngOnInit() {
    this.form = new FormGroup({}, this.uniqueStreamNames());
  }

  /**
   * Implement open: not used
   * @param args
   * @returns {Observable<any>}
   */
  open(args): Observable<any> {
    this.setDsl(args.dsl);
    return this.confirm;
  }

  /**
   * Will cleanup any {@link Subscription}s to prevent
   * memory leaks.
   */
  ngOnDestroy() {
    this.ngUnsubscribe$.next();
    this.ngUnsubscribe$.complete();
  }

  /**
   * Set DSL
   *
   * @param {string} dsl
   */
  setDsl(dsl: string) {
    // Remove empty lines from text definition and strip off white space
    let newLineNumber = 0;
    let text = '';
    dsl.split('\n').forEach(line => {
      const newLine = line.trim();
      if (newLine.length > 0) {
        text += (newLineNumber ? '\n' : '') + line.trim();
        newLineNumber++;
      }
    });

    this.dependencies = new Map();
    if (text) {
      // TODO: Adopt to parser types once they are available
      const graphAndErrors = convertParseResponseToJsonGraph(text, this.parserService.parseDsl(text));
      if (graphAndErrors.graph) {
        this.streamDefs.push(...graphAndErrors.graph.streamdefs);
        this.streamDefs.forEach((streamDef, i) => {
          streamDef.created = false;
          streamDef.index = i;
          this.form.addControl(streamDef.index.toString(), new FormControl(streamDef.name || '', [
            Validators.required,
            Validators.pattern(/^[\w\-]+$/)
          ], [
            Properties.Validators.uniqueResource((value) => this.streamService.getDefinition(value), 500)
          ]));
        });

        graphAndErrors.graph.links.filter(l => l.linkType === 'tap').forEach(l => {
          const parentLine = graphAndErrors.graph.nodes.find(n => n.id === l.from).range.start.line;
          const childLine = graphAndErrors.graph.nodes.find(n => n.id === l.to).range.start.line;
          if (parentLine !== childLine) {
            if (!this.dependencies.has(parentLine)) {
              this.dependencies.set(parentLine, [childLine]);
            } else {
              this.dependencies.get(parentLine).push(childLine);
            }
          }
        });

        // if (angular.isFunction($scope.focusInvalidField)) {
        //   // Need to be timed to let angular compile the DOM node for these changes.
        //   // HACK, but couldn't find anything better for this to work
        //   utils.$timeout($scope.focusInvalidField, 300);
        // }
      }
      if (graphAndErrors.errors) {
        this.errors = graphAndErrors.errors.map(e => e.message);
      }
    }
  }

  /**
   * Is unique Stream names
   *
   * @returns {(control: AbstractControl) => {[p: string]: any}}
   */
  uniqueStreamNames() {
    const streamDefs = this.streamDefs;
    return (control: AbstractControl): { [key: string]: any } => {
      const duplicates = Utils.findDuplicates(streamDefs.filter(s => s.name).map(s => s.name));
      return duplicates.length === 0 ? null : { 'uniqueStreamNames': duplicates };
    };
  }

  /**
   * Invalid a stream row
   *
   * @param def
   * @returns {boolean}
   */
  invalidStreamRow(def: any): boolean {
    return this.getControl(def.index.toString()).invalid || this.hasDuplicateName(def);
  }

  /**
   * Has dupplicate name
   *
   * @param def
   * @returns {boolean}
   */
  hasDuplicateName(def: any): boolean {
    return this.form.errors && this.form.errors.uniqueStreamNames && this.form.errors.uniqueStreamNames.indexOf(def.name) >= 0;
  }

  /**
   * Get Control
   *
   * @param {string} id
   * @returns {AbstractControl}
   */
  getControl(id: string): AbstractControl {
    return this.form.controls[id];
  }

  /**
   * Change Stream name
   *
   * @param {number} index
   * @param {string} newName
   */
  changeStreamName(index: number, newName: string) {
    const oldName = this.streamDefs[index].name;
    this.streamDefs[index].name = newName;
    if (this.dependencies.has(index)) {
      this.dependencies.get(index).forEach(i => {
        const depDef = this.streamDefs[i].def;
        this.streamDefs[i].def = depDef.replace(`:${oldName}.`, `:${newName}.`);
      });
    }
  }

  /**
   * Is Stream create in progress
   *
   * @returns {boolean}
   */
  isStreamCreationInProgress(): boolean {
    return this.progressData !== undefined && this.progressData !== null;
  }

  /**
   * Stream Definitions to Create
   *
   * @returns {Array<any>}
   */
  streamDefsToCreate(): Array<any> {
    return this.streamDefs ? this.streamDefs.filter(d => !d.created) : [];
  }

  /**
   * Wait for Stream definition
   *
   * @param {string} streamDefNameToWaitFor
   * @param {number} attemptCount
   * @returns {Promise<void>}
   */
  waitForStreamDef(streamDefNameToWaitFor: string, attemptCount: number): Promise<void> {
    return new Promise(resolve => {
      if (attemptCount === 10) {
        this.loggerService.error('Aborting after 10 attempts, cannot find the stream: ' + streamDefNameToWaitFor);
        resolve();
      }
      this.streamService.getDefinition(streamDefNameToWaitFor)
        .pipe(takeUntil(this.ngUnsubscribe$))
        .subscribe(() => {
          this.loggerService.log('Stream ' + streamDefNameToWaitFor + ' is ok!');
          resolve();
        }, () => {
          this.loggerService.log('Stream ' + streamDefNameToWaitFor + ' is not there yet (attempt=#' + attemptCount + ')');
          setTimeout(() => {
            this.waitForStreamDef(streamDefNameToWaitFor, attemptCount + 1).then(() => {
              resolve();
            });
          }, 400);
        });
    });
  }

  /**
   * Can Submit
   *
   * @returns {boolean}
   */
  canSubmit(): boolean {
    return !this.isStreamCreationInProgress()
      && this.form.valid
      && this.streamDefs
      && this.streamDefs.length
      && !(this.errors && this.errors.length);
  }

  /**
   * Submit Streams
   */
  submit() {
    if (this.canSubmit()) {
      // Find index of the first not yet created stream
      // Can't use Array#findIndex(...) because not all browsers support it
      let index = 0;
      for (; index < this.streamDefs.length && this.streamDefs[index].created; index++) {
        // nothing to do - just loop to the not created stream def
      }
      // Setup progress bar data
      this.progressData = new ProgressData(0, (this.streamDefs.length - index) * 2 - 1); // create, wait for each - wait for the last
      // Start stream(s) creation
      this.createStreams(index);
    }
  }

  /**
   * Function creating streams based on the info in scopes flo.streamdefs contents.
   *
   * After calling the REST API to create a stream, it doesn't mean it is fully defined yet. So this createStreams()
   * function can be passed a stream name that it should wait on before continuing. This ensures that if a later
   * stream depends on an earlier stream, everything works.
   */
  createStreams(index: number) {
    if (index < 0 || index >= this.streamDefs.length) {
      // Invalid index means all streams have been created, close the dialog.
      this.bsModalRef.hide();
    } else {
      // Send the request to create a stream
      const def = this.streamDefs[index];
      this.blockerService.lock();
      this.streamService.createDefinition(def.name, def.def, false)
        .pipe(takeUntil(this.ngUnsubscribe$), finalize(() => this.blockerService.unlock()))
        .subscribe(() => {
          this.loggerService.log('Stream ' + def.name + ' created OK');
          // Stream created successfully, mark it as created
          def.created = true;
          this.progressData.count++;
          if (this.streamDefs.length - 1 === index) {
            // Last stream created, close the dialog
            // Delay closing the dialog thus progress bar 100% would stay up for a short a bit
            this.confirm.emit(true);
            setTimeout(() => {
              this.bsModalRef.hide();
              this.notificationService.success('Stream(s) have been created successfully');
            }, PROGRESS_BAR_WAIT_TIME);
            this.router.navigate(['streams']);
          } else {
            // There are more streams to create, so create the next one
            this.waitForStreamDef(def.name, 0).then(() => {
              this.progressData.count++;
              // $scope.createProgressData($scope.progressData.total, $scope.progressData.count + 1);
              this.createStreams(index + 1);
            }, function () {
              // Error handling
              // Previous stream creation request was issues but the stream resource is still unavailable for some reason
              // Never mind and keep creating the rest of the streams?
              this.progressData.count++;
              // $scope.createProgressData($scope.progressData.total, $scope.progressData.count + 1);
              this.createStreams(index + 1);
            });
          }
        }, (error) => {
          // Delay hiding the progress bar thus user can see it if operation went too fast
          setTimeout(() => {
            this.progressData = undefined;
          }, PROGRESS_BAR_WAIT_TIME);
          if (error._body && error._body.message) {
            this.notificationService.error(`Problem creating stream '${def.name}': ${error._body.message}`);
          } else {
            this.notificationService.error(`Failed to create stream '${def.name}'`);
          }
        });
    }
  }

}

<div *ngIf="!progressData">
  <div class="modal-header">
    <h4 class="modal-title pull-left">Create Stream</h4>
    <button type="button" class="close pull-right" aria-label="Close" (click)="cancel()">
      <span aria-hidden="true">&times;</span>
    </button>
  </div>

  <form name="form-creation" (submit)="submit()" class="form-horizontal" [formGroup]="form">
    <div class="modal-body" *ngIf="!progressData">
      <div *ngIf="errors && errors.length > 0" class="alert alert-error">
        <div *ngFor="let error of errors">• {{ error }}</div>
      </div>
      <p>This action will create the following <strong>stream(s)</strong>:</p>
      <div *ngFor="let def of streamDefsToCreate(); let i = index" class="row-stream">
        <div class="form-group">
          <label class="control-label col-sm-4">Definition {{ i }}</label>
          <div class="col-sm-18">
            <div class="control-empty">
              <app-stream-dsl>{{ def.def }}</app-stream-dsl>
            </div>
          </div>
        </div>
        <div class="form-group" [class.has-error]="getControl(def.index.toString()).invalid || hasDuplicateName(def)">
          <label [for]="def.index.toString()" class="control-label col-sm-4 control-label-sm">
            Name <em class="required">*</em>
          </label>
          <div class="col-sm-16">
            <input [disabled]="isStreamCreationInProgress()" class="form-control input-sm" [id]="def.index.toString()"
                   [name]="def.index.toString()" [formControlName]="def.index.toString()"
                   type="text" placeholder="<Stream Name>" [ngModel]="def.name"
                   [dataflowFocus]="i === 0"
                   (ngModelChange)="changeStreamName(def.index, $event)"/>
            <span *ngIf="getControl(def.index.toString()).errors && getControl(def.index.toString()).errors.required"
                  class="help-block validation-block">
            Stream name is required!
          </span>
            <span
              *ngIf="getControl(def.index.toString()).errors && getControl(def.index.toString()).errors.uniqueResource"
              class="help-block validation-block">
            Stream name is already taken!
          </span>
            <span *ngIf="getControl(def.index.toString()).errors && getControl(def.index.toString()).errors.pattern"
                  class="help-block validation-block">
            Invalid stream name!
          </span>
            <span *ngIf="hasDuplicateName(def)" class="help-block validation-block">
            Duplicate stream name on the form
          </span>
          </div>
        </div>
      </div>
    </div>
    <div class="modal-footer">
      <button type="button" class="btn btn-default" (click)="cancel()">Cancel</button>
      <button type="submit" class="btn btn-primary" [disabled]="!canSubmit()">
        <span *ngIf="streamDefsToCreate().length === 1">
          <span>Create the stream</span>
        </span>
        <span *ngIf="streamDefsToCreate().length > 1">
          <span>Create the {{ streamDefsToCreate().length }} streams</span>
        </span>
      </button>
    </div>
  </form>
</div>

<div *ngIf="progressData">
  <div class="modal-header">
    <h4 class="modal-title pull-left">Creating...</h4>
    <button type="button" class="close pull-right" aria-label="Close" (click)="cancel()">
      <span aria-hidden="true">&times;</span>
    </button>
  </div>
  <div class="modal-body" *ngIf="progressData">
    <div><strong>Creating Streams...</strong></div>
    <progressbar animate="true" [value]="progressData.percent" type="success"><b>{{ progressData.percent }}%</b>
    </progressbar>
  </div>
  <div class="modal-footer">
    <button type="button" class="btn btn-default" (click)="cancel()">Close</button>
  </div>
</div>
Legend
Html element
Component
Html element with directive

results matching ""

    No results matching ""