// StreamBase.js (2.6 KB)

  'use strict';

  // Node's Transform is the base class for StreamBase below.
  const {Transform} = require('stream');
  // Project-local assembler; StreamBase creates one instance per stream.
  const Assembler = require('../Assembler');
  /**
   * Minimal stand-in for an assembler that only tracks nesting depth.
   *
   * It exposes just the four structural handlers (`startObject`, `endObject`,
   * `startArray`, `endArray`); any other token name has no handler here, so a
   * caller that dispatches by token name simply skips it. StreamBase swaps
   * this in while a rejected value is being skipped.
   */
  class Counter {
    /**
     * @param {number} initialDepth - nesting depth to start counting from.
     */
    constructor(initialDepth) {
      this.depth = initialDepth;
    }

    /** An object was opened: one level deeper. */
    startObject() {
      ++this.depth;
    }

    /** An object was closed: one level shallower. */
    endObject() {
      --this.depth;
    }

    /** An array was opened: one level deeper. */
    startArray() {
      ++this.depth;
    }

    /** An array was closed: one level shallower. */
    endArray() {
      --this.depth;
    }
  }
  /**
   * Object-mode Transform that feeds incoming `{name, value}` chunks to an
   * Assembler and calls `this._push()` whenever the assembler's depth equals
   * `this._level`.
   *
   * This class does not define `_level`, `_push(discard)` or the optional
   * `_wait(chunk, encoding, callback)`; they are read from `this` and are
   * presumably supplied by a subclass (NOTE(review): confirm against the
   * concrete streamers).
   *
   * The active handler is stored in `this._transform` and is switched between
   * four states:
   *   - `_transform` (prototype method): no filtering, push every value;
   *   - `_filter`: ask `objectFilter(assembler)` after every token;
   *   - `_accept`: the filter returned a truthy value, keep assembling and
   *     push once depth returns to `_level`;
   *   - `_reject`: the filter returned `false`, only count depth until it
   *     returns to `_level`, then resume filtering.
   */
  class StreamBase extends Transform {
    /**
     * @param {object} [options] - passed to Transform (with both sides forced
     *   into object mode) and to the Assembler constructor.
     * @param {function} [options.objectFilter] - called with the assembler
     *   after each handled token; a truthy result accepts the value, `false`
     *   rejects it, anything else leaves it undecided.
     * @param {boolean} [options.includeUndecided] - when truthy, values that
     *   complete while still undecided are pushed instead of discarded.
     */
    constructor(options) {
      super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
      if (options) {
        this.objectFilter = options.objectFilter;
        this.includeUndecided = options.includeUndecided;
      }
      if (typeof this.objectFilter !== 'function') {
        // No usable filter: shadow _filter with the plain pass-through handler.
        this._filter = this._transform;
      }
      // Start in _wait when one is available on the instance, else in _filter.
      this._transform = this._wait || this._filter;
      this._assembler = new Assembler(options);
    }

    /**
     * Unfiltered handler: assemble the token and push whenever depth equals
     * `_level`. Chunks whose name has no assembler handler are ignored.
     */
    _transform(chunk, encoding, callback) {
      if (this._assembler[chunk.name]) {
        this._assembler[chunk.name](chunk.value);
        if (this._assembler.depth === this._level) {
          this._push();
        }
      }
      callback(null);
    }

    /**
     * Filtering handler: assemble the token, then consult `objectFilter`.
     */
    _filter(chunk, encoding, callback) {
      if (this._assembler[chunk.name]) {
        this._assembler[chunk.name](chunk.value);
        const result = this.objectFilter(this._assembler);
        if (result) {
          if (this._assembler.depth === this._level) {
            this._push();
          }
          // NOTE(review): the handler becomes _accept even when the push above
          // already happened on this chunk, so following tokens are handled
          // without consulting objectFilter until depth next equals _level.
          // Confirm this is intended before changing it.
          this._transform = this._accept;
          return callback(null);
        }
        if (result === false) {
          // Park the real assembler and track depth with a Counter seeded
          // from the depth reached before the partial value is dropped.
          this._saved_assembler = this._assembler;
          this._assembler = new Counter(this._saved_assembler.depth);
          this._saved_assembler.dropToLevel(this._level);
          if (this._assembler.depth === this._level) {
            // Already at the target depth: put the real assembler back.
            this._assembler = this._saved_assembler;
          }
          // NOTE(review): as in the accept branch, the handler becomes
          // _reject even when depth already equals _level — confirm intent.
          this._transform = this._reject;
          return callback(null);
        }
        // Undecided: when the value completes, push it only if
        // includeUndecided is truthy; otherwise ask _push to discard it.
        if (this._assembler.depth === this._level) {
          this._push(!this.includeUndecided);
        }
      }
      callback(null);
    }

    /**
     * Accepted-value handler: keep assembling; once depth equals `_level`,
     * push and go back to filtering.
     */
    _accept(chunk, encoding, callback) {
      if (this._assembler[chunk.name]) {
        this._assembler[chunk.name](chunk.value);
        if (this._assembler.depth === this._level) {
          this._push();
          this._transform = this._filter;
        }
      }
      callback(null);
    }

    /**
     * Rejected-value handler: update whichever object is currently in
     * `_assembler`; once depth equals `_level`, restore the saved assembler
     * and go back to filtering. Nothing is pushed.
     */
    _reject(chunk, encoding, callback) {
      if (this._assembler[chunk.name]) {
        this._assembler[chunk.name](chunk.value);
        if (this._assembler.depth === this._level) {
          this._assembler = this._saved_assembler;
          this._transform = this._filter;
        }
      }
      callback(null);
    }
  }
  // The class itself is the module's sole export.
  module.exports = StreamBase;