How to detect the flow of water through a pipe

I'm using the red and blue wires. I believe the other two wires are used for the tamper protection feature; I don't have them connected.

This is what I do: poll the water meter, track the 5-minute and total counts, and send them out over serial every 10 seconds.

// Elapsed-time helpers built on millis(). Each yields the signed number of
// milliseconds since t1: negative while t1 is still in the future, zero or
// positive once it has been reached.
//
// TimeLapLong compares the full unsigned long value and is the one to use for
// intervals that can exceed roughly 32 seconds.
// TimeLapWord compares only the low word; with a 16-bit int its result wraps
// to negative about 32.7 seconds after t1, so it is only suitable for short
// intervals that are re-armed frequently.
//
// The argument is parenthesised so that an expression such as `a + b` or
// `c ? a : b` is cast as a whole instead of only its first operand.
#define TimeLapLong(t1) ((long)((unsigned long)millis()-(unsigned long)(t1)))
#define TimeLapWord(t1) ((int)((word)millis()-(word)(t1)))

const long WATER_DEBOUNCE = 50;   // ms to ignore the input after a counted change
const long TIME_SEND = 10000;     // ms between serial reports (10 seconds)
const long TIME_CLICK = 60000;    // ms between history snapshots (1 minute)
const int water_pin = 4;          // digital pin the meter output is wired to

unsigned long waterTotal = 0;     // number of input level changes counted so far
int waterLastRead = LOW;          // last input level that was counted
unsigned long waterHit = 0;       // millis() value at which the debounce window ends

// Snapshots of waterTotal, oldest first, shifted once per TIME_CLICK.
// Same width as waterTotal so a stored snapshot is never truncated.
unsigned long water5[5];

unsigned long timetosend =0;      // millis() value at which the next report is due
unsigned long timeclick=0;        // millis() value at which the next snapshot is due

void setup(){
  
  pinMode(water_pin,INPUT);
  digitalWrite(water_pin,HIGH);
  waterLastRead = digitalRead(water_pin);
  for(int x = 0;x<5;x++) water5[x] = 0;
  
  Serial.begin(115200);
}

void loop()
{
    if (digitalRead(water_pin) != waterLastRead &&  TimeLapWord(waterHit) >=0 ) {
	waterTotal++;
        waterLastRead = !waterLastRead;
        waterHit = millis() + WATER_DEBOUNCE; //millis();
    }

if(TimeLapLong(timeclick)>=0) {
        timeclick += TIME_CLICK; //millis();
        
        for(int x = 0;x<4;x++) water5[x] = water5[x+1];
        water5[4] = waterTotal;
}

// 10000 every 10 seconds
    if(TimeLapWord(timetosend)>=0)
    {
      timetosend += TIME_SEND; //millis(); // += 10000; //millis();
      outbufferIdx = 0;

      OutTitle('H','0');
      OutBufferPrint(1,waterTotal,1);
      
      OutTitle('H','5');
      OutBufferPrint(1,waterTotal-water5[0],1);

      outbuffer[outbufferIdx++] ='\n';
      outbuffer[outbufferIdx++] ='\0';
      
      Serial.println(outbuffer);  

      }
}

void OutBufferPrint(int id, unsigned long reading,unsigned long  i) {
  unsigned long temp; 
  
  while (i*10 <= reading){
   i*=10; 

  while (i>0){
    if (outbufferIdx >= OUTBUFFER_SIZE -1) break; 
     temp = reading/i;
     reading -= temp * i;
     outbuffer[outbufferIdx++] = 48+temp;
     i/=10;
    }

  }